Compare commits

...

92 Commits

Author SHA1 Message Date
allegroai
2b561f6066 Version bump to v0.14.1 2020-03-24 20:37:18 +02:00
allegroai
61232d05dd Fix run as user support in Windows and add fall-back for created user folders 2020-03-22 19:16:11 +02:00
allegroai
b3418e4496 Add daemon detached mode (--detached, -d) that runs agent in the background and returns immediately 2020-03-22 19:00:29 +02:00
allegroai
5ef627165c Fix PyTorch support to ignore minor versions when looking for package to install or to download 2020-03-20 10:48:48 +02:00
allegroai
98a983d9a2 Add TRAINS_AGENT_EXTRA_PYTHON_PATH to allow adding additional python path for task execution (helpful when using extra untracked modules) 2020-03-20 10:46:56 +02:00
allegroai
482007c4ce Fix run as user feature (TRAINS_AGENT_EXEC_USER) 2020-03-20 10:42:32 +02:00
allegroai
98198b8006 Auto mount ~/.git-credentials into docker container if file exists 2020-03-20 10:39:59 +02:00
allegroai
94bb11a81a Change message when using local torch 2020-03-20 10:37:42 +02:00
allegroai
4158d08f6f Fix test 2020-03-20 10:36:20 +02:00
allegroai
58ab67ea31 Fix execution output handling 2020-03-20 10:35:25 +02:00
allegroai
ea0ed4807e Version bump to v0.14.0 2020-03-12 19:42:32 +02:00
allegroai
389600b91e Fix git checkout with submodules 2020-03-12 18:39:47 +02:00
allegroai
5fb2550212 Update to backend API v2.5 2020-03-12 18:39:10 +02:00
allegroai
15e9e6b778 Fix "execute --clone" support 2020-03-12 18:38:35 +02:00
allegroai
aa75b92e46 Prefer docker image from command line over the one in the experiment 2020-03-12 18:35:49 +02:00
allegroai
757210d5b3 Add support for "execute --docker" and for cloning an experiment before execution 2020-03-12 18:33:07 +02:00
allegroai
00eb2f10ec Version bump to v0.13.3 2020-03-09 16:07:50 +02:00
allegroai
3393372b9c Do not share apt cache among agents on the same machine 2020-03-09 12:38:51 +02:00
allegroai
f2d2d702de Fix k8s support to allow a specific network for the docker (do not use the parent daemon network definition) 2020-03-09 12:38:32 +02:00
allegroai
e3d0680d39 Improve Unicode/UTF stdout handling 2020-03-09 12:34:48 +02:00
allegroai
618c2ac5c4 Add default storage environment vars to generated agent configuration 2020-03-09 12:33:03 +02:00
allegroai
0272c4c79c Add "--force-current-version" daemon command-line flag 2020-03-09 12:31:43 +02:00
allegroai
ff8cf63abf Add "--force-current-version" daemon command-line flag 2020-03-09 12:27:39 +02:00
allegroai
2c7c7f5b44 Add K8s/trains glue service example 2020-03-05 14:10:08 +02:00
allegroai
01f57c1e44 Create missing queues when starting the AWS dynamic cluster management service 2020-03-05 14:08:32 +02:00
allegroai
47bcd3839a Pass correct GPU limit when skipping gpus flag in docker mode 2020-03-05 14:07:44 +02:00
allegroai
0a3a8a1c52 Add support for mounting dockerized experiment folders to host when running on K8s in daemon mode 2020-03-05 13:13:03 +02:00
allegroai
231a907cff Add support for running daemon inside a K8s pod in daemon mode 2020-03-05 13:03:36 +02:00
allegroai
8f95eecf2e Add TRAINS_AGENT_EXEC_USER support for multiple daemon instances 2020-03-05 12:46:53 +02:00
allegroai
81008ee00e Add support for launching a specific python version based on Task.script.binary 2020-03-01 17:15:18 +02:00
allegroai
25bc44c0cf Add poetry to the list of supported package managers 2020-03-01 17:13:15 +02:00
allegroai
f838c8fc70 Allow providing queue names to daemon 2020-02-26 16:58:25 +02:00
allegroai
596093aac6 Version bump to v0.13.2 2020-02-23 16:25:14 +02:00
allegroai
8f23f3b4c0 Add support for pulling recursive git modules as well as the main project 2020-02-23 15:48:12 +02:00
allegroai
95d503afdd Fix pip install or upgrade with limit in conda 2020-02-23 15:47:28 +02:00
allegroai
73ee33be99 Print error in case Poetry configuration failed 2020-02-23 14:43:21 +02:00
allegroai
ee3adf625f Add single-series-per-graph setting to the configuration example 2020-02-23 12:38:14 +02:00
allegroai
afec38a50e Add missing models service 2020-02-18 11:31:58 +02:00
allegroai
f9c60904f4 version bump 2020-02-12 11:23:53 +02:00
allegroai
a09dc85c67 Limit virtualenv version to <20 due to an import issue in v20.0.0 2020-02-12 11:23:48 +02:00
allegroai
5d74f4b376 version bump 2020-02-10 10:47:20 +02:00
allegroai
d558c66d3c Do not stop experiments if network is down 2020-02-10 10:47:13 +02:00
allegroai
714c6a05d0 Add .bashrc reloading before running trains-agent in the AWS dynamic cluster management service 2020-02-10 10:36:00 +02:00
allegroai
43b2f7f41d version bump 2020-02-04 18:06:45 +02:00
allegroai
28d752d568 Preinstall numpy if it exists in the requirements (temporary fix) 2020-02-04 18:06:25 +02:00
allegroai
6d091d8e08 Add experiment archiving example 2020-02-02 14:51:09 +02:00
allegroai
5c6b3ccc94 Version bump to v0.13.1 2020-01-27 19:45:26 +02:00
allegroai
df10e6ed46 Fix conda support to install graphviz packages even if matplotlib was installed from pip 2020-01-27 19:22:51 +02:00
allegroai
8ef78fd058 version bump 2020-01-27 16:23:23 +02:00
allegroai
640c83288a Add pip --disable-pip-version-check, to remove pip version warnings 2020-01-27 16:23:15 +02:00
allegroai
788c79a66f Support git repositories without ".git" suffix 2020-01-27 15:43:35 +02:00
allegroai
bef87c7744 Fix typos 2020-01-27 15:42:37 +02:00
allegroai
f139891276 version bump 2020-01-26 15:06:45 +02:00
allegroai
2afaff1713 Fix poetry support inside virtualenv with pyenv 2020-01-26 15:05:59 +02:00
allegroai
a57a5b151c Daemon support for conda and poetry 2020-01-26 15:05:20 +02:00
allegroai
97f446d523 Improve conda support for .post versions and bad packages 2020-01-26 13:58:50 +02:00
allegroai
a88262c097 version bump 2020-01-22 12:38:20 +02:00
allegroai
284271c654 Support limiting pip version, limit to <20 by default 2020-01-22 12:02:12 +02:00
allegroai
ae2775f7b8 Support poetry when agent is installed inside virtualenv 2020-01-22 11:22:43 +02:00
allegroai
eb012f5c24 version bump 2020-01-21 16:23:53 +02:00
allegroai
06897f7606 Fix poetry support 2020-01-21 16:23:36 +02:00
allegroai
599219b02d Add conda support 2020-01-21 16:21:18 +02:00
allegroai
b6e04ab982 Fix YAML warning 2020-01-21 16:19:43 +02:00
allegroai
98fe162878 Fix poetry support 2020-01-16 11:17:05 +02:00
allegroai
f829d80a49 version bump 2020-01-16 11:11:02 +02:00
allegroai
b7e568e299 Fix requirements handling and poetry support 2020-01-16 11:10:38 +02:00
allegroai
6912846326 version bump 2020-01-14 15:26:29 +02:00
allegroai
224868c9a4 Fix relative requirements "-e" support by installing from the code's cwd 2020-01-14 13:05:12 +02:00
allegroai
b1ca90a303 Run under virtualenv in AWS dynamic cluster management service 2020-01-14 11:44:20 +02:00
allegroai
dee2475698 Add build-essential for pip-installed packages requiring compilation in AWS dynamic cluster management service 2020-01-14 11:43:41 +02:00
allegroai
aeede81474 Fix trains.conf injection in AWS dynamic cluster management service 2020-01-14 11:40:57 +02:00
allegroai
2d91d4cde6 Add support for "-e ./folder" lines in requirements 2020-01-14 11:37:41 +02:00
allegroai
7a11c7c165 Make sure logs are sent even in case an exception occurs inside the logging monitor 2020-01-13 18:14:12 +02:00
allegroai
a9f479cfcd Add extra docker parameters bash script to use when running an experiment using a docker image 2020-01-13 12:17:59 +02:00
allegroai
c1d91b0d6a Use packaging instead of semantic_version 2020-01-13 12:14:43 +02:00
allegroai
cbfba6acb2 Do not try to check for virtualenv command, we use it as python package 2020-01-13 12:12:38 +02:00
allegroai
f2e2e1f94a Add configuration option to force docker pull 2020-01-13 12:11:06 +02:00
allegroai
23668a403a Add auto terminate, increased polling interval and default docker image in AWS dynamic cluster management service 2020-01-08 12:27:40 +02:00
allegroai
facbee0005 Version bump to v0.13.0 2020-01-06 15:27:12 +02:00
allegroai
c486cfd09f Add extra agent configuration and bash script for the AWS dynamic cluster management service 2020-01-06 15:26:55 +02:00
allegroai
119ecaa2e3 Fix AWS dynamic cluster management service use of AWS credentials 2020-01-05 14:12:40 +02:00
allegroai
d6cc2be653 Fix AWS dynamic cluster management service use of AWS credentials 2020-01-05 13:57:29 +02:00
allegroai
41d75df40c Add AWS dynamic cluster management service 2019-12-24 23:22:17 +02:00
allegroai
901c4be9ae Add AWS dynamic cluster management service 2019-12-24 23:09:26 +02:00
Allegro AI
966b14f914 Update README.md 2019-12-24 22:52:30 +02:00
allegroai
847d35cbbb Add AWS dynamic cluster management service 2019-12-24 22:48:44 +02:00
allegroai
4022cb5c63 Add AWS dynamic cluster management service 2019-12-24 22:35:26 +02:00
allegroai
2b239829de Add extra_index_url to the configuration wizard 2019-12-24 18:23:59 +02:00
allegroai
402856656f Support various events endpoints for APIClient 2019-12-24 18:10:40 +02:00
allegroai
7b94ff410c Documentation 2019-12-24 12:47:35 +02:00
allegroai
0a03dced50 Do not show urllib3 logging level as part of the agent's configuration dump 2019-12-21 18:23:17 +02:00
allegroai
ffe653afc6 Support docker pre-installed pytorch versions that do not exist on PyPI/PyTorch.org 2019-12-21 18:22:21 +02:00
46 changed files with 26161 additions and 250 deletions

View File

@@ -1,5 +1,5 @@
# TRAINS Agent
## Deep Learning DevOps For Everyone - Now supports all platforms (Linux, macOS, and Windows)
## Deep Learning DevOps For Everyone - Now supporting all platforms (Linux, macOS, and Windows)
"All the Deep-Learning DevOps your research needs, and then some... Because ain't nobody got time for that"
@@ -14,7 +14,7 @@ It is a zero configuration fire-and-forget execution agent, which combined with
**Full AutoML in 5 steps**
1. Install the [TRAINS server](https://github.com/allegroai/trains-agent) (or use our [open server](https://demoapp.trains.allegro.ai))
2. `pip install trains_agent` ([install](#installing-the-trains-agent) the TRAINS agent on any GPU machine: on-premises / cloud / ...)
2. `pip install trains-agent` ([install](#installing-the-trains-agent) the TRAINS agent on any GPU machine: on-premises / cloud / ...)
3. Add [TRAINS](https://github.com/allegroai/trains) to your code with just 2 lines & run it once (on your machine / laptop)
4. Change the [parameters](#using-the-trains-agent) in the UI & schedule for [execution](#using-the-trains-agent) (or automate with an [AutoML pipeline](#automl-and-orchestration-pipelines-))
5. :chart_with_downwards_trend: :chart_with_upwards_trend: :eyes: :beer:
@@ -133,7 +133,7 @@ Development Machine |
### Installing the TRAINS Agent
```bash
pip install trains_agent
pip install trains-agent
```
### TRAINS Agent Usage Examples

View File

@@ -38,8 +38,12 @@ agent {
# currently supported: pip and conda
# poetry is used if pip is selected and the repository contains a poetry.lock file
package_manager: {
# supported options: pip, conda
# supported options: pip, conda, poetry
type: pip,
# specify pip version to use (examples: "<20", "==19.3.1", ""; an empty string will install the latest version)
# pip_version: "<20"
# virtual environment inherits packages from system
system_site_packages: false,
# install with --upgrade
@@ -83,6 +87,17 @@ agent {
# apt cache folder mapped into docker, used for ubuntu package caching
docker_apt_cache = ~/.trains/apt-cache
# optional arguments to pass to docker image
# these are local for this agent and will not be updated in the experiment's docker_cmd section
# extra_docker_arguments: ["--ipc=host", ]
# optional shell script to run inside docker on startup, before the experiment is started
# extra_docker_shell_script: ["apt-get install -y bindfs", ]
# set to true in order to force "docker pull" before running an experiment using a docker image.
# This makes sure the docker image is updated.
docker_force_pull: false
default_docker: {
# default docker image to use when running in docker mode
image: "nvidia/cuda"
@@ -126,6 +141,9 @@ sdk {
quality: 87
subsampling: 0
}
# Support plot-per-graph fully matching Tensorboard behavior (i.e. if this is set to True, each series should have its own graph)
tensorboard_single_series_per_graph: False
}
network {

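To illustrate the options added above, a minimal sketch of a local trains.conf override (the values are illustrative choices, not defaults; section placement follows the diff above):

agent {
    package_manager: {
        # pin pip below v20, as the commented example above suggests
        type: pip,
        pip_version: "<20",
    }
    # always run "docker pull" before a dockerized experiment
    docker_force_pull: true
    # local-only arguments, never written back to the experiment's docker_cmd section
    extra_docker_arguments: ["--ipc=host", ]
}
sdk {
    metrics {
        # one graph per series, matching TensorBoard behavior
        tensorboard_single_series_per_graph: True
    }
}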
View File

@@ -0,0 +1,59 @@
#!/usr/bin/python3
"""
An example script that cleans up failed experiments by moving them to the archive
"""
import argparse
from datetime import datetime
from trains_agent import APIClient
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--project", "-P", help="Project ID. Only clean up experiments from this project")
parser.add_argument("--user", "-U", help="User ID. Only clean up experiments assigned to this user")
parser.add_argument(
"--status", "-S", default="failed",
help="Experiment status. Only clean up experiments with this status (default %(default)s)"
)
parser.add_argument(
"--iterations", "-I", type=int,
help="Number of iterations. Only clean up experiments with less or equal number of iterations"
)
parser.add_argument(
"--sec-from-start", "-T", type=int,
help="Seconds from start time. "
"Only clean up experiments if less or equal number of seconds have elapsed since started"
)
args = parser.parse_args()
client = APIClient()
tasks = client.tasks.get_all(
project=[args.project] if args.project else None,
user=[args.user] if args.user else None,
status=[args.status] if args.status else None,
system_tags=["-archived"]
)
count = 0
for task in tasks:
if args.iterations and (task.last_iteration or 0) > args.iterations:
continue
if args.sec_from_start:
if not task.started:
continue
if (datetime.utcnow() - task.started.replace(tzinfo=None)).total_seconds() > args.sec_from_start:
continue
try:
client.tasks.edit(
task=task.id,
system_tags=(task.system_tags or []) + ["archived"],
force=True
)
count += 1
except Exception as ex:
print("Failed editing experiment: {}".format(ex))
print("Cleaned up {} experiments".format(count))

View File

@@ -0,0 +1,587 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Auto-Magically Spin AWS EC2 Instances On Demand \n",
"# and Create a Dynamic Cluster Running *Trains-Agent*\n",
"\n",
"### Define your budget and execute the notebook, that's it\n",
"### You now have a fully managed cluster on AWS 🎉 🎊 "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**trains-agent**'s main goal is to quickly pull a job from an execution queue, setup the environment (as defined in the experiment, including git cloning, python packages etc.) then execute the experiment and monitor it.\n",
"\n",
"This notebook defines a cloud budget (currently only AWS is supported, but feel free to expand with PRs), and spins an instance the minute a job is waiting for execution. It will also spin down idle machines, saving you some $$$ :)\n",
"\n",
"Configuration steps\n",
"- Define maximum budget to be used (instance type / number of instances).\n",
"- Create new execution *queues* in the **trains-server**.\n",
"- Define mapping between the created the *queues* and an instance budget.\n",
"\n",
"**TL;DR - This notebook:**\n",
"- Will spin instances if there are jobs in the execution *queues*, until it will hit the budget limit. \n",
"- If machines are idle, it will spin them down.\n",
"\n",
"The controller implementation itself is stateless, meaning you can always re-execute the notebook, if for some reason it stopped.\n",
"\n",
"It is as simple as it sounds, but extremely powerful\n",
"\n",
"Enjoy your newly created dynamic cluster :)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Install & import required packages"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!pip install trains-agent\n",
"!pip install boto3"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Define AWS instance types and configuration (Instance Type, EBS, AMI etc.)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# AWS EC2 machines types - default AMI - NVIDIA Deep Learning AMI 19.11.3\n",
"RESOURCE_CONFIGURATIONS = {\n",
" \"amazon_ec2_normal\": {\n",
" \"instance_type\": \"g4dn.4xlarge\",\n",
" \"is_spot\": False,\n",
" \"availability_zone\": \"us-east-1b\",\n",
" \"ami_id\": \"ami-07c95cafbb788face\",\n",
" \"ebs_device_name\": \"/dev/xvda\",\n",
" \"ebs_volume_size\": 100,\n",
" \"ebs_volume_type\": \"gp2\",\n",
" },\n",
" \"amazon_ec2_high\": {\n",
" \"instance_type\": \"g4dn.8xlarge\",\n",
" \"is_spot\": False,\n",
" \"availability_zone\": \"us-east-1b\",\n",
" \"ami_id\": \"ami-07c95cafbb788face\",\n",
" \"ebs_device_name\": \"/dev/xvda\",\n",
" \"ebs_volume_size\": 100,\n",
" \"ebs_volume_type\": \"gp2\",\n",
" },\n",
"}\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Define machine budget per execution queue\n",
"\n",
"Now that we defined our budget, we need to connect it with the **Trains** cluster.\n",
"\n",
"We map each queue to a resource type (instance type).\n",
"\n",
"Create two queues in the WebUI:\n",
"- Browse to http://your_trains_server_ip:8080/workers-and-queues/queues\n",
"- Then click on the \"New Queue\" button and name your queues \"aws_normal\" and \"aws_high\" respectively\n",
"\n",
"The QUEUES dictionary hold the mapping between the queue name and the type/number of instances to spin connected to the specific queue.\n",
"```\n",
"QUEUES = {\n",
" 'queue_name': [(\"instance-type-as-defined-in-RESOURCE_CONFIGURATIONS\", max_number_of_instances), ]\n",
"}\n",
"```\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# Trains-Agent Queues - Machines budget per Queue\n",
"# Per queue: list of (machine type as defined in RESOURCE_CONFIGURATIONS,\n",
"# max instances for the specific queue). Order machines from most preferred to least.\n",
"QUEUES = {\n",
" \"aws_normal\": [(\"amazon_ec2_normal\", 2),],\n",
" \"aws_high\": [(\"amazon_ec2_high\", 1)],\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Credentials for your AWS account, as well as for your **Trains-Server**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# AWS credentials (leave empty to use credentials set using the aws cli)\n",
"CLOUD_CREDENTIALS_KEY = \"\"\n",
"CLOUD_CREDENTIALS_SECRET = \"\"\n",
"CLOUD_CREDENTIALS_REGION = \"us-east-1\"\n",
"\n",
"# TRAINS configuration\n",
"TRAINS_SERVER_WEB_SERVER = \"http://localhost:8080\"\n",
"TRAINS_SERVER_API_SERVER = \"http://localhost:8008\"\n",
"TRAINS_SERVER_FILES_SERVER = \"http://localhost:8081\"\n",
"# TRAINS credentials\n",
"TRAINS_ACCESS_KEY = \"\"\n",
"TRAINS_SECRET_KEY = \"\"\n",
"# Git User/Pass to be used by trains-agent,\n",
"# leave empty if image already contains git ssh-key\n",
"TRAINS_GIT_USER = \"\"\n",
"TRAINS_GIT_PASS = \"\"\n",
"\n",
"# Additional fields for trains.conf file created on the remote instance\n",
"# for example: 'agent.default_docker.image: \"nvidia/cuda:10.0-cudnn7-runtime\"'\n",
"EXTRA_TRAINS_CONF = \"\"\"\n",
"\"\"\"\n",
"\n",
"# Bash script to run on instances before running trains-agent\n",
"# Example: \"\"\"\n",
"# echo \"This is the first line\"\n",
"# echo \"This is the second line\"\n",
"# \"\"\"\n",
"EXTRA_BASH_SCRIPT = \"\"\"\n",
"\"\"\"\n",
"\n",
"# Default docker for trains-agent when running in docker mode (requires docker v19.03 and above). \n",
"# Leave empty to run trains-agent in non-docker mode.\n",
"DEFAULT_DOCKER_IMAGE = \"nvidia/cuda\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Controller Internal Definitions\n",
"\n",
"# maximum idle time in minutes, after which the instance will be shutdown\n",
"MAX_IDLE_TIME_MIN = 15\n",
"# polling interval in minutes\n",
"# make sure to increase in case bash commands were added in EXTRA_BASH_SCRIPT\n",
"POLLING_INTERVAL_MIN = 5.0"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Import Packages and Budget Definition Sanity Check"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import base64\n",
"import re\n",
"import os\n",
"from itertools import chain\n",
"from operator import itemgetter\n",
"from time import sleep, time\n",
"\n",
"import boto3\n",
"from trains_agent.backend_api.session.client import APIClient"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sanity Check - Validate Queue Resources\n",
"if len(set(map(itemgetter(0), chain(*QUEUES.values())))) != sum(\n",
" map(len, QUEUES.values())\n",
"):\n",
" print(\n",
" \"Error: at least one resource name is used in multiple queues. \"\n",
" \"A resource name can only appear in a single queue definition.\"\n",
" )\n",
"\n",
"# Encode EXTRA_TRAINS_CONF for later bash script usage\n",
"EXTRA_TRAINS_CONF_ENCODED = \"\\\\\\\"\".join(EXTRA_TRAINS_CONF.split(\"\\\"\"))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Cloud specific implementation of spin up/down - currently supports AWS only"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Cloud-specific implementation (currently, only AWS EC2 is supported)\n",
"def spin_up_worker(resource, worker_id_prefix, queue_name):\n",
" \"\"\"\n",
" Creates a new worker for trains.\n",
" First, create an instance in the cloud and install some required packages.\n",
" Then, define trains-agent environment variables and run \n",
" trains-agent for the specified queue.\n",
" NOTE: - Will wait until instance is running\n",
" - This implementation assumes the instance image already has docker installed\n",
"\n",
" :param str resource: resource name, as defined in BUDGET and QUEUES.\n",
" :param str worker_id_prefix: worker name prefix\n",
" :param str queue_name: trains queue to listen to\n",
" \"\"\"\n",
" resource_conf = RESOURCE_CONFIGURATIONS[resource]\n",
" # Add worker type and AWS instance type to the worker name.\n",
" worker_id = \"{worker_id_prefix}:{worker_type}:{instance_type}\".format(\n",
" worker_id_prefix=worker_id_prefix,\n",
" worker_type=resource,\n",
" instance_type=resource_conf[\"instance_type\"],\n",
" )\n",
"\n",
" # user_data script will automatically run when the instance is started. \n",
" # It will install the required packages for trains-agent configure it using \n",
" # environment variables and run trains-agent on the required queue\n",
" user_data = \"\"\"#!/bin/bash\n",
" sudo apt-get update\n",
" sudo apt-get install -y python3-dev\n",
" sudo apt-get install -y python3-pip\n",
" sudo apt-get install -y gcc\n",
" sudo apt-get install -y git\n",
" sudo apt-get install -y build-essential\n",
" python3 -m pip install -U pip\n",
" python3 -m pip install virtualenv\n",
" python3 -m virtualenv trains_agent_venv\n",
" source trains_agent_venv/bin/activate\n",
" python -m pip install trains-agent\n",
" echo 'agent.git_user=\\\"{git_user}\\\"' >> /root/trains.conf\n",
" echo 'agent.git_pass=\\\"{git_pass}\\\"' >> /root/trains.conf\n",
" echo \"{trains_conf}\" >> /root/trains.conf\n",
" export TRAINS_API_HOST={api_server}\n",
" export TRAINS_WEB_HOST={web_server}\n",
" export TRAINS_FILES_HOST={files_server}\n",
" export DYNAMIC_INSTANCE_ID=`curl http://169.254.169.254/latest/meta-data/instance-id`\n",
" export TRAINS_WORKER_ID={worker_id}:$DYNAMIC_INSTANCE_ID\n",
" export TRAINS_API_ACCESS_KEY='{access_key}'\n",
" export TRAINS_API_SECRET_KEY='{secret_key}'\n",
" {bash_script}\n",
" source ~/.bashrc\n",
" python -m trains_agent --config-file '/root/trains.conf' daemon --queue '{queue}' {docker}\n",
" shutdown\n",
" \"\"\".format(\n",
" api_server=TRAINS_SERVER_API_SERVER,\n",
" web_server=TRAINS_SERVER_WEB_SERVER,\n",
" files_server=TRAINS_SERVER_FILES_SERVER,\n",
" worker_id=worker_id,\n",
" access_key=TRAINS_ACCESS_KEY,\n",
" secret_key=TRAINS_SECRET_KEY,\n",
" queue=queue_name,\n",
" git_user=TRAINS_GIT_USER,\n",
" git_pass=TRAINS_GIT_PASS,\n",
" trains_conf=EXTRA_TRAINS_CONF_ENCODED,\n",
" bash_script=EXTRA_BASH_SCRIPT,\n",
" docker=\"--docker '{}'\".format(DEFAULT_DOCKER_IMAGE) if DEFAULT_DOCKER_IMAGE else \"\"\n",
" )\n",
"\n",
" ec2 = boto3.client(\n",
" \"ec2\",\n",
" aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,\n",
" aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,\n",
" region_name=CLOUD_CREDENTIALS_REGION\n",
" )\n",
"\n",
" if resource_conf[\"is_spot\"]:\n",
" # Create a request for a spot instance in AWS\n",
" encoded_user_data = base64.b64encode(user_data.encode(\"ascii\")).decode(\"ascii\")\n",
" instances = ec2.request_spot_instances(\n",
" LaunchSpecification={\n",
" \"ImageId\": resource_conf[\"ami_id\"],\n",
" \"InstanceType\": resource_conf[\"instance_type\"],\n",
" \"Placement\": {\"AvailabilityZone\": resource_conf[\"availability_zone\"]},\n",
" \"UserData\": encoded_user_data,\n",
" \"BlockDeviceMappings\": [\n",
" {\n",
" \"DeviceName\": resource_conf[\"ebs_device_name\"],\n",
" \"Ebs\": {\n",
" \"VolumeSize\": resource_conf[\"ebs_volume_size\"],\n",
" \"VolumeType\": resource_conf[\"ebs_volume_type\"],\n",
" },\n",
" }\n",
" ],\n",
" }\n",
" )\n",
"\n",
" # Wait until spot request is fulfilled\n",
" request_id = instances[\"SpotInstanceRequests\"][0][\"SpotInstanceRequestId\"]\n",
" waiter = ec2.get_waiter(\"spot_instance_request_fulfilled\")\n",
" waiter.wait(SpotInstanceRequestIds=[request_id])\n",
" # Get the instance object for later use\n",
" response = ec2.describe_spot_instance_requests(\n",
" SpotInstanceRequestIds=[request_id]\n",
" )\n",
" instance_id = response[\"SpotInstanceRequests\"][0][\"InstanceId\"]\n",
"\n",
" else:\n",
" # Create a new EC2 instance\n",
" instances = ec2.run_instances(\n",
" ImageId=resource_conf[\"ami_id\"],\n",
" MinCount=1,\n",
" MaxCount=1,\n",
" InstanceType=resource_conf[\"instance_type\"],\n",
" UserData=user_data,\n",
" InstanceInitiatedShutdownBehavior='terminate',\n",
" BlockDeviceMappings=[\n",
" {\n",
" \"DeviceName\": resource_conf[\"ebs_device_name\"],\n",
" \"Ebs\": {\n",
" \"VolumeSize\": resource_conf[\"ebs_volume_size\"],\n",
" \"VolumeType\": resource_conf[\"ebs_volume_type\"],\n",
" },\n",
" }\n",
" ],\n",
" )\n",
"\n",
" # Get the instance object for later use\n",
" instance_id = instances[\"Instances\"][0][\"InstanceId\"]\n",
"\n",
" instance = boto3.resource(\n",
" \"ec2\",\n",
" aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,\n",
" aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,\n",
" region_name=CLOUD_CREDENTIALS_REGION\n",
" ).Instance(instance_id)\n",
"\n",
" # Wait until instance is in running state\n",
" instance.wait_until_running()\n",
"\n",
"\n",
"# Cloud-specific implementation (currently, only AWS EC2 is supported)\n",
"def spin_down_worker(instance_id):\n",
" \"\"\"\n",
" Destroys the cloud instance.\n",
"\n",
" :param str instance_id: Cloud instance ID to be destroyed \n",
" (currently, only AWS EC2 is supported)\n",
" \"\"\"\n",
" try:\n",
" boto3.resource(\n",
" \"ec2\",\n",
" aws_access_key_id=CLOUD_CREDENTIALS_KEY or None,\n",
" aws_secret_access_key=CLOUD_CREDENTIALS_SECRET or None,\n",
" region_name=CLOUD_CREDENTIALS_REGION\n",
" ).instances.filter(InstanceIds=[instance_id]).terminate()\n",
" except Exception as ex:\n",
" raise ex"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"###### Controller Implementation and Logic"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def supervisor():\n",
" \"\"\"\n",
" Spin up or down resources as necessary.\n",
" - For every queue in QUEUES do the following:\n",
" 1. Check if there are tasks waiting in the queue.\n",
" 2. Check if there are enough idle workers available for those tasks.\n",
" 3. In case more instances are required, and we haven't reached max instances allowed,\n",
" create the required instances with regards to the maximum number defined in QUEUES\n",
" Choose which instance to create according to their order QUEUES. Won't create \n",
" more instances if maximum number defined has already reached.\n",
" - spin down instances according to their idle time. instance which is idle for \n",
" more than MAX_IDLE_TIME_MIN minutes would be removed.\n",
" \"\"\"\n",
"\n",
" # Internal definitions\n",
" workers_prefix = \"dynamic_aws\"\n",
" # Worker's id in trains would be composed from:\n",
" # prefix, name, instance_type and cloud_id separated by ';'\n",
" workers_pattern = re.compile(\n",
" r\"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)\"\n",
" )\n",
"\n",
" # Set up the environment variables for trains\n",
" os.environ[\"TRAINS_API_HOST\"] = TRAINS_SERVER_API_SERVER\n",
" os.environ[\"TRAINS_WEB_HOST\"] = TRAINS_SERVER_WEB_SERVER\n",
" os.environ[\"TRAINS_FILES_HOST\"] = TRAINS_SERVER_FILES_SERVER\n",
" os.environ[\"TRAINS_API_ACCESS_KEY\"] = TRAINS_ACCESS_KEY\n",
" os.environ[\"TRAINS_API_SECRET_KEY\"] = TRAINS_SECRET_KEY\n",
" api_client = APIClient()\n",
"\n",
" # Verify the requested queues exist and create those that doesn't exist\n",
" all_queues = [q.name for q in list(api_client.queues.get_all())]\n",
" missing_queues = [q for q in QUEUES if q not in all_queues]\n",
" for q in missing_queues:\n",
" api_client.queues.create(q)\n",
"\n",
" idle_workers = {}\n",
" while True:\n",
" queue_name_to_id = {\n",
" queue.name: queue.id for queue in api_client.queues.get_all()\n",
" }\n",
" resource_to_queue = {\n",
" item[0]: queue\n",
" for queue, resources in QUEUES.items()\n",
" for item in resources\n",
" }\n",
" all_workers = [\n",
" worker\n",
" for worker in api_client.workers.get_all()\n",
" if workers_pattern.match(worker.id)\n",
" and workers_pattern.match(worker.id)[\"prefix\"] == workers_prefix\n",
" ]\n",
"\n",
" # Workers without a task, are added to the idle list\n",
" for worker in all_workers:\n",
" if not hasattr(worker, \"task\") or not worker.task:\n",
" if worker.id not in idle_workers:\n",
" resource_name = workers_pattern.match(worker.id)[\"instance_type\"]\n",
" idle_workers[worker.id] = (time(), resource_name, worker)\n",
" elif hasattr(worker, \"task\") and worker.task and worker.id in idle_workers:\n",
" idle_workers.pop(worker.id, None)\n",
"\n",
" required_idle_resources = [] # idle resources we'll need to keep running\n",
" allocate_new_resources = [] # resources that will need to be started\n",
" # Check if we have tasks waiting on one of the designated queues\n",
" for queue in QUEUES:\n",
" entries = api_client.queues.get_by_id(queue_name_to_id[queue]).entries\n",
" if entries and len(entries) > 0:\n",
" queue_resources = QUEUES[queue]\n",
"\n",
" # If we have an idle worker matching the required resource,\n",
" # remove it from the required allocation resources\n",
" free_queue_resources = [\n",
" resource\n",
" for _, resource, _ in idle_workers.values()\n",
" if resource in queue_resources\n",
" ]\n",
" required_idle_resources.extend(free_queue_resources)\n",
" spin_up_count = len(entries) - len(free_queue_resources)\n",
" spin_up_resources = []\n",
"\n",
" # Add as many resources as possible to handle this queue's entries\n",
" for resource, max_instances in queue_resources:\n",
" if len(spin_up_resources) >= spin_up_count:\n",
" break\n",
" max_allowed = max_instances - len(\n",
" [\n",
" worker\n",
" for worker in all_workers\n",
" if workers_pattern.match(worker.id)[\"name\"] == resource\n",
" ]\n",
" )\n",
" spin_up_resources.extend(\n",
" [resource] * min(max_allowed, spin_up_count)\n",
" )\n",
" allocate_new_resources.extend(spin_up_resources)\n",
"\n",
" # Now we actually spin the new machines\n",
" for resource in allocate_new_resources:\n",
" spin_up_worker(resource, workers_prefix, resource_to_queue[resource])\n",
"\n",
" # Go over the idle workers list, and spin down idle workers\n",
" for timestamp, resources, worker in idle_workers.values():\n",
" # skip resource types that might be needed\n",
" if resources in required_idle_resources:\n",
" continue\n",
" # Remove from both aws and trains all instances that are \n",
" # idle for longer than MAX_IDLE_TIME_MIN\n",
" if time() - timestamp > MAX_IDLE_TIME_MIN * 60.0:\n",
" cloud_id = workers_pattern.match(worker.id)[\"cloud_id\"]\n",
" spin_down_worker(cloud_id)\n",
" worker.unregister()\n",
"\n",
" # Nothing else to do\n",
" sleep(POLLING_INTERVAL_MIN * 60.0)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"##### Execute Forever* (the controller is stateless, so you can always re-execute the notebook)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Loop forever, it is okay we are stateless\n",
"while True:\n",
" try:\n",
" supervisor()\n",
" except Exception as ex:\n",
" print(\"Warning! exception occurred: {ex}\\nRetry in 15 seconds\".format(ex=ex))\n",
" sleep(15)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.0"
},
"pycharm": {
"stem_cell": {
"cell_type": "raw",
"source": [],
"metadata": {
"collapsed": false
}
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}
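
As referenced in the supervisor code above, a standalone sketch of how the worker-id pattern decomposes an agent's worker id (the id string below is a made-up example):

import re

# same pattern as in supervisor(): prefix, name, instance_type and cloud_id, ':'-separated
workers_pattern = re.compile(
    r"^(?P<prefix>[^:]+):(?P<name>[^:]+):(?P<instance_type>[^:]+):(?P<cloud_id>[^:]+)"
)
match = workers_pattern.match("dynamic_aws:amazon_ec2_normal:g4dn.4xlarge:i-0123456789abcdef0")
print(match["prefix"])         # dynamic_aws
print(match["name"])           # amazon_ec2_normal (resource name from RESOURCE_CONFIGURATIONS)
print(match["instance_type"])  # g4dn.4xlarge
print(match["cloud_id"])       # i-0123456789abcdef0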

View File

@@ -15,9 +15,8 @@ PyYAML>=3.12
requests-file>=1.4.2
requests>=2.20.0
requirements_parser>=0.2.0
semantic_version>=2.6.0
six>=1.11.0
tqdm>=4.19.5
typing>=3.6.4
urllib3>=1.21.1
virtualenv>=16
virtualenv>=16,<20

View File

@@ -35,7 +35,7 @@ def trains_agentyaml(tmpdir):
def _method(template_file):
file = tmpdir.join("trains_agent.yaml")
with (PROJECT_ROOT / "tests/templates" / template_file).open() as f:
code = yaml.load(f)
code = yaml.load(f, Loader=yaml.SafeLoader)
yield Namespace(code=code, file=file.strpath)
file.write(yaml.dump(code))
return _method
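
For context, this change addresses the PyYAML >= 5.1 warning emitted when yaml.load() is called without an explicit Loader (a quick sketch; the YAML string is a made-up example):

import yaml

stream = "agent: {worker_id: test}"
# yaml.load(stream) alone would emit YAMLLoadWarning on PyYAML >= 5.1
data = yaml.load(stream, Loader=yaml.SafeLoader)  # explicit safe loader, no warning
same = yaml.safe_load(stream)                     # equivalent shorthand
assert data == same == {"agent": {"worker_id": "test"}}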

View File

@@ -11,7 +11,7 @@ from contextlib import contextmanager
from typing import Iterator, ContextManager, Sequence, IO, Text
from uuid import uuid4
from trains_agent.backend_api.services.tasks import Script
from trains_agent.backend_api.services import tasks
from trains_agent.backend_api.session.client import APIClient
from pathlib2 import Path
from pytest import fixture
@@ -154,7 +154,7 @@ def test_entry_point_warning(client):
"""
with create_task(
client,
script=Script(diff="print('hello')", entry_point="foo.py", repository=""),
script=tasks.Script(diff="print('hello')", entry_point="foo.py", repository=""),
**DEFAULT_TASK_ARGS
) as task, iterate_output(SHORT_TIMEOUT, run_task(task)) as output:
for line in output:
@@ -172,7 +172,7 @@ def test_run_no_dirs(client):
script = "print('{}')".format(uuid)
with create_task(
client,
script=Script(diff=script, entry_point="", repository="", working_dir=""),
script=tasks.Script(diff=script, entry_point="", repository="", working_dir=""),
**DEFAULT_TASK_ARGS
) as task, iterate_output(SHORT_TIMEOUT, run_task(task)) as output:
search_lines(
@@ -196,7 +196,7 @@ def test_run_working_dir(client):
script = "print('{}')".format(uuid)
with create_task(
client,
script=Script(
script=tasks.Script(
diff=script,
entry_point="",
repository="git@bitbucket.org:seematics/roee_test_git.git",
@@ -223,7 +223,7 @@ def test_regular_task(client):
"""
with create_task(
client,
script=Script(
script=tasks.Script(
entry_point="noop.py",
repository="git@bitbucket.org:seematics/roee_test_git.git",
),
@@ -241,7 +241,7 @@ def test_regular_task_nested(client):
"""
with create_task(
client,
script=Script(
script=tasks.Script(
entry_point="noop_nested.py",
working_dir="no_reqs",
repository="git@bitbucket.org:seematics/roee_test_git.git",

View File

@@ -1 +1 @@
from .backend_api.session.client import APIClient

View File

@@ -22,9 +22,12 @@
# currently supported: pip and conda
# poetry is used if pip is selected and the repository contains a poetry.lock file
package_manager: {
# supported options: pip, conda
# supported options: pip, conda, poetry
type: pip,
# specify pip version to use (examples: "<20", "==19.3.1", ""; an empty string will install the latest version)
pip_version: "<20",
# virtual environment inherits packages from system
system_site_packages: false,
@@ -33,7 +36,6 @@
# additional artifact repositories to use when installing python packages
# extra_index_url: ["https://allegroai.jfrog.io/trainsai/api/pypi/public/simple"]
extra_index_url: []
# additional conda channels to use when installing with conda package manager
conda_channels: ["defaults", "conda-forge", "pytorch", ]
@@ -69,6 +71,17 @@
# apt cache folder mapped into docker, used for ubuntu package caching
docker_apt_cache = ~/.trains/apt-cache
# optional arguments to pass to docker image
# these are local for this agent and will not be updated in the experiment's docker_cmd section
# extra_docker_arguments: ["--ipc=host", ]
# optional shell script to run inside docker on startup, before the experiment is started
# extra_docker_shell_script: ["apt-get install -y bindfs", ]
# set to true in order to force "docker pull" before running an experiment using a docker image.
# This makes sure the docker image is updated.
docker_force_pull: false
default_docker: {
# default docker image to use when running in docker mode
image: "nvidia/cuda"

View File

@@ -1,8 +1,10 @@
from .v2_4 import auth
from .v2_4 import debug
from .v2_4 import queues
from .v2_4 import tasks
from .v2_4 import workers
from .v2_5 import auth
from .v2_5 import debug
from .v2_5 import queues
from .v2_5 import tasks
from .v2_5 import workers
from .v2_5 import events
from .v2_5 import models
__all__ = [
'auth',
@@ -10,4 +12,6 @@ __all__ = [
'queues',
'tasks',
'workers',
'events',
'models',
]

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -0,0 +1,623 @@
"""
auth service
This service provides authentication management and authorization
validation for the entire system.
"""
import six
import types
from datetime import datetime
import enum
from dateutil.parser import parse as parse_datetime
from ....backend_api.session import Request, BatchRequest, Response, DataModel, NonStrictDataModel, CompoundRequest, schema_property, StringEnum
class Credentials(NonStrictDataModel):
"""
:param access_key: Credentials access key
:type access_key: str
:param secret_key: Credentials secret key
:type secret_key: str
"""
_schema = {
'properties': {
'access_key': {
'description': 'Credentials access key',
'type': ['string', 'null'],
},
'secret_key': {
'description': 'Credentials secret key',
'type': ['string', 'null'],
},
},
'type': 'object',
}
def __init__(
self, access_key=None, secret_key=None, **kwargs):
super(Credentials, self).__init__(**kwargs)
self.access_key = access_key
self.secret_key = secret_key
@schema_property('access_key')
def access_key(self):
return self._property_access_key
@access_key.setter
def access_key(self, value):
if value is None:
self._property_access_key = None
return
self.assert_isinstance(value, "access_key", six.string_types)
self._property_access_key = value
@schema_property('secret_key')
def secret_key(self):
return self._property_secret_key
@secret_key.setter
def secret_key(self, value):
if value is None:
self._property_secret_key = None
return
self.assert_isinstance(value, "secret_key", six.string_types)
self._property_secret_key = value
class CredentialKey(NonStrictDataModel):
"""
:param access_key:
:type access_key: str
:param last_used:
:type last_used: datetime.datetime
:param last_used_from:
:type last_used_from: str
"""
_schema = {
'properties': {
'access_key': {'description': '', 'type': ['string', 'null']},
'last_used': {
'description': '',
'format': 'date-time',
'type': ['string', 'null'],
},
'last_used_from': {'description': '', 'type': ['string', 'null']},
},
'type': 'object',
}
def __init__(
self, access_key=None, last_used=None, last_used_from=None, **kwargs):
super(CredentialKey, self).__init__(**kwargs)
self.access_key = access_key
self.last_used = last_used
self.last_used_from = last_used_from
@schema_property('access_key')
def access_key(self):
return self._property_access_key
@access_key.setter
def access_key(self, value):
if value is None:
self._property_access_key = None
return
self.assert_isinstance(value, "access_key", six.string_types)
self._property_access_key = value
@schema_property('last_used')
def last_used(self):
return self._property_last_used
@last_used.setter
def last_used(self, value):
if value is None:
self._property_last_used = None
return
self.assert_isinstance(value, "last_used", six.string_types + (datetime,))
if not isinstance(value, datetime):
value = parse_datetime(value)
self._property_last_used = value
@schema_property('last_used_from')
def last_used_from(self):
return self._property_last_used_from
@last_used_from.setter
def last_used_from(self, value):
if value is None:
self._property_last_used_from = None
return
self.assert_isinstance(value, "last_used_from", six.string_types)
self._property_last_used_from = value
class CreateCredentialsRequest(Request):
"""
Creates a new set of credentials for the authenticated user.
New key/secret is returned.
Note: Secret will never be returned in any other API call.
If a secret is lost or compromised, the key should be revoked
and a new set of credentials can be created.
"""
_service = "auth"
_action = "create_credentials"
_version = "2.1"
_schema = {
'additionalProperties': False,
'definitions': {},
'properties': {},
'type': 'object',
}
class CreateCredentialsResponse(Response):
"""
Response of auth.create_credentials endpoint.
:param credentials: Created credentials
:type credentials: Credentials
"""
_service = "auth"
_action = "create_credentials"
_version = "2.1"
_schema = {
'definitions': {
'credentials': {
'properties': {
'access_key': {
'description': 'Credentials access key',
'type': ['string', 'null'],
},
'secret_key': {
'description': 'Credentials secret key',
'type': ['string', 'null'],
},
},
'type': 'object',
},
},
'properties': {
'credentials': {
'description': 'Created credentials',
'oneOf': [{'$ref': '#/definitions/credentials'}, {'type': 'null'}],
},
},
'type': 'object',
}
def __init__(
self, credentials=None, **kwargs):
super(CreateCredentialsResponse, self).__init__(**kwargs)
self.credentials = credentials
@schema_property('credentials')
def credentials(self):
return self._property_credentials
@credentials.setter
def credentials(self, value):
if value is None:
self._property_credentials = None
return
if isinstance(value, dict):
value = Credentials.from_dict(value)
else:
self.assert_isinstance(value, "credentials", Credentials)
self._property_credentials = value
class EditUserRequest(Request):
"""
Edit a user's auth data properties
:param user: User ID
:type user: str
:param role: The new user's role within the company
:type role: str
"""
_service = "auth"
_action = "edit_user"
_version = "2.1"
_schema = {
'definitions': {},
'properties': {
'role': {
'description': "The new user's role within the company",
'enum': ['admin', 'superuser', 'user', 'annotator'],
'type': ['string', 'null'],
},
'user': {'description': 'User ID', 'type': ['string', 'null']},
},
'type': 'object',
}
def __init__(
self, user=None, role=None, **kwargs):
super(EditUserRequest, self).__init__(**kwargs)
self.user = user
self.role = role
@schema_property('user')
def user(self):
return self._property_user
@user.setter
def user(self, value):
if value is None:
self._property_user = None
return
self.assert_isinstance(value, "user", six.string_types)
self._property_user = value
@schema_property('role')
def role(self):
return self._property_role
@role.setter
def role(self, value):
if value is None:
self._property_role = None
return
self.assert_isinstance(value, "role", six.string_types)
self._property_role = value
class EditUserResponse(Response):
"""
Response of auth.edit_user endpoint.
:param updated: Number of users updated (0 or 1)
:type updated: float
:param fields: Updated fields names and values
:type fields: dict
"""
_service = "auth"
_action = "edit_user"
_version = "2.1"
_schema = {
'definitions': {},
'properties': {
'fields': {
'additionalProperties': True,
'description': 'Updated fields names and values',
'type': ['object', 'null'],
},
'updated': {
'description': 'Number of users updated (0 or 1)',
'enum': [0, 1],
'type': ['number', 'null'],
},
},
'type': 'object',
}
def __init__(
self, updated=None, fields=None, **kwargs):
super(EditUserResponse, self).__init__(**kwargs)
self.updated = updated
self.fields = fields
@schema_property('updated')
def updated(self):
return self._property_updated
@updated.setter
def updated(self, value):
if value is None:
self._property_updated = None
return
self.assert_isinstance(value, "updated", six.integer_types + (float,))
self._property_updated = value
@schema_property('fields')
def fields(self):
return self._property_fields
@fields.setter
def fields(self, value):
if value is None:
self._property_fields = None
return
self.assert_isinstance(value, "fields", (dict,))
self._property_fields = value
class GetCredentialsRequest(Request):
"""
Returns all existing credential keys for the authenticated user.
Note: Only credential keys are returned.
"""
_service = "auth"
_action = "get_credentials"
_version = "2.1"
_schema = {
'additionalProperties': False,
'definitions': {},
'properties': {},
'type': 'object',
}
class GetCredentialsResponse(Response):
"""
Response of auth.get_credentials endpoint.
:param credentials: List of credentials, each with an empty secret field.
:type credentials: Sequence[CredentialKey]
"""
_service = "auth"
_action = "get_credentials"
_version = "2.1"
_schema = {
'definitions': {
'credential_key': {
'properties': {
'access_key': {'description': '', 'type': ['string', 'null']},
'last_used': {
'description': '',
'format': 'date-time',
'type': ['string', 'null'],
},
'last_used_from': {
'description': '',
'type': ['string', 'null'],
},
},
'type': 'object',
},
},
'properties': {
'credentials': {
'description': 'List of credentials, each with an empty secret field.',
'items': {'$ref': '#/definitions/credential_key'},
'type': ['array', 'null'],
},
},
'type': 'object',
}
def __init__(
self, credentials=None, **kwargs):
super(GetCredentialsResponse, self).__init__(**kwargs)
self.credentials = credentials
@schema_property('credentials')
def credentials(self):
return self._property_credentials
@credentials.setter
def credentials(self, value):
if value is None:
self._property_credentials = None
return
self.assert_isinstance(value, "credentials", (list, tuple))
if any(isinstance(v, dict) for v in value):
value = [CredentialKey.from_dict(v) if isinstance(v, dict) else v for v in value]
else:
self.assert_isinstance(value, "credentials", CredentialKey, is_array=True)
self._property_credentials = value
class LoginRequest(Request):
"""
Get a token based on supplied credentials (key/secret).
Intended for use by users with key/secret credentials that wish to obtain a token
for use with other services. Token will be limited by the same permissions that
exist for the credentials used in this call.
:param expiration_sec: Requested token expiration time in seconds. Not
guaranteed, might be overridden by the service
:type expiration_sec: int
"""
_service = "auth"
_action = "login"
_version = "2.1"
_schema = {
'definitions': {},
'properties': {
'expiration_sec': {
'description': 'Requested token expiration time in seconds. \n Not guaranteed, might be overridden by the service',
'type': ['integer', 'null'],
},
},
'type': 'object',
}
def __init__(
self, expiration_sec=None, **kwargs):
super(LoginRequest, self).__init__(**kwargs)
self.expiration_sec = expiration_sec
@schema_property('expiration_sec')
def expiration_sec(self):
return self._property_expiration_sec
@expiration_sec.setter
def expiration_sec(self, value):
if value is None:
self._property_expiration_sec = None
return
if isinstance(value, float) and value.is_integer():
value = int(value)
self.assert_isinstance(value, "expiration_sec", six.integer_types)
self._property_expiration_sec = value
class LoginResponse(Response):
"""
Response of auth.login endpoint.
:param token: Token string
:type token: str
"""
_service = "auth"
_action = "login"
_version = "2.1"
_schema = {
'definitions': {},
'properties': {
'token': {'description': 'Token string', 'type': ['string', 'null']},
},
'type': 'object',
}
def __init__(
self, token=None, **kwargs):
super(LoginResponse, self).__init__(**kwargs)
self.token = token
@schema_property('token')
def token(self):
return self._property_token
@token.setter
def token(self, value):
if value is None:
self._property_token = None
return
self.assert_isinstance(value, "token", six.string_types)
self._property_token = value
class LogoutRequest(Request):
"""
Removes the authentication cookie from the current session
"""
_service = "auth"
_action = "logout"
_version = "2.2"
_schema = {'additionalProperties': False, 'definitions': {}, 'type': 'object'}
class LogoutResponse(Response):
"""
Response of auth.logout endpoint.
"""
_service = "auth"
_action = "logout"
_version = "2.2"
_schema = {'additionalProperties': False, 'definitions': {}, 'type': 'object'}
class RevokeCredentialsRequest(Request):
"""
Revokes (and deletes) a set (key, secret) of credentials for
the authenticated user.
:param access_key: Credentials key
:type access_key: str
"""
_service = "auth"
_action = "revoke_credentials"
_version = "2.1"
_schema = {
'definitions': {},
'properties': {
'access_key': {
'description': 'Credentials key',
'type': ['string', 'null'],
},
},
'required': ['key_id'],
'type': 'object',
}
def __init__(
self, access_key=None, **kwargs):
super(RevokeCredentialsRequest, self).__init__(**kwargs)
self.access_key = access_key
@schema_property('access_key')
def access_key(self):
return self._property_access_key
@access_key.setter
def access_key(self, value):
if value is None:
self._property_access_key = None
return
self.assert_isinstance(value, "access_key", six.string_types)
self._property_access_key = value
class RevokeCredentialsResponse(Response):
"""
Response of auth.revoke_credentials endpoint.
:param revoked: Number of credentials revoked
:type revoked: int
"""
_service = "auth"
_action = "revoke_credentials"
_version = "2.1"
_schema = {
'definitions': {},
'properties': {
'revoked': {
'description': 'Number of credentials revoked',
'enum': [0, 1],
'type': ['integer', 'null'],
},
},
'type': 'object',
}
def __init__(
self, revoked=None, **kwargs):
super(RevokeCredentialsResponse, self).__init__(**kwargs)
self.revoked = revoked
@schema_property('revoked')
def revoked(self):
return self._property_revoked
@revoked.setter
def revoked(self, value):
if value is None:
self._property_revoked = None
return
if isinstance(value, float) and value.is_integer():
value = int(value)
self.assert_isinstance(value, "revoked", six.integer_types)
self._property_revoked = value
response_mapping = {
LoginRequest: LoginResponse,
LogoutRequest: LogoutResponse,
CreateCredentialsRequest: CreateCredentialsResponse,
GetCredentialsRequest: GetCredentialsResponse,
RevokeCredentialsRequest: RevokeCredentialsResponse,
EditUserRequest: EditUserResponse,
}

View File

@@ -0,0 +1,194 @@
"""
debug service
Debugging utilities
"""
import six
import types
from datetime import datetime
import enum
from dateutil.parser import parse as parse_datetime
from ....backend_api.session import Request, BatchRequest, Response, DataModel, NonStrictDataModel, CompoundRequest, schema_property, StringEnum
class ApiexRequest(Request):
"""
"""
_service = "debug"
_action = "apiex"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'required': [], 'type': 'object'}
class ApiexResponse(Response):
"""
Response of debug.apiex endpoint.
"""
_service = "debug"
_action = "apiex"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
class EchoRequest(Request):
"""
Return request data
"""
_service = "debug"
_action = "echo"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
class EchoResponse(Response):
"""
Response of debug.echo endpoint.
"""
_service = "debug"
_action = "echo"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
class ExRequest(Request):
"""
"""
_service = "debug"
_action = "ex"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'required': [], 'type': 'object'}
class ExResponse(Response):
"""
Response of debug.ex endpoint.
"""
_service = "debug"
_action = "ex"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
class PingRequest(Request):
"""
Return a message. Does not require authorization.
"""
_service = "debug"
_action = "ping"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
class PingResponse(Response):
"""
Response of debug.ping endpoint.
:param msg: A friendly message
:type msg: str
"""
_service = "debug"
_action = "ping"
_version = "1.5"
_schema = {
'definitions': {},
'properties': {
'msg': {
'description': 'A friendly message',
'type': ['string', 'null'],
},
},
'type': 'object',
}
def __init__(
self, msg=None, **kwargs):
super(PingResponse, self).__init__(**kwargs)
self.msg = msg
@schema_property('msg')
def msg(self):
return self._property_msg
@msg.setter
def msg(self, value):
if value is None:
self._property_msg = None
return
self.assert_isinstance(value, "msg", six.string_types)
self._property_msg = value
class PingAuthRequest(Request):
"""
Return a message. Requires authorization.
"""
_service = "debug"
_action = "ping_auth"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
class PingAuthResponse(Response):
"""
Response of debug.ping_auth endpoint.
:param msg: A friendly message
:type msg: str
"""
_service = "debug"
_action = "ping_auth"
_version = "1.5"
_schema = {
'definitions': {},
'properties': {
'msg': {
'description': 'A friendly message',
'type': ['string', 'null'],
},
},
'type': 'object',
}
def __init__(
self, msg=None, **kwargs):
super(PingAuthResponse, self).__init__(**kwargs)
self.msg = msg
@schema_property('msg')
def msg(self):
return self._property_msg
@msg.setter
def msg(self, value):
if value is None:
self._property_msg = None
return
self.assert_isinstance(value, "msg", six.string_types)
self._property_msg = value
response_mapping = {
EchoRequest: EchoResponse,
PingRequest: PingResponse,
PingAuthRequest: PingAuthResponse,
ApiexRequest: ApiexResponse,
ExRequest: ExResponse,
}

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -139,13 +139,25 @@ class Response(object):
:param dest: if all of a response's data is contained in one field, use that field
:type dest: Text
"""
self.response = None
self._result = result
response = getattr(result, "response", result)
if getattr(response, "_service") == "events" and \
getattr(response, "_action") in ("scalar_metrics_iter_histogram",
"multi_task_scalar_metrics_iter_histogram",
"vector_metrics_iter_histogram",
):
# put all the response data under metrics:
response.metrics = result.response_data
if 'metrics' not in response.__class__._get_data_props():
response.__class__._data_props_list['metrics'] = 'metrics'
if dest:
response = getattr(response, dest)
self.response = response
def __getattr__(self, attr):
if self.response is None:
return None
return getattr(self.response, attr)
@property
@@ -493,6 +505,7 @@ class APIClient(object):
queues = None # type: Any
tasks = None # type: Any
workers = None # type: Any
events = None # type: Any
def __init__(self, session=None, api_version=None):
self.session = session or StrictSession()
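
A usage sketch for the newly exposed events service (the task ID is a placeholder, and the endpoint arguments are assumed from the backend API schema rather than taken from this diff):

from trains_agent import APIClient

client = APIClient()
# histogram endpoints now expose the full response payload under .metrics,
# per the remapping in the patch above
res = client.events.scalar_metrics_iter_histogram(task="<task-id>")
print(res.metrics)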

View File

@@ -150,6 +150,18 @@ def main():
git_user = None
git_pass = None
# get extra-index-url for pip installations
extra_index_urls = []
print('\nEnter additional artifact repository (extra-index-url) to use when installing python packages '
'(leave blank if not required):', end='')
index_url = input().strip()
while index_url:
extra_index_urls.append(index_url)
print('Another artifact repository? (enter another url or leave blank if done):', end='')
index_url = input().strip()
if len(extra_index_urls):
print("The following artifact repositories will be added:\n\t- {}".format("\n\t- ".join(extra_index_urls)))
# noinspection PyBroadException
try:
conf_folder = Path(__file__).parent.absolute() / '..' / 'backend_api' / 'config' / 'default'
@@ -183,6 +195,10 @@ def main():
'agent.git_pass=\"{}\"\n' \
'\n'.format(git_user or '', git_pass or '')
f.write(git_credentials)
extra_index_str = '# extra_index_url: ["https://allegroai.jfrog.io/trainsai/api/pypi/public/simple"]\n' \
'agent.package_manager.extra_index_url= ' \
'[\n{}\n]\n\n'.format("\n".join(map("\"{}\"".format, extra_index_urls)))
f.write(extra_index_str)
f.write(default_conf)
except Exception:
print('Error! Could not write configuration file at: {}'.format(str(conf_file)))
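
Given the code above, the section appended to trains.conf would look roughly like this (the second URL stands in for a value entered at the prompt):

# extra_index_url: ["https://allegroai.jfrog.io/trainsai/api/pypi/public/simple"]
agent.package_manager.extra_index_url= [
"https://my-company.example.com/pypi/simple"
]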

View File

@@ -17,7 +17,7 @@ from datetime import datetime
from distutils.spawn import find_executable
from functools import partial
from itertools import chain
from tempfile import gettempdir, mkdtemp
from tempfile import mkdtemp, gettempdir
from time import sleep, time
from typing import Text, Optional, Any, Tuple
@@ -28,9 +28,6 @@ from trains_agent.backend_api.services import queues as queues_api
from trains_agent.backend_api.services import tasks as tasks_api
from pathlib2 import Path
from pyhocon import ConfigTree, ConfigFactory
from requests import Session as HTTPSession
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from six.moves.urllib.parse import quote
from trains_agent.helper.check_update import start_check_update_daemon
@@ -40,7 +37,10 @@ from trains_agent.definitions import (
ENVIRONMENT_SDK_PARAMS,
INVALID_WORKER_ID,
PROGRAM_NAME,
DEFAULT_VENV_UPDATE_URL)
DEFAULT_VENV_UPDATE_URL,
ENV_TASK_EXECUTE_AS_USER,
ENV_K8S_HOST_MOUNT,
ENV_TASK_EXTRA_PYTHON_PATH)
from trains_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from trains_agent.errors import APIError, CommandFailedError, Sigterm
from trains_agent.helper.base import (
@@ -59,12 +59,18 @@ from trains_agent.helper.base import (
is_conda,
named_temporary_file,
ExecutionInfo,
HOCONEncoder, error, get_python_path)
from trains_agent.helper.console import ensure_text
HOCONEncoder,
error,
get_python_path,
is_linux_platform,
rm_file,
add_python_path)
from trains_agent.helper.console import ensure_text, print_text, decode_binary_lines
from trains_agent.helper.os.daemonize import daemonize_process
from trains_agent.helper.package.base import PackageManager
from trains_agent.helper.package.conda_api import CondaAPI
from trains_agent.helper.package.horovod_req import HorovodRequirement
from trains_agent.helper.package.pip_api.system import SystemPip
from trains_agent.helper.package.external_req import ExternalRequirements
from trains_agent.helper.package.pip_api.venv import VirtualenvPip
from trains_agent.helper.package.poetry_api import PoetryConfig, PoetryAPI
from trains_agent.helper.package.pytorch import PytorchRequirement
@@ -72,13 +78,16 @@ from trains_agent.helper.package.requirements import RequirementsManager
from trains_agent.helper.package.venv_update_api import VenvUpdateAPI
from trains_agent.helper.process import (
kill_all_child_processes,
check_if_command_exists,
WorkerParams,
ExitStatus,
Argv,
COMMAND_SUCCESS,
Executable,
get_bash_output, shutdown_docker_process, get_docker_id, commit_docker)
get_bash_output,
shutdown_docker_process,
get_docker_id,
commit_docker
)
from trains_agent.helper.package.cython_req import CythonRequirement
from trains_agent.helper.repo import clone_repository_cached, RepoInfo, VCS
from trains_agent.helper.resource_monitor import ResourceMonitor
@@ -228,6 +237,8 @@ class TaskStopSignal(object):
return self._test()
except Exception as ex:
self.command.log_traceback(ex)
# make sure we break nothing
return TaskStopSignal.default
def _test(self):
# type: () -> TaskStopReason
@@ -287,6 +298,7 @@ class Worker(ServiceCommandSection):
PytorchRequirement,
CythonRequirement,
HorovodRequirement,
ExternalRequirements,
)
# poll queues every _polling_interval seconds
@@ -350,12 +362,16 @@ class Worker(ServiceCommandSection):
self.is_venv_update = self._session.config.agent.venv_update.enabled
self.poetry = PoetryConfig(self._session)
self.poetry.initialize()
self.docker_image_func = None
self._docker_image = None
self._docker_arguments = None
PackageManager.set_pip_version(self._session.config.get("agent.package_manager.pip_version", None))
self._extra_docker_arguments = self._session.config.get("agent.extra_docker_arguments", None)
self._extra_shell_script = self._session.config.get("agent.extra_docker_shell_script", None)
self._docker_force_pull = self._session.config.get("agent.docker_force_pull", False)
self._daemon_foreground = None
self._standalone_mode = None
self._force_current_version = None
def _get_requirements_manager(self, os_override=None, base_interpreter=None):
requirements_manager = RequirementsManager(
@@ -385,13 +401,14 @@ class Worker(ServiceCommandSection):
except Exception:
pass
def run_one_task(self, queue, task_id, worker_args):
def run_one_task(self, queue, task_id, worker_args, docker=None):
# type: (Text, Text, WorkerParams) -> ()
"""
Run one task pulled from queue.
:param queue: ID of queue that task was pulled from
:param task_id: ID of task to run
:param worker_args: Worker command line arguments
:param docker: Docker image in which the execution task will run
"""
# start new process and execute task id
print("Running task '{}'".format(task_id))
@@ -413,10 +430,11 @@ class Worker(ServiceCommandSection):
)
)
docker_image = None
if self.docker_image_func:
try:
response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"])
task_docker_cmd = response.execution.docker_cmd
task_docker_cmd = docker or response.execution.docker_cmd
task_docker_cmd = task_docker_cmd.strip() if task_docker_cmd else None
except Exception:
task_docker_cmd = None
@@ -467,6 +485,19 @@ class Worker(ServiceCommandSection):
try:
# set WORKER ID
os.environ['TRAINS_WORKER_ID'] = self.worker_id
if self._docker_force_pull and docker_image:
full_pull_cmd = ['docker', 'pull', docker_image]
pull_cmd = Argv(*full_pull_cmd)
status, stop_signal_status = self._log_command_output(
task_id=task_id,
cmd=pull_cmd,
stdout_path=temp_stdout_name,
stderr_path=temp_stderr_name,
daemon=True,
stop_signal=stop_signal,
)
status, stop_signal_status = self._log_command_output(
task_id=task_id,
cmd=cmd,
@@ -596,7 +627,7 @@ class Worker(ServiceCommandSection):
self._session.print_configuration()
@resolve_names
def daemon(self, queues, log_level, foreground=False, docker=False, **kwargs):
def daemon(self, queues, log_level, foreground=False, docker=False, detached=False, **kwargs):
# make sure we only have a single instance,
# also make sure we set the worker_id and cache folders properly
self._singleton()
@@ -610,11 +641,13 @@ class Worker(ServiceCommandSection):
self.log.debug("starting resource monitor thread")
print("Worker \"{}\" - ".format(self.worker_id), end='')
if not queues:
if queues:
queues = return_list(queues)
queues = [self._resolve_name(q, "queues") for q in queues]
else:
default_queue = self._session.send_api(queues_api.GetDefaultRequest())
queues = [default_queue.id]
queues = return_list(queues)
queues_info = [
self._session.send_api(
queues_api.GetByIdRequest(queue)
@@ -635,9 +668,8 @@ class Worker(ServiceCommandSection):
# print docker image
if docker is not False and docker is not None:
temp_config, docker_image_func = self.get_docker_config_cmd(docker)
self.dump_config(temp_config)
self.docker_image_func = docker_image_func
self._force_current_version = kwargs.get('force_current_version', False)
self.set_docker_variables(docker)
else:
self.dump_config()
@@ -655,7 +687,18 @@ class Worker(ServiceCommandSection):
name
)
)
sys.stdout = sys.stderr = out_file
if not detached:
# redirect std out/err to new file
sys.stdout = sys.stderr = out_file
else:
# in detached mode
# fully detach stdin/stdout/stderr and leave the main process running in the background
daemonize_process(out_file.fileno())
# reprint headers to std file (we are now inside the daemon process)
print("Worker \"{}\" :".format(self.worker_id))
self._session.print_configuration()
print_table(queues_info, columns=columns, titles=columns)
try:
while True:
@@ -730,15 +773,21 @@ class Worker(ServiceCommandSection):
):
# type: (...) -> Tuple[Optional[int], TaskStopReason]
def _print_file(file_path, prev_line_count):
with open(file_path, "rt") as f:
with open(file_path, "rb") as f:
binary_text = f.read()
if not binary_text:
return []
# skip the previously printed lines,
return f.readlines()[prev_line_count:]
blines = binary_text.split(b'\n')[prev_line_count:]
if not blines:
return blines
return decode_binary_lines(blines if blines[-1] else blines[:-1])
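The reworked _print_file reads bytes and decodes per line, so a half-written UTF-8 sequence in a live log cannot crash the reader; a self-contained sketch of the same idea, with decode_binary_lines approximated by a plain errors='replace' decode:
def tail_new_lines(file_path, prev_line_count):
    # read as bytes: the process may be mid-write, so text mode could fail to decode
    with open(file_path, "rb") as f:
        binary_text = f.read()
    if not binary_text:
        return []
    # skip lines that were already printed on a previous poll
    blines = binary_text.split(b'\n')[prev_line_count:]
    if not blines:
        return blines
    # drop the empty trailing chunk when the file ends with a newline
    blines = blines if blines[-1] else blines[:-1]
    return [b.decode('utf-8', errors='replace') + '\n' for b in blines]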
stdout = open(stdout_path, "wt")
stderr = open(stderr_path, "wt") if stderr_path else stdout
stdout_line_count, stdout_last_lines = 0, []
stderr_line_count, stderr_last_lines = 0, []
try:
stdout_line_count, stdout_last_lines = 0, []
stderr_line_count, stderr_last_lines = 0, []
status = None
stopping = False
_last_machine_update_ts = time()
@@ -761,7 +810,7 @@ class Worker(ServiceCommandSection):
if daemon:
self.send_logs(
task_id=task_id,
lines=["User aborted: stopping task\n"],
lines=["User aborted: stopping task ({})\n".format(str(stop_reason))],
level="ERROR",
)
kill_all_child_processes(process.pid)
@@ -786,6 +835,16 @@ class Worker(ServiceCommandSection):
# non zero return code
stop_reason = 'Exception occurred'
status = ex.returncode
except KeyboardInterrupt:
# so someone else will catch us
raise
except Exception:
# we should not get here, but better safe than sorry
stdout_line_count += self.send_logs(task_id, _print_file(stdout_path, stdout_line_count))
if stderr_path:
stderr_line_count += self.send_logs(task_id, _print_file(stderr_path, stderr_line_count))
stop_reason = 'Exception occurred'
status = -1
stdout.close()
if stderr_path:
@@ -815,7 +874,8 @@ class Worker(ServiceCommandSection):
"""
if not lines:
return 0
print("".join(lines), end="")
print_text("".join(lines), newline=False)
# remove backspaces from the text log, they look bad.
for i, l in enumerate(lines):
lines[i] = l.replace('\x08', '')
@@ -876,8 +936,6 @@ class Worker(ServiceCommandSection):
):
if not task_id:
raise CommandFailedError("Worker build must have valid task id")
if not check_if_command_exists("virtualenv"):
raise CommandFailedError("Worker must have virtualenv installed")
self._session.print_configuration()
@@ -893,7 +951,15 @@ class Worker(ServiceCommandSection):
except AttributeError:
requirements = None
# TODO: make sure we pass the correct python_version
if not python_version:
try:
python_version = current_task.script.binary
python_version = python_version.split('/')[-1].replace('python', '')
# if we can cast it, we are good
python_version = '{:.1f}'.format(float(python_version))
except:
python_version = None
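The same Task.script.binary parsing appears again in execute() further down; as a worked example of what it yields:
for binary in ('/usr/bin/python3.6', 'python3', 'python'):
    try:
        ver = binary.split('/')[-1].replace('python', '')   # '3.6' / '3' / ''
        ver = '{:.1f}'.format(float(ver))                   # normalize to one decimal place
    except Exception:
        ver = None                                          # float('') raises -> no version
    print(binary, '->', ver)
# /usr/bin/python3.6 -> 3.6
# python3 -> 3.0
# python -> None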
venv_folder, requirements_manager = self.install_virtualenv(venv_dir=target,
requested_python_version=python_version)
@@ -907,8 +973,9 @@ class Worker(ServiceCommandSection):
repo_info,
requirements_manager=requirements_manager,
cached_requirements=requirements,
cwd=vcs.location if vcs and vcs.location else directory,
)
freeze = self.freeze_task_environment()
freeze = self.freeze_task_environment(requirements_manager=requirements_manager)
script_dir = directory
# Summary
@@ -1002,12 +1069,12 @@ class Worker(ServiceCommandSection):
require_queue=False,
log_file=None,
standalone_mode=None,
docker=False,
clone=False,
**_
):
if not task_id:
raise CommandFailedError("Worker execute must have valid task id")
if not check_if_command_exists("virtualenv"):
raise CommandFailedError("Worker must have virtualenv installed")
try:
current_task = self._session.api_client.tasks.get_by_id(task_id)
@@ -1016,25 +1083,42 @@ class Worker(ServiceCommandSection):
except Exception:
raise ValueError("Could not find task id={}".format(task_id))
# make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
try:
res = self._session.api_client.tasks.dequeue(task=current_task.id)
if require_queue and res.meta.result_code != 200:
raise ValueError("Execution required enqueued task, "
"but task id={} is not queued.".format(current_task.id))
except Exception:
if require_queue:
raise
if clone:
try:
print("Cloning task id={}".format(task_id))
current_task = self._session.api_client.tasks.get_by_id(
self._session.send_api(
tasks_api.CloneRequest(task=current_task.id, new_task_name='Clone of {}'.format(current_task.name))
).id
)
print("Task cloned, new task id={}".format(current_task.id))
except Exception:
raise CommandFailedError("Cloning failed")
else:
# make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
try:
res = self._session.api_client.tasks.dequeue(task=current_task.id)
if require_queue and res.meta.result_code != 200:
raise ValueError("Execution required enqueued task, "
"but task id={} is not queued.".format(current_task.id))
except Exception:
if require_queue:
raise
if full_monitoring:
if docker is not False and docker is not None:
self.set_docker_variables(docker)
# We expect the same behaviour whether full_monitoring was set or docker mode is used
if full_monitoring or docker is not False:
worker_params = WorkerParams(
log_level=log_level,
config_file=self._session.config_file,
debug=self._session.debug_mode,
trace=self._session.trace,
)
self.report_monitor(ResourceMonitor.StatusReport(task=task_id))
self.run_one_task(queue='', task_id=task_id, worker_args=worker_params)
self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker)
self.stop_monitor()
self._unregister()
return
@@ -1050,7 +1134,7 @@ class Worker(ServiceCommandSection):
if not disable_monitoring:
self.log.debug("starting resource monitor")
self.report_monitor(ResourceMonitor.StatusReport(task=task_id))
self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
execution = self.get_execution_info(current_task)
@@ -1059,7 +1143,16 @@ class Worker(ServiceCommandSection):
except AttributeError:
requirements = None
venv_folder, requirements_manager = self.install_virtualenv(standalone_mode=standalone_mode)
try:
python_ver = current_task.script.binary
python_ver = python_ver.split('/')[-1].replace('python', '')
# if we can cast it, we are good
python_ver = '{:.1f}'.format(float(python_ver))
except:
python_ver = None
venv_folder, requirements_manager = self.install_virtualenv(standalone_mode=standalone_mode,
requested_python_version=python_ver)
if not standalone_mode:
if self._default_pip:
@@ -1079,15 +1172,17 @@ class Worker(ServiceCommandSection):
repo_info,
requirements_manager=requirements_manager,
cached_requirements=requirements,
cwd=vcs.location if vcs and vcs.location else directory,
)
# do not update the task packages if we are using conda,
# it will most likely make the task environment unreproducible
freeze = self.freeze_task_environment(current_task.id if not self.is_conda else None)
freeze = self.freeze_task_environment(current_task.id if not self.is_conda else None,
requirements_manager=requirements_manager)
script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()
# run code
print("Running task id [%s]:" % task_id)
print("Running task id [%s]:" % current_task.id)
extra = ['-u', ]
if optimization:
extra.append(
@@ -1121,14 +1216,28 @@ class Worker(ServiceCommandSection):
)
if repo_info:
self._update_commit_id(task_id, execution, repo_info)
self._update_commit_id(current_task.id, execution, repo_info)
# Add the script CWD to the python path
python_path = get_python_path(script_dir, execution.entry_point, self.package_api)
python_path = get_python_path(script_dir, execution.entry_point, self.package_api) \
if not self.is_conda else None
if os.environ.get(ENV_TASK_EXTRA_PYTHON_PATH):
python_path = add_python_path(python_path, os.environ.get(ENV_TASK_EXTRA_PYTHON_PATH))
if python_path:
os.environ['PYTHONPATH'] = python_path
print("Starting Task Execution:\n".format(task_id))
# check if we want to run as another user, only supported on linux
if os.environ.get(ENV_TASK_EXECUTE_AS_USER, None) and is_linux_platform():
command, script_dir = self._run_as_user_patch(
command, self._session.config_file,
script_dir, venv_folder,
self._session.config.get('sdk.storage.cache.default_base_dir'),
os.environ.get(ENV_TASK_EXECUTE_AS_USER))
use_execv = False
else:
use_execv = is_linux_platform() and not isinstance(self.package_api, (PoetryAPI, CondaAPI))
print("Starting Task Execution:\n".format(current_task.id))
exit_code = -1
try:
if disable_monitoring:
@@ -1136,7 +1245,7 @@ class Worker(ServiceCommandSection):
sys.stdout.flush()
sys.stderr.flush()
os.chdir(script_dir)
if not is_windows_platform():
if use_execv:
os.execv(command.argv[0].as_posix(), tuple([command.argv[0].as_posix()])+command.argv[1:])
else:
exit_code = command.check_call(cwd=script_dir)
@@ -1144,10 +1253,10 @@ class Worker(ServiceCommandSection):
except subprocess.CalledProcessError as ex:
# non zero return code
exit_code = ex.returncode
if is_windows_platform():
if not use_execv:
exit(exit_code)
except Exception as ex:
if is_windows_platform():
if not use_execv:
exit(-1)
raise ex
else:
@@ -1157,13 +1266,13 @@ class Worker(ServiceCommandSection):
)
print("Storing stdout and stderr log into [%s]" % temp_stdout_fname)
exit_code, _ = self._log_command_output(
task_id=task_id,
task_id=current_task.id,
cmd=command,
stdout_path=temp_stdout_fname,
cwd=script_dir,
)
except KeyboardInterrupt:
self.handle_user_abort(task_id)
self.handle_user_abort(current_task.id)
raise
except Exception as e:
self.log.warning(str(e))
@@ -1180,13 +1289,18 @@ class Worker(ServiceCommandSection):
if not disable_monitoring:
# we need to change task status according to exit code
self.handle_task_termination(task_id, exit_code, TaskStopReason.no_stop)
self.handle_task_termination(current_task.id, exit_code, TaskStopReason.no_stop)
self.stop_monitor()
# unregister the worker
self._unregister()
return 1 if exit_code is None else exit_code
def set_docker_variables(self, docker):
temp_config, docker_image_func = self.get_docker_config_cmd(docker)
self.dump_config(temp_config)
self.docker_image_func = docker_image_func
def get_execution_info(self, current_task):
# type: (...) -> ExecutionInfo
try:
@@ -1346,13 +1460,17 @@ class Worker(ServiceCommandSection):
status_message=self._task_status_change_message,
)
def freeze_task_environment(self, task_id=None):
def freeze_task_environment(self, task_id=None, requirements_manager=None):
try:
freeze = self.package_api.freeze()
except Exception as e:
print("Could not freeze installed packages")
self.log_traceback(e)
return None
if requirements_manager:
freeze = requirements_manager.replace_back(freeze)
if not task_id:
return freeze
@@ -1377,8 +1495,12 @@ class Worker(ServiceCommandSection):
if not repo_info:
return None
try:
if not self.poetry.enabled:
return None
self.poetry.initialize(cwd=repo_info.root)
api = self.poetry.get_api(repo_info.root)
if api.enabled:
print('Poetry Enabled: Ignoring requested python packages, using repository poetry lock file!')
api.install()
return api
except Exception:
@@ -1386,7 +1508,7 @@ class Worker(ServiceCommandSection):
return None
def install_requirements(
self, execution, repo_info, requirements_manager, cached_requirements=None
self, execution, repo_info, requirements_manager, cached_requirements=None, cwd=None,
):
# type: (ExecutionInfo, RepoInfo, RequirementsManager, Optional[dict]) -> None
"""
@@ -1399,6 +1521,8 @@ class Worker(ServiceCommandSection):
:param requirements_manager: requirements manager for task
:param cached_requirements: cached requirements from previous run
"""
if self.package_api:
self.package_api.cwd = cwd
api = self._install_poetry_requirements(repo_info)
if api:
self.package_api = api
@@ -1420,7 +1544,7 @@ class Worker(ServiceCommandSection):
except Exception as e:
self.log_traceback(e)
cached_requirements_failed = True
raise ValueError("Could not install task requirements!")
raise ValueError("Could not install task requirements!\n{}".format(e))
else:
self.log("task requirements installation passed")
return
@@ -1458,10 +1582,7 @@ class Worker(ServiceCommandSection):
raise e
finally:
if self._session.debug_mode and temp_file:
try:
Path(temp_file.name).unlink()
except OSError:
pass
rm_file(temp_file.name)
# call post installation callback
requirements_manager.post_install()
# mark as successful installation
@@ -1587,7 +1708,7 @@ class Worker(ServiceCommandSection):
)
def install_virtualenv(self, venv_dir=None, requested_python_version=None, standalone_mode=False):
# type: (str, str) -> Tuple[Path, RequirementsManager]
# type: (str, str, bool) -> Tuple[Path, RequirementsManager]
"""
Install a new python virtual environment, removing the old one if exists
:return: virtualenv directory and requirements manager to use with task
@@ -1595,14 +1716,27 @@ class Worker(ServiceCommandSection):
requested_python_version = requested_python_version or \
Text(self._session.config.get("agent.python_binary", None)) or \
Text(self._session.config.get("agent.default_python", None))
executable_version, executable_version_suffix, executable_name = self.find_python_executable_for_version(
requested_python_version
)
if self.is_conda:
executable_version_suffix = \
requested_python_version[max(requested_python_version.find('python'), 0):].replace('python', '')
executable_name = 'python'
else:
try:
executable_version, executable_version_suffix, executable_name = \
self.find_python_executable_for_version(requested_python_version)
except Exception:
def_python_version = Text(self._session.config.get("agent.python_binary", None)) or \
Text(self._session.config.get("agent.default_python", None))
print('Warning: could not locate requested Python version {}, reverting to version {}'.format(
requested_python_version, def_python_version))
executable_version, executable_version_suffix, executable_name = \
self.find_python_executable_for_version(def_python_version)
self._session.config.put("agent.default_python", executable_version)
self._session.config.put("agent.python_binary", executable_name)
venv_dir = Path(venv_dir) if venv_dir else \
Path(self._session.config["agent.venvs_dir"], executable_version_suffix)
self._session.config.put("agent.default_python", executable_version)
self._session.config.put("agent.python_binary", executable_name)
first_time = not standalone_mode and (
is_windows_platform()
or self.is_conda
@@ -1742,20 +1876,44 @@ class Worker(ServiceCommandSection):
log.warning('Failed creating temporary copy of ~/.ssh for git credential')
pass
# check if the .git credentials exist:
host_git_credentials = Path('~/.git-credentials').expanduser()
try:
if host_git_credentials.is_file():
host_git_credentials = host_git_credentials.as_posix()
else:
host_git_credentials = None
except Exception:
host_git_credentials = None
# store docker arguments
self._docker_image = docker_image
self._docker_arguments = docker_arguments
extra_shell_script_str = ""
if self._extra_shell_script:
cmds = self._extra_shell_script
if not isinstance(cmds, (list, tuple)):
cmds = [cmds]
extra_shell_script_str = " ; ".join(map(str, cmds)) + " ; "
self.temp_config_path = self.temp_config_path or safe_mkstemp(
suffix=".cfg", prefix=".trains_agent.", text=True, name_only=True
)
docker_cmd = dict(worker_id=self.worker_id,
# docker_image=docker_image,
# docker_arguments=docker_arguments,
extra_docker_arguments=self._extra_docker_arguments,
extra_shell_script=extra_shell_script_str,
python_version=python_version, conf_file=self.temp_config_path,
host_apt_cache=host_apt_cache,
host_pip_cache=host_pip_cache,
host_ssh_cache=host_ssh_cache,
host_ssh_cache=host_ssh_cache, host_git_credentials=host_git_credentials,
host_cache=host_cache, mounted_cache=mounted_cache_dir,
host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
standalone_mode=self._standalone_mode)
standalone_mode=self._standalone_mode, force_current_version=self._force_current_version)
return temp_config, partial(docker_cmd_functor, docker_cmd)
@staticmethod
@@ -1766,15 +1924,26 @@ class Worker(ServiceCommandSection):
host_ssh_cache,
host_cache, mounted_cache,
host_pip_dl, mounted_pip_dl,
host_vcs_cache, mounted_vcs_cache, standalone_mode=False):
host_vcs_cache, mounted_vcs_cache,
standalone_mode=False, extra_docker_arguments=None, extra_shell_script=None,
force_current_version=None, host_git_credentials=None):
docker = 'docker'
base_cmd = [docker, 'run', '-t']
update_scheme = ""
dockers_nvidia_visible_devices = 'all'
gpu_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES', None)
if gpu_devices is None or gpu_devices.lower().strip() == 'all':
base_cmd += ['--gpus', 'all', ]
if os.environ.get('TRAINS_DOCKER_SKIP_GPUS_FLAG', None):
dockers_nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES') or \
dockers_nvidia_visible_devices
else:
base_cmd += ['--gpus', 'all', ]
elif gpu_devices.strip() and gpu_devices.strip() != 'none':
base_cmd += ['--gpus', 'device='+gpu_devices, ]
if os.environ.get('TRAINS_DOCKER_SKIP_GPUS_FLAG', None):
dockers_nvidia_visible_devices = gpu_devices
else:
base_cmd += ['--gpus', 'device='+gpu_devices, ]
# We are using --gpu, so we should not pass NVIDIA_VISIBLE_DEVICES, I think.
# base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES=' + gpu_devices, ]
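Condensed, the GPU-flag resolution above behaves as in this sketch: TRAINS_DOCKER_SKIP_GPUS_FLAG suppresses the docker --gpus argument and instead forwards the device list through NVIDIA_VISIBLE_DEVICES:
import os
def resolve_gpu_args():
    # -> (extra docker args, NVIDIA_VISIBLE_DEVICES value for inside the container)
    visible = 'all'
    args = []
    gpu_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES', None)
    skip_flag = os.environ.get('TRAINS_DOCKER_SKIP_GPUS_FLAG', None)
    if gpu_devices is None or gpu_devices.lower().strip() == 'all':
        if skip_flag:
            visible = os.environ.get('NVIDIA_VISIBLE_DEVICES') or visible
        else:
            args += ['--gpus', 'all']
    elif gpu_devices.strip() and gpu_devices.strip() != 'none':
        if skip_flag:
            visible = gpu_devices
        else:
            args += ['--gpus', 'device=' + gpu_devices]
    return args, visible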
@@ -1783,10 +1952,56 @@ class Worker(ServiceCommandSection):
if isinstance(docker_arguments, (list, tuple)) else [docker_arguments]
base_cmd += [a for a in docker_arguments if a]
base_cmd += ['-e', 'TRAINS_WORKER_ID='+worker_id, ]
if extra_docker_arguments:
extra_docker_arguments = [extra_docker_arguments] \
if isinstance(extra_docker_arguments, six.string_types) else extra_docker_arguments
base_cmd += [str(a) for a in extra_docker_arguments if a]
if host_ssh_cache:
base_cmd += ['-v', host_ssh_cache+':/root/.ssh', ]
# check if we are running inside a kubernetes pod
if os.environ.get('KUBERNETES_SERVICE_HOST') and os.environ.get('KUBERNETES_PORT'):
# map network to sibling docker, unless we have other network argument
if not any(a.strip().startswith('--network') for a in base_cmd):
try:
network_mode = get_bash_output(
'docker inspect --format=\'{{.HostConfig.NetworkMode}}\' $(basename $(cat /proc/1/cpuset))')
base_cmd += ['--network', network_mode]
except:
pass
base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES={}'.format(dockers_nvidia_visible_devices)]
# check if we need to map host folders
if os.environ.get(ENV_K8S_HOST_MOUNT):
# expect TRAINS_AGENT_K8S_HOST_MOUNT = '/mnt/host/data:/root/.trains'
k8s_node_mnt, _, k8s_pod_mnt = os.environ.get(ENV_K8S_HOST_MOUNT).partition(':')
# search and replace all the host folders with the k8s node mounts
host_mounts = [host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache]
for i, m in enumerate(host_mounts):
if k8s_pod_mnt not in m:
print('Warning: K8S mount missing, ignoring cached folder {}'.format(m))
host_mounts[i] = None
else:
host_mounts[i] = m.replace(k8s_pod_mnt, k8s_node_mnt)
host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache = host_mounts
# copy the configuration file into the mounted folder
new_conf_file = os.path.join(k8s_pod_mnt, '.trains_agent.{}.cfg'.format(quote(worker_id, safe="")))
try:
rm_file(new_conf_file)
shutil.copy(conf_file, new_conf_file)
conf_file = new_conf_file.replace(k8s_pod_mnt, k8s_node_mnt)
except Exception:
raise ValueError('Error: could not copy configuration file into: {}'.format(new_conf_file))
if host_ssh_cache:
new_ssh_cache = os.path.join(k8s_pod_mnt, '.trains_agent.{}.ssh'.format(quote(worker_id, safe="")))
try:
rm_tree(new_ssh_cache)
shutil.copytree(host_ssh_cache, new_ssh_cache)
host_ssh_cache = new_ssh_cache.replace(k8s_pod_mnt, k8s_node_mnt)
except Exception:
raise ValueError('Error: could not copy .ssh directory into: {}'.format(new_ssh_cache))
base_cmd += ['-e', 'TRAINS_WORKER_ID='+worker_id, ]
# if we are running a RC version, install the same version in the docker
# because the default 'latest' will be a release version (not an RC)
@@ -1794,38 +2009,120 @@ class Worker(ServiceCommandSection):
try:
from trains_agent.version import __version__
_version_parts = __version__.split('.')
if 'rc' in _version_parts[-1].lower() or 'rc' in _version_parts[-2].lower():
if force_current_version or 'rc' in _version_parts[-1].lower() or 'rc' in _version_parts[-2].lower():
specify_version = '=={}'.format(__version__)
except:
pass
if standalone_mode:
update_scheme = ""
else:
update_scheme = \
if not standalone_mode:
update_scheme += \
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \
"chown -R root /root/.cache/pip ; " \
"apt-get update ; " \
"apt-get install -y git libsm6 libxext6 libxrender-dev libglib2.0-0 {python_single_digit}-pip ; " \
"{python} -m pip install -U pip ; " \
"{python} -m pip install -U \"pip{pip_version}\" ; " \
"{python} -m pip install -U trains-agent{specify_version} ; ".format(
python_single_digit=python_version.split('.')[0],
python=python_version, specify_version=specify_version)
python=python_version, pip_version=PackageManager.get_pip_version(),
specify_version=specify_version)
base_cmd += [
'-v', conf_file+':/root/trains.conf',
'-v', host_apt_cache+':/var/cache/apt/archives',
'-v', host_pip_cache+':/root/.cache/pip',
'-v', host_pip_dl+':'+mounted_pip_dl,
'-v', host_cache+':'+mounted_cache,
'-v', host_vcs_cache+':'+mounted_vcs_cache,
'--rm', docker_image, 'bash', '-c',
update_scheme +
"NVIDIA_VISIBLE_DEVICES=all {python} -u -m trains_agent ".format(python=python_version)
]
base_cmd += (
['-v', conf_file+':/root/trains.conf'] +
(['-v', host_git_credentials+':/root/.git-credentials'] if host_git_credentials else []) +
(['-v', host_ssh_cache+':/root/.ssh'] if host_ssh_cache else []) +
(['-v', host_apt_cache+':/var/cache/apt/archives'] if host_apt_cache else []) +
(['-v', host_pip_cache+':/root/.cache/pip'] if host_pip_cache else []) +
(['-v', host_pip_dl+':'+mounted_pip_dl] if host_pip_dl else []) +
(['-v', host_cache+':'+mounted_cache] if host_cache else []) +
(['-v', host_vcs_cache+':'+mounted_vcs_cache] if host_vcs_cache else []) +
['--rm', docker_image, 'bash', '-c',
update_scheme +
extra_shell_script +
"NVIDIA_VISIBLE_DEVICES={nv_visible} {python} -u -m trains_agent ".format(
nv_visible=dockers_nvidia_visible_devices, python=python_version)
])
return base_cmd
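The K8s host-mount remapping above reduces to the sketch below: TRAINS_AGENT_K8S_HOST_MOUNT carries 'node_path:pod_path', and any cache folder visible under the pod mount is rewritten to its node-side path before being handed to docker -v (the paths are illustrative):
mount = '/mnt/host/data:/root/.trains'   # example TRAINS_AGENT_K8S_HOST_MOUNT value
k8s_node_mnt, _, k8s_pod_mnt = mount.partition(':')
def to_node_path(pod_path):
    # folders outside the shared mount cannot be exposed to the sibling container
    if k8s_pod_mnt not in pod_path:
        return None
    return pod_path.replace(k8s_pod_mnt, k8s_node_mnt)
print(to_node_path('/root/.trains/pip-download-cache'))  # /mnt/host/data/pip-download-cache
print(to_node_path('/var/cache/apt/archives'))           # None -> warned and ignored above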
def _run_as_user_patch(self, command, trains_conf, script_dir, venv_folder, sdk_cache_folder, user_uid):
class RunasArgv(Argv):
def __init__(self, *args):
super(RunasArgv, self).__init__(*args)
self.uid = 0
self.gid = 0
def call_subprocess(self, func, censor_password=False, *args, **kwargs):
self._log.debug("running: %s: %s", func.__name__, list(self))
with self.normalize_exception(censor_password):
return func(list(self), *args, preexec_fn=self._change_uid, **kwargs)
def set_uid(self, user_uid, user_gid):
from pwd import getpwnam
self.uid = getpwnam(user_uid).pw_uid
self.gid = getpwnam(user_gid).pw_gid
def _change_uid(self):
os.setgid(self.gid)
os.setuid(self.uid)
# create a home folder for our user
trains_agent_home = 'trains_agent_home{}'.format('.'+str(Singleton.get_slot()) if Singleton.get_slot() else '')
try:
home_folder = (Path('/') / trains_agent_home).absolute().as_posix()
rm_tree(home_folder)
Path(home_folder).mkdir(parents=True, exist_ok=True)
except:
try:
home_folder = (Path.home().parent / trains_agent_home).absolute().as_posix()
rm_tree(home_folder)
Path(home_folder).mkdir(parents=True, exist_ok=True)
except:
home_folder = (Path(gettempdir()) / trains_agent_home).absolute().as_posix()
rm_tree(home_folder)
Path(home_folder).mkdir(parents=True, exist_ok=True)
# move our entire venv into the new home
venv_folder = venv_folder.as_posix()
if not venv_folder.endswith(os.path.sep):
venv_folder += os.path.sep
new_venv_folder = os.path.join(home_folder, 'venv/')
shutil.move(venv_folder, new_venv_folder)
# allow everyone to access it
for f in Path(new_venv_folder).rglob('*'):
try:
f.chmod(0o0777)
except:
pass
# make sure we will be able to access the cache folder (we assume we have the ability to change its mode)
if sdk_cache_folder:
sdk_cache_folder = Path(os.path.expandvars(sdk_cache_folder)).expanduser().absolute()
for f in sdk_cache_folder.rglob('*'):
try:
f.chmod(0o0777)
except:
pass
# make sure we can access the trains_conf file
try:
user_trains_conf = os.path.join(home_folder, 'trains.conf')
shutil.copy(trains_conf, user_trains_conf)
Path(user_trains_conf).chmod(0o0777)
except:
user_trains_conf = trains_conf
# patch venv folder to new location
script_dir = script_dir.replace(venv_folder, new_venv_folder)
# New command line execution
command = RunasArgv('bash', '-c', 'HOME=\"{}\" PATH=\"{}\" PYTHONPATH=\"{}\" TRAINS_CONFIG_FILE={} {}'.format(
home_folder,
os.environ.get('PATH', '').replace(venv_folder, new_venv_folder),
os.environ.get('PYTHONPATH', '').replace(venv_folder, new_venv_folder),
user_trains_conf,
command.serialize().replace(venv_folder, new_venv_folder)))
command.set_uid(user_uid=user_uid, user_gid=user_uid)
return command, script_dir
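The privilege drop in RunasArgv rides on subprocess's preexec_fn hook, which runs in the child between fork and exec; a minimal standalone sketch (Linux only, requires root, user name is illustrative):
import os
import subprocess
from pwd import getpwnam
def drop_privileges_to(user):
    rec = getpwnam(user)
    def _change_uid():
        # order matters: drop the gid while we still have the privilege to do so
        os.setgid(rec.pw_gid)
        os.setuid(rec.pw_uid)
    return _change_uid
# subprocess.check_call(['id'], preexec_fn=drop_privileges_to('someuser'))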
def _singleton(self):
# ensure singleton
worker_id = self._session.config["agent.worker_id"]
@@ -1839,7 +2136,8 @@ class Worker(ServiceCommandSection):
else:
worker_name = '{}:cpu'.format(worker_name)
self.worker_id, worker_slot = Singleton.register_instance(unique_worker_id=worker_id, worker_name=worker_name)
self.worker_id, worker_slot = Singleton.register_instance(unique_worker_id=worker_id, worker_name=worker_name,
api_client=self._session.api_client)
if self.worker_id is None:
error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))
exit(1)

View File

@@ -73,6 +73,12 @@ ENVIRONMENT_CONFIG = {
"agent.cpu_only": EnvironmentConfig(
"TRAINS_CPU_ONLY", "ALG_CPU_ONLY", "CPU_ONLY", type=bool
),
"sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"),
"sdk.aws.s3.secret": EnvironmentConfig("AWS_SECRET_ACCESS_KEY"),
"sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"),
"sdk.azure.storage.containers.0": {'account_name': EnvironmentConfig("AZURE_STORAGE_ACCOUNT"),
'account_key': EnvironmentConfig("AZURE_STORAGE_KEY")},
"sdk.google.storage.credentials_json": EnvironmentConfig("GOOGLE_APPLICATION_CREDENTIALS"),
}
CONFIG_FILE_ENV = EnvironmentConfig("ALG_CONFIG_FILE")
@@ -114,6 +120,9 @@ DEFAULT_VCS_CACHE = normalize_path(CONFIG_DIR, "vcs-cache")
PIP_EXTRA_INDICES = [
]
DEFAULT_PIP_DOWNLOAD_CACHE = normalize_path(CONFIG_DIR, "pip-download-cache")
ENV_TASK_EXECUTE_AS_USER = 'TRAINS_AGENT_EXEC_USER'
ENV_TASK_EXTRA_PYTHON_PATH = 'TRAINS_AGENT_EXTRA_PYTHON_PATH'
ENV_K8S_HOST_MOUNT = 'TRAINS_AGENT_K8S_HOST_MOUNT'
class FileBuffering(IntEnum):

trains_agent/glue/k8s.py Normal file
View File

@@ -0,0 +1,169 @@
from __future__ import print_function, division, unicode_literals
import logging
import os
import subprocess
from time import sleep
from typing import Text, List
from pyhocon import HOCONConverter
from trains_agent.commands.events import Events
from trains_agent.commands.worker import Worker
from trains_agent.helper.process import get_bash_output
from trains_agent.helper.resource_monitor import ResourceMonitor
class K8sIntegration(Worker):
K8S_PENDING_QUEUE = "k8s_scheduler"
KUBECTL_RUN_CMD = "kubectl run trains_id_{task_id} " \
"--image {docker_image} " \
"--restart=Never --replicas=1 " \
"--generator=run-pod/v1"
KUBECTL_DELETE_CMD = "kubectl delete pods " \
"--selector=TRAINS=agent " \
"--field-selector=status.phase!=Pending,status.phase!=Running"
CONTAINER_BASH_SCRIPT = "apt-get install -y git python-pip && " \
"pip install trains-agent && " \
"python -u -m trains_agent execute --full-monitoring --require-queue --id {}"
def __init__(self, k8s_pending_queue_name=None, kubectl_cmd=None, container_bash_script=None, debug=False):
"""
Initialize the k8s integration glue layer daemon
:param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler
:param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD)
example: "task={task_id} image={docker_image} queue_id={queue_id}"
or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data)
:param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
:param bool debug: Switch logging on
"""
super(K8sIntegration, self).__init__()
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
self.kubectl_cmd = kubectl_cmd or self.KUBECTL_RUN_CMD
self.container_bash_script = container_bash_script or self.CONTAINER_BASH_SCRIPT
# Always use system packages, because we will be running inside a docker container
self._session.config.put("agent.package_manager.system_site_packages", True)
# Add debug logging
if debug:
self.log.logger.disabled = False
self.log.logger.setLevel(logging.INFO)
def run_one_task(self, queue: Text, task_id: Text, worker_args=None):
task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
# push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
try:
self._session.api_client.tasks.enqueue(task_id, queue=self.k8s_pending_queue_name,
status_reason='k8s pending scheduler')
except Exception as e:
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue [{}], error: {}".format(
task_id, self.k8s_pending_queue_name, e))
return
if task_data.execution.docker_cmd:
docker_image = task_data.execution.docker_cmd
else:
docker_image = str(os.environ.get("TRAINS_DOCKER_IMAGE") or
self._session.config.get("agent.default_docker.image", "nvidia/cuda"))
# take the first part, this is the docker image name (not arguments)
docker_image = docker_image.split()[0]
create_trains_conf = "echo '{}' >> ~/trains.conf && ".format(
HOCONConverter.to_hocon(self._session.config._config))
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
else:
kubectl_cmd = self.kubectl_cmd.format(task_id=task_id, docker_image=docker_image, queue_id=queue)
# make sure we have a list
if isinstance(kubectl_cmd, str):
kubectl_cmd = kubectl_cmd.split()
kubectl_cmd += ["--labels=TRAINS=agent", "--command", "--", "/bin/sh", "-c",
create_trains_conf + self.container_bash_script.format(task_id)]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
self.log.info("K8s scheduling experiment task id={}".format(task_id))
if error:
self.log.error("Running kubectl encountered an error: {}".format(
error if isinstance(error, str) else error.decode()))
def run_tasks_loop(self, queues: List[Text], worker_params):
"""
:summary: Pull and run tasks from queues.
:description: 1. Go through ``queues`` by order.
2. Try getting the next task for each and run the first one that returns.
3. Go to step 1
:param queues: IDs of queues to pull tasks from
:type queues: list of ``Text``
:param worker_params: Worker command line arguments
:type worker_params: ``trains_agent.helper.process.WorkerParams``
"""
events_service = self.get_service(Events)
# make sure we have a k8s pending queue
try:
self._session.api_client.queues.create(self.k8s_pending_queue_name)
except Exception:
pass
# get queue id
self.k8s_pending_queue_name = self._resolve_name(self.k8s_pending_queue_name, "queues")
_last_machine_update_ts = 0
while True:
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed/failed pods
get_bash_output(self.KUBECTL_DELETE_CMD)
# get next task in queue
try:
response = self._session.api_client.queues.get_next_task(queue=queue)
except Exception as e:
print("Warning: Could not access task queue [{}], error: {}".format(queue, e))
continue
else:
try:
task_id = response.entry.task
except AttributeError:
print("No tasks in queue {}".format(queue))
continue
events_service.send_log_events(
self.worker_id,
task_id=task_id,
lines="task {} pulled from {} by worker {}".format(
task_id, queue, self.worker_id
),
level="INFO",
)
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
self.run_one_task(queue, task_id, worker_params)
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
break
else:
# sleep and retry polling
print("No tasks in Queues, sleeping for {:.1f} seconds".format(self._polling_interval))
sleep(self._polling_interval)
if self._session.config["agent.reload_config"]:
self.reload_config()
def k8s_daemon(self, queues):
"""
Start the k8s Glue service.
This service will be pulling tasks from *queues* and scheduling them for execution using kubectl.
Notice all scheduled tasks are pushed back into K8S_PENDING_QUEUE,
and popped when execution actually starts. This creates full visibility into the k8s scheduler.
Manually popping a task from the K8S_PENDING_QUEUE
will cause the k8s scheduler to skip the execution once the scheduled task needs to be executed
:param list(str) queues: List of queue names to pull from
"""
return self.daemon(queues=queues, log_level=logging.INFO, foreground=True, docker=False)
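A hedged usage sketch for this glue layer, assuming kubectl is already configured and a 'k8s_gpu' queue exists; per the docstring above, kubectl_cmd may also be a callable(task_id, docker_image, queue_id, task_data) returning the command:
from trains_agent.glue.k8s import K8sIntegration
k8s = K8sIntegration(debug=True)     # defaults: KUBECTL_RUN_CMD + CONTAINER_BASH_SCRIPT
k8s.k8s_daemon(queues=['k8s_gpu'])   # blocks: pulls tasks and schedules pods via kubectl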

View File

@@ -157,6 +157,10 @@ def is_windows_platform():
return any(platform.win32_ver())
def is_linux_platform():
return 'linux' in platform.system().lower()
def normalize_path(*paths):
"""
normalize_path
@@ -195,6 +199,20 @@ def get_python_path(script_dir, entry_point, package_api):
return None
def add_python_path(base_path, extra_path):
try:
if not extra_path:
return base_path
python_path_sep = ';' if is_windows_platform() else ':'
base_path = base_path or ''
if not base_path.endswith(python_path_sep):
base_path += python_path_sep
base_path += extra_path.replace(':', python_path_sep)
except:
pass
return base_path
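For example, on Linux where the separator is ':':
add_python_path('/opt/code', '/extra/modules')   # -> '/opt/code:/extra/modules'
add_python_path(None, '/extra/modules')          # -> ':/extra/modules' (note the leading separator)
add_python_path('/opt/code', '')                 # -> '/opt/code' (nothing to add)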
class Singleton(ABCMeta):
_instances = {}
@@ -459,6 +477,17 @@ def rm_tree(root): # type: (Union[Path, Text]) -> None
return shutil.rmtree(os.path.expanduser(os.path.expandvars(Text(root))), onerror=on_error)
def rm_file(filename): # type: (Union[Path, Text]) -> None
"""
A version of os.unlink that will not raise an error
"""
try:
os.unlink(os.path.expanduser(os.path.expandvars(Text(filename))))
except:
return False
return True
def is_conda(config):
return config['agent.package_manager.type'].lower() == 'conda'

View File

@@ -4,7 +4,7 @@ from time import sleep
import requests
import json
from threading import Thread
from semantic_version import Version
from .package.requirements import SimpleVersion
from ..version import __version__
__check_update_thread = None
@@ -30,11 +30,11 @@ def _check_new_version_available():
return None
trains_answer = update_server_releases.get("trains-agent", {})
latest_version = trains_answer.get("version")
cur_version = Version(cur_version)
latest_version = Version(latest_version)
if cur_version >= latest_version:
cur_version = cur_version
latest_version = latest_version or ''
if SimpleVersion.compare_versions(cur_version, '>=', latest_version):
return None
patch_upgrade = latest_version.major == cur_version.major and latest_version.minor == cur_version.minor
patch_upgrade = True # latest_version.major == cur_version.major and latest_version.minor == cur_version.minor
return str(latest_version), patch_upgrade, trains_answer.get("description").split("\r\n")
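SimpleVersion.compare_versions, as used here and in the conda check further down, takes (version_a, operator, version_b) and returns a bool; for instance:
from trains_agent.helper.package.requirements import SimpleVersion
SimpleVersion.compare_versions('0.14.1', '>=', '0.14.0')   # True  -> no update offered
SimpleVersion.compare_versions('4.3.24', '<', '4.3.30')    # True  -> conda too old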

View File

@@ -22,6 +22,18 @@ def print_text(text, newline=True):
sys.stdout.write(data)
def decode_binary_lines(binary_lines, encoding='utf-8'):
# decode per line; if decoding fails, replace the line with an empty string
lines = []
for b in binary_lines:
try:
l = b.decode(encoding=encoding, errors='replace').replace('\r', '\n')
except:
l = ''
lines.append(l + '\n' if l and l[-1] != '\n' else l)
return lines
def ensure_text(s, encoding='utf-8', errors='strict'):
"""Coerce *s* to six.text_type.
For Python 2:

View File

@@ -0,0 +1,74 @@
import os
def daemonize_process(redirect_fd=None):
"""
Detach a process from the controlling terminal and run it in the background as a daemon.
"""
assert redirect_fd is None or isinstance(redirect_fd, int)
# re-spawn in the same directory
WORKDIR = os.getcwd()
# The standard I/O file descriptors are redirected to /dev/null by default.
if hasattr(os, "devnull"):
devnull = os.devnull
else:
devnull = "/dev/null"
try:
# Fork a child process so the parent can exit. This returns control to
# the command-line or shell. It also guarantees that the child will not
# be a process group leader, since the child receives a new process ID
# and inherits the parent's process group ID. This step is required
to ensure that the next call to os.setsid is successful.
pid = os.fork()
except OSError as e:
raise Exception("%s [%d]" % (e.strerror, e.errno))
if pid == 0: # The first child.
# To become the session leader of this new session and the process group
# leader of the new process group, we call os.setsid().
# The process is also guaranteed not to have a controlling terminal.
os.setsid()
# Is ignoring SIGHUP necessary? (Set handlers for asynchronous events.)
# import signal
# signal.signal(signal.SIGHUP, signal.SIG_IGN)
try:
# Fork a second child and exit immediately to prevent zombies. This
# causes the second child process to be orphaned, making the init
# process responsible for its cleanup.
pid = os.fork() # Fork a second child.
except OSError as e:
raise Exception("%s [%d]" % (e.strerror, e.errno))
if pid == 0: # The second child.
# Since the current working directory may be a mounted filesystem, we
# avoid the issue of not being able to unmount the filesystem at
# shutdown time by changing it to the root directory.
os.chdir(WORKDIR)
# We probably don't want the file mode creation mask inherited from
# the parent, so we give the child complete control over permissions.
os.umask(0)
else:
# Exit parent (the first child) of the second child.
os._exit(0)
else:
# Exit parent of the first child.
os._exit(0)
# notice we count on the fact that we keep all file descriptors open,
# since we opened them in the parent process, and the daemon process will use them
# Redirect the standard I/O file descriptors to the specified fd, or to /dev/null by default.
if redirect_fd is None:
redirect_fd = os.open(devnull, os.O_RDWR)
# Duplicate the redirect fd onto standard output (1) and standard error (2).
os.dup2(redirect_fd, 1)
os.dup2(redirect_fd, 2)
return 0
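Intended usage, as in the daemon() change earlier in this diff: open the log file before detaching (the file descriptor survives both forks), then hand its fd to daemonize_process (the path here is illustrative):
import os
out_file = open('/tmp/trains_agent_daemon.log', 'a')
daemonize_process(out_file.fileno())
# from here on we run in the daemonized grandchild process, detached from the
# terminal, with stdout/stderr redirected into out_file
print('daemon is up, pid={}'.format(os.getpid()))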

View File

@@ -16,6 +16,8 @@ class PackageManager(object):
"""
_selected_manager = None
_cwd = None
_pip_version = None
@abc.abstractproperty
def bin(self):
@@ -64,7 +66,7 @@ class PackageManager(object):
pass
def upgrade_pip(self):
return self._install("pip", "--upgrade")
return self._install("pip"+self.get_pip_version(), "--upgrade")
def get_python_command(self, extra=()):
# type: (...) -> Executable
@@ -97,11 +99,42 @@ class PackageManager(object):
# this is helpful when we want out of context requirement installations
PackageManager._selected_manager = self
@property
def cwd(self):
return self._cwd
@cwd.setter
def cwd(self, value):
self._cwd = value
@classmethod
def out_of_scope_install_package(cls, package_name):
def out_of_scope_install_package(cls, package_name, *args):
if PackageManager._selected_manager is not None:
try:
return PackageManager._selected_manager._install(package_name)
return PackageManager._selected_manager._install(package_name, *args)
except Exception:
pass
return
return
@classmethod
def out_of_scope_freeze(cls):
if PackageManager._selected_manager is not None:
try:
return PackageManager._selected_manager.freeze()
except Exception:
pass
return []
@classmethod
def set_pip_version(cls, version):
if not version:
return
version = version.replace(' ', '')
if ('=' in version) or ('~' in version) or ('<' in version) or ('>' in version):
cls._pip_version = version
else:
cls._pip_version = "=="+version
@classmethod
def get_pip_version(cls):
return cls._pip_version or ''
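Normalization in practice: a bare version is pinned exactly, an explicit specifier passes through, and upgrade_pip above then installs 'pip' plus this suffix:
PackageManager.set_pip_version('20.1')    # stored as '==20.1' -> installs "pip==20.1"
PackageManager.set_pip_version('<20.2')   # stored as '<20.2'  -> installs "pip<20.2"
PackageManager.set_pip_version(None)      # ignored; get_pip_version() stays ''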

View File

@@ -14,13 +14,13 @@ import yaml
from time import time
from attr import attrs, attrib, Factory
from pathlib2 import Path
from semantic_version import Version
from requirements import parse
from requirements.requirement import Requirement
from trains_agent.errors import CommandFailedError
from trains_agent.helper.base import rm_tree, NonStrictAttrs, select_for_platform, is_windows_platform
from trains_agent.helper.process import Argv, Executable, DEVNULL, CommandSequence, PathLike
from trains_agent.helper.package.requirements import SimpleVersion
from trains_agent.session import Session
from .base import PackageManager
from .pip_api.venv import VirtualenvPip
@@ -59,7 +59,7 @@ class CondaAPI(PackageManager):
A programmatic interface for controlling conda
"""
MINIMUM_VERSION = Version("4.3.30", partial=True)
MINIMUM_VERSION = "4.3.30"
def __init__(self, session, path, python, requirements_manager):
# type: (Session, PathLike, float, RequirementsManager) -> None
@@ -93,7 +93,7 @@ class CondaAPI(PackageManager):
)
)
self.conda_version = self.get_conda_version(output)
if Version(self.conda_version, partial=True) < self.MINIMUM_VERSION:
if SimpleVersion.compare_versions(self.conda_version, '<', self.MINIMUM_VERSION):
raise CommandFailedError(
"conda version '{}' is smaller than minimum supported conda version '{}'".format(
self.conda_version, self.MINIMUM_VERSION
@@ -112,7 +112,7 @@ class CondaAPI(PackageManager):
return self.pip.bin
def upgrade_pip(self):
return self.pip.upgrade_pip()
return self._install("pip" + self.pip.get_pip_version())
def create(self):
"""
@@ -227,20 +227,20 @@ class CondaAPI(PackageManager):
self.pip.install_from_file(reqs)
def freeze(self):
# result = yaml.load(
# self._run_command((self.conda, "env", "export", "-p", self.path), raw=True)
# )
# for key in "name", "prefix":
# result.pop(key, None)
# freeze = {"conda": result}
# try:
# freeze["pip"] = result["dependencies"][-1]["pip"]
# except (TypeError, KeyError):
# freeze["pip"] = []
# else:
# del result["dependencies"][-1]
# return freeze
return self.pip.freeze()
requirements = self.pip.freeze()
try:
conda_packages = json.loads(self._run_command((self.conda, "list", "--json", "-p", self.path), raw=True))
conda_packages_txt = []
requirements_pip = [r.split('==')[0].strip().lower() for r in requirements['pip']]
for pkg in conda_packages:
# skip if this is a pypi package or it is not a python package at all
if pkg['channel'] == 'pypi' or pkg['name'].lower() not in requirements_pip:
continue
conda_packages_txt.append('{0}{1}{2}'.format(pkg['name'], '==', pkg['version']))
requirements['conda'] = conda_packages_txt
except:
pass
return requirements
def load_requirements(self, requirements):
# create new environment file
@@ -249,6 +249,8 @@ class CondaAPI(PackageManager):
reqs = []
if isinstance(requirements['pip'], six.string_types):
requirements['pip'] = requirements['pip'].split('\n')
if isinstance(requirements.get('conda'), six.string_types):
requirements['conda'] = requirements['conda'].split('\n')
has_torch = False
has_matplotlib = False
try:
@@ -256,35 +258,86 @@ class CondaAPI(PackageManager):
except:
cuda_version = 0
for r in requirements['pip']:
marker = list(parse(r))
if marker:
m = MarkerRequirement(marker[0])
if m.req.name.lower() == 'matplotlib':
has_matplotlib = True
elif m.req.name.lower().startswith('torch'):
has_torch = True
# notice: a 'conda' entry with an empty string is a valid conda requirements list, it means pip only
# this should happen if the experiment was executed on a non-conda machine or with an old trains client
conda_supported_req = requirements['pip'] if requirements.get('conda', None) is None else requirements['conda']
conda_supported_req_names = []
for r in conda_supported_req:
try:
marker = list(parse(r))
except:
marker = None
if not marker:
continue
if m.req.name.lower() in ('torch', 'pytorch'):
has_torch = True
m.req.name = 'pytorch'
m = MarkerRequirement(marker[0])
conda_supported_req_names.append(m.name.lower())
if m.req.name.lower() == 'matplotlib':
has_matplotlib = True
elif m.req.name.lower().startswith('torch'):
has_torch = True
if m.req.name.lower() in ('tensorflow_gpu', 'tensorflow-gpu', 'tensorflow'):
has_torch = True
m.req.name = 'tensorflow-gpu' if cuda_version > 0 else 'tensorflow'
if m.req.name.lower() in ('torch', 'pytorch'):
has_torch = True
m.req.name = 'pytorch'
if m.req.name.lower() in ('tensorflow_gpu', 'tensorflow-gpu', 'tensorflow'):
has_torch = True
m.req.name = 'tensorflow-gpu' if cuda_version > 0 else 'tensorflow'
reqs.append(m)
reqs.append(m)
pip_requirements = []
# if we have a conda list, the rest should be installed with pip,
if requirements.get('conda', None) is not None:
for r in requirements['pip']:
try:
marker = list(parse(r))
except:
marker = None
if not marker:
continue
m = MarkerRequirement(marker[0])
m_name = m.name.lower()
if m_name in conda_supported_req_names:
# this package is in the conda list,
# make sure that if we changed the version, we match it in conda
conda_supported_req_names.remove(m_name)
for cr in reqs:
if m_name == cr.name.lower():
# match versions
cr.specs = m.specs
break
else:
# not in conda, it is a pip package
pip_requirements.append(m)
if m_name == 'matplotlib':
has_matplotlib = True
# remove any leftover conda packages (they were removed from the pip list)
if conda_supported_req_names:
reqs = [r for r in reqs if r.name.lower() not in conda_supported_req_names]
# Conda requirements Hacks:
if has_matplotlib:
reqs.append(MarkerRequirement(Requirement.parse('graphviz')))
reqs.append(MarkerRequirement(Requirement.parse('python-graphviz')))
reqs.append(MarkerRequirement(Requirement.parse('kiwisolver')))
if has_torch and cuda_version == 0:
reqs.append(MarkerRequirement(Requirement.parse('cpuonly')))
# conform conda packages (version/name)
for r in reqs:
# remove .post from version numbers (it breaks '~=' version matching), and change == to ~=
if r.specs and r.specs[0]:
r.specs = [(r.specs[0][0].replace('==', '~='), r.specs[0][1].split('.post')[0])]
# conda always likes "-" not "_"
r.req.name = r.req.name.replace('_', '-')
while reqs:
conda_env['dependencies'] = [r.tostr().replace('==', '=') for r in reqs]
# notice, we give conda more freedom in version selection, to help it choose the best combination
conda_env['dependencies'] = [r.tostr() for r in reqs]
with self.temp_file("conda_env", yaml.dump(conda_env), suffix=".yml") as name:
print('Conda: Trying to install requirements:\n{}'.format(conda_env['dependencies']))
result = self._run_command(
@@ -297,7 +350,7 @@ class CondaAPI(PackageManager):
solved = False
for bad_r in bad_req:
name = bad_r.split('[')[0].split('=')[0]
name = bad_r.split('[')[0].split('=')[0].split('~')[0].split('<')[0].split('>')[0]
# look for name in requirements
for r in reqs:
if r.name.lower() == name.lower():
@@ -338,7 +391,7 @@ class CondaAPI(PackageManager):
if len(empty_lines) >= 2:
deps = error_lines[empty_lines[0]+1:empty_lines[1]]
try:
return yaml.load('\n'.join(deps))
return yaml.load('\n'.join(deps), Loader=yaml.SafeLoader)
except:
return None
return None
@@ -412,4 +465,4 @@ class PackageNotFoundError(CondaException):
as a singleton YAML list.
"""
pkg = attrib(default="", converter=lambda val: yaml.load(val)[0].replace(" ", ""))
pkg = attrib(default="", converter=lambda val: yaml.load(val, Loader=yaml.SafeLoader)[0].replace(" ", ""))

View File

@@ -6,14 +6,14 @@ from .requirements import SimpleSubstitution
class CythonRequirement(SimpleSubstitution):
name = "cython"
name = ("cython", "numpy", )
def __init__(self, *args, **kwargs):
super(CythonRequirement, self).__init__(*args, **kwargs)
def match(self, req):
# match both Cython & cython
return self.name == req.name.lower()
return req.name and req.name.lower() in self.name
def replace(self, req):
"""

View File

@@ -0,0 +1,60 @@
from collections import OrderedDict
from typing import Text
from .base import PackageManager
from .requirements import SimpleSubstitution
class ExternalRequirements(SimpleSubstitution):
name = "external_link"
def __init__(self, *args, **kwargs):
super(ExternalRequirements, self).__init__(*args, **kwargs)
self.post_install_req = []
self.post_install_req_lookup = OrderedDict()
def match(self, req):
# match both editable or code or unparsed
if not (not req.name or req.req and (req.req.editable or req.req.vcs)):
return False
if not req.req or not req.req.line or not req.req.line.strip() or req.req.line.strip().startswith('#'):
return False
return True
def post_install(self):
post_install_req = self.post_install_req
self.post_install_req = []
for req in post_install_req:
try:
freeze_base = PackageManager.out_of_scope_freeze() or ''
except:
freeze_base = ''
PackageManager.out_of_scope_install_package(req.tostr(markers=False), "--no-deps")
try:
freeze_post = PackageManager.out_of_scope_freeze() or ''
package_name = list(set(freeze_post['pip']) - set(freeze_base['pip']))
if package_name and package_name[0] not in self.post_install_req_lookup:
self.post_install_req_lookup[package_name[0]] = req.req.line
except:
pass
PackageManager.out_of_scope_install_package(req.tostr(markers=False), "--ignore-installed")
def replace(self, req):
"""
Replace a requirement
:raises: ValueError if version is pre-release
"""
# Store in post req install, and return nothing
self.post_install_req.append(req)
# mark skip package, we will install it in post install hook
return Text('')
def replace_back(self, list_of_requirements):
if 'pip' in list_of_requirements:
original_requirements = list_of_requirements['pip']
list_of_requirements['pip'] = [r for r in original_requirements
if r not in self.post_install_req_lookup]
list_of_requirements['pip'] += [self.post_install_req_lookup.get(r, '')
for r in self.post_install_req_lookup.keys() if r in original_requirements]
return list_of_requirements
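The post_install hook above pins down what a VCS/editable link actually installed by diffing pip freeze before and after the --no-deps install; the core of that trick as a standalone sketch (the helper name is illustrative):
def new_packages(freeze_base, freeze_post):
    # both arguments are {'pip': [...]} dicts as returned by freeze()
    return list(set(freeze_post.get('pip', [])) - set(freeze_base.get('pip', [])))
new_packages({'pip': ['six==1.14.0']},
             {'pip': ['six==1.14.0', 'mypkg==0.1']})   # -> ['mypkg==0.1']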

View File

@@ -14,7 +14,7 @@ class HorovodRequirement(SimpleSubstitution):
def match(self, req):
# match horovod (case-insensitive)
- return self.name == req.name.lower()
+ return req.name and self.name == req.name.lower()
def post_install(self):
if self.post_install_req:

View File

@@ -29,13 +29,13 @@ class SystemPip(PackageManager):
pass
def install_from_file(self, path):
- self.run_with_env(('install', '-r', path) + self.install_flags())
+ self.run_with_env(('install', '-r', path) + self.install_flags(), cwd=self.cwd)
def install_packages(self, *packages):
self._install(*(packages + self.install_flags()))
def _install(self, *args):
- self.run_with_env(('install',) + args)
+ self.run_with_env(('install',) + args, cwd=self.cwd)
def uninstall_packages(self, *packages):
self.run_with_env(('uninstall', '-y') + packages)
@@ -82,7 +82,7 @@ class SystemPip(PackageManager):
return (command.get_output if output else command.check_call)(stdin=DEVNULL, **kwargs)
def _make_command(self, command):
- return Argv(self.bin, '-m', 'pip', *command)
+ return Argv(self.bin, '-m', 'pip', '--disable-pip-version-check', *command)
def install_flags(self):
if self.indices_args is None:

View File

@@ -33,7 +33,7 @@ class VirtualenvPip(SystemPip, PackageManager):
self.python = python
def _make_command(self, command):
- return self.session.command(self.bin, "-m", "pip", *command)
+ return self.session.command(self.bin, "-m", "pip", "--disable-pip-version-check", *command)
def load_requirements(self, requirements):
if isinstance(requirements, dict) and requirements.get("pip"):

View File

@@ -1,8 +1,11 @@
+ from copy import deepcopy
from functools import wraps
import attr
+ import sys
+ import os
from pathlib2 import Path
- from trains_agent.helper.process import Argv, DEVNULL
+ from trains_agent.helper.process import Argv, DEVNULL, check_if_command_exists
from trains_agent.session import Session, POETRY
@@ -35,10 +38,12 @@ def prop_guard(prop, log_prop=None):
class PoetryConfig:
- def __init__(self, session):
- # type: (Session) -> ()
+ def __init__(self, session, interpreter=None):
+ # type: (Session, str) -> ()
self.session = session
self._log = session.get_logger(__name__)
+ self._python = interpreter or sys.executable
+ self._initialized = False
@property
def log(self):
@@ -53,7 +58,20 @@ class PoetryConfig:
def run(self, *args, **kwargs):
func = kwargs.pop("func", Argv.get_output)
kwargs.setdefault("stdin", DEVNULL)
argv = Argv("poetry", "-n", *args)
kwargs['env'] = deepcopy(os.environ)
if 'VIRTUAL_ENV' in kwargs['env'] or 'CONDA_PREFIX' in kwargs['env']:
kwargs['env'].pop('VIRTUAL_ENV', None)
kwargs['env'].pop('CONDA_PREFIX', None)
kwargs['env'].pop('PYTHONPATH', None)
if hasattr(sys, "real_prefix") and hasattr(sys, "base_prefix"):
path = ':'+kwargs['env']['PATH']
path = path.replace(':'+sys.base_prefix, ':'+sys.real_prefix, 1)
kwargs['env']['PATH'] = path
if check_if_command_exists("poetry"):
argv = Argv("poetry", *args)
else:
argv = Argv(self._python, "-m", "poetry", *args)
self.log.debug("running: %s", argv)
return func(argv, **kwargs)
@@ -61,10 +79,16 @@ class PoetryConfig:
return self.run("config", *args, **kwargs)
@_guard_enabled
- def initialize(self):
- self._config("settings.virtualenvs.in-project", "true")
- # self._config("repositories.{}".format(self.REPO_NAME), PYTHON_INDEX)
- # self._config("http-basic.{}".format(self.REPO_NAME), *PYTHON_INDEX_CREDENTIALS)
+ def initialize(self, cwd=None):
+ if not self._initialized:
+ self._initialized = True
+ try:
+ self._config("--local", "virtualenvs.in-project", "true", cwd=cwd)
+ # self._config("repositories.{}".format(self.REPO_NAME), PYTHON_INDEX)
+ # self._config("http-basic.{}".format(self.REPO_NAME), *PYTHON_INDEX_CREDENTIALS)
+ except Exception as ex:
+ print("Exception: {}\nError: Failed configuring Poetry virtualenvs.in-project".format(ex))
+ raise
def get_api(self, path):
# type: (Path) -> PoetryAPI
@@ -81,7 +105,7 @@ class PoetryAPI(object):
def install(self):
# type: () -> bool
if self.enabled:
self.config.run("install", cwd=str(self.path), func=Argv.check_call)
self.config.run("install", "-n", cwd=str(self.path), func=Argv.check_call)
return True
return False
@@ -92,10 +116,15 @@ class PoetryAPI(object):
)
def freeze(self):
return {"poetry": self.config.run("show", cwd=str(self.path)).splitlines()}
lines = self.config.run("show", cwd=str(self.path)).splitlines()
lines = [[p for p in line.split(' ') if p] for line in lines]
return {"pip": [parts[0]+'=='+parts[1]+' # '+' '.join(parts[2:]) for parts in lines]}
def get_python_command(self, extra):
return Argv("poetry", "run", "python", *extra)
if check_if_command_exists("poetry"):
return Argv("poetry", "run", "python", *extra)
else:
return Argv(self.config._python, "-m", "poetry", "run", "python", *extra)
def upgrade_pip(self, *args, **kwargs):
pass

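The reworked `run` copies `os.environ` and strips `VIRTUAL_ENV`, `CONDA_PREFIX`, and `PYTHONPATH`, so poetry does not mistake the agent's own virtualenv for the task environment; when the agent itself runs inside a virtualenv (signalled by `sys.real_prefix`), it also rewrites `PATH` to resolve the real interpreter prefix. A condensed sketch of that scrubbing under the same assumptions (the agent keeps the raw leading-colon result; the sketch strips it for safety):

```python
import os
import sys
from copy import deepcopy


def clean_env_for_poetry():
    env = deepcopy(os.environ)
    # drop variables that would make poetry reuse the agent's own environment
    for var in ("VIRTUAL_ENV", "CONDA_PREFIX", "PYTHONPATH"):
        env.pop(var, None)
    # inside a virtualenv, sys.real_prefix is the original interpreter prefix;
    # point PATH back at it so child processes do not pick up the venv copy
    if hasattr(sys, "real_prefix") and hasattr(sys, "base_prefix"):
        path = (":" + env.get("PATH", "")).replace(
            ":" + sys.base_prefix, ":" + sys.real_prefix, 1)
        env["PATH"] = path.lstrip(":")
    return env
```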
View File

@@ -10,10 +10,9 @@ from typing import Text
import attr
import requests
- from semantic_version import Version, Spec
import six
- from .requirements import SimpleSubstitution, FatalSpecsResolutionError
+ from .requirements import SimpleSubstitution, FatalSpecsResolutionError, SimpleVersion
OS_TO_WHEEL_NAME = {"linux": "linux_x86_64", "windows": "win_amd64"}
@@ -155,10 +154,15 @@ class PytorchRequirement(SimpleSubstitution):
self.os = os_name or self.get_platform()
self.cuda = "cuda{}".format(self.cuda_version).lower()
self.python_version_string = str(self.config["agent.default_python"])
- self.python_semantic_version = Version.coerce(
- self.python_version_string, partial=True
- )
- self.python = "python{}.{}".format(self.python_semantic_version.major, self.python_semantic_version.minor)
+ self.python_major_minor_str = '.'.join(self.python_version_string.split('.')[:2])
+ if '.' not in self.python_major_minor_str:
+ raise PytorchResolutionError(
+ "invalid python version {!r} defined in configuration file, key 'agent.default_python': "
+ "must have both major and minor parts of the version (for example: '3.7')".format(
+ self.python_version_string
+ )
+ )
+ self.python = "python{}".format(self.python_major_minor_str)
self.exceptions = [
PytorchResolutionError(message)
@@ -188,9 +192,7 @@ class PytorchRequirement(SimpleSubstitution):
"""
Make sure python version has both major and minor versions as required for choosing pytorch wheel
"""
- if self.is_pip and not (
- self.python_semantic_version.major and self.python_semantic_version.minor
- ):
+ if self.is_pip and not self.python_major_minor_str:
raise PytorchResolutionError(
"invalid python version {!r} defined in configuration file, key 'agent.default_python': "
"must have both major and minor parts of the version (for example: '3.7')".format(
@@ -215,8 +217,9 @@ class PytorchRequirement(SimpleSubstitution):
links_parser = LinksHTMLParser()
links_parser.feed(requests.get(torch_url, timeout=10).text)
platform_wheel = "win" if self.get_platform() == "windows" else self.get_platform()
py_ver = "{0.major}{0.minor}".format(self.python_semantic_version)
py_ver = self.python_major_minor_str.replace('.', '')
url = None
last_v = None
# search for our package
for l in links_parser.links:
parts = l.split('/')[-1].split('-')
@@ -225,21 +228,42 @@ class PytorchRequirement(SimpleSubstitution):
if parts[0] != req.name:
continue
# version (ignore +cpu +cu92 etc. + is %2B in the file link)
- if parts[1].split('%')[0].split('+')[0] != req.specs[0][1]:
+ # version: ignore the .postX suffix (treat it as a regular version)
+ try:
+ v = str(parts[1].split('%')[0].split('+')[0])
+ except Exception:
+ continue
+ if not req.compare_version(v) or \
+ (last_v and SimpleVersion.compare_versions(last_v, '>', v, ignore_sub_versions=False)):
+ continue
if not parts[2].endswith(py_ver):
continue
if platform_wheel not in parts[4]:
continue
url = '/'.join(torch_url.split('/')[:-1] + l.split('/'))
- break
+ last_v = v
return url
def get_url_for_platform(self, req):
assert self.package_manager == "pip"
assert self.os != "mac"
- assert req.specs
+ # check if the package is already installed with the system packages
+ try:
+ if self.config.get("agent.package_manager.system_site_packages"):
+ from pip._internal.commands.show import search_packages_info
+ installed_torch = list(search_packages_info([req.name]))
+ # notice the comparison order: the first part makes sure we have a valid installed package
+ if installed_torch[0]['version'] and req.compare_version(installed_torch[0]['version']):
+ print('PyTorch: requested "{}" version {}, using pre-installed version {}'.format(
+ req.name, req.specs[0] if req.specs else 'unspecified', installed_torch[0]['version']))
+ # package already installed, do nothing
+ return str(req), True
+ except:
+ pass
+ # make sure we have a specific version to retrieve
+ if not req.specs:
+ req.specs = [('>', '0')]
try:
req.specs[0] = (req.specs[0][0], req.specs[0][1].split('+')[0])
except:
@@ -266,7 +290,7 @@ class PytorchRequirement(SimpleSubstitution):
if not url:
url = PytorchWheel(
torch_version=fix_version(version),
python="{0.major}{0.minor}".format(self.python_semantic_version),
python=self.python_major_minor_str.replace('.', ''),
os_name=self.os,
cuda_version=self.cuda_version,
).make_url()
@@ -280,20 +304,17 @@ class PytorchRequirement(SimpleSubstitution):
@staticmethod
def match_version(req, options):
versioned_options = sorted(
- ((Version(fix_version(key)), value) for key, value in options.items()),
+ ((fix_version(key), value) for key, value in options.items()),
key=itemgetter(0),
reverse=True,
)
+ req.specs = [(op, fix_version(version)) for op, version in req.specs]
- if req.specs:
- specs = Spec(req.format_specs())
- else:
- specs = None
try:
return next(
replacement
for version, replacement in versioned_options
- if not specs or version in specs
+ if req.compare_version(version)
)
except StopIteration:
raise PytorchResolutionError(

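The link scanner above leans on the PEP 427 wheel-filename layout, `name-version(-build)?-pythontag-abitag-platform.whl`: `parts[0]` is the package, `parts[1]` the version (a `+cu101` local part appears URL-encoded as `%2B`), `parts[2]` the python tag, and `parts[4]` the platform. A toy run of that split (the filename is illustrative):

```python
link = "torch-1.4.0%2Bcu101-cp37-cp37m-linux_x86_64.whl"
parts = link.split("/")[-1].split("-")
name = parts[0]
version = parts[1].split("%")[0].split("+")[0]  # strip the +cu101 local part
py_tag, platform = parts[2], parts[4]
assert (name, version, py_tag) == ("torch", "1.4.0", "cp37")
assert "linux" in platform
```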
View File

@@ -8,9 +8,8 @@ from copy import deepcopy
from itertools import chain, starmap
from operator import itemgetter
from os import path
- from typing import Text, List, Type, Optional, Tuple
+ from typing import Text, List, Type, Optional, Tuple, Dict
- import semantic_version
from pathlib2 import Path
from pyhocon import ConfigTree
from requirements import parse
@@ -48,7 +47,7 @@ class MarkerRequirement(object):
def tostr(self, markers=True):
if not self.uri:
- parts = [self.name]
+ parts = [self.name or self.line]
if self.extras:
parts.append('[{0}]'.format(','.join(sorted(self.extras))))
@@ -69,8 +68,19 @@ class MarkerRequirement(object):
def __repr__(self):
return '{self.__class__.__name__}[{self}]'.format(self=self)
- def format_specs(self):
- return ','.join(starmap(operator.add, self.specs))
+ def format_specs(self, num_parts=None, max_num_parts=None):
+ max_num_parts = max_num_parts or num_parts
+ if max_num_parts is None or not self.specs:
+ return ','.join(starmap(operator.add, self.specs))
+ op, version = self.specs[0]
+ for v in self._sub_versions_pep440:
+ version = version.replace(v, '.')
+ if num_parts:
+ version = (version.strip('.').split('.') + ['0'] * num_parts)[:max_num_parts]
+ else:
+ version = version.strip('.').split('.')[:max_num_parts]
+ return op+'.'.join(version)
def __getattr__(self, item):
return getattr(self.req, item)
@@ -99,6 +109,186 @@ class MarkerRequirement(object):
else:
self.specs = greater + smaller
def compare_version(self, requested_version, op=None, num_parts=3):
"""
compare the requested version with the one we have in the spec,
If the requested version is 1.2.3 the self.spec should be 1.2.3*
If the requested version is 1.2 the self.spec should be 1.2*
etc.
:param str requested_version:
:param str op: '==', '>', '>=', '<=', '<', '~='
:param int num_parts: number of parts to compare
:return: True if we answer the requested version
"""
# if we have no specific version, we cannot compare, so assume it's okay
if not self.specs:
return True
version = self.specs[0][1]
op = (op or self.specs[0][0]).strip()
return SimpleVersion.compare_versions(requested_version, op, version)
class SimpleVersion:
_sub_versions_pep440 = ['a', 'b', 'rc', '.post', '.dev', '+', ]
VERSION_PATTERN = r"""
v?
(?:
(?:(?P<epoch>[0-9]+)!)? # epoch
(?P<release>[0-9]+(?:\.[0-9]+)*) # release segment
(?P<pre> # pre-release
[-_\.]?
(?P<pre_l>(a|b|c|rc|alpha|beta|pre|preview))
[-_\.]?
(?P<pre_n>[0-9]+)?
)?
(?P<post> # post release
(?:-(?P<post_n1>[0-9]+))
|
(?:
[-_\.]?
(?P<post_l>post|rev|r)
[-_\.]?
(?P<post_n2>[0-9]+)?
)
)?
(?P<dev> # dev release
[-_\.]?
(?P<dev_l>dev)
[-_\.]?
(?P<dev_n>[0-9]+)?
)?
)
(?:\+(?P<local>[a-z0-9]+(?:[-_\.][a-z0-9]+)*))? # local version
"""
_local_version_separators = re.compile(r"[\._-]")
_regex = re.compile(r"^\s*" + VERSION_PATTERN + r"\s*$", re.VERBOSE | re.IGNORECASE)
@classmethod
def compare_versions(cls, version_a, op, version_b, ignore_sub_versions=True):
"""
Compare two versions based on the op operator
returns bool(version_a op version_b)
Notice: Ignores a/b/rc/post/dev markers on the version
:param str version_a:
:param str op: '==', '===', '>', '>=', '<=', '<', '~='
:param str version_b:
:param bool ignore_sub_versions: if true compare only major.minor.patch
(ignore a/b/rc/post/dev in the comparison)
:return bool: version_a op version_b
"""
if not version_b:
return True
num_parts = 3
if op == '~=':
num_parts = max(num_parts, 2)
op = '=='
ignore_sub_versions = True
elif op == '===':
op = '=='
try:
version_a_key = cls._get_match_key(cls._regex.search(version_a), num_parts, ignore_sub_versions)
version_b_key = cls._get_match_key(cls._regex.search(version_b), num_parts, ignore_sub_versions)
except:
# fall back to string-based comparison
for v in cls._sub_versions_pep440:
version_a = version_a.replace(v, '.')
version_b = version_b.replace(v, '.')
version_a = (version_a.strip('.').split('.') + ['0'] * num_parts)[:num_parts]
version_b = (version_b.strip('.').split('.') + ['0'] * num_parts)[:num_parts]
version_a_key = ''
version_b_key = ''
for i in range(num_parts):
pad = '{:0>%d}.' % max([9, 1 + len(version_a[i]), 1 + len(version_b[i])])
version_a_key += pad.format(version_a[i])
version_b_key += pad.format(version_b[i])
if op == '==':
return version_a_key == version_b_key
if op == '<=':
return version_a_key <= version_b_key
if op == '>=':
return version_a_key >= version_b_key
if op == '>':
return version_a_key > version_b_key
if op == '<':
return version_a_key < version_b_key
raise ValueError('Unrecognized comparison operator [{}]'.format(op))
@staticmethod
def _parse_letter_version(
letter, # type: str
number, # type: Union[str, bytes, SupportsInt]
):
# type: (...) -> Optional[Tuple[str, int]]
if letter:
# We consider there to be an implicit 0 in a pre-release if there is
# not a numeral associated with it.
if number is None:
number = 0
# We normalize any letters to their lower case form
letter = letter.lower()
# We consider some words to be alternate spellings of other words and
# in those cases we want to normalize the spellings to our preferred
# spelling.
if letter == "alpha":
letter = "a"
elif letter == "beta":
letter = "b"
elif letter in ["c", "pre", "preview"]:
letter = "rc"
elif letter in ["rev", "r"]:
letter = "post"
return letter, int(number)
if not letter and number:
# We assume if we are given a number, but we are not given a letter
# then this is using the implicit post release syntax (e.g. 1.0-1)
letter = "post"
return letter, int(number)
return ()
@staticmethod
def _get_match_key(match, num_parts, ignore_sub_versions):
if ignore_sub_versions:
return (0, tuple(int(i) for i in match.group("release").split(".")[:num_parts]),
(), (), (), (),)
return (
int(match.group("epoch")) if match.group("epoch") else 0,
tuple(int(i) for i in match.group("release").split(".")[:num_parts]),
SimpleVersion._parse_letter_version(match.group("pre_l"), match.group("pre_n")),
SimpleVersion._parse_letter_version(
match.group("post_l"), match.group("post_n1") or match.group("post_n2")
),
SimpleVersion._parse_letter_version(match.group("dev_l"), match.group("dev_n")),
SimpleVersion._parse_local_version(match.group("local")),
)
@staticmethod
def _parse_local_version(local):
# type: (str) -> Optional[LocalType]
"""
Takes a string like abc.1.twelve and turns it into ("abc", 1, "twelve").
"""
if local is not None:
return tuple(
part.lower() if not part.isdigit() else int(part)
for part in SimpleVersion._local_version_separators.split(local)
)
return ()
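If the class behaves as intended, the comparison keys follow PEP 440 ordering: epoch first, then the release tuple truncated to `num_parts`, then pre/post/dev markers, which gives numeric rather than lexicographic semantics. A few illustrative checks (my reading of the code above, not tests from the repo):

```python
assert SimpleVersion.compare_versions("1.4.0", ">", "1.3.1")
assert SimpleVersion.compare_versions("2.0", "<", "10.0")  # numeric, not lexicographic
# a/b/rc/post/dev markers are ignored by default...
assert SimpleVersion.compare_versions("1.5.1", "==", "1.5.1.post2")
# ...but participate when ignore_sub_versions=False
assert not SimpleVersion.compare_versions(
    "1.5.1", "==", "1.5.1.post2", ignore_sub_versions=False)
# an empty spec matches anything
assert SimpleVersion.compare_versions("1.4.0", "==", "")
```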
@six.add_metaclass(ABCMeta)
class RequirementSubstitution(object):
@@ -177,13 +367,20 @@ class SimpleSubstitution(RequirementSubstitution):
if req.specs:
_, version_number = req.specs[0]
- assert semantic_version.Version(version_number, partial=True)
+ # assert packaging_version.parse(version_number)
else:
version_number = self.get_pip_version(self.name)
req.specs = [('==', version_number + self.suffix)]
return Text(req)
+ def replace_back(self, list_of_requirements): # type: (Dict) -> Dict
+ """
+ :param list_of_requirements: {'pip': ['a==1.0', ]}
+ :return: {'pip': ['a==1.0', ]}
+ """
+ return list_of_requirements
@six.add_metaclass(ABCMeta)
class CudaSensitiveSubstitution(SimpleSubstitution):
@@ -235,15 +432,17 @@ class RequirementsManager(object):
return None
def replace(self, requirements): # type: (Text) -> Text
+ def safe_parse(req_str):
+ try:
+ return next(parse(req_str))
+ except Exception as ex:
+ return Requirement(req_str)
parsed_requirements = tuple(
map(
MarkerRequirement,
- filter(
- None,
- parse(requirements)
- if isinstance(requirements, six.text_type)
- else (next(parse(line), None) for line in requirements)
- )
+ [safe_parse(line) for line in (requirements.splitlines()
+ if isinstance(requirements, six.text_type) else requirements)]
)
)
if not parsed_requirements:
@@ -258,7 +457,7 @@ class RequirementsManager(object):
warning('could not resolve python wheel replacement for {}'.format(req))
raise
except Exception:
- warning('could not resolve python wheel replacement for {}, '
+ warning('could not resolve python wheel replacement for \"{}\", '
'using original requirements line: {}'.format(req, i))
return None
@@ -280,6 +479,14 @@ class RequirementsManager(object):
except Exception as ex:
print('RequirementsManager handler {} raised exception: {}'.format(h, ex))
+ def replace_back(self, requirements):
+ for h in self.handlers:
+ try:
+ requirements = h.replace_back(requirements)
+ except Exception:
+ pass
+ return requirements
@staticmethod
def get_cuda_version(config): # type: (ConfigTree) -> (Text, Text)
# we assume os.environ already updated the config['agent.cuda_version'] & config['agent.cudnn_version']

View File

@@ -42,7 +42,9 @@ class VcsFactory(object):
:param location: (desired) clone location
"""
url = execution_info.repository
- is_git = url.endswith(cls.GIT_SUFFIX)
+ # We only support git, hg is deprecated
+ is_git = True
+ # is_git = url.endswith(cls.GIT_SUFFIX)
vcs_cls = Git if is_git else Hg
revision = (
execution_info.version_num
@@ -454,7 +456,17 @@ class Git(VCS):
)
def pull(self):
self.call("fetch", "--all", cwd=self.location)
self.call("fetch", "--all", "--recurse-submodules", cwd=self.location)
def checkout(self): # type: () -> None
"""
Checkout repository at specified revision
"""
self.call("checkout", self.revision, *self.checkout_flags, cwd=self.location)
+ try:
+ self.call("submodule", "update", "--recursive", cwd=self.location)
+ except:
+ pass
info_commands = dict(
url=Argv(executable_name, "ls-remote", "--get-url", "origin"),

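Taken together, the `pull`/`checkout` changes make submodules track the main repository. The equivalent plain-git sequence, assuming a clone already exists at `location` (the sketch adds `--init`, which a fresh clone typically needs, whereas the agent runs `submodule update --recursive` on an already-initialized tree and simply ignores failures):

```python
import subprocess


def update_with_submodules(location, revision):
    """Approximate CLI equivalent of the fetch/checkout flow above."""
    subprocess.check_call(
        ["git", "fetch", "--all", "--recurse-submodules"], cwd=location)
    subprocess.check_call(["git", "checkout", revision], cwd=location)
    try:
        # best effort: repositories without submodules simply no-op here
        subprocess.check_call(
            ["git", "submodule", "update", "--init", "--recursive"], cwd=location)
    except subprocess.CalledProcessError:
        pass
```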
View File

@@ -4,11 +4,12 @@ from time import sleep
from glob import glob
from tempfile import gettempdir, NamedTemporaryFile
+ from trains_agent.definitions import ENV_K8S_HOST_MOUNT
from trains_agent.helper.base import warning
class Singleton(object):
- prefix = 'trainsagent'
+ prefix = '.trainsagent'
sep = '_'
ext = '.tmp'
worker_id = None
@@ -19,7 +20,7 @@ class Singleton(object):
_lock_timeout = 10
@classmethod
- def register_instance(cls, unique_worker_id=None, worker_name=None):
+ def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
"""
# Exit the process if another instance of us is using the same worker_id
@@ -28,7 +29,7 @@ class Singleton(object):
:return: (str worker_id, int slot_number) Return None value on instance already running
"""
# try to lock file
- lock_file = os.path.join(gettempdir(), cls._lock_file_name)
+ lock_file = os.path.join(cls._get_temp_folder(), cls._lock_file_name)
timeout = 0
while os.path.exists(lock_file):
if timeout > cls._lock_timeout:
@@ -46,7 +47,8 @@ class Singleton(object):
f.write(bytes(os.getpid()))
f.flush()
try:
- ret = cls._register_instance(unique_worker_id=unique_worker_id, worker_name=worker_name)
+ ret = cls._register_instance(unique_worker_id=unique_worker_id, worker_name=worker_name,
+ api_client=api_client)
except:
ret = None, None
@@ -58,12 +60,12 @@ class Singleton(object):
return ret
@classmethod
- def _register_instance(cls, unique_worker_id=None, worker_name=None):
+ def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
if cls.worker_id:
return cls.worker_id, cls.instance_slot
# make sure we have a unique name
instance_num = 0
- temp_folder = gettempdir()
+ temp_folder = cls._get_temp_folder()
files = glob(os.path.join(temp_folder, cls.prefix + cls.sep + '*' + cls.ext))
slots = {}
for file in files:
@@ -73,8 +75,24 @@ class Singleton(object):
except Exception:
# something is wrong, use a non-existing pid and delete the file
pid = -1
+ uid, slot = None, None
+ try:
+ with open(file, 'r') as f:
+ uid, slot = str(f.read()).split('\n')
+ slot = int(slot)
+ except Exception:
+ pass
+ worker = None
+ if api_client and os.environ.get(ENV_K8S_HOST_MOUNT) and uid:
+ try:
+ worker = [w for w in api_client.workers.get_all() if w.id == uid]
+ except Exception:
+ worker = None
# count active instances and delete dead files
- if not psutil.pid_exists(pid):
+ if not worker and not psutil.pid_exists(pid):
# delete the file
try:
os.remove(os.path.join(file))
@@ -83,11 +101,7 @@ class Singleton(object):
continue
instance_num += 1
- try:
- with open(file, 'r') as f:
- uid, slot = str(f.read()).split('\n')
- slot = int(slot)
- except Exception:
+ if slot is None:
continue
if uid == unique_worker_id:
@@ -110,10 +124,20 @@ class Singleton(object):
unique_worker_id = worker_name + cls.worker_name_sep + str(cls.instance_slot)
# create lock
- cls._pid_file = NamedTemporaryFile(dir=gettempdir(), prefix=cls.prefix + cls.sep + str(os.getpid()) + cls.sep,
- suffix=cls.ext)
+ cls._pid_file = NamedTemporaryFile(dir=cls._get_temp_folder(),
+ prefix=cls.prefix + cls.sep + str(os.getpid()) + cls.sep, suffix=cls.ext)
cls._pid_file.write(('{}\n{}'.format(unique_worker_id, cls.instance_slot)).encode())
cls._pid_file.flush()
cls.worker_id = unique_worker_id
return cls.worker_id, cls.instance_slot
+ @classmethod
+ def _get_temp_folder(cls):
+ if os.environ.get(ENV_K8S_HOST_MOUNT):
+ return os.environ.get(ENV_K8S_HOST_MOUNT).split(':')[-1]
+ return gettempdir()
@classmethod
def get_slot(cls):
return cls.instance_slot or 0

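Each agent instance drops a `<prefix>_<pid>_*.tmp` file containing `worker_id\nslot` into a shared temp folder (host-mounted when running under K8s, hence `_get_temp_folder`). Registration scans these files, discards entries whose pid is dead unless the uid still appears as a live worker through the API, and takes the first free slot. A stripped-down sketch of the slot bookkeeping (names are illustrative; `psutil` is the same third-party dependency the agent uses):

```python
import os
from glob import glob
from tempfile import gettempdir

import psutil


def occupied_slots(prefix=".trainsagent", ext=".tmp"):
    """Scan lock files and return {slot: worker_id} for live processes only."""
    slots = {}
    for path in glob(os.path.join(gettempdir(), prefix + "_*" + ext)):
        try:
            pid = int(os.path.basename(path).split("_")[1])
            with open(path) as f:
                uid, slot = f.read().split("\n")
        except Exception:
            continue  # malformed or vanished lock file
        if psutil.pid_exists(pid):
            slots[int(slot)] = uid
    return slots


def next_free_slot(slots):
    # first gap in the occupied slot numbers
    taken = set(slots)
    return next(i for i in range(len(taken) + 1) if i not in taken)
```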
View File

@@ -1,3 +1,4 @@
+ import itertools
from functools import partial
from importlib import import_module
import argparse
@@ -24,8 +25,17 @@ def get_parser():
from .worker import COMMANDS
subparsers = top_parser.add_subparsers(dest='command')
for c in COMMANDS:
- parser = subparsers.add_parser(name=c, help=COMMANDS[c]['help'])
- for a in COMMANDS[c].get('args', {}).keys():
- parser.add_argument(a, **COMMANDS[c]['args'][a])
+ parser = subparsers.add_parser(name=c, help=COMMANDS[c]["help"])
+ groups = itertools.groupby(
+ sorted(
+ COMMANDS[c].get("args", {}).items(), key=lambda x: x[1].get("group", "")
+ ),
+ key=lambda x: x[1].pop("group", ""),
+ )
+ for group_name, group in groups:
+ p = parser if not group_name else parser.add_argument_group(group_name)
+ for key, value in group:
+ aliases = value.pop("aliases", [])
+ p.add_argument(key, *aliases, **value)
return top_parser

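The rebuilt parser sorts each command's argument dict by an optional "group" key, runs it through `itertools.groupby`, and registers every named group via `parser.add_argument_group`, popping "group" and "aliases" so the remaining keys pass straight to `add_argument`. A self-contained rendition of the same trick with hypothetical arguments:

```python
import argparse
import itertools

ARGS = {
    "--queue": {"help": "queue name(s)"},
    "--docker": {"help": "run inside docker", "nargs": "*", "group": "Docker support"},
    "--gpus": {"help": "gpus to expose", "group": "Docker support"},
    "--detached": {"help": "run in background", "action": "store_true",
                   "aliases": ["-d"]},
}

parser = argparse.ArgumentParser("daemon")
groups = itertools.groupby(
    sorted(ARGS.items(), key=lambda x: x[1].get("group", "")),
    key=lambda x: x[1].pop("group", ""),
)
for group_name, group in groups:
    target = parser if not group_name else parser.add_argument_group(group_name)
    for flag, spec in group:
        aliases = spec.pop("aliases", [])
        target.add_argument(flag, *aliases, **spec)

print(parser.parse_args(["--docker", "nvidia/cuda", "-d"]))
# ungrouped flags stay at top level; --docker/--gpus render under "Docker support"
```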
View File

@@ -37,21 +37,29 @@ DAEMON_ARGS = dict({
'help': 'Pipe full log to stdout/stderr, should not be used if running in background',
'action': 'store_true',
},
- '--gpus': {
- 'help': 'Specify active GPUs for the daemon to use (docker / virtual environment), '
- 'Equivalent to setting NVIDIA_VISIBLE_DEVICES '
- 'Examples: --gpus 0 or --gpus 0,1,2 or --gpus all',
- },
- '--cpu-only': {
- 'help': 'Disable GPU access for the daemon, only use CPU in either docker or virtual environment',
- 'action': 'store_true',
- },
+ '--docker': {
+ 'help': 'Run the execution task inside a docker container (v19.03 and above). Optional args <image> <arguments>, or '
+ 'specify the default docker image in agent.default_docker.image / agent.default_docker.arguments. '
+ 'Use --gpus/--cpu-only (or set NVIDIA_VISIBLE_DEVICES) to limit gpu visibility for the docker',
+ 'nargs': '*',
+ 'default': False,
+ 'group': 'Docker support',
+ },
+ '--gpus': {
+ 'help': 'Specify active GPUs for the daemon to use (docker / virtual environment). '
+ 'Equivalent to setting NVIDIA_VISIBLE_DEVICES. '
+ 'Examples: --gpus 0 or --gpus 0,1,2 or --gpus all',
+ 'group': 'Docker support',
+ },
+ '--cpu-only': {
+ 'help': 'Disable GPU access for the daemon, only use CPU in either docker or virtual environment',
+ 'action': 'store_true',
+ 'group': 'Docker support',
+ },
+ '--force-current-version': {
+ 'help': 'Force trains-agent to use the current trains-agent version when running in the docker',
+ 'action': 'store_true',
+ 'group': 'Docker support',
+ },
'--queue': {
'help': 'Queue ID(s)/Name(s) to pull tasks from (\'default\' queue)',
@@ -64,6 +72,11 @@ DAEMON_ARGS = dict({
'help': 'Do not use any network connection, assume everything is pre-installed',
'action': 'store_true',
},
'--detached': {
'help': 'Detached mode, run agent in the background',
'action': 'store_true',
'aliases': ['-d'],
},
}, **WORKER_ARGS)
@@ -97,6 +110,17 @@ COMMANDS = {
'help': 'Do not use any network connection, assume everything is pre-installed',
'action': 'store_true',
},
'--docker': {
'help': 'Run the execution task inside a docker container (v19.03 and above). Optional args <image> <arguments>, or '
'specify the default docker image in agent.default_docker.image / agent.default_docker.arguments. '
'Use --gpus/--cpu-only (or set NVIDIA_VISIBLE_DEVICES) to limit gpu visibility for the docker',
'nargs': '*',
'default': False,
},
'--clone': {
'help': 'Clone the experiment before execution, and execute the cloned experiment',
'action': 'store_true',
},
}, **WORKER_ARGS),
},
'build': {

View File

@@ -15,7 +15,7 @@ from pyhocon import ConfigFactory, HOCONConverter, ConfigTree
from trains_agent.backend_api.session import Session as _Session, Request
from trains_agent.backend_api.session.client import APIClient
from trains_agent.backend_config.defs import LOCAL_CONFIG_FILE_OVERRIDE_VAR, LOCAL_CONFIG_FILES
- from trains_agent.definitions import ENVIRONMENT_CONFIG
+ from trains_agent.definitions import ENVIRONMENT_CONFIG, ENV_TASK_EXECUTE_AS_USER
from trains_agent.errors import APIError
from trains_agent.helper.base import HOCONEncoder
from trains_agent.helper.process import Argv
@@ -75,7 +75,8 @@ class Session(_Session):
cpu_only = kwargs.get('cpu_only')
if cpu_only:
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = 'none'
- if kwargs.get('gpus'):
+ if kwargs.get('gpus') and not os.environ.get('KUBERNETES_SERVICE_HOST') \
+ and not os.environ.get('KUBERNETES_PORT'):
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
if kwargs.get('only_load_config'):
from trains_agent.backend_api.config import load
@@ -86,7 +87,7 @@ class Session(_Session):
self.trace = kwargs.get('trace', False)
self._config_file = kwargs.get('config_file') or \
os.environ.get(LOCAL_CONFIG_FILE_OVERRIDE_VAR) or LOCAL_CONFIG_FILES[0]
self.api_client = APIClient(session=self, api_version="2.4")
self.api_client = APIClient(session=self, api_version="2.5")
# HACK make sure we have a python version to execute;
# if nothing was specified, use the one that runs us
def_python = ConfigValue(self.config, "agent.default_python")
@@ -111,6 +112,17 @@ class Session(_Session):
# override with environment variables
# cuda_version & cudnn_version are overridden with os.environ here, and normalized in the next section
for config_key, env_config in ENVIRONMENT_CONFIG.items():
# check if the property is a list:
if config_key.endswith('.0'):
if all(not i.get() for i in env_config.values()):
continue
parent = config_key.partition('.0')[0]
if not self.config[parent]:
self.config.put(parent, [])
self.config.put(parent, self.config[parent] + [ConfigTree((k, v.get()) for k, v in env_config.items())])
continue
value = env_config.get()
if not value:
continue
@@ -165,7 +177,11 @@ class Session(_Session):
folder_keys = ('agent.venvs_dir', 'agent.vcs_cache.path',
'agent.pip_download_cache.path',
'agent.docker_pip_cache', 'agent.docker_apt_cache')
- singleton_folders = ('agent.venvs_dir', 'agent.vcs_cache.path',)
+ singleton_folders = ('agent.venvs_dir', 'agent.vcs_cache.path', 'agent.docker_apt_cache')
+ if os.environ.get(ENV_TASK_EXECUTE_AS_USER):
+ folder_keys = tuple(list(folder_keys) + ['sdk.storage.cache.default_base_dir'])
+ singleton_folders = tuple(list(singleton_folders) + ['sdk.storage.cache.default_base_dir'])
for key in folder_keys:
folder_key = ConfigValue(self.config, key)
@@ -206,6 +222,15 @@ class Session(_Session):
config.pop('env', None)
if remove_secret_keys:
recursive_remove_secrets(config, secret_keys=remove_secret_keys)
+ # remove logging.loggers.urllib3.level from the print
+ try:
+ config['logging']['loggers']['urllib3'].pop('level', None)
+ except (KeyError, TypeError, AttributeError):
+ pass
+ try:
+ config['logging'].pop('version', None)
+ except (KeyError, TypeError, AttributeError):
+ pass
config = ConfigFactory.from_dict(config)
self.log.debug("Run by interpreter: %s", sys.executable)
print(

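The new `.0`-suffixed handling above lets environment variables populate the first element of a list-valued configuration key: when any of the mapped variables are set, the session appends a fresh `ConfigTree` built from them to the parent list instead of overwriting a scalar. A hedged sketch of that append using pyhocon (the key and values are invented for illustration):

```python
from pyhocon import ConfigFactory, ConfigTree

config = ConfigFactory.from_dict({"sdk": {"aws": {"s3": {"credentials": []}}}})

# hypothetical values gathered from environment variables
# registered under a "sdk.aws.s3.credentials.0" config key
env_values = {"key": "ACCESS", "secret": "SECRET"}

parent = "sdk.aws.s3.credentials"
config.put(parent, config[parent] + [ConfigTree(env_values.items())])
assert config[parent][0]["key"] == "ACCESS"
```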
View File

@@ -1 +1 @@
- __version__ = '0.12.2'
+ __version__ = '0.14.1'