Autoscaler improvements and optimizations

Add customizable boot bash script
Fix: custom bash script should be called last, right before starting the agent
Fix auto scaler spinning up too many instances at once and then killing the idle ones (spin-up time is longer than poll time)
This commit is contained in:
allegroai 2021-06-12 23:10:05 +03:00
parent 332ceab3ea
commit d769582332
2 changed files with 107 additions and 65 deletions

View File

@ -23,6 +23,7 @@ class AutoScaler(object):
default_docker_image = attr.ib(default="nvidia/cuda")
max_idle_time_min = attr.ib(validator=instance_of(int), default=15)
polling_interval_time_min = attr.ib(validator=instance_of(int), default=5)
max_spin_up_time_min = attr.ib(validator=instance_of(int), default=30)
workers_prefix = attr.ib(default="dynamic_worker")
cloud_provider = attr.ib(default="")
@ -70,9 +71,12 @@ class AutoScaler(object):
if not self.sanity_check():
return
self.max_idle_time_min = int(settings.max_idle_time_min)
self.polling_interval_time_min = int(settings.polling_interval_time_min)
self.max_idle_time_min = float(settings.max_idle_time_min)
self.polling_interval_time_min = float(settings.polling_interval_time_min)
self.max_spin_up_time_min = float(settings.max_spin_up_time_min)
# make sure we have our own unique prefix, in case we have multiple dynamic auto-scalers
# otherwise they will mix each other's instances
self.workers_prefix = settings.workers_prefix
self.cloud_provider = settings.cloud_provider
@ -112,6 +116,8 @@ class AutoScaler(object):
:param str resource: resource name, as defined in self.resource_configurations and self.queues.
:param str worker_id_prefix: worker name prefix
:param str queue_name: clearml queue to listen to
:return str: worker_id prefix to identify when spin was successful
"""
pass
@ -138,7 +144,7 @@ class AutoScaler(object):
minutes would be removed.
"""
# Worker's id in clearml would be composed from prefix, name, instance_type and cloud_id separated by ';'
# Worker's id in clearml would be composed from prefix, name, instance_type and cloud_id separated by ':'
workers_pattern = re.compile(
r"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)"
)
@ -152,15 +158,20 @@ class AutoScaler(object):
api_client = APIClient()
# Verify the requested queues exist and create those that don't exist
all_queues = [q.name for q in list(api_client.queues.get_all())]
all_queues = [q.name for q in list(api_client.queues.get_all(only_fields=['name']))]
missing_queues = [q for q in self.queues if q not in all_queues]
for q in missing_queues:
api_client.queues.create(q)
idle_workers = {}
# a dict of resource_names and lists of time_stamps of instances that were just spun
# example
# spun_workers['resource_type'] = [time()]
spun_workers = {}
previous_workers = set()
while True:
queue_name_to_id = {
queue.name: queue.id for queue in api_client.queues.get_all()
queue.name: queue.id for queue in api_client.queues.get_all(only_fields=['id', 'name'])
}
resource_to_queue = {
item[0]: queue
@ -173,21 +184,39 @@ class AutoScaler(object):
if workers_pattern.match(worker.id)
and workers_pattern.match(worker.id)["prefix"] == self.workers_prefix
]
# update spun_workers (remove instances that are fully registered)
for worker in all_workers:
if worker.id not in previous_workers:
# look for the spun instance and remove it
resource_name = workers_pattern.match(worker.id)[
"name"
]
spun_workers[resource_name] = spun_workers.get(resource_name, [])[1:]
# remove old spun workers based on time out:
for resource in spun_workers.keys():
time_stamp_list = [
t for t in spun_workers[resource] if time() - t < self.max_spin_up_time_min*60.]
deleted = len(spun_workers[resource]) - len(time_stamp_list)
if deleted:
print('Ignoring {} stuck instances of type {}'.format(deleted, resource))
# Workers without a task, are added to the idle list
for worker in all_workers:
if not hasattr(worker, "task") or not worker.task:
if worker.id not in idle_workers:
resource_name = workers_pattern.match(worker.id)[
"instance_type"
]
idle_workers[worker.id] = (time(), resource_name, worker)
elif (
hasattr(worker, "task")
and worker.task
and worker.id in idle_workers
):
idle_workers.pop(worker.id, None)
if not all_workers:
idle_workers = {}
else:
for worker in all_workers:
if not hasattr(worker, "task") or not worker.task:
if worker.id not in idle_workers:
resource_name = workers_pattern.match(worker.id)[
"instance_type"
]
idle_workers[worker.id] = (time(), resource_name, worker)
elif (
hasattr(worker, "task")
and worker.task
and worker.id in idle_workers
):
idle_workers.pop(worker.id, None)
required_idle_resources = [] # idle resources we'll need to keep running
allocate_new_resources = [] # resources that will need to be started
@ -202,8 +231,14 @@ class AutoScaler(object):
free_queue_resources = [
resource
for _, resource, _ in idle_workers.values()
if resource in queue_resources
if any(q_r for q_r in queue_resources if resource in q_r[0])
]
# if we have an instance waiting to be spun
# remove it from the required allocation resources
for resource, time_stamps_list in spun_workers.items():
if time_stamps_list and any(q_r for q_r in queue_resources if resource in q_r[0]):
free_queue_resources += [resource] * len(time_stamps_list)
required_idle_resources.extend(free_queue_resources)
spin_up_count = len(entries) - len(free_queue_resources)
spin_up_resources = []
@ -212,34 +247,40 @@ class AutoScaler(object):
for resource, max_instances in queue_resources:
if len(spin_up_resources) >= spin_up_count:
break
max_allowed = int(max_instances) - len(
[
worker
for worker in all_workers
if workers_pattern.match(worker.id)["name"] == resource
]
)
spin_up_resources.extend(
[resource] * min(max_allowed, spin_up_count)
)
# check if we can add instances to `resource`
currently_running_workers = len(
[worker for worker in all_workers if workers_pattern.match(worker.id)["name"] == resource])
spun_up_workers = len(spun_workers.get(resource, []))
max_allowed = int(max_instances) - currently_running_workers - spun_up_workers
if max_allowed > 0:
spin_up_resources.extend(
[resource] * spin_up_count
)
allocate_new_resources.extend(spin_up_resources)
# Now we actually spin the new machines
for resource in allocate_new_resources:
self.spin_up_worker(
resource, self.workers_prefix, resource_to_queue[resource]
)
try:
print('Spinning new instance type={}'.format(resource))
self.spin_up_worker(
resource, self.workers_prefix, resource_to_queue[resource]
)
spun_workers[resource] = spun_workers.get(resource, []) + [time()]
except Exception as ex:
print(f"Error: Failed to start new instance, {ex}")
# Go over the idle workers list, and spin down idle workers
for timestamp, resources, worker in idle_workers.values():
for worker_id in list(idle_workers):
timestamp, resources, worker = idle_workers[worker_id]
# skip resource types that might be needed
if resources in required_idle_resources:
continue
# Remove from both aws and clearml all instances that are idle for longer than MAX_IDLE_TIME_MIN
if time() - timestamp > self.max_idle_time_min * 60.0:
cloud_id = workers_pattern.match(worker.id)["cloud_id"]
cloud_id = workers_pattern.match(worker_id)["cloud_id"]
self.spin_down_worker(cloud_id)
worker.unregister()
print(f"Spin down instance cloud id {cloud_id}")
idle_workers.pop(worker_id, None)
# Nothing else to do
sleep(self.polling_interval_time_min * 60.0)

View File

@ -25,6 +25,35 @@ class AwsAutoScaler(AutoScaler):
workers_prefix = attr.ib(default="dynamic_aws")
cloud_provider = attr.ib(default="AWS")
startup_bash_script = [
"#!/bin/bash",
"sudo apt-get update",
"sudo apt-get install -y python3-dev",
"sudo apt-get install -y python3-pip",
"sudo apt-get install -y gcc",
"sudo apt-get install -y git",
"sudo apt-get install -y build-essential",
"python3 -m pip install -U pip",
"python3 -m pip install virtualenv",
"python3 -m virtualenv clearml_agent_venv",
"source clearml_agent_venv/bin/activate",
"python -m pip install clearml-agent",
"echo 'agent.git_user=\"{git_user}\"' >> /root/clearml.conf",
"echo 'agent.git_pass=\"{git_pass}\"' >> /root/clearml.conf",
"echo \"{clearml_conf}\" >> /root/clearml.conf",
"export CLEARML_API_HOST={api_server}",
"export CLEARML_WEB_HOST={web_server}",
"export CLEARML_FILES_HOST={files_server}",
"export DYNAMIC_INSTANCE_ID=`curl http://169.254.169.254/latest/meta-data/instance-id`",
"export CLEARML_WORKER_ID={worker_id}:$DYNAMIC_INSTANCE_ID",
"export CLEARML_API_ACCESS_KEY='{access_key}'",
"export CLEARML_API_SECRET_KEY='{secret_key}'",
"source ~/.bashrc",
"{bash_script}",
"python -m clearml_agent --config-file '/root/clearml.conf' daemon --queue '{queue}' {docker}",
"shutdown",
]
def __init__(self, settings, configuration):
# type: (Union[dict, AwsAutoScaler.Settings], Union[dict, AwsAutoScaler.Configuration]) -> None
super(AwsAutoScaler, self).__init__(settings, configuration)
@ -51,33 +80,7 @@ class AwsAutoScaler(AutoScaler):
# user_data script will automatically run when the instance is started. it will install the required packages
# for clearml-agent configure it using environment variables and run clearml-agent on the required queue
user_data = """#!/bin/bash
sudo apt-get update
sudo apt-get install -y python3-dev
sudo apt-get install -y python3-pip
sudo apt-get install -y gcc
sudo apt-get install -y git
sudo apt-get install -y build-essential
python3 -m pip install -U pip
python3 -m pip install virtualenv
python3 -m virtualenv clearml_agent_venv
source clearml_agent_venv/bin/activate
python -m pip install clearml-agent
echo 'agent.git_user=\"{git_user}\"' >> /root/clearml.conf
echo 'agent.git_pass=\"{git_pass}\"' >> /root/clearml.conf
echo "{clearml_conf}" >> /root/clearml.conf
export CLEARML_API_HOST={api_server}
export CLEARML_WEB_HOST={web_server}
export CLEARML_FILES_HOST={files_server}
export DYNAMIC_INSTANCE_ID=`curl http://169.254.169.254/latest/meta-data/instance-id`
export CLEARML_WORKER_ID={worker_id}:$DYNAMIC_INSTANCE_ID
export CLEARML_API_ACCESS_KEY='{access_key}'
export CLEARML_API_SECRET_KEY='{secret_key}'
{bash_script}
source ~/.bashrc
python -m clearml_agent --config-file '/root/clearml.conf' daemon --queue '{queue}' {docker}
shutdown
""".format(
user_data = ('\n'.join(self.startup_bash_script) + '\n').format(
api_server=self.api_server,
web_server=self.web_server,
files_server=self.files_server,
@ -89,9 +92,7 @@ class AwsAutoScaler(AutoScaler):
git_pass=self.git_pass or "",
clearml_conf='\\"'.join(self.extra_clearml_conf.split('"')),
bash_script=self.extra_vm_bash_script,
docker="--docker '{}'".format(self.default_docker_image)
if self.default_docker_image
else "",
docker="--docker '{}'".format(self.default_docker_image) if self.default_docker_image else "",
)
ec2 = boto3.client(