clearml-agent/examples/dynamic_cloud_cluster.ipynb
pollfly 2c7f091e57
Update example (#177)
* Edit README

* Edit README

* small edits

* update example

* update example

* update example
2023-12-09 12:52:44 +02:00

592 lines
24 KiB
Plaintext

{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Auto-Magically Spin AWS EC2 Instances On Demand \n",
"# and Create a Dynamic Cluster Running *ClearML-Agent*\n",
"\n",
"## Define your budget and execute the notebook, that's it\n",
"## You now have a fully managed cluster on AWS 🎉 🎊"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**clearml-agent**'s main goal is to quickly pull a job from an execution queue, set up the environment (as defined in the experiment, including git cloning, python packages etc.), then execute the experiment and monitor it.\n",
"\n",
"This notebook defines a cloud budget (currently only AWS is supported, but feel free to expand with PRs), and spins an instance the minute a job is waiting for execution. It will also spin down idle machines, saving you some $$$ :)\n",
"\n",
"> **Note:**\n",
"> This is just an example of how you can use ClearML Agent to implement custom autoscaling. For a more structured autoscaler script, see [here](https://github.com/allegroai/clearml/blob/master/clearml/automation/auto_scaler.py).\n",
"\n",
"Configuration steps:\n",
"- Define maximum budget to be used (instance type / number of instances).\n",
"- Create new execution *queues* in the **clearml-server**.\n",
"- Define mapping between the created *queues* and an instance budget.\n",
"\n",
"**TL;DR - This notebook:**\n",
"- Will spin instances if there are jobs in the execution *queues* until it will hit the budget limit.\n",
"- If machines are idle, it will spin them down.\n",
"\n",
"The controller implementation itself is stateless, meaning you can always re-execute the notebook, if for some reason it stopped.\n",
"\n",
"It is as simple as it sounds, but extremely powerful\n",
"\n",
"Enjoy your newly created dynamic cluster :)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Install & import required packages"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install clearml-agent\n",
"!pip install boto3"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define AWS instance types and configuration (Instance Type, EBS, AMI etc.)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# AWS EC2 machines types - default AMI - NVIDIA Deep Learning AMI 19.11.3\n",
"RESOURCE_CONFIGURATIONS = {\n",
" \"amazon_ec2_normal\": {\n",
" \"instance_type\": \"g4dn.4xlarge\",\n",
" \"is_spot\": False,\n",
" \"availability_zone\": \"us-east-1b\",\n",
" \"ami_id\": \"ami-07c95cafbb788face\",\n",
" \"ebs_device_name\": \"/dev/xvda\",\n",
" \"ebs_volume_size\": 100,\n",
" \"ebs_volume_type\": \"gp2\",\n",
" },\n",
" \"amazon_ec2_high\": {\n",
" \"instance_type\": \"g4dn.8xlarge\",\n",
" \"is_spot\": False,\n",
" \"availability_zone\": \"us-east-1b\",\n",
" \"ami_id\": \"ami-07c95cafbb788face\",\n",
" \"ebs_device_name\": \"/dev/xvda\",\n",
" \"ebs_volume_size\": 100,\n",
" \"ebs_volume_type\": \"gp2\",\n",
" },\n",
"}\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define machine budget per execution queue\n",
"\n",
"Now that we defined our budget, we need to connect it with the **ClearML** cluster.\n",
"\n",
"We map each queue to a resource type (instance type).\n",
"\n",
"Create two queues in the Web UI:\n",
"- Browse to http://your_clearml_server_ip:8080/workers-and-queues/queues\n",
"- Then click on the \"New Queue\" button and name your queues \"aws_normal\" and \"aws_high\" respectively\n",
"\n",
"The QUEUES dictionary holds the mapping between the queue name and the type/number of instances to spin connected to the specific queue.\n",
"```\n",
"QUEUES = {\n",
" 'queue_name': [(\"instance-type-as-defined-in-RESOURCE_CONFIGURATIONS\", max_number_of_instances), ]\n",
"}\n",
"```\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# ClearML Agent Queues - Machines budget per Queue\n",
"# Per queue: list of (machine type as defined in RESOURCE_CONFIGURATIONS,\n",
"# max instances for the specific queue). Order machines from most preferred to least.\n",
"QUEUES = {\n",
" \"aws_normal\": [(\"amazon_ec2_normal\", 2),],\n",
" \"aws_high\": [(\"amazon_ec2_high\", 1)],\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Credentials for your AWS account, as well as for your **ClearML Server**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# AWS credentials (leave empty to use credentials set using the aws cli)\n",
"CLOUD_CREDENTIALS_KEY = \"\"\n",
"CLOUD_CREDENTIALS_SECRET = \"\"\n",
"CLOUD_CREDENTIALS_REGION = \"us-east-1\"\n",
"\n",
"# CLEARML configuration\n",
"CLEARML_WEB_SERVER = \"http://localhost:8080\"\n",
"CLEARML_API_SERVER = \"http://localhost:8008\"\n",
"CLEARML_FILES_SERVER = \"http://localhost:8081\"\n",
"# CLEARML credentials\n",
"CLEARML_API_ACCESS_KEY = \"\"\n",
"CLEARML_API_SECRET_KEY = \"\"\n",
"# Git User/Pass to be used by clearml-agent,\n",
"# leave empty if image already contains git ssh-key\n",
"CLEARML_AGENT_GIT_USER = \"\"\n",
"CLEARML_AGENT_GIT_PASS = \"\"\n",
"\n",
"# Additional fields for clearml.conf file created on the remote instance\n",
"# for example: 'agent.default_docker.image: \"nvidia/cuda:11.0.3-cudnn8-runtime-ubuntu20.04\"'\n",
"\n",
"EXTRA_CLEARML_CONF = \"\"\"\n",
"\"\"\"\n",
"\n",
"# Bash script to run on instances before running clearml-agent\n",
"# Example: \"\"\"\n",
"# echo \"This is the first line\"\n",
"# echo \"This is the second line\"\n",
"# \"\"\"\n",
"EXTRA_BASH_SCRIPT = \"\"\"\n",
"\"\"\"\n",
"\n",
"# Default docker for clearml-agent when running in docker mode (requires docker v19.03 and above).\n",
"# Leave empty to run clearml-agent in non-docker mode.\n",
"CLEARML_AGENT_DOCKER_IMAGE = \"nvidia/cuda\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Controller Internal Definitions\n",
"\n",
"# maximum idle time in minutes, after which the instance will be shutdown\n",
"MAX_IDLE_TIME_MIN = 15\n",
"# polling interval in minutes\n",
"# make sure to increase in case bash commands were added in EXTRA_BASH_SCRIPT\n",
"POLLING_INTERVAL_MIN = 5.0"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Import Packages and Budget Definition Sanity Check"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import base64\n",
"import re\n",
"import os\n",
"from itertools import chain\n",
"from operator import itemgetter\n",
"from time import sleep, time\n",
"\n",
"import boto3\n",
"from clearml_agent.backend_api.session.client import APIClient"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sanity Check - Validate Queue Resources\n",
"if len(set(map(itemgetter(0), chain(*QUEUES.values())))) != sum(\n",
" map(len, QUEUES.values())\n",
"):\n",
" print(\n",
" \"Error: at least one resource name is used in multiple queues. \"\n",
" \"A resource name can only appear in a single queue definition.\"\n",
" )\n",
"\n",
"# Encode EXTRA_CLEARML_CONF for later bash script usage\n",
"EXTRA_CLEARML_CONF_ENCODED = \"\\\\\\\"\".join(EXTRA_CLEARML_CONF.split(\"\\\"\"))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Cloud specific implementation of spin up/down - currently supports AWS only"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Cloud-specific implementation (currently, only AWS EC2 is supported)\n",
"def spin_up_worker(resource, worker_id_prefix, queue_name):\n",
" \"\"\"\n",
" Creates a new worker for clearml.\n",
" First, create an instance in the cloud and install some required packages.\n",
" Then, define clearml-agent environment variables and run\n",
" clearml-agent for the specified queue.\n",
" NOTE: - Will wait until instance is running\n",
" - This implementation assumes the instance image already has docker installed\n",
"\n",
" :param str resource: resource name, as defined in BUDGET and QUEUES.\n",
" :param str worker_id_prefix: worker name prefix\n",
" :param str queue_name: clearml queue to listen to\n",
" \"\"\"\n",
" resource_conf = RESOURCE_CONFIGURATIONS[resource]\n",
" # Add worker type and AWS instance type to the worker name.\n",
" worker_id = \"{worker_id_prefix}:{worker_type}:{instance_type}\".format(\n",
" worker_id_prefix=worker_id_prefix,\n",
" worker_type=resource,\n",
" instance_type=resource_conf[\"instance_type\"],\n",
" )\n",
"\n",
" # user_data script will automatically run when the instance is started. \n",
" # It will install the required packages for clearml-agent configure it using\n",
" # environment variables and run clearml-agent on the required queue\n",
" user_data = \"\"\"#!/bin/bash\n",
" sudo apt-get update\n",
" sudo apt-get install -y python3-dev\n",
" sudo apt-get install -y python3-pip\n",
" sudo apt-get install -y gcc\n",
" sudo apt-get install -y git\n",
" sudo apt-get install -y build-essential\n",
" python3 -m pip install -U pip\n",
" python3 -m pip install virtualenv\n",
" python3 -m virtualenv clearml_agent_venv\n",
" source clearml_agent_venv/bin/activate\n",
" python -m pip install clearml-agent\n",
" echo 'agent.git_user=\\\"{git_user}\\\"' >> /root/clearml.conf\n",
" echo 'agent.git_pass=\\\"{git_pass}\\\"' >> /root/clearml.conf\n",
" echo \"{clearml_conf}\" >> /root/clearml.conf\n",
" export CLEARML_API_HOST={api_server}\n",
" export CLEARML_WEB_HOST={web_server}\n",
" export CLEARML_FILES_HOST={files_server}\n",
" export DYNAMIC_INSTANCE_ID=`curl http://169.254.169.254/latest/meta-data/instance-id`\n",
" export CLEARML_WORKER_ID={worker_id}:$DYNAMIC_INSTANCE_ID\n",
" export CLEARML_API_ACCESS_KEY='{access_key}'\n",
" export CLEARML_API_SECRET_KEY='{secret_key}'\n",
" {bash_script}\n",
" source ~/.bashrc\n",
" python -m clearml_agent --config-file '/root/clearml.conf' daemon --queue '{queue}' {docker}\n",
" shutdown\n",
" \"\"\".format(\n",
" api_server=CLEARML_API_SERVER,\n",
" web_server=CLEARML_WEB_SERVER,\n",
" files_server=CLEARML_FILES_SERVER,\n",
" worker_id=worker_id,\n",
" access_key=CLEARML_API_ACCESS_KEY,\n",
" secret_key=CLEARML_API_SECRET_KEY,\n",
" queue=queue_name,\n",
" git_user=CLEARML_AGENT_GIT_USER,\n",
" git_pass=CLEARML_AGENT_GIT_PASS,\n",
" clearml_conf=EXTRA_CLEARML_CONF_ENCODED,\n",
" bash_script=EXTRA_BASH_SCRIPT,\n",
" docker=\"--docker '{}'\".format(CLEARML_AGENT_DOCKER_IMAGE) if CLEARML_AGENT_DOCKER_IMAGE else \"\"\n",
" )\n",
"\n",
" ec2 = boto3.client(\n",
" \"ec2\",\n",
" aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,\n",
" aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,\n",
" region_name=CLOUD_CREDENTIALS_REGION\n",
" )\n",
"\n",
" if resource_conf[\"is_spot\"]:\n",
" # Create a request for a spot instance in AWS\n",
" encoded_user_data = base64.b64encode(user_data.encode(\"ascii\")).decode(\"ascii\")\n",
" instances = ec2.request_spot_instances(\n",
" LaunchSpecification={\n",
" \"ImageId\": resource_conf[\"ami_id\"],\n",
" \"InstanceType\": resource_conf[\"instance_type\"],\n",
" \"Placement\": {\"AvailabilityZone\": resource_conf[\"availability_zone\"]},\n",
" \"UserData\": encoded_user_data,\n",
" \"BlockDeviceMappings\": [\n",
" {\n",
" \"DeviceName\": resource_conf[\"ebs_device_name\"],\n",
" \"Ebs\": {\n",
" \"VolumeSize\": resource_conf[\"ebs_volume_size\"],\n",
" \"VolumeType\": resource_conf[\"ebs_volume_type\"],\n",
" },\n",
" }\n",
" ],\n",
" }\n",
" )\n",
"\n",
" # Wait until spot request is fulfilled\n",
" request_id = instances[\"SpotInstanceRequests\"][0][\"SpotInstanceRequestId\"]\n",
" waiter = ec2.get_waiter(\"spot_instance_request_fulfilled\")\n",
" waiter.wait(SpotInstanceRequestIds=[request_id])\n",
" # Get the instance object for later use\n",
" response = ec2.describe_spot_instance_requests(\n",
" SpotInstanceRequestIds=[request_id]\n",
" )\n",
" instance_id = response[\"SpotInstanceRequests\"][0][\"InstanceId\"]\n",
"\n",
" else:\n",
" # Create a new EC2 instance\n",
" instances = ec2.run_instances(\n",
" ImageId=resource_conf[\"ami_id\"],\n",
" MinCount=1,\n",
" MaxCount=1,\n",
" InstanceType=resource_conf[\"instance_type\"],\n",
" UserData=user_data,\n",
" InstanceInitiatedShutdownBehavior='terminate',\n",
" BlockDeviceMappings=[\n",
" {\n",
" \"DeviceName\": resource_conf[\"ebs_device_name\"],\n",
" \"Ebs\": {\n",
" \"VolumeSize\": resource_conf[\"ebs_volume_size\"],\n",
" \"VolumeType\": resource_conf[\"ebs_volume_type\"],\n",
" },\n",
" }\n",
" ],\n",
" )\n",
"\n",
" # Get the instance object for later use\n",
" instance_id = instances[\"Instances\"][0][\"InstanceId\"]\n",
"\n",
" instance = boto3.resource(\n",
" \"ec2\",\n",
" aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,\n",
" aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,\n",
" region_name=CLOUD_CREDENTIALS_REGION\n",
" ).Instance(instance_id)\n",
"\n",
" # Wait until instance is in running state\n",
" instance.wait_until_running()\n",
"\n",
"\n",
"# Cloud-specific implementation (currently, only AWS EC2 is supported)\n",
"def spin_down_worker(instance_id):\n",
" \"\"\"\n",
" Destroys the cloud instance.\n",
"\n",
" :param str instance_id: Cloud instance ID to be destroyed \n",
" (currently, only AWS EC2 is supported)\n",
" \"\"\"\n",
" try:\n",
" boto3.resource(\n",
" \"ec2\",\n",
" aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,\n",
" aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,\n",
" region_name=CLOUD_CREDENTIALS_REGION\n",
" ).instances.filter(InstanceIds=[instance_id]).terminate()\n",
" except Exception as ex:\n",
" raise ex"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Controller Implementation and Logic"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def supervisor():\n",
" \"\"\"\n",
" Spin up or down resources as necessary.\n",
" - For every queue in QUEUES do the following:\n",
" 1. Check if there are tasks waiting in the queue.\n",
" 2. Check if there are enough idle workers available for those tasks.\n",
" 3. In case more instances are required, and we haven't reached max instances allowed,\n",
" create the required instances with regards to the maximum number defined in QUEUES\n",
" Choose which instance to create according to their order QUEUES. Won't create \n",
" more instances if maximum number defined has already reached.\n",
" - spin down instances according to their idle time. instance which is idle for \n",
" more than MAX_IDLE_TIME_MIN minutes would be removed.\n",
" \"\"\"\n",
"\n",
" # Internal definitions\n",
" workers_prefix = \"dynamic_aws\"\n",
" # Worker's id in clearml would be composed from:\n",
" # prefix, name, instance_type and cloud_id separated by ';'\n",
" workers_pattern = re.compile(\n",
" r\"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)\"\n",
" )\n",
"\n",
" # Set up the environment variables for clearml\n",
" os.environ[\"CLEARML_API_HOST\"] = CLEARML_API_SERVER\n",
" os.environ[\"CLEARML_WEB_HOST\"] = CLEARML_WEB_SERVER\n",
" os.environ[\"CLEARML_FILES_HOST\"] = CLEARML_FILES_SERVER\n",
" os.environ[\"CLEARML_API_ACCESS_KEY\"] = CLEARM_API_ACCESS_KEY\n",
" os.environ[\"CLEARML_API_SECRET_KEY\"] = CLEARML_API_SECRET_KEY\n",
" api_client = APIClient()\n",
"\n",
" # Verify the requested queues exist and create those that doesn't exist\n",
" all_queues = [q.name for q in list(api_client.queues.get_all())]\n",
" missing_queues = [q for q in QUEUES if q not in all_queues]\n",
" for q in missing_queues:\n",
" api_client.queues.create(q)\n",
"\n",
" idle_workers = {}\n",
" while True:\n",
" queue_name_to_id = {\n",
" queue.name: queue.id for queue in api_client.queues.get_all()\n",
" }\n",
" resource_to_queue = {\n",
" item[0]: queue\n",
" for queue, resources in QUEUES.items()\n",
" for item in resources\n",
" }\n",
" all_workers = [\n",
" worker\n",
" for worker in api_client.workers.get_all()\n",
" if workers_pattern.match(worker.id)\n",
" and workers_pattern.match(worker.id)[\"prefix\"] == workers_prefix\n",
" ]\n",
"\n",
" # Workers without a task, are added to the idle list\n",
" for worker in all_workers:\n",
" if not hasattr(worker, \"task\") or not worker.task:\n",
" if worker.id not in idle_workers:\n",
" resource_name = workers_pattern.match(worker.id)[\"instance_type\"]\n",
" idle_workers[worker.id] = (time(), resource_name, worker)\n",
" elif hasattr(worker, \"task\") and worker.task and worker.id in idle_workers:\n",
" idle_workers.pop(worker.id, None)\n",
"\n",
" required_idle_resources = [] # idle resources we'll need to keep running\n",
" allocate_new_resources = [] # resources that will need to be started\n",
" # Check if we have tasks waiting on one of the designated queues\n",
" for queue in QUEUES:\n",
" entries = api_client.queues.get_by_id(queue_name_to_id[queue]).entries\n",
" if entries and len(entries) > 0:\n",
" queue_resources = QUEUES[queue]\n",
"\n",
" # If we have an idle worker matching the required resource,\n",
" # remove it from the required allocation resources\n",
" free_queue_resources = [\n",
" resource\n",
" for _, resource, _ in idle_workers.values()\n",
" if resource in queue_resources\n",
" ]\n",
" required_idle_resources.extend(free_queue_resources)\n",
" spin_up_count = len(entries) - len(free_queue_resources)\n",
" spin_up_resources = []\n",
"\n",
" # Add as many resources as possible to handle this queue's entries\n",
" for resource, max_instances in queue_resources:\n",
" if len(spin_up_resources) >= spin_up_count:\n",
" break\n",
" max_allowed = max_instances - len(\n",
" [\n",
" worker\n",
" for worker in all_workers\n",
" if workers_pattern.match(worker.id)[\"name\"] == resource\n",
" ]\n",
" )\n",
" spin_up_resources.extend(\n",
" [resource] * min(max_allowed, spin_up_count)\n",
" )\n",
" allocate_new_resources.extend(spin_up_resources)\n",
"\n",
" # Now we actually spin the new machines\n",
" for resource in allocate_new_resources:\n",
" spin_up_worker(resource, workers_prefix, resource_to_queue[resource])\n",
"\n",
" # Go over the idle workers list, and spin down idle workers\n",
" for timestamp, resources, worker in idle_workers.values():\n",
" # skip resource types that might be needed\n",
" if resources in required_idle_resources:\n",
" continue\n",
" # Remove from both aws and clearml all instances that are\n",
" # idle for longer than MAX_IDLE_TIME_MIN\n",
" if time() - timestamp > MAX_IDLE_TIME_MIN * 60.0:\n",
" cloud_id = workers_pattern.match(worker.id)[\"cloud_id\"]\n",
" spin_down_worker(cloud_id)\n",
" worker.unregister()\n",
"\n",
" # Nothing else to do\n",
" sleep(POLLING_INTERVAL_MIN * 60.0)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Execute Forever* (the controller is stateless, so you can always re-execute the notebook)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Loop forever, it is okay we are stateless\n",
"while True:\n",
" try:\n",
" supervisor()\n",
" except Exception as ex:\n",
" print(\"Warning! exception occurred: {ex}\\nRetry in 15 seconds\".format(ex=ex))\n",
" sleep(15)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.0"
},
"pycharm": {
"stem_cell": {
"cell_type": "raw",
"source": [],
"metadata": {
"collapsed": false
}
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}