From 6a6cf111bec27a6dbbbd163ba09fd049e87edab1 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Wed, 17 Nov 2021 19:15:48 +0200 Subject: [PATCH] Refactor AutoScaler --- .flake8 | 4 + .gitignore | 1 + clearml/automation/auto_scaler.py | 445 +++++++++++------- clearml/automation/aws_auto_scaler.py | 195 -------- clearml/automation/aws_driver.py | 147 ++++++ clearml/automation/cloud_driver.py | 205 ++++++++ dev-requirements.txt | 6 + .../services/aws-autoscaler/aws_autoscaler.py | 121 +++-- 8 files changed, 702 insertions(+), 422 deletions(-) create mode 100644 .flake8 delete mode 100644 clearml/automation/aws_auto_scaler.py create mode 100644 clearml/automation/aws_driver.py create mode 100644 clearml/automation/cloud_driver.py create mode 100644 dev-requirements.txt diff --git a/.flake8 b/.flake8 new file mode 100644 index 00000000..92dad0cb --- /dev/null +++ b/.flake8 @@ -0,0 +1,4 @@ +[flake8] +max-line-length=120 +statistics +extend-ignore=E501 diff --git a/.gitignore b/.gitignore index f22a5583..3e4d0098 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ __pycache__ build/ dist/ *.egg-info +.env # example data examples/runs/ diff --git a/clearml/automation/auto_scaler.py b/clearml/automation/auto_scaler.py index d0a36955..b229581a 100644 --- a/clearml/automation/auto_scaler.py +++ b/clearml/automation/auto_scaler.py @@ -1,91 +1,134 @@ -import os import re +from collections import defaultdict, deque +from enum import Enum from itertools import chain -from operator import itemgetter +from threading import Event from time import sleep, time -from typing import Union import attr from attr.validators import instance_of +from .cloud_driver import CloudDriver +from .. import Task from ..backend_api import Session +from ..backend_api.session import defs from ..backend_api.session.client import APIClient +from ..debugging import get_logger + +# Worker's id in clearml would be composed from prefix, name, instance_type and cloud_id separated by ":" +# Example: 'test:m1:g4dn.4xlarge:i-07cf7d6750455cb62' +# cloud_id might be missing + +_workers_pattern = re.compile( + r"""^ + (?P[^:]+): + (?P[^:]+): + (?P[^:]+) + (:(?P[^:/]+))? 
+ $ + """, re.VERBOSE +) + + +class WorkerId: + def __init__(self, worker_id): + self.prefix = self.name = self.instance_type = self.cloud_id = "" + match = _workers_pattern.match(worker_id) + if not match: + raise ValueError("bad worker ID: {!r}".format(worker_id)) + + self.prefix = match["prefix"] + self.name = match["name"] + self.instance_type = match["instance_type"] + self.cloud_id = match["cloud_id"] or '' + + +class State(str, Enum): + STARTING = 'starting' + READY = 'ready' + RUNNING = 'running' + STOPPED = 'stopped' + + +@attr.s +class ScalerConfig: + max_idle_time_min = attr.ib(validator=instance_of(int), default=15) + polling_interval_time_min = attr.ib(validator=instance_of((float, int)), default=5) + max_spin_up_time_min = attr.ib(validator=instance_of(int), default=30) + workers_prefix = attr.ib(default="dynamic_worker") + resource_configurations = attr.ib(default=None) + queues = attr.ib(default=None) + + @classmethod + def from_config(cls, config): + return cls( + max_idle_time_min=config['hyper_params']['max_idle_time_min'], + polling_interval_time_min=config['hyper_params']['polling_interval_time_min'], + max_spin_up_time_min=config['hyper_params']['max_spin_up_time_min'], + workers_prefix=config['hyper_params']['workers_prefix'], + resource_configurations=config['configurations']['resource_configurations'], + queues=config['configurations']['queues'], + ) class AutoScaler(object): - @attr.s - class Settings(object): - git_user = attr.ib(default="") - git_pass = attr.ib(default="") - cloud_credentials_key = attr.ib(default="") - cloud_credentials_secret = attr.ib(default="") - cloud_credentials_region = attr.ib(default=None) - default_docker_image = attr.ib(default="nvidia/cuda") - max_idle_time_min = attr.ib(validator=instance_of(int), default=15) - polling_interval_time_min = attr.ib(validator=instance_of(int), default=5) - max_spin_up_time_min = attr.ib(validator=instance_of(int), default=30) - workers_prefix = attr.ib(default="dynamic_worker") - cloud_provider = attr.ib(default="") + def __init__(self, config, driver: CloudDriver, logger=None): + self.logger = logger or get_logger('auto_scaler') + # Should be after we create logger + self.state = State.STARTING - def as_dict(self): - return attr.asdict(self) + self.driver = driver + self.logger.info('using %s driver', self.driver.kind()) + self.driver.set_scaler(self) - @attr.s - class Configuration(object): - resource_configurations = attr.ib(default=None) - queues = attr.ib(default=None) - extra_trains_conf = attr.ib(default="") # Backwards compatibility - extra_clearml_conf = attr.ib(default="") - extra_vm_bash_script = attr.ib(default="") - - def as_dict(self): - return attr.asdict(self) - - def __init__(self, settings, configuration): - # type: (Union[dict, AutoScaler.Settings], Union[dict, AutoScaler.Configuration]) -> None - if isinstance(settings, dict): - settings = self.Settings(**settings) - if isinstance(configuration, dict): - configuration = self.Configuration(**configuration) - - self.web_server = Session.get_app_server_host() - self.api_server = Session.get_api_server_host() - self.files_server = Session.get_files_server_host() - - session = Session() - self.access_key = session.access_key - self.secret_key = session.secret_key - - self.git_user = settings.git_user - self.git_pass = settings.git_pass - self.cloud_credentials_key = settings.cloud_credentials_key - self.cloud_credentials_secret = settings.cloud_credentials_secret - self.cloud_credentials_region = settings.cloud_credentials_region - 
self.default_docker_image = settings.default_docker_image - - self.extra_clearml_conf = configuration.extra_clearml_conf or configuration.extra_trains_conf - self.extra_vm_bash_script = configuration.extra_vm_bash_script - self.resource_configurations = configuration.resource_configurations - self.queues = configuration.queues + self.resource_configurations = config.resource_configurations + self.queues = config.queues # queue name -> list of resources + self.resource_to_queue = { + item[0]: queue + for queue, resources in self.queues.items() + for item in resources + } if not self.sanity_check(): - return + raise ValueError('health check failed') - self.max_idle_time_min = float(settings.max_idle_time_min) - self.polling_interval_time_min = float(settings.polling_interval_time_min) - self.max_spin_up_time_min = float(settings.max_spin_up_time_min) + self.max_idle_time_min = float(config.max_idle_time_min) + self.polling_interval_time_min = float(config.polling_interval_time_min) + self.max_spin_up_time_min = float(config.max_spin_up_time_min) # make sure we have our own unique prefix, in case we have multiple dynamic auto-scalers # they will mix each others instances - self.workers_prefix = settings.workers_prefix - self.cloud_provider = settings.cloud_provider + self.workers_prefix = config.workers_prefix + + session = Session() + self.set_auth(session) + + # Set up the environment variables for clearml + defs.ENV_HOST.set(session.get_api_server_host()) + defs.ENV_WEB_HOST.set(session.get_app_server_host()) + defs.ENV_FILES_HOST.set(session.get_files_server_host()) + defs.ENV_ACCESS_KEY.set(session.access_key) + defs.ENV_SECRET_KEY.set(session.secret_key) + if self.auth_token: + defs.ENV_AUTH_TOKEN.set(self.auth_token) + + self.api_client = APIClient() + self._stop_event = Event() + self.state = State.READY + + def set_auth(self, session): + if session.access_key and session.secret_key: + self.access_key = session.access_key + self.secret_key = session.secret_key + self.auth_token = None + return + + self.access_key = self.secret_key = None + self.auth_token = defs.ENV_AUTH_TOKEN.get(default=None) def sanity_check(self): - # Sanity check - Validate queue resources - if len(set(map(itemgetter(0), chain(*self.queues.values())))) != sum( - map(len, self.queues.values()) - ): - print( + if has_duplicate_resource(self.queues): + self.logger.error( "Error: at least one resource name is used in multiple queues. " "A resource name can only appear in a single queue definition." ) @@ -93,42 +136,65 @@ class AutoScaler(object): return True def start(self): - # Loop forever, it is okay we are stateless - while True: + self.state = State.RUNNING + # Loop until stopped, it is okay we are stateless + while self._running(): try: self.supervisor() except Exception as ex: - print( - "Warning! exception occurred: {ex}\nRetry in 15 seconds".format( - ex=ex - ) - ) + self.logger.exception('Error: %r, retrying in 15 seconds', ex) sleep(15) - def spin_up_worker(self, resource, worker_id_prefix, queue_name): - """ - Creates a new worker for clearml (cloud-specific implementation). - First, create an instance in the cloud and install some required packages. - Then, define clearml-agent environment variables and run clearml-agent for the specified queue. 
-        NOTE:
-          - Will wait until instance is running
-
-        This implementation assumes the instance image already has docker installed
+    def stop(self):
+        self.logger.info('stopping')
+        self._stop_event.set()
+        self.state = State.STOPPED
 
-        :param str resource: resource name, as defined in self.resource_configurations and self.queues.
-        :param str worker_id_prefix: worker name prefix
-        :param str queue_name: clearml queue to listen to
+    def ensure_queues(self):
+        # Verify the requested queues exist and create any that don't
+        all_queues = {q.name for q in list(self.api_client.queues.get_all(only_fields=['name']))}
+        missing_queues = set(self.queues) - all_queues
+        for q in missing_queues:
+            self.logger.info("Creating queue %r", q)
+            self.api_client.queues.create(q)
 
-        :return str: worker_id prefix to identify when spin was successful
-        """
-        pass
+    def queue_mapping(self):
+        id_to_name = {}
+        name_to_id = {}
+        for queue in self.api_client.queues.get_all(only_fields=['id', 'name']):
+            id_to_name[queue.id] = queue.name
+            name_to_id[queue.name] = queue.id
 
-    def spin_down_worker(self, instance_id):
-        """
-        Destroys the cloud instance (cloud-specific implementation).
+        return id_to_name, name_to_id
 
-        :param instance_id: Cloud instance ID to be destroyed
-        :type instance_id: str
-        """
-        pass
+    def get_workers(self):
+        workers = []
+        for worker in self.api_client.workers.get_all():
+            try:
+                wid = WorkerId(worker.id)
+                if wid.prefix == self.workers_prefix:
+                    workers.append(worker)
+            except ValueError:
+                self.logger.info('ignoring unknown worker: %r', worker.id)
+        return workers
+
+    def stale_workers(self, spun_workers):
+        # Drop spun instances that did not register within the spin-up timeout
+        now = time()
+        for worker_id, (resource, spin_time) in list(spun_workers.items()):
+            if now - spin_time > self.max_spin_up_time_min * 60:
+                self.logger.info('Stuck spun instance %s of type %s', worker_id, resource)
+                yield worker_id
+
+    def extra_allocations(self):
+        """Hook for subclasses to use"""
+        return []
+
+    def gen_worker_prefix(self, resource, resource_conf):
+        return '{workers_prefix}:{worker_type}:{instance_type}'.format(
+            workers_prefix=self.workers_prefix,
+            worker_type=resource,
+            instance_type=resource_conf[self.driver.instance_type_key()],
+        )
 
     def supervisor(self):
         """
@@ -143,86 +209,41 @@ class AutoScaler(object):
        - spin down instances according to their idle time. An instance that is idle for more than
          self.max_idle_time_min minutes will be removed.
""" - - # Worker's id in clearml would be composed from prefix, name, instance_type and cloud_id separated by ':' - workers_pattern = re.compile( - r"^(?P[^:]+):(?P[^:]+):(?P[^:]+):(?P[^:]+)" - ) - - # Set up the environment variables for clearml - os.environ["CLEARML_API_HOST"] = self.api_server - os.environ["CLEARML_WEB_HOST"] = self.web_server - os.environ["CLEARML_FILES_HOST"] = self.files_server - os.environ["CLEARML_API_ACCESS_KEY"] = self.access_key - os.environ["CLEARML_API_SECRET_KEY"] = self.secret_key - api_client = APIClient() - - # Verify the requested queues exist and create those that doesn't exist - all_queues = [q.name for q in list(api_client.queues.get_all(only_fields=['name']))] - missing_queues = [q for q in self.queues if q not in all_queues] - for q in missing_queues: - api_client.queues.create(q) + self.ensure_queues() idle_workers = {} - # a dict of resource_names and lists of time_stamps of instances that were just spun - # example - # spun_workers['resource_type'] = [time()] - spun_workers = {} + # Workers that we spun but have not yet reported back to the API + spun_workers = {} # worker_id -> (resource type, spin time) previous_workers = set() - while True: - queue_name_to_id = { - queue.name: queue.id for queue in api_client.queues.get_all(only_fields=['id', 'name']) - } - resource_to_queue = { - item[0]: queue - for queue, resources in self.queues.items() - for item in resources - } - all_workers = [ - worker - for worker in api_client.workers.get_all() - if workers_pattern.match(worker.id) - and workers_pattern.match(worker.id)["prefix"] == self.workers_prefix - ] + unknown_workers = deque(maxlen=256) + task_logger = get_task_logger() + up_machines = defaultdict(int) + + while self._running(): + queue_id_to_name, queue_name_to_id = self.queue_mapping() + all_workers = self.get_workers() + # update spun_workers (remove instances that are fully registered) for worker in all_workers: if worker.id not in previous_workers: - # look for the spun instance and remove it - resource_name = workers_pattern.match(worker.id)[ - "name" - ] - spun_workers[resource_name] = spun_workers.get(resource_name, [])[1:] - # remove old spun workers based on time out: - for resource in spun_workers.keys(): - time_stamp_list = [ - t for t in spun_workers[resource] if time() - t < self.max_spin_up_time_min*60.] 
-                deleted = len(spun_workers[resource]) - len(time_stamp_list)
-                if deleted:
-                    print('Ignoring {} stuck instances of type {}'.format(deleted, resource))
+                    if not spun_workers.pop(worker.id, None):
+                        if worker.id not in unknown_workers:
+                            self.logger.info('Unknown worker (not in spun_workers): %s', worker.id)
+                            unknown_workers.append(worker.id)
+                else:
+                    previous_workers.add(worker.id)
 
-            # Workers without a task, are added to the idle list
-            if not all_workers:
-                idle_workers = {}
-            else:
-                for worker in all_workers:
-                    if not hasattr(worker, "task") or not worker.task:
-                        if worker.id not in idle_workers:
-                            resource_name = workers_pattern.match(worker.id)[
-                                "instance_type"
-                            ]
-                            idle_workers[worker.id] = (time(), resource_name, worker)
-                    elif (
-                        hasattr(worker, "task")
-                        and worker.task
-                        and worker.id in idle_workers
-                    ):
-                        idle_workers.pop(worker.id, None)
+            for worker_id in self.stale_workers(spun_workers):
+                del spun_workers[worker_id]
 
+            self.update_idle_workers(all_workers, idle_workers)
             required_idle_resources = []  # idle resources we'll need to keep running
-            allocate_new_resources = []  # resources that will need to be started
+            allocate_new_resources = self.extra_allocations()
+
             # Check if we have tasks waiting on one of the designated queues
             for queue in self.queues:
-                entries = api_client.queues.get_by_id(queue_name_to_id[queue]).entries
+                entries = self.api_client.queues.get_by_id(queue_name_to_id[queue]).entries
+                self.logger.info("Found %d tasks in queue %r", len(entries), queue)
                 if entries and len(entries) > 0:
                     queue_resources = self.queues[queue]
@@ -235,9 +256,9 @@ class AutoScaler(object):
                     ]
                     # if we have an instance waiting to be spun
                    # remove it from the required allocation resources
-                    for resource, time_stamps_list in spun_workers.items():
-                        if time_stamps_list and any(q_r for q_r in queue_resources if resource in q_r[0]):
-                            free_queue_resources += [resource] * len(time_stamps_list)
+                    for resource, _ in spun_workers.values():
+                        if resource in [qr[0] for qr in queue_resources]:
+                            free_queue_resources.append(resource)
 
                     required_idle_resources.extend(free_queue_resources)
                     spin_up_count = len(entries) - len(free_queue_resources)
@@ -249,25 +270,36 @@ class AutoScaler(object):
                             break
                         # check if we can add instances to `resource`
                         currently_running_workers = len(
-                            [worker for worker in all_workers if workers_pattern.match(worker.id)["name"] == resource])
-                        spun_up_workers = len(spun_workers.get(resource, []))
+                            [worker for worker in all_workers if WorkerId(worker.id).name == resource])
+                        spun_up_workers = sum(1 for r, _ in spun_workers.values() if r == resource)
                         max_allowed = int(max_instances) - currently_running_workers - spun_up_workers
                         if max_allowed > 0:
                             spin_up_resources.extend(
-                                [resource] * spin_up_count
+                                [resource] * min(spin_up_count, max_allowed)
                             )
                 allocate_new_resources.extend(spin_up_resources)
 
             # Now we actually spin the new machines
             for resource in allocate_new_resources:
+                task_id = None
                 try:
-                    print('Spinning new instance type={}'.format(resource))
-                    self.spin_up_worker(
-                        resource, self.workers_prefix, resource_to_queue[resource]
-                    )
-                    spun_workers[resource] = spun_workers.get(resource, []) + [time()]
+                    if isinstance(resource, tuple):
+                        worker_id, task_id = resource
+                        resource = WorkerId(worker_id).name
+
+                    queue = self.resource_to_queue[resource]
+                    self.logger.info(
+                        'Spinning new instance resource=%r, prefix=%r, queue=%r, task_id=%r',
+                        resource, self.workers_prefix, queue, task_id)
+                    resource_conf = self.resource_configurations[resource]
+                    
worker_prefix = self.gen_worker_prefix(resource, resource_conf) + instance_id = self.driver.spin_up_worker(resource_conf, worker_prefix, queue, task_id=task_id) + worker_id = '{}:{}'.format(worker_prefix, instance_id) + self.logger.info('New instance ID: %s', instance_id) + spun_workers[worker_id] = (resource, time()) + up_machines[resource] += 1 except Exception as ex: - print(f"Error: Failed to start new instance, {ex}") + self.logger.exception("Failed to start new instance (resource %r), Error: %s", resource, ex) # Go over the idle workers list, and spin down idle workers for worker_id in list(idle_workers): @@ -275,12 +307,69 @@ class AutoScaler(object): # skip resource types that might be needed if resources in required_idle_resources: continue - # Remove from both aws and clearml all instances that are idle for longer than MAX_IDLE_TIME_MIN + # Remove from both cloud and clearml all instances that are idle for longer than MAX_IDLE_TIME_MIN if time() - timestamp > self.max_idle_time_min * 60.0: - cloud_id = workers_pattern.match(worker_id)["cloud_id"] - self.spin_down_worker(cloud_id) - print(f"Spin down instance cloud id {cloud_id}") + wid = WorkerId(worker_id) + cloud_id = wid.cloud_id + self.driver.spin_down_worker(cloud_id) + up_machines[wid.name] -= 1 + self.logger.info("Spin down instance cloud id %r", cloud_id) idle_workers.pop(worker_id, None) + if task_logger: + self.report_app_stats(task_logger, queue_id_to_name, up_machines, idle_workers) + # Nothing else to do + self.logger.info("Idle for %.2f seconds", self.polling_interval_time_min * 60.0) sleep(self.polling_interval_time_min * 60.0) + + def update_idle_workers(self, all_workers, idle_workers): + if not all_workers: + idle_workers.clear() + return + + for worker in all_workers: + task = getattr(worker, 'task', None) + if not task: + if worker.id not in idle_workers: + resource_name = WorkerId(worker.id).instance_type + idle_workers[worker.id] = (time(), resource_name, worker) + elif worker.id in idle_workers: + idle_workers.pop(worker.id, None) + + def _running(self): + return not self._stop_event.is_set() + + def report_app_stats(self, logger, queue_id_to_name, up_machines, idle_workers): + self.logger.info('resources: %r', self.resource_to_queue) + self.logger.info('idle worker: %r', idle_workers) + self.logger.info('up machines: %r', up_machines) + + # Using property for state to log state change + @property + def state(self): + return self._state + + @state.setter + def state(self, value): + prev = getattr(self, '_state', None) + if prev: + self.logger.info('state change: %s -> %s', prev, value) + else: + self.logger.info('initial state: %s', value) + self._state = value + + +def get_task_logger(): + task = Task.current_task() + return task and task.get_logger() + + +def has_duplicate_resource(queues: dict): + """queues: dict[name] -> [(resource, count), (resource, count) ...]""" + seen = set() + for name, _ in chain.from_iterable(queues.values()): + if name in seen: + return True + seen.add(name) + return False diff --git a/clearml/automation/aws_auto_scaler.py b/clearml/automation/aws_auto_scaler.py deleted file mode 100644 index 6ddfd507..00000000 --- a/clearml/automation/aws_auto_scaler.py +++ /dev/null @@ -1,195 +0,0 @@ -import base64 -from typing import Union - -import attr - -from .auto_scaler import AutoScaler -from .. 
import Task -from ..utilities.pyhocon import ConfigTree, ConfigFactory - -try: - # noinspection PyPackageRequirements - import boto3 - - Task.add_requirements("boto3") -except ImportError: - raise ValueError( - "AwsAutoScaler requires 'boto3' package, it was not found\n" - "install with: pip install boto3" - ) - - -class AwsAutoScaler(AutoScaler): - @attr.s - class Settings(AutoScaler.Settings): - workers_prefix = attr.ib(default="dynamic_aws") - cloud_provider = attr.ib(default="AWS") - - startup_bash_script = [ - "#!/bin/bash", - "sudo apt-get update", - "sudo apt-get install -y python3-dev", - "sudo apt-get install -y python3-pip", - "sudo apt-get install -y gcc", - "sudo apt-get install -y git", - "sudo apt-get install -y build-essential", - "python3 -m pip install -U pip", - "python3 -m pip install virtualenv", - "python3 -m virtualenv clearml_agent_venv", - "source clearml_agent_venv/bin/activate", - "python -m pip install clearml-agent", - "echo 'agent.git_user=\"{git_user}\"' >> /root/clearml.conf", - "echo 'agent.git_pass=\"{git_pass}\"' >> /root/clearml.conf", - "echo \"{clearml_conf}\" >> /root/clearml.conf", - "export CLEARML_API_HOST={api_server}", - "export CLEARML_WEB_HOST={web_server}", - "export CLEARML_FILES_HOST={files_server}", - "export DYNAMIC_INSTANCE_ID=`curl http://169.254.169.254/latest/meta-data/instance-id`", - "export CLEARML_WORKER_ID={worker_id}:$DYNAMIC_INSTANCE_ID", - "export CLEARML_API_ACCESS_KEY='{access_key}'", - "export CLEARML_API_SECRET_KEY='{secret_key}'", - "source ~/.bashrc", - "{bash_script}", - "python -m clearml_agent --config-file '/root/clearml.conf' daemon --queue '{queue}' {docker}", - "shutdown", - ] - - def __init__(self, settings, configuration): - # type: (Union[dict, AwsAutoScaler.Settings], Union[dict, AwsAutoScaler.Configuration]) -> None - super(AwsAutoScaler, self).__init__(settings, configuration) - - def spin_up_worker(self, resource, worker_id_prefix, queue_name): - """ - Creates a new worker for clearml. - First, create an instance in the cloud and install some required packages. - Then, define clearml-agent environment variables and run clearml-agent for the specified queue. - NOTE: - Will wait until instance is running - - This implementation assumes the instance image already has docker installed - - :param str resource: resource name, as defined in BUDGET and QUEUES. - :param str worker_id_prefix: worker name prefix - :param str queue_name: clearml queue to listen to - """ - resource_conf = self.resource_configurations[resource] - # Add worker type and AWS instance type to the worker name. - worker_id = "{worker_id_prefix}:{worker_type}:{instance_type}".format( - worker_id_prefix=worker_id_prefix, - worker_type=resource, - instance_type=resource_conf["instance_type"], - ) - - # user_data script will automatically run when the instance is started. 
it will install the required packages - # for clearml-agent configure it using environment variables and run clearml-agent on the required queue - user_data = ('\n'.join(self.startup_bash_script) + '\n').format( - api_server=self.api_server, - web_server=self.web_server, - files_server=self.files_server, - worker_id=worker_id, - access_key=self.access_key, - secret_key=self.secret_key, - queue=queue_name, - git_user=self.git_user or "", - git_pass=self.git_pass or "", - clearml_conf='\\"'.join(self.extra_clearml_conf.split('"')), - bash_script=self.extra_vm_bash_script, - docker="--docker '{}'".format(self.default_docker_image) if self.default_docker_image else "", - ) - - ec2 = boto3.client( - "ec2", - aws_access_key_id=self.cloud_credentials_key or None, - aws_secret_access_key=self.cloud_credentials_secret or None, - region_name=self.cloud_credentials_region, - ) - - launch_specification = ConfigFactory.from_dict( - { - "ImageId": resource_conf["ami_id"], - "InstanceType": resource_conf["instance_type"], - "BlockDeviceMappings": [ - { - "DeviceName": resource_conf["ebs_device_name"], - "Ebs": { - "VolumeSize": resource_conf["ebs_volume_size"], - "VolumeType": resource_conf["ebs_volume_type"], - }, - } - ], - "Placement": {"AvailabilityZone": resource_conf["availability_zone"]}, - } - ) - if resource_conf.get("key_name", None): - launch_specification["KeyName"] = resource_conf["key_name"] - if resource_conf.get("security_group_ids", None): - launch_specification["SecurityGroupIds"] = resource_conf[ - "security_group_ids" - ] - - if resource_conf["is_spot"]: - # Create a request for a spot instance in AWS - encoded_user_data = base64.b64encode(user_data.encode("ascii")).decode( - "ascii" - ) - launch_specification["UserData"] = encoded_user_data - ConfigTree.merge_configs( - launch_specification, resource_conf.get("extra_configurations", {}) - ) - - instances = ec2.request_spot_instances( - LaunchSpecification=launch_specification - ) - - # Wait until spot request is fulfilled - request_id = instances["SpotInstanceRequests"][0]["SpotInstanceRequestId"] - waiter = ec2.get_waiter("spot_instance_request_fulfilled") - waiter.wait(SpotInstanceRequestIds=[request_id]) - # Get the instance object for later use - response = ec2.describe_spot_instance_requests( - SpotInstanceRequestIds=[request_id] - ) - instance_id = response["SpotInstanceRequests"][0]["InstanceId"] - - else: - # Create a new EC2 instance - launch_specification.update( - MinCount=1, - MaxCount=1, - UserData=user_data, - InstanceInitiatedShutdownBehavior="terminate", - ) - ConfigTree.merge_configs( - launch_specification, resource_conf.get("extra_configurations", {}) - ) - - instances = ec2.run_instances(**launch_specification) - - # Get the instance object for later use - instance_id = instances["Instances"][0]["InstanceId"] - - instance = boto3.resource( - "ec2", - aws_access_key_id=self.cloud_credentials_key or None, - aws_secret_access_key=self.cloud_credentials_secret or None, - region_name=self.cloud_credentials_region, - ).Instance(instance_id) - - # Wait until instance is in running state - instance.wait_until_running() - - # Cloud-specific implementation (currently, only AWS EC2 is supported) - def spin_down_worker(self, instance_id): - """ - Destroys the cloud instance. 
- - :param instance_id: Cloud instance ID to be destroyed (currently, only AWS EC2 is supported) - :type instance_id: str - """ - try: - boto3.resource( - "ec2", - aws_access_key_id=self.cloud_credentials_key or None, - aws_secret_access_key=self.cloud_credentials_secret or None, - region_name=self.cloud_credentials_region, - ).instances.filter(InstanceIds=[instance_id]).terminate() - except Exception as ex: - raise ex diff --git a/clearml/automation/aws_driver.py b/clearml/automation/aws_driver.py new file mode 100644 index 00000000..22c598b6 --- /dev/null +++ b/clearml/automation/aws_driver.py @@ -0,0 +1,147 @@ +import base64 + +import attr +from attr.validators import instance_of + +from .. import Task +from ..utilities.pyhocon import ConfigFactory, ConfigTree +from .auto_scaler import CloudDriver +from .cloud_driver import parse_tags + +try: + # noinspection PyPackageRequirements + import boto3 + + Task.add_requirements("boto3") +except ImportError as err: + raise ImportError( + "AwsAutoScaler requires 'boto3' package, it was not found\n" + "install with: pip install boto3" + ) from err + + +@attr.s +class AWSDriver(CloudDriver): + """AWS Driver""" + aws_access_key_id = attr.ib(validator=instance_of(str), default='') + aws_secret_access_key = attr.ib(validator=instance_of(str), default='') + aws_region = attr.ib(validator=instance_of(str), default='') + use_credentials_chain = attr.ib(validator=instance_of(bool), default=False) + + @classmethod + def from_config(cls, config): + obj = super().from_config(config) + obj.aws_access_key_id = config['hyper_params'].get('cloud_credentials_key') + obj.aws_secret_access_key = config['hyper_params'].get('cloud_credentials_secret') + obj.aws_region = config['hyper_params'].get('cloud_credentials_region') + obj.use_credentials_chain = config['hyper_params'].get('use_credentials_chain', False) + return obj + + def __attrs_post_init__(self): + super().__attrs_post_init__() + self.tags = parse_tags(self.tags) + + def spin_up_worker(self, resource_conf, worker_prefix, queue_name, task_id): + # user_data script will automatically run when the instance is started. 
It will install the required packages
+        # for clearml-agent, configure it using environment variables, and run clearml-agent on the required queue
+        user_data = self.gen_user_data(worker_prefix, queue_name, task_id)
+
+        ec2 = boto3.client("ec2", **self.creds())
+        launch_specification = ConfigFactory.from_dict(
+            {
+                "ImageId": resource_conf["ami_id"],
+                "InstanceType": resource_conf["instance_type"],
+                "BlockDeviceMappings": [
+                    {
+                        "DeviceName": resource_conf["ebs_device_name"],
+                        "Ebs": {
+                            "VolumeSize": resource_conf["ebs_volume_size"],
+                            "VolumeType": resource_conf["ebs_volume_type"],
+                        },
+                    }
+                ],
+                "Placement": {"AvailabilityZone": resource_conf["availability_zone"]},
+            }
+        )
+        if resource_conf.get("key_name", None):
+            launch_specification["KeyName"] = resource_conf["key_name"]
+        if resource_conf.get("security_group_ids", None):
+            launch_specification["SecurityGroupIds"] = resource_conf[
+                "security_group_ids"
+            ]
+
+        if resource_conf["is_spot"]:
+            # Create a request for a spot instance in AWS
+            encoded_user_data = base64.b64encode(user_data.encode("ascii")).decode(
+                "ascii"
+            )
+            launch_specification["UserData"] = encoded_user_data
+            ConfigTree.merge_configs(
+                launch_specification, resource_conf.get("extra_configurations", {})
+            )
+
+            instances = ec2.request_spot_instances(
+                LaunchSpecification=launch_specification
+            )
+
+            # Wait until spot request is fulfilled
+            request_id = instances["SpotInstanceRequests"][0]["SpotInstanceRequestId"]
+            waiter = ec2.get_waiter("spot_instance_request_fulfilled")
+            waiter.wait(SpotInstanceRequestIds=[request_id])
+            # Get the instance object for later use
+            response = ec2.describe_spot_instance_requests(
+                SpotInstanceRequestIds=[request_id]
+            )
+            instance_id = response["SpotInstanceRequests"][0]["InstanceId"]
+
+        else:
+            # Create a new EC2 instance
+            launch_specification.update(
+                MinCount=1,
+                MaxCount=1,
+                UserData=user_data,
+                InstanceInitiatedShutdownBehavior="terminate",
+            )
+            ConfigTree.merge_configs(
+                launch_specification, resource_conf.get("extra_configurations", {})
+            )
+
+            instances = ec2.run_instances(**launch_specification)
+
+            # Get the instance object for later use
+            instance_id = instances["Instances"][0]["InstanceId"]
+
+        instance = boto3.resource("ec2", **self.creds()).Instance(instance_id)
+        if self.tags:
+            instance.create_tags(
+                Resources=[instance_id],
+                Tags=[{"Key": key, "Value": val} for key, val in self.tags],
+            )
+        # Wait until instance is in running state
+        instance.wait_until_running()
+        return instance_id
+
+    def spin_down_worker(self, instance_id):
+        instance = boto3.resource("ec2", **self.creds()).Instance(instance_id)
+        instance.terminate()
+
+    def creds(self):
+        creds = {
+            'region_name': self.aws_region or None,
+        }
+
+        if not self.use_credentials_chain:
+            creds.update({
+                'aws_secret_access_key': self.aws_secret_access_key or None,
+                'aws_access_key_id': self.aws_access_key_id or None,
+            })
+        return creds
+
+    def instance_id_command(self):
+        return 'curl http://169.254.169.254/latest/meta-data/instance-id'
+
+    def instance_type_key(self):
+        return 'instance_type'
+
+    def kind(self):
+        return 'AWS'
diff --git a/clearml/automation/cloud_driver.py b/clearml/automation/cloud_driver.py
new file mode 100644
index 00000000..3dac773e
--- /dev/null
+++ b/clearml/automation/cloud_driver.py
@@ -0,0 +1,205 @@
+import logging
+from abc import ABC, abstractmethod
+from os import environ
+
+import attr
+
+from ..backend_api import Session
+from ..backend_api.session.defs import ENV_AUTH_TOKEN
+
+env_git_user = 
'CLEARML_AUTOSCALER_GIT_USER' +env_git_pass = 'CLEARML_AUTOSCALER_GIT_PASSWORD' + +bash_script_template = '''\ +#!/bin/bash + +set -x +set -e + +apt-get update +apt-get install -y \ + build-essential \ + gcc \ + git \ + python3-dev \ + python3-pip +python3 -m pip install -U pip +python3 -m pip install virtualenv +python3 -m virtualenv clearml_agent_venv +source clearml_agent_venv/bin/activate +python -m pip install clearml-agent +cat << EOF >> ~/clearml.conf +{clearml_conf} +EOF +export CLEARML_API_HOST={api_server} +export CLEARML_WEB_HOST={web_server} +export CLEARML_FILES_HOST={files_server} +export DYNAMIC_INSTANCE_ID=$({instance_id_command}) +export CLEARML_WORKER_ID={worker_prefix}:$DYNAMIC_INSTANCE_ID +export CLEARML_API_ACCESS_KEY='{access_key}' +export CLEARML_API_SECRET_KEY='{secret_key}' +export CLEARML_AUTH_TOKEN='{auth_token}' +source ~/.bashrc +{bash_script} +{driver_extra} +python -m clearml_agent --config-file ~/clearml.conf daemon --queue '{queue}' {docker} +shutdown +''' + +clearml_conf_template = '''\ +agent.git_user="{git_user}" +agent.git_pass="{git_pass}" +{extra_clearml_conf} +{auth_token} +''' + + +@attr.s +class CloudDriver(ABC): + # git + git_user = attr.ib() + git_pass = attr.ib() + + # clearml + extra_clearml_conf = attr.ib() + api_server = attr.ib() + web_server = attr.ib() + files_server = attr.ib() + access_key = attr.ib() + secret_key = attr.ib() + auth_token = attr.ib() + + # Other + extra_vm_bash_script = attr.ib() + docker_image = attr.ib() + tags = attr.ib(default='') + session = attr.ib(default=None) + + def __attrs_post_init__(self): + if self.session is None: + self.session = Session() + + @abstractmethod + def spin_up_worker(self, resource, worker_prefix, queue_name, task_id): + """Creates a new worker for clearml. + + First, create an instance in the cloud and install some required packages. + Then, define clearml-agent environment variables and run clearml-agent for the specified queue. + NOTE: - Will wait until instance is running + - This implementation assumes the instance image already has docker installed + + :param dict resource: resource configuration, as defined in BUDGET and QUEUES. + :param str worker_prefix: worker name without instance_id + :param str queue_name: clearml queue to listen to + :param str task_id: Task ID to restart + """ + + @abstractmethod + def spin_down_worker(self, instance_id): + """Destroys the cloud instance. + + :param str instance_id: Cloud instance ID to be destroyed (currently, only AWS EC2 is supported) + """ + + @abstractmethod + def kind(self): + """Return driver kind (e.g. 
'AWS')""" + + @abstractmethod + def instance_id_command(self): + """Return a shell command to get instance ID""" + + @abstractmethod + def instance_type_key(self): + """Return key in configuration for instance type""" + + def gen_user_data(self, worker_prefix, queue_name, task_id): + return bash_script_template.format( + queue=queue_name, + worker_prefix=worker_prefix, + + auth_token=self.auth_token or '', + access_key=self.access_key, + api_server=self.api_server, + clearml_conf=self.clearml_conf(), + files_server=self.files_server, + secret_key=self.secret_key, + web_server=self.web_server, + + bash_script=self.extra_vm_bash_script, + driver_extra=self.driver_bash_extra(task_id), + docker="--docker '{}'".format(self.docker_image) if self.docker_image else "", + instance_id_command=self.instance_id_command(), + ) + + def clearml_conf(self): + auth_token = '' + token = self.session.auth_token or self.auth_token + if token: + auth_token = 'agent.extra_docker_arguments = ["-e", "CLEARML_AUTH_TOKEN={}"]'.format(token) + + # TODO: This need to be documented somewhere + git_user = environ.get(env_git_user) or self.git_user or '' + git_pass = environ.get(env_git_pass) or self.git_pass or '' + + return clearml_conf_template.format( + git_user=git_user, + git_pass=git_pass, + extra_clearml_conf=self.extra_clearml_conf, + auth_token=auth_token, + ) + + def driver_bash_extra(self, task_id): + if not task_id: + return '' + return 'python -m clearml_agent --config-file ~/clearml.conf execute --id {}'.format(task_id) + + @classmethod + def from_config(cls, config): + session = Session() + hyper_params, configurations = config['hyper_params'], config['configurations'] + opts = { + 'git_user': hyper_params['git_user'], + 'git_pass': hyper_params['git_pass'], + 'extra_clearml_conf': configurations['extra_clearml_conf'], + 'api_server': session.get_api_server_host(), + 'web_server': session.get_app_server_host(), + 'files_server': session.get_files_server_host(), + 'access_key': session.access_key, + 'secret_key': session.secret_key, + 'auth_token': ENV_AUTH_TOKEN.get(), + 'extra_vm_bash_script': configurations['extra_vm_bash_script'], + 'docker_image': hyper_params['default_docker_image'], + 'tags': hyper_params.get('tags', ''), + 'session': session, + } + return cls(**opts) + + def set_scaler(self, scaler): + self.scaler = scaler + + @property + def logger(self): + if self.scaler: + return self.scaler.logger + return logging.getLogger('AWSDriver') + + +def parse_tags(s): + """ + >>> parse_tags('k1=v1, k2=v2') + [('k1', 'v1'), ('k2', 'v2')] + """ + s = s.strip() + if not s: + return [] + + tags = [] + for kv in s.split(','): + if '=' not in kv: + raise ValueError(kv) + key, value = [v.strip() for v in kv.split('=', 1)] + if not key or not value: + raise ValueError(kv) + tags.append((key, value)) + return tags diff --git a/dev-requirements.txt b/dev-requirements.txt new file mode 100644 index 00000000..9117c98b --- /dev/null +++ b/dev-requirements.txt @@ -0,0 +1,6 @@ +-r requirements.txt + +flake8-bugbear~=21.4 +flake8~=3.9 +pytest~=6.2 +mock~=3.0 ; python_version < '3' diff --git a/examples/services/aws-autoscaler/aws_autoscaler.py b/examples/services/aws-autoscaler/aws_autoscaler.py index 8026921c..9a8e873e 100644 --- a/examples/services/aws-autoscaler/aws_autoscaler.py +++ b/examples/services/aws-autoscaler/aws_autoscaler.py @@ -2,27 +2,46 @@ import json from argparse import ArgumentParser from collections import defaultdict from itertools import chain +from pathlib import Path from typing import Tuple 
import yaml -from pathlib2 import Path -from six.moves import input from clearml import Task -from clearml.automation.aws_auto_scaler import AwsAutoScaler +from clearml.automation.auto_scaler import AutoScaler, ScalerConfig +from clearml.automation.aws_driver import AWSDriver from clearml.config import running_remotely from clearml.utilities.wizard.user_input import ( - get_input, - input_int, - input_bool, - multiline_input, - input_list, + get_input, input_bool, input_int, input_list, multiline_input ) -CONF_FILE = "aws_autoscaler.yaml" DEFAULT_DOCKER_IMAGE = "nvidia/cuda:10.1-runtime-ubuntu18.04" +default_config = { + 'hyper_params': { + 'git_user': '', + 'git_pass': '', + 'cloud_credentials_key': '', + 'cloud_credentials_secret': '', + 'cloud_credentials_region': None, + 'default_docker_image': 'nvidia/cuda', + 'max_idle_time_min': 15, + 'polling_interval_time_min': 5, + 'max_spin_up_time_min': 30, + 'workers_prefix': 'dynamic_worker', + 'cloud_provider': '', + }, + 'configurations': { + 'resource_configurations': None, + 'queues': None, + 'extra_trains_conf': '', + 'extra_clearml_conf': '', + 'extra_vm_bash_script': '', + }, +} + + def main(): parser = ArgumentParser() parser.add_argument( @@ -37,11 +56,16 @@ def main(): action="store_true", default=False, ) + parser.add_argument( + "--config-file", + help="Configuration file name", + type=Path, + default=Path("aws_autoscaler.yaml"), + ) args = parser.parse_args() if running_remotely(): - hyper_params = AwsAutoScaler.Settings().as_dict() - configurations = AwsAutoScaler.Configuration().as_dict() + conf = default_config else: print("AWS Autoscaler setup wizard\n" "---------------------------\n" @@ -49,30 +73,26 @@ def main(): "Once completed, you will be able to view and change the configuration in the clearml-server web UI.\n" "It means there is no need to worry about typos or mistakes :)\n") - config_file = Path(CONF_FILE).absolute() - if config_file.exists() and input_bool( - "Load configurations from config file '{}' [Y/n]? ".format(str(CONF_FILE)), + if args.config_file.exists() and input_bool( + "Load configurations from config file '{}' [Y/n]? ".format(args.config_file), default=True, ): - with config_file.open("r") as f: + with args.config_file.open("r") as f: conf = yaml.load(f, Loader=yaml.SafeLoader) - hyper_params = conf["hyper_params"] - configurations = conf["configurations"] else: configurations, hyper_params = run_wizard() - + conf = { + "hyper_params": hyper_params, + "configurations": configurations, + } # noinspection PyBroadException try: - with config_file.open("w+") as f: - conf = { - "hyper_params": hyper_params, - "configurations": configurations, - } + with args.config_file.open("w+") as f: yaml.safe_dump(conf, f) except Exception: print( "Error! Could not write configuration file at: {}".format( - str(CONF_FILE) + args.config_file ) ) return @@ -80,7 +100,8 @@ def main(): # Connecting ClearML with the current process, # from here on everything is logged automatically task = Task.init(project_name="DevOps", task_name="AWS Auto-Scaler", task_type=Task.TaskTypes.service) - task.connect(hyper_params) + task.connect(conf['hyper_params']) + configurations = conf['configurations'] configurations.update(json.loads(task.get_configuration_object(name="General") or "{}")) task.set_configuration_object(name="General", config_text=json.dumps(configurations, indent=2)) @@ -92,7 +113,9 @@ def main(): # the clearml-agent services will pick it up and execute it for us. 
task.execute_remotely(queue_name='services') - autoscaler = AwsAutoScaler(hyper_params, configurations) + driver = AWSDriver.from_config(conf) + conf = ScalerConfig.from_config(conf) + autoscaler = AutoScaler(conf, driver) if running_remotely() or args.run: autoscaler.start() @@ -100,14 +123,14 @@ def main(): def run_wizard(): # type: () -> Tuple[dict, dict] - hyper_params = AwsAutoScaler.Settings() - configurations = AwsAutoScaler.Configuration() + hyper_params = default_config['hyper_params'] + configurations = default_config['configurations'] - hyper_params.cloud_credentials_key = get_input("AWS Access Key ID", required=True) - hyper_params.cloud_credentials_secret = get_input( + hyper_params['cloud_credentials_key'] = get_input("AWS Access Key ID", required=True) + hyper_params['cloud_credentials_secret'] = get_input( "AWS Secret Access Key", required=True ) - hyper_params.cloud_credentials_region = get_input( + hyper_params['cloud_credentials_region'] = get_input( "AWS region name", "[us-east-1]", default='us-east-1') @@ -127,13 +150,13 @@ def run_wizard(): ) ) else: - git_user = None - git_pass = None + git_user = '' + git_pass = '' - hyper_params.git_user = git_user - hyper_params.git_pass = git_pass + hyper_params['git_user'] = git_user + hyper_params['git_pass'] = git_pass - hyper_params.default_docker_image = get_input( + hyper_params['default_docker_image'] = get_input( "default docker image/parameters", "to use [{}]".format(DEFAULT_DOCKER_IMAGE), default=DEFAULT_DOCKER_IMAGE, @@ -204,21 +227,21 @@ def run_wizard(): if not input_bool("\nDefine another instance type? [y/N]"): break - configurations.resource_configurations = resource_configurations + configurations['resource_configurations'] = resource_configurations - configurations.extra_vm_bash_script, num_lines_bash_script = multiline_input( + configurations['extra_vm_bash_script'], num_lines_bash_script = multiline_input( "\nEnter any pre-execution bash script to be executed on the newly created instances []" ) print("Entered {} lines of pre-execution bash script".format(num_lines_bash_script)) - configurations.extra_clearml_conf, num_lines_clearml_conf = multiline_input( + configurations['extra_clearml_conf'], num_lines_clearml_conf = multiline_input( "\nEnter anything you'd like to include in your clearml.conf file []" ) print("Entered {} extra lines for clearml.conf file".format(num_lines_clearml_conf)) print("\nDefine the machines budget:") print("-----------------------------") - resource_configurations_names = list(configurations.resource_configurations.keys()) + resource_configurations_names = list(configurations['resource_configurations'].keys()) queues = defaultdict(list) while True: while True: @@ -238,9 +261,9 @@ def run_wizard(): question="Select", required=True, ) - if queue_type not in configurations.resource_configurations: + if queue_type not in configurations['resource_configurations']: print("\tError: instance type '{}' not in predefined instances {}!".format( - queue_type, list(configurations.resource_configurations.keys()))) + queue_type, resource_configurations_names)) continue if queue_type in (q[0] for q in queues[queue_name]): @@ -252,8 +275,8 @@ def run_wizard(): queue_type_new = '{}_{}'.format(queue_type, queue_name) print("\tInstance type '{}' already used, renaming instance to {}".format( queue_type, queue_type_new)) - configurations.resource_configurations[queue_type_new] = \ - dict(**configurations.resource_configurations[queue_type]) + configurations['resource_configurations'][queue_type_new] = 
\ + dict(**configurations['resource_configurations'][queue_type]) queue_type = queue_type_new # make sure the renamed name is not reused @@ -269,7 +292,7 @@ def run_wizard(): ) queues[queue_name].append((queue_type, max_instances)) - valid_instances = [k for k in configurations.resource_configurations.keys() + valid_instances = [k for k in configurations['resource_configurations'].keys() if k not in (q[0] for q in queues[queue_name])] if not valid_instances: break @@ -278,19 +301,19 @@ def run_wizard(): break if not input_bool("\nAdd another queue? [y/N]"): break - configurations.queues = dict(queues) + configurations['queues'] = dict(queues) - hyper_params.max_idle_time_min = input_int( + hyper_params['max_idle_time_min'] = input_int( "maximum idle time", "for the auto-scaler to spin down an instance (in minutes) [15]", default=15, new_line=True, ) - hyper_params.polling_interval_time_min = input_int( + hyper_params['polling_interval_time_min'] = input_int( "instances polling interval", "for the auto-scaler (in minutes) [5]", default=5, ) - return configurations.as_dict(), hyper_params.as_dict() + return configurations, hyper_params if __name__ == "__main__":
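     main()

---

Reviewer note: a minimal, hypothetical usage sketch of the refactored API (not part
of the patch). It assumes a YAML file shaped like `default_config` in
examples/services/aws-autoscaler/aws_autoscaler.py; the file name is illustrative.

    import yaml

    from clearml.automation.auto_scaler import AutoScaler, ScalerConfig, WorkerId
    from clearml.automation.aws_driver import AWSDriver

    # Load an autoscaler config (hyper_params + configurations sections)
    with open("aws_autoscaler.yaml") as fp:  # hypothetical path
        conf = yaml.safe_load(fp)

    driver = AWSDriver.from_config(conf)  # cloud-specific spin up/down
    scaler = AutoScaler(ScalerConfig.from_config(conf), driver)

    # Worker IDs follow the "prefix:name:instance_type[:cloud_id]" convention
    wid = WorkerId("dynamic_worker:aws4gpu:g4dn.4xlarge:i-07cf7d6750455cb62")
    assert wid.cloud_id == "i-07cf7d6750455cb62"

    scaler.start()  # blocks; call scaler.stop() from another thread to exit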
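Reviewer note: with CloudDriver now an ABC, adding a provider means implementing
its five abstract methods. A sketch against this patch's interface; the provider
and its return values are invented for illustration.

    from clearml.automation.cloud_driver import CloudDriver

    class DummyDriver(CloudDriver):
        """Toy driver; inherits gen_user_data()/clearml_conf() from CloudDriver."""

        def spin_up_worker(self, resource_conf, worker_prefix, queue_name, task_id):
            # Launch an instance whose boot script is self.gen_user_data(...),
            # then return the provider's instance ID as a string.
            user_data = self.gen_user_data(worker_prefix, queue_name, task_id)
            print(len(user_data), "bytes of user data")  # placeholder for a real API call
            return "dummy-0001"

        def spin_down_worker(self, instance_id):
            print("terminating", instance_id)  # placeholder for a real API call

        def kind(self):
            return "Dummy"

        def instance_id_command(self):
            # Shell command the instance runs to discover its own ID
            return "hostname"

        def instance_type_key(self):
            # Key inside each resource configuration holding the instance type
            return "instance_type"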