diff --git a/examples/dynamic_cloud_cluster.ipynb b/examples/dynamic_cloud_cluster.ipynb
new file mode 100644
index 0000000..30f6624
--- /dev/null
+++ b/examples/dynamic_cloud_cluster.ipynb
@@ -0,0 +1,534 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Auto-Magically Spin Up AWS EC2 Instances on Demand\n",
+    "# And Create a Dynamic Cluster Running *Trains-Agent*\n",
+    "\n",
+    "### Define your budget and execute the notebook. That's it!\n",
+    "### You now have a fully managed cluster on AWS 🎉 🎊 "
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "**trains-agent**'s main goal is to quickly pull a job from an execution queue, set up the environment (as defined in the experiment, including git cloning, python packages, etc.), then execute the experiment and monitor it.\n",
+    "\n",
+    "This notebook defines a cloud budget (currently only AWS is supported, but feel free to expand with PRs), and spins up an instance the minute a job is waiting for execution. It will also spin down idle machines, saving you some $$$ :)\n",
+    "\n",
+    "Configuration steps\n",
+    "- Define the maximum budget to be used (instance types / number of instances).\n",
+    "- Create new execution *queues* in the **trains-server**.\n",
+    "- Define the mapping between the newly created *queues* and the instance budget.\n",
+    "\n",
+    "**TL;DR - This notebook:**\n",
+    "- Will spin up instances as long as there are jobs in the execution *queues*, until it hits the budget limit.\n",
+    "- Will spin down machines that are idle.\n",
+    "\n",
+    "The controller implementation itself is stateless, meaning you can always re-execute the notebook if, for some reason, it stopped.\n",
+    "\n",
+    "It is as simple as it sounds, but extremely powerful.\n",
+    "\n",
+    "Enjoy your newly created dynamic cluster :)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "##### Install & import required packages"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!pip install trains-agent\n",
+    "!pip install boto3"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "##### Define AWS instance types and configuration (instance type, EBS, AMI, etc.)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# AWS EC2 machine types - default AMI: NVIDIA Deep Learning AMI 19.11.3\n",
+    "RESOURCE_CONFIGURATIONS = {\n",
+    "    \"amazon_ec2_normal\": {\n",
+    "        \"instance_type\": \"g4dn.4xlarge\",\n",
+    "        \"is_spot\": False,\n",
+    "        \"availability_zone\": \"us-east-1b\",\n",
+    "        \"ami_id\": \"ami-07c95cafbb788face\",\n",
+    "        \"ebs_device_name\": \"/dev/xvda\",\n",
+    "        \"ebs_volume_size\": 100,\n",
+    "        \"ebs_volume_type\": \"gp2\",\n",
+    "    },\n",
+    "    \"amazon_ec2_high\": {\n",
+    "        \"instance_type\": \"g4dn.8xlarge\",\n",
+    "        \"is_spot\": False,\n",
+    "        \"availability_zone\": \"us-east-1b\",\n",
+    "        \"ami_id\": \"ami-07c95cafbb788face\",\n",
+    "        \"ebs_device_name\": \"/dev/xvda\",\n",
+    "        \"ebs_volume_size\": 100,\n",
+    "        \"ebs_volume_type\": \"gp2\",\n",
+    "    },\n",
+    "}\n"
+   ]
+  },
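+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Optional sanity check (a minimal sketch, not part of the original flow): verify that each configured AMI is visible in its region before spending any budget. It assumes boto3 can already locate AWS credentials on this machine (environment variables or ~/.aws/credentials); the region is derived from each resource's availability zone."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Hedged sanity check: confirm every configured AMI exists in its region.\n",
+    "# Assumes boto3 can locate AWS credentials on this machine.\n",
+    "import boto3\n",
+    "from botocore.exceptions import ClientError\n",
+    "\n",
+    "for name, conf in RESOURCE_CONFIGURATIONS.items():\n",
+    "    # \"us-east-1b\" -> \"us-east-1\"\n",
+    "    region = conf[\"availability_zone\"][:-1]\n",
+    "    ec2 = boto3.client(\"ec2\", region_name=region)\n",
+    "    try:\n",
+    "        # describe_images raises ClientError if the AMI id is unknown in this region\n",
+    "        ec2.describe_images(ImageIds=[conf[\"ami_id\"]])\n",
+    "        print(\"{}: AMI {} found in {}\".format(name, conf[\"ami_id\"], region))\n",
+    "    except ClientError as ex:\n",
+    "        print(\"{}: AMI {} NOT available in {} ({})\".format(name, conf[\"ami_id\"], region, ex))"
+   ]
+  },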
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "##### Define machine budget per execution queue\n",
+    "\n",
+    "Now that we have defined our budget, we need to connect it to the **Trains** cluster.\n",
+    "\n",
+    "We map each queue to a resource type (instance type).\n",
+    "\n",
+    "Create two queues in the WebUI:\n",
+    "- Browse to http://your_trains_server_ip:8080/workers-and-queues/queues\n",
+    "- Then click on the \"New Queue\" button and name your queues \"aws_normal\" and \"aws_high\", respectively\n",
+    "\n",
+    "The QUEUES dictionary holds the mapping between each queue name and the type and number of instances that may be spun up for that queue:\n",
+    "```\n",
+    "QUEUES = {\n",
+    "    'queue_name': [(\"resource-name-as-defined-in-RESOURCE_CONFIGURATIONS\", max_number_of_instances), ]\n",
+    "}\n",
+    "```\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Trains-Agent Queues - machine budget per queue\n",
+    "# Per queue: list of (resource name as defined in RESOURCE_CONFIGURATIONS, max instances for the specific queue).\n",
+    "# Order resources from most preferred to least.\n",
+    "QUEUES = {\n",
+    "    \"aws_normal\": [(\"amazon_ec2_normal\", 2)],\n",
+    "    \"aws_high\": [(\"amazon_ec2_high\", 1)],\n",
+    "}"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "##### Credentials for your AWS account, as well as for your **Trains-Server**"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# AWS credentials\n",
+    "CLOUD_CREDENTIALS_KEY = \"\"\n",
+    "CLOUD_CREDENTIALS_SECRET = \"\"\n",
+    "CLOUD_CREDENTIALS_REGION = \"us-east-1\"\n",
+    "\n",
+    "# TRAINS configuration\n",
+    "TRAINS_SERVER_WEB_SERVER = \"http://localhost:8080\"\n",
+    "TRAINS_SERVER_API_SERVER = \"http://localhost:8008\"\n",
+    "TRAINS_SERVER_FILES_SERVER = \"http://localhost:8081\"\n",
+    "# TRAINS credentials\n",
+    "TRAINS_ACCESS_KEY = \"\"\n",
+    "TRAINS_SECRET_KEY = \"\"\n",
+    "# Git user/pass to be used by trains-agent;\n",
+    "# leave empty if the image already contains a git ssh-key\n",
+    "TRAINS_GIT_USER = \"\"\n",
+    "TRAINS_GIT_PASS = \"\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Controller Internal Definitions\n",
+    "\n",
+    "# maximum idle time in minutes, after which the instance will be shut down\n",
+    "MAX_IDLE_TIME_MIN = 15\n",
+    "# polling interval in minutes\n",
+    "POLLING_INTERVAL_MIN = 2.0"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "##### Import Packages and Budget Definition Sanity Check"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import base64\n",
+    "import re\n",
+    "import os\n",
+    "from itertools import chain\n",
+    "from operator import itemgetter\n",
+    "from time import sleep, time\n",
+    "\n",
+    "import boto3\n",
+    "from trains_agent.backend_api.session.client import APIClient"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Sanity Check - Validate Queue Resources\n",
+    "if len(set(map(itemgetter(0), chain(*QUEUES.values())))) != sum(\n",
+    "    map(len, QUEUES.values())\n",
+    "):\n",
+    "    print(\n",
+    "        \"Error: at least one resource name is used in multiple queues. \"\n",
+    "        \"A resource name can only appear in a single queue definition.\"\n",
+    "    )"
+   ]
+  },
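+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "If you would rather not click through the WebUI, the sketch below creates the queues programmatically. It assumes your **trains-server** exposes the `queues.get_all` and `queues.create` API calls through `APIClient` (verify against your server version), and it reuses the TRAINS_* settings defined above."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Optional sketch: create the execution queues via the API instead of the WebUI.\n",
+    "# Assumes the trains-server API exposes queues.get_all / queues.create through APIClient.\n",
+    "os.environ[\"TRAINS_API_HOST\"] = TRAINS_SERVER_API_SERVER\n",
+    "os.environ[\"TRAINS_WEB_HOST\"] = TRAINS_SERVER_WEB_SERVER\n",
+    "os.environ[\"TRAINS_FILES_HOST\"] = TRAINS_SERVER_FILES_SERVER\n",
+    "os.environ[\"TRAINS_API_ACCESS_KEY\"] = TRAINS_ACCESS_KEY\n",
+    "os.environ[\"TRAINS_API_SECRET_KEY\"] = TRAINS_SECRET_KEY\n",
+    "\n",
+    "client = APIClient()\n",
+    "existing_queues = {queue.name for queue in client.queues.get_all()}\n",
+    "for queue_name in QUEUES:\n",
+    "    if queue_name not in existing_queues:\n",
+    "        client.queues.create(name=queue_name)\n",
+    "        print(\"Created queue: {}\".format(queue_name))"
+   ]
+  },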
\"\n", + " \"A resource name can only appear in a single queue definition.\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "##### Cloud specific implementation of spin up/down - currently supports AWS only" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Cloud-specific implementation (currently, only AWS EC2 is supported)\n", + "def spin_up_worker(resource, worker_id_prefix, queue_name):\n", + " \"\"\"\n", + " Creates a new worker for trains.\n", + " First, create an instance in the cloud and install some required packages.\n", + " Then, define trains-agent environment variables and run trains-agent for the specified queue.\n", + " NOTE: - Will wait until instance is running\n", + " - This implementation assumes the instance image already has docker installed\n", + "\n", + " :param str resource: resource name, as defined in BUDGET and QUEUES.\n", + " :param str worker_id_prefix: worker name prefix\n", + " :param str queue_name: trains queue to listen to\n", + " \"\"\"\n", + " resource_conf = RESOURCE_CONFIGURATIONS[resource]\n", + " # Add worker type and AWS instance type to the worker name.\n", + " worker_id = \"{worker_id_prefix}:{worker_type}:{instance_type}\".format(\n", + " worker_id_prefix=worker_id_prefix,\n", + " worker_type=resource,\n", + " instance_type=resource_conf[\"instance_type\"],\n", + " )\n", + "\n", + " # user_data script will automatically run when the instance is started. it will install the required packages\n", + " # for trains-agent configure it using environment variables and run trains-agent on the required queue\n", + " user_data = \"\"\"#!/bin/bash \n", + " sudo apt-get update\n", + " sudo apt-get install -y python3-dev\n", + " sudo apt-get install -y python3-pip\n", + " sudo apt-get install -y gcc\n", + " sudo apt-get install -y git\n", + " sudo python3 -m pip install screen\n", + " sudo python3 -m pip install trains-agent\n", + " echo 'agent.git_user=\\\"{git_user}\\\"' >> /root/trains.conf\n", + " echo 'agent.git_pass=\\\"{git_pass}\\\"' >> /root/trains.conf\n", + " export TRAINS_API_HOST={api_server}\n", + " export TRAINS_WEB_HOST={web_server}\n", + " export TRAINS_FILES_HOST={files_server}\n", + " export DYNAMIC_INSTANCE_ID=`curl http://169.254.169.254/latest/meta-data/instance-id`\n", + " export TRAINS_WORKER_ID={worker_id}:$DYNAMIC_INSTANCE_ID\n", + " export TRAINS_API_ACCESS_KEY='{access_key}'\n", + " export TRAINS_API_SECRET_KEY='{secret_key}'\n", + " screen\n", + " python3 -m trains_agent --config-file '/root/trains.conf' daemon --queue '{queue}' --docker\"\"\".format(\n", + " api_server=TRAINS_SERVER_API_SERVER,\n", + " web_server=TRAINS_SERVER_WEB_SERVER,\n", + " files_server=TRAINS_SERVER_FILES_SERVER,\n", + " worker_id=worker_id,\n", + " access_key=TRAINS_ACCESS_KEY,\n", + " secret_key=TRAINS_SECRET_KEY,\n", + " queue=queue_name,\n", + " git_user=TRAINS_GIT_USER,\n", + " git_pass=TRAINS_GIT_PASS,\n", + " )\n", + "\n", + " ec2 = boto3.client(\"ec2\", region_name=CLOUD_CREDENTIALS_REGION)\n", + "\n", + " if resource_conf[\"is_spot\"]:\n", + " # Create a request for a spot instance in AWS\n", + " encoded_user_data = base64.b64encode(user_data.encode(\"ascii\")).decode(\"ascii\")\n", + " instances = ec2.request_spot_instances(\n", + " LaunchSpecification={\n", + " \"ImageId\": resource_conf[\"ami_id\"],\n", + " \"InstanceType\": resource_conf[\"instance_type\"],\n", + " \"Placement\": {\"AvailabilityZone\": 
resource_conf[\"availability_zone\"]},\n", + " \"UserData\": encoded_user_data,\n", + " \"BlockDeviceMappings\": [\n", + " {\n", + " \"DeviceName\": resource_conf[\"ebs_device_name\"],\n", + " \"Ebs\": {\n", + " \"VolumeSize\": resource_conf[\"ebs_volume_size\"],\n", + " \"VolumeType\": resource_conf[\"ebs_volume_type\"],\n", + " },\n", + " }\n", + " ],\n", + " }\n", + " )\n", + "\n", + " # Wait until spot request is fulfilled\n", + " request_id = instances[\"SpotInstanceRequests\"][0][\"SpotInstanceRequestId\"]\n", + " waiter = ec2.get_waiter(\"spot_instance_request_fulfilled\")\n", + " waiter.wait(SpotInstanceRequestIds=[request_id])\n", + " # Get the instance object for later use\n", + " response = ec2.describe_spot_instance_requests(\n", + " SpotInstanceRequestIds=[request_id]\n", + " )\n", + " instance_id = response[\"SpotInstanceRequests\"][0][\"InstanceId\"]\n", + " instance = boto3.resource(\"ec2\", region_name=CLOUD_CREDENTIALS_REGION).Instance(\n", + " instance_id\n", + " )\n", + "\n", + " else:\n", + " # Create a new EC2 instance\n", + " instances = ec2.run_instances(\n", + " ImageId=resource_conf[\"ami_id\"],\n", + " MinCount=1,\n", + " MaxCount=1,\n", + " InstanceType=resource_conf[\"instance_type\"],\n", + " UserData=user_data,\n", + " BlockDeviceMappings=[\n", + " {\n", + " \"DeviceName\": resource_conf[\"ebs_device_name\"],\n", + " \"Ebs\": {\n", + " \"VolumeSize\": resource_conf[\"ebs_volume_size\"],\n", + " \"VolumeType\": resource_conf[\"ebs_volume_type\"],\n", + " },\n", + " }\n", + " ],\n", + " )\n", + "\n", + " # Get the instance object for later use\n", + " instance_id = instances[\"Instances\"][0][\"InstanceId\"]\n", + " instance = boto3.resource(\"ec2\", region_name=CLOUD_CREDENTIALS_REGION).Instance(\n", + " instance_id\n", + " )\n", + "\n", + " # Wait until instance is in running state\n", + " instance.wait_until_running()\n", + "\n", + "\n", + "# Cloud-specific implementation (currently, only AWS EC2 is supported)\n", + "def spin_down_worker(instance_id):\n", + " \"\"\"\n", + " Destroys the cloud instance.\n", + "\n", + " :param instance_id: Cloud instance ID to be destroyed (currently, only AWS EC2 is supported)\n", + " :type instance_id: str\n", + " \"\"\"\n", + " try:\n", + " boto3.resource(\"ec2\", region_name=CLOUD_CREDENTIALS_REGION).instances.filter(\n", + " InstanceIds=[instance_id]\n", + " ).terminate()\n", + " except Exception as ex:\n", + " raise ex" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "###### Controller Implementation and Logic" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def supervisor():\n", + " \"\"\"\n", + " Spin up or down resources as necessary.\n", + " - For every queue in QUEUES do the following:\n", + " 1. Check if there are tasks waiting in the queue.\n", + " 2. Check if there are enough idle workers available for those tasks.\n", + " 3. In case more instances are required, and we haven't reached max instances allowed,\n", + " create the required instances with regards to the maximum number defined in QUEUES\n", + " Choose which instance to create according to their order QUEUES. Won't create more instances\n", + " if maximum number defined has already reached.\n", + " - spin down instances according to their idle time. 
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "##### Controller Implementation and Logic"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def supervisor():\n",
+    "    \"\"\"\n",
+    "    Spin up or down resources as necessary.\n",
+    "    - For every queue in QUEUES:\n",
+    "      1. Check if there are tasks waiting in the queue.\n",
+    "      2. Check if there are enough idle workers available for those tasks.\n",
+    "      3. If more instances are required, spin up new instances, up to the maximum\n",
+    "         number defined in QUEUES. Resources are chosen according to their order\n",
+    "         in the queue's resource list.\n",
+    "    - Spin down instances according to their idle time: an instance that has been\n",
+    "      idle for more than MAX_IDLE_TIME_MIN minutes is terminated.\n",
+    "    \"\"\"\n",
+    "\n",
+    "    # Internal definitions\n",
+    "    workers_prefix = \"dynamic_aws\"\n",
+    "    # A worker's id in trains is composed of prefix, name (resource), instance_type\n",
+    "    # and cloud_id, separated by ':'\n",
+    "    workers_pattern = re.compile(\n",
+    "        r\"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)\"\n",
+    "    )\n",
+    "\n",
+    "    # Set up the environment variables for trains\n",
+    "    os.environ[\"TRAINS_API_HOST\"] = TRAINS_SERVER_API_SERVER\n",
+    "    os.environ[\"TRAINS_WEB_HOST\"] = TRAINS_SERVER_WEB_SERVER\n",
+    "    os.environ[\"TRAINS_FILES_HOST\"] = TRAINS_SERVER_FILES_SERVER\n",
+    "    os.environ[\"TRAINS_API_ACCESS_KEY\"] = TRAINS_ACCESS_KEY\n",
+    "    os.environ[\"TRAINS_API_SECRET_KEY\"] = TRAINS_SECRET_KEY\n",
+    "    api_client = APIClient()\n",
+    "\n",
+    "    idle_workers = {}\n",
+    "    while True:\n",
+    "        queue_name_to_id = {\n",
+    "            queue.name: queue.id for queue in api_client.queues.get_all()\n",
+    "        }\n",
+    "        resource_to_queue = {\n",
+    "            item[0]: queue\n",
+    "            for queue, resources in QUEUES.items()\n",
+    "            for item in resources\n",
+    "        }\n",
+    "        all_workers = [\n",
+    "            worker\n",
+    "            for worker in api_client.workers.get_all()\n",
+    "            if workers_pattern.match(worker.id)\n",
+    "            and workers_pattern.match(worker.id)[\"prefix\"] == workers_prefix\n",
+    "        ]\n",
+    "\n",
+    "        # Workers without a task are added to the idle list\n",
+    "        for worker in all_workers:\n",
+    "            if not hasattr(worker, \"task\") or not worker.task:\n",
+    "                if worker.id not in idle_workers:\n",
+    "                    # the resource name, as defined in RESOURCE_CONFIGURATIONS\n",
+    "                    resource_name = workers_pattern.match(worker.id)[\"name\"]\n",
+    "                    idle_workers[worker.id] = (time(), resource_name, worker)\n",
+    "            elif hasattr(worker, \"task\") and worker.task and worker.id in idle_workers:\n",
+    "                idle_workers.pop(worker.id, None)\n",
+    "\n",
+    "        required_idle_resources = []  # idle resources we'll need to keep running\n",
+    "        allocate_new_resources = []  # resources that will need to be started\n",
+    "        # Check if we have tasks waiting on one of the designated queues\n",
+    "        for queue in QUEUES:\n",
+    "            entries = api_client.queues.get_by_id(queue_name_to_id[queue]).entries\n",
+    "            if entries:\n",
+    "                queue_resources = QUEUES[queue]\n",
+    "                queue_resource_names = [name for name, _ in queue_resources]\n",
+    "\n",
+    "                # If we have an idle worker matching a required resource,\n",
+    "                # remove it from the required allocation resources\n",
+    "                free_queue_resources = [\n",
+    "                    resource\n",
+    "                    for _, resource, _ in idle_workers.values()\n",
+    "                    if resource in queue_resource_names\n",
+    "                ]\n",
+    "                required_idle_resources.extend(free_queue_resources)\n",
+    "                spin_up_count = len(entries) - len(free_queue_resources)\n",
+    "                spin_up_resources = []\n",
+    "\n",
+    "                # Add as many resources as possible to handle this queue's entries\n",
+    "                for resource, max_instances in queue_resources:\n",
+    "                    if len(spin_up_resources) >= spin_up_count:\n",
+    "                        break\n",
+    "                    max_allowed = max_instances - len(\n",
+    "                        [\n",
+    "                            worker\n",
+    "                            for worker in all_workers\n",
+    "                            if workers_pattern.match(worker.id)[\"name\"] == resource\n",
+    "                        ]\n",
+    "                    )\n",
+    "                    # never exceed the number of entries still waiting\n",
+    "                    spin_up_resources.extend(\n",
+    "                        [resource] * min(max_allowed, spin_up_count - len(spin_up_resources))\n",
+    "                    )\n",
+    "                allocate_new_resources.extend(spin_up_resources)\n",
+    "\n",
+    "        # Now we actually spin up the new machines\n",
+    "        for resource in allocate_new_resources:\n",
+    "            spin_up_worker(resource, workers_prefix, resource_to_queue[resource])\n",
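+    "\n",
+    "        # The spin-down pass below iterates over a snapshot (list) of the idle\n",
+    "        # map, since entries are popped as their instances are terminated.\n",
+    "        # Idle workers whose resource type is still required by a waiting queue\n",
+    "        # are deliberately kept alive.\n",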
+    "        # Go over the idle workers list, and spin down idle workers\n",
+    "        for worker_id, (timestamp, resources, worker) in list(idle_workers.items()):\n",
+    "            # skip resource types that might be needed\n",
+    "            if resources in required_idle_resources:\n",
+    "                continue\n",
+    "            # Remove instances that have been idle for longer than MAX_IDLE_TIME_MIN\n",
+    "            # from both AWS and trains\n",
+    "            if time() - timestamp > MAX_IDLE_TIME_MIN * 60.0:\n",
+    "                cloud_id = workers_pattern.match(worker.id)[\"cloud_id\"]\n",
+    "                spin_down_worker(cloud_id)\n",
+    "                worker.unregister()\n",
+    "                idle_workers.pop(worker_id, None)\n",
+    "\n",
+    "        # Nothing else to do\n",
+    "        sleep(POLLING_INTERVAL_MIN * 60.0)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "##### Execute Forever (the controller is stateless, so you can always re-execute the notebook)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Loop forever; it is okay, we are stateless\n",
+    "while True:\n",
+    "    try:\n",
+    "        supervisor()\n",
+    "    except Exception as ex:\n",
+    "        print(\"Warning! exception occurred: {ex}\".format(ex=ex))\n",
+    "        # avoid a busy loop in case the exception repeats immediately\n",
+    "        sleep(POLLING_INTERVAL_MIN * 60.0)"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.7.0"
+  },
+  "pycharm": {
+   "stem_cell": {
+    "cell_type": "raw",
+    "metadata": {
+     "collapsed": false
+    },
+    "source": []
+   }
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}