Refactor AutoScaler

allegroai 2021-11-17 19:15:48 +02:00
parent 497ba92bff
commit 6a6cf111be
8 changed files with 702 additions and 422 deletions

.flake8 Normal file

@ -0,0 +1,4 @@
[flake8]
max-line-length=120
statistics
extend-ignore=E501

.gitignore vendored

@ -10,6 +10,7 @@ __pycache__
build/
dist/
*.egg-info
.env
# example data
examples/runs/

clearml/automation/auto_scaler.py

@ -1,91 +1,134 @@
import os
import re
from collections import defaultdict, deque
from enum import Enum
from itertools import chain
from operator import itemgetter
from threading import Event
from time import sleep, time
from typing import Union
import attr
from attr.validators import instance_of
from .cloud_driver import CloudDriver
from .. import Task
from ..backend_api import Session
from ..backend_api.session import defs
from ..backend_api.session.client import APIClient
from ..debugging import get_logger
# A worker's ID in clearml is composed of prefix, name, instance_type and cloud_id, separated by ":"
# Example: 'test:m1:g4dn.4xlarge:i-07cf7d6750455cb62'
# cloud_id might be missing
_workers_pattern = re.compile(
r"""^
(?P<prefix>[^:]+):
(?P<name>[^:]+):
(?P<instance_type>[^:]+)
(:(?P<cloud_id>[^:/]+))?
$
""", re.VERBOSE
)
class WorkerId:
def __init__(self, worker_id):
self.prefix = self.name = self.instance_type = self.cloud_id = ""
match = _workers_pattern.match(worker_id)
if not match:
raise ValueError("bad worker ID: {!r}".format(worker_id))
self.prefix = match["prefix"]
self.name = match["name"]
self.instance_type = match["instance_type"]
self.cloud_id = match["cloud_id"] or ''
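# Usage sketch (illustrative only, not part of the commit) -- parsing the example ID from the comment above:
wid = WorkerId('test:m1:g4dn.4xlarge:i-07cf7d6750455cb62')
assert (wid.prefix, wid.name) == ('test', 'm1')
assert (wid.instance_type, wid.cloud_id) == ('g4dn.4xlarge', 'i-07cf7d6750455cb62')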
class State(str, Enum):
STARTING = 'starting'
READY = 'ready'
RUNNING = 'running'
STOPPED = 'stopped'
@attr.s
class ScalerConfig:
max_idle_time_min = attr.ib(validator=instance_of(int), default=15)
polling_interval_time_min = attr.ib(validator=instance_of((float, int)), default=5)
max_spin_up_time_min = attr.ib(validator=instance_of(int), default=30)
workers_prefix = attr.ib(default="dynamic_worker")
resource_configurations = attr.ib(default=None)
queues = attr.ib(default=None)
@classmethod
def from_config(cls, config):
return cls(
max_idle_time_min=config['hyper_params']['max_idle_time_min'],
polling_interval_time_min=config['hyper_params']['polling_interval_time_min'],
max_spin_up_time_min=config['hyper_params']['max_spin_up_time_min'],
workers_prefix=config['hyper_params']['workers_prefix'],
resource_configurations=config['configurations']['resource_configurations'],
queues=config['configurations']['queues'],
)
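# Usage sketch (hypothetical values) -- ScalerConfig.from_config consumes the same two-section dict
# ('hyper_params' / 'configurations') that the example wizard / YAML file produces:
example_config = {
    'hyper_params': {
        'max_idle_time_min': 15,
        'polling_interval_time_min': 5,
        'max_spin_up_time_min': 30,
        'workers_prefix': 'dynamic_worker',
    },
    'configurations': {
        'resource_configurations': {'m1': {'instance_type': 'g4dn.4xlarge'}},
        'queues': {'gpu_queue': [('m1', 2)]},
    },
}
assert ScalerConfig.from_config(example_config).max_idle_time_min == 15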
class AutoScaler(object):
@attr.s
class Settings(object):
git_user = attr.ib(default="")
git_pass = attr.ib(default="")
cloud_credentials_key = attr.ib(default="")
cloud_credentials_secret = attr.ib(default="")
cloud_credentials_region = attr.ib(default=None)
default_docker_image = attr.ib(default="nvidia/cuda")
max_idle_time_min = attr.ib(validator=instance_of(int), default=15)
polling_interval_time_min = attr.ib(validator=instance_of(int), default=5)
max_spin_up_time_min = attr.ib(validator=instance_of(int), default=30)
workers_prefix = attr.ib(default="dynamic_worker")
cloud_provider = attr.ib(default="")
def __init__(self, config, driver: CloudDriver, logger=None):
self.logger = logger or get_logger('auto_scaler')
# Should be after we create logger
self.state = State.STARTING
def as_dict(self):
return attr.asdict(self)
self.driver = driver
self.logger.info('using %s driver', self.driver.kind())
self.driver.set_scaler(self)
@attr.s
class Configuration(object):
resource_configurations = attr.ib(default=None)
queues = attr.ib(default=None)
extra_trains_conf = attr.ib(default="") # Backwards compatibility
extra_clearml_conf = attr.ib(default="")
extra_vm_bash_script = attr.ib(default="")
def as_dict(self):
return attr.asdict(self)
def __init__(self, settings, configuration):
# type: (Union[dict, AutoScaler.Settings], Union[dict, AutoScaler.Configuration]) -> None
if isinstance(settings, dict):
settings = self.Settings(**settings)
if isinstance(configuration, dict):
configuration = self.Configuration(**configuration)
self.web_server = Session.get_app_server_host()
self.api_server = Session.get_api_server_host()
self.files_server = Session.get_files_server_host()
session = Session()
self.access_key = session.access_key
self.secret_key = session.secret_key
self.git_user = settings.git_user
self.git_pass = settings.git_pass
self.cloud_credentials_key = settings.cloud_credentials_key
self.cloud_credentials_secret = settings.cloud_credentials_secret
self.cloud_credentials_region = settings.cloud_credentials_region
self.default_docker_image = settings.default_docker_image
self.extra_clearml_conf = configuration.extra_clearml_conf or configuration.extra_trains_conf
self.extra_vm_bash_script = configuration.extra_vm_bash_script
self.resource_configurations = configuration.resource_configurations
self.queues = configuration.queues
self.resource_configurations = config.resource_configurations
self.queues = config.queues # queue name -> list of resources
self.resource_to_queue = {
item[0]: queue
for queue, resources in self.queues.items()
for item in resources
}
if not self.sanity_check():
return
raise ValueError('health check failed')
self.max_idle_time_min = float(settings.max_idle_time_min)
self.polling_interval_time_min = float(settings.polling_interval_time_min)
self.max_spin_up_time_min = float(settings.max_spin_up_time_min)
self.max_idle_time_min = float(config.max_idle_time_min)
self.polling_interval_time_min = float(config.polling_interval_time_min)
self.max_spin_up_time_min = float(config.max_spin_up_time_min)
# make sure we have our own unique prefix; otherwise, multiple dynamic auto-scalers
# would mix up each other's instances
self.workers_prefix = settings.workers_prefix
self.cloud_provider = settings.cloud_provider
self.workers_prefix = config.workers_prefix
session = Session()
self.set_auth(session)
# Set up the environment variables for clearml
defs.ENV_HOST.set(session.get_api_server_host())
defs.ENV_WEB_HOST.set(session.get_app_server_host())
defs.ENV_FILES_HOST.set(session.get_files_server_host())
defs.ENV_ACCESS_KEY.set(session.access_key)
defs.ENV_SECRET_KEY.set(session.secret_key)
if self.auth_token:
defs.ENV_AUTH_TOKEN.set(self.auth_token)
self.api_client = APIClient()
self._stop_event = Event()
self.state = State.READY
def set_auth(self, session):
if session.access_key and session.secret_key:
self.access_key = session.access_key
self.secret_key = session.secret_key
self.auth_token = None
return
self.access_key = self.secret_key = None
self.auth_token = defs.ENV_AUTH_TOKEN.get(default=None)
def sanity_check(self):
# Sanity check - Validate queue resources
if len(set(map(itemgetter(0), chain(*self.queues.values())))) != sum(
map(len, self.queues.values())
):
print(
if has_duplicate_resource(self.queues):
self.logger.error(
"Error: at least one resource name is used in multiple queues. "
"A resource name can only appear in a single queue definition."
)
@ -93,42 +136,65 @@ class AutoScaler(object):
return True
def start(self):
# Loop forever, it is okay we are stateless
while True:
self.state = State.RUNNING
# Loop until stopped, it is okay we are stateless
while self._running():
try:
self.supervisor()
except Exception as ex:
print(
"Warning! exception occurred: {ex}\nRetry in 15 seconds".format(
ex=ex
)
)
self.logger.exception('Error: %r, retrying in 15 seconds', ex)
sleep(15)
def spin_up_worker(self, resource, worker_id_prefix, queue_name):
"""
Creates a new worker for clearml (cloud-specific implementation).
First, create an instance in the cloud and install some required packages.
Then, define clearml-agent environment variables and run clearml-agent for the specified queue.
NOTE: - Will wait until instance is running
- This implementation assumes the instance image already has docker installed
def stop(self):
self.logger.info('stopping')
self._stop_event.set()
self.state = State.STOPPED
:param str resource: resource name, as defined in self.resource_configurations and self.queues.
:param str worker_id_prefix: worker name prefix
:param str queue_name: clearml queue to listen to
def ensure_queues(self):
# Verify the requested queues exist and create those that don't exist
all_queues = {q.name for q in list(self.api_client.queues.get_all(only_fields=['name']))}
missing_queues = set(self.queues) - all_queues
for q in missing_queues:
self.logger.info("Creating queue %r", q)
self.api_client.queues.create(q)
:return str: worker_id prefix to identify when spin was successful
"""
pass
def queue_mapping(self):
id_to_name = {}
name_to_id = {}
for queue in self.api_client.queues.get_all(only_fields=['id', 'name']):
id_to_name[queue.id] = queue.name
name_to_id[queue.name] = queue.id
def spin_down_worker(self, instance_id):
"""
Destroys the cloud instance (cloud-specific implementation).
return id_to_name, name_to_id
:param instance_id: Cloud instance ID to be destroyed
:type instance_id: str
"""
pass
def get_workers(self):
workers = []
for worker in self.api_client.workers.get_all():
try:
wid = WorkerId(worker.id)
if wid.prefix == self.workers_prefix:
workers.append(worker)
except ValueError:
self.logger.info('ignoring unknown worker: %r', worker.id)
return workers
def stale_workers(self, spun_workers):
now = time()
for worker_id, (resource, spin_time) in list(spun_workers.items()):
if now - spin_time > self.max_spin_up_time_min * 60:
self.logger.info('Stuck spun instance %s of type %s', worker_id, resource)
yield worker_id
def extra_allocations(self):
"""Hook for subclass to use"""
return []
def gen_worker_prefix(self, resource, resource_conf):
return '{workers_prefix}:{worker_type}:{instance_type}'.format(
workers_prefix=self.workers_prefix,
worker_type=resource,
instance_type=resource_conf["instance_type"],
)
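# For example (hypothetical names), with workers_prefix='dynamic_worker' the call
# gen_worker_prefix('m1', {'instance_type': 'g4dn.4xlarge'}) returns 'dynamic_worker:m1:g4dn.4xlarge';
# the driver later appends ':<cloud instance id>', producing the full worker ID that WorkerId parses.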
def supervisor(self):
"""
@ -143,86 +209,41 @@ class AutoScaler(object):
- spin down instances according to their idle time. instance which is idle for more than self.max_idle_time_min
minutes would be removed.
"""
# Worker's id in clearml would be composed from prefix, name, instance_type and cloud_id separated by ':'
workers_pattern = re.compile(
r"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)"
)
# Set up the environment variables for clearml
os.environ["CLEARML_API_HOST"] = self.api_server
os.environ["CLEARML_WEB_HOST"] = self.web_server
os.environ["CLEARML_FILES_HOST"] = self.files_server
os.environ["CLEARML_API_ACCESS_KEY"] = self.access_key
os.environ["CLEARML_API_SECRET_KEY"] = self.secret_key
api_client = APIClient()
# Verify the requested queues exist and create those that don't exist
all_queues = [q.name for q in list(api_client.queues.get_all(only_fields=['name']))]
missing_queues = [q for q in self.queues if q not in all_queues]
for q in missing_queues:
api_client.queues.create(q)
self.ensure_queues()
idle_workers = {}
# a dict of resource_names and lists of time_stamps of instances that were just spun
# example
# spun_workers['resource_type'] = [time()]
spun_workers = {}
# Workers that we spun but have not yet reported back to the API
spun_workers = {} # worker_id -> (resource type, spin time)
previous_workers = set()
while True:
queue_name_to_id = {
queue.name: queue.id for queue in api_client.queues.get_all(only_fields=['id', 'name'])
}
resource_to_queue = {
item[0]: queue
for queue, resources in self.queues.items()
for item in resources
}
all_workers = [
worker
for worker in api_client.workers.get_all()
if workers_pattern.match(worker.id)
and workers_pattern.match(worker.id)["prefix"] == self.workers_prefix
]
unknown_workers = deque(maxlen=256)
task_logger = get_task_logger()
up_machines = defaultdict(int)
while self._running():
queue_id_to_name, queue_name_to_id = self.queue_mapping()
all_workers = self.get_workers()
# update spun_workers (remove instances that are fully registered)
for worker in all_workers:
if worker.id not in previous_workers:
# look for the spun instance and remove it
resource_name = workers_pattern.match(worker.id)[
"name"
]
spun_workers[resource_name] = spun_workers.get(resource_name, [])[1:]
# remove old spun workers based on time out:
for resource in spun_workers.keys():
time_stamp_list = [
t for t in spun_workers[resource] if time() - t < self.max_spin_up_time_min*60.]
deleted = len(spun_workers[resource]) - len(time_stamp_list)
if deleted:
print('Ignoring {} stuck instances of type {}'.format(deleted, resource))
if not spun_workers.pop(worker.id, None):
if worker.id not in unknown_workers:
self.logger.info('Removed unknown worker from spun_workers: %s', worker.id)
unknown_workers.append(worker.id)
else:
previous_workers.add(worker.id)
# Workers without a task are added to the idle list
if not all_workers:
idle_workers = {}
else:
for worker in all_workers:
if not hasattr(worker, "task") or not worker.task:
if worker.id not in idle_workers:
resource_name = workers_pattern.match(worker.id)[
"instance_type"
]
idle_workers[worker.id] = (time(), resource_name, worker)
elif (
hasattr(worker, "task")
and worker.task
and worker.id in idle_workers
):
idle_workers.pop(worker.id, None)
for worker_id in self.stale_workers(spun_workers):
del spun_workers[worker_id]
self.update_idle_workers(all_workers, idle_workers)
required_idle_resources = [] # idle resources we'll need to keep running
allocate_new_resources = [] # resources that will need to be started
allocate_new_resources = self.extra_allocations()
# Check if we have tasks waiting on one of the designated queues
for queue in self.queues:
entries = api_client.queues.get_by_id(queue_name_to_id[queue]).entries
entries = self.api_client.queues.get_by_id(queue_name_to_id[queue]).entries
self.logger.info("Found %d tasks in queue %r", len(entries), queue)
if entries and len(entries) > 0:
queue_resources = self.queues[queue]
@ -235,9 +256,9 @@ class AutoScaler(object):
]
# if we have an instance waiting to be spun
# remove it from the required allocation resources
for resource, time_stamps_list in spun_workers.items():
if time_stamps_list and any(q_r for q_r in queue_resources if resource in q_r[0]):
free_queue_resources += [resource] * len(time_stamps_list)
for resource, _ in spun_workers.values():
if resource in [qr[0] for qr in queue_resources]:
free_queue_resources.append(resource)
required_idle_resources.extend(free_queue_resources)
spin_up_count = len(entries) - len(free_queue_resources)
@ -249,25 +270,36 @@ class AutoScaler(object):
break
# check if we can add instances to `resource`
currently_running_workers = len(
[worker for worker in all_workers if workers_pattern.match(worker.id)["name"] == resource])
spun_up_workers = len(spun_workers.get(resource, []))
[worker for worker in all_workers if WorkerId(worker.id).name == resource])
spun_up_workers = sum(1 for r, _ in spun_workers.values() if r == resource)
max_allowed = int(max_instances) - currently_running_workers - spun_up_workers
if max_allowed > 0:
spin_up_resources.extend(
[resource] * spin_up_count
[resource] * min(spin_up_count, max_allowed)
)
allocate_new_resources.extend(spin_up_resources)
# Now we actually spin the new machines
for resource in allocate_new_resources:
task_id = None
try:
print('Spinning new instance type={}'.format(resource))
self.spin_up_worker(
resource, self.workers_prefix, resource_to_queue[resource]
)
spun_workers[resource] = spun_workers.get(resource, []) + [time()]
if isinstance(resource, tuple):
worker_id, task_id = resource
resource = WorkerId(worker_id).name
queue = self.resource_to_queue[resource]
self.logger.info(
'Spinning new instance resource=%r, prefix=%r, queue=%r, task_id=%r',
resource, self.workers_prefix, queue, task_id)
resource_conf = self.resource_configurations[resource]
worker_prefix = self.gen_worker_prefix(resource, resource_conf)
instance_id = self.driver.spin_up_worker(resource_conf, worker_prefix, queue, task_id=task_id)
worker_id = '{}:{}'.format(worker_prefix, instance_id)
self.logger.info('New instance ID: %s', instance_id)
spun_workers[worker_id] = (resource, time())
up_machines[resource] += 1
except Exception as ex:
print(f"Error: Failed to start new instance, {ex}")
self.logger.exception("Failed to start new instance (resource %r), Error: %s", resource, ex)
# Go over the idle workers list, and spin down idle workers
for worker_id in list(idle_workers):
@ -275,12 +307,69 @@ class AutoScaler(object):
# skip resource types that might be needed
if resources in required_idle_resources:
continue
# Remove from both aws and clearml all instances that are idle for longer than MAX_IDLE_TIME_MIN
# Remove from both cloud and clearml all instances that are idle for longer than MAX_IDLE_TIME_MIN
if time() - timestamp > self.max_idle_time_min * 60.0:
cloud_id = workers_pattern.match(worker_id)["cloud_id"]
self.spin_down_worker(cloud_id)
print(f"Spin down instance cloud id {cloud_id}")
wid = WorkerId(worker_id)
cloud_id = wid.cloud_id
self.driver.spin_down_worker(cloud_id)
up_machines[wid.name] -= 1
self.logger.info("Spin down instance cloud id %r", cloud_id)
idle_workers.pop(worker_id, None)
if task_logger:
self.report_app_stats(task_logger, queue_id_to_name, up_machines, idle_workers)
# Nothing else to do
self.logger.info("Idle for %.2f seconds", self.polling_interval_time_min * 60.0)
sleep(self.polling_interval_time_min * 60.0)
def update_idle_workers(self, all_workers, idle_workers):
if not all_workers:
idle_workers.clear()
return
for worker in all_workers:
task = getattr(worker, 'task', None)
if not task:
if worker.id not in idle_workers:
resource_name = WorkerId(worker.id).instance_type
idle_workers[worker.id] = (time(), resource_name, worker)
elif worker.id in idle_workers:
idle_workers.pop(worker.id, None)
def _running(self):
return not self._stop_event.is_set()
def report_app_stats(self, logger, queue_id_to_name, up_machines, idle_workers):
self.logger.info('resources: %r', self.resource_to_queue)
self.logger.info('idle worker: %r', idle_workers)
self.logger.info('up machines: %r', up_machines)
# Use a property for state so that state changes are logged
@property
def state(self):
return self._state
@state.setter
def state(self, value):
prev = getattr(self, '_state', None)
if prev:
self.logger.info('state change: %s -> %s', prev, value)
else:
self.logger.info('initial state: %s', value)
self._state = value
def get_task_logger():
task = Task.current_task()
return task and task.get_logger()
def has_duplicate_resource(queues: dict):
"""queues: dict[name] -> [(resource, count), (resource, count) ...]"""
seen = set()
for name, _ in chain.from_iterable(queues.values()):
if name in seen:
return True
seen.add(name)
return False
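A quick illustration of the refactored sanity check (hypothetical queue definitions, not taken from the commit): has_duplicate_resource flags any resource name that appears under more than one queue, which is exactly the condition sanity_check rejects.

queues = {
    'cpu_queue': [('aws_t3', 2)],
    'gpu_queue': [('aws_g4', 1), ('aws_t3', 1)],  # 'aws_t3' is reused across queues
}
assert has_duplicate_resource(queues) is True
assert has_duplicate_resource({'gpu_queue': [('aws_g4', 1)]}) is False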

clearml/automation/aws_auto_scaler.py

@ -1,195 +0,0 @@
import base64
from typing import Union
import attr
from .auto_scaler import AutoScaler
from .. import Task
from ..utilities.pyhocon import ConfigTree, ConfigFactory
try:
# noinspection PyPackageRequirements
import boto3
Task.add_requirements("boto3")
except ImportError:
raise ValueError(
"AwsAutoScaler requires 'boto3' package, it was not found\n"
"install with: pip install boto3"
)
class AwsAutoScaler(AutoScaler):
@attr.s
class Settings(AutoScaler.Settings):
workers_prefix = attr.ib(default="dynamic_aws")
cloud_provider = attr.ib(default="AWS")
startup_bash_script = [
"#!/bin/bash",
"sudo apt-get update",
"sudo apt-get install -y python3-dev",
"sudo apt-get install -y python3-pip",
"sudo apt-get install -y gcc",
"sudo apt-get install -y git",
"sudo apt-get install -y build-essential",
"python3 -m pip install -U pip",
"python3 -m pip install virtualenv",
"python3 -m virtualenv clearml_agent_venv",
"source clearml_agent_venv/bin/activate",
"python -m pip install clearml-agent",
"echo 'agent.git_user=\"{git_user}\"' >> /root/clearml.conf",
"echo 'agent.git_pass=\"{git_pass}\"' >> /root/clearml.conf",
"echo \"{clearml_conf}\" >> /root/clearml.conf",
"export CLEARML_API_HOST={api_server}",
"export CLEARML_WEB_HOST={web_server}",
"export CLEARML_FILES_HOST={files_server}",
"export DYNAMIC_INSTANCE_ID=`curl http://169.254.169.254/latest/meta-data/instance-id`",
"export CLEARML_WORKER_ID={worker_id}:$DYNAMIC_INSTANCE_ID",
"export CLEARML_API_ACCESS_KEY='{access_key}'",
"export CLEARML_API_SECRET_KEY='{secret_key}'",
"source ~/.bashrc",
"{bash_script}",
"python -m clearml_agent --config-file '/root/clearml.conf' daemon --queue '{queue}' {docker}",
"shutdown",
]
def __init__(self, settings, configuration):
# type: (Union[dict, AwsAutoScaler.Settings], Union[dict, AwsAutoScaler.Configuration]) -> None
super(AwsAutoScaler, self).__init__(settings, configuration)
def spin_up_worker(self, resource, worker_id_prefix, queue_name):
"""
Creates a new worker for clearml.
First, create an instance in the cloud and install some required packages.
Then, define clearml-agent environment variables and run clearml-agent for the specified queue.
NOTE: - Will wait until instance is running
- This implementation assumes the instance image already has docker installed
:param str resource: resource name, as defined in BUDGET and QUEUES.
:param str worker_id_prefix: worker name prefix
:param str queue_name: clearml queue to listen to
"""
resource_conf = self.resource_configurations[resource]
# Add worker type and AWS instance type to the worker name.
worker_id = "{worker_id_prefix}:{worker_type}:{instance_type}".format(
worker_id_prefix=worker_id_prefix,
worker_type=resource,
instance_type=resource_conf["instance_type"],
)
# The user_data script runs automatically when the instance starts. It installs the required packages
# for clearml-agent, configures it using environment variables, and runs clearml-agent on the required queue
user_data = ('\n'.join(self.startup_bash_script) + '\n').format(
api_server=self.api_server,
web_server=self.web_server,
files_server=self.files_server,
worker_id=worker_id,
access_key=self.access_key,
secret_key=self.secret_key,
queue=queue_name,
git_user=self.git_user or "",
git_pass=self.git_pass or "",
clearml_conf='\\"'.join(self.extra_clearml_conf.split('"')),
bash_script=self.extra_vm_bash_script,
docker="--docker '{}'".format(self.default_docker_image) if self.default_docker_image else "",
)
ec2 = boto3.client(
"ec2",
aws_access_key_id=self.cloud_credentials_key or None,
aws_secret_access_key=self.cloud_credentials_secret or None,
region_name=self.cloud_credentials_region,
)
launch_specification = ConfigFactory.from_dict(
{
"ImageId": resource_conf["ami_id"],
"InstanceType": resource_conf["instance_type"],
"BlockDeviceMappings": [
{
"DeviceName": resource_conf["ebs_device_name"],
"Ebs": {
"VolumeSize": resource_conf["ebs_volume_size"],
"VolumeType": resource_conf["ebs_volume_type"],
},
}
],
"Placement": {"AvailabilityZone": resource_conf["availability_zone"]},
}
)
if resource_conf.get("key_name", None):
launch_specification["KeyName"] = resource_conf["key_name"]
if resource_conf.get("security_group_ids", None):
launch_specification["SecurityGroupIds"] = resource_conf[
"security_group_ids"
]
if resource_conf["is_spot"]:
# Create a request for a spot instance in AWS
encoded_user_data = base64.b64encode(user_data.encode("ascii")).decode(
"ascii"
)
launch_specification["UserData"] = encoded_user_data
ConfigTree.merge_configs(
launch_specification, resource_conf.get("extra_configurations", {})
)
instances = ec2.request_spot_instances(
LaunchSpecification=launch_specification
)
# Wait until spot request is fulfilled
request_id = instances["SpotInstanceRequests"][0]["SpotInstanceRequestId"]
waiter = ec2.get_waiter("spot_instance_request_fulfilled")
waiter.wait(SpotInstanceRequestIds=[request_id])
# Get the instance object for later use
response = ec2.describe_spot_instance_requests(
SpotInstanceRequestIds=[request_id]
)
instance_id = response["SpotInstanceRequests"][0]["InstanceId"]
else:
# Create a new EC2 instance
launch_specification.update(
MinCount=1,
MaxCount=1,
UserData=user_data,
InstanceInitiatedShutdownBehavior="terminate",
)
ConfigTree.merge_configs(
launch_specification, resource_conf.get("extra_configurations", {})
)
instances = ec2.run_instances(**launch_specification)
# Get the instance object for later use
instance_id = instances["Instances"][0]["InstanceId"]
instance = boto3.resource(
"ec2",
aws_access_key_id=self.cloud_credentials_key or None,
aws_secret_access_key=self.cloud_credentials_secret or None,
region_name=self.cloud_credentials_region,
).Instance(instance_id)
# Wait until instance is in running state
instance.wait_until_running()
# Cloud-specific implementation (currently, only AWS EC2 is supported)
def spin_down_worker(self, instance_id):
"""
Destroys the cloud instance.
:param instance_id: Cloud instance ID to be destroyed (currently, only AWS EC2 is supported)
:type instance_id: str
"""
try:
boto3.resource(
"ec2",
aws_access_key_id=self.cloud_credentials_key or None,
aws_secret_access_key=self.cloud_credentials_secret or None,
region_name=self.cloud_credentials_region,
).instances.filter(InstanceIds=[instance_id]).terminate()
except Exception as ex:
raise ex

clearml/automation/aws_driver.py

@ -0,0 +1,147 @@
import base64
import attr
from attr.validators import instance_of
from .. import Task
from ..utilities.pyhocon import ConfigFactory, ConfigTree
from .auto_scaler import CloudDriver
from .cloud_driver import parse_tags
try:
# noinspection PyPackageRequirements
import boto3
Task.add_requirements("boto3")
except ImportError as err:
raise ImportError(
"AwsAutoScaler requires 'boto3' package, it was not found\n"
"install with: pip install boto3"
) from err
@attr.s
class AWSDriver(CloudDriver):
"""AWS Driver"""
aws_access_key_id = attr.ib(validator=instance_of(str), default='')
aws_secret_access_key = attr.ib(validator=instance_of(str), default='')
aws_region = attr.ib(validator=instance_of(str), default='')
use_credentials_chain = attr.ib(validator=instance_of(bool), default=False)
@classmethod
def from_config(cls, config):
obj = super().from_config(config)
obj.aws_access_key_id = config['hyper_params'].get('cloud_credentials_key')
obj.aws_secret_access_key = config['hyper_params'].get('cloud_credentials_secret')
obj.aws_region = config['hyper_params'].get('cloud_credentials_region')
obj.use_credentials_chain = config['hyper_params'].get('use_credentials_chain', False)
return obj
def __attrs_post_init__(self):
super().__attrs_post_init__()
self.tags = parse_tags(self.tags)
def spin_up_worker(self, resource_conf, worker_prefix, queue_name, task_id):
# The user_data script runs automatically when the instance starts. It installs the required packages
# for clearml-agent, configures it using environment variables, and runs clearml-agent on the required queue
user_data = self.gen_user_data(worker_prefix, queue_name, task_id)
ec2 = boto3.client("ec2", **self.creds())
launch_specification = ConfigFactory.from_dict(
{
"ImageId": resource_conf["ami_id"],
"InstanceType": resource_conf["instance_type"],
"BlockDeviceMappings": [
{
"DeviceName": resource_conf["ebs_device_name"],
"Ebs": {
"VolumeSize": resource_conf["ebs_volume_size"],
"VolumeType": resource_conf["ebs_volume_type"],
},
}
],
"Placement": {"AvailabilityZone": resource_conf["availability_zone"]},
}
)
if resource_conf.get("key_name", None):
launch_specification["KeyName"] = resource_conf["key_name"]
if resource_conf.get("security_group_ids", None):
launch_specification["SecurityGroupIds"] = resource_conf[
"security_group_ids"
]
if resource_conf["is_spot"]:
# Create a request for a spot instance in AWS
encoded_user_data = base64.b64encode(user_data.encode("ascii")).decode(
"ascii"
)
launch_specification["UserData"] = encoded_user_data
ConfigTree.merge_configs(
launch_specification, resource_conf.get("extra_configurations", {})
)
instances = ec2.request_spot_instances(
LaunchSpecification=launch_specification
)
# Wait until spot request is fulfilled
request_id = instances["SpotInstanceRequests"][0]["SpotInstanceRequestId"]
waiter = ec2.get_waiter("spot_instance_request_fulfilled")
waiter.wait(SpotInstanceRequestIds=[request_id])
# Get the instance object for later use
response = ec2.describe_spot_instance_requests(
SpotInstanceRequestIds=[request_id]
)
instance_id = response["SpotInstanceRequests"][0]["InstanceId"]
else:
# Create a new EC2 instance
launch_specification.update(
MinCount=1,
MaxCount=1,
UserData=user_data,
InstanceInitiatedShutdownBehavior="terminate",
)
ConfigTree.merge_configs(
launch_specification, resource_conf.get("extra_configurations", {})
)
instances = ec2.run_instances(**launch_specification)
# Get the instance object for later use
instance_id = instances["Instances"][0]["InstanceId"]
instance = boto3.resource("ec2", **self.creds()).Instance(instance_id)
if self.tags:
instance.create_tags(
Resources=[instance_id],
Tags=[{"Key": key, "Value": val} for key, val in self.tags],
)
# Wait until instance is in running state
instance.wait_until_running()
return instance_id
def spin_down_worker(self, instance_id):
instance = boto3.resource("ec2", **self.creds()).Instance(instance_id)
instance.terminate()
def creds(self):
creds = {
'region_name': self.aws_region or None,
}
if not self.use_credentials_chain:
creds.update({
'aws_secret_access_key': self.aws_secret_access_key or None,
'aws_access_key_id': self.aws_access_key_id or None,
})
return creds
def instance_id_command(self):
return 'curl http://169.254.169.254/latest/meta-data/instance-id'
def instance_type_key(self):
return 'instance_type'
def kind(self):
return 'AWS'
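A minimal sketch of the new credentials handling (hypothetical keys, a dummy session object so no ClearML server is contacted; assumes clearml and boto3 are installed): creds() returns the explicit keys by default, and only the region when use_credentials_chain is enabled, leaving resolution to boto3's default chain.

driver = AWSDriver(
    git_user='', git_pass='', extra_clearml_conf='',
    api_server='https://api.clear.ml', web_server='https://app.clear.ml',
    files_server='https://files.clear.ml',
    access_key='ACCESS', secret_key='SECRET', auth_token='',
    extra_vm_bash_script='', docker_image='nvidia/cuda',
    tags='team=research, env=dev', session=object(),  # dummy session, skips Session() creation
    aws_access_key_id='AKIAEXAMPLE', aws_secret_access_key='example-secret',
    aws_region='us-east-1',
)
print(driver.tags)     # [('team', 'research'), ('env', 'dev')]
print(driver.creds())  # {'region_name': 'us-east-1', 'aws_secret_access_key': 'example-secret', 'aws_access_key_id': 'AKIAEXAMPLE'}

driver.use_credentials_chain = True
print(driver.creds())  # now only {'region_name': 'us-east-1'}; boto3 resolves credentials itself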

clearml/automation/cloud_driver.py

@ -0,0 +1,205 @@
import logging
from abc import ABC, abstractmethod
from os import environ
import attr
from ..backend_api import Session
from ..backend_api.session.defs import ENV_AUTH_TOKEN
env_git_user = 'CLEARML_AUTOSCALER_GIT_USER'
env_git_pass = 'CLEARML_AUTOSCALER_GIT_PASSWORD'
bash_script_template = '''\
#!/bin/bash
set -x
set -e
apt-get update
apt-get install -y \
build-essential \
gcc \
git \
python3-dev \
python3-pip
python3 -m pip install -U pip
python3 -m pip install virtualenv
python3 -m virtualenv clearml_agent_venv
source clearml_agent_venv/bin/activate
python -m pip install clearml-agent
cat << EOF >> ~/clearml.conf
{clearml_conf}
EOF
export CLEARML_API_HOST={api_server}
export CLEARML_WEB_HOST={web_server}
export CLEARML_FILES_HOST={files_server}
export DYNAMIC_INSTANCE_ID=$({instance_id_command})
export CLEARML_WORKER_ID={worker_prefix}:$DYNAMIC_INSTANCE_ID
export CLEARML_API_ACCESS_KEY='{access_key}'
export CLEARML_API_SECRET_KEY='{secret_key}'
export CLEARML_AUTH_TOKEN='{auth_token}'
source ~/.bashrc
{bash_script}
{driver_extra}
python -m clearml_agent --config-file ~/clearml.conf daemon --queue '{queue}' {docker}
shutdown
'''
clearml_conf_template = '''\
agent.git_user="{git_user}"
agent.git_pass="{git_pass}"
{extra_clearml_conf}
{auth_token}
'''
@attr.s
class CloudDriver(ABC):
# git
git_user = attr.ib()
git_pass = attr.ib()
# clearml
extra_clearml_conf = attr.ib()
api_server = attr.ib()
web_server = attr.ib()
files_server = attr.ib()
access_key = attr.ib()
secret_key = attr.ib()
auth_token = attr.ib()
# Other
extra_vm_bash_script = attr.ib()
docker_image = attr.ib()
tags = attr.ib(default='')
session = attr.ib(default=None)
def __attrs_post_init__(self):
if self.session is None:
self.session = Session()
@abstractmethod
def spin_up_worker(self, resource, worker_prefix, queue_name, task_id):
"""Creates a new worker for clearml.
First, create an instance in the cloud and install some required packages.
Then, define clearml-agent environment variables and run clearml-agent for the specified queue.
NOTE: - Will wait until instance is running
- This implementation assumes the instance image already has docker installed
:param dict resource: resource configuration, as defined in BUDGET and QUEUES.
:param str worker_prefix: worker name without instance_id
:param str queue_name: clearml queue to listen to
:param str task_id: Task ID to restart
"""
@abstractmethod
def spin_down_worker(self, instance_id):
"""Destroys the cloud instance.
:param str instance_id: Cloud instance ID to be destroyed (currently, only AWS EC2 is supported)
"""
@abstractmethod
def kind(self):
"""Return driver kind (e.g. 'AWS')"""
@abstractmethod
def instance_id_command(self):
"""Return a shell command to get instance ID"""
@abstractmethod
def instance_type_key(self):
"""Return key in configuration for instance type"""
def gen_user_data(self, worker_prefix, queue_name, task_id):
return bash_script_template.format(
queue=queue_name,
worker_prefix=worker_prefix,
auth_token=self.auth_token or '',
access_key=self.access_key,
api_server=self.api_server,
clearml_conf=self.clearml_conf(),
files_server=self.files_server,
secret_key=self.secret_key,
web_server=self.web_server,
bash_script=self.extra_vm_bash_script,
driver_extra=self.driver_bash_extra(task_id),
docker="--docker '{}'".format(self.docker_image) if self.docker_image else "",
instance_id_command=self.instance_id_command(),
)
def clearml_conf(self):
auth_token = ''
token = self.session.auth_token or self.auth_token
if token:
auth_token = 'agent.extra_docker_arguments = ["-e", "CLEARML_AUTH_TOKEN={}"]'.format(token)
# TODO: This needs to be documented somewhere
git_user = environ.get(env_git_user) or self.git_user or ''
git_pass = environ.get(env_git_pass) or self.git_pass or ''
return clearml_conf_template.format(
git_user=git_user,
git_pass=git_pass,
extra_clearml_conf=self.extra_clearml_conf,
auth_token=auth_token,
)
def driver_bash_extra(self, task_id):
if not task_id:
return ''
return 'python -m clearml_agent --config-file ~/clearml.conf execute --id {}'.format(task_id)
@classmethod
def from_config(cls, config):
session = Session()
hyper_params, configurations = config['hyper_params'], config['configurations']
opts = {
'git_user': hyper_params['git_user'],
'git_pass': hyper_params['git_pass'],
'extra_clearml_conf': configurations['extra_clearml_conf'],
'api_server': session.get_api_server_host(),
'web_server': session.get_app_server_host(),
'files_server': session.get_files_server_host(),
'access_key': session.access_key,
'secret_key': session.secret_key,
'auth_token': ENV_AUTH_TOKEN.get(),
'extra_vm_bash_script': configurations['extra_vm_bash_script'],
'docker_image': hyper_params['default_docker_image'],
'tags': hyper_params.get('tags', ''),
'session': session,
}
return cls(**opts)
def set_scaler(self, scaler):
self.scaler = scaler
@property
def logger(self):
if getattr(self, 'scaler', None):  # set_scaler() may not have been called yet
return self.scaler.logger
return logging.getLogger('AWSDriver')
def parse_tags(s):
"""
>>> parse_tags('k1=v1, k2=v2')
[('k1', 'v1'), ('k2', 'v2')]
"""
s = s.strip()
if not s:
return []
tags = []
for kv in s.split(','):
if '=' not in kv:
raise ValueError(kv)
key, value = [v.strip() for v in kv.split('=', 1)]
if not key or not value:
raise ValueError(kv)
tags.append((key, value))
return tags
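To show what a driver hands to a new instance, here is a sketch that renders the startup script through gen_user_data() using a stub driver and a dummy session (all values are hypothetical; nothing is contacted):

from types import SimpleNamespace

class DemoDriver(CloudDriver):
    """Stub driver used only to render the startup template."""
    def spin_up_worker(self, resource, worker_prefix, queue_name, task_id):
        pass
    def spin_down_worker(self, instance_id):
        pass
    def kind(self):
        return 'demo'
    def instance_id_command(self):
        return 'hostname'  # stand-in for a real cloud metadata query
    def instance_type_key(self):
        return 'instance_type'

demo = DemoDriver(
    git_user='', git_pass='', extra_clearml_conf='',
    api_server='https://api.clear.ml', web_server='https://app.clear.ml',
    files_server='https://files.clear.ml',
    access_key='ACCESS', secret_key='SECRET', auth_token='',
    extra_vm_bash_script='echo ready', docker_image='nvidia/cuda',
    session=SimpleNamespace(auth_token=None),  # avoid creating a real Session()
)
print(demo.gen_user_data('dynamic_worker:m1:g4dn.4xlarge', 'gpu_queue', task_id=None))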

dev-requirements.txt Normal file

@ -0,0 +1,6 @@
-r requirements.txt
flake8-bugbear~=21.4
flake8~=3.9
pytest~=6.2
mock~=3.0 ; python_version < '3'

examples/services/aws-autoscaler/aws_autoscaler.py

@ -2,27 +2,46 @@ import json
from argparse import ArgumentParser
from collections import defaultdict
from itertools import chain
from pathlib import Path
from typing import Tuple
import yaml
from pathlib2 import Path
from six.moves import input
from clearml import Task
from clearml.automation.aws_auto_scaler import AwsAutoScaler
from clearml.automation.auto_scaler import AutoScaler, ScalerConfig
from clearml.automation.aws_driver import AWSDriver
from clearml.config import running_remotely
from clearml.utilities.wizard.user_input import (
get_input,
input_int,
input_bool,
multiline_input,
input_list,
get_input, input_bool, input_int, input_list, multiline_input
)
CONF_FILE = "aws_autoscaler.yaml"
DEFAULT_DOCKER_IMAGE = "nvidia/cuda:10.1-runtime-ubuntu18.04"
default_config = {
'hyper_params': {
'git_user': '',
'git_pass': '',
'cloud_credentials_key': '',
'cloud_credentials_secret': '',
'cloud_credentials_region': None,
'default_docker_image': 'nvidia/cuda',
'max_idle_time_min': 15,
'polling_interval_time_min': 5,
'max_spin_up_time_min': 30,
'workers_prefix': 'dynamic_worker',
'cloud_provider': '',
},
'configurations': {
'resource_configurations': None,
'queues': None,
'extra_trains_conf': '',
'extra_clearml_conf': '',
'extra_vm_bash_script': '',
},
}
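# Note: this default layout mirrors the two sections ('hyper_params' / 'configurations') that
# AWSDriver.from_config() and ScalerConfig.from_config() consume later in main().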
def main():
parser = ArgumentParser()
parser.add_argument(
@ -37,11 +56,16 @@ def main():
action="store_true",
default=False,
)
parser.add_argument(
"--config-file",
help="Configuration file name",
type=Path,
default=Path("aws_autoscaler.yaml"),
)
args = parser.parse_args()
if running_remotely():
hyper_params = AwsAutoScaler.Settings().as_dict()
configurations = AwsAutoScaler.Configuration().as_dict()
conf = default_config
else:
print("AWS Autoscaler setup wizard\n"
"---------------------------\n"
@ -49,30 +73,26 @@ def main():
"Once completed, you will be able to view and change the configuration in the clearml-server web UI.\n"
"It means there is no need to worry about typos or mistakes :)\n")
config_file = Path(CONF_FILE).absolute()
if config_file.exists() and input_bool(
"Load configurations from config file '{}' [Y/n]? ".format(str(CONF_FILE)),
if args.config_file.exists() and input_bool(
"Load configurations from config file '{}' [Y/n]? ".format(args.config_file),
default=True,
):
with config_file.open("r") as f:
with args.config_file.open("r") as f:
conf = yaml.load(f, Loader=yaml.SafeLoader)
hyper_params = conf["hyper_params"]
configurations = conf["configurations"]
else:
configurations, hyper_params = run_wizard()
conf = {
"hyper_params": hyper_params,
"configurations": configurations,
}
# noinspection PyBroadException
try:
with config_file.open("w+") as f:
conf = {
"hyper_params": hyper_params,
"configurations": configurations,
}
with args.config_file.open("w+") as f:
yaml.safe_dump(conf, f)
except Exception:
print(
"Error! Could not write configuration file at: {}".format(
str(CONF_FILE)
args.config_file
)
)
return
@ -80,7 +100,8 @@ def main():
# Connecting ClearML with the current process,
# from here on everything is logged automatically
task = Task.init(project_name="DevOps", task_name="AWS Auto-Scaler", task_type=Task.TaskTypes.service)
task.connect(hyper_params)
task.connect(conf['hyper_params'])
configurations = conf['configurations']
configurations.update(json.loads(task.get_configuration_object(name="General") or "{}"))
task.set_configuration_object(name="General", config_text=json.dumps(configurations, indent=2))
@ -92,7 +113,9 @@ def main():
# the clearml-agent services will pick it up and execute it for us.
task.execute_remotely(queue_name='services')
autoscaler = AwsAutoScaler(hyper_params, configurations)
driver = AWSDriver.from_config(conf)
conf = ScalerConfig.from_config(conf)
autoscaler = AutoScaler(conf, driver)
if running_remotely() or args.run:
autoscaler.start()
@ -100,14 +123,14 @@ def main():
def run_wizard():
# type: () -> Tuple[dict, dict]
hyper_params = AwsAutoScaler.Settings()
configurations = AwsAutoScaler.Configuration()
hyper_params = default_config['hyper_params']
configurations = default_config['configurations']
hyper_params.cloud_credentials_key = get_input("AWS Access Key ID", required=True)
hyper_params.cloud_credentials_secret = get_input(
hyper_params['cloud_credentials_key'] = get_input("AWS Access Key ID", required=True)
hyper_params['cloud_credentials_secret'] = get_input(
"AWS Secret Access Key", required=True
)
hyper_params.cloud_credentials_region = get_input(
hyper_params['cloud_credentials_region'] = get_input(
"AWS region name",
"[us-east-1]",
default='us-east-1')
@ -127,13 +150,13 @@ def run_wizard():
)
)
else:
git_user = None
git_pass = None
git_user = ''
git_pass = ''
hyper_params.git_user = git_user
hyper_params.git_pass = git_pass
hyper_params['git_user'] = git_user
hyper_params['git_pass'] = git_pass
hyper_params.default_docker_image = get_input(
hyper_params['default_docker_image'] = get_input(
"default docker image/parameters",
"to use [{}]".format(DEFAULT_DOCKER_IMAGE),
default=DEFAULT_DOCKER_IMAGE,
@ -204,21 +227,21 @@ def run_wizard():
if not input_bool("\nDefine another instance type? [y/N]"):
break
configurations.resource_configurations = resource_configurations
configurations['resource_configurations'] = resource_configurations
configurations.extra_vm_bash_script, num_lines_bash_script = multiline_input(
configurations['extra_vm_bash_script'], num_lines_bash_script = multiline_input(
"\nEnter any pre-execution bash script to be executed on the newly created instances []"
)
print("Entered {} lines of pre-execution bash script".format(num_lines_bash_script))
configurations.extra_clearml_conf, num_lines_clearml_conf = multiline_input(
configurations['extra_clearml_conf'], num_lines_clearml_conf = multiline_input(
"\nEnter anything you'd like to include in your clearml.conf file []"
)
print("Entered {} extra lines for clearml.conf file".format(num_lines_clearml_conf))
print("\nDefine the machines budget:")
print("-----------------------------")
resource_configurations_names = list(configurations.resource_configurations.keys())
resource_configurations_names = list(configurations['resource_configurations'].keys())
queues = defaultdict(list)
while True:
while True:
@ -238,9 +261,9 @@ def run_wizard():
question="Select",
required=True,
)
if queue_type not in configurations.resource_configurations:
if queue_type not in configurations['resource_configurations']:
print("\tError: instance type '{}' not in predefined instances {}!".format(
queue_type, list(configurations.resource_configurations.keys())))
queue_type, resource_configurations_names))
continue
if queue_type in (q[0] for q in queues[queue_name]):
@ -252,8 +275,8 @@ def run_wizard():
queue_type_new = '{}_{}'.format(queue_type, queue_name)
print("\tInstance type '{}' already used, renaming instance to {}".format(
queue_type, queue_type_new))
configurations.resource_configurations[queue_type_new] = \
dict(**configurations.resource_configurations[queue_type])
configurations['resource_configurations'][queue_type_new] = \
dict(**configurations['resource_configurations'][queue_type])
queue_type = queue_type_new
# make sure the renamed name is not reused
@ -269,7 +292,7 @@ def run_wizard():
)
queues[queue_name].append((queue_type, max_instances))
valid_instances = [k for k in configurations.resource_configurations.keys()
valid_instances = [k for k in configurations['resource_configurations'].keys()
if k not in (q[0] for q in queues[queue_name])]
if not valid_instances:
break
@ -278,19 +301,19 @@ def run_wizard():
break
if not input_bool("\nAdd another queue? [y/N]"):
break
configurations.queues = dict(queues)
configurations['queues'] = dict(queues)
hyper_params.max_idle_time_min = input_int(
hyper_params['max_idle_time_min'] = input_int(
"maximum idle time",
"for the auto-scaler to spin down an instance (in minutes) [15]",
default=15,
new_line=True,
)
hyper_params.polling_interval_time_min = input_int(
hyper_params['polling_interval_time_min'] = input_int(
"instances polling interval", "for the auto-scaler (in minutes) [5]", default=5,
)
return configurations.as_dict(), hyper_params.as_dict()
return configurations, hyper_params
if __name__ == "__main__":
main()