Add AWS EC2 Auto-Scaler service example

allegroai 2020-07-11 01:32:51 +03:00
parent 25fd425bf7
commit 8d7740ea68
4 changed files with 693 additions and 0 deletions
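For context, the example is interactive: running the first file below locally starts a short wizard, stores the answers in aws_autoscaler.yaml, and registers a Task so the autoscaler can also be launched remotely. A rough usage sketch (the local file name aws_autoscaler.py is an assumption, not part of the commit text shown here):

# python aws_autoscaler.py         -> run the wizard and write aws_autoscaler.yaml
# python aws_autoscaler.py --run   -> run the wizard, then start the autoscaler locally
# Enqueuing the created "AWS Auto-Scaler" task (project "Auto-Scaler") runs it on a trains-agent machine,
# where running_remotely() is True and the connected parameters are taken from the Task.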

View File

@@ -0,0 +1,260 @@
import distutils.util
from argparse import ArgumentParser
from collections import defaultdict
from pathlib import Path
from typing import Optional, Tuple

import yaml
from six.moves import input

from trains import Task
from trains.automation.aws_auto_scaler import AwsAutoScaler
from trains.config import running_remotely

CONF_FILE = "aws_autoscaler.yaml"
DEFAULT_DOCKER_IMAGE = "nvidia/cuda"


def main():
    parser = ArgumentParser()
    parser.add_argument(
        "--run",
        help="Run the autoscaler after the wizard has finished",
        action="store_true",
        default=False,
    )
    args = parser.parse_args()

    if running_remotely():
        hyper_params = AwsAutoScaler.Settings().as_dict()
        configurations = AwsAutoScaler.Configuration().as_dict()
    else:
        print("AWS Autoscaler setup\n")

        config_file = Path(CONF_FILE).absolute()
        if config_file.exists() and input_bool(
            "Load configurations from config file '{}' [Y/n]? ".format(str(CONF_FILE)),
            default=True,
        ):
            with config_file.open("r") as f:
                conf = yaml.load(f, Loader=yaml.SafeLoader)
            hyper_params = conf["hyper_params"]
            configurations = conf["configurations"]
        else:
            configurations, hyper_params = run_wizard()
            try:
                with config_file.open("w+") as f:
                    conf = {
                        "hyper_params": hyper_params,
                        "configurations": configurations,
                    }
                    yaml.safe_dump(conf, f)
            except Exception:
                print(
                    "Error! Could not write configuration file at: {}".format(
                        str(CONF_FILE)
                    )
                )
                return

    task = Task.init(project_name="Auto-Scaler", task_name="AWS Auto-Scaler")
    task.connect(hyper_params)
    task.connect_configuration(configurations)

    autoscaler = AwsAutoScaler(hyper_params, configurations)
    if running_remotely() or args.run:
        autoscaler.start()


def run_wizard():
    # type: () -> Tuple[dict, dict]

    hyper_params = AwsAutoScaler.Settings()
    configurations = AwsAutoScaler.Configuration()

    hyper_params.cloud_credentials_key = get_input("AWS Access Key ID", required=True)
    hyper_params.cloud_credentials_secret = get_input(
        "AWS Secret Access Key", required=True
    )
    hyper_params.cloud_credentials_region = get_input("AWS region name", required=True)

    # get GIT user/password for cloning
    print(
        "\nGIT credentials:"
        "\nEnter GIT username for repository cloning (leave blank for SSH key authentication): [] ",
        end="",
    )
    git_user = input()
    if git_user.strip():
        print("Enter password for user '{}': ".format(git_user), end="")
        git_pass = input()
        print(
            "Git repository cloning will use user={} password={}".format(
                git_user, git_pass
            )
        )
    else:
        git_user = None
        git_pass = None
    hyper_params.git_user = git_user
    hyper_params.git_pass = git_pass

    hyper_params.default_docker_image = get_input(
        "default docker image/parameters",
        "to use [default is {}]".format(DEFAULT_DOCKER_IMAGE),
        default=DEFAULT_DOCKER_IMAGE,
        new_line=True,
    )

    print("\nDefine the type of machines you want the autoscaler to use")
    resource_configurations = {}
    while True:
        resource_name = get_input(
            "machine type name",
            "(remember it, we will later use it in the budget section)",
            required=True,
            new_line=True,
        )
        resource_configurations[resource_name] = {
            "instance_type": get_input(
                "instance type",
                "for resource '{}' [default is 'g4dn.4xlarge']".format(resource_name),
                default="g4dn.4xlarge",
            ),
            "is_spot": input_bool(
                "is '{}' resource using spot instances? [t/F]".format(resource_name)
            ),
            "availability_zone": get_input(
                "availability zone",
                "for resource '{}' [default is 'us-east-1b']".format(resource_name),
                default="us-east-1b",
            ),
            "ami_id": get_input(
                "ami_id",
                "for resource '{}' [default is 'ami-07c95cafbb788face']".format(
                    resource_name
                ),
                default="ami-07c95cafbb788face",
            ),
            "ebs_device_name": get_input(
                "ebs_device_name",
                "for resource '{}' [default is '/dev/xvda']".format(resource_name),
                default="/dev/xvda",
            ),
            "ebs_volume_size": input_int(
                "ebs_volume_size",
                "for resource '{}' [default is '100']".format(resource_name),
                default=100,
            ),
            "ebs_volume_type": get_input(
                "ebs_volume_type",
                "for resource '{}' [default is 'gp2']".format(resource_name),
                default="gp2",
            ),
        }
        if not input_bool("\nDefine another resource? [y/N]"):
            break
    configurations.resource_configurations = resource_configurations

    configurations.extra_vm_bash_script = input(
        "\nEnter any pre-execution bash script to be executed on the newly created instances: "
    )

    print("\nSet up the budget\n")
    queues = defaultdict(list)
    while True:
        queue_name = get_input("queue name", required=True)
        while True:
            queue_type = get_input(
                "queue type",
                "(use the resource names defined earlier)",
                required=True,
            )
            max_instances = input_int(
                "maximum number of instances allowed", required=True
            )
            queues[queue_name].append((queue_type, max_instances))
            if not input_bool("\nAdd another type to queue? [y/N]: "):
                break
        if not input_bool("Define another queue? [y/N]: "):
            break
    configurations.queues = dict(queues)

    hyper_params.max_idle_time_min = input_int(
        "maximum idle time",
        "for the autoscaler (in minutes, default is 15)",
        default=15,
        new_line=True,
    )
    hyper_params.polling_interval_time_min = input_int(
        "polling interval", "for the autoscaler (in minutes, default is 5)", default=5,
    )

    return configurations.as_dict(), hyper_params.as_dict()


def get_input(
    key,  # type: str
    description="",  # type: str
    question="Enter",  # type: str
    required=False,  # type: bool
    default=None,  # type: Optional[str]
    new_line=False,  # type: bool
):
    # type: (...) -> Optional[str]
    if new_line:
        print()
    while True:
        value = input("{} {} {}: ".format(question, key, description))
        if not value.strip() and required:
            print("{} is required".format(key))
        elif not (value.strip() or required):
            return default
        else:
            return value


def input_int(
    key,  # type: str
    description="",  # type: str
    required=False,  # type: bool
    default=None,  # type: Optional[int]
    new_line=False,  # type: bool
):
    # type: (...) -> Optional[int]
    while True:
        try:
            value = int(
                get_input(
                    key,
                    description,
                    required=required,
                    default=default,
                    new_line=new_line,
                )
            )
            return value
        except ValueError:
            print(
                "Invalid input: {} should be a number. Please enter an integer".format(
                    key
                )
            )


def input_bool(question, default=False):
    # type: (str, bool) -> bool
    while True:
        try:
            response = input("{}: ".format(question)).lower()
            if not response:
                return default
            return bool(distutils.util.strtobool(response))
        except ValueError:
            print("Invalid input: please enter yes or no")


if __name__ == "__main__":
    main()

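For reference, a minimal sketch of the aws_autoscaler.yaml layout the wizard writes above, expressed as the Python dictionary handed to yaml.safe_dump(). The keys mirror AwsAutoScaler.Settings and AwsAutoScaler.Configuration; every concrete value below (resource name, queue name, AMI ID, sizes) is a placeholder, not a recommendation:

import yaml

conf = {
    "hyper_params": {
        "cloud_credentials_key": "<AWS access key id>",
        "cloud_credentials_secret": "<AWS secret access key>",
        "cloud_credentials_region": "us-east-1",
        "git_user": None,
        "git_pass": None,
        "default_docker_image": "nvidia/cuda",
        "max_idle_time_min": 15,
        "polling_interval_time_min": 5,
        "workers_prefix": "dynamic_aws",
        "cloud_provider": "AWS",
    },
    "configurations": {
        "resource_configurations": {
            "gpu_machine": {  # resource name, referenced by the queues section below
                "instance_type": "g4dn.4xlarge",
                "is_spot": False,
                "availability_zone": "us-east-1b",
                "ami_id": "<AMI id with docker pre-installed>",
                "ebs_device_name": "/dev/xvda",
                "ebs_volume_size": 100,
                "ebs_volume_type": "gp2",
            },
        },
        "queues": {
            # queue name -> list of [resource name, max instances] pairs
            "gpu_queue": [["gpu_machine", 2]],
        },
        "extra_trains_conf": "",
        "extra_vm_bash_script": "",
    },
}

with open("aws_autoscaler.yaml", "w") as f:
    yaml.safe_dump(conf, f)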
View File

@@ -0,0 +1,4 @@
boto3
PyYAML
six
trains

View File

@@ -0,0 +1,244 @@
import os
import re
from itertools import chain
from operator import itemgetter
from time import sleep, time
from typing import Union

import attr
from attr.validators import instance_of

from ..backend_api import Session
from ..backend_api.session.client import APIClient


class AutoScaler(object):
    @attr.s
    class Settings(object):
        git_user = attr.ib(default=None)
        git_pass = attr.ib(default=None)
        cloud_credentials_key = attr.ib(default=None)
        cloud_credentials_secret = attr.ib(default=None)
        cloud_credentials_region = attr.ib(default=None)
        default_docker_image = attr.ib(default="nvidia/cuda")
        max_idle_time_min = attr.ib(validator=instance_of(int), default=15)
        polling_interval_time_min = attr.ib(validator=instance_of(int), default=5)
        workers_prefix = attr.ib(default="dynamic_worker")
        cloud_provider = attr.ib(default="")

        def as_dict(self):
            return attr.asdict(self)

    @attr.s
    class Configuration(object):
        resource_configurations = attr.ib(default=None)
        queues = attr.ib(default=None)
        extra_trains_conf = attr.ib(default="")
        extra_vm_bash_script = attr.ib(default="")

        def as_dict(self):
            return attr.asdict(self)

    def __init__(self, settings, configuration):
        # type: (Union[dict, AutoScaler.Settings], Union[dict, AutoScaler.Configuration]) -> None
        if isinstance(settings, dict):
            settings = self.Settings(**settings)
        if isinstance(configuration, dict):
            configuration = self.Configuration(**configuration)

        self.web_server = Session.get_app_server_host()
        self.api_server = Session.get_api_server_host()
        self.files_server = Session.get_files_server_host()

        session = Session()
        self.access_key = session.access_key
        self.secret_key = session.secret_key

        self.git_user = settings.git_user
        self.git_pass = settings.git_pass
        self.cloud_credentials_key = settings.cloud_credentials_key
        self.cloud_credentials_secret = settings.cloud_credentials_secret
        self.cloud_credentials_region = settings.cloud_credentials_region
        self.default_docker_image = settings.default_docker_image

        self.extra_trains_conf = configuration.extra_trains_conf
        self.extra_vm_bash_script = configuration.extra_vm_bash_script
        self.resource_configurations = configuration.resource_configurations
        self.queues = configuration.queues

        if not self.sanity_check():
            return

        self.max_idle_time_min = int(settings.max_idle_time_min)
        self.polling_interval_time_min = int(settings.polling_interval_time_min)

        self.workers_prefix = settings.workers_prefix
        self.cloud_provider = settings.cloud_provider

    def sanity_check(self):
        # Sanity check - validate queue resources
        if len(set(map(itemgetter(0), chain(*self.queues.values())))) != sum(
            map(len, self.queues.values())
        ):
            print(
                "Error: at least one resource name is used in multiple queues. "
                "A resource name can only appear in a single queue definition."
            )
            return False
        return True

    def start(self):
        # Loop forever, it is okay we are stateless
        while True:
            try:
                self.supervisor()
            except Exception as ex:
                print(
                    "Warning! exception occurred: {ex}\nRetrying in 15 seconds".format(
                        ex=ex
                    )
                )
                sleep(15)

    def spin_up_worker(self, resource, worker_id_prefix, queue_name):
        """
        Creates a new worker for trains (cloud-specific implementation).
        First, create an instance in the cloud and install some required packages.
        Then, define trains-agent environment variables and run trains-agent for the specified queue.
        NOTE: - Will wait until the instance is running
              - This implementation assumes the instance image already has docker installed

        :param str resource: resource name, as defined in self.resource_configurations and self.queues.
        :param str worker_id_prefix: worker name prefix
        :param str queue_name: trains queue to listen to
        """
        pass

    def spin_down_worker(self, instance_id):
        """
        Destroys the cloud instance (cloud-specific implementation).

        :param instance_id: Cloud instance ID to be destroyed
        :type instance_id: str
        """
        pass

    def supervisor(self):
        """
        Spin up or down resources as necessary.
        - For every queue in self.queues do the following:
            1. Check if there are tasks waiting in the queue.
            2. Check if there are enough idle workers available for those tasks.
            3. If more instances are required, and the maximum number of instances allowed has not been
               reached, create the required instances. Resources are chosen according to their order in
               self.queues, and no more instances are created once the maximum defined there is reached.
        - Spin down instances according to their idle time: an instance that has been idle for more than
          self.max_idle_time_min minutes is removed.
        """
        # A worker's id in trains is composed of prefix, name, instance_type and cloud_id, separated by ':'
        workers_pattern = re.compile(
            r"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)"
        )

        # Set up the environment variables for trains
        os.environ["TRAINS_API_HOST"] = self.api_server
        os.environ["TRAINS_WEB_HOST"] = self.web_server
        os.environ["TRAINS_FILES_HOST"] = self.files_server
        os.environ["TRAINS_API_ACCESS_KEY"] = self.access_key
        os.environ["TRAINS_API_SECRET_KEY"] = self.secret_key
        api_client = APIClient()

        # Verify the requested queues exist and create those that don't exist
        all_queues = [q.name for q in list(api_client.queues.get_all())]
        missing_queues = [q for q in self.queues if q not in all_queues]
        for q in missing_queues:
            api_client.queues.create(q)

        idle_workers = {}
        while True:
            queue_name_to_id = {
                queue.name: queue.id for queue in api_client.queues.get_all()
            }
            resource_to_queue = {
                item[0]: queue
                for queue, resources in self.queues.items()
                for item in resources
            }
            all_workers = [
                worker
                for worker in api_client.workers.get_all()
                if workers_pattern.match(worker.id)
                and workers_pattern.match(worker.id)["prefix"] == self.workers_prefix
            ]

            # Workers without a task are added to the idle list
            for worker in all_workers:
                if not hasattr(worker, "task") or not worker.task:
                    if worker.id not in idle_workers:
                        resource_name = workers_pattern.match(worker.id)["name"]
                        idle_workers[worker.id] = (time(), resource_name, worker)
                elif (
                    hasattr(worker, "task")
                    and worker.task
                    and worker.id in idle_workers
                ):
                    idle_workers.pop(worker.id, None)

            required_idle_resources = []  # idle resources we'll need to keep running
            allocate_new_resources = []  # resources that will need to be started
            # Check if we have tasks waiting on one of the designated queues
            for queue in self.queues:
                entries = api_client.queues.get_by_id(queue_name_to_id[queue]).entries
                if entries and len(entries) > 0:
                    queue_resources = self.queues[queue]

                    # If we have an idle worker matching the required resource,
                    # remove it from the required allocation resources
                    resource_names = [name for name, _ in queue_resources]
                    free_queue_resources = [
                        resource
                        for _, resource, _ in idle_workers.values()
                        if resource in resource_names
                    ]
                    required_idle_resources.extend(free_queue_resources)
                    spin_up_count = len(entries) - len(free_queue_resources)
                    spin_up_resources = []

                    # Add as many resources as possible to handle this queue's entries
                    for resource, max_instances in queue_resources:
                        if len(spin_up_resources) >= spin_up_count:
                            break
                        max_allowed = int(max_instances) - len(
                            [
                                worker
                                for worker in all_workers
                                if workers_pattern.match(worker.id)["name"] == resource
                            ]
                        )
                        spin_up_resources.extend(
                            [resource] * min(max_allowed, spin_up_count)
                        )
                    allocate_new_resources.extend(spin_up_resources)

            # Now we actually spin the new machines
            for resource in allocate_new_resources:
                self.spin_up_worker(
                    resource, self.workers_prefix, resource_to_queue[resource]
                )

            # Go over the idle workers list, and spin down idle workers
            for timestamp, resources, worker in idle_workers.values():
                # skip resource types that might be needed
                if resources in required_idle_resources:
                    continue
                # Remove from both aws and trains all instances that have been idle for longer than max_idle_time_min
                if time() - timestamp > self.max_idle_time_min * 60.0:
                    cloud_id = workers_pattern.match(worker.id)["cloud_id"]
                    self.spin_down_worker(cloud_id)
                    worker.unregister()

            # Nothing else to do
            sleep(self.polling_interval_time_min * 60.0)

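The base class is deliberately cloud-agnostic: supervisor() drives everything and relies only on the spin_up_worker()/spin_down_worker() overrides plus the worker-id naming convention. A minimal sketch of what a subclass for another backend might look like, mirroring the AWS implementation in the next file (the class name and the "MyCloud" backend are hypothetical):

import attr

from trains.automation.auto_scaler import AutoScaler


class MyCloudAutoScaler(AutoScaler):
    @attr.s
    class Settings(AutoScaler.Settings):
        # make workers started by this scaler identifiable in the workers list
        workers_prefix = attr.ib(default="dynamic_mycloud")
        cloud_provider = attr.ib(default="MyCloud")

    def spin_up_worker(self, resource, worker_id_prefix, queue_name):
        # hypothetical: create an instance from self.resource_configurations[resource],
        # boot it with a startup script that exports
        # TRAINS_WORKER_ID="<worker_id_prefix>:<resource>:<instance type>:<cloud instance id>"
        # and runs trains-agent on queue_name, then wait until the instance is running
        raise NotImplementedError

    def spin_down_worker(self, instance_id):
        # hypothetical: terminate the instance identified by instance_id
        raise NotImplementedError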
View File

@@ -0,0 +1,185 @@
import base64
from typing import Union

import attr

from .auto_scaler import AutoScaler
from .. import Task

try:
    # noinspection PyPackageRequirements
    import boto3

    Task.add_requirements('boto3')
except ImportError:
    raise ValueError(
        "AwsAutoScaler requires 'boto3' package, it was not found\n"
        "install with: pip install boto3"
    )


class AwsAutoScaler(AutoScaler):
    @attr.s
    class Settings(AutoScaler.Settings):
        workers_prefix = attr.ib(default="dynamic_aws")
        cloud_provider = attr.ib(default="AWS")

    def __init__(self, settings, configuration):
        # type: (Union[dict, AwsAutoScaler.Settings], Union[dict, AwsAutoScaler.Configuration]) -> None
        super(AwsAutoScaler, self).__init__(settings, configuration)

    def spin_up_worker(self, resource, worker_id_prefix, queue_name):
        """
        Creates a new worker for trains.
        First, create an instance in the cloud and install some required packages.
        Then, define trains-agent environment variables and run trains-agent for the specified queue.
        NOTE: - Will wait until the instance is running
              - This implementation assumes the instance image already has docker installed

        :param str resource: resource name, as defined in self.resource_configurations and self.queues.
        :param str worker_id_prefix: worker name prefix
        :param str queue_name: trains queue to listen to
        """
        resource_conf = self.resource_configurations[resource]
        # Add worker type and AWS instance type to the worker name.
        worker_id = "{worker_id_prefix}:{worker_type}:{instance_type}".format(
            worker_id_prefix=worker_id_prefix,
            worker_type=resource,
            instance_type=resource_conf["instance_type"],
        )

        # The user_data script runs automatically when the instance starts. It installs the packages required
        # by trains-agent, configures the agent using environment variables, and runs it on the requested queue.
        user_data = """#!/bin/bash
sudo apt-get update
sudo apt-get install -y python3-dev
sudo apt-get install -y python3-pip
sudo apt-get install -y gcc
sudo apt-get install -y git
sudo apt-get install -y build-essential
python3 -m pip install -U pip
python3 -m pip install virtualenv
python3 -m virtualenv trains_agent_venv
source trains_agent_venv/bin/activate
python -m pip install trains-agent
echo 'agent.git_user=\"{git_user}\"' >> /root/trains.conf
echo 'agent.git_pass=\"{git_pass}\"' >> /root/trains.conf
echo "{trains_conf}" >> /root/trains.conf
export TRAINS_API_HOST={api_server}
export TRAINS_WEB_HOST={web_server}
export TRAINS_FILES_HOST={files_server}
export DYNAMIC_INSTANCE_ID=`curl http://169.254.169.254/latest/meta-data/instance-id`
export TRAINS_WORKER_ID={worker_id}:$DYNAMIC_INSTANCE_ID
export TRAINS_API_ACCESS_KEY='{access_key}'
export TRAINS_API_SECRET_KEY='{secret_key}'
{bash_script}
source ~/.bashrc
python -m trains_agent --config-file '/root/trains.conf' daemon --queue '{queue}' {docker}
shutdown
""".format(
            api_server=self.api_server,
            web_server=self.web_server,
            files_server=self.files_server,
            worker_id=worker_id,
            access_key=self.access_key,
            secret_key=self.secret_key,
            queue=queue_name,
            git_user=self.git_user,
            git_pass=self.git_pass,
            trains_conf=self.extra_trains_conf,
            bash_script=self.extra_vm_bash_script,
            docker="--docker '{}'".format(self.default_docker_image)
            if self.default_docker_image
            else "",
        )

        ec2 = boto3.client(
            "ec2",
            aws_access_key_id=self.cloud_credentials_key or None,
            aws_secret_access_key=self.cloud_credentials_secret or None,
            region_name=self.cloud_credentials_region,
        )

        if resource_conf["is_spot"]:
            # Create a request for a spot instance in AWS
            encoded_user_data = base64.b64encode(user_data.encode("ascii")).decode(
                "ascii"
            )
            instances = ec2.request_spot_instances(
                LaunchSpecification={
                    "ImageId": resource_conf["ami_id"],
                    "InstanceType": resource_conf["instance_type"],
                    "Placement": {
                        "AvailabilityZone": resource_conf["availability_zone"]
                    },
                    "UserData": encoded_user_data,
                    "BlockDeviceMappings": [
                        {
                            "DeviceName": resource_conf["ebs_device_name"],
                            "Ebs": {
                                "VolumeSize": resource_conf["ebs_volume_size"],
                                "VolumeType": resource_conf["ebs_volume_type"],
                            },
                        }
                    ],
                }
            )

            # Wait until the spot request is fulfilled
            request_id = instances["SpotInstanceRequests"][0]["SpotInstanceRequestId"]
            waiter = ec2.get_waiter("spot_instance_request_fulfilled")
            waiter.wait(SpotInstanceRequestIds=[request_id])
            # Get the instance object for later use
            response = ec2.describe_spot_instance_requests(
                SpotInstanceRequestIds=[request_id]
            )
            instance_id = response["SpotInstanceRequests"][0]["InstanceId"]
        else:
            # Create a new EC2 instance
            instances = ec2.run_instances(
                ImageId=resource_conf["ami_id"],
                MinCount=1,
                MaxCount=1,
                InstanceType=resource_conf["instance_type"],
                UserData=user_data,
                InstanceInitiatedShutdownBehavior="terminate",
                BlockDeviceMappings=[
                    {
                        "DeviceName": resource_conf["ebs_device_name"],
                        "Ebs": {
                            "VolumeSize": resource_conf["ebs_volume_size"],
                            "VolumeType": resource_conf["ebs_volume_type"],
                        },
                    }
                ],
            )

            # Get the instance object for later use
            instance_id = instances["Instances"][0]["InstanceId"]

        # Wait until the instance is in the running state
        instance = boto3.resource(
            "ec2",
            aws_access_key_id=self.cloud_credentials_key or None,
            aws_secret_access_key=self.cloud_credentials_secret or None,
            region_name=self.cloud_credentials_region,
        ).Instance(instance_id)
        instance.wait_until_running()

    # Cloud-specific implementation (currently, only AWS EC2 is supported)
    def spin_down_worker(self, instance_id):
        """
        Destroys the cloud instance.

        :param instance_id: Cloud instance ID to be destroyed (currently, only AWS EC2 is supported)
        :type instance_id: str
        """
        try:
            boto3.resource(
                "ec2",
                aws_access_key_id=self.cloud_credentials_key or None,
                aws_secret_access_key=self.cloud_credentials_secret or None,
                region_name=self.cloud_credentials_region,
            ).instances.filter(InstanceIds=[instance_id]).terminate()
        except Exception as ex:
            raise ex
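The contract that ties supervisor() to the AWS implementation above is the worker-id naming convention: prefix, resource name, EC2 instance type and cloud instance id, separated by ':' (the last field is appended at boot time by the user_data script via TRAINS_WORKER_ID). A small parsing sketch; the worker id below is made up for illustration:

import re

workers_pattern = re.compile(
    r"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)"
)

worker_id = "dynamic_aws:gpu_machine:g4dn.4xlarge:i-0123456789abcdef0"  # hypothetical
match = workers_pattern.match(worker_id)
print(match["prefix"])         # "dynamic_aws"          -> compared with workers_prefix
print(match["name"])           # "gpu_machine"          -> resource name from the budget
print(match["instance_type"])  # "g4dn.4xlarge"         -> EC2 instance type
print(match["cloud_id"])       # "i-0123456789abcdef0"  -> passed to spin_down_worker()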