mirror of
https://github.com/clearml/clearml-agent
synced 2025-06-26 18:16:15 +00:00
Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
facbee0005 | ||
|
|
c486cfd09f | ||
|
|
119ecaa2e3 | ||
|
|
d6cc2be653 | ||
|
|
41d75df40c | ||
|
|
901c4be9ae | ||
|
|
966b14f914 | ||
|
|
847d35cbbb | ||
|
|
4022cb5c63 | ||
|
|
2b239829de | ||
|
|
402856656f | ||
|
|
7b94ff410c | ||
|
|
0a03dced50 | ||
|
|
ffe653afc6 |
@@ -1,5 +1,5 @@
|
||||
# TRAINS Agent
|
||||
## Deep Learning DevOps For Everyone - Now supports all platforms (Linux, macOS, and Windows)
|
||||
## Deep Learning DevOps For Everyone - Now supporting all platforms (Linux, macOS, and Windows)
|
||||
|
||||
"All the Deep-Learning DevOps your research needs, and then some... Because ain't nobody got time for that"
|
||||
|
||||
@@ -14,7 +14,7 @@ It is a zero configuration fire-and-forget execution agent, which combined with
|
||||
|
||||
**Full AutoML in 5 steps**
|
||||
1. Install the [TRAINS server](https://github.com/allegroai/trains-agent) (or use our [open server](https://demoapp.trains.allegro.ai))
|
||||
2. `pip install trains_agent` ([install](#installing-the-trains-agent) the TRAINS agent on any GPU machine: on-premises / cloud / ...)
|
||||
2. `pip install trains-agent` ([install](#installing-the-trains-agent) the TRAINS agent on any GPU machine: on-premises / cloud / ...)
|
||||
3. Add [TRAINS](https://github.com/allegroai/trains) to your code with just 2 lines & run it once (on your machine / laptop)
|
||||
4. Change the [parameters](#using-the-trains-agent) in the UI & schedule for [execution](#using-the-trains-agent) (or automate with an [AutoML pipeline](#automl-and-orchestration-pipelines-))
|
||||
5. :chart_with_downwards_trend: :chart_with_upwards_trend: :eyes: :beer:
|
||||
@@ -133,7 +133,7 @@ Development Machine |
|
||||
### Installing the TRAINS Agent
|
||||
|
||||
```bash
|
||||
pip install trains_agent
|
||||
pip install trains-agent
|
||||
```
|
||||
|
||||
### TRAINS Agent Usage Examples
|
||||
|
||||
568
examples/dynamic_cloud_cluster.ipynb
Normal file
568
examples/dynamic_cloud_cluster.ipynb
Normal file
@@ -0,0 +1,568 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"# Auto-Magically Spin AWS EC2 Instances on Demand \n",
|
||||
"# And Create a Dynamic Cluster Running *Trains-Agent*\n",
|
||||
"\n",
|
||||
"### Define your budget and execute the notebook, That's is it\n",
|
||||
"### You now have a fully managed cluster on AWS 🎉 🎊 "
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"**trains-agent**'s main goal is to quickly pull a job from an execution queue, setup the environment (as defined in the experiment, including git cloning, python packages etc.) then execute the experiment and monitor it.\n",
|
||||
"\n",
|
||||
"This notebook defines a cloud budget (currently only AWS is supported, but feel free to expand with PRs), and spins an instance the minute a job is waiting for execution. It will also spin down idle machines, saving you some $$$ :)\n",
|
||||
"\n",
|
||||
"Configuration steps\n",
|
||||
"- Define maximum budget to be used (instance type / number of instances).\n",
|
||||
"- Create new execution *queues* in the **trains-server**.\n",
|
||||
"- Define mapping between the created the *queues* and an instance budget.\n",
|
||||
"\n",
|
||||
"**TL;DR - This notebook:**\n",
|
||||
"- Will spin instances if there are jobs in the execution *queues*, until it will hit the budget limit. \n",
|
||||
"- If machines are idle, it will spin them down.\n",
|
||||
"\n",
|
||||
"The controller implementation itself is stateless, meaning you can always re-execute the notebook, if for some reason it stopped.\n",
|
||||
"\n",
|
||||
"It is as simple as it sounds, but extremely powerful\n",
|
||||
"\n",
|
||||
"Enjoy your newly created dynamic cluster :)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"##### Install & import required packages"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"!pip install trains-agent\n",
|
||||
"!pip install boto3"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"##### Define AWS instance types and configuration (Instance Type, EBS, AMI etc.)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 3,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# AWS EC2 machines types - default AMI - NVIDIA Deep Learning AMI 19.11.3\n",
|
||||
"RESOURCE_CONFIGURATIONS = {\n",
|
||||
" \"amazon_ec2_normal\": {\n",
|
||||
" \"instance_type\": \"g4dn.4xlarge\",\n",
|
||||
" \"is_spot\": False,\n",
|
||||
" \"availability_zone\": \"us-east-1b\",\n",
|
||||
" \"ami_id\": \"ami-07c95cafbb788face\",\n",
|
||||
" \"ebs_device_name\": \"/dev/xvda\",\n",
|
||||
" \"ebs_volume_size\": 100,\n",
|
||||
" \"ebs_volume_type\": \"gp2\",\n",
|
||||
" },\n",
|
||||
" \"amazon_ec2_high\": {\n",
|
||||
" \"instance_type\": \"g4dn.8xlarge\",\n",
|
||||
" \"is_spot\": False,\n",
|
||||
" \"availability_zone\": \"us-east-1b\",\n",
|
||||
" \"ami_id\": \"ami-07c95cafbb788face\",\n",
|
||||
" \"ebs_device_name\": \"/dev/xvda\",\n",
|
||||
" \"ebs_volume_size\": 100,\n",
|
||||
" \"ebs_volume_type\": \"gp2\",\n",
|
||||
" },\n",
|
||||
"}\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"##### Define machine budget per execution queue\n",
|
||||
"\n",
|
||||
"Now that we defined our budget, we need to connect it with the **Trains** cluster.\n",
|
||||
"\n",
|
||||
"We map each queue to a resource type (instance type).\n",
|
||||
"\n",
|
||||
"Create two queues in the WebUI:\n",
|
||||
"- Browse to http://your_trains_server_ip:8080/workers-and-queues/queues\n",
|
||||
"- Then click on the \"New Queue\" button and name your queues \"aws_normal\" and \"aws_high\" respectively\n",
|
||||
"\n",
|
||||
"The QUEUES dictionary hold the mapping between the queue name and the type/number of instances to spin connected to the specific queue.\n",
|
||||
"```\n",
|
||||
"QUEUES = {\n",
|
||||
" 'queue_name': [(\"instance-type-as-defined-in-RESOURCE_CONFIGURATIONS\", max_number_of_instances), ]\n",
|
||||
"}\n",
|
||||
"```\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 4,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# Trains-Agent Queues - Machines budget per Queue\n",
|
||||
"# Per queue: list of (machine type as defined in RESOURCE_CONFIGURATIONS,\n",
|
||||
"# max instances for the specific queue). Order machines from most preferred to least.\n",
|
||||
"QUEUES = {\n",
|
||||
" \"aws_normal\": [(\"amazon_ec2_normal\", 2),],\n",
|
||||
" \"aws_high\": [(\"amazon_ec2_high\", 1)],\n",
|
||||
"}"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"##### Credentials for your AWS account, as well as for your **Trains-Server**"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# AWS credentials (leave empty to use credentials set using the aws cli)\n",
|
||||
"CLOUD_CREDENTIALS_KEY = \"\"\n",
|
||||
"CLOUD_CREDENTIALS_SECRET = \"\"\n",
|
||||
"CLOUD_CREDENTIALS_REGION = \"us-east-1\"\n",
|
||||
"\n",
|
||||
"# TRAINS configuration\n",
|
||||
"TRAINS_SERVER_WEB_SERVER = \"http://localhost:8080\"\n",
|
||||
"TRAINS_SERVER_API_SERVER = \"http://localhost:8008\"\n",
|
||||
"TRAINS_SERVER_FILES_SERVER = \"http://localhost:8081\"\n",
|
||||
"# TRAINS credentials\n",
|
||||
"TRAINS_ACCESS_KEY = \"\"\n",
|
||||
"TRAINS_SECRET_KEY = \"\"\n",
|
||||
"# Git User/Pass to be used by trains-agent,\n",
|
||||
"# leave empty if image already contains git ssh-key\n",
|
||||
"TRAINS_GIT_USER = \"\"\n",
|
||||
"TRAINS_GIT_PASS = \"\"\n",
|
||||
"\n",
|
||||
"# Additional fields for trains.conf file created on the remote instance\n",
|
||||
"# for example: 'agent.default_docker.image: \"nvidia/cuda:10.0-cudnn7-runtime\"'\n",
|
||||
"EXTRA_TRAINS_CONF = \"\"\"\n",
|
||||
"\"\"\"\n",
|
||||
"\n",
|
||||
"# Bash script to run on instances before running trains-agent\n",
|
||||
"# Example: \"\"\"\n",
|
||||
"# echo \"This is the first line\"\n",
|
||||
"# echo \"This is the second line\"\n",
|
||||
"# \"\"\"\n",
|
||||
"EXTRA_BASH_SCRIPT = \"\"\"\n",
|
||||
"\"\"\""
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# Controller Internal Definitions\n",
|
||||
"\n",
|
||||
"# maximum idle time in minutes, after which the instance will be shutdown\n",
|
||||
"MAX_IDLE_TIME_MIN = 15\n",
|
||||
"# polling interval in minutes\n",
|
||||
"POLLING_INTERVAL_MIN = 2.0"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"##### Import Packages and Budget Definition Sanity Check"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"import base64\n",
|
||||
"import re\n",
|
||||
"import os\n",
|
||||
"from itertools import chain\n",
|
||||
"from operator import itemgetter\n",
|
||||
"from time import sleep, time\n",
|
||||
"\n",
|
||||
"import boto3\n",
|
||||
"from trains_agent.backend_api.session.client import APIClient"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# Sanity Check - Validate Queue Resources\n",
|
||||
"if len(set(map(itemgetter(0), chain(*QUEUES.values())))) != sum(\n",
|
||||
" map(len, QUEUES.values())\n",
|
||||
"):\n",
|
||||
" print(\n",
|
||||
" \"Error: at least one resource name is used in multiple queues. \"\n",
|
||||
" \"A resource name can only appear in a single queue definition.\"\n",
|
||||
" )\n",
|
||||
"\n",
|
||||
"# Encode EXTRA_TRAINS_CONF for later bash script usage\n",
|
||||
"EXTRA_TRAINS_CONF_ENCODED = \"\\\\\\\"\".join(EXTRA_TRAINS_CONF.split(\"\\\"\"))"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"##### Cloud specific implementation of spin up/down - currently supports AWS only"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# Cloud-specific implementation (currently, only AWS EC2 is supported)\n",
|
||||
"def spin_up_worker(resource, worker_id_prefix, queue_name):\n",
|
||||
" \"\"\"\n",
|
||||
" Creates a new worker for trains.\n",
|
||||
" First, create an instance in the cloud and install some required packages.\n",
|
||||
" Then, define trains-agent environment variables and run \n",
|
||||
" trains-agent for the specified queue.\n",
|
||||
" NOTE: - Will wait until instance is running\n",
|
||||
" - This implementation assumes the instance image already has docker installed\n",
|
||||
"\n",
|
||||
" :param str resource: resource name, as defined in BUDGET and QUEUES.\n",
|
||||
" :param str worker_id_prefix: worker name prefix\n",
|
||||
" :param str queue_name: trains queue to listen to\n",
|
||||
" \"\"\"\n",
|
||||
" resource_conf = RESOURCE_CONFIGURATIONS[resource]\n",
|
||||
" # Add worker type and AWS instance type to the worker name.\n",
|
||||
" worker_id = \"{worker_id_prefix}:{worker_type}:{instance_type}\".format(\n",
|
||||
" worker_id_prefix=worker_id_prefix,\n",
|
||||
" worker_type=resource,\n",
|
||||
" instance_type=resource_conf[\"instance_type\"],\n",
|
||||
" )\n",
|
||||
"\n",
|
||||
" # user_data script will automatically run when the instance is started. \n",
|
||||
" # It will install the required packages for trains-agent configure it using \n",
|
||||
" # environment variables and run trains-agent on the required queue\n",
|
||||
" user_data = \"\"\"#!/bin/bash \n",
|
||||
" sudo apt-get update\n",
|
||||
" sudo apt-get install -y python3-dev\n",
|
||||
" sudo apt-get install -y python3-pip\n",
|
||||
" sudo apt-get install -y gcc\n",
|
||||
" sudo apt-get install -y git\n",
|
||||
" sudo python3 -m pip install screen\n",
|
||||
" sudo python3 -m pip install trains-agent\n",
|
||||
" echo 'agent.git_user=\\\"{git_user}\\\"' >> /root/trains.conf\n",
|
||||
" echo 'agent.git_pass=\\\"{git_pass}\\\"' >> /root/trains.conf\n",
|
||||
" echo {trains_conf} >> /root/trains.conf\n",
|
||||
" export TRAINS_API_HOST={api_server}\n",
|
||||
" export TRAINS_WEB_HOST={web_server}\n",
|
||||
" export TRAINS_FILES_HOST={files_server}\n",
|
||||
" export DYNAMIC_INSTANCE_ID=`curl http://169.254.169.254/latest/meta-data/instance-id`\n",
|
||||
" export TRAINS_WORKER_ID={worker_id}:$DYNAMIC_INSTANCE_ID\n",
|
||||
" export TRAINS_API_ACCESS_KEY='{access_key}'\n",
|
||||
" export TRAINS_API_SECRET_KEY='{secret_key}'\n",
|
||||
" screen\n",
|
||||
" {bash_script}\n",
|
||||
" python3 -m trains_agent --config-file '/root/trains.conf' daemon --queue '{queue}' --docker\"\"\".format(\n",
|
||||
" api_server=TRAINS_SERVER_API_SERVER,\n",
|
||||
" web_server=TRAINS_SERVER_WEB_SERVER,\n",
|
||||
" files_server=TRAINS_SERVER_FILES_SERVER,\n",
|
||||
" worker_id=worker_id,\n",
|
||||
" access_key=TRAINS_ACCESS_KEY,\n",
|
||||
" secret_key=TRAINS_SECRET_KEY,\n",
|
||||
" queue=queue_name,\n",
|
||||
" git_user=TRAINS_GIT_USER,\n",
|
||||
" git_pass=TRAINS_GIT_PASS,\n",
|
||||
" trains_conf=EXTRA_TRAINS_CONF_ENCODED,\n",
|
||||
" bash_script=EXTRA_BASH_SCRIPT\n",
|
||||
" )\n",
|
||||
"\n",
|
||||
" ec2 = boto3.client(\n",
|
||||
" \"ec2\",\n",
|
||||
" aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,\n",
|
||||
" aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,\n",
|
||||
" region_name=CLOUD_CREDENTIALS_REGION\n",
|
||||
" )\n",
|
||||
"\n",
|
||||
" if resource_conf[\"is_spot\"]:\n",
|
||||
" # Create a request for a spot instance in AWS\n",
|
||||
" encoded_user_data = base64.b64encode(user_data.encode(\"ascii\")).decode(\"ascii\")\n",
|
||||
" instances = ec2.request_spot_instances(\n",
|
||||
" LaunchSpecification={\n",
|
||||
" \"ImageId\": resource_conf[\"ami_id\"],\n",
|
||||
" \"InstanceType\": resource_conf[\"instance_type\"],\n",
|
||||
" \"Placement\": {\"AvailabilityZone\": resource_conf[\"availability_zone\"]},\n",
|
||||
" \"UserData\": encoded_user_data,\n",
|
||||
" \"BlockDeviceMappings\": [\n",
|
||||
" {\n",
|
||||
" \"DeviceName\": resource_conf[\"ebs_device_name\"],\n",
|
||||
" \"Ebs\": {\n",
|
||||
" \"VolumeSize\": resource_conf[\"ebs_volume_size\"],\n",
|
||||
" \"VolumeType\": resource_conf[\"ebs_volume_type\"],\n",
|
||||
" },\n",
|
||||
" }\n",
|
||||
" ],\n",
|
||||
" }\n",
|
||||
" )\n",
|
||||
"\n",
|
||||
" # Wait until spot request is fulfilled\n",
|
||||
" request_id = instances[\"SpotInstanceRequests\"][0][\"SpotInstanceRequestId\"]\n",
|
||||
" waiter = ec2.get_waiter(\"spot_instance_request_fulfilled\")\n",
|
||||
" waiter.wait(SpotInstanceRequestIds=[request_id])\n",
|
||||
" # Get the instance object for later use\n",
|
||||
" response = ec2.describe_spot_instance_requests(\n",
|
||||
" SpotInstanceRequestIds=[request_id]\n",
|
||||
" )\n",
|
||||
" instance_id = response[\"SpotInstanceRequests\"][0][\"InstanceId\"]\n",
|
||||
"\n",
|
||||
" else:\n",
|
||||
" # Create a new EC2 instance\n",
|
||||
" instances = ec2.run_instances(\n",
|
||||
" ImageId=resource_conf[\"ami_id\"],\n",
|
||||
" MinCount=1,\n",
|
||||
" MaxCount=1,\n",
|
||||
" InstanceType=resource_conf[\"instance_type\"],\n",
|
||||
" UserData=user_data,\n",
|
||||
" BlockDeviceMappings=[\n",
|
||||
" {\n",
|
||||
" \"DeviceName\": resource_conf[\"ebs_device_name\"],\n",
|
||||
" \"Ebs\": {\n",
|
||||
" \"VolumeSize\": resource_conf[\"ebs_volume_size\"],\n",
|
||||
" \"VolumeType\": resource_conf[\"ebs_volume_type\"],\n",
|
||||
" },\n",
|
||||
" }\n",
|
||||
" ],\n",
|
||||
" )\n",
|
||||
"\n",
|
||||
" # Get the instance object for later use\n",
|
||||
" instance_id = instances[\"Instances\"][0][\"InstanceId\"]\n",
|
||||
"\n",
|
||||
" instance = boto3.resource(\n",
|
||||
" \"ec2\",\n",
|
||||
" aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,\n",
|
||||
" aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,\n",
|
||||
" region_name=CLOUD_CREDENTIALS_REGION\n",
|
||||
" ).Instance(instance_id)\n",
|
||||
"\n",
|
||||
" # Wait until instance is in running state\n",
|
||||
" instance.wait_until_running()\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"# Cloud-specific implementation (currently, only AWS EC2 is supported)\n",
|
||||
"def spin_down_worker(instance_id):\n",
|
||||
" \"\"\"\n",
|
||||
" Destroys the cloud instance.\n",
|
||||
"\n",
|
||||
" :param str instance_id: Cloud instance ID to be destroyed \n",
|
||||
" (currently, only AWS EC2 is supported)\n",
|
||||
" \"\"\"\n",
|
||||
" try:\n",
|
||||
" boto3.resource(\n",
|
||||
" \"ec2\",\n",
|
||||
" aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,\n",
|
||||
" aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,\n",
|
||||
" region_name=CLOUD_CREDENTIALS_REGION\n",
|
||||
" ).instances.filter(InstanceIds=[instance_id]).terminate()\n",
|
||||
" except Exception as ex:\n",
|
||||
" raise ex"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"###### Controller Implementation and Logic"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"def supervisor():\n",
|
||||
" \"\"\"\n",
|
||||
" Spin up or down resources as necessary.\n",
|
||||
" - For every queue in QUEUES do the following:\n",
|
||||
" 1. Check if there are tasks waiting in the queue.\n",
|
||||
" 2. Check if there are enough idle workers available for those tasks.\n",
|
||||
" 3. In case more instances are required, and we haven't reached max instances allowed,\n",
|
||||
" create the required instances with regards to the maximum number defined in QUEUES\n",
|
||||
" Choose which instance to create according to their order QUEUES. Won't create \n",
|
||||
" more instances if maximum number defined has already reached.\n",
|
||||
" - spin down instances according to their idle time. instance which is idle for \n",
|
||||
" more than MAX_IDLE_TIME_MIN minutes would be removed.\n",
|
||||
" \"\"\"\n",
|
||||
"\n",
|
||||
" # Internal definitions\n",
|
||||
" workers_prefix = \"dynamic_aws\"\n",
|
||||
" # Worker's id in trains would be composed from:\n",
|
||||
" # prefix, name, instance_type and cloud_id separated by ';'\n",
|
||||
" workers_pattern = re.compile(\n",
|
||||
" r\"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)\"\n",
|
||||
" )\n",
|
||||
"\n",
|
||||
" # Set up the environment variables for trains\n",
|
||||
" os.environ[\"TRAINS_API_HOST\"] = TRAINS_SERVER_API_SERVER\n",
|
||||
" os.environ[\"TRAINS_WEB_HOST\"] = TRAINS_SERVER_WEB_SERVER\n",
|
||||
" os.environ[\"TRAINS_FILES_HOST\"] = TRAINS_SERVER_FILES_SERVER\n",
|
||||
" os.environ[\"TRAINS_API_ACCESS_KEY\"] = TRAINS_ACCESS_KEY\n",
|
||||
" os.environ[\"TRAINS_API_SECRET_KEY\"] = TRAINS_SECRET_KEY\n",
|
||||
" api_client = APIClient()\n",
|
||||
"\n",
|
||||
" idle_workers = {}\n",
|
||||
" while True:\n",
|
||||
" queue_name_to_id = {\n",
|
||||
" queue.name: queue.id for queue in api_client.queues.get_all()\n",
|
||||
" }\n",
|
||||
" resource_to_queue = {\n",
|
||||
" item[0]: queue\n",
|
||||
" for queue, resources in QUEUES.items()\n",
|
||||
" for item in resources\n",
|
||||
" }\n",
|
||||
" all_workers = [\n",
|
||||
" worker\n",
|
||||
" for worker in api_client.workers.get_all()\n",
|
||||
" if workers_pattern.match(worker.id)\n",
|
||||
" and workers_pattern.match(worker.id)[\"prefix\"] == workers_prefix\n",
|
||||
" ]\n",
|
||||
"\n",
|
||||
" # Workers without a task, are added to the idle list\n",
|
||||
" for worker in all_workers:\n",
|
||||
" if not hasattr(worker, \"task\") or not worker.task:\n",
|
||||
" if worker.id not in idle_workers:\n",
|
||||
" resource_name = workers_pattern.match(worker.id)[\"instance_type\"]\n",
|
||||
" idle_workers[worker.id] = (time(), resource_name, worker)\n",
|
||||
" elif hasattr(worker, \"task\") and worker.task and worker.id in idle_workers:\n",
|
||||
" idle_workers.pop(worker.id, None)\n",
|
||||
"\n",
|
||||
" required_idle_resources = [] # idle resources we'll need to keep running\n",
|
||||
" allocate_new_resources = [] # resources that will need to be started\n",
|
||||
" # Check if we have tasks waiting on one of the designated queues\n",
|
||||
" for queue in QUEUES:\n",
|
||||
" entries = api_client.queues.get_by_id(queue_name_to_id[queue]).entries\n",
|
||||
" if entries and len(entries) > 0:\n",
|
||||
" queue_resources = QUEUES[queue]\n",
|
||||
"\n",
|
||||
" # If we have an idle worker matching the required resource,\n",
|
||||
" # remove it from the required allocation resources\n",
|
||||
" free_queue_resources = [\n",
|
||||
" resource\n",
|
||||
" for _, resource, _ in idle_workers.values()\n",
|
||||
" if resource in queue_resources\n",
|
||||
" ]\n",
|
||||
" required_idle_resources.extend(free_queue_resources)\n",
|
||||
" spin_up_count = len(entries) - len(free_queue_resources)\n",
|
||||
" spin_up_resources = []\n",
|
||||
"\n",
|
||||
" # Add as many resources as possible to handle this queue's entries\n",
|
||||
" for resource, max_instances in queue_resources:\n",
|
||||
" if len(spin_up_resources) >= spin_up_count:\n",
|
||||
" break\n",
|
||||
" max_allowed = max_instances - len(\n",
|
||||
" [\n",
|
||||
" worker\n",
|
||||
" for worker in all_workers\n",
|
||||
" if workers_pattern.match(worker.id)[\"name\"] == resource\n",
|
||||
" ]\n",
|
||||
" )\n",
|
||||
" spin_up_resources.extend(\n",
|
||||
" [resource] * min(max_allowed, spin_up_count)\n",
|
||||
" )\n",
|
||||
" allocate_new_resources.extend(spin_up_resources)\n",
|
||||
"\n",
|
||||
" # Now we actually spin the new machines\n",
|
||||
" for resource in allocate_new_resources:\n",
|
||||
" spin_up_worker(resource, workers_prefix, resource_to_queue[resource])\n",
|
||||
"\n",
|
||||
" # Go over the idle workers list, and spin down idle workers\n",
|
||||
" for timestamp, resources, worker in idle_workers.values():\n",
|
||||
" # skip resource types that might be needed\n",
|
||||
" if resources in required_idle_resources:\n",
|
||||
" continue\n",
|
||||
" # Remove from both aws and trains all instances that are \n",
|
||||
" # idle for longer than MAX_IDLE_TIME_MIN\n",
|
||||
" if time() - timestamp > MAX_IDLE_TIME_MIN * 60.0:\n",
|
||||
" cloud_id = workers_pattern.match(worker.id)[\"cloud_id\"]\n",
|
||||
" spin_down_worker(cloud_id)\n",
|
||||
" worker.unregister()\n",
|
||||
"\n",
|
||||
" # Nothing else to do\n",
|
||||
" sleep(POLLING_INTERVAL_MIN * 60.0)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"##### Execute Forever* (the controller is stateless, so you can always re-execute the notebook)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# Loop forever, it is okay we are stateless\n",
|
||||
"while True:\n",
|
||||
" try:\n",
|
||||
" supervisor()\n",
|
||||
" except Exception as ex:\n",
|
||||
" print(\"Warning! exception occurred: {ex}\\nRetry in 15 seconds\".format(ex=ex))\n",
|
||||
" sleep(15)"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.7.0"
|
||||
},
|
||||
"pycharm": {
|
||||
"stem_cell": {
|
||||
"cell_type": "raw",
|
||||
"source": [],
|
||||
"metadata": {
|
||||
"collapsed": false
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 2
|
||||
}
|
||||
@@ -1 +1 @@
|
||||
|
||||
from .backend_api.session.client import APIClient
|
||||
|
||||
@@ -33,7 +33,6 @@
|
||||
|
||||
# additional artifact repositories to use when installing python packages
|
||||
# extra_index_url: ["https://allegroai.jfrog.io/trainsai/api/pypi/public/simple"]
|
||||
extra_index_url: []
|
||||
|
||||
# additional conda channels to use when installing with conda package manager
|
||||
conda_channels: ["defaults", "conda-forge", "pytorch", ]
|
||||
|
||||
@@ -3,6 +3,7 @@ from .v2_4 import debug
|
||||
from .v2_4 import queues
|
||||
from .v2_4 import tasks
|
||||
from .v2_4 import workers
|
||||
from .v2_4 import events
|
||||
|
||||
__all__ = [
|
||||
'auth',
|
||||
@@ -10,4 +11,5 @@ __all__ = [
|
||||
'queues',
|
||||
'tasks',
|
||||
'workers',
|
||||
'events',
|
||||
]
|
||||
|
||||
2977
trains_agent/backend_api/services/v2_4/events.py
Normal file
2977
trains_agent/backend_api/services/v2_4/events.py
Normal file
File diff suppressed because it is too large
Load Diff
@@ -139,13 +139,25 @@ class Response(object):
|
||||
:param dest: if all of a response's data is contained in one field, use that field
|
||||
:type dest: Text
|
||||
"""
|
||||
self.response = None
|
||||
self._result = result
|
||||
response = getattr(result, "response", result)
|
||||
if getattr(response, "_service") == "events" and \
|
||||
getattr(response, "_action") in ("scalar_metrics_iter_histogram",
|
||||
"multi_task_scalar_metrics_iter_histogram",
|
||||
"vector_metrics_iter_histogram",
|
||||
):
|
||||
# put all the response data under metrics:
|
||||
response.metrics = result.response_data
|
||||
if 'metrics' not in response.__class__._get_data_props():
|
||||
response.__class__._data_props_list['metrics'] = 'metrics'
|
||||
if dest:
|
||||
response = getattr(response, dest)
|
||||
self.response = response
|
||||
|
||||
def __getattr__(self, attr):
|
||||
if self.response is None:
|
||||
return None
|
||||
return getattr(self.response, attr)
|
||||
|
||||
@property
|
||||
@@ -493,6 +505,7 @@ class APIClient(object):
|
||||
queues = None # type: Any
|
||||
tasks = None # type: Any
|
||||
workers = None # type: Any
|
||||
events = None # type: Any
|
||||
|
||||
def __init__(self, session=None, api_version=None):
|
||||
self.session = session or StrictSession()
|
||||
|
||||
@@ -150,6 +150,18 @@ def main():
|
||||
git_user = None
|
||||
git_pass = None
|
||||
|
||||
# get extra-index-url for pip installations
|
||||
extra_index_urls = []
|
||||
print('\nEnter additional artifact repository (extra-index-url) to use when installing python packages '
|
||||
'(leave blank if not required):', end='')
|
||||
index_url = input().strip()
|
||||
while index_url:
|
||||
extra_index_urls.append(index_url)
|
||||
print('Another artifact repository? (enter another url or leave blank if done):', end='')
|
||||
index_url = input().strip()
|
||||
if len(extra_index_urls):
|
||||
print("The following artifact repositories will be added:\n\t- {}".format("\n\t- ".join(extra_index_urls)))
|
||||
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
conf_folder = Path(__file__).parent.absolute() / '..' / 'backend_api' / 'config' / 'default'
|
||||
@@ -183,6 +195,10 @@ def main():
|
||||
'agent.git_pass=\"{}\"\n' \
|
||||
'\n'.format(git_user or '', git_pass or '')
|
||||
f.write(git_credentials)
|
||||
extra_index_str = '# extra_index_url: ["https://allegroai.jfrog.io/trainsai/api/pypi/public/simple"]\n' \
|
||||
'agent.package_manager.extra_index_url= ' \
|
||||
'[\n{}\n]\n\n'.format("\n".join(map("\"{}\"".format, extra_index_urls)))
|
||||
f.write(extra_index_str)
|
||||
f.write(default_conf)
|
||||
except Exception:
|
||||
print('Error! Could not write configuration file at: {}'.format(str(conf_file)))
|
||||
|
||||
@@ -239,7 +239,23 @@ class PytorchRequirement(SimpleSubstitution):
|
||||
def get_url_for_platform(self, req):
|
||||
assert self.package_manager == "pip"
|
||||
assert self.os != "mac"
|
||||
|
||||
# check if package is already installed with system packages
|
||||
try:
|
||||
if self.config.get("agent.package_manager.system_site_packages"):
|
||||
from pip._internal.commands.show import search_packages_info
|
||||
installed_torch = list(search_packages_info([req.name]))
|
||||
op, version = req.specs[0] if req.specs else (None, None)
|
||||
# notice the comparision order, the first part will make sure we have a valid installed package
|
||||
if installed_torch[0]['version'] and (installed_torch[0]['version'] == version or not version):
|
||||
# package already installed, do nothing
|
||||
return str(req), True
|
||||
except:
|
||||
pass
|
||||
|
||||
# make sure we have a specific version to retrieve
|
||||
assert req.specs
|
||||
|
||||
try:
|
||||
req.specs[0] = (req.specs[0][0], req.specs[0][1].split('+')[0])
|
||||
except:
|
||||
|
||||
@@ -206,6 +206,15 @@ class Session(_Session):
|
||||
config.pop('env', None)
|
||||
if remove_secret_keys:
|
||||
recursive_remove_secrets(config, secret_keys=remove_secret_keys)
|
||||
# remove logging.loggers.urllib3.level from the print
|
||||
try:
|
||||
config['logging']['loggers']['urllib3'].pop('level', None)
|
||||
except (KeyError, TypeError, AttributeError):
|
||||
pass
|
||||
try:
|
||||
config['logging'].pop('version', None)
|
||||
except (KeyError, TypeError, AttributeError):
|
||||
pass
|
||||
config = ConfigFactory.from_dict(config)
|
||||
self.log.debug("Run by interpreter: %s", sys.executable)
|
||||
print(
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = '0.12.2'
|
||||
__version__ = '0.13.0'
|
||||
|
||||
Reference in New Issue
Block a user