From 8d7740ea6832c41b3e0ddec1317b9e2408e47c14 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Sat, 11 Jul 2020 01:32:51 +0300
Subject: [PATCH] Add AWS EC2 Auto-Scaler service example

---
 .../services/aws-autoscaler/aws_autoscaler.py | 260 ++++++++++++++++++
 .../services/aws-autoscaler/requirements.txt  |   4 +
 trains/automation/auto_scaler.py              | 244 ++++++++++++++++
 trains/automation/aws_auto_scaler.py          | 185 +++++++++++++
 4 files changed, 693 insertions(+)
 create mode 100644 examples/services/aws-autoscaler/aws_autoscaler.py
 create mode 100644 examples/services/aws-autoscaler/requirements.txt
 create mode 100644 trains/automation/auto_scaler.py
 create mode 100644 trains/automation/aws_auto_scaler.py
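Note: the patch is made of three pieces that fit together as follows. aws_autoscaler.py is the
runnable example service (an interactive setup wizard plus the Task entry point),
trains/automation/auto_scaler.py adds the cloud-agnostic AutoScaler supervisor loop, and
trains/automation/aws_auto_scaler.py subclasses it with the EC2-specific spin-up/spin-down logic.
To try it locally, install the example's requirements.txt and run `python aws_autoscaler.py --run`.
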
diff --git a/examples/services/aws-autoscaler/aws_autoscaler.py b/examples/services/aws-autoscaler/aws_autoscaler.py
new file mode 100644
index 00000000..24de6c4d
--- /dev/null
+++ b/examples/services/aws-autoscaler/aws_autoscaler.py
@@ -0,0 +1,260 @@
+import distutils.util
+from argparse import ArgumentParser
+from collections import defaultdict
+from pathlib import Path
+from typing import Optional, Tuple
+
+import yaml
+from six.moves import input
+
+from trains import Task
+from trains.automation.aws_auto_scaler import AwsAutoScaler
+from trains.config import running_remotely
+
+CONF_FILE = "aws_autoscaler.yaml"
+DEFAULT_DOCKER_IMAGE = "nvidia/cuda"
+
+
+def main():
+    parser = ArgumentParser()
+    parser.add_argument(
+        "--run",
+        help="Run the autoscaler after the wizard has finished",
+        action="store_true",
+        default=False,
+    )
+    args = parser.parse_args()
+
+    if running_remotely():
+        hyper_params = AwsAutoScaler.Settings().as_dict()
+        configurations = AwsAutoScaler.Configuration().as_dict()
+    else:
+        print("AWS Autoscaler setup\n")
+
+        config_file = Path(CONF_FILE).absolute()
+        if config_file.exists() and input_bool(
+            "Load configurations from config file '{}' [Y/n]? ".format(str(CONF_FILE)),
+            default=True,
+        ):
+            with config_file.open("r") as f:
+                conf = yaml.load(f, Loader=yaml.SafeLoader)
+            hyper_params = conf["hyper_params"]
+            configurations = conf["configurations"]
+        else:
+            configurations, hyper_params = run_wizard()
+
+            try:
+                with config_file.open("w+") as f:
+                    conf = {
+                        "hyper_params": hyper_params,
+                        "configurations": configurations,
+                    }
+                    yaml.safe_dump(conf, f)
+            except Exception:
+                print(
+                    "Error! Could not write configuration file at: {}".format(
+                        str(CONF_FILE)
+                    )
+                )
+                return
+
+    task = Task.init(project_name="Auto-Scaler", task_name="AWS Auto-Scaler")
+    task.connect(hyper_params)
+    task.connect_configuration(configurations)
+
+    autoscaler = AwsAutoScaler(hyper_params, configurations)
+
+    if running_remotely() or args.run:
+        autoscaler.start()
[y/N]"): + break + configurations.resource_configurations = resource_configurations + + configurations.extra_vm_bash_script = input( + "\nEnter any pre-execution bash script to be executed on the newly created instances: " + ) + + print("\nSet up the budget\n") + queues = defaultdict(list) + while True: + queue_name = get_input("queue name", required=True) + while True: + queue_type = get_input( + "queue type", + "(use the resources names defined earlier)", + required=True, + ) + max_instances = input_int( + "maximum number of instances allowed", required=True + ) + queues[queue_name].append((queue_type, max_instances)) + + if not input_bool("\nAdd another type to queue? [y/N]: "): + break + if not input_bool("Define another queue? [y/N]: "): + break + configurations.queues = dict(queues) + + hyper_params.max_idle_time_min = input_int( + "maximum idle time", + "for the autoscaler (in minutes, default is 15)", + default=15, + new_line=True, + ) + hyper_params.polling_interval_time_min = input_int( + "polling interval", "for the autoscaler (in minutes, default is 5)", default=5, + ) + + return configurations.as_dict(), hyper_params.as_dict() + + +def get_input( + key, # type: str + description="", # type: str + question="Enter", # type: str + required=False, # type: bool + default=None, # type: Optional[str] + new_line=False, # type: bool +): + # type: (...) -> Optional[str] + if new_line: + print() + while True: + value = input("{} {} {}: ".format(question, key, description)) + if not value.strip() and required: + print("{} is required".format(key)) + elif not (value.strip() or required): + return default + else: + return value + + +def input_int( + key, # type: str + description="", # type: str + required=False, # type: bool + default=None, # type: Optional[int] + new_line=False, # type: bool +): + # type: (...) -> Optional[int] + while True: + try: + value = int( + get_input( + key, + description, + required=required, + default=default, + new_line=new_line, + ) + ) + return value + except ValueError: + print( + "Invalid input: {} should be a number. 
diff --git a/examples/services/aws-autoscaler/requirements.txt b/examples/services/aws-autoscaler/requirements.txt
new file mode 100644
index 00000000..d21f0080
--- /dev/null
+++ b/examples/services/aws-autoscaler/requirements.txt
@@ -0,0 +1,4 @@
+boto3
+pyYaml
+six
+trains
\ No newline at end of file
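
The base class added next keeps its tunables in two attrs data classes, Settings and
Configuration. Its constructor accepts either the class instances or plain dicts (dicts are
re-wrapped via Settings(**settings)), which is what lets the example script pass wizard answers
or loaded YAML content straight through. A minimal sketch of that round-trip (the value is
illustrative):

    from trains.automation.auto_scaler import AutoScaler

    settings = AutoScaler.Settings(max_idle_time_min=30)  # validated as an int by attrs
    assert settings.as_dict()["max_idle_time_min"] == 30
    # a plain dict with the same keys is equally valid constructor input
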
diff --git a/trains/automation/auto_scaler.py b/trains/automation/auto_scaler.py
new file mode 100644
index 00000000..49bc3ec6
--- /dev/null
+++ b/trains/automation/auto_scaler.py
@@ -0,0 +1,244 @@
+import os
+import re
+from itertools import chain
+from operator import itemgetter
+from time import sleep, time
+from typing import Union
+
+import attr
+from attr.validators import instance_of
+
+from ..backend_api import Session
+from ..backend_api.session.client import APIClient
+
+
+class AutoScaler(object):
+    @attr.s
+    class Settings(object):
+        git_user = attr.ib(default=None)
+        git_pass = attr.ib(default=None)
+        cloud_credentials_key = attr.ib(default=None)
+        cloud_credentials_secret = attr.ib(default=None)
+        cloud_credentials_region = attr.ib(default=None)
+        default_docker_image = attr.ib(default="nvidia/cuda")
+        max_idle_time_min = attr.ib(validator=instance_of(int), default=15)
+        polling_interval_time_min = attr.ib(validator=instance_of(int), default=5)
+        workers_prefix = attr.ib(default="dynamic_worker")
+        cloud_provider = attr.ib(default="")
+
+        def as_dict(self):
+            return attr.asdict(self)
+
+    @attr.s
+    class Configuration(object):
+        resource_configurations = attr.ib(default=None)
+        queues = attr.ib(default=None)
+        extra_trains_conf = attr.ib(default="")
+        extra_vm_bash_script = attr.ib(default="")
+
+        def as_dict(self):
+            return attr.asdict(self)
+
+    def __init__(self, settings, configuration):
+        # type: (Union[dict, AutoScaler.Settings], Union[dict, AutoScaler.Configuration]) -> None
+        if isinstance(settings, dict):
+            settings = self.Settings(**settings)
+        if isinstance(configuration, dict):
+            configuration = self.Configuration(**configuration)
+
+        self.web_server = Session.get_app_server_host()
+        self.api_server = Session.get_api_server_host()
+        self.files_server = Session.get_files_server_host()
+
+        session = Session()
+        self.access_key = session.access_key
+        self.secret_key = session.secret_key
+
+        self.git_user = settings.git_user
+        self.git_pass = settings.git_pass
+        self.cloud_credentials_key = settings.cloud_credentials_key
+        self.cloud_credentials_secret = settings.cloud_credentials_secret
+        self.cloud_credentials_region = settings.cloud_credentials_region
+        self.default_docker_image = settings.default_docker_image
+
+        self.extra_trains_conf = configuration.extra_trains_conf
+        self.extra_vm_bash_script = configuration.extra_vm_bash_script
+        self.resource_configurations = configuration.resource_configurations
+        self.queues = configuration.queues
+
+        if not self.sanity_check():
+            return
+
+        self.max_idle_time_min = int(settings.max_idle_time_min)
+        self.polling_interval_time_min = int(settings.polling_interval_time_min)
+
+        self.workers_prefix = settings.workers_prefix
+        self.cloud_provider = settings.cloud_provider
+
+    def sanity_check(self):
+        # Sanity check - validate queue resources
+        if len(set(map(itemgetter(0), chain(*self.queues.values())))) != sum(
+            map(len, self.queues.values())
+        ):
+            print(
+                "Error: at least one resource name is used in multiple queues. "
+                "A resource name can only appear in a single queue definition."
+            )
+            return False
+        return True
+
+    def start(self):
+        # Loop forever; this is fine, we are stateless
+        while True:
+            try:
+                self.supervisor()
+            except Exception as ex:
+                print(
+                    "Warning! Exception occurred: {ex}\nRetrying in 15 seconds".format(
+                        ex=ex
+                    )
+                )
+                sleep(15)
+
+    def spin_up_worker(self, resource, worker_id_prefix, queue_name):
+        """
+        Creates a new worker for trains (cloud-specific implementation).
+        First, create an instance in the cloud and install some required packages.
+        Then, define trains-agent environment variables and run trains-agent for the specified queue.
+        NOTE: - Will wait until the instance is running
+              - This implementation assumes the instance image already has docker installed
+
+        :param str resource: resource name, as defined in self.resource_configurations and self.queues.
+        :param str worker_id_prefix: worker name prefix
+        :param str queue_name: trains queue to listen to
+        """
+        pass
+
+    def spin_down_worker(self, instance_id):
+        """
+        Destroys the cloud instance (cloud-specific implementation).
+
+        :param instance_id: Cloud instance ID to be destroyed
+        :type instance_id: str
+        """
+        pass
+
+    def supervisor(self):
+        """
+        Spin up or down resources as necessary.
+        - For every queue in self.queues do the following:
+            1. Check if there are tasks waiting in the queue.
+            2. Check if there are enough idle workers available for those tasks.
+            3. In case more instances are required, and we haven't reached the maximum allowed,
+               create the required instances with regard to the maximum number defined in self.queues.
+               Choose which instance type to create according to its order in self.queues. No more
+               instances will be created once the defined maximum has been reached.
+        - Spin down instances according to their idle time. An instance that has been idle for more
+          than self.max_idle_time_min minutes will be removed.
+        """
+
+        # A worker's id in trains is composed of prefix, name, instance_type and cloud_id, separated by ':'
+        workers_pattern = re.compile(
+            r"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)"
+        )
+
+        # Set up the environment variables for trains
+        os.environ["TRAINS_API_HOST"] = self.api_server
+        os.environ["TRAINS_WEB_HOST"] = self.web_server
+        os.environ["TRAINS_FILES_HOST"] = self.files_server
+        os.environ["TRAINS_API_ACCESS_KEY"] = self.access_key
+        os.environ["TRAINS_API_SECRET_KEY"] = self.secret_key
+        api_client = APIClient()
+
+        # Verify the requested queues exist and create those that do not
+        all_queues = [q.name for q in list(api_client.queues.get_all())]
+        missing_queues = [q for q in self.queues if q not in all_queues]
+        for q in missing_queues:
+            api_client.queues.create(q)
+
+        idle_workers = {}
+        while True:
+            queue_name_to_id = {
+                queue.name: queue.id for queue in api_client.queues.get_all()
+            }
+            resource_to_queue = {
+                item[0]: queue
+                for queue, resources in self.queues.items()
+                for item in resources
+            }
+            all_workers = [
+                worker
+                for worker in api_client.workers.get_all()
+                if workers_pattern.match(worker.id)
+                and workers_pattern.match(worker.id)["prefix"] == self.workers_prefix
+            ]
+
+            # Workers without a task are added to the idle list
+            for worker in all_workers:
+                if not hasattr(worker, "task") or not worker.task:
+                    if worker.id not in idle_workers:
+                        resource_name = workers_pattern.match(worker.id)[
+                            "instance_type"
+                        ]
+                        idle_workers[worker.id] = (time(), resource_name, worker)
+                elif (
+                    hasattr(worker, "task")
+                    and worker.task
+                    and worker.id in idle_workers
+                ):
+                    idle_workers.pop(worker.id, None)
+
+            required_idle_resources = []  # idle resources we'll need to keep running
+            allocate_new_resources = []  # resources that will need to be started
+            # Check if we have tasks waiting on one of the designated queues
+            for queue in self.queues:
+                entries = api_client.queues.get_by_id(queue_name_to_id[queue]).entries
+                if entries and len(entries) > 0:
+                    queue_resources = self.queues[queue]
+
+                    # If we have an idle worker matching the required resource,
+                    # remove it from the required allocation resources
+                    free_queue_resources = [
+                        resource
+                        for _, resource, _ in idle_workers.values()
+                        if resource in [name for name, _ in queue_resources]
+                    ]
+                    required_idle_resources.extend(free_queue_resources)
+                    spin_up_count = len(entries) - len(free_queue_resources)
+                    spin_up_resources = []
+
+                    # Add as many resources as possible to handle this queue's entries
+                    for resource, max_instances in queue_resources:
+                        if len(spin_up_resources) >= spin_up_count:
+                            break
+                        max_allowed = int(max_instances) - len(
+                            [
+                                worker
+                                for worker in all_workers
+                                if workers_pattern.match(worker.id)["name"] == resource
+                            ]
+                        )
+                        spin_up_resources.extend(
+                            [resource] * min(max_allowed, spin_up_count)
+                        )
+                    allocate_new_resources.extend(spin_up_resources)
+
+            # Now we actually spin up the new machines
+            for resource in allocate_new_resources:
+                self.spin_up_worker(
+                    resource, self.workers_prefix, resource_to_queue[resource]
+                )
+
+            # Go over the idle workers list, and spin down the idle ones
+            for timestamp, resources, worker in idle_workers.values():
+                # skip resource types that might be needed
+                if resources in required_idle_resources:
+                    continue
+                # Remove instances idle for longer than MAX_IDLE_TIME_MIN from both AWS and trains
+                if time() - timestamp > self.max_idle_time_min * 60.0:
+                    cloud_id = workers_pattern.match(worker.id)["cloud_id"]
+                    self.spin_down_worker(cloud_id)
+                    worker.unregister()
+
+            # Nothing else to do
+            sleep(self.polling_interval_time_min * 60.0)
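
Worker ids built by this scheme are four ':'-separated fields, which is exactly what the
workers_pattern regular expression above unpacks. A small illustration (the worker id below is
made up):

    import re

    workers_pattern = re.compile(
        r"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)"
    )
    m = workers_pattern.match("dynamic_aws:gpu_machine:g4dn.4xlarge:i-0123456789abcdef0")
    assert m["prefix"] == "dynamic_aws"            # compared against workers_prefix
    assert m["name"] == "gpu_machine"              # resource name from the budget
    assert m["cloud_id"] == "i-0123456789abcdef0"  # instance id passed to spin_down_worker
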
diff --git a/trains/automation/aws_auto_scaler.py b/trains/automation/aws_auto_scaler.py
new file mode 100644
index 00000000..4d1294fb
--- /dev/null
+++ b/trains/automation/aws_auto_scaler.py
@@ -0,0 +1,185 @@
+import base64
+from typing import Union
+
+import attr
+
+from .auto_scaler import AutoScaler
+from .. import Task
+
+try:
+    # noinspection PyPackageRequirements
+    import boto3
+
+    Task.add_requirements('boto3')
+except ImportError:
+    raise ValueError("AwsAutoScaler requires the 'boto3' package, which was not found\n"
+                     "install with: pip install boto3")
+
+
+class AwsAutoScaler(AutoScaler):
+    @attr.s
+    class Settings(AutoScaler.Settings):
+        workers_prefix = attr.ib(default="dynamic_aws")
+        cloud_provider = attr.ib(default="AWS")
+
+    def __init__(self, settings, configuration):
+        # type: (Union[dict, AwsAutoScaler.Settings], Union[dict, AwsAutoScaler.Configuration]) -> None
+        super(AwsAutoScaler, self).__init__(settings, configuration)
+
+    def spin_up_worker(self, resource, worker_id_prefix, queue_name):
+        """
+        Creates a new worker for trains.
+        First, create an instance in the cloud and install some required packages.
+        Then, define trains-agent environment variables and run trains-agent for the specified queue.
+        NOTE: - Will wait until the instance is running
+              - This implementation assumes the instance image already has docker installed
+
+        :param str resource: resource name, as defined in BUDGET and QUEUES.
+        :param str worker_id_prefix: worker name prefix
+        :param str queue_name: trains queue to listen to
+        """
+        resource_conf = self.resource_configurations[resource]
+        # Add worker type and AWS instance type to the worker name.
+        worker_id = "{worker_id_prefix}:{worker_type}:{instance_type}".format(
+            worker_id_prefix=worker_id_prefix,
+            worker_type=resource,
+            instance_type=resource_conf["instance_type"],
+        )
+
+        # The user_data script runs automatically when the instance starts: it installs the packages
+        # required by trains-agent, configures it using environment variables, and runs trains-agent
+        # on the required queue
+        user_data = """#!/bin/bash
+        sudo apt-get update
+        sudo apt-get install -y python3-dev
+        sudo apt-get install -y python3-pip
+        sudo apt-get install -y gcc
+        sudo apt-get install -y git
+        sudo apt-get install -y build-essential
+        python3 -m pip install -U pip
+        python3 -m pip install virtualenv
+        python3 -m virtualenv trains_agent_venv
+        source trains_agent_venv/bin/activate
+        python -m pip install trains-agent
+        echo 'agent.git_user=\"{git_user}\"' >> /root/trains.conf
+        echo 'agent.git_pass=\"{git_pass}\"' >> /root/trains.conf
+        echo "{trains_conf}" >> /root/trains.conf
+        export TRAINS_API_HOST={api_server}
+        export TRAINS_WEB_HOST={web_server}
+        export TRAINS_FILES_HOST={files_server}
+        export DYNAMIC_INSTANCE_ID=`curl http://169.254.169.254/latest/meta-data/instance-id`
+        export TRAINS_WORKER_ID={worker_id}:$DYNAMIC_INSTANCE_ID
+        export TRAINS_API_ACCESS_KEY='{access_key}'
+        export TRAINS_API_SECRET_KEY='{secret_key}'
+        {bash_script}
+        source ~/.bashrc
+        python -m trains_agent --config-file '/root/trains.conf' daemon --queue '{queue}' {docker}
+        shutdown
+        """.format(
+            api_server=self.api_server,
+            web_server=self.web_server,
+            files_server=self.files_server,
+            worker_id=worker_id,
+            access_key=self.access_key,
+            secret_key=self.secret_key,
+            queue=queue_name,
+            git_user=self.git_user,
+            git_pass=self.git_pass,
+            trains_conf=self.extra_trains_conf,
+            bash_script=self.extra_vm_bash_script,
+            docker="--docker '{}'".format(self.default_docker_image)
+            if self.default_docker_image
+            else "",
+        )
+
+        ec2 = boto3.client(
+            "ec2",
+            aws_access_key_id=self.cloud_credentials_key or None,
+            aws_secret_access_key=self.cloud_credentials_secret or None,
+            region_name=self.cloud_credentials_region,
+        )
+
+        if resource_conf["is_spot"]:
+            # Create a request for a spot instance in AWS
+            encoded_user_data = base64.b64encode(user_data.encode("ascii")).decode(
+                "ascii"
+            )
+            instances = ec2.request_spot_instances(
+                LaunchSpecification={
+                    "ImageId": resource_conf["ami_id"],
+                    "InstanceType": resource_conf["instance_type"],
+                    "Placement": {
+                        "AvailabilityZone": resource_conf["availability_zone"]
+                    },
+                    "UserData": encoded_user_data,
+                    "BlockDeviceMappings": [
+                        {
+                            "DeviceName": resource_conf["ebs_device_name"],
+                            "Ebs": {
+                                "VolumeSize": resource_conf["ebs_volume_size"],
+                                "VolumeType": resource_conf["ebs_volume_type"],
+                            },
+                        }
+                    ],
+                }
+            )
+
+            # Wait until the spot request is fulfilled
+            request_id = instances["SpotInstanceRequests"][0]["SpotInstanceRequestId"]
+            waiter = ec2.get_waiter("spot_instance_request_fulfilled")
+            waiter.wait(SpotInstanceRequestIds=[request_id])
+            # Get the instance object for later use
+            response = ec2.describe_spot_instance_requests(
+                SpotInstanceRequestIds=[request_id]
+            )
+            instance_id = response["SpotInstanceRequests"][0]["InstanceId"]
+
+        else:
+            # Create a new EC2 instance
+            instances = ec2.run_instances(
+                ImageId=resource_conf["ami_id"],
+                MinCount=1,
+                MaxCount=1,
+                InstanceType=resource_conf["instance_type"],
+                UserData=user_data,
+                InstanceInitiatedShutdownBehavior="terminate",
+                BlockDeviceMappings=[
+                    {
+                        "DeviceName": resource_conf["ebs_device_name"],
+                        "Ebs": {
+                            "VolumeSize": resource_conf["ebs_volume_size"],
+                            "VolumeType": resource_conf["ebs_volume_type"],
+                        },
+                    }
+                ],
+            )
+
+            # Get the instance object for later use
+            instance_id = instances["Instances"][0]["InstanceId"]
+
+        instance = boto3.resource(
+            "ec2",
+            aws_access_key_id=self.cloud_credentials_key or None,
+            aws_secret_access_key=self.cloud_credentials_secret or None,
+            region_name=self.cloud_credentials_region,
+        ).Instance(instance_id)
+
+        # Wait until the instance is running
+        instance.wait_until_running()
+
+    # Cloud-specific implementation (currently, only AWS EC2 is supported)
+    def spin_down_worker(self, instance_id):
+        """
+        Destroys the cloud instance.
+
+        :param instance_id: Cloud instance ID to be destroyed (currently, only AWS EC2 is supported)
+        :type instance_id: str
+        """
+        try:
+            boto3.resource(
+                "ec2",
+                aws_access_key_id=self.cloud_credentials_key or None,
+                aws_secret_access_key=self.cloud_credentials_secret or None,
+                region_name=self.cloud_credentials_region,
+            ).instances.filter(InstanceIds=[instance_id]).terminate()
+        except Exception:
+            raise
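
Finally, a sketch of driving the new classes programmatically, without the wizard (the resource
name, queue name, and credentials below are illustrative placeholders; a reachable trains server
and valid AWS credentials are assumed):

    from trains.automation.aws_auto_scaler import AwsAutoScaler

    settings = AwsAutoScaler.Settings(
        cloud_credentials_key="AKIA...",
        cloud_credentials_secret="...",
        cloud_credentials_region="us-east-1",
    )
    configuration = AwsAutoScaler.Configuration(
        resource_configurations={
            "gpu_machine": {
                "instance_type": "g4dn.4xlarge",
                "is_spot": False,
                "availability_zone": "us-east-1b",
                "ami_id": "ami-07c95cafbb788face",
                "ebs_device_name": "/dev/xvda",
                "ebs_volume_size": 100,
                "ebs_volume_type": "gp2",
            }
        },
        queues={"aws_gpu": [("gpu_machine", 3)]},
    )
    # blocks forever: polls the queues and spins EC2 instances up/down as needed
    AwsAutoScaler(settings, configuration).start()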