# Auto-Magically Spin AWS EC2 Instances on Demand 
# And Create a Dynamic Cluster Running *Trains-Agent*

### Define your budget and execute the notebook. That's it!
### You now have a fully managed cluster on AWS 🎉 🎊 

**trains-agent**'s main goal is to quickly pull a job from an execution queue, set up the environment (as defined in the experiment, including git cloning, Python packages, etc.), then execute the experiment and monitor it.

This notebook defines a cloud budget (currently only AWS is supported, but feel free to expand with PRs), and spins up an instance as soon as the controller sees a job waiting for execution. It also spins down idle machines, saving you some $$$ :)

Configuration steps
- Define maximum budget to be used (instance type / number of instances).
- Create new execution *queues* in the **trains-server**.
- Define the mapping between the created *queues* and an instance budget.

**TL;DR - This notebook:**
- Will spin up instances if there are jobs in the execution *queues*, until it hits the budget limit.
- If machines are idle, it will spin them down.
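
The spin-up rule can be summarized as a small calculation. The sketch below is only an illustration of the idea, not the controller code used later in this notebook; the function name `plan_spin_up` and its parameters are hypothetical.

```python
def plan_spin_up(pending_jobs, idle_workers, running_instances, max_instances):
    """Return how many new instances to start for a single queue.

    pending_jobs:      jobs currently waiting in the queue
    idle_workers:      running machines of this type that have no job
    running_instances: all running machines of this type (busy or idle)
    max_instances:     budget limit for this machine type
    """
    # Idle machines can pick up waiting jobs, so they reduce the demand
    needed = max(pending_jobs - idle_workers, 0)
    # Never exceed the budget defined for this machine type
    headroom = max(max_instances - running_instances, 0)
    return min(needed, headroom)


# Three jobs waiting, one idle machine, budget of two machines (one already running)
print(plan_spin_up(pending_jobs=3, idle_workers=1, running_instances=1, max_instances=2))
```

Here only one extra machine is started, because the budget allows two machines and one is already running.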

The controller implementation itself is stateless, meaning you can always re-execute the notebook, if for some reason it stopped.

It is as simple as it sounds, but extremely powerful.

Enjoy your newly created dynamic cluster :)

##### Install & import required packages

In [None]:
!pip install trains-agent
!pip install boto3

##### Define AWS instance types and configuration (Instance Type, EBS, AMI etc.)

In [3]:
# AWS EC2 machine types - default AMI - NVIDIA Deep Learning AMI 19.11.3
RESOURCE_CONFIGURATIONS = {
    "amazon_ec2_normal": {
        "instance_type": "g4dn.4xlarge",
        "is_spot": False,
        "availability_zone": "us-east-1b",
        "ami_id": "ami-07c95cafbb788face",
        "ebs_device_name": "/dev/xvda",
        "ebs_volume_size": 100,
        "ebs_volume_type": "gp2",
    },
    "amazon_ec2_high": {
        "instance_type": "g4dn.8xlarge",
        "is_spot": False,
        "availability_zone": "us-east-1b",
        "ami_id": "ami-07c95cafbb788face",
        "ebs_device_name": "/dev/xvda",
        "ebs_volume_size": 100,
        "ebs_volume_type": "gp2",
    },
}


##### Define machine budget per execution queue

Now that we have defined the available instance types, we need to connect them with the **Trains** cluster.

We map each queue to a resource type (instance type).

Create two queues in the WebUI:
- Browse to http://your_trains_server_ip:8080/workers-and-queues/queues
- Then click on the "New Queue" button and name your queues "aws_normal" and "aws_high" respectively

The QUEUES dictionary holds the mapping between each queue name and the type and maximum number of instances to spin up for that queue.
```
QUEUES = {
 'queue_name': [("instance-type-as-defined-in-RESOURCE_CONFIGURATIONS", max_number_of_instances), ]
}
```
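
Since every resource name in QUEUES must match a key in RESOURCE_CONFIGURATIONS, a quick consistency check can catch typos before any instance is requested. This is a minimal sketch; the helper `find_unknown_resources` and the sample dictionaries are hypothetical and not part of the notebook's controller.

```python
def find_unknown_resources(queues, resource_configurations):
    """Return resource names used in `queues` that have no configuration."""
    return sorted(
        {
            name
            for resources in queues.values()
            for name, _max_instances in resources
            if name not in resource_configurations
        }
    )


# Sample data for illustration only
sample_configurations = {"amazon_ec2_normal": {}, "amazon_ec2_high": {}}
sample_queues = {
    "aws_normal": [("amazon_ec2_normal", 2)],
    "aws_high": [("amazon_ec2_hihg", 1)],  # deliberate typo
}

unknown = find_unknown_resources(sample_queues, sample_configurations)
print(unknown)
```

An empty list means every queue refers to a machine type that is actually defined.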


In [4]:
# Trains-Agent Queues - Machines budget per Queue
# Per queue: list of (machine type as defined in RESOURCE_CONFIGURATIONS,
# max instances for the specific queue). Order machines from most preferred to least.
QUEUES = {
    "aws_normal": [("amazon_ec2_normal", 2)],
    "aws_high": [("amazon_ec2_high", 1)],
}

##### Credentials for your AWS account, as well as for your **Trains-Server**

In [None]:
# AWS credentials (leave empty to use credentials set using the aws cli)
CLOUD_CREDENTIALS_KEY = ""
CLOUD_CREDENTIALS_SECRET = ""
CLOUD_CREDENTIALS_REGION = "us-east-1"

# TRAINS configuration
TRAINS_SERVER_WEB_SERVER = "http://localhost:8080"
TRAINS_SERVER_API_SERVER = "http://localhost:8008"
TRAINS_SERVER_FILES_SERVER = "http://localhost:8081"
# TRAINS credentials
TRAINS_ACCESS_KEY = ""
TRAINS_SECRET_KEY = ""
# Git User/Pass to be used by trains-agent,
# leave empty if image already contains git ssh-key
TRAINS_GIT_USER = ""
TRAINS_GIT_PASS = ""

# Additional fields for trains.conf file created on the remote instance
# for example: 'agent.default_docker.image: "nvidia/cuda:10.0-cudnn7-runtime"'
EXTRA_TRAINS_CONF = """
"""

# Bash script to run on instances before running trains-agent
# Example: """
# echo "This is the first line"
# echo "This is the second line"
# """
EXTRA_BASH_SCRIPT = """
"""

# Default docker for trains-agent when running in docker mode (requires docker v19.03 and above). 
# Leave empty to run trains-agent in non-docker mode.
DEFAULT_DOCKER_IMAGE = "nvidia/cuda"

In [None]:
# Controller Internal Definitions

# maximum idle time in minutes, after which the instance will be shut down
MAX_IDLE_TIME_MIN = 15
# polling interval in minutes
# make sure to increase in case bash commands were added in EXTRA_BASH_SCRIPT
POLLING_INTERVAL_MIN = 5.0
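
The two values interact: the controller only notices an idle worker when it polls, and the idle clock starts at that first observation, so an instance may live up to roughly one polling interval longer than MAX_IDLE_TIME_MIN. The idle test itself can be sketched as below; `is_idle_expired` is a hypothetical helper written for illustration.

```python
from time import time

MAX_IDLE_TIME_MIN = 15  # same value as defined above


def is_idle_expired(idle_since, now=None, max_idle_min=MAX_IDLE_TIME_MIN):
    """Return True once a worker has been idle for longer than the limit.

    idle_since: timestamp (seconds) when the worker was first seen idle
    now:        current timestamp; defaults to time() when omitted
    """
    now = time() if now is None else now
    return now - idle_since > max_idle_min * 60.0


# A worker first seen idle 16 minutes ago is due for shutdown
print(is_idle_expired(idle_since=0.0, now=16 * 60.0))
```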

##### Import Packages and Budget Definition Sanity Check

In [None]:
import base64
import re
import os
from itertools import chain
from operator import itemgetter
from time import sleep, time

import boto3
from trains_agent.backend_api.session.client import APIClient

In [None]:
# Sanity Check - Validate Queue Resources
if len(set(map(itemgetter(0), chain(*QUEUES.values())))) != sum(
    map(len, QUEUES.values())
):
    print(
        "Error: at least one resource name is used in multiple queues. "
        "A resource name can only appear in a single queue definition."
    )

# Encode EXTRA_TRAINS_CONF for later bash script usage
EXTRA_TRAINS_CONF_ENCODED = "\\\"".join(EXTRA_TRAINS_CONF.split("\""))
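
To see what the encoding step does, the sketch below applies the same split/join expression to the example configuration line mentioned earlier. The wrapper function `escape_double_quotes` is a hypothetical name used only for this illustration.

```python
def escape_double_quotes(text):
    """Prefix every double quote with a backslash (same expression as above)."""
    return "\\\"".join(text.split("\""))


example = 'agent.default_docker.image: "nvidia/cuda:10.0-cudnn7-runtime"'
escaped = escape_double_quotes(example)
print(escaped)
```

The backslashes keep the double quotes intact when the text is later passed through `echo` in the instance startup script.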

##### Cloud specific implementation of spin up/down - currently supports AWS only
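
The spot and on-demand paths in the cell below pass the startup script differently: the spot request base64-encodes `user_data` explicitly, while `run_instances` receives the plain string because boto3 encodes it automatically. The sketch below shows only the encoding step, with a round trip to confirm nothing is lost; `encode_user_data` and the sample script are hypothetical.

```python
import base64


def encode_user_data(script):
    """Base64-encode a startup script, as done for the spot request."""
    return base64.b64encode(script.encode("ascii")).decode("ascii")


# Sample script for illustration only
sample_script = "#!/bin/bash\necho hello\n"
encoded = encode_user_data(sample_script)

# Decoding returns the original script unchanged
decoded = base64.b64decode(encoded).decode("ascii")
print(decoded == sample_script)
```

Note that the "ascii" codec raises an error on non-ASCII characters, so anything added through EXTRA_BASH_SCRIPT should stay ASCII when spot instances are used.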

In [None]:
# Cloud-specific implementation (currently, only AWS EC2 is supported)
def spin_up_worker(resource, worker_id_prefix, queue_name):
    """
    Creates a new worker for trains.
    First, create an instance in the cloud and install some required packages.
    Then, define trains-agent environment variables and run
    trains-agent for the specified queue.
    NOTE: - Will wait until instance is running
          - This implementation assumes the instance image already has docker installed

    :param str resource: resource name, as defined in RESOURCE_CONFIGURATIONS and QUEUES.
    :param str worker_id_prefix: worker name prefix
    :param str queue_name: trains queue to listen to
    """
    resource_conf = RESOURCE_CONFIGURATIONS[resource]
    # Add worker type and AWS instance type to the worker name.
    worker_id = "{worker_id_prefix}:{worker_type}:{instance_type}".format(
        worker_id_prefix=worker_id_prefix,
        worker_type=resource,
        instance_type=resource_conf["instance_type"],
    )

    # user_data script will automatically run when the instance is started.
    # It will install the required packages for trains-agent, configure it using
    # environment variables and run trains-agent on the required queue
    user_data = """#!/bin/bash
    sudo apt-get update
    sudo apt-get install -y python3-dev
    sudo apt-get install -y python3-pip
    sudo apt-get install -y gcc
    sudo apt-get install -y git
    sudo python3 -m pip install screen
    sudo python3 -m pip install trains-agent
    echo 'agent.git_user=\"{git_user}\"' >> /root/trains.conf
    echo 'agent.git_pass=\"{git_pass}\"' >> /root/trains.conf
    echo {trains_conf} >> /root/trains.conf
    export TRAINS_API_HOST={api_server}
    export TRAINS_WEB_HOST={web_server}
    export TRAINS_FILES_HOST={files_server}
    export DYNAMIC_INSTANCE_ID=`curl http://169.254.169.254/latest/meta-data/instance-id`
    export TRAINS_WORKER_ID={worker_id}:$DYNAMIC_INSTANCE_ID
    export TRAINS_API_ACCESS_KEY='{access_key}'
    export TRAINS_API_SECRET_KEY='{secret_key}'
    screen
    {bash_script}
    python3 -m trains_agent --config-file '/root/trains.conf' daemon --queue '{queue}' {docker}
    shutdown
    """.format(
        api_server=TRAINS_SERVER_API_SERVER,
        web_server=TRAINS_SERVER_WEB_SERVER,
        files_server=TRAINS_SERVER_FILES_SERVER,
        worker_id=worker_id,
        access_key=TRAINS_ACCESS_KEY,
        secret_key=TRAINS_SECRET_KEY,
        queue=queue_name,
        git_user=TRAINS_GIT_USER,
        git_pass=TRAINS_GIT_PASS,
        trains_conf=EXTRA_TRAINS_CONF_ENCODED,
        bash_script=EXTRA_BASH_SCRIPT,
        docker="--docker '{}'".format(DEFAULT_DOCKER_IMAGE) if DEFAULT_DOCKER_IMAGE else ""
    )

    ec2 = boto3.client(
        "ec2",
        aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,
        aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,
        region_name=CLOUD_CREDENTIALS_REGION
    )

    if resource_conf["is_spot"]:
        # Create a request for a spot instance in AWS
        # (the spot API expects the user data to be base64-encoded)
        encoded_user_data = base64.b64encode(user_data.encode("ascii")).decode("ascii")
        instances = ec2.request_spot_instances(
            LaunchSpecification={
                "ImageId": resource_conf["ami_id"],
                "InstanceType": resource_conf["instance_type"],
                "Placement": {"AvailabilityZone": resource_conf["availability_zone"]},
                "UserData": encoded_user_data,
                "BlockDeviceMappings": [
                    {
                        "DeviceName": resource_conf["ebs_device_name"],
                        "Ebs": {
                            "VolumeSize": resource_conf["ebs_volume_size"],
                            "VolumeType": resource_conf["ebs_volume_type"],
                        },
                    }
                ],
            }
        )

        # Wait until spot request is fulfilled
        request_id = instances["SpotInstanceRequests"][0]["SpotInstanceRequestId"]
        waiter = ec2.get_waiter("spot_instance_request_fulfilled")
        waiter.wait(SpotInstanceRequestIds=[request_id])
        # Get the instance object for later use
        response = ec2.describe_spot_instance_requests(
            SpotInstanceRequestIds=[request_id]
        )
        instance_id = response["SpotInstanceRequests"][0]["InstanceId"]

    else:
        # Create a new EC2 instance
        instances = ec2.run_instances(
            ImageId=resource_conf["ami_id"],
            MinCount=1,
            MaxCount=1,
            InstanceType=resource_conf["instance_type"],
            UserData=user_data,
            InstanceInitiatedShutdownBehavior='terminate',
            BlockDeviceMappings=[
                {
                    "DeviceName": resource_conf["ebs_device_name"],
                    "Ebs": {
                        "VolumeSize": resource_conf["ebs_volume_size"],
                        "VolumeType": resource_conf["ebs_volume_type"],
                    },
                }
            ],
        )

        # Get the instance object for later use
        instance_id = instances["Instances"][0]["InstanceId"]

    instance = boto3.resource(
        "ec2",
        aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,
        aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,
        region_name=CLOUD_CREDENTIALS_REGION
    ).Instance(instance_id)

    # Wait until instance is in running state
    instance.wait_until_running()


# Cloud-specific implementation (currently, only AWS EC2 is supported)
def spin_down_worker(instance_id):
    """
    Destroys the cloud instance.

    :param str instance_id: Cloud instance ID to be destroyed
        (currently, only AWS EC2 is supported)
    """
    try:
        boto3.resource(
            "ec2",
            aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,
            aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,
            region_name=CLOUD_CREDENTIALS_REGION
        ).instances.filter(InstanceIds=[instance_id]).terminate()
    except Exception as ex:
        raise ex

###### Controller Implementation and Logic
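
The controller identifies its own machines by worker ID, which is built from four colon-separated fields: prefix, resource name, instance type and cloud instance ID. The sketch below shows how such an ID can be split back into named fields with a regular expression; the helper `parse_worker_id` and the sample instance ID are hypothetical and used only for illustration.

```python
import re

# Four colon-separated fields, each captured as a named group
WORKER_ID_PATTERN = re.compile(
    r"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)"
)


def parse_worker_id(worker_id):
    """Return the worker ID fields as a dict, or None if the ID does not match."""
    match = WORKER_ID_PATTERN.match(worker_id)
    return match.groupdict() if match else None


# Sample worker ID with a made-up instance ID
parsed = parse_worker_id("dynamic_aws:amazon_ec2_normal:g4dn.4xlarge:i-0123456789abcdef0")
print(parsed["name"], parsed["cloud_id"])
```

Workers whose IDs do not follow this layout (for example, manually started agents) return None and can be ignored by the controller.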

In [None]:
def supervisor():
    """
    Spin up or down resources as necessary.
    - For every queue in QUEUES do the following:
        1. Check if there are tasks waiting in the queue.
        2. Check if there are enough idle workers available for those tasks.
        3. In case more instances are required, and we haven't reached the maximum
           number of instances allowed, create the required instances, up to the
           maximum defined in QUEUES. Choose which instance to create according to
           their order in QUEUES. Won't create more instances once the maximum
           number defined has been reached.
    - Spin down instances according to their idle time. An instance which is idle
      for more than MAX_IDLE_TIME_MIN minutes is removed.
    """

    # Internal definitions
    workers_prefix = "dynamic_aws"
    # Worker's id in trains would be composed from:
    # prefix, name, instance_type and cloud_id separated by ':'
    workers_pattern = re.compile(
        r"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)"
    )

    # Set up the environment variables for trains
    os.environ["TRAINS_API_HOST"] = TRAINS_SERVER_API_SERVER
    os.environ["TRAINS_WEB_HOST"] = TRAINS_SERVER_WEB_SERVER
    os.environ["TRAINS_FILES_HOST"] = TRAINS_SERVER_FILES_SERVER
    os.environ["TRAINS_API_ACCESS_KEY"] = TRAINS_ACCESS_KEY
    os.environ["TRAINS_API_SECRET_KEY"] = TRAINS_SECRET_KEY
    api_client = APIClient()

    idle_workers = {}
    while True:
        queue_name_to_id = {
            queue.name: queue.id for queue in api_client.queues.get_all()
        }
        resource_to_queue = {
            item[0]: queue
            for queue, resources in QUEUES.items()
            for item in resources
        }
        all_workers = [
            worker
            for worker in api_client.workers.get_all()
            if workers_pattern.match(worker.id)
            and workers_pattern.match(worker.id)["prefix"] == workers_prefix
        ]

        # Workers without a task are added to the idle list
        for worker in all_workers:
            if not hasattr(worker, "task") or not worker.task:
                if worker.id not in idle_workers:
                    # Track idle workers by resource name (the key used in QUEUES),
                    # so they can be matched against the queue definitions below
                    resource_name = workers_pattern.match(worker.id)["name"]
                    idle_workers[worker.id] = (time(), resource_name, worker)
            elif hasattr(worker, "task") and worker.task and worker.id in idle_workers:
                idle_workers.pop(worker.id, None)

        required_idle_resources = []  # idle resources we'll need to keep running
        allocate_new_resources = []  # resources that will need to be started
        # Check if we have tasks waiting on one of the designated queues
        for queue in QUEUES:
            entries = api_client.queues.get_by_id(queue_name_to_id[queue]).entries
            if entries:
                queue_resources = QUEUES[queue]
                # QUEUES entries are (resource name, max instances) tuples,
                # so compare idle workers against the resource names only
                queue_resource_names = {name for name, _ in queue_resources}

                # If we have an idle worker matching the required resource,
                # remove it from the required allocation resources
                free_queue_resources = [
                    resource
                    for _, resource, _ in idle_workers.values()
                    if resource in queue_resource_names
                ]
                required_idle_resources.extend(free_queue_resources)
                spin_up_count = len(entries) - len(free_queue_resources)
                spin_up_resources = []

                # Add as many resources as possible to handle this queue's entries
                for resource, max_instances in queue_resources:
                    if len(spin_up_resources) >= spin_up_count:
                        break
                    max_allowed = max_instances - len(
                        [
                            worker
                            for worker in all_workers
                            if workers_pattern.match(worker.id)["name"] == resource
                        ]
                    )
                    # Only request what is still missing after earlier resource types
                    remaining = spin_up_count - len(spin_up_resources)
                    spin_up_resources.extend(
                        [resource] * min(max_allowed, remaining)
                    )
                allocate_new_resources.extend(spin_up_resources)

        # Now we actually spin the new machines
        for resource in allocate_new_resources:
            spin_up_worker(resource, workers_prefix, resource_to_queue[resource])

        # Go over the idle workers list, and spin down idle workers
        for timestamp, resource_name, worker in idle_workers.values():
            # skip resource types that might be needed
            if resource_name in required_idle_resources:
                continue
            # Remove from both aws and trains all instances that are
            # idle for longer than MAX_IDLE_TIME_MIN
            if time() - timestamp > MAX_IDLE_TIME_MIN * 60.0:
                cloud_id = workers_pattern.match(worker.id)["cloud_id"]
                spin_down_worker(cloud_id)
                worker.unregister()

        # Nothing else to do
        sleep(POLLING_INTERVAL_MIN * 60.0)

##### Execute Forever* (the controller is stateless, so you can always re-execute the notebook)

In [None]:
# Loop forever; restarting is safe because the controller is stateless
while True:
    try:
        supervisor()
    except Exception as ex:
        print("Warning! exception occurred: {ex}\nRetry in 15 seconds".format(ex=ex))
        sleep(15)