Compare commits


36 Commits

Author  SHA1  Message  Date
allegroai  ea0ed4807e  Version bump to v0.14.0  2020-03-12 19:42:32 +02:00
allegroai  389600b91e  Fix git checkout with submodules  2020-03-12 18:39:47 +02:00
allegroai  5fb2550212  Update to backend API v2.5  2020-03-12 18:39:10 +02:00
allegroai  15e9e6b778  Fix "execute --clone" support  2020-03-12 18:38:35 +02:00
allegroai  aa75b92e46  Prefer docker image from command line over the one in the experiment  2020-03-12 18:35:49 +02:00
allegroai  757210d5b3  Add support for "execute --docker" and for cloning an experiment before execution  2020-03-12 18:33:07 +02:00
allegroai  00eb2f10ec  Version bump to v0.13.3  2020-03-09 16:07:50 +02:00
allegroai  3393372b9c  Do not share apt cache among agents on the same machine  2020-03-09 12:38:51 +02:00
allegroai  f2d2d702de  Fix k8s support to allow a specific network for the docker (do not use the parent daemon network definition)  2020-03-09 12:38:32 +02:00
allegroai  e3d0680d39  Improve Unicode/UTF stdout handling  2020-03-09 12:34:48 +02:00
allegroai  618c2ac5c4  Add default storage environment vars to generated agent configuration  2020-03-09 12:33:03 +02:00
allegroai  0272c4c79c  Add "--force-current-version" daemon command-line flag  2020-03-09 12:31:43 +02:00
allegroai  ff8cf63abf  Add "--force-current-version" daemon command-line flag  2020-03-09 12:27:39 +02:00
allegroai  2c7c7f5b44  Add K8s/trains glue service example  2020-03-05 14:10:08 +02:00
allegroai  01f57c1e44  Create missing queues when starting the AWS dynamic cluster management service  2020-03-05 14:08:32 +02:00
allegroai  47bcd3839a  Pass correct GPU limit when skipping gpus flag in docker mode  2020-03-05 14:07:44 +02:00
allegroai  0a3a8a1c52  Add support for mounting dockerized experiment folders to host when running on K8s in daemon mode  2020-03-05 13:13:03 +02:00
allegroai  231a907cff  Add support for running daemon inside a K8s pod in daemon mode  2020-03-05 13:03:36 +02:00
allegroai  8f95eecf2e  Add TRAINS_AGENT_EXEC_USER support for multiple daemon instances  2020-03-05 12:46:53 +02:00
allegroai  81008ee00e  Add support for launching a specific python version based on Task.script.binary  2020-03-01 17:15:18 +02:00
allegroai  25bc44c0cf  Add poetry to the list of supported package managers  2020-03-01 17:13:15 +02:00
allegroai  f838c8fc70  Allow providing queue names to daemon  2020-02-26 16:58:25 +02:00
allegroai  596093aac6  Version bump to v0.13.2  2020-02-23 16:25:14 +02:00
allegroai  8f23f3b4c0  Add support for pulling recursive git modules as well as main project  2020-02-23 15:48:12 +02:00
allegroai  95d503afdd  Fix pip install or upgrade with limit in conda  2020-02-23 15:47:28 +02:00
allegroai  73ee33be99  Print error in case Poetry configuration failed  2020-02-23 14:43:21 +02:00
allegroai  ee3adf625f  Add single-series-per-graph setting to the configuration example  2020-02-23 12:38:14 +02:00
allegroai  afec38a50e  Add missing models service  2020-02-18 11:31:58 +02:00
allegroai  f9c60904f4  version bump  2020-02-12 11:23:53 +02:00
allegroai  a09dc85c67  Limit virtualenv version to <20 due to an import issue in v20.0.0  2020-02-12 11:23:48 +02:00
allegroai  5d74f4b376  version bump  2020-02-10 10:47:20 +02:00
allegroai  d558c66d3c  Do not stop experiments if network is down  2020-02-10 10:47:13 +02:00
allegroai  714c6a05d0  Add .bashrc reloading before running trains-agent in the AWS dynamic cluster management service  2020-02-10 10:36:00 +02:00
allegroai  43b2f7f41d  version bump  2020-02-04 18:06:45 +02:00
allegroai  28d752d568  Preinstall numpy if it exists in the requirements (temporary fix)  2020-02-04 18:06:25 +02:00
allegroai  6d091d8e08  Add experiment archiving example  2020-02-02 14:51:09 +02:00
28 changed files with 21795 additions and 122 deletions

View File

@@ -38,7 +38,7 @@ agent {
# currently supported pip and conda
# poetry is used if pip selected and repository contains poetry.lock file
package_manager: {
# supported options: pip, conda
# supported options: pip, conda, poetry
type: pip,
# specify pip version to use (examples "<20", "==19.3.1", "", empty string will install the latest version)
@@ -141,6 +141,9 @@ sdk {
quality: 87
subsampling: 0
}
# Support plot-per-graph fully matching Tensorboard behavior (i.e. if this is set to True, each series should have its own graph)
tensorboard_single_series_per_graph: False
}
network {

View File

@@ -0,0 +1,59 @@
#!/usr/bin/python3
"""
An example script that cleans up failed experiments by moving them to the archive
"""
import argparse
from datetime import datetime
from trains_agent import APIClient
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--project", "-P", help="Project ID. Only clean up experiments from this project")
parser.add_argument("--user", "-U", help="User ID. Only clean up experiments assigned to this user")
parser.add_argument(
"--status", "-S", default="failed",
help="Experiment status. Only clean up experiments with this status (default %(default)s)"
)
parser.add_argument(
"--iterations", "-I", type=int,
help="Number of iterations. Only clean up experiments with less or equal number of iterations"
)
parser.add_argument(
"--sec-from-start", "-T", type=int,
help="Seconds from start time. "
"Only clean up experiments if less or equal number of seconds have elapsed since started"
)
args = parser.parse_args()
client = APIClient()
tasks = client.tasks.get_all(
project=[args.project] if args.project else None,
user=[args.user] if args.user else None,
status=[args.status] if args.status else None,
system_tags=["-archived"]
)
count = 0
for task in tasks:
if args.iterations and (task.last_iteration or 0) > args.iterations:
continue
if args.sec_from_start:
if not task.started:
continue
if (datetime.utcnow() - task.started.replace(tzinfo=None)).total_seconds() > args.sec_from_start:
continue
try:
client.tasks.edit(
task=task.id,
system_tags=(task.system_tags or []) + ["archived"],
force=True
)
count += 1
except Exception as ex:
print("Failed editing experiment: {}".format(ex))
print("Cleaned up {} experiments".format(count))

View File

@@ -292,6 +292,7 @@
" export TRAINS_API_ACCESS_KEY='{access_key}'\n",
" export TRAINS_API_SECRET_KEY='{secret_key}'\n",
" {bash_script}\n",
" source ~/.bashrc\n",
" python -m trains_agent --config-file '/root/trains.conf' daemon --queue '{queue}' {docker}\n",
" shutdown\n",
" \"\"\".format(\n",
@@ -443,6 +444,12 @@
" os.environ[\"TRAINS_API_SECRET_KEY\"] = TRAINS_SECRET_KEY\n",
" api_client = APIClient()\n",
"\n",
" # Verify the requested queues exist and create those that doesn't exist\n",
" all_queues = [q.name for q in list(api_client.queues.get_all())]\n",
" missing_queues = [q for q in QUEUES if q not in all_queues]\n",
" for q in missing_queues:\n",
" api_client.queues.create(q)\n",
"\n",
" idle_workers = {}\n",
" while True:\n",
" queue_name_to_id = {\n",

View File

@@ -20,4 +20,4 @@ six>=1.11.0
tqdm>=4.19.5
typing>=3.6.4
urllib3>=1.21.1
virtualenv>=16
virtualenv>=16,<20

View File

@@ -1,9 +1,10 @@
from .v2_4 import auth
from .v2_4 import debug
from .v2_4 import queues
from .v2_4 import tasks
from .v2_4 import workers
from .v2_4 import events
from .v2_5 import auth
from .v2_5 import debug
from .v2_5 import queues
from .v2_5 import tasks
from .v2_5 import workers
from .v2_5 import events
from .v2_5 import models
__all__ = [
'auth',
@@ -12,4 +13,5 @@ __all__ = [
'tasks',
'workers',
'events',
'models',
]

File diff suppressed because it is too large

View File

@@ -0,0 +1,623 @@
"""
auth service
This service provides authentication management and authorization
validation for the entire system.
"""
import six
import types
from datetime import datetime
import enum
from dateutil.parser import parse as parse_datetime
from ....backend_api.session import Request, BatchRequest, Response, DataModel, NonStrictDataModel, CompoundRequest, schema_property, StringEnum
class Credentials(NonStrictDataModel):
"""
:param access_key: Credentials access key
:type access_key: str
:param secret_key: Credentials secret key
:type secret_key: str
"""
_schema = {
'properties': {
'access_key': {
'description': 'Credentials access key',
'type': ['string', 'null'],
},
'secret_key': {
'description': 'Credentials secret key',
'type': ['string', 'null'],
},
},
'type': 'object',
}
def __init__(
self, access_key=None, secret_key=None, **kwargs):
super(Credentials, self).__init__(**kwargs)
self.access_key = access_key
self.secret_key = secret_key
@schema_property('access_key')
def access_key(self):
return self._property_access_key
@access_key.setter
def access_key(self, value):
if value is None:
self._property_access_key = None
return
self.assert_isinstance(value, "access_key", six.string_types)
self._property_access_key = value
@schema_property('secret_key')
def secret_key(self):
return self._property_secret_key
@secret_key.setter
def secret_key(self, value):
if value is None:
self._property_secret_key = None
return
self.assert_isinstance(value, "secret_key", six.string_types)
self._property_secret_key = value
class CredentialKey(NonStrictDataModel):
"""
:param access_key:
:type access_key: str
:param last_used:
:type last_used: datetime.datetime
:param last_used_from:
:type last_used_from: str
"""
_schema = {
'properties': {
'access_key': {'description': '', 'type': ['string', 'null']},
'last_used': {
'description': '',
'format': 'date-time',
'type': ['string', 'null'],
},
'last_used_from': {'description': '', 'type': ['string', 'null']},
},
'type': 'object',
}
def __init__(
self, access_key=None, last_used=None, last_used_from=None, **kwargs):
super(CredentialKey, self).__init__(**kwargs)
self.access_key = access_key
self.last_used = last_used
self.last_used_from = last_used_from
@schema_property('access_key')
def access_key(self):
return self._property_access_key
@access_key.setter
def access_key(self, value):
if value is None:
self._property_access_key = None
return
self.assert_isinstance(value, "access_key", six.string_types)
self._property_access_key = value
@schema_property('last_used')
def last_used(self):
return self._property_last_used
@last_used.setter
def last_used(self, value):
if value is None:
self._property_last_used = None
return
self.assert_isinstance(value, "last_used", six.string_types + (datetime,))
if not isinstance(value, datetime):
value = parse_datetime(value)
self._property_last_used = value
@schema_property('last_used_from')
def last_used_from(self):
return self._property_last_used_from
@last_used_from.setter
def last_used_from(self, value):
if value is None:
self._property_last_used_from = None
return
self.assert_isinstance(value, "last_used_from", six.string_types)
self._property_last_used_from = value
class CreateCredentialsRequest(Request):
"""
Creates a new set of credentials for the authenticated user.
New key/secret is returned.
Note: Secret will never be returned in any other API call.
If a secret is lost or compromised, the key should be revoked
and a new set of credentials can be created.
"""
_service = "auth"
_action = "create_credentials"
_version = "2.1"
_schema = {
'additionalProperties': False,
'definitions': {},
'properties': {},
'type': 'object',
}
class CreateCredentialsResponse(Response):
"""
Response of auth.create_credentials endpoint.
:param credentials: Created credentials
:type credentials: Credentials
"""
_service = "auth"
_action = "create_credentials"
_version = "2.1"
_schema = {
'definitions': {
'credentials': {
'properties': {
'access_key': {
'description': 'Credentials access key',
'type': ['string', 'null'],
},
'secret_key': {
'description': 'Credentials secret key',
'type': ['string', 'null'],
},
},
'type': 'object',
},
},
'properties': {
'credentials': {
'description': 'Created credentials',
'oneOf': [{'$ref': '#/definitions/credentials'}, {'type': 'null'}],
},
},
'type': 'object',
}
def __init__(
self, credentials=None, **kwargs):
super(CreateCredentialsResponse, self).__init__(**kwargs)
self.credentials = credentials
@schema_property('credentials')
def credentials(self):
return self._property_credentials
@credentials.setter
def credentials(self, value):
if value is None:
self._property_credentials = None
return
if isinstance(value, dict):
value = Credentials.from_dict(value)
else:
self.assert_isinstance(value, "credentials", Credentials)
self._property_credentials = value
class EditUserRequest(Request):
"""
Edit a user's auth data properties
:param user: User ID
:type user: str
:param role: The new user's role within the company
:type role: str
"""
_service = "auth"
_action = "edit_user"
_version = "2.1"
_schema = {
'definitions': {},
'properties': {
'role': {
'description': "The new user's role within the company",
'enum': ['admin', 'superuser', 'user', 'annotator'],
'type': ['string', 'null'],
},
'user': {'description': 'User ID', 'type': ['string', 'null']},
},
'type': 'object',
}
def __init__(
self, user=None, role=None, **kwargs):
super(EditUserRequest, self).__init__(**kwargs)
self.user = user
self.role = role
@schema_property('user')
def user(self):
return self._property_user
@user.setter
def user(self, value):
if value is None:
self._property_user = None
return
self.assert_isinstance(value, "user", six.string_types)
self._property_user = value
@schema_property('role')
def role(self):
return self._property_role
@role.setter
def role(self, value):
if value is None:
self._property_role = None
return
self.assert_isinstance(value, "role", six.string_types)
self._property_role = value
class EditUserResponse(Response):
"""
Response of auth.edit_user endpoint.
:param updated: Number of users updated (0 or 1)
:type updated: float
:param fields: Updated fields names and values
:type fields: dict
"""
_service = "auth"
_action = "edit_user"
_version = "2.1"
_schema = {
'definitions': {},
'properties': {
'fields': {
'additionalProperties': True,
'description': 'Updated fields names and values',
'type': ['object', 'null'],
},
'updated': {
'description': 'Number of users updated (0 or 1)',
'enum': [0, 1],
'type': ['number', 'null'],
},
},
'type': 'object',
}
def __init__(
self, updated=None, fields=None, **kwargs):
super(EditUserResponse, self).__init__(**kwargs)
self.updated = updated
self.fields = fields
@schema_property('updated')
def updated(self):
return self._property_updated
@updated.setter
def updated(self, value):
if value is None:
self._property_updated = None
return
self.assert_isinstance(value, "updated", six.integer_types + (float,))
self._property_updated = value
@schema_property('fields')
def fields(self):
return self._property_fields
@fields.setter
def fields(self, value):
if value is None:
self._property_fields = None
return
self.assert_isinstance(value, "fields", (dict,))
self._property_fields = value
class GetCredentialsRequest(Request):
"""
Returns all existing credential keys for the authenticated user.
Note: Only credential keys are returned.
"""
_service = "auth"
_action = "get_credentials"
_version = "2.1"
_schema = {
'additionalProperties': False,
'definitions': {},
'properties': {},
'type': 'object',
}
class GetCredentialsResponse(Response):
"""
Response of auth.get_credentials endpoint.
:param credentials: List of credentials, each with an empty secret field.
:type credentials: Sequence[CredentialKey]
"""
_service = "auth"
_action = "get_credentials"
_version = "2.1"
_schema = {
'definitions': {
'credential_key': {
'properties': {
'access_key': {'description': '', 'type': ['string', 'null']},
'last_used': {
'description': '',
'format': 'date-time',
'type': ['string', 'null'],
},
'last_used_from': {
'description': '',
'type': ['string', 'null'],
},
},
'type': 'object',
},
},
'properties': {
'credentials': {
'description': 'List of credentials, each with an empty secret field.',
'items': {'$ref': '#/definitions/credential_key'},
'type': ['array', 'null'],
},
},
'type': 'object',
}
def __init__(
self, credentials=None, **kwargs):
super(GetCredentialsResponse, self).__init__(**kwargs)
self.credentials = credentials
@schema_property('credentials')
def credentials(self):
return self._property_credentials
@credentials.setter
def credentials(self, value):
if value is None:
self._property_credentials = None
return
self.assert_isinstance(value, "credentials", (list, tuple))
if any(isinstance(v, dict) for v in value):
value = [CredentialKey.from_dict(v) if isinstance(v, dict) else v for v in value]
else:
self.assert_isinstance(value, "credentials", CredentialKey, is_array=True)
self._property_credentials = value
class LoginRequest(Request):
"""
Get a token based on supplied credentials (key/secret).
Intended for use by users with key/secret credentials who wish to obtain a token
for use with other services. Token will be limited by the same permissions that
exist for the credentials used in this call.
:param expiration_sec: Requested token expiration time in seconds. Not
guaranteed, might be overridden by the service
:type expiration_sec: int
"""
_service = "auth"
_action = "login"
_version = "2.1"
_schema = {
'definitions': {},
'properties': {
'expiration_sec': {
'description': 'Requested token expiration time in seconds. \n Not guaranteed, might be overridden by the service',
'type': ['integer', 'null'],
},
},
'type': 'object',
}
def __init__(
self, expiration_sec=None, **kwargs):
super(LoginRequest, self).__init__(**kwargs)
self.expiration_sec = expiration_sec
@schema_property('expiration_sec')
def expiration_sec(self):
return self._property_expiration_sec
@expiration_sec.setter
def expiration_sec(self, value):
if value is None:
self._property_expiration_sec = None
return
if isinstance(value, float) and value.is_integer():
value = int(value)
self.assert_isinstance(value, "expiration_sec", six.integer_types)
self._property_expiration_sec = value
class LoginResponse(Response):
"""
Response of auth.login endpoint.
:param token: Token string
:type token: str
"""
_service = "auth"
_action = "login"
_version = "2.1"
_schema = {
'definitions': {},
'properties': {
'token': {'description': 'Token string', 'type': ['string', 'null']},
},
'type': 'object',
}
def __init__(
self, token=None, **kwargs):
super(LoginResponse, self).__init__(**kwargs)
self.token = token
@schema_property('token')
def token(self):
return self._property_token
@token.setter
def token(self, value):
if value is None:
self._property_token = None
return
self.assert_isinstance(value, "token", six.string_types)
self._property_token = value
class LogoutRequest(Request):
"""
Removes the authentication cookie from the current session
"""
_service = "auth"
_action = "logout"
_version = "2.2"
_schema = {'additionalProperties': False, 'definitions': {}, 'type': 'object'}
class LogoutResponse(Response):
"""
Response of auth.logout endpoint.
"""
_service = "auth"
_action = "logout"
_version = "2.2"
_schema = {'additionalProperties': False, 'definitions': {}, 'type': 'object'}
class RevokeCredentialsRequest(Request):
"""
Revokes (and deletes) a set (key, secret) of credentials for
the authenticated user.
:param access_key: Credentials key
:type access_key: str
"""
_service = "auth"
_action = "revoke_credentials"
_version = "2.1"
_schema = {
'definitions': {},
'properties': {
'access_key': {
'description': 'Credentials key',
'type': ['string', 'null'],
},
},
'required': ['key_id'],
'type': 'object',
}
def __init__(
self, access_key=None, **kwargs):
super(RevokeCredentialsRequest, self).__init__(**kwargs)
self.access_key = access_key
@schema_property('access_key')
def access_key(self):
return self._property_access_key
@access_key.setter
def access_key(self, value):
if value is None:
self._property_access_key = None
return
self.assert_isinstance(value, "access_key", six.string_types)
self._property_access_key = value
class RevokeCredentialsResponse(Response):
"""
Response of auth.revoke_credentials endpoint.
:param revoked: Number of credentials revoked
:type revoked: int
"""
_service = "auth"
_action = "revoke_credentials"
_version = "2.1"
_schema = {
'definitions': {},
'properties': {
'revoked': {
'description': 'Number of credentials revoked',
'enum': [0, 1],
'type': ['integer', 'null'],
},
},
'type': 'object',
}
def __init__(
self, revoked=None, **kwargs):
super(RevokeCredentialsResponse, self).__init__(**kwargs)
self.revoked = revoked
@schema_property('revoked')
def revoked(self):
return self._property_revoked
@revoked.setter
def revoked(self, value):
if value is None:
self._property_revoked = None
return
if isinstance(value, float) and value.is_integer():
value = int(value)
self.assert_isinstance(value, "revoked", six.integer_types)
self._property_revoked = value
response_mapping = {
LoginRequest: LoginResponse,
LogoutRequest: LogoutResponse,
CreateCredentialsRequest: CreateCredentialsResponse,
GetCredentialsRequest: GetCredentialsResponse,
RevokeCredentialsRequest: RevokeCredentialsResponse,
EditUserRequest: EditUserResponse,
}
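The request/response classes above are sent through the agent's session object. A minimal sketch of obtaining a token, assuming a configured session that exposes send_api() as used in worker.py below:

from trains_agent.backend_api.services import auth

def issue_token(session, expiration_sec=3600):
    # the returned token is limited by the permissions of the
    # credentials the session was configured with (see LoginRequest above)
    response = session.send_api(auth.LoginRequest(expiration_sec=expiration_sec))
    return response.token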

View File

@@ -0,0 +1,194 @@
"""
debug service
Debugging utilities
"""
import six
import types
from datetime import datetime
import enum
from dateutil.parser import parse as parse_datetime
from ....backend_api.session import Request, BatchRequest, Response, DataModel, NonStrictDataModel, CompoundRequest, schema_property, StringEnum
class ApiexRequest(Request):
"""
"""
_service = "debug"
_action = "apiex"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'required': [], 'type': 'object'}
class ApiexResponse(Response):
"""
Response of debug.apiex endpoint.
"""
_service = "debug"
_action = "apiex"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
class EchoRequest(Request):
"""
Return request data
"""
_service = "debug"
_action = "echo"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
class EchoResponse(Response):
"""
Response of debug.echo endpoint.
"""
_service = "debug"
_action = "echo"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
class ExRequest(Request):
"""
"""
_service = "debug"
_action = "ex"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'required': [], 'type': 'object'}
class ExResponse(Response):
"""
Response of debug.ex endpoint.
"""
_service = "debug"
_action = "ex"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
class PingRequest(Request):
"""
Return a message. Does not require authorization.
"""
_service = "debug"
_action = "ping"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
class PingResponse(Response):
"""
Response of debug.ping endpoint.
:param msg: A friendly message
:type msg: str
"""
_service = "debug"
_action = "ping"
_version = "1.5"
_schema = {
'definitions': {},
'properties': {
'msg': {
'description': 'A friendly message',
'type': ['string', 'null'],
},
},
'type': 'object',
}
def __init__(
self, msg=None, **kwargs):
super(PingResponse, self).__init__(**kwargs)
self.msg = msg
@schema_property('msg')
def msg(self):
return self._property_msg
@msg.setter
def msg(self, value):
if value is None:
self._property_msg = None
return
self.assert_isinstance(value, "msg", six.string_types)
self._property_msg = value
class PingAuthRequest(Request):
"""
Return a message. Requires authorization.
"""
_service = "debug"
_action = "ping_auth"
_version = "1.5"
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
class PingAuthResponse(Response):
"""
Response of debug.ping_auth endpoint.
:param msg: A friendly message
:type msg: str
"""
_service = "debug"
_action = "ping_auth"
_version = "1.5"
_schema = {
'definitions': {},
'properties': {
'msg': {
'description': 'A friendly message',
'type': ['string', 'null'],
},
},
'type': 'object',
}
def __init__(
self, msg=None, **kwargs):
super(PingAuthResponse, self).__init__(**kwargs)
self.msg = msg
@schema_property('msg')
def msg(self):
return self._property_msg
@msg.setter
def msg(self, value):
if value is None:
self._property_msg = None
return
self.assert_isinstance(value, "msg", six.string_types)
self._property_msg = value
response_mapping = {
EchoRequest: EchoResponse,
PingRequest: PingResponse,
PingAuthRequest: PingAuthResponse,
ApiexRequest: ApiexResponse,
ExRequest: ExResponse,
}

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -17,7 +17,7 @@ from datetime import datetime
from distutils.spawn import find_executable
from functools import partial
from itertools import chain
from tempfile import gettempdir, mkdtemp
from tempfile import mkdtemp
from time import sleep, time
from typing import Text, Optional, Any, Tuple
@@ -28,9 +28,6 @@ from trains_agent.backend_api.services import queues as queues_api
from trains_agent.backend_api.services import tasks as tasks_api
from pathlib2 import Path
from pyhocon import ConfigTree, ConfigFactory
from requests import Session as HTTPSession
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from six.moves.urllib.parse import quote
from trains_agent.helper.check_update import start_check_update_daemon
@@ -40,7 +37,10 @@ from trains_agent.definitions import (
ENVIRONMENT_SDK_PARAMS,
INVALID_WORKER_ID,
PROGRAM_NAME,
DEFAULT_VENV_UPDATE_URL)
DEFAULT_VENV_UPDATE_URL,
ENV_TASK_EXECUTE_AS_USER,
ENV_K8S_HOST_MOUNT
)
from trains_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
from trains_agent.errors import APIError, CommandFailedError, Sigterm
from trains_agent.helper.base import (
@@ -59,13 +59,17 @@ from trains_agent.helper.base import (
is_conda,
named_temporary_file,
ExecutionInfo,
HOCONEncoder, error, get_python_path, is_linux_platform)
from trains_agent.helper.console import ensure_text
HOCONEncoder,
error,
get_python_path,
is_linux_platform,
rm_file
)
from trains_agent.helper.console import ensure_text, print_text, decode_binary_lines
from trains_agent.helper.package.base import PackageManager
from trains_agent.helper.package.conda_api import CondaAPI
from trains_agent.helper.package.horovod_req import HorovodRequirement
from trains_agent.helper.package.external_req import ExternalRequirements
from trains_agent.helper.package.pip_api.system import SystemPip
from trains_agent.helper.package.pip_api.venv import VirtualenvPip
from trains_agent.helper.package.poetry_api import PoetryConfig, PoetryAPI
from trains_agent.helper.package.pytorch import PytorchRequirement
@@ -73,13 +77,16 @@ from trains_agent.helper.package.requirements import RequirementsManager
from trains_agent.helper.package.venv_update_api import VenvUpdateAPI
from trains_agent.helper.process import (
kill_all_child_processes,
check_if_command_exists,
WorkerParams,
ExitStatus,
Argv,
COMMAND_SUCCESS,
Executable,
get_bash_output, shutdown_docker_process, get_docker_id, commit_docker)
get_bash_output,
shutdown_docker_process,
get_docker_id,
commit_docker
)
from trains_agent.helper.package.cython_req import CythonRequirement
from trains_agent.helper.repo import clone_repository_cached, RepoInfo, VCS
from trains_agent.helper.resource_monitor import ResourceMonitor
@@ -229,6 +236,8 @@ class TaskStopSignal(object):
return self._test()
except Exception as ex:
self.command.log_traceback(ex)
# make sure we break nothing
return TaskStopSignal.default
def _test(self):
# type: () -> TaskStopReason
@@ -361,6 +370,7 @@ class Worker(ServiceCommandSection):
self._docker_force_pull = self._session.config.get("agent.docker_force_pull", False)
self._daemon_foreground = None
self._standalone_mode = None
self._force_current_version = None
def _get_requirements_manager(self, os_override=None, base_interpreter=None):
requirements_manager = RequirementsManager(
@@ -390,13 +400,14 @@ class Worker(ServiceCommandSection):
except Exception:
pass
def run_one_task(self, queue, task_id, worker_args):
def run_one_task(self, queue, task_id, worker_args, docker=None):
# type: (Text, Text, WorkerParams) -> ()
"""
Run one task pulled from queue.
:param queue: ID of queue that task was pulled from
:param task_id: ID of task to run
:param worker_args: Worker command line arguments
:param docker: Docker image in which the execution task will run
"""
# start new process and execute task id
print("Running task '{}'".format(task_id))
@@ -422,7 +433,7 @@ class Worker(ServiceCommandSection):
if self.docker_image_func:
try:
response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"])
task_docker_cmd = response.execution.docker_cmd
task_docker_cmd = docker or response.execution.docker_cmd
task_docker_cmd = task_docker_cmd.strip() if task_docker_cmd else None
except Exception:
task_docker_cmd = None
@@ -629,11 +640,13 @@ class Worker(ServiceCommandSection):
self.log.debug("starting resource monitor thread")
print("Worker \"{}\" - ".format(self.worker_id), end='')
if not queues:
if queues:
queues = return_list(queues)
queues = [self._resolve_name(q, "queues") for q in queues]
else:
default_queue = self._session.send_api(queues_api.GetDefaultRequest())
queues = [default_queue.id]
queues = return_list(queues)
queues_info = [
self._session.send_api(
queues_api.GetByIdRequest(queue)
@@ -654,9 +667,8 @@ class Worker(ServiceCommandSection):
# print docker image
if docker is not False and docker is not None:
temp_config, docker_image_func = self.get_docker_config_cmd(docker)
self.dump_config(temp_config)
self.docker_image_func = docker_image_func
self._force_current_version = kwargs.get('force_current_version', False)
self.set_docker_variables(docker)
else:
self.dump_config()
@@ -749,9 +761,11 @@ class Worker(ServiceCommandSection):
):
# type: (...) -> Tuple[Optional[int], TaskStopReason]
def _print_file(file_path, prev_line_count):
with open(file_path, "rt") as f:
with open(file_path, "rb") as f:
binary_text = f.read()
# skip the previously printed lines,
return f.readlines()[prev_line_count:]
blines = binary_text.split(b'\n')[prev_line_count:]
return decode_binary_lines(blines)
stdout = open(stdout_path, "wt")
stderr = open(stderr_path, "wt") if stderr_path else stdout
@@ -780,7 +794,7 @@ class Worker(ServiceCommandSection):
if daemon:
self.send_logs(
task_id=task_id,
lines=["User aborted: stopping task\n"],
lines=["User aborted: stopping task ({})\n".format(str(stop_reason))],
level="ERROR",
)
kill_all_child_processes(process.pid)
@@ -844,7 +858,8 @@ class Worker(ServiceCommandSection):
"""
if not lines:
return 0
print("".join(lines), end="")
print_text("".join(lines))
# remove backspaces from the text log, they look bad.
for i, l in enumerate(lines):
lines[i] = l.replace('\x08', '')
@@ -920,7 +935,15 @@ class Worker(ServiceCommandSection):
except AttributeError:
requirements = None
# TODO: make sure we pass the correct python_version
if not python_version:
try:
python_version = current_task.script.binary
python_version = python_version.split('/')[-1].replace('python', '')
# if we can cast it, we are good
python_version = '{:.1f}'.format(float(python_version))
except:
python_version = None
venv_folder, requirements_manager = self.install_virtualenv(venv_dir=target,
requested_python_version=python_version)
@@ -1030,6 +1053,8 @@ class Worker(ServiceCommandSection):
require_queue=False,
log_file=None,
standalone_mode=None,
docker=False,
clone=False,
**_
):
if not task_id:
@@ -1042,25 +1067,42 @@ class Worker(ServiceCommandSection):
except Exception:
raise ValueError("Could not find task id={}".format(task_id))
# make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
try:
res = self._session.api_client.tasks.dequeue(task=current_task.id)
if require_queue and res.meta.result_code != 200:
raise ValueError("Execution required enqueued task, "
"but task id={} is not queued.".format(current_task.id))
except Exception:
if require_queue:
raise
if clone:
try:
print("Cloning task id={}".format(task_id))
current_task = self._session.api_client.tasks.get_by_id(
self._session.send_api(
tasks_api.CloneRequest(task=current_task.id, new_task_name='Clone of {}'.format(current_task.name))
).id
)
print("Task cloned, new task id={}".format(current_task.id))
except Exception:
raise CommandFailedError("Cloning failed")
else:
# make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
try:
res = self._session.api_client.tasks.dequeue(task=current_task.id)
if require_queue and res.meta.result_code != 200:
raise ValueError("Execution required enqueued task, "
"but task id={} is not queued.".format(current_task.id))
except Exception:
if require_queue:
raise
if full_monitoring:
if docker is not False and docker is not None:
self.set_docker_variables(docker)
# We expect the same behaviour in case full_monitoring was set, and in case docker mode is used
if full_monitoring or docker is not False:
worker_params = WorkerParams(
log_level=log_level,
config_file=self._session.config_file,
debug=self._session.debug_mode,
trace=self._session.trace,
)
self.report_monitor(ResourceMonitor.StatusReport(task=task_id))
self.run_one_task(queue='', task_id=task_id, worker_args=worker_params)
self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker)
self.stop_monitor()
self._unregister()
return
@@ -1076,7 +1118,7 @@ class Worker(ServiceCommandSection):
if not disable_monitoring:
self.log.debug("starting resource monitor")
self.report_monitor(ResourceMonitor.StatusReport(task=task_id))
self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
execution = self.get_execution_info(current_task)
@@ -1085,7 +1127,16 @@ class Worker(ServiceCommandSection):
except AttributeError:
requirements = None
venv_folder, requirements_manager = self.install_virtualenv(standalone_mode=standalone_mode)
try:
python_ver = current_task.script.binary
python_ver = python_ver.split('/')[-1].replace('python', '')
# if we can cast it, we are good
python_ver = '{:.1f}'.format(float(python_ver))
except:
python_ver = None
venv_folder, requirements_manager = self.install_virtualenv(standalone_mode=standalone_mode,
requested_python_version=python_ver)
if not standalone_mode:
if self._default_pip:
@@ -1115,7 +1166,7 @@ class Worker(ServiceCommandSection):
script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()
# run code
print("Running task id [%s]:" % task_id)
print("Running task id [%s]:" % current_task.id)
extra = ['-u', ]
if optimization:
extra.append(
@@ -1149,7 +1200,7 @@ class Worker(ServiceCommandSection):
)
if repo_info:
self._update_commit_id(task_id, execution, repo_info)
self._update_commit_id(current_task.id, execution, repo_info)
# Add the script CWD to the python path
python_path = get_python_path(script_dir, execution.entry_point, self.package_api) \
@@ -1157,11 +1208,20 @@ class Worker(ServiceCommandSection):
if python_path:
os.environ['PYTHONPATH'] = python_path
print("Starting Task Execution:\n".format(task_id))
# check if we want to run as another user, only supported on linux
if os.environ.get(ENV_TASK_EXECUTE_AS_USER, None) and is_linux_platform():
command, script_dir = self._run_as_user_patch(
command, script_dir, venv_folder,
self._session.config.get('sdk.storage.cache.default_base_dir'),
os.environ.get(ENV_TASK_EXECUTE_AS_USER))
use_execv = False
else:
use_execv = is_linux_platform() and not isinstance(self.package_api, (PoetryAPI, CondaAPI))
print("Starting Task Execution:\n".format(current_task.id))
exit_code = -1
try:
if disable_monitoring:
use_execv = is_linux_platform() and not isinstance(self.package_api, (PoetryAPI, CondaAPI))
try:
sys.stdout.flush()
sys.stderr.flush()
@@ -1187,13 +1247,13 @@ class Worker(ServiceCommandSection):
)
print("Storing stdout and stderr log into [%s]" % temp_stdout_fname)
exit_code, _ = self._log_command_output(
task_id=task_id,
task_id=current_task.id,
cmd=command,
stdout_path=temp_stdout_fname,
cwd=script_dir,
)
except KeyboardInterrupt:
self.handle_user_abort(task_id)
self.handle_user_abort(current_task.id)
raise
except Exception as e:
self.log.warning(str(e))
@@ -1210,13 +1270,18 @@ class Worker(ServiceCommandSection):
if not disable_monitoring:
# we need to change task status according to exit code
self.handle_task_termination(task_id, exit_code, TaskStopReason.no_stop)
self.handle_task_termination(current_task.id, exit_code, TaskStopReason.no_stop)
self.stop_monitor()
# unregister the worker
self._unregister()
return 1 if exit_code is None else exit_code
def set_docker_variables(self, docker):
temp_config, docker_image_func = self.get_docker_config_cmd(docker)
self.dump_config(temp_config)
self.docker_image_func = docker_image_func
def get_execution_info(self, current_task):
# type: (...) -> ExecutionInfo
try:
@@ -1498,10 +1563,7 @@ class Worker(ServiceCommandSection):
raise e
finally:
if self._session.debug_mode and temp_file:
try:
Path(temp_file.name).unlink()
except OSError:
pass
rm_file(temp_file.name)
# call post installation callback
requirements_manager.post_install()
# mark as successful installation
@@ -1627,7 +1689,7 @@ class Worker(ServiceCommandSection):
)
def install_virtualenv(self, venv_dir=None, requested_python_version=None, standalone_mode=False):
# type: (str, str) -> Tuple[Path, RequirementsManager]
# type: (str, str, bool) -> Tuple[Path, RequirementsManager]
"""
Install a new python virtual environment, removing the old one if exists
:return: virtualenv directory and requirements manager to use with task
@@ -1640,9 +1702,16 @@ class Worker(ServiceCommandSection):
requested_python_version[max(requested_python_version.find('python'), 0):].replace('python', '')
executable_name = 'python'
else:
executable_version, executable_version_suffix, executable_name = self.find_python_executable_for_version(
requested_python_version
)
try:
executable_version, executable_version_suffix, executable_name = \
self.find_python_executable_for_version(requested_python_version)
except Exception:
def_python_version = Text(self._session.config.get("agent.python_binary", None)) or \
Text(self._session.config.get("agent.default_python", None))
print('Warning: could not locate requested Python version {}, reverting to version {}'.format(
requested_python_version, def_python_version))
executable_version, executable_version_suffix, executable_name = \
self.find_python_executable_for_version(def_python_version)
self._session.config.put("agent.default_python", executable_version)
self._session.config.put("agent.python_binary", executable_name)
@@ -1799,6 +1868,10 @@ class Worker(ServiceCommandSection):
cmds = [cmds]
extra_shell_script_str = " ; ".join(map(str, cmds)) + " ; "
self.temp_config_path = self.temp_config_path or safe_mkstemp(
suffix=".cfg", prefix=".trains_agent.", text=True, name_only=True
)
docker_cmd = dict(worker_id=self.worker_id,
# docker_image=docker_image,
# docker_arguments=docker_arguments,
@@ -1811,7 +1884,7 @@ class Worker(ServiceCommandSection):
host_cache=host_cache, mounted_cache=mounted_cache_dir,
host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
standalone_mode=self._standalone_mode)
standalone_mode=self._standalone_mode, force_current_version=self._force_current_version)
return temp_config, partial(docker_cmd_functor, docker_cmd)
@staticmethod
@@ -1823,15 +1896,25 @@ class Worker(ServiceCommandSection):
host_cache, mounted_cache,
host_pip_dl, mounted_pip_dl,
host_vcs_cache, mounted_vcs_cache,
standalone_mode=False, extra_docker_arguments=None, extra_shell_script=None):
standalone_mode=False, extra_docker_arguments=None, extra_shell_script=None,
force_current_version=None):
docker = 'docker'
base_cmd = [docker, 'run', '-t']
update_scheme = ""
dockers_nvidia_visible_devices = 'all'
gpu_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES', None)
if gpu_devices is None or gpu_devices.lower().strip() == 'all':
base_cmd += ['--gpus', 'all', ]
if os.environ.get('TRAINS_DOCKER_SKIP_GPUS_FLAG', None):
dockers_nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES') or \
dockers_nvidia_visible_devices
else:
base_cmd += ['--gpus', 'all', ]
elif gpu_devices.strip() and gpu_devices.strip() != 'none':
base_cmd += ['--gpus', 'device='+gpu_devices, ]
if os.environ.get('TRAINS_DOCKER_SKIP_GPUS_FLAG', None):
dockers_nvidia_visible_devices = gpu_devices
else:
base_cmd += ['--gpus', 'device='+gpu_devices, ]
# We are using --gpu, so we should not pass NVIDIA_VISIBLE_DEVICES, I think.
# base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES=' + gpu_devices, ]
@@ -1845,10 +1928,51 @@ class Worker(ServiceCommandSection):
if isinstance(extra_docker_arguments, six.string_types) else extra_docker_arguments
base_cmd += [str(a) for a in extra_docker_arguments if a]
base_cmd += ['-e', 'TRAINS_WORKER_ID='+worker_id, ]
# check if running inside a kubernetes
if os.environ.get('KUBERNETES_SERVICE_HOST') and os.environ.get('KUBERNETES_PORT'):
# map network to sibling docker, unless we have other network argument
if not any(a.strip().startswith('--network') for a in base_cmd):
try:
network_mode = get_bash_output(
'docker inspect --format=\'{{.HostConfig.NetworkMode}}\' $(basename $(cat /proc/1/cpuset))')
base_cmd += ['--network', network_mode]
except:
pass
base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES={}'.format(dockers_nvidia_visible_devices)]
if host_ssh_cache:
base_cmd += ['-v', host_ssh_cache+':/root/.ssh', ]
# check if we need to map host folders
if os.environ.get(ENV_K8S_HOST_MOUNT):
# expect TRAINS_AGENT_K8S_HOST_MOUNT = '/mnt/host/data:/root/.trains'
k8s_node_mnt, _, k8s_pod_mnt = os.environ.get(ENV_K8S_HOST_MOUNT).partition(':')
# search and replace all the host folders with the k8s node mount
host_mounts = [host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache]
for i, m in enumerate(host_mounts):
if k8s_pod_mnt not in m:
print('Warning: K8S mount missing, ignoring cached folder {}'.format(m))
host_mounts[i] = None
else:
host_mounts[i] = m.replace(k8s_pod_mnt, k8s_node_mnt)
host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache = host_mounts
# copy the configuration file into the mounted folder
new_conf_file = os.path.join(k8s_pod_mnt, '.trains_agent.{}.cfg'.format(quote(worker_id, safe="")))
try:
rm_file(new_conf_file)
shutil.copy(conf_file, new_conf_file)
conf_file = new_conf_file.replace(k8s_pod_mnt, k8s_node_mnt)
except Exception:
raise ValueError('Error: could not copy configuration file into: {}'.format(new_conf_file))
if host_ssh_cache:
new_ssh_cache = os.path.join(k8s_pod_mnt, '.trains_agent.{}.ssh'.format(quote(worker_id, safe="")))
try:
rm_tree(new_ssh_cache)
shutil.copytree(host_ssh_cache, new_ssh_cache)
host_ssh_cache = new_ssh_cache.replace(k8s_pod_mnt, k8s_node_mnt)
except Exception:
raise ValueError('Error: could not copy .ssh directory into: {}'.format(new_ssh_cache))
base_cmd += ['-e', 'TRAINS_WORKER_ID='+worker_id, ]
# if we are running an RC version, install the same version in the docker
# because the default (latest) will be a release version (not RC)
@@ -1856,15 +1980,13 @@ class Worker(ServiceCommandSection):
try:
from trains_agent.version import __version__
_version_parts = __version__.split('.')
if 'rc' in _version_parts[-1].lower() or 'rc' in _version_parts[-2].lower():
if force_current_version or 'rc' in _version_parts[-1].lower() or 'rc' in _version_parts[-2].lower():
specify_version = '=={}'.format(__version__)
except:
pass
if standalone_mode:
update_scheme = ""
else:
update_scheme = \
if not standalone_mode:
update_scheme += \
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \
"chown -R root /root/.cache/pip ; " \
"apt-get update ; " \
@@ -1875,21 +1997,86 @@ class Worker(ServiceCommandSection):
python=python_version, pip_version=PackageManager.get_pip_version(),
specify_version=specify_version)
base_cmd += [
'-v', conf_file+':/root/trains.conf',
'-v', host_apt_cache+':/var/cache/apt/archives',
'-v', host_pip_cache+':/root/.cache/pip',
'-v', host_pip_dl+':'+mounted_pip_dl,
'-v', host_cache+':'+mounted_cache,
'-v', host_vcs_cache+':'+mounted_vcs_cache,
'--rm', docker_image, 'bash', '-c',
update_scheme +
extra_shell_script +
"NVIDIA_VISIBLE_DEVICES=all {python} -u -m trains_agent ".format(python=python_version)
]
base_cmd += (
['-v', conf_file+':/root/trains.conf'] +
(['-v', host_ssh_cache+':/root/.ssh'] if host_ssh_cache else []) +
(['-v', host_apt_cache+':/var/cache/apt/archives'] if host_apt_cache else []) +
(['-v', host_pip_cache+':/root/.cache/pip'] if host_pip_cache else []) +
(['-v', host_pip_dl+':'+mounted_pip_dl] if host_pip_dl else []) +
(['-v', host_cache+':'+mounted_cache] if host_cache else []) +
(['-v', host_vcs_cache+':'+mounted_vcs_cache] if host_vcs_cache else []) +
['--rm', docker_image, 'bash', '-c',
update_scheme +
extra_shell_script +
"NVIDIA_VISIBLE_DEVICES={nv_visible} {python} -u -m trains_agent ".format(
nv_visible=dockers_nvidia_visible_devices, python=python_version)
])
return base_cmd
def _run_as_user_patch(self, command, script_dir, venv_folder, sdk_cache_folder, user_uid):
class RunasArgv(Argv):
def __init__(self, *args):
super(RunasArgv, self).__init__(*args)
self.uid = 0
self.gid = 0
def call_subprocess(self, func, censor_password=False, *args, **kwargs):
self._log.debug("running: %s: %s", func.__name__, list(self))
with self.normalize_exception(censor_password):
return func(list(self), *args, preexec_fn=self._change_uid, **kwargs)
def set_uid(self, user_uid, user_gid):
from pwd import getpwnam
self.uid = getpwnam(user_uid).pw_uid
self.gid = getpwnam(user_gid).pw_gid
def _change_uid(self):
os.setgid(self.gid)
os.setuid(self.uid)
# create a home folder for our user
try:
home_folder = '/trains_agent_home'
rm_tree(home_folder)
Path(home_folder).mkdir(parents=True, exist_ok=True)
except:
home_folder = '/home/trains_agent_home'
rm_tree(home_folder)
Path(home_folder).mkdir(parents=True, exist_ok=True)
# move our entire venv into the new home
venv_folder = venv_folder.as_posix()
if not venv_folder.endswith(os.path.sep):
venv_folder += os.path.sep
new_venv_folder = os.path.join(home_folder, 'venv/')
shutil.move(venv_folder, new_venv_folder)
# allow everyone to access it
for f in Path(new_venv_folder).rglob('*'):
try:
f.chmod(0o0777)
except:
pass
# make sure we will be able to access the cache folder (we assume we are allowed to change its mode)
if sdk_cache_folder:
sdk_cache_folder = Path(os.path.expandvars(sdk_cache_folder)).expanduser().absolute()
for f in sdk_cache_folder.rglob('*'):
try:
f.chmod(0o0777)
except:
pass
# patch venv folder to new location
script_dir = script_dir.replace(venv_folder, new_venv_folder)
# New command line execution
command = RunasArgv('bash', '-c', 'HOME=\"{}\" PATH=\"{}\" {}'.format(
home_folder,
os.environ.get('PATH', '').replace(venv_folder, new_venv_folder),
command.serialize().replace(venv_folder, new_venv_folder)))
command.set_uid(user_uid=user_uid, user_gid=user_uid)
return command, script_dir
def _singleton(self):
# ensure singleton
worker_id = self._session.config["agent.worker_id"]
@@ -1903,7 +2090,8 @@ class Worker(ServiceCommandSection):
else:
worker_name = '{}:cpu'.format(worker_name)
self.worker_id, worker_slot = Singleton.register_instance(unique_worker_id=worker_id, worker_name=worker_name)
self.worker_id, worker_slot = Singleton.register_instance(unique_worker_id=worker_id, worker_name=worker_name,
api_client=self._session.api_client)
if self.worker_id is None:
error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))
exit(1)
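Taken together, the execute changes above add "--clone" and "--docker" alongside the existing flags. A hedged sketch of an invocation via Python's subprocess (the task ID is a hypothetical placeholder; the image value is the default shown in the k8s glue below):

import subprocess

# clone the experiment first, then run the clone inside the given docker
# image, which is preferred over the one stored in the experiment
subprocess.check_call([
    "python", "-m", "trains_agent", "execute",
    "--id", "<task-id>",
    "--clone",
    "--docker", "nvidia/cuda",
    "--full-monitoring",
])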

View File

@@ -73,6 +73,12 @@ ENVIRONMENT_CONFIG = {
"agent.cpu_only": EnvironmentConfig(
"TRAINS_CPU_ONLY", "ALG_CPU_ONLY", "CPU_ONLY", type=bool
),
"sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"),
"sdk.aws.s3.secret": EnvironmentConfig("AWS_SECRET_ACCESS_KEY"),
"sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"),
"sdk.azure.storage.containers.0": {'account_name': EnvironmentConfig("AZURE_STORAGE_ACCOUNT"),
'account_key': EnvironmentConfig("AZURE_STORAGE_KEY")},
"sdk.google.storage.credentials_json": EnvironmentConfig("GOOGLE_APPLICATION_CREDENTIALS"),
}
CONFIG_FILE_ENV = EnvironmentConfig("ALG_CONFIG_FILE")
@@ -114,6 +120,8 @@ DEFAULT_VCS_CACHE = normalize_path(CONFIG_DIR, "vcs-cache")
PIP_EXTRA_INDICES = [
]
DEFAULT_PIP_DOWNLOAD_CACHE = normalize_path(CONFIG_DIR, "pip-download-cache")
ENV_TASK_EXECUTE_AS_USER = 'TRAINS_AGENT_EXEC_USER'
ENV_K8S_HOST_MOUNT = 'TRAINS_AGENT_K8S_HOST_MOUNT'
class FileBuffering(IntEnum):
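The two new environment variables may be easier to follow in use. A minimal sketch mirroring how worker.py above reads them (the user name and mount paths are illustrative):

import os

# run the task process as this pre-existing local user instead of root
# (linux only, see the ENV_TASK_EXECUTE_AS_USER handling in worker.py)
os.environ["TRAINS_AGENT_EXEC_USER"] = "trains"
# node-to-pod mount mapping used when the agent runs inside a K8s pod,
# formatted "<node path>:<pod path>" (see the ENV_K8S_HOST_MOUNT handling)
os.environ["TRAINS_AGENT_K8S_HOST_MOUNT"] = "/mnt/host/data:/root/.trains"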

trains_agent/glue/k8s.py (new file, 169 lines)
View File

@@ -0,0 +1,169 @@
from __future__ import print_function, division, unicode_literals
import logging
import os
import subprocess
from time import sleep
from typing import Text, List
from pyhocon import HOCONConverter
from trains_agent.commands.events import Events
from trains_agent.commands.worker import Worker
from trains_agent.helper.process import get_bash_output
from trains_agent.helper.resource_monitor import ResourceMonitor
class K8sIntegration(Worker):
K8S_PENDING_QUEUE = "k8s_scheduler"
KUBECTL_RUN_CMD = "kubectl run trains_id_{task_id} " \
"--image {docker_image} " \
"--restart=Never --replicas=1 " \
"--generator=run-pod/v1"
KUBECTL_DELETE_CMD = "kubectl delete pods " \
"--selector=TRAINS=agent " \
"--field-selector=status.phase!=Pending,status.phase!=Running"
CONTAINER_BASH_SCRIPT = "apt-get install -y git python-pip && " \
"pip install trains-agent && " \
"python -u -m trains_agent execute --full-monitoring --require-queue --id {}"
def __init__(self, k8s_pending_queue_name=None, kubectl_cmd=None, container_bash_script=None, debug=False):
"""
Initialize the k8s integration glue layer daemon
:param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler
:param str|callable kubectl_cmd: kubectl command line str, supports formatting (default: KUBECTL_RUN_CMD)
example: "task={task_id} image={docker_image} queue_id={queue_id}"
or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data)
:param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
:param bool debug: Switch logging on
"""
super(K8sIntegration, self).__init__()
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
self.kubectl_cmd = kubectl_cmd or self.KUBECTL_RUN_CMD
self.container_bash_script = container_bash_script or self.CONTAINER_BASH_SCRIPT
# Always use system packages, because we will be running inside a docker
self._session.config.put("agent.package_manager.system_site_packages", True)
# Add debug logging
if debug:
self.log.logger.disabled = False
self.log.logger.setLevel(logging.INFO)
def run_one_task(self, queue: Text, task_id: Text, worker_args=None):
task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
# push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
try:
self._session.api_client.tasks.enqueue(task_id, queue=self.k8s_pending_queue_name,
status_reason='k8s pending scheduler')
except Exception as e:
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue [{}], error: {}".format(
task_id, self.k8s_pending_queue_name, e))
return
if task_data.execution.docker_cmd:
docker_image = task_data.execution.docker_cmd
else:
docker_image = str(os.environ.get("TRAINS_DOCKER_IMAGE") or
self._session.config.get("agent.default_docker.image", "nvidia/cuda"))
# take the first part, this is the docker image name (not arguments)
docker_image = docker_image.split()[0]
create_trains_conf = "echo '{}' >> ~/trains.conf && ".format(
HOCONConverter.to_hocon(self._session.config._config))
if callable(self.kubectl_cmd):
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
else:
kubectl_cmd = self.kubectl_cmd.format(task_id=task_id, docker_image=docker_image, queue_id=queue)
# make sure we have a list
if isinstance(kubectl_cmd, str):
kubectl_cmd = kubectl_cmd.split()
kubectl_cmd += ["--labels=TRAINS=agent", "--command", "--", "/bin/sh", "-c",
create_trains_conf + self.container_bash_script.format(task_id)]
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = process.communicate()
self.log.info("K8s scheduling experiment task id={}".format(task_id))
if error:
self.log.error("Running kubectl encountered an error: {}".format(
error if isinstance(error, str) else error.decode()))
def run_tasks_loop(self, queues: List[Text], worker_params):
"""
:summary: Pull and run tasks from queues.
:description: 1. Go through ``queues`` by order.
2. Try getting the next task for each and run the first one that returns.
3. Go to step 1
:param queues: IDs of queues to pull tasks from
:type queues: list of ``Text``
:param worker_params: Worker command line arguments
:type worker_params: ``trains_agent.helper.process.WorkerParams``
"""
events_service = self.get_service(Events)
# make sure we have a k8s pending queue
try:
self._session.api_client.queues.create(self.k8s_pending_queue_name)
except Exception:
pass
# get queue id
self.k8s_pending_queue_name = self._resolve_name(self.k8s_pending_queue_name, "queues")
_last_machine_update_ts = 0
while True:
# iterate over queues (priority style, queues[0] is highest)
for queue in queues:
# delete old completed/failed pods
get_bash_output(self.KUBECTL_DELETE_CMD)
# get next task in queue
try:
response = self._session.api_client.queues.get_next_task(queue=queue)
except Exception as e:
print("Warning: Could not access task queue [{}], error: {}".format(queue, e))
continue
else:
try:
task_id = response.entry.task
except AttributeError:
print("No tasks in queue {}".format(queue))
continue
events_service.send_log_events(
self.worker_id,
task_id=task_id,
lines="task {} pulled from {} by worker {}".format(
task_id, queue, self.worker_id
),
level="INFO",
)
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
self.run_one_task(queue, task_id, worker_params)
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
break
else:
# sleep and retry polling
print("No tasks in Queues, sleeping for {:.1f} seconds".format(self._polling_interval))
sleep(self._polling_interval)
if self._session.config["agent.reload_config"]:
self.reload_config()
def k8s_daemon(self, queues):
"""
Start the k8s Glue service.
This service will be pulling tasks from *queues* and scheduling them for execution using kubectl.
Notice all scheduled tasks are pushed back into K8S_PENDING_QUEUE,
and popped when execution actually starts. This creates full visibility into the k8s scheduler.
Manually popping a task from the K8S_PENDING_QUEUE
will cause the k8s scheduler to skip the execution once the scheduled task needs to be executed
:param list(str) queues: List of queue names to pull from
"""
return self.daemon(queues=queues, log_level=logging.INFO, foreground=True, docker=False)
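A minimal sketch of launching the glue layer, based on the constructor and k8s_daemon() above (the queue name is illustrative):

from trains_agent.glue.k8s import K8sIntegration

# pull tasks from the listed queues and schedule each on the cluster via
# kubectl; scheduled tasks are parked in K8S_PENDING_QUEUE until they start
k8s = K8sIntegration(debug=True)
k8s.k8s_daemon(queues=["gpu_queue"])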

View File

@@ -463,6 +463,17 @@ def rm_tree(root): # type: (Union[Path, Text]) -> None
return shutil.rmtree(os.path.expanduser(os.path.expandvars(Text(root))), onerror=on_error)
def rm_file(filename): # type: (Union[Path, Text]) -> None
"""
A version of os.unlink that will not raise an error; returns False on failure
"""
try:
os.unlink(os.path.expanduser(os.path.expandvars(Text(filename))))
except:
return False
return True
def is_conda(config):
return config['agent.package_manager.type'].lower() == 'conda'

View File

@@ -22,6 +22,18 @@ def print_text(text, newline=True):
sys.stdout.write(data)
def decode_binary_lines(binary_lines, encoding='utf-8'):
# decode per line; undecodable characters are replaced, and lines that fail entirely become empty
lines = []
for b in binary_lines:
try:
l = b.decode(encoding=encoding, errors='replace').replace('\r', '\n')
except:
l = ''
lines.append(l + '\n' if l and l[-1] != '\n' else l)
return lines
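# Example (hedged illustration of the helper above):
#   decode_binary_lines([b"ok", b"caf\xc3\xa9\n"]) -> ['ok\n', 'café\n']
#   a trailing newline is added where missing; undecodable characters
#   are replaced rather than raising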
def ensure_text(s, encoding='utf-8', errors='strict'):
"""Coerce *s* to six.text_type.
For Python 2:

View File

@@ -112,7 +112,7 @@ class CondaAPI(PackageManager):
return self.pip.bin
def upgrade_pip(self):
return self.pip.upgrade_pip()
return self._install("pip" + self.pip.get_pip_version())
def create(self):
"""

View File

@@ -6,14 +6,14 @@ from .requirements import SimpleSubstitution
class CythonRequirement(SimpleSubstitution):
name = "cython"
name = ("cython", "numpy", )
def __init__(self, *args, **kwargs):
super(CythonRequirement, self).__init__(*args, **kwargs)
def match(self, req):
# match both Cython & cython
return req.name and self.name == req.name.lower()
return req.name and req.name.lower() in self.name
def replace(self, req):
"""

View File

@@ -82,9 +82,13 @@ class PoetryConfig:
def initialize(self, cwd=None):
if not self._initialized:
self._initialized = True
self._config("--local", "virtualenvs.in-project", "true", cwd=cwd)
# self._config("repositories.{}".format(self.REPO_NAME), PYTHON_INDEX)
# self._config("http-basic.{}".format(self.REPO_NAME), *PYTHON_INDEX_CREDENTIALS)
try:
self._config("--local", "virtualenvs.in-project", "true", cwd=cwd)
# self._config("repositories.{}".format(self.REPO_NAME), PYTHON_INDEX)
# self._config("http-basic.{}".format(self.REPO_NAME), *PYTHON_INDEX_CREDENTIALS)
except Exception as ex:
print("Exception: {}\nError: Failed configuring Poetry virtualenvs.in-project".format(ex))
raise
def get_api(self, path):
# type: (Path) -> PoetryAPI

View File

@@ -456,7 +456,17 @@ class Git(VCS):
)
def pull(self):
self.call("fetch", "--all", cwd=self.location)
self.call("fetch", "--all", "--recurse-submodules", cwd=self.location)
def checkout(self): # type: () -> None
"""
Checkout repository at specified revision
"""
self.call("checkout", self.revision, *self.checkout_flags, cwd=self.location)
try:
self.call("submodule", "update", "--recursive", cwd=self.location)
except Exception:
pass
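For reference, the patched pull/checkout sequence corresponds to the following git invocations (illustrative; the real calls go through VCS.call, and <revision> stands for the requested commit):

git fetch --all --recurse-submodules
git checkout <revision>
git submodule update --recursive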
info_commands = dict(
url=Argv(executable_name, "ls-remote", "--get-url", "origin"),

View File

@@ -4,11 +4,12 @@ from time import sleep
from glob import glob
from tempfile import gettempdir, NamedTemporaryFile
from trains_agent.definitions import ENV_K8S_HOST_MOUNT
from trains_agent.helper.base import warning
class Singleton(object):
prefix = 'trainsagent'
prefix = '.trainsagent'
sep = '_'
ext = '.tmp'
worker_id = None
@@ -19,7 +20,7 @@ class Singleton(object):
_lock_timeout = 10
@classmethod
def register_instance(cls, unique_worker_id=None, worker_name=None):
def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
"""
# Exit the process if another instance of us is using the same worker_id
@@ -28,7 +29,7 @@ class Singleton(object):
:return: (str worker_id, int slot_number); returns (None, None) if another instance is already running
"""
# try to lock file
lock_file = os.path.join(gettempdir(), cls._lock_file_name)
lock_file = os.path.join(cls._get_temp_folder(), cls._lock_file_name)
timeout = 0
while os.path.exists(lock_file):
if timeout > cls._lock_timeout:
@@ -46,7 +47,8 @@ class Singleton(object):
f.write(bytes(os.getpid()))
f.flush()
try:
ret = cls._register_instance(unique_worker_id=unique_worker_id, worker_name=worker_name)
ret = cls._register_instance(unique_worker_id=unique_worker_id, worker_name=worker_name,
api_client=api_client)
except Exception:
ret = None, None
@@ -58,12 +60,12 @@ class Singleton(object):
return ret
@classmethod
def _register_instance(cls, unique_worker_id=None, worker_name=None):
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
if cls.worker_id:
return cls.worker_id, cls.instance_slot
# make sure we have a unique name
instance_num = 0
temp_folder = gettempdir()
temp_folder = cls._get_temp_folder()
files = glob(os.path.join(temp_folder, cls.prefix + cls.sep + '*' + cls.ext))
slots = {}
for file in files:
@@ -73,8 +75,24 @@ class Singleton(object):
except Exception:
# something is wrong, use a non-existing pid and delete the file
pid = -1
uid, slot = None, None
try:
with open(file, 'r') as f:
uid, slot = str(f.read()).split('\n')
slot = int(slot)
except Exception:
pass
worker = None
if api_client and os.environ.get(ENV_K8S_HOST_MOUNT) and uid:
try:
worker = [w for w in api_client.workers.get_all() if w.id == uid]
except Exception:
worker = None
# count active instances and delete dead files
if not psutil.pid_exists(pid):
if not worker and not psutil.pid_exists(pid):
# delete the file
try:
os.remove(os.path.join(file))
@@ -83,11 +101,7 @@ class Singleton(object):
continue
instance_num += 1
try:
with open(file, 'r') as f:
uid, slot = str(f.read()).split('\n')
slot = int(slot)
except Exception:
if slot is None:
continue
if uid == unique_worker_id:
@@ -110,10 +124,16 @@ class Singleton(object):
unique_worker_id = worker_name + cls.worker_name_sep + str(cls.instance_slot)
# create lock
cls._pid_file = NamedTemporaryFile(dir=gettempdir(), prefix=cls.prefix + cls.sep + str(os.getpid()) + cls.sep,
suffix=cls.ext)
cls._pid_file = NamedTemporaryFile(dir=cls._get_temp_folder(),
prefix=cls.prefix + cls.sep + str(os.getpid()) + cls.sep, suffix=cls.ext)
cls._pid_file.write(('{}\n{}'.format(unique_worker_id, cls.instance_slot)).encode())
cls._pid_file.flush()
cls.worker_id = unique_worker_id
return cls.worker_id, cls.instance_slot
@classmethod
def _get_temp_folder(cls):
if os.environ.get(ENV_K8S_HOST_MOUNT):
return os.environ.get(ENV_K8S_HOST_MOUNT).split(':')[-1]
return gettempdir()
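A short sketch of the container-side path selection above; the mount value is an assumed example of the '<host_path>:<container_path>' format carried by ENV_K8S_HOST_MOUNT:

mount = '/mnt/host/trains:/root/.trains'  # illustrative env var value
print(mount.split(':')[-1])  # -> /root/.trains, used instead of gettempdir()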

View File

@@ -1,3 +1,4 @@
import itertools
from functools import partial
from importlib import import_module
import argparse
@@ -24,8 +25,16 @@ def get_parser():
from .worker import COMMANDS
subparsers = top_parser.add_subparsers(dest='command')
for c in COMMANDS:
parser = subparsers.add_parser(name=c, help=COMMANDS[c]['help'])
for a in COMMANDS[c].get('args', {}).keys():
parser.add_argument(a, **COMMANDS[c]['args'][a])
parser = subparsers.add_parser(name=c, help=COMMANDS[c]["help"])
groups = itertools.groupby(
sorted(
COMMANDS[c].get("args", {}).items(), key=lambda x: x[1].get("group", "")
),
key=lambda x: x[1].pop("group", ""),
)
for group_name, group in groups:
p = parser if not group_name else parser.add_argument_group(group_name)
for key, value in group:
p.add_argument(key, **value)
return top_parser
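A self-contained sketch of the grouping logic above: argument definitions that share a 'group' key land in the same argparse argument group, while ungrouped ones stay on the parser itself (the sample argument dict is hypothetical):

import argparse
import itertools

args = {
    '--docker': {'help': 'run inside docker', 'nargs': '*', 'group': 'Docker support'},
    '--gpus': {'help': 'gpus to use', 'group': 'Docker support'},
    '--queue': {'help': 'queue name'},
}
parser = argparse.ArgumentParser()
groups = itertools.groupby(
    sorted(args.items(), key=lambda x: x[1].get('group', '')),
    key=lambda x: x[1].pop('group', ''),
)
for group_name, group in groups:
    p = parser if not group_name else parser.add_argument_group(group_name)
    for key, value in group:
        p.add_argument(key, **value)
parser.print_help()  # shows --docker/--gpus under 'Docker support'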

View File

@@ -37,21 +37,29 @@ DAEMON_ARGS = dict({
'help': 'Pipe full log to stdout/stderr, should not be used if running in background',
'action': 'store_true',
},
'--gpus': {
'help': 'Specify active GPUs for the daemon to use (docker / virtual environment), '
'equivalent to setting NVIDIA_VISIBLE_DEVICES. '
'Examples: --gpus 0 or --gpus 0,1,2 or --gpus all',
},
'--cpu-only': {
'help': 'Disable GPU access for the daemon, only use CPU in either docker or virtual environment',
'action': 'store_true',
},
'--docker': {
'help': 'Run execution task inside a docker (v19.03 and above). Optional args <image> <arguments>, or '
'specify the default docker image in agent.default_docker.image / agent.default_docker.arguments. '
'Use --gpus/--cpu-only (or set NVIDIA_VISIBLE_DEVICES) to limit gpu visibility for the docker',
'nargs': '*',
'default': False,
'group': 'Docker support',
},
'--gpus': {
'help': 'Specify active GPUs for the daemon to use (docker / virtual environment), '
'equivalent to setting NVIDIA_VISIBLE_DEVICES. '
'Examples: --gpus 0 or --gpus 0,1,2 or --gpus all',
'group': 'Docker support',
},
'--cpu-only': {
'help': 'Disable GPU access for the daemon, only use CPU in either docker or virtual environment',
'action': 'store_true',
'group': 'Docker support',
},
'--force-current-version': {
'help': 'Force trains-agent to use the current trains-agent version when running in the docker',
'action': 'store_true',
'group': 'Docker support',
},
'--queue': {
'help': 'Queue ID(s)/Name(s) to pull tasks from (\'default\' queue)',
@@ -97,6 +105,17 @@ COMMANDS = {
'help': 'Do not use any network connection, assume everything is pre-installed',
'action': 'store_true',
},
'--docker': {
'help': 'Run execution task inside a docker (v19.03 and above). Optional args <image> <arguments>, or '
'specify the default docker image in agent.default_docker.image / agent.default_docker.arguments. '
'Use --gpus/--cpu-only (or set NVIDIA_VISIBLE_DEVICES) to limit gpu visibility for the docker',
'nargs': '*',
'default': False,
},
'--clone': {
'help': 'Clone the experiment before execution, and execute the cloned experiment',
'action': 'store_true',
},
}, **WORKER_ARGS),
},
'build': {
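Putting the new flags together, invocations might look like the following (hedged examples; execute also needs the task-selection flags defined in WORKER_ARGS, which this hunk does not show):

trains-agent daemon --queue default --docker --gpus 0,1 --force-current-version
trains-agent execute --clone --docker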

View File

@@ -15,7 +15,7 @@ from pyhocon import ConfigFactory, HOCONConverter, ConfigTree
from trains_agent.backend_api.session import Session as _Session, Request
from trains_agent.backend_api.session.client import APIClient
from trains_agent.backend_config.defs import LOCAL_CONFIG_FILE_OVERRIDE_VAR, LOCAL_CONFIG_FILES
from trains_agent.definitions import ENVIRONMENT_CONFIG
from trains_agent.definitions import ENVIRONMENT_CONFIG, ENV_TASK_EXECUTE_AS_USER
from trains_agent.errors import APIError
from trains_agent.helper.base import HOCONEncoder
from trains_agent.helper.process import Argv
@@ -75,7 +75,8 @@ class Session(_Session):
cpu_only = kwargs.get('cpu_only')
if cpu_only:
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = 'none'
if kwargs.get('gpus'):
if kwargs.get('gpus') and not os.environ.get('KUBERNETES_SERVICE_HOST') \
and not os.environ.get('KUBERNETES_PORT'):
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
if kwargs.get('only_load_config'):
from trains_agent.backend_api.config import load
@@ -86,7 +87,7 @@ class Session(_Session):
self.trace = kwargs.get('trace', False)
self._config_file = kwargs.get('config_file') or \
os.environ.get(LOCAL_CONFIG_FILE_OVERRIDE_VAR) or LOCAL_CONFIG_FILES[0]
self.api_client = APIClient(session=self, api_version="2.4")
self.api_client = APIClient(session=self, api_version="2.5")
# HACK make sure we have a python version to execute;
# if nothing was specified, use the one that runs us
def_python = ConfigValue(self.config, "agent.default_python")
@@ -111,6 +112,17 @@ class Session(_Session):
# override with environment variables
# cuda_version & cudnn_version are overridden with os.environ here, and normalized in the next section
for config_key, env_config in ENVIRONMENT_CONFIG.items():
# check if the property is a list:
if config_key.endswith('.0'):
if all(not i.get() for i in env_config.values()):
continue
parent = config_key.partition('.0')[0]
if not self.config[parent]:
self.config.put(parent, [])
self.config.put(parent, self.config[parent] + [ConfigTree((k, v.get()) for k, v in env_config.items())])
continue
value = env_config.get()
if not value:
continue
@@ -165,7 +177,11 @@ class Session(_Session):
folder_keys = ('agent.venvs_dir', 'agent.vcs_cache.path',
'agent.pip_download_cache.path',
'agent.docker_pip_cache', 'agent.docker_apt_cache')
singleton_folders = ('agent.venvs_dir', 'agent.vcs_cache.path',)
singleton_folders = ('agent.venvs_dir', 'agent.vcs_cache.path', 'agent.docker_apt_cache')
if os.environ.get(ENV_TASK_EXECUTE_AS_USER):
folder_keys = tuple(list(folder_keys) + ['sdk.storage.cache.default_base_dir'])
singleton_folders = tuple(list(singleton_folders) + ['sdk.storage.cache.default_base_dir'])
for key in folder_keys:
folder_key = ConfigValue(self.config, key)
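A hedged, self-contained sketch of the new list-style override above: a config key ending in '.0' marks a list entry, and the environment values are appended to the parent list as a ConfigTree (the key and values here are hypothetical):

from pyhocon import ConfigFactory, ConfigTree

config = ConfigFactory.parse_string('agent {}')
config_key = 'agent.extra_credentials.0'            # hypothetical list-style key
env_values = {'key': 'ACCESS', 'secret': 'SECRET'}  # stand-in for env lookups

if config_key.endswith('.0'):
    parent = config_key.partition('.0')[0]
    if not config.get(parent, None):
        config.put(parent, [])
    config.put(parent, config[parent] + [ConfigTree(env_values.items())])

print(config[parent])  # a one-element list holding the ConfigTree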

View File

@@ -1 +1 @@
__version__ = '0.13.1'
__version__ = '0.14.0'