Compare commits

...

41 Commits

Author SHA1 Message Date
allegroai  facbee0005  Version bump to v0.13.0  2020-01-06 15:27:12 +02:00
allegroai  c486cfd09f  Add extra agent configuration and bash script for the AWS dynamic cluster management service  2020-01-06 15:26:55 +02:00
allegroai  119ecaa2e3  Fix AWS dynamic cluster management service use of AWS credentials  2020-01-05 14:12:40 +02:00
allegroai  d6cc2be653  Fix AWS dynamic cluster management service use of AWS credentials  2020-01-05 13:57:29 +02:00
allegroai  41d75df40c  Add AWS dynamic cluster management service  2019-12-24 23:22:17 +02:00
allegroai  901c4be9ae  Add AWS dynamic cluster management service  2019-12-24 23:09:26 +02:00
Allegro AI  966b14f914  Update README.md  2019-12-24 22:52:30 +02:00
allegroai  847d35cbbb  Add AWS dynamic cluster management service  2019-12-24 22:48:44 +02:00
allegroai  4022cb5c63  Add AWS dynamic cluster management service  2019-12-24 22:35:26 +02:00
allegroai  2b239829de  Add extra_index_url to the configuration wizard  2019-12-24 18:23:59 +02:00
allegroai  402856656f  Support various events endpoints for APIClient  2019-12-24 18:10:40 +02:00
allegroai  7b94ff410c  Documentation  2019-12-24 12:47:35 +02:00
allegroai  0a03dced50  Do not show urllib3 logging level as part of the agent's configuration dump  2019-12-21 18:23:17 +02:00
allegroai  ffe653afc6  Support docker pre-installed pytorch versions that do not exist on PyPI/PyTorch.org  2019-12-21 18:22:21 +02:00
allegroai  8ce621cc44  version bump  2019-12-15 15:42:46 +02:00
allegroai  7c0a2c4d50  version bump  2019-12-15 00:04:27 +02:00
allegroai  5e063c9195  Add docker build command and improve k8s integration  2019-12-15 00:04:15 +02:00
allegroai  24329a21fe  Fix docker CUDA support  2019-12-15 00:03:39 +02:00
allegroai  3a301b0b6c  Improve docker support and add docker build command  2019-12-15 00:03:04 +02:00
allegroai  1f0bb4906b  Improve configuration wizard  2019-12-15 00:02:04 +02:00
allegroai  88f1031e5d  Sync with trains default configuration  2019-12-15 00:01:47 +02:00
allegroai  fc2842c9a2  Add initial Poetry support  2019-12-15 00:00:55 +02:00
allegroai  e9d3aab115  version bump  2019-11-23 23:39:02 +02:00
allegroai  0ed7b2a0c8  Fix support for shared cache folder between multiple nodes in the cluster  2019-11-23 23:38:36 +02:00
allegroai  bd73be928a  Improve trains-agent config wizard  2019-11-23 23:37:41 +02:00
Allegro AI  79babdd149  Update README.md  2019-11-15 23:38:45 +02:00
Allegro AI  02a21ba826  Update README.md  2019-11-15 23:37:01 +02:00
allegroai  e7b6eb5c5a  version bump  2019-11-15 23:25:16 +02:00
allegroai  619ee3e8cf  Documentation  2019-11-15 23:25:08 +02:00
allegroai  8b05bb1605  Add initial Conda support  2019-11-15 23:24:47 +02:00
allegroai  72c26499c0  Add Windows support and Conda package manager support  2019-11-15 23:24:04 +02:00
allegroai  741be2ae42  Fix git clone optimization  2019-11-15 23:22:05 +02:00
allegroai  831b36c424  Add specify python binary  2019-11-15 23:21:16 +02:00
allegroai  d715ec4b14  version bump  2019-11-08 22:36:34 +02:00
allegroai  a7873705ec  Add --gpus / --cpu-only (equivalent to NVIDIA_VISIBLE_DEVICE)
                       Add agent.python_binary specifying full path to python binary to use for virtual environement creation
                       Fix Windows support
                       2019-11-08 22:36:24 +02:00
allegroai  4a8f52b5a5  Match defaults with trains  2019-11-08 22:33:44 +02:00
allegroai  a5c7ff4ee1  Merge remote-tracking branch 'origin/master'  2019-11-01 20:25:15 +02:00
allegroai  c352c2711c  Improve multiple GPU's docker PyTorch support
                       Fix potential zombie dockers if task is aborted
                       2019-11-01 20:24:53 +02:00
allegroai  2aea36c864  Changed connection timeouts, default to 30sec  2019-11-01 20:23:32 +02:00
Allegro AI  40b15dfe64  Update README.md  2019-10-31 18:24:49 +02:00
Allegro AI  25e49df121  Update README.md  2019-10-30 13:15:32 +02:00
24 changed files with 4279 additions and 203 deletions

View File

@@ -1,5 +1,5 @@
# TRAINS Agent
## Deep Learning DevOps For Everyone
## Deep Learning DevOps For Everyone - Now supporting all platforms (Linux, macOS, and Windows)
"All the Deep-Learning DevOps your research needs, and then some... Because ain't nobody got time for that"
@@ -8,13 +8,13 @@
[![PyPI version shields.io](https://img.shields.io/pypi/v/trains-agent.svg)](https://img.shields.io/pypi/v/trains-agent.svg)
[![PyPI status](https://img.shields.io/pypi/status/trains-agent.svg)](https://pypi.python.org/pypi/trains-agent/)
TRAINS Agent is an AI experiment cluster solution.
**TRAINS Agent is an AI experiment cluster solution.**
It is a zero configuration fire-and-forget execution agent, which combined with trains-server provides a full AI cluster solution.
**Full AutoML in 5 steps**
1. Install the [TRAINS server](https://github.com/allegroai/trains-server) (or use our [open server](https://demoapp.trains.allegro.ai))
2. `pip install trains_agent` ([install](#installing-the-trains-agent) the TRAINS agent on any GPU machine: on-premises / cloud / ...)
2. `pip install trains-agent` ([install](#installing-the-trains-agent) the TRAINS agent on any GPU machine: on-premises / cloud / ...)
3. Add [TRAINS](https://github.com/allegroai/trains) to your code with just 2 lines & run it once (on your machine / laptop)
4. Change the [parameters](#using-the-trains-agent) in the UI & schedule for [execution](#using-the-trains-agent) (or automate with an [AutoML pipeline](#automl-and-orchestration-pipelines-))
5. :chart_with_downwards_trend: :chart_with_upwards_trend: :eyes: :beer:
@@ -133,7 +133,7 @@ Development Machine |
### Installing the TRAINS Agent
```bash
pip install trains_agent
pip install trains-agent
```
### TRAINS Agent Usage Examples
@@ -169,21 +169,21 @@ For actual service mode, all the stdout will be stored automatically into a temp
trains-agent daemon --queue default
```
GPU allocation is controlled via the standard OS environment NVIDIA_VISIBLE_DEVICES.
GPU allocation is controlled via the standard OS environment `NVIDIA_VISIBLE_DEVICES` or `--gpus` flag (or disabled with `--cpu-only`).
If the NVIDIA_VISIBLE_DEVICES variable doesn't exist, all GPUs will be allocated for the `trains-agent` <br>
If NVIDIA_VISIBLE_DEVICES is an empty string (""), no GPU will be allocated for the `trains-agent`
If no flag is set, and the `NVIDIA_VISIBLE_DEVICES` variable doesn't exist, all GPUs will be allocated for the `trains-agent` <br>
If the `--cpu-only` flag is set, or `NVIDIA_VISIBLE_DEVICES` is an empty string (""), no GPU will be allocated for the `trains-agent`
Example: spin two agents, one per GPU on the same machine:
```bash
NVIDIA_VISIBLE_DEVICES=0 trains-agent daemon --queue default &
NVIDIA_VISIBLE_DEVICES=1 trains-agent daemon --queue default &
trains-agent daemon --gpus 0 --queue default &
trains-agent daemon --gpus 1 --queue default &
```
Example: spin two agents, with two GPUs per agent:
Example: spin two agents, pulling from the dedicated `dual_gpu` queue, two GPUs per agent:
```bash
NVIDIA_VISIBLE_DEVICES=0,1 trains-agent daemon --queue default &
NVIDIA_VISIBLE_DEVICES=2,3 trains-agent daemon --queue default &
trains-agent daemon --gpus 0,1 --queue dual_gpu &
trains-agent daemon --gpus 2,3 --queue dual_gpu &
```
#### Starting the TRAINS Agent in docker mode
@@ -198,16 +198,16 @@ For actual service mode, all the stdout will be stored automatically into a file
trains-agent daemon --queue default --docker
```
Example: spin two agents, one per GPU on the same machine:
Example: spin two agents, one per GPU on the same machine, with the default nvidia/cuda docker:
```bash
NVIDIA_VISIBLE_DEVICES=0 trains-agent daemon --queue default --docker &
NVIDIA_VISIBLE_DEVICES=1 trains-agent daemon --queue default --docker &
trains-agent daemon --gpus 0 --queue default --docker nvidia/cuda &
trains-agent daemon --gpus 1 --queue default --docker nvidia/cuda &
```
Example: spin two agents, with two GPUs per agent:
Example: spin two agents, pulling from the dedicated `dual_gpu` queue, two GPUs per agent, with the default nvidia/cuda docker:
```bash
NVIDIA_VISIBLE_DEVICES=0,1 trains-agent daemon --queue default --docker &
NVIDIA_VISIBLE_DEVICES=2,3 trains-agent daemon --queue default --docker &
trains-agent daemon --gpus 0,1 --queue dual_gpu --docker nvidia/cuda &
trains-agent daemon --gpus 2,3 --queue dual_gpu --docker nvidia/cuda &
```
#### Starting the TRAINS Agent - Priority Queues
@@ -220,7 +220,9 @@ trains-agent daemon --queue important_jobs default
```
The **TRAINS agent** will first try to pull jobs from the `important_jobs` queue, and only then will it fetch a job from the `default` queue.
# How do I create an experiment on the TRAINS server? <a name="from-scratch"></a>
Adding queues, managing job order within a queue, and moving jobs between queues are all available using the Web UI; see the example on our [open server](https://demoapp.trains.allegro.ai/workers-and-queues/queues)
# How do I create an experiment on the TRAINS server? <a name="from-scratch"></a>
* Integrate [TRAINS](https://github.com/allegroai/trains) with your code
* Execute the code on your machine (Manually / PyCharm / Jupyter Notebook)
* As your code is running, **TRAINS** creates an experiment logging all the necessary execution information:

View File

@@ -29,6 +29,11 @@ agent {
# worker_name: "trains-agent-machine1"
worker_name: ""
# Set the python version to use when creating the virtual environment and launching the experiment
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
# The default is the python executing the trains_agent
python_binary: ""
# select python package manager:
# currently supported pip and conda
# poetry is used if pip selected and repository contains poetry.lock file
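A sketch of the selection rule described in the comments above (the helper name and logic are illustrative, not the agent's actual implementation):

```python
from pathlib import Path

def choose_package_manager(configured, repo_root):
    # poetry kicks in only when pip is configured and the repository
    # ships a poetry.lock file; conda is never overridden
    if configured == "pip" and (Path(repo_root) / "poetry.lock").exists():
        return "poetry"
    return configured  # "pip" or "conda"
```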
@@ -110,6 +115,11 @@ sdk {
# X files are stored in the upload destination for each metric/variant combination.
file_history_size: 100
# Max history size for matplotlib imshow files per plot title.
# File names for the uploaded images will be recycled in such a way that no more than
# X images are stored in the upload destination for each matplotlib plot title.
matplotlib_untitled_history_size: 100
# Settings for generated debug images
images {
format: JPEG
@@ -219,6 +229,9 @@ sdk {
# Support stopping an experiment in case it was externally stopped, status was changed or task was reset
support_stopping: True
# Default Task output_uri. if output_uri is not provided to Task.init, default_output_uri will be used instead.
default_output_uri: ""
# Development mode worker
worker {
# Status report period in seconds
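The `default_output_uri` above acts as a fallback for the `output_uri` argument of `Task.init`; a minimal sketch (the bucket URL is illustrative):

```python
from trains import Task

# If output_uri were omitted here, sdk.development.default_output_uri
# from the configuration file would be used instead
task = Task.init(project_name="examples", task_name="store artifacts",
                 output_uri="s3://my-bucket/experiments")
```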

View File

@@ -0,0 +1,568 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Auto-Magically Spin AWS EC2 Instances on Demand \n",
"# And Create a Dynamic Cluster Running *Trains-Agent*\n",
"\n",
"### Define your budget and execute the notebook, That's is it\n",
"### You now have a fully managed cluster on AWS 🎉 🎊 "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**trains-agent**'s main goal is to quickly pull a job from an execution queue, setup the environment (as defined in the experiment, including git cloning, python packages etc.) then execute the experiment and monitor it.\n",
"\n",
"This notebook defines a cloud budget (currently only AWS is supported, but feel free to expand with PRs), and spins an instance the minute a job is waiting for execution. It will also spin down idle machines, saving you some $$$ :)\n",
"\n",
"Configuration steps\n",
"- Define maximum budget to be used (instance type / number of instances).\n",
"- Create new execution *queues* in the **trains-server**.\n",
"- Define mapping between the created the *queues* and an instance budget.\n",
"\n",
"**TL;DR - This notebook:**\n",
"- Will spin instances if there are jobs in the execution *queues*, until it will hit the budget limit. \n",
"- If machines are idle, it will spin them down.\n",
"\n",
"The controller implementation itself is stateless, meaning you can always re-execute the notebook, if for some reason it stopped.\n",
"\n",
"It is as simple as it sounds, but extremely powerful\n",
"\n",
"Enjoy your newly created dynamic cluster :)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Install & import required packages"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install trains-agent\n",
"!pip install boto3"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Define AWS instance types and configuration (Instance Type, EBS, AMI etc.)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# AWS EC2 machines types - default AMI - NVIDIA Deep Learning AMI 19.11.3\n",
"RESOURCE_CONFIGURATIONS = {\n",
" \"amazon_ec2_normal\": {\n",
" \"instance_type\": \"g4dn.4xlarge\",\n",
" \"is_spot\": False,\n",
" \"availability_zone\": \"us-east-1b\",\n",
" \"ami_id\": \"ami-07c95cafbb788face\",\n",
" \"ebs_device_name\": \"/dev/xvda\",\n",
" \"ebs_volume_size\": 100,\n",
" \"ebs_volume_type\": \"gp2\",\n",
" },\n",
" \"amazon_ec2_high\": {\n",
" \"instance_type\": \"g4dn.8xlarge\",\n",
" \"is_spot\": False,\n",
" \"availability_zone\": \"us-east-1b\",\n",
" \"ami_id\": \"ami-07c95cafbb788face\",\n",
" \"ebs_device_name\": \"/dev/xvda\",\n",
" \"ebs_volume_size\": 100,\n",
" \"ebs_volume_type\": \"gp2\",\n",
" },\n",
"}\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Define machine budget per execution queue\n",
"\n",
"Now that we defined our budget, we need to connect it with the **Trains** cluster.\n",
"\n",
"We map each queue to a resource type (instance type).\n",
"\n",
"Create two queues in the WebUI:\n",
"- Browse to http://your_trains_server_ip:8080/workers-and-queues/queues\n",
"- Then click on the \"New Queue\" button and name your queues \"aws_normal\" and \"aws_high\" respectively\n",
"\n",
"The QUEUES dictionary hold the mapping between the queue name and the type/number of instances to spin connected to the specific queue.\n",
"```\n",
"QUEUES = {\n",
" 'queue_name': [(\"instance-type-as-defined-in-RESOURCE_CONFIGURATIONS\", max_number_of_instances), ]\n",
"}\n",
"```\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# Trains-Agent Queues - Machines budget per Queue\n",
"# Per queue: list of (machine type as defined in RESOURCE_CONFIGURATIONS,\n",
"# max instances for the specific queue). Order machines from most preferred to least.\n",
"QUEUES = {\n",
" \"aws_normal\": [(\"amazon_ec2_normal\", 2),],\n",
" \"aws_high\": [(\"amazon_ec2_high\", 1)],\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Credentials for your AWS account, as well as for your **Trains-Server**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# AWS credentials (leave empty to use credentials set using the aws cli)\n",
"CLOUD_CREDENTIALS_KEY = \"\"\n",
"CLOUD_CREDENTIALS_SECRET = \"\"\n",
"CLOUD_CREDENTIALS_REGION = \"us-east-1\"\n",
"\n",
"# TRAINS configuration\n",
"TRAINS_SERVER_WEB_SERVER = \"http://localhost:8080\"\n",
"TRAINS_SERVER_API_SERVER = \"http://localhost:8008\"\n",
"TRAINS_SERVER_FILES_SERVER = \"http://localhost:8081\"\n",
"# TRAINS credentials\n",
"TRAINS_ACCESS_KEY = \"\"\n",
"TRAINS_SECRET_KEY = \"\"\n",
"# Git User/Pass to be used by trains-agent,\n",
"# leave empty if image already contains git ssh-key\n",
"TRAINS_GIT_USER = \"\"\n",
"TRAINS_GIT_PASS = \"\"\n",
"\n",
"# Additional fields for trains.conf file created on the remote instance\n",
"# for example: 'agent.default_docker.image: \"nvidia/cuda:10.0-cudnn7-runtime\"'\n",
"EXTRA_TRAINS_CONF = \"\"\"\n",
"\"\"\"\n",
"\n",
"# Bash script to run on instances before running trains-agent\n",
"# Example: \"\"\"\n",
"# echo \"This is the first line\"\n",
"# echo \"This is the second line\"\n",
"# \"\"\"\n",
"EXTRA_BASH_SCRIPT = \"\"\"\n",
"\"\"\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Controller Internal Definitions\n",
"\n",
"# maximum idle time in minutes, after which the instance will be shutdown\n",
"MAX_IDLE_TIME_MIN = 15\n",
"# polling interval in minutes\n",
"POLLING_INTERVAL_MIN = 2.0"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Import Packages and Budget Definition Sanity Check"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import base64\n",
"import re\n",
"import os\n",
"from itertools import chain\n",
"from operator import itemgetter\n",
"from time import sleep, time\n",
"\n",
"import boto3\n",
"from trains_agent.backend_api.session.client import APIClient"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sanity Check - Validate Queue Resources\n",
"if len(set(map(itemgetter(0), chain(*QUEUES.values())))) != sum(\n",
" map(len, QUEUES.values())\n",
"):\n",
" print(\n",
" \"Error: at least one resource name is used in multiple queues. \"\n",
" \"A resource name can only appear in a single queue definition.\"\n",
" )\n",
"\n",
"# Encode EXTRA_TRAINS_CONF for later bash script usage\n",
"EXTRA_TRAINS_CONF_ENCODED = \"\\\\\\\"\".join(EXTRA_TRAINS_CONF.split(\"\\\"\"))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Cloud specific implementation of spin up/down - currently supports AWS only"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cloud-specific implementation (currently, only AWS EC2 is supported)\n",
"def spin_up_worker(resource, worker_id_prefix, queue_name):\n",
" \"\"\"\n",
" Creates a new worker for trains.\n",
" First, create an instance in the cloud and install some required packages.\n",
" Then, define trains-agent environment variables and run \n",
" trains-agent for the specified queue.\n",
" NOTE: - Will wait until instance is running\n",
" - This implementation assumes the instance image already has docker installed\n",
"\n",
" :param str resource: resource name, as defined in BUDGET and QUEUES.\n",
" :param str worker_id_prefix: worker name prefix\n",
" :param str queue_name: trains queue to listen to\n",
" \"\"\"\n",
" resource_conf = RESOURCE_CONFIGURATIONS[resource]\n",
" # Add worker type and AWS instance type to the worker name.\n",
" worker_id = \"{worker_id_prefix}:{worker_type}:{instance_type}\".format(\n",
" worker_id_prefix=worker_id_prefix,\n",
" worker_type=resource,\n",
" instance_type=resource_conf[\"instance_type\"],\n",
" )\n",
"\n",
" # user_data script will automatically run when the instance is started. \n",
" # It will install the required packages for trains-agent configure it using \n",
" # environment variables and run trains-agent on the required queue\n",
" user_data = \"\"\"#!/bin/bash \n",
" sudo apt-get update\n",
" sudo apt-get install -y python3-dev\n",
" sudo apt-get install -y python3-pip\n",
" sudo apt-get install -y gcc\n",
" sudo apt-get install -y git\n",
" sudo python3 -m pip install screen\n",
" sudo python3 -m pip install trains-agent\n",
" echo 'agent.git_user=\\\"{git_user}\\\"' >> /root/trains.conf\n",
" echo 'agent.git_pass=\\\"{git_pass}\\\"' >> /root/trains.conf\n",
" echo {trains_conf} >> /root/trains.conf\n",
" export TRAINS_API_HOST={api_server}\n",
" export TRAINS_WEB_HOST={web_server}\n",
" export TRAINS_FILES_HOST={files_server}\n",
" export DYNAMIC_INSTANCE_ID=`curl http://169.254.169.254/latest/meta-data/instance-id`\n",
" export TRAINS_WORKER_ID={worker_id}:$DYNAMIC_INSTANCE_ID\n",
" export TRAINS_API_ACCESS_KEY='{access_key}'\n",
" export TRAINS_API_SECRET_KEY='{secret_key}'\n",
" screen\n",
" {bash_script}\n",
" python3 -m trains_agent --config-file '/root/trains.conf' daemon --queue '{queue}' --docker\"\"\".format(\n",
" api_server=TRAINS_SERVER_API_SERVER,\n",
" web_server=TRAINS_SERVER_WEB_SERVER,\n",
" files_server=TRAINS_SERVER_FILES_SERVER,\n",
" worker_id=worker_id,\n",
" access_key=TRAINS_ACCESS_KEY,\n",
" secret_key=TRAINS_SECRET_KEY,\n",
" queue=queue_name,\n",
" git_user=TRAINS_GIT_USER,\n",
" git_pass=TRAINS_GIT_PASS,\n",
" trains_conf=EXTRA_TRAINS_CONF_ENCODED,\n",
" bash_script=EXTRA_BASH_SCRIPT\n",
" )\n",
"\n",
" ec2 = boto3.client(\n",
" \"ec2\",\n",
" aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,\n",
" aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,\n",
" region_name=CLOUD_CREDENTIALS_REGION\n",
" )\n",
"\n",
" if resource_conf[\"is_spot\"]:\n",
" # Create a request for a spot instance in AWS\n",
" encoded_user_data = base64.b64encode(user_data.encode(\"ascii\")).decode(\"ascii\")\n",
" instances = ec2.request_spot_instances(\n",
" LaunchSpecification={\n",
" \"ImageId\": resource_conf[\"ami_id\"],\n",
" \"InstanceType\": resource_conf[\"instance_type\"],\n",
" \"Placement\": {\"AvailabilityZone\": resource_conf[\"availability_zone\"]},\n",
" \"UserData\": encoded_user_data,\n",
" \"BlockDeviceMappings\": [\n",
" {\n",
" \"DeviceName\": resource_conf[\"ebs_device_name\"],\n",
" \"Ebs\": {\n",
" \"VolumeSize\": resource_conf[\"ebs_volume_size\"],\n",
" \"VolumeType\": resource_conf[\"ebs_volume_type\"],\n",
" },\n",
" }\n",
" ],\n",
" }\n",
" )\n",
"\n",
" # Wait until spot request is fulfilled\n",
" request_id = instances[\"SpotInstanceRequests\"][0][\"SpotInstanceRequestId\"]\n",
" waiter = ec2.get_waiter(\"spot_instance_request_fulfilled\")\n",
" waiter.wait(SpotInstanceRequestIds=[request_id])\n",
" # Get the instance object for later use\n",
" response = ec2.describe_spot_instance_requests(\n",
" SpotInstanceRequestIds=[request_id]\n",
" )\n",
" instance_id = response[\"SpotInstanceRequests\"][0][\"InstanceId\"]\n",
"\n",
" else:\n",
" # Create a new EC2 instance\n",
" instances = ec2.run_instances(\n",
" ImageId=resource_conf[\"ami_id\"],\n",
" MinCount=1,\n",
" MaxCount=1,\n",
" InstanceType=resource_conf[\"instance_type\"],\n",
" UserData=user_data,\n",
" BlockDeviceMappings=[\n",
" {\n",
" \"DeviceName\": resource_conf[\"ebs_device_name\"],\n",
" \"Ebs\": {\n",
" \"VolumeSize\": resource_conf[\"ebs_volume_size\"],\n",
" \"VolumeType\": resource_conf[\"ebs_volume_type\"],\n",
" },\n",
" }\n",
" ],\n",
" )\n",
"\n",
" # Get the instance object for later use\n",
" instance_id = instances[\"Instances\"][0][\"InstanceId\"]\n",
"\n",
" instance = boto3.resource(\n",
" \"ec2\",\n",
" aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,\n",
" aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,\n",
" region_name=CLOUD_CREDENTIALS_REGION\n",
" ).Instance(instance_id)\n",
"\n",
" # Wait until instance is in running state\n",
" instance.wait_until_running()\n",
"\n",
"\n",
"# Cloud-specific implementation (currently, only AWS EC2 is supported)\n",
"def spin_down_worker(instance_id):\n",
" \"\"\"\n",
" Destroys the cloud instance.\n",
"\n",
" :param str instance_id: Cloud instance ID to be destroyed \n",
" (currently, only AWS EC2 is supported)\n",
" \"\"\"\n",
" try:\n",
" boto3.resource(\n",
" \"ec2\",\n",
" aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,\n",
" aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,\n",
" region_name=CLOUD_CREDENTIALS_REGION\n",
" ).instances.filter(InstanceIds=[instance_id]).terminate()\n",
" except Exception as ex:\n",
" raise ex"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"###### Controller Implementation and Logic"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def supervisor():\n",
" \"\"\"\n",
" Spin up or down resources as necessary.\n",
" - For every queue in QUEUES do the following:\n",
" 1. Check if there are tasks waiting in the queue.\n",
" 2. Check if there are enough idle workers available for those tasks.\n",
" 3. In case more instances are required, and we haven't reached max instances allowed,\n",
" create the required instances with regards to the maximum number defined in QUEUES\n",
" Choose which instance to create according to their order QUEUES. Won't create \n",
" more instances if maximum number defined has already reached.\n",
" - spin down instances according to their idle time. instance which is idle for \n",
" more than MAX_IDLE_TIME_MIN minutes would be removed.\n",
" \"\"\"\n",
"\n",
" # Internal definitions\n",
" workers_prefix = \"dynamic_aws\"\n",
" # Worker's id in trains would be composed from:\n",
" # prefix, name, instance_type and cloud_id separated by ';'\n",
" workers_pattern = re.compile(\n",
" r\"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)\"\n",
" )\n",
"\n",
" # Set up the environment variables for trains\n",
" os.environ[\"TRAINS_API_HOST\"] = TRAINS_SERVER_API_SERVER\n",
" os.environ[\"TRAINS_WEB_HOST\"] = TRAINS_SERVER_WEB_SERVER\n",
" os.environ[\"TRAINS_FILES_HOST\"] = TRAINS_SERVER_FILES_SERVER\n",
" os.environ[\"TRAINS_API_ACCESS_KEY\"] = TRAINS_ACCESS_KEY\n",
" os.environ[\"TRAINS_API_SECRET_KEY\"] = TRAINS_SECRET_KEY\n",
" api_client = APIClient()\n",
"\n",
" idle_workers = {}\n",
" while True:\n",
" queue_name_to_id = {\n",
" queue.name: queue.id for queue in api_client.queues.get_all()\n",
" }\n",
" resource_to_queue = {\n",
" item[0]: queue\n",
" for queue, resources in QUEUES.items()\n",
" for item in resources\n",
" }\n",
" all_workers = [\n",
" worker\n",
" for worker in api_client.workers.get_all()\n",
" if workers_pattern.match(worker.id)\n",
" and workers_pattern.match(worker.id)[\"prefix\"] == workers_prefix\n",
" ]\n",
"\n",
" # Workers without a task, are added to the idle list\n",
" for worker in all_workers:\n",
" if not hasattr(worker, \"task\") or not worker.task:\n",
" if worker.id not in idle_workers:\n",
" resource_name = workers_pattern.match(worker.id)[\"instance_type\"]\n",
" idle_workers[worker.id] = (time(), resource_name, worker)\n",
" elif hasattr(worker, \"task\") and worker.task and worker.id in idle_workers:\n",
" idle_workers.pop(worker.id, None)\n",
"\n",
" required_idle_resources = [] # idle resources we'll need to keep running\n",
" allocate_new_resources = [] # resources that will need to be started\n",
" # Check if we have tasks waiting on one of the designated queues\n",
" for queue in QUEUES:\n",
" entries = api_client.queues.get_by_id(queue_name_to_id[queue]).entries\n",
" if entries and len(entries) > 0:\n",
" queue_resources = QUEUES[queue]\n",
"\n",
" # If we have an idle worker matching the required resource,\n",
" # remove it from the required allocation resources\n",
" free_queue_resources = [\n",
" resource\n",
" for _, resource, _ in idle_workers.values()\n",
" if resource in queue_resources\n",
" ]\n",
" required_idle_resources.extend(free_queue_resources)\n",
" spin_up_count = len(entries) - len(free_queue_resources)\n",
" spin_up_resources = []\n",
"\n",
" # Add as many resources as possible to handle this queue's entries\n",
" for resource, max_instances in queue_resources:\n",
" if len(spin_up_resources) >= spin_up_count:\n",
" break\n",
" max_allowed = max_instances - len(\n",
" [\n",
" worker\n",
" for worker in all_workers\n",
" if workers_pattern.match(worker.id)[\"name\"] == resource\n",
" ]\n",
" )\n",
" spin_up_resources.extend(\n",
" [resource] * min(max_allowed, spin_up_count)\n",
" )\n",
" allocate_new_resources.extend(spin_up_resources)\n",
"\n",
" # Now we actually spin the new machines\n",
" for resource in allocate_new_resources:\n",
" spin_up_worker(resource, workers_prefix, resource_to_queue[resource])\n",
"\n",
" # Go over the idle workers list, and spin down idle workers\n",
" for timestamp, resources, worker in idle_workers.values():\n",
" # skip resource types that might be needed\n",
" if resources in required_idle_resources:\n",
" continue\n",
" # Remove from both aws and trains all instances that are \n",
" # idle for longer than MAX_IDLE_TIME_MIN\n",
" if time() - timestamp > MAX_IDLE_TIME_MIN * 60.0:\n",
" cloud_id = workers_pattern.match(worker.id)[\"cloud_id\"]\n",
" spin_down_worker(cloud_id)\n",
" worker.unregister()\n",
"\n",
" # Nothing else to do\n",
" sleep(POLLING_INTERVAL_MIN * 60.0)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Execute Forever* (the controller is stateless, so you can always re-execute the notebook)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Loop forever, it is okay we are stateless\n",
"while True:\n",
" try:\n",
" supervisor()\n",
" except Exception as ex:\n",
" print(\"Warning! exception occurred: {ex}\\nRetry in 15 seconds\".format(ex=ex))\n",
" sleep(15)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.0"
},
"pycharm": {
"stem_cell": {
"cell_type": "raw",
"source": [],
"metadata": {
"collapsed": false
}
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}
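For reference, the worker-id convention used by the controller above can be checked in isolation (the instance id below is made up):

```python
import re

# "<prefix>:<resource-name>:<instance-type>:<cloud-id>", as built by spin_up_worker
workers_pattern = re.compile(
    r"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)"
)

m = workers_pattern.match("dynamic_aws:amazon_ec2_normal:g4dn.4xlarge:i-0123456789abcdef0")
print(m["prefix"], m["name"], m["instance_type"], m["cloud_id"])
# dynamic_aws amazon_ec2_normal g4dn.4xlarge i-0123456789abcdef0
```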

View File

@@ -1 +1 @@
from .backend_api.session.client import APIClient

View File

@@ -13,6 +13,11 @@
# git_user: ""
# git_pass: ""
# Set the python version to use when creating the virtual environment and launching the experiment
# Example values: "/usr/bin/python3" or "/usr/local/bin/python3.6"
# The default is the python executing the trains_agent
python_binary: ""
# select python package manager:
# currently supported pip and conda
# poetry is used if pip selected and repository contains poetry.lock file
@@ -28,7 +33,6 @@
# additional artifact repositories to use when installing python packages
# extra_index_url: ["https://allegroai.jfrog.io/trainsai/api/pypi/public/simple"]
extra_index_url: []
# additional conda channels to use when installing with conda package manager
conda_channels: ["defaults", "conda-forge", "pytorch", ]

View File

@@ -26,6 +26,11 @@
# X files are stored in the upload destination for each metric/variant combination.
file_history_size: 100
# Max history size for matplotlib imshow files per plot title.
# File names for the uploaded images will be recycled in such a way that no more than
# X images are stored in the upload destination for each matplotlib plot title.
matplotlib_untitled_history_size: 100
# Settings for generated debug images
images {
format: JPEG

View File

@@ -3,6 +3,7 @@ from .v2_4 import debug
from .v2_4 import queues
from .v2_4 import tasks
from .v2_4 import workers
from .v2_4 import events
__all__ = [
'auth',
@@ -10,4 +11,5 @@ __all__ = [
'queues',
'tasks',
'workers',
'events',
]

File diff suppressed because it is too large

View File

@@ -139,13 +139,25 @@ class Response(object):
:param dest: if all of a response's data is contained in one field, use that field
:type dest: Text
"""
self.response = None
self._result = result
response = getattr(result, "response", result)
if getattr(response, "_service") == "events" and \
getattr(response, "_action") in ("scalar_metrics_iter_histogram",
"multi_task_scalar_metrics_iter_histogram",
"vector_metrics_iter_histogram",
):
# put all the response data under metrics:
response.metrics = result.response_data
if 'metrics' not in response.__class__._get_data_props():
response.__class__._data_props_list['metrics'] = 'metrics'
if dest:
response = getattr(response, dest)
self.response = response
def __getattr__(self, attr):
if self.response is None:
return None
return getattr(self.response, attr)
@property
@@ -212,8 +224,8 @@ class TableResponse(Response):
fields = fields or self.fields
from trains_agent.helper.base import create_table
return create_table(
(tuple(getter(item, attr) for attr in fields) for item in self),
titles=fields, headers=True,
(dict((attr, getter(item, attr)) for attr in fields) for item in self),
titles=fields, columns=fields, headers=True,
)
def display(self, fields=None):
@@ -493,6 +505,7 @@ class APIClient(object):
queues = None # type: Any
tasks = None # type: Any
workers = None # type: Any
events = None # type: Any
def __init__(self, session=None, api_version=None):
self.session = session or StrictSession()
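With the `events` service exposed, the histogram endpoints special-cased above become callable directly; a sketch, assuming the request takes the task id as its `task` argument (the id is a placeholder):

```python
from trains_agent.backend_api.session.client import APIClient

client = APIClient()  # credentials taken from the environment / config file
# Assumption: the request accepts a task id; per the patch above, the
# response data is exposed under the `metrics` attribute
res = client.events.scalar_metrics_iter_histogram(task="<task-id>")
print(res.metrics)
```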

View File

@@ -38,9 +38,9 @@ class Session(TokenManager):
_async_status_code = 202
_session_requests = 0
_session_initial_timeout = (3.0, 10.)
_session_timeout = (10.0, 300.)
_session_timeout = (10.0, 30.)
_write_session_data_size = 15000
_write_session_timeout = (300.0, 300.)
_write_session_timeout = (30.0, 30.)
api_version = '2.1'
default_host = "https://demoapi.trains.allegro.ai"

View File

@@ -358,7 +358,7 @@ class ServiceCommandSection(BaseCommandSection):
**locals())
self.exit(message)
message = 'Could not find {} with name "{}"'.format(service.rstrip('s'), name)
message = 'Could not find {} with name/id "{}"'.format(service.rstrip('s'), name)
if not response:
raise NameResolutionError(message)

View File

@@ -1,19 +1,21 @@
from __future__ import print_function
from six.moves import input
from pyhocon import ConfigFactory
from pyhocon import ConfigFactory, ConfigMissingException
from pathlib2 import Path
from six.moves.urllib.parse import urlparse
from trains_agent.backend_api.session import Session
from trains_agent.backend_api.session.defs import ENV_HOST
from trains_agent.backend_config.defs import LOCAL_CONFIG_FILES
description = """
Please create new credentials using the web app: {}/profile
In the Admin page, press "Create new credentials", then press "Copy to clipboard"
Please create new trains credentials through the profile page in your trains web app (e.g. https://demoapp.trains.allegro.ai/profile)
In the profile page, press "Create new credentials", then press "Copy to clipboard".
Paste credentials here: """
Paste copied configuration here:
"""
def_host = 'http://localhost:8080'
try:
@@ -38,20 +40,39 @@ def main():
print('Leaving setup, feel free to edit the configuration file.')
return
print(host_description)
web_host = input_url('Web Application Host', '')
parsed_host = verify_url(web_host)
print(description, end='')
sentinel = ''
parse_input = '\n'.join(iter(input, sentinel))
credentials = None
api_host = None
web_server = None
# noinspection PyBroadException
try:
parsed = ConfigFactory.parse_string(parse_input)
if parsed:
# Take the credentials in raw form or from api section
credentials = get_parsed_field(parsed, ["credentials"])
api_host = get_parsed_field(parsed, ["api_server", "host"])
web_server = get_parsed_field(parsed, ["web_server"])
except Exception:
credentials = credentials or None
api_host = api_host or None
web_server = web_server or None
if parsed_host.port == 8008:
print('Port 8008 is the api port. Replacing 8080 with 8008 for Web application')
api_host = parsed_host.scheme + "://" + parsed_host.netloc + parsed_host.path
web_host = parsed_host.scheme + "://" + parsed_host.netloc.replace(':8008', ':8080', 1) + parsed_host.path
files_host = parsed_host.scheme + "://" + parsed_host.netloc.replace(':8008', ':8081', 1) + parsed_host.path
elif parsed_host.port == 8080:
api_host = parsed_host.scheme + "://" + parsed_host.netloc.replace(':8080', ':8008', 1) + parsed_host.path
web_host = parsed_host.scheme + "://" + parsed_host.netloc + parsed_host.path
files_host = parsed_host.scheme + "://" + parsed_host.netloc.replace(':8080', ':8081', 1) + parsed_host.path
elif parsed_host.netloc.startswith('demoapp.'):
while not credentials or set(credentials) != {"access_key", "secret_key"}:
print('Could not parse credentials, please try entering them manually.')
credentials = read_manual_credentials()
print('Detected credentials key=\"{}\" secret=\"{}\"'.format(credentials['access_key'],
credentials['secret_key'][0:4] + "***"))
if api_host:
api_host = input_url('API Host', api_host)
else:
print(host_description)
api_host = input_url('API Host', '')
parsed_host = verify_url(api_host)
if parsed_host.netloc.startswith('demoapp.'):
# this is our demo server
api_host = parsed_host.scheme + "://" + parsed_host.netloc.replace('demoapp.', 'demoapi.', 1) + parsed_host.path
web_host = parsed_host.scheme + "://" + parsed_host.netloc + parsed_host.path
@@ -73,61 +94,50 @@ def main():
api_host = parsed_host.scheme + "://" + parsed_host.netloc + parsed_host.path
web_host = parsed_host.scheme + "://" + parsed_host.netloc.replace('api.', 'app.', 1) + parsed_host.path
files_host = parsed_host.scheme + "://" + parsed_host.netloc.replace('api.', 'files.', 1) + parsed_host.path
elif parsed_host.port == 8008:
print('Port 8008 is the api port. Replacing 8080 with 8008 for Web application')
api_host = parsed_host.scheme + "://" + parsed_host.netloc + parsed_host.path
web_host = parsed_host.scheme + "://" + parsed_host.netloc.replace(':8008', ':8080', 1) + parsed_host.path
files_host = parsed_host.scheme + "://" + parsed_host.netloc.replace(':8008', ':8081', 1) + parsed_host.path
elif parsed_host.port == 8080:
api_host = parsed_host.scheme + "://" + parsed_host.netloc.replace(':8080', ':8008', 1) + parsed_host.path
web_host = parsed_host.scheme + "://" + parsed_host.netloc + parsed_host.path
files_host = parsed_host.scheme + "://" + parsed_host.netloc.replace(':8080', ':8081', 1) + parsed_host.path
else:
api_host = ''
web_host = ''
files_host = ''
if not parsed_host.port:
print('Host port not detected, do you wish to use the default 8008 port n/[y]? ', end='')
print('Host port not detected, do you wish to use the default 8080 port n/[y]? ', end='')
replace_port = input().lower()
if not replace_port or replace_port == 'y' or replace_port == 'yes':
api_host = parsed_host.scheme + "://" + parsed_host.netloc + ':8008' + parsed_host.path
web_host = parsed_host.scheme + "://" + parsed_host.netloc + ':8080' + parsed_host.path
files_host = parsed_host.scheme + "://" + parsed_host.netloc + ':8081' + parsed_host.path
elif not replace_port or replace_port.lower() == 'n' or replace_port.lower() == 'no':
web_host = input_host_port("Web", parsed_host)
api_host = input_host_port("API", parsed_host)
files_host = input_host_port("Files", parsed_host)
if not api_host:
api_host = parsed_host.scheme + "://" + parsed_host.netloc + parsed_host.path
api_host = input_url('API Host', api_host)
web_host = input_url('Web Application Host', web_server if web_server else web_host)
files_host = input_url('File Store Host', files_host)
print('\nTRAINS Hosts configuration:\nAPI: {}\nWeb App: {}\nFile Store: {}\n'.format(
api_host, web_host, files_host))
print('\nTRAINS Hosts configuration:\nWeb App: {}\nAPI: {}\nFile Store: {}\n'.format(
web_host, api_host, files_host))
while True:
print(description.format(web_host), end='')
parse_input = input()
# check if these are valid credentials
credentials = None
# noinspection PyBroadException
try:
parsed = ConfigFactory.parse_string(parse_input)
if parsed:
credentials = parsed.get("credentials", None)
except Exception:
credentials = None
if not credentials or set(credentials) != {"access_key", "secret_key"}:
print('Could not parse user credentials, try again one after the other.')
credentials = {}
# parse individual
print('Enter user access key: ', end='')
credentials['access_key'] = input()
print('Enter user secret: ', end='')
credentials['secret_key'] = input()
print('Detected credentials key=\"{}\" secret=\"{}\"'.format(credentials['access_key'],
credentials['secret_key'], ))
from trains_agent.backend_api.session import Session
# noinspection PyBroadException
try:
print('Verifying credentials ...')
Session(api_key=credentials['access_key'], secret_key=credentials['secret_key'], host=api_host)
print('Credentials verified!')
retry = 1
max_retries = 2
while retry <= max_retries: # Up to 2 tries by the user
if verify_credentials(api_host, credentials):
break
except Exception:
print('Error: could not verify credentials: host={} access={} secret={}'.format(
api_host, credentials['access_key'], credentials['secret_key']))
retry += 1
if retry < max_retries + 1:
credentials = read_manual_credentials()
else:
print('Exiting setup without creating configuration file')
return
# get GIT User/Pass for cloning
print('Enter git username for repository cloning (leave blank for SSH key authentication): [] ', end='')
@@ -140,6 +150,18 @@ def main():
git_user = None
git_pass = None
# get extra-index-url for pip installations
extra_index_urls = []
print('\nEnter additional artifact repository (extra-index-url) to use when installing python packages '
'(leave blank if not required):', end='')
index_url = input().strip()
while index_url:
extra_index_urls.append(index_url)
print('Another artifact repository? (enter another url or leave blank if done):', end='')
index_url = input().strip()
if len(extra_index_urls):
print("The following artifact repositories will be added:\n\t- {}".format("\n\t- ".join(extra_index_urls)))
# noinspection PyBroadException
try:
conf_folder = Path(__file__).parent.absolute() / '..' / 'backend_api' / 'config' / 'default'
@@ -173,6 +195,10 @@ def main():
'agent.git_pass=\"{}\"\n' \
'\n'.format(git_user or '', git_pass or '')
f.write(git_credentials)
extra_index_str = '# extra_index_url: ["https://allegroai.jfrog.io/trainsai/api/pypi/public/simple"]\n' \
'agent.package_manager.extra_index_url= ' \
'[\n{}\n]\n\n'.format("\n".join(map("\"{}\"".format, extra_index_urls)))
f.write(extra_index_str)
f.write(default_conf)
except Exception:
print('Error! Could not write configuration file at: {}'.format(str(conf_file)))
@@ -182,18 +208,72 @@ def main():
print('TRAINS-AGENT setup completed successfully.')
def verify_credentials(api_host, credentials):
"""check if the credentials are valid"""
# noinspection PyBroadException
try:
print('Verifying credentials ...')
if api_host:
Session(api_key=credentials['access_key'], secret_key=credentials['secret_key'], host=api_host)
print('Credentials verified!')
return True
else:
print("Can't verify credentials")
return False
except Exception:
print('Error: could not verify credentials: key={} secret={}'.format(
credentials.get('access_key'), credentials.get('secret_key')))
return False
def get_parsed_field(parsed_config, fields):
"""
Parse the value from the web profile page's 'copy to clipboard' option
:param parsed_config: The parsed value from the web ui
:type parsed_config: Config object
:param fields: list of values to parse, will parse by the list order
:type fields: List[str]
:return: parsed value if found, otherwise None
"""
try:
return parsed_config.get("api").get(fields[0])
except ConfigMissingException: # fallback - try to parse the field as it appeared in an older web version
if len(fields) == 1:
return parsed_config.get(fields[0])
elif len(fields) == 2:
return parsed_config.get(fields[1])
else:
return None
def read_manual_credentials():
print('Enter user access key: ', end='')
access_key = input()
print('Enter user secret: ', end='')
secret_key = input()
return {"access_key": access_key, "secret_key": secret_key}
def input_url(host_type, host=None):
while True:
print('{} configured to: [{}] '.format(host_type, host), end='')
parse_input = input()
if host and (not parse_input or parse_input.lower() == 'yes' or parse_input.lower() == 'y'):
break
if parse_input and verify_url(parse_input):
host = parse_input
parsed_host = verify_url(parse_input) if parse_input else None
if parse_input and parsed_host:
host = parsed_host.scheme + "://" + parsed_host.netloc + parsed_host.path
break
return host
def input_host_port(host_type, parsed_host):
print('Enter port for {} host '.format(host_type), end='')
replace_port = input().lower()
return parsed_host.scheme + "://" + parsed_host.netloc + (':{}'.format(replace_port) if replace_port else '') + \
parsed_host.path
def verify_url(parse_input):
try:
if not parse_input.startswith('http://') and not parse_input.startswith('https://'):
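The port convention the wizard applies above, condensed into one sketch (the helper is illustrative, not part of the wizard):

```python
from six.moves.urllib.parse import urlparse

def derive_hosts(url):
    # Map one trains-server URL to (web, api, files) hosts using the
    # 8080 (web) / 8008 (api) / 8081 (files) convention
    p = urlparse(url)
    base = p.scheme + "://" + p.netloc
    if p.port == 8008:
        return base.replace(':8008', ':8080'), base, base.replace(':8008', ':8081')
    if p.port == 8080:
        return base, base.replace(':8080', ':8008'), base.replace(':8080', ':8081')
    return None  # unknown port: the wizard falls back to asking the user

print(derive_hosts("http://localhost:8080"))
# ('http://localhost:8080', 'http://localhost:8008', 'http://localhost:8081')
```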

View File

@@ -3,6 +3,7 @@ from __future__ import print_function, division, unicode_literals
import errno
import json
import logging
import os
import os.path
import re
import signal
@@ -16,7 +17,6 @@ from datetime import datetime
from distutils.spawn import find_executable
from functools import partial
from itertools import chain
from os import environ, getpid
from tempfile import gettempdir, mkdtemp
from time import sleep, time
from typing import Text, Optional, Any, Tuple
@@ -59,7 +59,7 @@ from trains_agent.helper.base import (
is_conda,
named_temporary_file,
ExecutionInfo,
HOCONEncoder, error)
HOCONEncoder, error, get_python_path)
from trains_agent.helper.console import ensure_text
from trains_agent.helper.package.base import PackageManager
from trains_agent.helper.package.conda_api import CondaAPI
@@ -78,7 +78,7 @@ from trains_agent.helper.process import (
Argv,
COMMAND_SUCCESS,
Executable,
get_bash_output)
get_bash_output, shutdown_docker_process, get_docker_id, commit_docker)
from trains_agent.helper.package.cython_req import CythonRequirement
from trains_agent.helper.repo import clone_repository_cached, RepoInfo, VCS
from trains_agent.helper.resource_monitor import ResourceMonitor
@@ -145,6 +145,11 @@ class LiteralScriptManager(object):
"found task with `script.working_dir` (`%s`) but without `script.repository`, ignoring",
execution.working_dir,
)
if not execution.entry_point:
execution.entry_point = 'untitled.py'
else:
# ignore any folders in the entry point; we only need the file name
execution.entry_point = execution.entry_point.split(os.path.sep)[-1]
location = None
location = location or (repo_info and repo_info.root)
if not location:
@@ -321,7 +326,7 @@ class Worker(ServiceCommandSection):
extra_url = [extra_url]
# put external pip url before default ones, so we first look for packages there
for e in reversed(extra_url):
PIP_EXTRA_INDICES.insert(0, e)
self._pip_extra_index_url.insert(0, e)
except Exception:
self.log.warning('Failed adding extra-index-url to pip environment: {}'.format(extra_url))
# update pip install command
@@ -334,7 +339,7 @@ class Worker(ServiceCommandSection):
)
self.pip_install_cmd = tuple(pip_install_cmd)
self.worker_id = self._session.config["agent.worker_id"] or "{}:{}".format(
self._session.config["agent.worker_name"], getpid()
self._session.config["agent.worker_name"], os.getpid()
)
self._last_stats = defaultdict(lambda: 0)
self._last_report_timestamp = psutil.time.time()
@@ -350,6 +355,7 @@ class Worker(ServiceCommandSection):
self._docker_image = None
self._docker_arguments = None
self._daemon_foreground = None
self._standalone_mode = None
def _get_requirements_manager(self, os_override=None, base_interpreter=None):
requirements_manager = RequirementsManager(
@@ -420,24 +426,27 @@ class Worker(ServiceCommandSection):
lines=['Running Task {} inside docker: {}\n'.format(task_id, task_docker_cmd)],
level="INFO")
task_docker_cmd = task_docker_cmd.split(' ')
full_docker_cmd = self.docker_image_func(docker_image=task_docker_cmd[0],
docker_arguments=task_docker_cmd[1:])
docker_image = task_docker_cmd[0]
docker_arguments = task_docker_cmd[1:]
else:
self.send_logs(task_id=task_id,
lines=['No docker image specified, running Task {} inside docker: {} {}\n'.format(
lines=['running Task {} inside default docker image: {} {}\n'.format(
task_id, self._docker_image, self._docker_arguments or '')],
level="INFO")
full_docker_cmd = self.docker_image_func(docker_image=self._docker_image,
docker_arguments=self._docker_arguments)
# Update docker command
try:
docker_cmd = ' '.join([self._docker_image] + self._docker_arguments)
self._session.send_api(
tasks_api.EditRequest(task_id, execution=dict(docker_cmd=docker_cmd), force=True))
except Exception:
pass
docker_image = self._docker_image
docker_arguments = self._docker_arguments
full_docker_cmd[-1] = full_docker_cmd[-1] + 'execute --disable-monitoring --id ' + task_id
# Update docker command
full_docker_cmd = self.docker_image_func(docker_image=docker_image, docker_arguments=docker_arguments)
try:
self._session.send_api(
tasks_api.EditRequest(task_id, force=True, execution=dict(
docker_cmd=' '.join([docker_image] + docker_arguments) if docker_arguments else docker_image)))
except Exception:
pass
full_docker_cmd[-1] = full_docker_cmd[-1] + 'execute --disable-monitoring {} --id {}'.format(
'--standalone-mode' if self._standalone_mode else '', task_id)
cmd = Argv(*full_docker_cmd)
else:
cmd = worker_args.get_argv_for_command("execute") + (
@@ -483,6 +492,8 @@ class Worker(ServiceCommandSection):
# remove temp files after we sent everything to the backend
safe_remove_file(temp_stdout_name)
safe_remove_file(temp_stderr_name)
if self.docker_image_func:
shutdown_docker_process(docker_cmd_contains='--id {}\'\"'.format(task_id))
def run_tasks_loop(self, queues, worker_params):
"""
@@ -576,10 +587,10 @@ class Worker(ServiceCommandSection):
)
if is_windows_platform():
if not self.is_conda:
self.warning("Worker on Windows without Conda are not supported")
# if not self.is_conda:
# self.warning("Worker on Windows without Conda are not supported")
if self._session.config.agent.venv_update:
self.warning("venv-update is not supported on Windows")
# self.warning("venv-update is not supported on Windows")
self.is_venv_update = False
self._session.print_configuration()
@@ -593,6 +604,8 @@ class Worker(ServiceCommandSection):
# check if we have the latest version
start_check_update_daemon()
self._standalone_mode = kwargs.get('standalone_mode', False)
self.check(**kwargs)
self.log.debug("starting resource monitor thread")
print("Worker \"{}\" - ".format(self.worker_id), end='')
@@ -856,15 +869,21 @@ class Worker(ServiceCommandSection):
def build(
self,
task_id,
target_folder=None,
target=None,
python_version=None,
docker=None,
**_
):
if not task_id:
raise CommandFailedError("Worker build must have valid task id")
if not check_if_command_exists("virtualenv"):
raise CommandFailedError("Worker must have virtualenv installed")
self._session.print_configuration()
if docker is not False and docker is not None:
return self._build_docker(docker, target, task_id)
current_task = self._session.api_client.tasks.get_by_id(task_id)
execution = self.get_execution_info(current_task)
@@ -875,7 +894,7 @@ class Worker(ServiceCommandSection):
requirements = None
# TODO: make sure we pass the correct python_version
venv_folder, requirements_manager = self.install_virtualenv(venv_dir=target_folder,
venv_folder, requirements_manager = self.install_virtualenv(venv_dir=target,
requested_python_version=python_version)
if self._default_pip:
@@ -906,6 +925,72 @@ class Worker(ServiceCommandSection):
return 0
def _build_docker(self, docker, target, task_id):
self.temp_config_path = safe_mkstemp(
suffix=".cfg", prefix=".trains_agent.", text=True, name_only=True
)
if not target:
ValueError("--target container name must be provided for docker build")
temp_config, docker_image_func = self.get_docker_config_cmd(docker)
self.dump_config(temp_config)
self.docker_image_func = docker_image_func
try:
response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"])
task_docker_cmd = response.execution.docker_cmd
task_docker_cmd = task_docker_cmd.strip() if task_docker_cmd else None
except Exception:
task_docker_cmd = None
if task_docker_cmd:
print('Building Task {} inside docker: {}\n'.format(task_id, task_docker_cmd))
task_docker_cmd = task_docker_cmd.split(' ')
full_docker_cmd = self.docker_image_func(docker_image=task_docker_cmd[0],
docker_arguments=task_docker_cmd[1:])
else:
print('running Task {} inside default docker image: {} {}\n'.format(
task_id, self._docker_image, self._docker_arguments or ''))
full_docker_cmd = self.docker_image_func(docker_image=self._docker_image,
docker_arguments=self._docker_arguments)
end_of_build_marker = "build.done=true"
docker_cmd_suffix = ' build --id {} ; ' \
'echo "" >> /root/trains.conf ; ' \
'echo {} >> /root/trains.conf ; ' \
'bash'.format(task_id, end_of_build_marker)
full_docker_cmd[-1] = full_docker_cmd[-1] + docker_cmd_suffix
cmd = Argv(*full_docker_cmd)
# we will be checking the configuration file for changes
temp_config = Path(self.temp_config_path)
base_time_stamp = temp_config.stat().st_mtime
# start the docker
print('Starting docker build')
cmd.call_subprocess(subprocess.Popen)
# now we need to wait until the line shows on our configuration file.
while True:
while temp_config.stat().st_mtime == base_time_stamp:
sleep(5.0)
with open(temp_config.as_posix()) as f:
lines = [l.strip() for l in f.readlines()]
if 'build.done=true' in lines:
break
base_time_stamp = temp_config.stat().st_mtime
print('\nDocker build done')
# get the docker id.
docker_id = get_docker_id(docker_cmd_contains='--id {} '.format(task_id))
if not docker_id:
print("Error: cannot locate docker for storage")
return
print('Committing docker container to: {}'.format(target))
print(commit_docker(container_name=target, docker_id=docker_id))
shutdown_docker_process(docker_id=docker_id)
return
@resolve_names
def execute(
self,
@@ -914,13 +999,33 @@ class Worker(ServiceCommandSection):
optimization=0,
disable_monitoring=False,
full_monitoring=False,
require_queue=False,
log_file=None,
standalone_mode=None,
**_
):
if not task_id:
raise CommandFailedError("Worker execute must have valid task id")
if not check_if_command_exists("virtualenv"):
raise CommandFailedError("Worker must have virtualenv installed")
try:
current_task = self._session.api_client.tasks.get_by_id(task_id)
if not current_task.id:
pass
except Exception:
raise ValueError("Could not find task id={}".format(task_id))
# make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
try:
res = self._session.api_client.tasks.dequeue(task=current_task.id)
if require_queue and res.meta.result_code != 200:
raise ValueError("Execution required enqueued task, "
"but task id={} is not queued.".format(current_task.id))
except Exception:
if require_queue:
raise
if full_monitoring:
worker_params = WorkerParams(
log_level=log_level,
@@ -935,13 +1040,8 @@ class Worker(ServiceCommandSection):
return
self._session.print_configuration()
current_task = self._session.api_client.tasks.get_by_id(task_id)
try:
if not current_task.id:
pass
except Exception:
raise ValueError("Could not find task id={}".format(task_id))
# now mark the task as started
self._session.api_client.tasks.started(
task=current_task.id,
status_reason="worker started execution",
@@ -959,12 +1059,13 @@ class Worker(ServiceCommandSection):
except AttributeError:
requirements = None
venv_folder, requirements_manager = self.install_virtualenv()
venv_folder, requirements_manager = self.install_virtualenv(standalone_mode=standalone_mode)
if self._default_pip:
self.package_api.install_packages(*self._default_pip)
if not standalone_mode:
if self._default_pip:
self.package_api.install_packages(*self._default_pip)
print("\n")
print("\n")
directory, vcs, repo_info = self.get_repo_info(
execution, current_task, venv_folder
@@ -972,13 +1073,17 @@ class Worker(ServiceCommandSection):
print("\n")
self.install_requirements(
execution,
repo_info,
requirements_manager=requirements_manager,
cached_requirements=requirements,
)
freeze = self.freeze_task_environment(current_task.id)
if not standalone_mode:
self.install_requirements(
execution,
repo_info,
requirements_manager=requirements_manager,
cached_requirements=requirements,
)
# do not update the task packages if we are using conda,
# it will most likely make the task environment unreproducible
freeze = self.freeze_task_environment(current_task.id if not self.is_conda else None)
script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()
# run code
@@ -1007,7 +1112,7 @@ class Worker(ServiceCommandSection):
"log_to_backend": "0",
"config_file": self._session.config_file, # The config file is the tmp file that trains_agent created
}
environ.update(
os.environ.update(
{
sdk_key: str(value)
for key, value in sdk_env.items()
@@ -1018,6 +1123,11 @@ class Worker(ServiceCommandSection):
if repo_info:
self._update_commit_id(task_id, execution, repo_info)
# Add the script CWD to the python path
python_path = get_python_path(script_dir, execution.entry_point, self.package_api)
if python_path:
os.environ['PYTHONPATH'] = python_path
print("Starting Task Execution:\n".format(task_id))
exit_code = -1
try:
@@ -1026,11 +1136,20 @@ class Worker(ServiceCommandSection):
sys.stdout.flush()
sys.stderr.flush()
os.chdir(script_dir)
os.execv(command.argv[0].as_posix(), tuple([command.argv[0].as_posix()])+command.argv[1:])
# exit_code = command.check_call(cwd=script_dir)
if not is_windows_platform():
os.execv(command.argv[0].as_posix(), tuple([command.argv[0].as_posix()])+command.argv[1:])
else:
exit_code = command.check_call(cwd=script_dir)
exit(exit_code)
except subprocess.CalledProcessError as ex:
# non zero return code
exit_code = ex.returncode
if is_windows_platform():
exit(exit_code)
except Exception as ex:
if is_windows_platform():
exit(-1)
raise ex
else:
# store stdout/stderr into file, and send to backend
temp_stdout_fname = log_file or safe_mkstemp(
@@ -1135,7 +1254,10 @@ class Worker(ServiceCommandSection):
execution=execution,
destination=Path(venv_folder) / WORKING_REPOSITORY_DIR,
)
except Exception:
except CommandFailedError:
raise
except Exception as ex:
print('Repository cloning failed: {}'.format(ex))
task.failed(
status_reason="failed cloning repository",
status_message=self._task_status_change_message,
@@ -1291,19 +1413,21 @@ class Worker(ServiceCommandSection):
self.package_api.out_of_scope_install_package('Cython')
cached_requirements_failed = False
if cached_requirements:
self.log("Found cached requirements, trying to install")
if cached_requirements and ('pip' in cached_requirements or 'conda' in cached_requirements):
self.log("Found task requirements section, trying to install")
try:
self.package_api.load_requirements(cached_requirements)
except Exception as e:
self.log_traceback(e)
self.error("Could not install cached requirements, installing requirements from repository")
cached_requirements_failed = True
raise ValueError("Could not install task requirements!")
else:
self.log("Cached requirements installation success")
self.log("task requirements installation passed")
return
if not repo_info:
if cached_requirements_failed:
raise ValueError("Could not install task requirements!")
self.log("no repository to install requirements from")
return
@@ -1346,7 +1470,7 @@ class Worker(ServiceCommandSection):
# if we reached here without installing anything, and
# we failed installing from cached requirements, then this is an error
if cached_requirements_failed and not repo_requirements_installed:
raise ValueError("Could not install cached requirements or repository requirements")
raise ValueError("Failed installing task requirements and repository requirements")
def named_temporary_file(self, *args, **kwargs):
kwargs.setdefault("delete", not self._session.debug_mode)
@@ -1416,12 +1540,28 @@ class Worker(ServiceCommandSection):
for i in range(len(it) + 1):
yield it[:i]
python_executables = [
(version, "python{}".format(version))
for version in map(
".".join, reversed(list(suffixes(config_version.split("."))))
)
]
def rreplace(s, old, new, count):
return (s[::-1].replace(old[::-1], new[::-1], count))[::-1]
if is_windows_platform():
python_executables = [
(version, config_version if os.path.sep in config_version else 'python{}'.format(version))
for version in map(
".".join, reversed(list(suffixes(
rreplace(
rreplace(config_version.split(os.path.sep)[-1].lower(), 'python', '', 1),
'.exe', '', 1).split("."))))
)
]
else:
python_executables = [
(version, config_version if os.path.sep in config_version else 'python{}'.format(version))
for version in map(
".".join, reversed(list(suffixes(
rreplace(config_version.split(os.path.sep)[-1], 'python', '', 1).split("."))))
)
]
for version, executable in python_executables:
self.log.debug("Searching for {}".format(executable))
if find_executable(executable):
@@ -1433,11 +1573,12 @@ class Worker(ServiceCommandSection):
self.log.warning("error getting %s version: %s", executable, ex)
continue
match = re.search(
r"Python ({}(?:\.\d+)*)".format(config_version or r"\d+"), output
r"Python ({}(?:\.\d+)*)".format(
r"\d+" if not config_version or os.path.sep in config_version else config_version), output
)
if match:
self.log.debug("Found: {}".format(executable))
return match.group(1), version, executable
return match.group(1), version or '.'.join(match.group(1).split('.')[:2]), executable
raise CommandFailedError(
"Python executable with version {!r} defined in configuration file, "
"key 'agent.default_python', not found in path, tried: {}".format(
@@ -1445,20 +1586,24 @@ class Worker(ServiceCommandSection):
)
)
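For reference, a minimal stdlib-only sketch of the candidate expansion above, with suffixes and rreplace reproduced from this hunk; the sample config_version is illustrative:

def suffixes(it):
    # all prefixes of a list: [], [a], [a, b], ...
    for i in range(len(it) + 1):
        yield it[:i]

def rreplace(s, old, new, count):
    # replace starting from the right: rreplace('python3.exe', '.exe', '', 1) -> 'python3'
    return (s[::-1].replace(old[::-1], new[::-1], count))[::-1]

config_version = "3.6.8"  # assumed value of agent.default_python
candidates = [
    (version, "python{}".format(version))
    for version in map(".".join, reversed(list(suffixes(config_version.split(".")))))
]
print(candidates)
# [('3.6.8', 'python3.6.8'), ('3.6', 'python3.6'), ('3', 'python3'), ('', 'python')]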
def install_virtualenv(self, venv_dir=None, requested_python_version=None):
def install_virtualenv(self, venv_dir=None, requested_python_version=None, standalone_mode=False):
# type: (str, str) -> Tuple[Path, RequirementsManager]
"""
Install a new python virtual environment, removing the old one if exists
:return: virtualenv directory and requirements manager to use with task
"""
requested_python_version = requested_python_version or Text(self._session.config["agent.default_python"])
venv_dir = Path(venv_dir) if venv_dir else \
Path(self._session.config["agent.venvs_dir"], requested_python_version)
requested_python_version = requested_python_version or \
Text(self._session.config.get("agent.python_binary", None)) or \
Text(self._session.config.get("agent.default_python", None))
executable_version, executable_version_suffix, executable_name = self.find_python_executable_for_version(
requested_python_version
)
venv_dir = Path(venv_dir) if venv_dir else \
Path(self._session.config["agent.venvs_dir"], executable_version_suffix)
self._session.config.put("agent.default_python", executable_version)
first_time = (
self._session.config.put("agent.python_binary", executable_name)
first_time = not standalone_mode and (
is_windows_platform()
or self.is_conda
or not venv_dir.is_dir()
@@ -1472,7 +1617,7 @@ class Worker(ServiceCommandSection):
rm_tree(normalize_path(venv_dir, WORKING_REPOSITORY_DIR))
package_manager_params = dict(
session=self._session,
python=executable_version_suffix,
python=executable_version_suffix if self.is_conda else executable_name,
path=venv_dir,
requirements_manager=requirements_manager,
)
@@ -1488,6 +1633,10 @@ class Worker(ServiceCommandSection):
if first_time:
self.package_api.remove()
self.package_api.create()
elif standalone_mode:
# conda with standalone mode
get_conda = partial(CondaAPI, **package_manager_params)
self.package_api = get_conda()
else:
get_conda = partial(CondaAPI, **package_manager_params)
@@ -1534,7 +1683,8 @@ class Worker(ServiceCommandSection):
args.update(kwargs)
return self._get_docker_cmd(**args)
docker_image = str(self._session.config.get("agent.default_docker.image", "nvidia/cuda")) \
docker_image = str(os.environ.get("TRAINS_DOCKER_IMAGE") or os.environ.get("ALG_DOCKER_IMAGE") or
self._session.config.get("agent.default_docker.image", "nvidia/cuda")) \
if not docker_args else docker_args[0]
docker_arguments = docker_image.split(' ') if docker_image else []
if len(docker_arguments) > 1:
@@ -1547,8 +1697,8 @@ class Worker(ServiceCommandSection):
python_version = '3'
if not python_version.startswith('python'):
python_version = 'python'+python_version
print("Running in Docker mode (v19.03 and above) - using default docker image: {} running {}\n".format(
docker_image, python_version))
print("Running in Docker {} mode (v19.03 and above) - using default docker image: {} running {}\n".format(
'*standalone*' if self._standalone_mode else '', docker_image, python_version))
temp_config = self._session.config.copy()
mounted_cache_dir = '/root/.trains/cache'
mounted_pip_dl_dir = '/root/.trains/pip-download-cache'
@@ -1562,6 +1712,9 @@ class Worker(ServiceCommandSection):
temp_config.put("agent.vcs_cache.path", mounted_vcs_cache)
temp_config.put("agent.package_manager.system_site_packages", True)
temp_config.put("agent.default_python", "")
temp_config.put("agent.python_binary", "")
temp_config.put("agent.cuda_version", "")
temp_config.put("agent.cudnn_version", "")
temp_config.put("agent.venvs_dir", mounted_venv_dir)
host_apt_cache = Path(os.path.expandvars(self._session.config.get(
@@ -1601,7 +1754,8 @@ class Worker(ServiceCommandSection):
host_ssh_cache=host_ssh_cache,
host_cache=host_cache, mounted_cache=mounted_cache_dir,
host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache)
host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
standalone_mode=self._standalone_mode)
return temp_config, partial(docker_cmd_functor, docker_cmd)
@staticmethod
@@ -1612,14 +1766,14 @@ class Worker(ServiceCommandSection):
host_ssh_cache,
host_cache, mounted_cache,
host_pip_dl, mounted_pip_dl,
host_vcs_cache, mounted_vcs_cache):
host_vcs_cache, mounted_vcs_cache, standalone_mode=False):
docker = 'docker'
base_cmd = [docker, 'run', '-t']
gpu_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES', None)
if gpu_devices is None:
if gpu_devices is None or gpu_devices.lower().strip() == 'all':
base_cmd += ['--gpus', 'all', ]
elif gpu_devices.strip():
elif gpu_devices.strip() and gpu_devices.strip() != 'none':
base_cmd += ['--gpus', 'device='+gpu_devices, ]
# We are using --gpus, so we should not also pass NVIDIA_VISIBLE_DEVICES.
# base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES=' + gpu_devices, ]
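A hedged, standalone restatement of the branch above: how NVIDIA_VISIBLE_DEVICES maps onto the docker --gpus flag (the function name is illustrative):

import os

def gpu_flags(env=None):
    env = os.environ if env is None else env
    gpu_devices = env.get('NVIDIA_VISIBLE_DEVICES', None)
    if gpu_devices is None or gpu_devices.lower().strip() == 'all':
        return ['--gpus', 'all']                    # unset or 'all' -> expose every GPU
    if gpu_devices.strip() and gpu_devices.strip() != 'none':
        return ['--gpus', 'device=' + gpu_devices]  # explicit device list
    return []                                       # '' or 'none' -> CPU-only, no --gpus

print(gpu_flags({'NVIDIA_VISIBLE_DEVICES': '0,1'}))  # ['--gpus', 'device=0,1']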
@@ -1634,23 +1788,41 @@ class Worker(ServiceCommandSection):
if host_ssh_cache:
base_cmd += ['-v', host_ssh_cache+':/root/.ssh', ]
# if we are running an RC version, install the same version in the docker
# because the default (latest) will be a release version (not an RC)
specify_version = ''
try:
from trains_agent.version import __version__
_version_parts = __version__.split('.')
if 'rc' in _version_parts[-1].lower() or 'rc' in _version_parts[-2].lower():
specify_version = '=={}'.format(__version__)
except:
pass
if standalone_mode:
update_scheme = ""
else:
update_scheme = \
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \
"chown -R root /root/.cache/pip ; " \
"apt-get update ; " \
"apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 {python_single_digit}-pip ; " \
"{python} -m pip install -U pip ; " \
"{python} -m pip install -U trains-agent{specify_version} ; ".format(
python_single_digit=python_version.split('.')[0],
python=python_version, specify_version=specify_version)
base_cmd += [
'-v', conf_file+':/root/trains.conf',
'-v', host_apt_cache+':/var/cache/apt/archives',
'-v', host_pip_cache+':/root/.cache/pip',
'-v', host_pip_dl+':'+mounted_pip_dl,
'-v', host_cache+':'+mounted_cache,
'-v', host_vcs_cache+':'+mounted_vcs_cache,
'--rm', docker_image, 'bash', '-c',
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; "
"chown -R root /root/.cache/pip ; "
"apt-get update ; "
"apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 {python_single_digit}-pip ; "
"{python} -m pip install -U pip ; "
"{python} -m pip install -U trains-agent ; "
"{python} -u -m trains_agent ".format(
python_single_digit=python_version.split('.')[0],
python=python_version)]
'-v', conf_file+':/root/trains.conf',
'-v', host_apt_cache+':/var/cache/apt/archives',
'-v', host_pip_cache+':/root/.cache/pip',
'-v', host_pip_dl+':'+mounted_pip_dl,
'-v', host_cache+':'+mounted_cache,
'-v', host_vcs_cache+':'+mounted_vcs_cache,
'--rm', docker_image, 'bash', '-c',
update_scheme +
"NVIDIA_VISIBLE_DEVICES=all {python} -u -m trains_agent ".format(python=python_version)
]
return base_cmd
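To make the standalone/RC behavior concrete, a condensed illustration (the apt/pip bootstrap is abridged and both version values are assumptions, not taken from the diff):

python_version = 'python3.6'      # assumed
specify_version = '==0.13.0rc2'   # only non-empty when the running agent is an RC build
update_scheme = (
    "apt-get update ; "
    "{python} -m pip install -U trains-agent{v} ; "
).format(python=python_version, v=specify_version)

for standalone_mode in (False, True):
    prefix = "" if standalone_mode else update_scheme
    print(prefix + "NVIDIA_VISIBLE_DEVICES=all {python} -u -m trains_agent ".format(
        python=python_version))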
@@ -1658,8 +1830,15 @@ class Worker(ServiceCommandSection):
# ensure singleton
worker_id = self._session.config["agent.worker_id"]
worker_name = self._session.config["agent.worker_name"]
if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES'):
worker_id = '{}:gpu{}'.format(worker_name, os.environ.get('NVIDIA_VISIBLE_DEVICES'))
if not worker_id and os.environ.get('NVIDIA_VISIBLE_DEVICES') is not None:
nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES')
if nvidia_visible_devices and nvidia_visible_devices.lower() != 'none':
worker_id = '{}:gpu{}'.format(worker_name, nvidia_visible_devices)
elif nvidia_visible_devices == '':
pass
else:
worker_name = '{}:cpu'.format(worker_name)
self.worker_id, worker_slot = Singleton.register_instance(unique_worker_id=worker_id, worker_name=worker_name)
if self.worker_id is None:
error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))
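Approximating the worker-id derivation above as a pure function (the name and return convention are illustrative; the real code mutates worker_name for the CPU case and defers empty ids to Singleton):

import os

def derive_worker_id(worker_name, env=None):
    env = os.environ if env is None else env
    devices = env.get('NVIDIA_VISIBLE_DEVICES')
    if devices is None or devices == '':
        return None                                    # let Singleton allocate a slot
    if devices.lower() != 'none':
        return '{}:gpu{}'.format(worker_name, devices)
    return '{}:cpu'.format(worker_name)                # GPUs explicitly disabled

print(derive_worker_id('machine-a', {'NVIDIA_VISIBLE_DEVICES': '0,1'}))  # machine-a:gpu0,1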

View File

@@ -176,6 +176,25 @@ def safe_remove_file(filename, error_message=None):
print(error_message)
def get_python_path(script_dir, entry_point, package_api):
try:
python_path_sep = ';' if is_windows_platform() else ':'
python_path_cmd = package_api.get_python_command(
["-c", "import sys; print('{}'.join(sys.path))".format(python_path_sep)])
org_python_path = python_path_cmd.get_output(cwd=script_dir)
# Add the script directory and the entry point's directory to the python path
python_path = '{}{python_path_sep}{}{python_path_sep}'.format(
Path(script_dir).absolute().as_posix(),
(Path(script_dir) / Path(entry_point)).parent.absolute().as_posix(),
python_path_sep=python_path_sep)
if is_windows_platform():
return python_path.replace('/', '\\') + org_python_path
return python_path + org_python_path
except Exception:
return None
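The interesting part of get_python_path is the prefix it builds; a standalone sketch of just that piece (pathlib2 falls back to pathlib on Python 3):

try:
    from pathlib2 import Path
except ImportError:
    from pathlib import Path

def python_path_prefix(script_dir, entry_point, sep=':'):
    # script directory first, then the entry point's own directory
    return '{}{sep}{}{sep}'.format(
        Path(script_dir).absolute().as_posix(),
        (Path(script_dir) / Path(entry_point)).parent.absolute().as_posix(),
        sep=sep)

print(python_path_prefix('/tmp/repo', 'src/train.py'))  # /tmp/repo:/tmp/repo/src: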
class Singleton(ABCMeta):
_instances = {}

View File

@@ -2,20 +2,24 @@ from __future__ import unicode_literals
import json
import re
import shutil
import subprocess
from distutils.spawn import find_executable
from functools import partial
from itertools import chain
from typing import Text, Iterable, Union, Dict, Set, Sequence, Any
import six
import yaml
from time import time
from attr import attrs, attrib, Factory
from pathlib2 import Path
from semantic_version import Version
from requirements import parse
from requirements.requirement import Requirement
from trains_agent.errors import CommandFailedError
from trains_agent.helper.base import rm_tree, NonStrictAttrs, select_for_platform
from trains_agent.helper.base import rm_tree, NonStrictAttrs, select_for_platform, is_windows_platform
from trains_agent.helper.process import Argv, Executable, DEVNULL, CommandSequence, PathLike
from trains_agent.session import Session
from .base import PackageManager
@@ -36,7 +40,8 @@ def _package_diff(path, packages):
class CondaPip(VirtualenvPip):
def __init__(self, source=None, *args, **kwargs):
super(CondaPip, self).__init__(*args, **kwargs)
super(CondaPip, self).__init__(*args, interpreter=Path(kwargs.get('path'), "python.exe") \
if is_windows_platform() and kwargs.get('path') else None, **kwargs)
self.source = source
def run_with_env(self, command, output=False, **kwargs):
@@ -80,7 +85,7 @@ class CondaAPI(PackageManager):
or Argv(select_for_platform(windows="where", linux="which"), "conda").get_output(shell=True).strip()
)
try:
output = Argv(self.conda, "--version").get_output()
output = Argv(self.conda, "--version").get_output(stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as ex:
raise CommandFailedError(
"Unable to determine conda version: {ex}, output={ex.output}".format(
@@ -131,7 +136,7 @@ class CondaAPI(PackageManager):
else ("activate", self.path)
)
conda_env = Path(self.conda).parent.parent / 'etc' / 'profile.d' / 'conda.sh'
if conda_env.is_file():
if conda_env.is_file() and not is_windows_platform():
self.source = self.pip.source = CommandSequence(('source', conda_env.as_posix()), self.source)
# install cuda toolkit
@@ -161,6 +166,12 @@ class CondaAPI(PackageManager):
except Exception:
pass
rm_tree(self.path)
# if we failed removing the path, change its name
if is_windows_platform() and Path(self.path).exists():
try:
Path(self.path).rename(Path(self.path).as_posix() + '_' + str(time()))
except Exception:
pass
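The rename fallback above is a common Windows workaround: open file handles keep directory removal from finishing, so the stale environment is moved aside instead. A hedged sketch with shutil standing in for rm_tree:

import shutil
from time import time
try:
    from pathlib2 import Path
except ImportError:
    from pathlib import Path

def remove_or_sideline(path):
    shutil.rmtree(path, ignore_errors=True)           # stand-in for rm_tree
    p = Path(path)
    if p.exists():                                    # removal failed (locked files)
        p.rename(p.as_posix() + '_' + str(time()))    # park it under a unique name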
def _install_from_file(self, path):
"""
@@ -235,9 +246,43 @@ class CondaAPI(PackageManager):
# create new environment file
conda_env = dict()
conda_env['channels'] = self.extra_channels
reqs = [MarkerRequirement(next(parse(r))) for r in requirements['pip']]
reqs = []
if isinstance(requirements['pip'], six.string_types):
requirements['pip'] = requirements['pip'].split('\n')
has_torch = False
has_matplotlib = False
try:
cuda_version = int(self.session.config.get('agent.cuda_version', 0))
except:
cuda_version = 0
for r in requirements['pip']:
marker = list(parse(r))
if marker:
m = MarkerRequirement(marker[0])
if m.req.name.lower() == 'matplotlib':
has_matplotlib = True
elif m.req.name.lower().startswith('torch'):
has_torch = True
if m.req.name.lower() in ('torch', 'pytorch'):
has_torch = True
m.req.name = 'pytorch'
if m.req.name.lower() in ('tensorflow_gpu', 'tensorflow-gpu', 'tensorflow'):
has_torch = True
m.req.name = 'tensorflow-gpu' if cuda_version > 0 else 'tensorflow'
reqs.append(m)
pip_requirements = []
# Conda requirements Hacks:
if has_matplotlib:
reqs.append(MarkerRequirement(Requirement.parse('graphviz')))
reqs.append(MarkerRequirement(Requirement.parse('kiwisolver')))
if has_torch and cuda_version == 0:
reqs.append(MarkerRequirement(Requirement.parse('cpuonly')))
while reqs:
conda_env['dependencies'] = [r.tostr().replace('==', '=') for r in reqs]
with self.temp_file("conda_env", yaml.dump(conda_env), suffix=".yml") as name:
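The requirement rewrites in this hunk can be summarized as a small name-mapping pass; a hedged sketch over package names only (no conda invocation, no marker handling):

def rewrite_for_conda(names, cuda_version):
    out = []
    for name in names:
        n = name.lower()
        if n in ('torch', 'pytorch'):
            n = 'pytorch'                  # conda's channel calls it pytorch
        elif n in ('tensorflow_gpu', 'tensorflow-gpu', 'tensorflow'):
            n = 'tensorflow-gpu' if cuda_version > 0 else 'tensorflow'
        out.append(n)
    if 'pytorch' in out and cuda_version == 0:
        out.append('cpuonly')              # CPU-only pytorch needs the meta-package
    return out

print(rewrite_for_conda(['torch', 'tensorflow'], cuda_version=0))
# ['pytorch', 'tensorflow', 'cpuonly']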
@@ -308,6 +353,10 @@ class CondaAPI(PackageManager):
:param kwargs: kwargs for Argv.get_output()
:return: JSON output or text output
"""
def escape_ansi(line):
ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
return ansi_escape.sub('', line)
command = Argv(*command) # type: Executable
if not raw:
command = (self.conda,) + command + ("--quiet", "--json")
@@ -320,7 +369,8 @@ class CondaAPI(PackageManager):
result = e.output if hasattr(e, 'output') else ''
if raw:
return result
result = json.loads(result) if result else {}
result = json.loads(escape_ansi(result)) if result else {}
if result.get('success', False):
print('Pass')
elif result.get('error'):
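escape_ansi exists because conda can color its --json output; stripping the control sequences first keeps json.loads happy. Reproduced standalone:

import json
import re

def escape_ansi(line):
    ansi_escape = re.compile(r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]')
    return ansi_escape.sub('', line)

raw = '\x1b[0m{"success": true}\x1b[0m'
print(json.loads(escape_ansi(raw)))  # {'success': True}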

View File

@@ -30,7 +30,7 @@ class VirtualenvPip(SystemPip, PackageManager):
self.session = session
self.path = path
self.requirements_manager = requirements_manager
self.python = "python{}".format(python)
self.python = python
def _make_command(self, command):
return self.session.command(self.bin, "-m", "pip", *command)

View File

@@ -96,3 +96,15 @@ class PoetryAPI(object):
def get_python_command(self, extra):
return Argv("poetry", "run", "python", *extra)
def upgrade_pip(self, *args, **kwargs):
pass
def set_selected_package_manager(self, *args, **kwargs):
pass
def out_of_scope_install_package(self, *args, **kwargs):
pass
def install_from_file(self, *args, **kwargs):
pass

View File

@@ -239,7 +239,23 @@ class PytorchRequirement(SimpleSubstitution):
def get_url_for_platform(self, req):
assert self.package_manager == "pip"
assert self.os != "mac"
# check if package is already installed with system packages
try:
if self.config.get("agent.package_manager.system_site_packages"):
from pip._internal.commands.show import search_packages_info
installed_torch = list(search_packages_info([req.name]))
op, version = req.specs[0] if req.specs else (None, None)
# notice the comparison order: the first part makes sure we have a valid installed package
if installed_torch[0]['version'] and (installed_torch[0]['version'] == version or not version):
# package already installed, do nothing
return str(req), True
except:
pass
# make sure we have a specific version to retrieve
assert req.specs
try:
req.specs[0] = (req.specs[0][0], req.specs[0][1].split('+')[0])
except:

View File

@@ -288,10 +288,24 @@ class RequirementsManager(object):
if cuda_version and cudnn_version:
return normalize_cuda_version(cuda_version), normalize_cuda_version(cudnn_version)
if not cuda_version and is_windows_platform():
try:
cuda_vers = [int(k.replace('CUDA_PATH_V', '').replace('_', '')) for k in os.environ.keys()
if k.startswith('CUDA_PATH_V')]
cuda_vers = max(cuda_vers)
if cuda_vers > 40:
cuda_version = cuda_vers
except:
pass
if not cuda_version:
try:
try:
output = Argv('nvcc', '--version').get_output()
nvcc = 'nvcc.exe' if is_windows_platform() else 'nvcc'
if is_windows_platform() and 'CUDA_PATH' in os.environ:
nvcc = os.path.join(os.environ['CUDA_PATH'], nvcc)
output = Argv(nvcc, '--version').get_output()
except OSError:
raise CudaNotFound('nvcc not found')
match = re.search(r'release (.{3})', output).group(1)
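The Windows fallback scans the CUDA_PATH_V* variables that the CUDA installer sets; a standalone sketch (the sample environment is illustrative):

import os

def windows_cuda_from_env(environ=None):
    environ = os.environ if environ is None else environ
    vers = [int(k.replace('CUDA_PATH_V', '').replace('_', ''))
            for k in environ if k.startswith('CUDA_PATH_V')]
    best = max(vers) if vers else 0
    return best if best > 40 else 0       # filter out implausible values

print(windows_cuda_from_env({'CUDA_PATH_V10_1': r'C:\CUDA\v10.1'}))  # 101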

View File

@@ -59,6 +59,49 @@ def kill_all_child_processes(pid=None):
parent.kill()
def get_docker_id(docker_cmd_contains):
try:
containers_running = get_bash_output(cmd='docker ps --no-trunc --format \"{{.ID}}: {{.Command}}\"')
for docker_line in containers_running.split('\n'):
parts = docker_line.split(':')
if docker_cmd_contains in parts[-1]:
# we found our docker, return it
return parts[0]
except Exception:
pass
return None
def shutdown_docker_process(docker_cmd_contains=None, docker_id=None):
try:
if not docker_id:
docker_id = get_docker_id(docker_cmd_contains=docker_cmd_contains)
if docker_id:
# we found our docker, stop it
get_bash_output(cmd='docker stop -t 1 {}'.format(docker_id))
except Exception:
pass
def commit_docker(container_name, docker_cmd_contains=None, docker_id=None):
try:
if not docker_id:
docker_id = get_docker_id(docker_cmd_contains=docker_cmd_contains)
if not docker_id:
print("Failed locating requested docker")
return False
if docker_id:
# we found our docker, commit it
output = get_bash_output(cmd='docker commit {} {}'.format(docker_id, container_name))
return output
except Exception:
pass
print("Failed storing requested docker")
return False
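A hedged usage sketch for the three docker helpers above (the substring is whatever uniquely identifies the agent's container command line; the import path follows this file's module):

from trains_agent.helper.process import (
    get_docker_id, shutdown_docker_process, commit_docker)

docker_id = get_docker_id(docker_cmd_contains='trains_agent')
if docker_id:
    commit_docker('my-task-image', docker_id=docker_id)   # snapshot before stopping
    shutdown_docker_process(docker_id=docker_id)          # docker stop -t 1 <id>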
def check_if_command_exists(cmd):
return bool(find_executable(cmd))
@@ -124,6 +167,8 @@ class Argv(Executable):
"""
Returns a string of the shell command
"""
if is_windows_platform():
return self.ARGV_SEPARATOR.join(map(double_quote, self))
return self.ARGV_SEPARATOR.join(map(quote, self))
def call_subprocess(self, func, censor_password=False, *args, **kwargs):
@@ -144,6 +189,9 @@ class Argv(Executable):
return "Executing: {}".format(self.argv)
def __iter__(self):
if is_windows_platform():
return (word.as_posix().replace('/', '\\') if isinstance(word, Path) else six.text_type(word)
for word in self.argv)
return (six.text_type(word) for word in self.argv)
def __getitem__(self, item):
@@ -224,7 +272,8 @@ class CommandSequence(Executable):
return islice(chain.from_iterable(zip(repeat(delimiter), seq)), 1, None)
def normalize(command):
return list(command) if is_windows_platform() else command.serialize()
# return list(command) if is_windows_platform() else command.serialize()
return command.serialize()
return ' '.join(list(intersperse(self.JOIN_COMMAND_OPERATOR, map(normalize, self.commands))))
@@ -266,8 +315,6 @@ class CommandSequence(Executable):
def pretty(self):
serialized = self.serialize()
if is_windows_platform():
return " ".join(serialized)
return serialized
@@ -361,3 +408,18 @@ def quote(s):
# use single quotes, and put single quotes into double quotes
# the string $'b is then quoted as '$'"'"'b'
return "'" + s.replace("'", "'\"'\"'") + "'"
def double_quote(s):
"""
Backport of shlex.quote():
Return a shell-escaped version of the string *s*.
"""
if not s:
return "''"
if _find_unsafe(s) is None:
return s
# use double quotes, and put double quotes into single quotes
# the string $"b is then quoted as "$"'"'"b"
return '"' + s.replace('"', '"\'\"\'"') + '"'

View File

@@ -263,8 +263,9 @@ class VCS(object):
"""
self._set_ssh_url()
clone_command = ("clone", self.url_with_auth, self.location) + self.clone_flags
if branch:
clone_command += ("-b", branch)
# clone all branches regardless of which one we will later check out
# if branch:
# clone_command += ("-b", branch)
if self.session.debug_mode:
self.call(*clone_command)
return
@@ -453,13 +454,13 @@ class Git(VCS):
)
def pull(self):
self.call("fetch", "origin", cwd=self.location)
self.call("fetch", "--all", cwd=self.location)
info_commands = dict(
url=Argv("git", "remote", "get-url", "origin"),
branch=Argv("git", "rev-parse", "--abbrev-ref", "HEAD"),
commit=Argv("git", "rev-parse", "HEAD"),
root=Argv("git", "rev-parse", "--show-toplevel"),
url=Argv(executable_name, "ls-remote", "--get-url", "origin"),
branch=Argv(executable_name, "rev-parse", "--abbrev-ref", "HEAD"),
commit=Argv(executable_name, "rev-parse", "HEAD"),
root=Argv(executable_name, "rev-parse", "--show-toplevel"),
)
patch_base = ("apply",)
@@ -493,10 +494,10 @@ class Hg(VCS):
)
info_commands = dict(
url=Argv("hg", "paths", "--verbose"),
branch=Argv("hg", "--debug", "id", "-b"),
commit=Argv("hg", "--debug", "id", "-i"),
root=Argv("hg", "root"),
url=Argv(executable_name, "paths", "--verbose"),
branch=Argv(executable_name, "--debug", "id", "-b"),
commit=Argv(executable_name, "--debug", "id", "-i"),
root=Argv(executable_name, "root"),
)
@@ -533,11 +534,10 @@ def clone_repository_cached(session, execution, destination):
else:
print("cloning: {}".format(no_password_url))
rm_tree(cached_repo_path)
vcs.clone(branch=execution.branch)
# We clone the entire repository, not a specific branch
vcs.clone() # branch=execution.branch)
vcs.pull()
vcs.checkout()
rm_tree(destination)
shutil.copytree(Text(cached_repo_path), Text(clone_folder))
if not clone_folder.is_dir():
@@ -547,6 +547,10 @@ def clone_repository_cached(session, execution, destination):
)
)
# checkout in the newly copied destination
vcs.location = Text(clone_folder)
vcs.checkout()
repo_info = vcs.get_repository_copy_info(clone_folder)
# make sure we have no user/pass in the returned repository structure
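The net effect of this hunk, as a compressed outline (vcs is the VCS helper from this file; the local names are illustrative):

import shutil

def clone_cached(vcs, cached_repo_path, clone_folder):
    vcs.clone()                             # cache clone: all branches, no -b <branch>
    vcs.pull()                              # fetch --all keeps every branch current
    shutil.copytree(cached_repo_path, clone_folder)
    vcs.location = clone_folder             # re-point at the per-task copy
    vcs.checkout()                          # checkout in the copy, never in the cache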

View File

@@ -37,10 +37,19 @@ DAEMON_ARGS = dict({
'help': 'Pipe full log to stdout/stderr, should not be used if running in background',
'action': 'store_true',
},
'--gpus': {
'help': 'Specify active GPUs for the daemon to use (docker / virtual environment), '
'Equivalent to setting NVIDIA_VISIBLE_DEVICES '
'Examples: --gpus 0 or --gpus 0,1,2 or --gpus all',
},
'--cpu-only': {
'help': 'Disable GPU access for the daemon, only use CPU in either docker or virtual environment',
'action': 'store_true',
},
'--docker': {
'help': 'Run execution task inside a docker (v19.03 and above). Optional args <image> <arguments> or '
'specify default docker image in agent.default_docker.image / agent.default_docker.arguments. '
'set NVIDIA_VISIBLE_DEVICES to limit gpu visibility for docker',
'use --gpus/--cpu-only (or set NVIDIA_VISIBLE_DEVICES) to limit gpu visibility for docker',
'nargs': '*',
'default': False,
},
@@ -51,6 +60,11 @@ DAEMON_ARGS = dict({
'dest': 'queues',
'type': foreign_object_id('queues'),
},
'--standalone-mode': {
'help': 'Do not use any network connections, assume everything is pre-installed',
'action': 'store_true',
},
}, **WORKER_ARGS)
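These declarative dicts presumably feed argparse one flag at a time; a minimal sketch of that wiring, under that assumption and with a trimmed-down arg set:

import argparse

DAEMON_ARGS = {
    '--cpu-only': {'help': 'Disable GPU access for the daemon', 'action': 'store_true'},
    '--gpus': {'help': 'Specify active GPUs, e.g. --gpus 0,1 or --gpus all'},
}

parser = argparse.ArgumentParser('trains-agent daemon')
for flag, spec in DAEMON_ARGS.items():
    parser.add_argument(flag, **spec)

print(parser.parse_args(['--gpus', '0,1']))  # Namespace(cpu_only=False, gpus='0,1')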
COMMANDS = {
@@ -74,6 +88,15 @@ COMMANDS = {
'help': 'Full environment setup log & task logging & monitoring (stdout is still visible)',
'action': 'store_true',
},
'--require-queue': {
'help': 'If the specified task is not queued (in any Queue), the execution will fail. '
'(Used for 3rd party scheduler integration, e.g. K8s, SLURM, etc.)',
'action': 'store_true',
},
'--standalone-mode': {
'help': 'Do not use any network connections, assume everything is pre-installed',
'action': 'store_true',
},
}, **WORKER_ARGS),
},
'build': {
@@ -87,8 +110,25 @@ COMMANDS = {
'dest': 'task_id',
'type': foreign_object_id('tasks'),
},
'--target-folder': {
'help': 'Where to build the task\'s virtual environment and source code',
'--target': {
'help': 'Where to build the task\'s virtual environment and source code. '
'When used with --docker, target docker image name to create',
},
'--docker': {
'help': 'Build the experiment inside a docker (v19.03 and above). Optional args <image> <arguments> or '
'specify default docker image in agent.default_docker.image / agent.default_docker.arguments. '
'use --gpus/--cpu-only (or set NVIDIA_VISIBLE_DEVICES) to limit gpu visibility for docker',
'nargs': '*',
'default': False,
},
'--gpus': {
'help': 'Specify active GPUs for the docker to use. '
'Equivalent to setting NVIDIA_VISIBLE_DEVICES '
'Examples: --gpus 0 or --gpus 0,1,2 or --gpus all',
},
'--cpu-only': {
'help': 'Disable GPU access (cpu only) for the docker',
'action': 'store_true',
},
'--python-version': {
'help': 'Virtual environment python version to use',

View File

@@ -72,6 +72,11 @@ class Session(_Session):
os.environ[LOCAL_CONFIG_FILE_OVERRIDE_VAR] = config_file
if not Path(config_file).is_file():
raise ValueError("Could not open configuration file: {}".format(config_file))
cpu_only = kwargs.get('cpu_only')
if cpu_only:
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = 'none'
if kwargs.get('gpus'):
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
if kwargs.get('only_load_config'):
from trains_agent.backend_api.config import load
self.config = load()
@@ -94,10 +99,12 @@ class Session(_Session):
if not self.config.get('api.host', None) and self.config.get('api.api_server', None):
self.config['api']['host'] = self.config.get('api.api_server')
# initialize nvidia visibility variable
# initialize nvidia visibility variables
os.environ['CUDA_DEVICE_ORDER'] = "PCI_BUS_ID"
if os.environ.get('NVIDIA_VISIBLE_DEVICES') and not os.environ.get('CUDA_VISIBLE_DEVICES'):
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ.get('NVIDIA_VISIBLE_DEVICES')
# do not create CUDA_VISIBLE_DEVICES if it doesn't exist, it breaks TF/PyTorch CUDA detection
# os.environ['CUDA_VISIBLE_DEVICES'] = os.environ.get('NVIDIA_VISIBLE_DEVICES')
pass
elif os.environ.get('CUDA_VISIBLE_DEVICES') and not os.environ.get('NVIDIA_VISIBLE_DEVICES'):
os.environ['NVIDIA_VISIBLE_DEVICES'] = os.environ.get('CUDA_VISIBLE_DEVICES')
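The visibility-variable handling reduces to: mirror one variable into the other when only one is set, but never invent CUDA_VISIBLE_DEVICES. A hedged sketch:

import os

def sync_gpu_env(environ=None):
    environ = os.environ if environ is None else environ
    environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
    if environ.get('CUDA_VISIBLE_DEVICES') and not environ.get('NVIDIA_VISIBLE_DEVICES'):
        environ['NVIDIA_VISIBLE_DEVICES'] = environ['CUDA_VISIBLE_DEVICES']
    # deliberately no mirror in the other direction: creating CUDA_VISIBLE_DEVICES
    # from scratch breaks TF/PyTorch device detection (see the comment above)

env = {'CUDA_VISIBLE_DEVICES': '0'}
sync_gpu_env(env)
print(env['NVIDIA_VISIBLE_DEVICES'])  # 0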
@@ -115,7 +122,7 @@ class Session(_Session):
from trains_agent.helper.package.requirements import RequirementsManager
agent = self.config['agent']
agent['cuda_version'], agent['cudnn_version'] = \
RequirementsManager.get_cuda_version(self.config)
RequirementsManager.get_cuda_version(self.config) if not cpu_only else ('0', '0')
except Exception:
pass
@@ -199,6 +206,15 @@ class Session(_Session):
config.pop('env', None)
if remove_secret_keys:
recursive_remove_secrets(config, secret_keys=remove_secret_keys)
# remove logging.loggers.urllib3.level from the print
try:
config['logging']['loggers']['urllib3'].pop('level', None)
except (KeyError, TypeError, AttributeError):
pass
try:
config['logging'].pop('version', None)
except (KeyError, TypeError, AttributeError):
pass
config = ConfigFactory.from_dict(config)
self.log.debug("Run by interpreter: %s", sys.executable)
print(

View File

@@ -1 +1 @@
__version__ = '0.12.0'
__version__ = '0.13.0'