mirror of
https://github.com/clearml/clearml-agent
synced 2025-06-26 18:16:15 +00:00
Compare commits
36 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ea0ed4807e | ||
|
|
389600b91e | ||
|
|
5fb2550212 | ||
|
|
15e9e6b778 | ||
|
|
aa75b92e46 | ||
|
|
757210d5b3 | ||
|
|
00eb2f10ec | ||
|
|
3393372b9c | ||
|
|
f2d2d702de | ||
|
|
e3d0680d39 | ||
|
|
618c2ac5c4 | ||
|
|
0272c4c79c | ||
|
|
ff8cf63abf | ||
|
|
2c7c7f5b44 | ||
|
|
01f57c1e44 | ||
|
|
47bcd3839a | ||
|
|
0a3a8a1c52 | ||
|
|
231a907cff | ||
|
|
8f95eecf2e | ||
|
|
81008ee00e | ||
|
|
25bc44c0cf | ||
|
|
f838c8fc70 | ||
|
|
596093aac6 | ||
|
|
8f23f3b4c0 | ||
|
|
95d503afdd | ||
|
|
73ee33be99 | ||
|
|
ee3adf625f | ||
|
|
afec38a50e | ||
|
|
f9c60904f4 | ||
|
|
a09dc85c67 | ||
|
|
5d74f4b376 | ||
|
|
d558c66d3c | ||
|
|
714c6a05d0 | ||
|
|
43b2f7f41d | ||
|
|
28d752d568 | ||
|
|
6d091d8e08 |
@@ -38,7 +38,7 @@ agent {
|
||||
# currently supported pip and conda
|
||||
# poetry is used if pip selected and repository contains poetry.lock file
|
||||
package_manager: {
|
||||
# supported options: pip, conda
|
||||
# supported options: pip, conda, poetry
|
||||
type: pip,
|
||||
|
||||
# specify pip version to use (examples "<20", "==19.3.1", "", empty string will install the latest version)
|
||||
@@ -141,6 +141,9 @@ sdk {
|
||||
quality: 87
|
||||
subsampling: 0
|
||||
}
|
||||
|
||||
# Support plot-per-graph fully matching Tensorboard behavior (i.e. if this is set to True, each series should have its own graph)
|
||||
tensorboard_single_series_per_graph: False
|
||||
}
|
||||
|
||||
network {
|
||||
|
||||
59
examples/archive_experiments.py
Normal file
59
examples/archive_experiments.py
Normal file
@@ -0,0 +1,59 @@
|
||||
#!/usr/bin/python3
|
||||
"""
|
||||
An example script that cleans up failed experiments by moving them to the archive
|
||||
"""
|
||||
import argparse
|
||||
from datetime import datetime
|
||||
|
||||
from trains_agent import APIClient
|
||||
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--project", "-P", help="Project ID. Only clean up experiments from this project")
|
||||
parser.add_argument("--user", "-U", help="User ID. Only clean up experiments assigned to this user")
|
||||
parser.add_argument(
|
||||
"--status", "-S", default="failed",
|
||||
help="Experiment status. Only clean up experiments with this status (default %(default)s)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--iterations", "-I", type=int,
|
||||
help="Number of iterations. Only clean up experiments with less or equal number of iterations"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--sec-from-start", "-T", type=int,
|
||||
help="Seconds from start time. "
|
||||
"Only clean up experiments if less or equal number of seconds have elapsed since started"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
client = APIClient()
|
||||
|
||||
tasks = client.tasks.get_all(
|
||||
project=[args.project] if args.project else None,
|
||||
user=[args.user] if args.user else None,
|
||||
status=[args.status] if args.status else None,
|
||||
system_tags=["-archived"]
|
||||
)
|
||||
|
||||
count = 0
|
||||
|
||||
for task in tasks:
|
||||
if args.iterations and (task.last_iteration or 0) > args.iterations:
|
||||
continue
|
||||
if args.sec_from_start:
|
||||
if not task.started:
|
||||
continue
|
||||
if (datetime.utcnow() - task.started.replace(tzinfo=None)).total_seconds() > args.sec_from_start:
|
||||
continue
|
||||
|
||||
try:
|
||||
client.tasks.edit(
|
||||
task=task.id,
|
||||
system_tags=(task.system_tags or []) + ["archived"],
|
||||
force=True
|
||||
)
|
||||
count += 1
|
||||
except Exception as ex:
|
||||
print("Failed editing experiment: {}".format(ex))
|
||||
|
||||
print("Cleaned up {} experiments".format(count))
|
||||
@@ -292,6 +292,7 @@
|
||||
" export TRAINS_API_ACCESS_KEY='{access_key}'\n",
|
||||
" export TRAINS_API_SECRET_KEY='{secret_key}'\n",
|
||||
" {bash_script}\n",
|
||||
" source ~/.bashrc\n",
|
||||
" python -m trains_agent --config-file '/root/trains.conf' daemon --queue '{queue}' {docker}\n",
|
||||
" shutdown\n",
|
||||
" \"\"\".format(\n",
|
||||
@@ -443,6 +444,12 @@
|
||||
" os.environ[\"TRAINS_API_SECRET_KEY\"] = TRAINS_SECRET_KEY\n",
|
||||
" api_client = APIClient()\n",
|
||||
"\n",
|
||||
" # Verify the requested queues exist and create those that doesn't exist\n",
|
||||
" all_queues = [q.name for q in list(api_client.queues.get_all())]\n",
|
||||
" missing_queues = [q for q in QUEUES if q not in all_queues]\n",
|
||||
" for q in missing_queues:\n",
|
||||
" api_client.queues.create(q)\n",
|
||||
"\n",
|
||||
" idle_workers = {}\n",
|
||||
" while True:\n",
|
||||
" queue_name_to_id = {\n",
|
||||
|
||||
@@ -20,4 +20,4 @@ six>=1.11.0
|
||||
tqdm>=4.19.5
|
||||
typing>=3.6.4
|
||||
urllib3>=1.21.1
|
||||
virtualenv>=16
|
||||
virtualenv>=16,<20
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
from .v2_4 import auth
|
||||
from .v2_4 import debug
|
||||
from .v2_4 import queues
|
||||
from .v2_4 import tasks
|
||||
from .v2_4 import workers
|
||||
from .v2_4 import events
|
||||
from .v2_5 import auth
|
||||
from .v2_5 import debug
|
||||
from .v2_5 import queues
|
||||
from .v2_5 import tasks
|
||||
from .v2_5 import workers
|
||||
from .v2_5 import events
|
||||
from .v2_5 import models
|
||||
|
||||
__all__ = [
|
||||
'auth',
|
||||
@@ -12,4 +13,5 @@ __all__ = [
|
||||
'tasks',
|
||||
'workers',
|
||||
'events',
|
||||
'models',
|
||||
]
|
||||
|
||||
2850
trains_agent/backend_api/services/v2_4/models.py
Normal file
2850
trains_agent/backend_api/services/v2_4/models.py
Normal file
File diff suppressed because it is too large
Load Diff
0
trains_agent/backend_api/services/v2_5/__init__.py
Normal file
0
trains_agent/backend_api/services/v2_5/__init__.py
Normal file
623
trains_agent/backend_api/services/v2_5/auth.py
Normal file
623
trains_agent/backend_api/services/v2_5/auth.py
Normal file
@@ -0,0 +1,623 @@
|
||||
"""
|
||||
auth service
|
||||
|
||||
This service provides authentication management and authorization
|
||||
validation for the entire system.
|
||||
"""
|
||||
import six
|
||||
import types
|
||||
from datetime import datetime
|
||||
import enum
|
||||
|
||||
from dateutil.parser import parse as parse_datetime
|
||||
|
||||
from ....backend_api.session import Request, BatchRequest, Response, DataModel, NonStrictDataModel, CompoundRequest, schema_property, StringEnum
|
||||
|
||||
|
||||
class Credentials(NonStrictDataModel):
|
||||
"""
|
||||
:param access_key: Credentials access key
|
||||
:type access_key: str
|
||||
:param secret_key: Credentials secret key
|
||||
:type secret_key: str
|
||||
"""
|
||||
_schema = {
|
||||
'properties': {
|
||||
'access_key': {
|
||||
'description': 'Credentials access key',
|
||||
'type': ['string', 'null'],
|
||||
},
|
||||
'secret_key': {
|
||||
'description': 'Credentials secret key',
|
||||
'type': ['string', 'null'],
|
||||
},
|
||||
},
|
||||
'type': 'object',
|
||||
}
|
||||
def __init__(
|
||||
self, access_key=None, secret_key=None, **kwargs):
|
||||
super(Credentials, self).__init__(**kwargs)
|
||||
self.access_key = access_key
|
||||
self.secret_key = secret_key
|
||||
|
||||
@schema_property('access_key')
|
||||
def access_key(self):
|
||||
return self._property_access_key
|
||||
|
||||
@access_key.setter
|
||||
def access_key(self, value):
|
||||
if value is None:
|
||||
self._property_access_key = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "access_key", six.string_types)
|
||||
self._property_access_key = value
|
||||
|
||||
@schema_property('secret_key')
|
||||
def secret_key(self):
|
||||
return self._property_secret_key
|
||||
|
||||
@secret_key.setter
|
||||
def secret_key(self, value):
|
||||
if value is None:
|
||||
self._property_secret_key = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "secret_key", six.string_types)
|
||||
self._property_secret_key = value
|
||||
|
||||
|
||||
class CredentialKey(NonStrictDataModel):
|
||||
"""
|
||||
:param access_key:
|
||||
:type access_key: str
|
||||
:param last_used:
|
||||
:type last_used: datetime.datetime
|
||||
:param last_used_from:
|
||||
:type last_used_from: str
|
||||
"""
|
||||
_schema = {
|
||||
'properties': {
|
||||
'access_key': {'description': '', 'type': ['string', 'null']},
|
||||
'last_used': {
|
||||
'description': '',
|
||||
'format': 'date-time',
|
||||
'type': ['string', 'null'],
|
||||
},
|
||||
'last_used_from': {'description': '', 'type': ['string', 'null']},
|
||||
},
|
||||
'type': 'object',
|
||||
}
|
||||
def __init__(
|
||||
self, access_key=None, last_used=None, last_used_from=None, **kwargs):
|
||||
super(CredentialKey, self).__init__(**kwargs)
|
||||
self.access_key = access_key
|
||||
self.last_used = last_used
|
||||
self.last_used_from = last_used_from
|
||||
|
||||
@schema_property('access_key')
|
||||
def access_key(self):
|
||||
return self._property_access_key
|
||||
|
||||
@access_key.setter
|
||||
def access_key(self, value):
|
||||
if value is None:
|
||||
self._property_access_key = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "access_key", six.string_types)
|
||||
self._property_access_key = value
|
||||
|
||||
@schema_property('last_used')
|
||||
def last_used(self):
|
||||
return self._property_last_used
|
||||
|
||||
@last_used.setter
|
||||
def last_used(self, value):
|
||||
if value is None:
|
||||
self._property_last_used = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "last_used", six.string_types + (datetime,))
|
||||
if not isinstance(value, datetime):
|
||||
value = parse_datetime(value)
|
||||
self._property_last_used = value
|
||||
|
||||
@schema_property('last_used_from')
|
||||
def last_used_from(self):
|
||||
return self._property_last_used_from
|
||||
|
||||
@last_used_from.setter
|
||||
def last_used_from(self, value):
|
||||
if value is None:
|
||||
self._property_last_used_from = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "last_used_from", six.string_types)
|
||||
self._property_last_used_from = value
|
||||
|
||||
|
||||
|
||||
|
||||
class CreateCredentialsRequest(Request):
|
||||
"""
|
||||
Creates a new set of credentials for the authenticated user.
|
||||
New key/secret is returned.
|
||||
Note: Secret will never be returned in any other API call.
|
||||
If a secret is lost or compromised, the key should be revoked
|
||||
and a new set of credentials can be created.
|
||||
|
||||
"""
|
||||
|
||||
_service = "auth"
|
||||
_action = "create_credentials"
|
||||
_version = "2.1"
|
||||
_schema = {
|
||||
'additionalProperties': False,
|
||||
'definitions': {},
|
||||
'properties': {},
|
||||
'type': 'object',
|
||||
}
|
||||
|
||||
|
||||
class CreateCredentialsResponse(Response):
|
||||
"""
|
||||
Response of auth.create_credentials endpoint.
|
||||
|
||||
:param credentials: Created credentials
|
||||
:type credentials: Credentials
|
||||
"""
|
||||
_service = "auth"
|
||||
_action = "create_credentials"
|
||||
_version = "2.1"
|
||||
|
||||
_schema = {
|
||||
'definitions': {
|
||||
'credentials': {
|
||||
'properties': {
|
||||
'access_key': {
|
||||
'description': 'Credentials access key',
|
||||
'type': ['string', 'null'],
|
||||
},
|
||||
'secret_key': {
|
||||
'description': 'Credentials secret key',
|
||||
'type': ['string', 'null'],
|
||||
},
|
||||
},
|
||||
'type': 'object',
|
||||
},
|
||||
},
|
||||
'properties': {
|
||||
'credentials': {
|
||||
'description': 'Created credentials',
|
||||
'oneOf': [{'$ref': '#/definitions/credentials'}, {'type': 'null'}],
|
||||
},
|
||||
},
|
||||
'type': 'object',
|
||||
}
|
||||
def __init__(
|
||||
self, credentials=None, **kwargs):
|
||||
super(CreateCredentialsResponse, self).__init__(**kwargs)
|
||||
self.credentials = credentials
|
||||
|
||||
@schema_property('credentials')
|
||||
def credentials(self):
|
||||
return self._property_credentials
|
||||
|
||||
@credentials.setter
|
||||
def credentials(self, value):
|
||||
if value is None:
|
||||
self._property_credentials = None
|
||||
return
|
||||
if isinstance(value, dict):
|
||||
value = Credentials.from_dict(value)
|
||||
else:
|
||||
self.assert_isinstance(value, "credentials", Credentials)
|
||||
self._property_credentials = value
|
||||
|
||||
|
||||
|
||||
|
||||
class EditUserRequest(Request):
|
||||
"""
|
||||
Edit a users' auth data properties
|
||||
|
||||
:param user: User ID
|
||||
:type user: str
|
||||
:param role: The new user's role within the company
|
||||
:type role: str
|
||||
"""
|
||||
|
||||
_service = "auth"
|
||||
_action = "edit_user"
|
||||
_version = "2.1"
|
||||
_schema = {
|
||||
'definitions': {},
|
||||
'properties': {
|
||||
'role': {
|
||||
'description': "The new user's role within the company",
|
||||
'enum': ['admin', 'superuser', 'user', 'annotator'],
|
||||
'type': ['string', 'null'],
|
||||
},
|
||||
'user': {'description': 'User ID', 'type': ['string', 'null']},
|
||||
},
|
||||
'type': 'object',
|
||||
}
|
||||
def __init__(
|
||||
self, user=None, role=None, **kwargs):
|
||||
super(EditUserRequest, self).__init__(**kwargs)
|
||||
self.user = user
|
||||
self.role = role
|
||||
|
||||
@schema_property('user')
|
||||
def user(self):
|
||||
return self._property_user
|
||||
|
||||
@user.setter
|
||||
def user(self, value):
|
||||
if value is None:
|
||||
self._property_user = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "user", six.string_types)
|
||||
self._property_user = value
|
||||
|
||||
@schema_property('role')
|
||||
def role(self):
|
||||
return self._property_role
|
||||
|
||||
@role.setter
|
||||
def role(self, value):
|
||||
if value is None:
|
||||
self._property_role = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "role", six.string_types)
|
||||
self._property_role = value
|
||||
|
||||
|
||||
class EditUserResponse(Response):
|
||||
"""
|
||||
Response of auth.edit_user endpoint.
|
||||
|
||||
:param updated: Number of users updated (0 or 1)
|
||||
:type updated: float
|
||||
:param fields: Updated fields names and values
|
||||
:type fields: dict
|
||||
"""
|
||||
_service = "auth"
|
||||
_action = "edit_user"
|
||||
_version = "2.1"
|
||||
|
||||
_schema = {
|
||||
'definitions': {},
|
||||
'properties': {
|
||||
'fields': {
|
||||
'additionalProperties': True,
|
||||
'description': 'Updated fields names and values',
|
||||
'type': ['object', 'null'],
|
||||
},
|
||||
'updated': {
|
||||
'description': 'Number of users updated (0 or 1)',
|
||||
'enum': [0, 1],
|
||||
'type': ['number', 'null'],
|
||||
},
|
||||
},
|
||||
'type': 'object',
|
||||
}
|
||||
def __init__(
|
||||
self, updated=None, fields=None, **kwargs):
|
||||
super(EditUserResponse, self).__init__(**kwargs)
|
||||
self.updated = updated
|
||||
self.fields = fields
|
||||
|
||||
@schema_property('updated')
|
||||
def updated(self):
|
||||
return self._property_updated
|
||||
|
||||
@updated.setter
|
||||
def updated(self, value):
|
||||
if value is None:
|
||||
self._property_updated = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "updated", six.integer_types + (float,))
|
||||
self._property_updated = value
|
||||
|
||||
@schema_property('fields')
|
||||
def fields(self):
|
||||
return self._property_fields
|
||||
|
||||
@fields.setter
|
||||
def fields(self, value):
|
||||
if value is None:
|
||||
self._property_fields = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "fields", (dict,))
|
||||
self._property_fields = value
|
||||
|
||||
|
||||
class GetCredentialsRequest(Request):
|
||||
"""
|
||||
Returns all existing credential keys for the authenticated user.
|
||||
Note: Only credential keys are returned.
|
||||
|
||||
"""
|
||||
|
||||
_service = "auth"
|
||||
_action = "get_credentials"
|
||||
_version = "2.1"
|
||||
_schema = {
|
||||
'additionalProperties': False,
|
||||
'definitions': {},
|
||||
'properties': {},
|
||||
'type': 'object',
|
||||
}
|
||||
|
||||
|
||||
class GetCredentialsResponse(Response):
|
||||
"""
|
||||
Response of auth.get_credentials endpoint.
|
||||
|
||||
:param credentials: List of credentials, each with an empty secret field.
|
||||
:type credentials: Sequence[CredentialKey]
|
||||
"""
|
||||
_service = "auth"
|
||||
_action = "get_credentials"
|
||||
_version = "2.1"
|
||||
|
||||
_schema = {
|
||||
'definitions': {
|
||||
'credential_key': {
|
||||
'properties': {
|
||||
'access_key': {'description': '', 'type': ['string', 'null']},
|
||||
'last_used': {
|
||||
'description': '',
|
||||
'format': 'date-time',
|
||||
'type': ['string', 'null'],
|
||||
},
|
||||
'last_used_from': {
|
||||
'description': '',
|
||||
'type': ['string', 'null'],
|
||||
},
|
||||
},
|
||||
'type': 'object',
|
||||
},
|
||||
},
|
||||
'properties': {
|
||||
'credentials': {
|
||||
'description': 'List of credentials, each with an empty secret field.',
|
||||
'items': {'$ref': '#/definitions/credential_key'},
|
||||
'type': ['array', 'null'],
|
||||
},
|
||||
},
|
||||
'type': 'object',
|
||||
}
|
||||
def __init__(
|
||||
self, credentials=None, **kwargs):
|
||||
super(GetCredentialsResponse, self).__init__(**kwargs)
|
||||
self.credentials = credentials
|
||||
|
||||
@schema_property('credentials')
|
||||
def credentials(self):
|
||||
return self._property_credentials
|
||||
|
||||
@credentials.setter
|
||||
def credentials(self, value):
|
||||
if value is None:
|
||||
self._property_credentials = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "credentials", (list, tuple))
|
||||
if any(isinstance(v, dict) for v in value):
|
||||
value = [CredentialKey.from_dict(v) if isinstance(v, dict) else v for v in value]
|
||||
else:
|
||||
self.assert_isinstance(value, "credentials", CredentialKey, is_array=True)
|
||||
self._property_credentials = value
|
||||
|
||||
|
||||
|
||||
|
||||
class LoginRequest(Request):
|
||||
"""
|
||||
Get a token based on supplied credentials (key/secret).
|
||||
Intended for use by users with key/secret credentials that wish to obtain a token
|
||||
for use with other services. Token will be limited by the same permissions that
|
||||
exist for the credentials used in this call.
|
||||
|
||||
:param expiration_sec: Requested token expiration time in seconds. Not
|
||||
guaranteed, might be overridden by the service
|
||||
:type expiration_sec: int
|
||||
"""
|
||||
|
||||
_service = "auth"
|
||||
_action = "login"
|
||||
_version = "2.1"
|
||||
_schema = {
|
||||
'definitions': {},
|
||||
'properties': {
|
||||
'expiration_sec': {
|
||||
'description': 'Requested token expiration time in seconds. \n Not guaranteed, might be overridden by the service',
|
||||
'type': ['integer', 'null'],
|
||||
},
|
||||
},
|
||||
'type': 'object',
|
||||
}
|
||||
def __init__(
|
||||
self, expiration_sec=None, **kwargs):
|
||||
super(LoginRequest, self).__init__(**kwargs)
|
||||
self.expiration_sec = expiration_sec
|
||||
|
||||
@schema_property('expiration_sec')
|
||||
def expiration_sec(self):
|
||||
return self._property_expiration_sec
|
||||
|
||||
@expiration_sec.setter
|
||||
def expiration_sec(self, value):
|
||||
if value is None:
|
||||
self._property_expiration_sec = None
|
||||
return
|
||||
if isinstance(value, float) and value.is_integer():
|
||||
value = int(value)
|
||||
|
||||
self.assert_isinstance(value, "expiration_sec", six.integer_types)
|
||||
self._property_expiration_sec = value
|
||||
|
||||
|
||||
class LoginResponse(Response):
|
||||
"""
|
||||
Response of auth.login endpoint.
|
||||
|
||||
:param token: Token string
|
||||
:type token: str
|
||||
"""
|
||||
_service = "auth"
|
||||
_action = "login"
|
||||
_version = "2.1"
|
||||
|
||||
_schema = {
|
||||
'definitions': {},
|
||||
'properties': {
|
||||
'token': {'description': 'Token string', 'type': ['string', 'null']},
|
||||
},
|
||||
'type': 'object',
|
||||
}
|
||||
def __init__(
|
||||
self, token=None, **kwargs):
|
||||
super(LoginResponse, self).__init__(**kwargs)
|
||||
self.token = token
|
||||
|
||||
@schema_property('token')
|
||||
def token(self):
|
||||
return self._property_token
|
||||
|
||||
@token.setter
|
||||
def token(self, value):
|
||||
if value is None:
|
||||
self._property_token = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "token", six.string_types)
|
||||
self._property_token = value
|
||||
|
||||
|
||||
class LogoutRequest(Request):
|
||||
"""
|
||||
Removes the authentication cookie from the current session
|
||||
|
||||
"""
|
||||
|
||||
_service = "auth"
|
||||
_action = "logout"
|
||||
_version = "2.2"
|
||||
_schema = {'additionalProperties': False, 'definitions': {}, 'type': 'object'}
|
||||
|
||||
|
||||
class LogoutResponse(Response):
|
||||
"""
|
||||
Response of auth.logout endpoint.
|
||||
|
||||
"""
|
||||
_service = "auth"
|
||||
_action = "logout"
|
||||
_version = "2.2"
|
||||
|
||||
_schema = {'additionalProperties': False, 'definitions': {}, 'type': 'object'}
|
||||
|
||||
|
||||
class RevokeCredentialsRequest(Request):
|
||||
"""
|
||||
Revokes (and deletes) a set (key, secret) of credentials for
|
||||
the authenticated user.
|
||||
|
||||
:param access_key: Credentials key
|
||||
:type access_key: str
|
||||
"""
|
||||
|
||||
_service = "auth"
|
||||
_action = "revoke_credentials"
|
||||
_version = "2.1"
|
||||
_schema = {
|
||||
'definitions': {},
|
||||
'properties': {
|
||||
'access_key': {
|
||||
'description': 'Credentials key',
|
||||
'type': ['string', 'null'],
|
||||
},
|
||||
},
|
||||
'required': ['key_id'],
|
||||
'type': 'object',
|
||||
}
|
||||
def __init__(
|
||||
self, access_key=None, **kwargs):
|
||||
super(RevokeCredentialsRequest, self).__init__(**kwargs)
|
||||
self.access_key = access_key
|
||||
|
||||
@schema_property('access_key')
|
||||
def access_key(self):
|
||||
return self._property_access_key
|
||||
|
||||
@access_key.setter
|
||||
def access_key(self, value):
|
||||
if value is None:
|
||||
self._property_access_key = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "access_key", six.string_types)
|
||||
self._property_access_key = value
|
||||
|
||||
|
||||
class RevokeCredentialsResponse(Response):
|
||||
"""
|
||||
Response of auth.revoke_credentials endpoint.
|
||||
|
||||
:param revoked: Number of credentials revoked
|
||||
:type revoked: int
|
||||
"""
|
||||
_service = "auth"
|
||||
_action = "revoke_credentials"
|
||||
_version = "2.1"
|
||||
|
||||
_schema = {
|
||||
'definitions': {},
|
||||
'properties': {
|
||||
'revoked': {
|
||||
'description': 'Number of credentials revoked',
|
||||
'enum': [0, 1],
|
||||
'type': ['integer', 'null'],
|
||||
},
|
||||
},
|
||||
'type': 'object',
|
||||
}
|
||||
def __init__(
|
||||
self, revoked=None, **kwargs):
|
||||
super(RevokeCredentialsResponse, self).__init__(**kwargs)
|
||||
self.revoked = revoked
|
||||
|
||||
@schema_property('revoked')
|
||||
def revoked(self):
|
||||
return self._property_revoked
|
||||
|
||||
@revoked.setter
|
||||
def revoked(self, value):
|
||||
if value is None:
|
||||
self._property_revoked = None
|
||||
return
|
||||
if isinstance(value, float) and value.is_integer():
|
||||
value = int(value)
|
||||
|
||||
self.assert_isinstance(value, "revoked", six.integer_types)
|
||||
self._property_revoked = value
|
||||
|
||||
|
||||
|
||||
|
||||
response_mapping = {
|
||||
LoginRequest: LoginResponse,
|
||||
LogoutRequest: LogoutResponse,
|
||||
CreateCredentialsRequest: CreateCredentialsResponse,
|
||||
GetCredentialsRequest: GetCredentialsResponse,
|
||||
RevokeCredentialsRequest: RevokeCredentialsResponse,
|
||||
EditUserRequest: EditUserResponse,
|
||||
}
|
||||
194
trains_agent/backend_api/services/v2_5/debug.py
Normal file
194
trains_agent/backend_api/services/v2_5/debug.py
Normal file
@@ -0,0 +1,194 @@
|
||||
"""
|
||||
debug service
|
||||
|
||||
Debugging utilities
|
||||
"""
|
||||
import six
|
||||
import types
|
||||
from datetime import datetime
|
||||
import enum
|
||||
|
||||
from dateutil.parser import parse as parse_datetime
|
||||
|
||||
from ....backend_api.session import Request, BatchRequest, Response, DataModel, NonStrictDataModel, CompoundRequest, schema_property, StringEnum
|
||||
|
||||
|
||||
class ApiexRequest(Request):
|
||||
"""
|
||||
"""
|
||||
|
||||
_service = "debug"
|
||||
_action = "apiex"
|
||||
_version = "1.5"
|
||||
_schema = {'definitions': {}, 'properties': {}, 'required': [], 'type': 'object'}
|
||||
|
||||
|
||||
class ApiexResponse(Response):
|
||||
"""
|
||||
Response of debug.apiex endpoint.
|
||||
|
||||
"""
|
||||
_service = "debug"
|
||||
_action = "apiex"
|
||||
_version = "1.5"
|
||||
|
||||
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
|
||||
|
||||
|
||||
class EchoRequest(Request):
|
||||
"""
|
||||
Return request data
|
||||
|
||||
"""
|
||||
|
||||
_service = "debug"
|
||||
_action = "echo"
|
||||
_version = "1.5"
|
||||
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
|
||||
|
||||
|
||||
class EchoResponse(Response):
|
||||
"""
|
||||
Response of debug.echo endpoint.
|
||||
|
||||
"""
|
||||
_service = "debug"
|
||||
_action = "echo"
|
||||
_version = "1.5"
|
||||
|
||||
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
|
||||
|
||||
|
||||
class ExRequest(Request):
|
||||
"""
|
||||
"""
|
||||
|
||||
_service = "debug"
|
||||
_action = "ex"
|
||||
_version = "1.5"
|
||||
_schema = {'definitions': {}, 'properties': {}, 'required': [], 'type': 'object'}
|
||||
|
||||
|
||||
class ExResponse(Response):
|
||||
"""
|
||||
Response of debug.ex endpoint.
|
||||
|
||||
"""
|
||||
_service = "debug"
|
||||
_action = "ex"
|
||||
_version = "1.5"
|
||||
|
||||
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
|
||||
|
||||
|
||||
class PingRequest(Request):
|
||||
"""
|
||||
Return a message. Does not require authorization.
|
||||
|
||||
"""
|
||||
|
||||
_service = "debug"
|
||||
_action = "ping"
|
||||
_version = "1.5"
|
||||
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
|
||||
|
||||
|
||||
class PingResponse(Response):
|
||||
"""
|
||||
Response of debug.ping endpoint.
|
||||
|
||||
:param msg: A friendly message
|
||||
:type msg: str
|
||||
"""
|
||||
_service = "debug"
|
||||
_action = "ping"
|
||||
_version = "1.5"
|
||||
|
||||
_schema = {
|
||||
'definitions': {},
|
||||
'properties': {
|
||||
'msg': {
|
||||
'description': 'A friendly message',
|
||||
'type': ['string', 'null'],
|
||||
},
|
||||
},
|
||||
'type': 'object',
|
||||
}
|
||||
def __init__(
|
||||
self, msg=None, **kwargs):
|
||||
super(PingResponse, self).__init__(**kwargs)
|
||||
self.msg = msg
|
||||
|
||||
@schema_property('msg')
|
||||
def msg(self):
|
||||
return self._property_msg
|
||||
|
||||
@msg.setter
|
||||
def msg(self, value):
|
||||
if value is None:
|
||||
self._property_msg = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "msg", six.string_types)
|
||||
self._property_msg = value
|
||||
|
||||
|
||||
class PingAuthRequest(Request):
|
||||
"""
|
||||
Return a message. Requires authorization.
|
||||
|
||||
"""
|
||||
|
||||
_service = "debug"
|
||||
_action = "ping_auth"
|
||||
_version = "1.5"
|
||||
_schema = {'definitions': {}, 'properties': {}, 'type': 'object'}
|
||||
|
||||
|
||||
class PingAuthResponse(Response):
|
||||
"""
|
||||
Response of debug.ping_auth endpoint.
|
||||
|
||||
:param msg: A friendly message
|
||||
:type msg: str
|
||||
"""
|
||||
_service = "debug"
|
||||
_action = "ping_auth"
|
||||
_version = "1.5"
|
||||
|
||||
_schema = {
|
||||
'definitions': {},
|
||||
'properties': {
|
||||
'msg': {
|
||||
'description': 'A friendly message',
|
||||
'type': ['string', 'null'],
|
||||
},
|
||||
},
|
||||
'type': 'object',
|
||||
}
|
||||
def __init__(
|
||||
self, msg=None, **kwargs):
|
||||
super(PingAuthResponse, self).__init__(**kwargs)
|
||||
self.msg = msg
|
||||
|
||||
@schema_property('msg')
|
||||
def msg(self):
|
||||
return self._property_msg
|
||||
|
||||
@msg.setter
|
||||
def msg(self, value):
|
||||
if value is None:
|
||||
self._property_msg = None
|
||||
return
|
||||
|
||||
self.assert_isinstance(value, "msg", six.string_types)
|
||||
self._property_msg = value
|
||||
|
||||
|
||||
response_mapping = {
|
||||
EchoRequest: EchoResponse,
|
||||
PingRequest: PingResponse,
|
||||
PingAuthRequest: PingAuthResponse,
|
||||
ApiexRequest: ApiexResponse,
|
||||
ExRequest: ExResponse,
|
||||
}
|
||||
3000
trains_agent/backend_api/services/v2_5/events.py
Normal file
3000
trains_agent/backend_api/services/v2_5/events.py
Normal file
File diff suppressed because it is too large
Load Diff
2850
trains_agent/backend_api/services/v2_5/models.py
Normal file
2850
trains_agent/backend_api/services/v2_5/models.py
Normal file
File diff suppressed because it is too large
Load Diff
2198
trains_agent/backend_api/services/v2_5/queues.py
Normal file
2198
trains_agent/backend_api/services/v2_5/queues.py
Normal file
File diff suppressed because it is too large
Load Diff
7053
trains_agent/backend_api/services/v2_5/tasks.py
Normal file
7053
trains_agent/backend_api/services/v2_5/tasks.py
Normal file
File diff suppressed because it is too large
Load Diff
2368
trains_agent/backend_api/services/v2_5/workers.py
Normal file
2368
trains_agent/backend_api/services/v2_5/workers.py
Normal file
File diff suppressed because it is too large
Load Diff
@@ -17,7 +17,7 @@ from datetime import datetime
|
||||
from distutils.spawn import find_executable
|
||||
from functools import partial
|
||||
from itertools import chain
|
||||
from tempfile import gettempdir, mkdtemp
|
||||
from tempfile import mkdtemp
|
||||
from time import sleep, time
|
||||
from typing import Text, Optional, Any, Tuple
|
||||
|
||||
@@ -28,9 +28,6 @@ from trains_agent.backend_api.services import queues as queues_api
|
||||
from trains_agent.backend_api.services import tasks as tasks_api
|
||||
from pathlib2 import Path
|
||||
from pyhocon import ConfigTree, ConfigFactory
|
||||
from requests import Session as HTTPSession
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.util import Retry
|
||||
from six.moves.urllib.parse import quote
|
||||
|
||||
from trains_agent.helper.check_update import start_check_update_daemon
|
||||
@@ -40,7 +37,10 @@ from trains_agent.definitions import (
|
||||
ENVIRONMENT_SDK_PARAMS,
|
||||
INVALID_WORKER_ID,
|
||||
PROGRAM_NAME,
|
||||
DEFAULT_VENV_UPDATE_URL)
|
||||
DEFAULT_VENV_UPDATE_URL,
|
||||
ENV_TASK_EXECUTE_AS_USER,
|
||||
ENV_K8S_HOST_MOUNT
|
||||
)
|
||||
from trains_agent.definitions import WORKING_REPOSITORY_DIR, PIP_EXTRA_INDICES
|
||||
from trains_agent.errors import APIError, CommandFailedError, Sigterm
|
||||
from trains_agent.helper.base import (
|
||||
@@ -59,13 +59,17 @@ from trains_agent.helper.base import (
|
||||
is_conda,
|
||||
named_temporary_file,
|
||||
ExecutionInfo,
|
||||
HOCONEncoder, error, get_python_path, is_linux_platform)
|
||||
from trains_agent.helper.console import ensure_text
|
||||
HOCONEncoder,
|
||||
error,
|
||||
get_python_path,
|
||||
is_linux_platform,
|
||||
rm_file
|
||||
)
|
||||
from trains_agent.helper.console import ensure_text, print_text, decode_binary_lines
|
||||
from trains_agent.helper.package.base import PackageManager
|
||||
from trains_agent.helper.package.conda_api import CondaAPI
|
||||
from trains_agent.helper.package.horovod_req import HorovodRequirement
|
||||
from trains_agent.helper.package.external_req import ExternalRequirements
|
||||
from trains_agent.helper.package.pip_api.system import SystemPip
|
||||
from trains_agent.helper.package.pip_api.venv import VirtualenvPip
|
||||
from trains_agent.helper.package.poetry_api import PoetryConfig, PoetryAPI
|
||||
from trains_agent.helper.package.pytorch import PytorchRequirement
|
||||
@@ -73,13 +77,16 @@ from trains_agent.helper.package.requirements import RequirementsManager
|
||||
from trains_agent.helper.package.venv_update_api import VenvUpdateAPI
|
||||
from trains_agent.helper.process import (
|
||||
kill_all_child_processes,
|
||||
check_if_command_exists,
|
||||
WorkerParams,
|
||||
ExitStatus,
|
||||
Argv,
|
||||
COMMAND_SUCCESS,
|
||||
Executable,
|
||||
get_bash_output, shutdown_docker_process, get_docker_id, commit_docker)
|
||||
get_bash_output,
|
||||
shutdown_docker_process,
|
||||
get_docker_id,
|
||||
commit_docker
|
||||
)
|
||||
from trains_agent.helper.package.cython_req import CythonRequirement
|
||||
from trains_agent.helper.repo import clone_repository_cached, RepoInfo, VCS
|
||||
from trains_agent.helper.resource_monitor import ResourceMonitor
|
||||
@@ -229,6 +236,8 @@ class TaskStopSignal(object):
|
||||
return self._test()
|
||||
except Exception as ex:
|
||||
self.command.log_traceback(ex)
|
||||
# make sure we break nothing
|
||||
return TaskStopSignal.default
|
||||
|
||||
def _test(self):
|
||||
# type: () -> TaskStopReason
|
||||
@@ -361,6 +370,7 @@ class Worker(ServiceCommandSection):
|
||||
self._docker_force_pull = self._session.config.get("agent.docker_force_pull", False)
|
||||
self._daemon_foreground = None
|
||||
self._standalone_mode = None
|
||||
self._force_current_version = None
|
||||
|
||||
def _get_requirements_manager(self, os_override=None, base_interpreter=None):
|
||||
requirements_manager = RequirementsManager(
|
||||
@@ -390,13 +400,14 @@ class Worker(ServiceCommandSection):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def run_one_task(self, queue, task_id, worker_args):
|
||||
def run_one_task(self, queue, task_id, worker_args, docker=None):
|
||||
# type: (Text, Text, WorkerParams) -> ()
|
||||
"""
|
||||
Run one task pulled from queue.
|
||||
:param queue: ID of queue that task was pulled from
|
||||
:param task_id: ID of task to run
|
||||
:param worker_args: Worker command line arguments
|
||||
:param docker: Docker image in which the execution task will run
|
||||
"""
|
||||
# start new process and execute task id
|
||||
print("Running task '{}'".format(task_id))
|
||||
@@ -422,7 +433,7 @@ class Worker(ServiceCommandSection):
|
||||
if self.docker_image_func:
|
||||
try:
|
||||
response = get_task(self._session, task_id, only_fields=["execution.docker_cmd"])
|
||||
task_docker_cmd = response.execution.docker_cmd
|
||||
task_docker_cmd = docker or response.execution.docker_cmd
|
||||
task_docker_cmd = task_docker_cmd.strip() if task_docker_cmd else None
|
||||
except Exception:
|
||||
task_docker_cmd = None
|
||||
@@ -629,11 +640,13 @@ class Worker(ServiceCommandSection):
|
||||
self.log.debug("starting resource monitor thread")
|
||||
print("Worker \"{}\" - ".format(self.worker_id), end='')
|
||||
|
||||
if not queues:
|
||||
if queues:
|
||||
queues = return_list(queues)
|
||||
queues = [self._resolve_name(q, "queues") for q in queues]
|
||||
else:
|
||||
default_queue = self._session.send_api(queues_api.GetDefaultRequest())
|
||||
queues = [default_queue.id]
|
||||
|
||||
queues = return_list(queues)
|
||||
queues_info = [
|
||||
self._session.send_api(
|
||||
queues_api.GetByIdRequest(queue)
|
||||
@@ -654,9 +667,8 @@ class Worker(ServiceCommandSection):
|
||||
|
||||
# print docker image
|
||||
if docker is not False and docker is not None:
|
||||
temp_config, docker_image_func = self.get_docker_config_cmd(docker)
|
||||
self.dump_config(temp_config)
|
||||
self.docker_image_func = docker_image_func
|
||||
self._force_current_version = kwargs.get('force_current_version', False)
|
||||
self.set_docker_variables(docker)
|
||||
else:
|
||||
self.dump_config()
|
||||
|
||||
@@ -749,9 +761,11 @@ class Worker(ServiceCommandSection):
|
||||
):
|
||||
# type: (...) -> Tuple[Optional[int], TaskStopReason]
|
||||
def _print_file(file_path, prev_line_count):
|
||||
with open(file_path, "rt") as f:
|
||||
with open(file_path, "rb") as f:
|
||||
binary_text = f.read()
|
||||
# skip the previously printed lines,
|
||||
return f.readlines()[prev_line_count:]
|
||||
blines = binary_text.split(b'\n')[prev_line_count:]
|
||||
return decode_binary_lines(blines)
|
||||
|
||||
stdout = open(stdout_path, "wt")
|
||||
stderr = open(stderr_path, "wt") if stderr_path else stdout
|
||||
@@ -780,7 +794,7 @@ class Worker(ServiceCommandSection):
|
||||
if daemon:
|
||||
self.send_logs(
|
||||
task_id=task_id,
|
||||
lines=["User aborted: stopping task\n"],
|
||||
lines=["User aborted: stopping task ({})\n".format(str(stop_reason))],
|
||||
level="ERROR",
|
||||
)
|
||||
kill_all_child_processes(process.pid)
|
||||
@@ -844,7 +858,8 @@ class Worker(ServiceCommandSection):
|
||||
"""
|
||||
if not lines:
|
||||
return 0
|
||||
print("".join(lines), end="")
|
||||
print_text("".join(lines))
|
||||
|
||||
# remove backspaces from the text log, they look bad.
|
||||
for i, l in enumerate(lines):
|
||||
lines[i] = l.replace('\x08', '')
|
||||
@@ -920,7 +935,15 @@ class Worker(ServiceCommandSection):
|
||||
except AttributeError:
|
||||
requirements = None
|
||||
|
||||
# TODO: make sure we pass the correct python_version
|
||||
if not python_version:
|
||||
try:
|
||||
python_version = current_task.script.binary
|
||||
python_version = python_version.split('/')[-1].replace('python', '')
|
||||
# if we can cast it, we are good
|
||||
python_version = '{:.1f}'.format(float(python_version))
|
||||
except:
|
||||
python_version = None
|
||||
|
||||
venv_folder, requirements_manager = self.install_virtualenv(venv_dir=target,
|
||||
requested_python_version=python_version)
|
||||
|
||||
@@ -1030,6 +1053,8 @@ class Worker(ServiceCommandSection):
|
||||
require_queue=False,
|
||||
log_file=None,
|
||||
standalone_mode=None,
|
||||
docker=False,
|
||||
clone=False,
|
||||
**_
|
||||
):
|
||||
if not task_id:
|
||||
@@ -1042,25 +1067,42 @@ class Worker(ServiceCommandSection):
|
||||
except Exception:
|
||||
raise ValueError("Could not find task id={}".format(task_id))
|
||||
|
||||
# make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
|
||||
try:
|
||||
res = self._session.api_client.tasks.dequeue(task=current_task.id)
|
||||
if require_queue and res.meta.result_code != 200:
|
||||
raise ValueError("Execution required enqueued task, "
|
||||
"but task id={} is not queued.".format(current_task.id))
|
||||
except Exception:
|
||||
if require_queue:
|
||||
raise
|
||||
if clone:
|
||||
try:
|
||||
print("Cloning task id={}".format(task_id))
|
||||
current_task = self._session.api_client.tasks.get_by_id(
|
||||
self._session.send_api(
|
||||
tasks_api.CloneRequest(task=current_task.id, new_task_name='Clone of {}'.format(current_task.name))
|
||||
).id
|
||||
)
|
||||
print("Task cloned, new task id={}".format(current_task.id))
|
||||
except Exception:
|
||||
raise CommandFailedError("Cloning failed")
|
||||
else:
|
||||
# make sure this task is not stuck in an execution queue, it shouldn't have been, but just in case.
|
||||
try:
|
||||
res = self._session.api_client.tasks.dequeue(task=current_task.id)
|
||||
if require_queue and res.meta.result_code != 200:
|
||||
raise ValueError("Execution required enqueued task, "
|
||||
"but task id={} is not queued.".format(current_task.id))
|
||||
except Exception:
|
||||
if require_queue:
|
||||
raise
|
||||
|
||||
if full_monitoring:
|
||||
if docker is not False and docker is not None:
|
||||
self.set_docker_variables(docker)
|
||||
|
||||
# We expect the same behaviour in case full_monitoring was set, and in case docker mode is used
|
||||
if full_monitoring or docker is not False:
|
||||
worker_params = WorkerParams(
|
||||
log_level=log_level,
|
||||
config_file=self._session.config_file,
|
||||
debug=self._session.debug_mode,
|
||||
trace=self._session.trace,
|
||||
)
|
||||
self.report_monitor(ResourceMonitor.StatusReport(task=task_id))
|
||||
self.run_one_task(queue='', task_id=task_id, worker_args=worker_params)
|
||||
self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
|
||||
self.run_one_task(queue='', task_id=current_task.id, worker_args=worker_params, docker=docker)
|
||||
|
||||
self.stop_monitor()
|
||||
self._unregister()
|
||||
return
|
||||
@@ -1076,7 +1118,7 @@ class Worker(ServiceCommandSection):
|
||||
|
||||
if not disable_monitoring:
|
||||
self.log.debug("starting resource monitor")
|
||||
self.report_monitor(ResourceMonitor.StatusReport(task=task_id))
|
||||
self.report_monitor(ResourceMonitor.StatusReport(task=current_task.id))
|
||||
|
||||
execution = self.get_execution_info(current_task)
|
||||
|
||||
@@ -1085,7 +1127,16 @@ class Worker(ServiceCommandSection):
|
||||
except AttributeError:
|
||||
requirements = None
|
||||
|
||||
venv_folder, requirements_manager = self.install_virtualenv(standalone_mode=standalone_mode)
|
||||
try:
|
||||
python_ver = current_task.script.binary
|
||||
python_ver = python_ver.split('/')[-1].replace('python', '')
|
||||
# if we can cast it, we are good
|
||||
python_ver = '{:.1f}'.format(float(python_ver))
|
||||
except:
|
||||
python_ver = None
|
||||
|
||||
venv_folder, requirements_manager = self.install_virtualenv(standalone_mode=standalone_mode,
|
||||
requested_python_version=python_ver)
|
||||
|
||||
if not standalone_mode:
|
||||
if self._default_pip:
|
||||
@@ -1115,7 +1166,7 @@ class Worker(ServiceCommandSection):
|
||||
script_dir = (directory if isinstance(directory, Path) else Path(directory)).absolute().as_posix()
|
||||
|
||||
# run code
|
||||
print("Running task id [%s]:" % task_id)
|
||||
print("Running task id [%s]:" % current_task.id)
|
||||
extra = ['-u', ]
|
||||
if optimization:
|
||||
extra.append(
|
||||
@@ -1149,7 +1200,7 @@ class Worker(ServiceCommandSection):
|
||||
)
|
||||
|
||||
if repo_info:
|
||||
self._update_commit_id(task_id, execution, repo_info)
|
||||
self._update_commit_id(current_task.id, execution, repo_info)
|
||||
|
||||
# Add the script CWD to the python path
|
||||
python_path = get_python_path(script_dir, execution.entry_point, self.package_api) \
|
||||
@@ -1157,11 +1208,20 @@ class Worker(ServiceCommandSection):
|
||||
if python_path:
|
||||
os.environ['PYTHONPATH'] = python_path
|
||||
|
||||
print("Starting Task Execution:\n".format(task_id))
|
||||
# check if we want to run as another user, only supported on linux
|
||||
if os.environ.get(ENV_TASK_EXECUTE_AS_USER, None) and is_linux_platform():
|
||||
command, script_dir = self._run_as_user_patch(
|
||||
command, script_dir, venv_folder,
|
||||
self._session.config.get('sdk.storage.cache.default_base_dir'),
|
||||
os.environ.get(ENV_TASK_EXECUTE_AS_USER))
|
||||
use_execv = False
|
||||
else:
|
||||
use_execv = is_linux_platform() and not isinstance(self.package_api, (PoetryAPI, CondaAPI))
|
||||
|
||||
print("Starting Task Execution:\n".format(current_task.id))
|
||||
exit_code = -1
|
||||
try:
|
||||
if disable_monitoring:
|
||||
use_execv = is_linux_platform() and not isinstance(self.package_api, (PoetryAPI, CondaAPI))
|
||||
try:
|
||||
sys.stdout.flush()
|
||||
sys.stderr.flush()
|
||||
@@ -1187,13 +1247,13 @@ class Worker(ServiceCommandSection):
|
||||
)
|
||||
print("Storing stdout and stderr log into [%s]" % temp_stdout_fname)
|
||||
exit_code, _ = self._log_command_output(
|
||||
task_id=task_id,
|
||||
task_id=current_task.id,
|
||||
cmd=command,
|
||||
stdout_path=temp_stdout_fname,
|
||||
cwd=script_dir,
|
||||
)
|
||||
except KeyboardInterrupt:
|
||||
self.handle_user_abort(task_id)
|
||||
self.handle_user_abort(current_task.id)
|
||||
raise
|
||||
except Exception as e:
|
||||
self.log.warning(str(e))
|
||||
@@ -1210,13 +1270,18 @@ class Worker(ServiceCommandSection):
|
||||
|
||||
if not disable_monitoring:
|
||||
# we need to change task status according to exit code
|
||||
self.handle_task_termination(task_id, exit_code, TaskStopReason.no_stop)
|
||||
self.handle_task_termination(current_task.id, exit_code, TaskStopReason.no_stop)
|
||||
self.stop_monitor()
|
||||
# unregister the worker
|
||||
self._unregister()
|
||||
|
||||
return 1 if exit_code is None else exit_code
|
||||
|
||||
def set_docker_variables(self, docker):
|
||||
temp_config, docker_image_func = self.get_docker_config_cmd(docker)
|
||||
self.dump_config(temp_config)
|
||||
self.docker_image_func = docker_image_func
|
||||
|
||||
def get_execution_info(self, current_task):
|
||||
# type: (...) -> ExecutionInfo
|
||||
try:
|
||||
@@ -1498,10 +1563,7 @@ class Worker(ServiceCommandSection):
|
||||
raise e
|
||||
finally:
|
||||
if self._session.debug_mode and temp_file:
|
||||
try:
|
||||
Path(temp_file.name).unlink()
|
||||
except OSError:
|
||||
pass
|
||||
rm_file(temp_file.name)
|
||||
# call post installation callback
|
||||
requirements_manager.post_install()
|
||||
# mark as successful installation
|
||||
@@ -1627,7 +1689,7 @@ class Worker(ServiceCommandSection):
|
||||
)
|
||||
|
||||
def install_virtualenv(self, venv_dir=None, requested_python_version=None, standalone_mode=False):
|
||||
# type: (str, str) -> Tuple[Path, RequirementsManager]
|
||||
# type: (str, str, bool) -> Tuple[Path, RequirementsManager]
|
||||
"""
|
||||
Install a new python virtual environment, removing the old one if exists
|
||||
:return: virtualenv directory and requirements manager to use with task
|
||||
@@ -1640,9 +1702,16 @@ class Worker(ServiceCommandSection):
|
||||
requested_python_version[max(requested_python_version.find('python'), 0):].replace('python', '')
|
||||
executable_name = 'python'
|
||||
else:
|
||||
executable_version, executable_version_suffix, executable_name = self.find_python_executable_for_version(
|
||||
requested_python_version
|
||||
)
|
||||
try:
|
||||
executable_version, executable_version_suffix, executable_name = \
|
||||
self.find_python_executable_for_version(requested_python_version)
|
||||
except Exception:
|
||||
def_python_version = Text(self._session.config.get("agent.python_binary", None)) or \
|
||||
Text(self._session.config.get("agent.default_python", None))
|
||||
print('Warning: could not locate requested Python version {}, reverting to version {}'.format(
|
||||
requested_python_version, def_python_version))
|
||||
executable_version, executable_version_suffix, executable_name = \
|
||||
self.find_python_executable_for_version(def_python_version)
|
||||
self._session.config.put("agent.default_python", executable_version)
|
||||
self._session.config.put("agent.python_binary", executable_name)
|
||||
|
||||
@@ -1799,6 +1868,10 @@ class Worker(ServiceCommandSection):
|
||||
cmds = [cmds]
|
||||
extra_shell_script_str = " ; ".join(map(str, cmds)) + " ; "
|
||||
|
||||
self.temp_config_path = self.temp_config_path or safe_mkstemp(
|
||||
suffix=".cfg", prefix=".trains_agent.", text=True, name_only=True
|
||||
)
|
||||
|
||||
docker_cmd = dict(worker_id=self.worker_id,
|
||||
# docker_image=docker_image,
|
||||
# docker_arguments=docker_arguments,
|
||||
@@ -1811,7 +1884,7 @@ class Worker(ServiceCommandSection):
|
||||
host_cache=host_cache, mounted_cache=mounted_cache_dir,
|
||||
host_pip_dl=host_pip_dl, mounted_pip_dl=mounted_pip_dl_dir,
|
||||
host_vcs_cache=host_vcs_cache, mounted_vcs_cache=mounted_vcs_cache,
|
||||
standalone_mode=self._standalone_mode)
|
||||
standalone_mode=self._standalone_mode, force_current_version=self._force_current_version)
|
||||
return temp_config, partial(docker_cmd_functor, docker_cmd)
|
||||
|
||||
@staticmethod
|
||||
@@ -1823,15 +1896,25 @@ class Worker(ServiceCommandSection):
|
||||
host_cache, mounted_cache,
|
||||
host_pip_dl, mounted_pip_dl,
|
||||
host_vcs_cache, mounted_vcs_cache,
|
||||
standalone_mode=False, extra_docker_arguments=None, extra_shell_script=None):
|
||||
standalone_mode=False, extra_docker_arguments=None, extra_shell_script=None,
|
||||
force_current_version=None):
|
||||
docker = 'docker'
|
||||
|
||||
base_cmd = [docker, 'run', '-t']
|
||||
update_scheme = ""
|
||||
dockers_nvidia_visible_devices = 'all'
|
||||
gpu_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES', None)
|
||||
if gpu_devices is None or gpu_devices.lower().strip() == 'all':
|
||||
base_cmd += ['--gpus', 'all', ]
|
||||
if os.environ.get('TRAINS_DOCKER_SKIP_GPUS_FLAG', None):
|
||||
dockers_nvidia_visible_devices = os.environ.get('NVIDIA_VISIBLE_DEVICES') or \
|
||||
dockers_nvidia_visible_devices
|
||||
else:
|
||||
base_cmd += ['--gpus', 'all', ]
|
||||
elif gpu_devices.strip() and gpu_devices.strip() != 'none':
|
||||
base_cmd += ['--gpus', 'device='+gpu_devices, ]
|
||||
if os.environ.get('TRAINS_DOCKER_SKIP_GPUS_FLAG', None):
|
||||
dockers_nvidia_visible_devices = gpu_devices
|
||||
else:
|
||||
base_cmd += ['--gpus', 'device='+gpu_devices, ]
|
||||
# We are using --gpu, so we should not pass NVIDIA_VISIBLE_DEVICES, I think.
|
||||
# base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES=' + gpu_devices, ]
|
||||
|
||||
@@ -1845,10 +1928,51 @@ class Worker(ServiceCommandSection):
|
||||
if isinstance(extra_docker_arguments, six.string_types) else extra_docker_arguments
|
||||
base_cmd += [str(a) for a in extra_docker_arguments if a]
|
||||
|
||||
base_cmd += ['-e', 'TRAINS_WORKER_ID='+worker_id, ]
|
||||
# check if running inside a kubernetes
|
||||
if os.environ.get('KUBERNETES_SERVICE_HOST') and os.environ.get('KUBERNETES_PORT'):
|
||||
# map network to sibling docker, unless we have other network argument
|
||||
if not any(a.strip().startswith('--network') for a in base_cmd):
|
||||
try:
|
||||
network_mode = get_bash_output(
|
||||
'docker inspect --format=\'{{.HostConfig.NetworkMode}}\' $(basename $(cat /proc/1/cpuset))')
|
||||
base_cmd += ['--network', network_mode]
|
||||
except:
|
||||
pass
|
||||
base_cmd += ['-e', 'NVIDIA_VISIBLE_DEVICES={}'.format(dockers_nvidia_visible_devices)]
|
||||
|
||||
if host_ssh_cache:
|
||||
base_cmd += ['-v', host_ssh_cache+':/root/.ssh', ]
|
||||
# check if we need to map host folders
|
||||
if os.environ.get(ENV_K8S_HOST_MOUNT):
|
||||
# expect TRAINS_AGENT_K8S_HOST_MOUNT = '/mnt/host/data:/root/.trains'
|
||||
k8s_node_mnt, _, k8s_pod_mnt = os.environ.get(ENV_K8S_HOST_MOUNT).partition(':')
|
||||
# search and replace all the host folders with the k8s
|
||||
host_mounts = [host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache]
|
||||
for i, m in enumerate(host_mounts):
|
||||
if k8s_pod_mnt not in m:
|
||||
print('Warning: K8S mount missing, ignoring cached folder {}'.format(m))
|
||||
host_mounts[i] = None
|
||||
else:
|
||||
host_mounts[i] = m.replace(k8s_pod_mnt, k8s_node_mnt)
|
||||
host_apt_cache, host_pip_cache, host_pip_dl, host_cache, host_vcs_cache = host_mounts
|
||||
|
||||
# copy the configuration file into the mounted folder
|
||||
new_conf_file = os.path.join(k8s_pod_mnt, '.trains_agent.{}.cfg'.format(quote(worker_id, safe="")))
|
||||
try:
|
||||
rm_file(new_conf_file)
|
||||
shutil.copy(conf_file, new_conf_file)
|
||||
conf_file = new_conf_file.replace(k8s_pod_mnt, k8s_node_mnt)
|
||||
except Exception:
|
||||
raise ValueError('Error: could not copy configuration file into: {}'.format(new_conf_file))
|
||||
|
||||
if host_ssh_cache:
|
||||
new_ssh_cache = os.path.join(k8s_pod_mnt, '.trains_agent.{}.ssh'.format(quote(worker_id, safe="")))
|
||||
try:
|
||||
rm_tree(new_ssh_cache)
|
||||
shutil.copytree(host_ssh_cache, new_ssh_cache)
|
||||
host_ssh_cache = new_ssh_cache.replace(k8s_pod_mnt, k8s_node_mnt)
|
||||
except Exception:
|
||||
raise ValueError('Error: could not copy .ssh directory into: {}'.format(new_ssh_cache))
|
||||
|
||||
base_cmd += ['-e', 'TRAINS_WORKER_ID='+worker_id, ]
|
||||
|
||||
# if we are running a RC version, install the same version in the docker
|
||||
# because the default latest, will be a release version (not RC)
|
||||
@@ -1856,15 +1980,13 @@ class Worker(ServiceCommandSection):
|
||||
try:
|
||||
from trains_agent.version import __version__
|
||||
_version_parts = __version__.split('.')
|
||||
if 'rc' in _version_parts[-1].lower() or 'rc' in _version_parts[-2].lower():
|
||||
if force_current_version or 'rc' in _version_parts[-1].lower() or 'rc' in _version_parts[-2].lower():
|
||||
specify_version = '=={}'.format(__version__)
|
||||
except:
|
||||
pass
|
||||
|
||||
if standalone_mode:
|
||||
update_scheme = ""
|
||||
else:
|
||||
update_scheme = \
|
||||
if not standalone_mode:
|
||||
update_scheme += \
|
||||
"echo 'Binary::apt::APT::Keep-Downloaded-Packages \"true\";' > /etc/apt/apt.conf.d/docker-clean ; " \
|
||||
"chown -R root /root/.cache/pip ; " \
|
||||
"apt-get update ; " \
|
||||
@@ -1875,21 +1997,86 @@ class Worker(ServiceCommandSection):
|
||||
python=python_version, pip_version=PackageManager.get_pip_version(),
|
||||
specify_version=specify_version)
|
||||
|
||||
base_cmd += [
|
||||
'-v', conf_file+':/root/trains.conf',
|
||||
'-v', host_apt_cache+':/var/cache/apt/archives',
|
||||
'-v', host_pip_cache+':/root/.cache/pip',
|
||||
'-v', host_pip_dl+':'+mounted_pip_dl,
|
||||
'-v', host_cache+':'+mounted_cache,
|
||||
'-v', host_vcs_cache+':'+mounted_vcs_cache,
|
||||
'--rm', docker_image, 'bash', '-c',
|
||||
update_scheme +
|
||||
extra_shell_script +
|
||||
"NVIDIA_VISIBLE_DEVICES=all {python} -u -m trains_agent ".format(python=python_version)
|
||||
]
|
||||
base_cmd += (
|
||||
['-v', conf_file+':/root/trains.conf'] +
|
||||
(['-v', host_ssh_cache+':/root/.ssh'] if host_ssh_cache else []) +
|
||||
(['-v', host_apt_cache+':/var/cache/apt/archives'] if host_apt_cache else []) +
|
||||
(['-v', host_pip_cache+':/root/.cache/pip'] if host_pip_cache else []) +
|
||||
(['-v', host_pip_dl+':'+mounted_pip_dl] if host_pip_dl else []) +
|
||||
(['-v', host_cache+':'+mounted_cache] if host_cache else []) +
|
||||
(['-v', host_vcs_cache+':'+mounted_vcs_cache] if host_vcs_cache else []) +
|
||||
['--rm', docker_image, 'bash', '-c',
|
||||
update_scheme +
|
||||
extra_shell_script +
|
||||
"NVIDIA_VISIBLE_DEVICES={nv_visible} {python} -u -m trains_agent ".format(
|
||||
nv_visible=dockers_nvidia_visible_devices, python=python_version)
|
||||
])
|
||||
|
||||
return base_cmd
|
||||
|
||||
def _run_as_user_patch(self, command, script_dir, venv_folder, sdk_cache_folder, user_uid):
|
||||
class RunasArgv(Argv):
|
||||
def __init__(self, *args):
|
||||
super(RunasArgv, self).__init__(*args)
|
||||
self.uid = 0
|
||||
self.gid = 0
|
||||
|
||||
def call_subprocess(self, func, censor_password=False, *args, **kwargs):
|
||||
self._log.debug("running: %s: %s", func.__name__, list(self))
|
||||
with self.normalize_exception(censor_password):
|
||||
return func(list(self), *args, preexec_fn=self._change_uid, **kwargs)
|
||||
|
||||
def set_uid(self, user_uid, user_gid):
|
||||
from pwd import getpwnam
|
||||
self.uid = getpwnam(user_uid).pw_uid
|
||||
self.gid = getpwnam(user_gid).pw_gid
|
||||
|
||||
def _change_uid(self):
|
||||
os.setgid(self.gid)
|
||||
os.setuid(self.uid)
|
||||
|
||||
# create a home folder for our user
|
||||
try:
|
||||
home_folder = '/trains_agent_home'
|
||||
rm_tree(home_folder)
|
||||
Path(home_folder).mkdir(parents=True, exist_ok=True)
|
||||
except:
|
||||
home_folder = '/home/trains_agent_home'
|
||||
rm_tree(home_folder)
|
||||
Path(home_folder).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# move our entire venv into the new home
|
||||
venv_folder = venv_folder.as_posix()
|
||||
if not venv_folder.endswith(os.path.sep):
|
||||
venv_folder += os.path.sep
|
||||
new_venv_folder = os.path.join(home_folder, 'venv/')
|
||||
shutil.move(venv_folder, new_venv_folder)
|
||||
# allow everyone to access it
|
||||
for f in Path(new_venv_folder).rglob('*'):
|
||||
try:
|
||||
f.chmod(0o0777)
|
||||
except:
|
||||
pass
|
||||
# make sure we will be able to access the cache folder (we assume we have the ability change mod)
|
||||
if sdk_cache_folder:
|
||||
sdk_cache_folder = Path(os.path.expandvars(sdk_cache_folder)).expanduser().absolute()
|
||||
for f in sdk_cache_folder.rglob('*'):
|
||||
try:
|
||||
f.chmod(0o0777)
|
||||
except:
|
||||
pass
|
||||
|
||||
# patch venv folder to new location
|
||||
script_dir = script_dir.replace(venv_folder, new_venv_folder)
|
||||
# New command line execution
|
||||
command = RunasArgv('bash', '-c', 'HOME=\"{}\" PATH=\"{}\" {}'.format(
|
||||
home_folder,
|
||||
os.environ.get('PATH', '').replace(venv_folder, new_venv_folder),
|
||||
command.serialize().replace(venv_folder, new_venv_folder)))
|
||||
command.set_uid(user_uid=user_uid, user_gid=user_uid)
|
||||
|
||||
return command, script_dir
|
||||
|
||||
def _singleton(self):
|
||||
# ensure singleton
|
||||
worker_id = self._session.config["agent.worker_id"]
|
||||
@@ -1903,7 +2090,8 @@ class Worker(ServiceCommandSection):
|
||||
else:
|
||||
worker_name = '{}:cpu'.format(worker_name)
|
||||
|
||||
self.worker_id, worker_slot = Singleton.register_instance(unique_worker_id=worker_id, worker_name=worker_name)
|
||||
self.worker_id, worker_slot = Singleton.register_instance(unique_worker_id=worker_id, worker_name=worker_name,
|
||||
api_client=self._session.api_client)
|
||||
if self.worker_id is None:
|
||||
error('Instance with the same WORKER_ID [{}] is already running'.format(worker_id))
|
||||
exit(1)
|
||||
|
||||
@@ -73,6 +73,12 @@ ENVIRONMENT_CONFIG = {
|
||||
"agent.cpu_only": EnvironmentConfig(
|
||||
"TRAINS_CPU_ONLY", "ALG_CPU_ONLY", "CPU_ONLY", type=bool
|
||||
),
|
||||
"sdk.aws.s3.key": EnvironmentConfig("AWS_ACCESS_KEY_ID"),
|
||||
"sdk.aws.s3.secret": EnvironmentConfig("AWS_SECRET_ACCESS_KEY"),
|
||||
"sdk.aws.s3.region": EnvironmentConfig("AWS_DEFAULT_REGION"),
|
||||
"sdk.azure.storage.containers.0": {'account_name': EnvironmentConfig("AZURE_STORAGE_ACCOUNT"),
|
||||
'account_key': EnvironmentConfig("AZURE_STORAGE_KEY")},
|
||||
"sdk.google.storage.credentials_json": EnvironmentConfig("GOOGLE_APPLICATION_CREDENTIALS"),
|
||||
}
|
||||
|
||||
CONFIG_FILE_ENV = EnvironmentConfig("ALG_CONFIG_FILE")
|
||||
@@ -114,6 +120,8 @@ DEFAULT_VCS_CACHE = normalize_path(CONFIG_DIR, "vcs-cache")
|
||||
PIP_EXTRA_INDICES = [
|
||||
]
|
||||
DEFAULT_PIP_DOWNLOAD_CACHE = normalize_path(CONFIG_DIR, "pip-download-cache")
|
||||
ENV_TASK_EXECUTE_AS_USER = 'TRAINS_AGENT_EXEC_USER'
|
||||
ENV_K8S_HOST_MOUNT = 'TRAINS_AGENT_K8S_HOST_MOUNT'
|
||||
|
||||
|
||||
class FileBuffering(IntEnum):
|
||||
|
||||
169
trains_agent/glue/k8s.py
Normal file
169
trains_agent/glue/k8s.py
Normal file
@@ -0,0 +1,169 @@
|
||||
from __future__ import print_function, division, unicode_literals
|
||||
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
from time import sleep
|
||||
from typing import Text, List
|
||||
|
||||
from pyhocon import HOCONConverter
|
||||
|
||||
from trains_agent.commands.events import Events
|
||||
from trains_agent.commands.worker import Worker
|
||||
from trains_agent.helper.process import get_bash_output
|
||||
from trains_agent.helper.resource_monitor import ResourceMonitor
|
||||
|
||||
|
||||
class K8sIntegration(Worker):
|
||||
K8S_PENDING_QUEUE = "k8s_scheduler"
|
||||
|
||||
KUBECTL_RUN_CMD = "kubectl run trains_id_{task_id} " \
|
||||
"--image {docker_image} " \
|
||||
"--restart=Never --replicas=1 " \
|
||||
"--generator=run-pod/v1"
|
||||
|
||||
KUBECTL_DELETE_CMD = "kubectl delete pods " \
|
||||
"--selector=TRAINS=agent " \
|
||||
"--field-selector=status.phase!=Pending,status.phase!=Running"
|
||||
|
||||
CONTAINER_BASH_SCRIPT = "apt-get install -y git python-pip && " \
|
||||
"pip install trains-agent && " \
|
||||
"python -u -m trains_agent execute --full-monitoring --require-queue --id {}"
|
||||
|
||||
def __init__(self, k8s_pending_queue_name=None, kubectl_cmd=None, container_bash_script=None, debug=False):
|
||||
"""
|
||||
Initialize the k8s integration glue layer daemon
|
||||
|
||||
:param str k8s_pending_queue_name: queue name to use when task is pending in the k8s scheduler
|
||||
:param str|callable kubectl_cmd: kubectl command line str, supports formating (default: KUBECTL_RUN_CMD)
|
||||
example: "task={task_id} image={docker_image} queue_id={queue_id}"
|
||||
or a callable function: kubectl_cmd(task_id, docker_image, queue_id, task_data)
|
||||
:param str container_bash_script: container bash script to be executed in k8s (default: CONTAINER_BASH_SCRIPT)
|
||||
:param bool debug: Switch logging on
|
||||
"""
|
||||
super(K8sIntegration, self).__init__()
|
||||
self.k8s_pending_queue_name = k8s_pending_queue_name or self.K8S_PENDING_QUEUE
|
||||
self.kubectl_cmd = kubectl_cmd or self.KUBECTL_RUN_CMD
|
||||
self.container_bash_script = container_bash_script or self.CONTAINER_BASH_SCRIPT
|
||||
# Always do system packages, because by we will be running inside a docker
|
||||
self._session.config.put("agent.package_manager.system_site_packages", True)
|
||||
# Add debug logging
|
||||
if debug:
|
||||
self.log.logger.disabled = False
|
||||
self.log.logger.setLevel(logging.INFO)
|
||||
|
||||
def run_one_task(self, queue: Text, task_id: Text, worker_args=None):
|
||||
task_data = self._session.api_client.tasks.get_all(id=[task_id])[0]
|
||||
|
||||
# push task into the k8s queue, so we have visibility on pending tasks in the k8s scheduler
|
||||
try:
|
||||
self._session.api_client.tasks.enqueue(task_id, queue=self.k8s_pending_queue_name,
|
||||
status_reason='k8s pending scheduler')
|
||||
except Exception as e:
|
||||
self.log.error("ERROR: Could not push back task [{}] to k8s pending queue [{}], error: {}".format(
|
||||
task_id, self.k8s_pending_queue_name, e))
|
||||
return
|
||||
|
||||
if task_data.execution.docker_cmd:
|
||||
docker_image = task_data.execution.docker_cmd
|
||||
else:
|
||||
docker_image = str(os.environ.get("TRAINS_DOCKER_IMAGE") or
|
||||
self._session.config.get("agent.default_docker.image", "nvidia/cuda"))
|
||||
|
||||
# take the first part, this is the docker image name (not arguments)
|
||||
docker_image = docker_image.split()[0]
|
||||
|
||||
create_trains_conf = "echo '{}' >> ~/trains.conf && ".format(
|
||||
HOCONConverter.to_hocon(self._session.config._config))
|
||||
|
||||
if callable(self.kubectl_cmd):
|
||||
kubectl_cmd = self.kubectl_cmd(task_id, docker_image, queue, task_data)
|
||||
else:
|
||||
kubectl_cmd = self.kubectl_cmd.format(task_id=task_id, docker_image=docker_image, queue_id=queue)
|
||||
|
||||
# make sure we gave a list
|
||||
if isinstance(kubectl_cmd, str):
|
||||
kubectl_cmd = kubectl_cmd.split()
|
||||
|
||||
kubectl_cmd += ["--labels=TRAINS=agent", "--command", "--", "/bin/sh", "-c",
|
||||
create_trains_conf + self.container_bash_script.format(task_id)]
|
||||
process = subprocess.Popen(kubectl_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
output, error = process.communicate()
|
||||
self.log.info("K8s scheduling experiment task id={}".format(task_id))
|
||||
if error:
|
||||
self.log.error("Running kubectl encountered an error: {}".format(
|
||||
error if isinstance(error, str) else error.decode()))
|
||||
|
||||
def run_tasks_loop(self, queues: List[Text], worker_params):
|
||||
"""
|
||||
:summary: Pull and run tasks from queues.
|
||||
:description: 1. Go through ``queues`` by order.
|
||||
2. Try getting the next task for each and run the first one that returns.
|
||||
3. Go to step 1
|
||||
:param queues: IDs of queues to pull tasks from
|
||||
:type queues: list of ``Text``
|
||||
:param worker_params: Worker command line arguments
|
||||
:type worker_params: ``trains_agent.helper.process.WorkerParams``
|
||||
"""
|
||||
events_service = self.get_service(Events)
|
||||
|
||||
# make sure we have a k8s pending queue
|
||||
try:
|
||||
self._session.api_client.queues.create(self.k8s_pending_queue_name)
|
||||
except Exception:
|
||||
pass
|
||||
# get queue id
|
||||
self.k8s_pending_queue_name = self._resolve_name(self.k8s_pending_queue_name, "queues")
|
||||
|
||||
_last_machine_update_ts = 0
|
||||
while True:
|
||||
# iterate over queues (priority style, queues[0] is highest)
|
||||
for queue in queues:
|
||||
# delete old completed /failed pods
|
||||
get_bash_output(self.KUBECTL_DELETE_CMD)
|
||||
|
||||
# get next task in queue
|
||||
try:
|
||||
response = self._session.api_client.queues.get_next_task(queue=queue)
|
||||
except Exception as e:
|
||||
print("Warning: Could not access task queue [{}], error: {}".format(queue, e))
|
||||
continue
|
||||
else:
|
||||
try:
|
||||
task_id = response.entry.task
|
||||
except AttributeError:
|
||||
print("No tasks in queue {}".format(queue))
|
||||
continue
|
||||
events_service.send_log_events(
|
||||
self.worker_id,
|
||||
task_id=task_id,
|
||||
lines="task {} pulled from {} by worker {}".format(
|
||||
task_id, queue, self.worker_id
|
||||
),
|
||||
level="INFO",
|
||||
)
|
||||
|
||||
self.report_monitor(ResourceMonitor.StatusReport(queues=queues, queue=queue, task=task_id))
|
||||
self.run_one_task(queue, task_id, worker_params)
|
||||
self.report_monitor(ResourceMonitor.StatusReport(queues=self.queues))
|
||||
break
|
||||
else:
|
||||
# sleep and retry polling
|
||||
print("No tasks in Queues, sleeping for {:.1f} seconds".format(self._polling_interval))
|
||||
sleep(self._polling_interval)
|
||||
|
||||
if self._session.config["agent.reload_config"]:
|
||||
self.reload_config()
|
||||
|
||||
def k8s_daemon(self, queues):
|
||||
"""
|
||||
Start the k8s Glue service.
|
||||
This service will be pulling tasks from *queues* and scheduling them for execution using kubectl.
|
||||
Notice all scheduled tasks are pushed back into K8S_PENDING_QUEUE,
|
||||
and popped when execution actually starts. This creates full visibility into the k8s scheduler.
|
||||
Manually popping a task from the K8S_PENDING_QUEUE,
|
||||
will cause the k8s scheduler to skip the execution once the scheduled tasks needs to be executed
|
||||
|
||||
:param list(str) queues: List of queue names to pull from
|
||||
"""
|
||||
return self.daemon(queues=queues, log_level=logging.INFO, foreground=True, docker=False)
|
||||
@@ -463,6 +463,17 @@ def rm_tree(root): # type: (Union[Path, Text]) -> None
|
||||
return shutil.rmtree(os.path.expanduser(os.path.expandvars(Text(root))), onerror=on_error)
|
||||
|
||||
|
||||
def rm_file(filename): # type: (Union[Path, Text]) -> None
|
||||
"""
|
||||
A version of os.unlink that will not raise error
|
||||
"""
|
||||
try:
|
||||
os.unlink(os.path.expanduser(os.path.expandvars(Text(filename))))
|
||||
except:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def is_conda(config):
|
||||
return config['agent.package_manager.type'].lower() == 'conda'
|
||||
|
||||
|
||||
@@ -22,6 +22,18 @@ def print_text(text, newline=True):
|
||||
sys.stdout.write(data)
|
||||
|
||||
|
||||
def decode_binary_lines(binary_lines, encoding='utf-8'):
|
||||
# decode per line, if we failed decoding skip the line
|
||||
lines = []
|
||||
for b in binary_lines:
|
||||
try:
|
||||
l = b.decode(encoding=encoding, errors='replace').replace('\r', '\n')
|
||||
except:
|
||||
l = ''
|
||||
lines.append(l + '\n' if l and l[-1] != '\n' else l)
|
||||
return lines
|
||||
|
||||
|
||||
def ensure_text(s, encoding='utf-8', errors='strict'):
|
||||
"""Coerce *s* to six.text_type.
|
||||
For Python 2:
|
||||
|
||||
@@ -112,7 +112,7 @@ class CondaAPI(PackageManager):
|
||||
return self.pip.bin
|
||||
|
||||
def upgrade_pip(self):
|
||||
return self.pip.upgrade_pip()
|
||||
return self._install("pip" + self.pip.get_pip_version())
|
||||
|
||||
def create(self):
|
||||
"""
|
||||
|
||||
@@ -6,14 +6,14 @@ from .requirements import SimpleSubstitution
|
||||
|
||||
class CythonRequirement(SimpleSubstitution):
|
||||
|
||||
name = "cython"
|
||||
name = ("cython", "numpy", )
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(CythonRequirement, self).__init__(*args, **kwargs)
|
||||
|
||||
def match(self, req):
|
||||
# match both Cython & cython
|
||||
return req.name and self.name == req.name.lower()
|
||||
return req.name and req.name.lower() in self.name
|
||||
|
||||
def replace(self, req):
|
||||
"""
|
||||
|
||||
@@ -82,9 +82,13 @@ class PoetryConfig:
|
||||
def initialize(self, cwd=None):
|
||||
if not self._initialized:
|
||||
self._initialized = True
|
||||
self._config("--local", "virtualenvs.in-project", "true", cwd=cwd)
|
||||
# self._config("repositories.{}".format(self.REPO_NAME), PYTHON_INDEX)
|
||||
# self._config("http-basic.{}".format(self.REPO_NAME), *PYTHON_INDEX_CREDENTIALS)
|
||||
try:
|
||||
self._config("--local", "virtualenvs.in-project", "true", cwd=cwd)
|
||||
# self._config("repositories.{}".format(self.REPO_NAME), PYTHON_INDEX)
|
||||
# self._config("http-basic.{}".format(self.REPO_NAME), *PYTHON_INDEX_CREDENTIALS)
|
||||
except Exception as ex:
|
||||
print("Exception: {}\nError: Failed configuring Poetry virtualenvs.in-project".format(ex))
|
||||
raise
|
||||
|
||||
def get_api(self, path):
|
||||
# type: (Path) -> PoetryAPI
|
||||
|
||||
@@ -456,7 +456,17 @@ class Git(VCS):
|
||||
)
|
||||
|
||||
def pull(self):
|
||||
self.call("fetch", "--all", cwd=self.location)
|
||||
self.call("fetch", "--all", "--recurse-submodules", cwd=self.location)
|
||||
|
||||
def checkout(self): # type: () -> None
|
||||
"""
|
||||
Checkout repository at specified revision
|
||||
"""
|
||||
self.call("checkout", self.revision, *self.checkout_flags, cwd=self.location)
|
||||
try:
|
||||
self.call("submodule", "update", "--recursive", cwd=self.location)
|
||||
except:
|
||||
pass
|
||||
|
||||
info_commands = dict(
|
||||
url=Argv(executable_name, "ls-remote", "--get-url", "origin"),
|
||||
|
||||
@@ -4,11 +4,12 @@ from time import sleep
|
||||
from glob import glob
|
||||
from tempfile import gettempdir, NamedTemporaryFile
|
||||
|
||||
from trains_agent.definitions import ENV_K8S_HOST_MOUNT
|
||||
from trains_agent.helper.base import warning
|
||||
|
||||
|
||||
class Singleton(object):
|
||||
prefix = 'trainsagent'
|
||||
prefix = '.trainsagent'
|
||||
sep = '_'
|
||||
ext = '.tmp'
|
||||
worker_id = None
|
||||
@@ -19,7 +20,7 @@ class Singleton(object):
|
||||
_lock_timeout = 10
|
||||
|
||||
@classmethod
|
||||
def register_instance(cls, unique_worker_id=None, worker_name=None):
|
||||
def register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
|
||||
"""
|
||||
# Exit the process if another instance of us is using the same worker_id
|
||||
|
||||
@@ -28,7 +29,7 @@ class Singleton(object):
|
||||
:return: (str worker_id, int slot_number) Return None value on instance already running
|
||||
"""
|
||||
# try to lock file
|
||||
lock_file = os.path.join(gettempdir(), cls._lock_file_name)
|
||||
lock_file = os.path.join(cls._get_temp_folder(), cls._lock_file_name)
|
||||
timeout = 0
|
||||
while os.path.exists(lock_file):
|
||||
if timeout > cls._lock_timeout:
|
||||
@@ -46,7 +47,8 @@ class Singleton(object):
|
||||
f.write(bytes(os.getpid()))
|
||||
f.flush()
|
||||
try:
|
||||
ret = cls._register_instance(unique_worker_id=unique_worker_id, worker_name=worker_name)
|
||||
ret = cls._register_instance(unique_worker_id=unique_worker_id, worker_name=worker_name,
|
||||
api_client=api_client)
|
||||
except:
|
||||
ret = None, None
|
||||
|
||||
@@ -58,12 +60,12 @@ class Singleton(object):
|
||||
return ret
|
||||
|
||||
@classmethod
|
||||
def _register_instance(cls, unique_worker_id=None, worker_name=None):
|
||||
def _register_instance(cls, unique_worker_id=None, worker_name=None, api_client=None):
|
||||
if cls.worker_id:
|
||||
return cls.worker_id, cls.instance_slot
|
||||
# make sure we have a unique name
|
||||
instance_num = 0
|
||||
temp_folder = gettempdir()
|
||||
temp_folder = cls._get_temp_folder()
|
||||
files = glob(os.path.join(temp_folder, cls.prefix + cls.sep + '*' + cls.ext))
|
||||
slots = {}
|
||||
for file in files:
|
||||
@@ -73,8 +75,24 @@ class Singleton(object):
|
||||
except Exception:
|
||||
# something is wrong, use non existing pid and delete the file
|
||||
pid = -1
|
||||
|
||||
uid, slot = None, None
|
||||
try:
|
||||
with open(file, 'r') as f:
|
||||
uid, slot = str(f.read()).split('\n')
|
||||
slot = int(slot)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
worker = None
|
||||
if api_client and os.environ.get(ENV_K8S_HOST_MOUNT) and uid:
|
||||
try:
|
||||
worker = [w for w in api_client.workers.get_all() if w.id == uid]
|
||||
except Exception:
|
||||
worker = None
|
||||
|
||||
# count active instances and delete dead files
|
||||
if not psutil.pid_exists(pid):
|
||||
if not worker and not psutil.pid_exists(pid):
|
||||
# delete the file
|
||||
try:
|
||||
os.remove(os.path.join(file))
|
||||
@@ -83,11 +101,7 @@ class Singleton(object):
|
||||
continue
|
||||
|
||||
instance_num += 1
|
||||
try:
|
||||
with open(file, 'r') as f:
|
||||
uid, slot = str(f.read()).split('\n')
|
||||
slot = int(slot)
|
||||
except Exception:
|
||||
if slot is None:
|
||||
continue
|
||||
|
||||
if uid == unique_worker_id:
|
||||
@@ -110,10 +124,16 @@ class Singleton(object):
|
||||
unique_worker_id = worker_name + cls.worker_name_sep + str(cls.instance_slot)
|
||||
|
||||
# create lock
|
||||
cls._pid_file = NamedTemporaryFile(dir=gettempdir(), prefix=cls.prefix + cls.sep + str(os.getpid()) + cls.sep,
|
||||
suffix=cls.ext)
|
||||
cls._pid_file = NamedTemporaryFile(dir=cls._get_temp_folder(),
|
||||
prefix=cls.prefix + cls.sep + str(os.getpid()) + cls.sep, suffix=cls.ext)
|
||||
cls._pid_file.write(('{}\n{}'.format(unique_worker_id, cls.instance_slot)).encode())
|
||||
cls._pid_file.flush()
|
||||
cls.worker_id = unique_worker_id
|
||||
|
||||
return cls.worker_id, cls.instance_slot
|
||||
|
||||
@classmethod
|
||||
def _get_temp_folder(cls):
|
||||
if os.environ.get(ENV_K8S_HOST_MOUNT):
|
||||
return os.environ.get(ENV_K8S_HOST_MOUNT).split(':')[-1]
|
||||
return gettempdir()
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import itertools
|
||||
from functools import partial
|
||||
from importlib import import_module
|
||||
import argparse
|
||||
@@ -24,8 +25,16 @@ def get_parser():
|
||||
from .worker import COMMANDS
|
||||
subparsers = top_parser.add_subparsers(dest='command')
|
||||
for c in COMMANDS:
|
||||
parser = subparsers.add_parser(name=c, help=COMMANDS[c]['help'])
|
||||
for a in COMMANDS[c].get('args', {}).keys():
|
||||
parser.add_argument(a, **COMMANDS[c]['args'][a])
|
||||
parser = subparsers.add_parser(name=c, help=COMMANDS[c]["help"])
|
||||
groups = itertools.groupby(
|
||||
sorted(
|
||||
COMMANDS[c].get("args", {}).items(), key=lambda x: x[1].get("group", "")
|
||||
),
|
||||
key=lambda x: x[1].pop("group", ""),
|
||||
)
|
||||
for group_name, group in groups:
|
||||
p = parser if not group_name else parser.add_argument_group(group_name)
|
||||
for key, value in group:
|
||||
p.add_argument(key, **value)
|
||||
|
||||
return top_parser
|
||||
|
||||
@@ -37,21 +37,29 @@ DAEMON_ARGS = dict({
|
||||
'help': 'Pipe full log to stdout/stderr, should not be used if running in background',
|
||||
'action': 'store_true',
|
||||
},
|
||||
'--gpus': {
|
||||
'help': 'Specify active GPUs for the daemon to use (docker / virtual environment), '
|
||||
'Equivalent to setting NVIDIA_VISIBLE_DEVICES '
|
||||
'Examples: --gpus 0 or --gpu 0,1,2 or --gpus all',
|
||||
},
|
||||
'--cpu-only': {
|
||||
'help': 'Disable GPU access for the daemon, only use CPU in either docker or virtual environment',
|
||||
'action': 'store_true',
|
||||
},
|
||||
'--docker': {
|
||||
'help': 'Run execution task inside a docker (v19.03 and above). Optional args <image> <arguments> or '
|
||||
'specify default docker image in agent.default_docker.image / agent.default_docker.arguments'
|
||||
'use --gpus/--cpu-only (or set NVIDIA_VISIBLE_DEVICES) to limit gpu visibility for docker',
|
||||
'nargs': '*',
|
||||
'default': False,
|
||||
'group': 'Docker support',
|
||||
},
|
||||
'--gpus': {
|
||||
'help': 'Specify active GPUs for the daemon to use (docker / virtual environment), '
|
||||
'Equivalent to setting NVIDIA_VISIBLE_DEVICES '
|
||||
'Examples: --gpus 0 or --gpu 0,1,2 or --gpus all',
|
||||
'group': 'Docker support',
|
||||
},
|
||||
'--cpu-only': {
|
||||
'help': 'Disable GPU access for the daemon, only use CPU in either docker or virtual environment',
|
||||
'action': 'store_true',
|
||||
'group': 'Docker support',
|
||||
},
|
||||
'--force-current-version': {
|
||||
'help': 'Force trains-agent to use the current trains-agent version when running in the docker',
|
||||
'action': 'store_true',
|
||||
'group': 'Docker support',
|
||||
},
|
||||
'--queue': {
|
||||
'help': 'Queue ID(s)/Name(s) to pull tasks from (\'default\' queue)',
|
||||
@@ -97,6 +105,17 @@ COMMANDS = {
|
||||
'help': 'Do not use any network connects, assume everything is pre-installed',
|
||||
'action': 'store_true',
|
||||
},
|
||||
'--docker': {
|
||||
'help': 'Run execution task inside a docker (v19.03 and above). Optional args <image> <arguments> or '
|
||||
'specify default docker image in agent.default_docker.image / agent.default_docker.arguments'
|
||||
'use --gpus/--cpu-only (or set NVIDIA_VISIBLE_DEVICES) to limit gpu visibility for docker',
|
||||
'nargs': '*',
|
||||
'default': False,
|
||||
},
|
||||
'--clone': {
|
||||
'help': 'Clone the experiment before execution, and execute the cloned experiment',
|
||||
'action': 'store_true',
|
||||
},
|
||||
}, **WORKER_ARGS),
|
||||
},
|
||||
'build': {
|
||||
|
||||
@@ -15,7 +15,7 @@ from pyhocon import ConfigFactory, HOCONConverter, ConfigTree
|
||||
from trains_agent.backend_api.session import Session as _Session, Request
|
||||
from trains_agent.backend_api.session.client import APIClient
|
||||
from trains_agent.backend_config.defs import LOCAL_CONFIG_FILE_OVERRIDE_VAR, LOCAL_CONFIG_FILES
|
||||
from trains_agent.definitions import ENVIRONMENT_CONFIG
|
||||
from trains_agent.definitions import ENVIRONMENT_CONFIG, ENV_TASK_EXECUTE_AS_USER
|
||||
from trains_agent.errors import APIError
|
||||
from trains_agent.helper.base import HOCONEncoder
|
||||
from trains_agent.helper.process import Argv
|
||||
@@ -75,7 +75,8 @@ class Session(_Session):
|
||||
cpu_only = kwargs.get('cpu_only')
|
||||
if cpu_only:
|
||||
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = 'none'
|
||||
if kwargs.get('gpus'):
|
||||
if kwargs.get('gpus') and not os.environ.get('KUBERNETES_SERVICE_HOST') \
|
||||
and not os.environ.get('KUBERNETES_PORT'):
|
||||
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['NVIDIA_VISIBLE_DEVICES'] = kwargs.get('gpus')
|
||||
if kwargs.get('only_load_config'):
|
||||
from trains_agent.backend_api.config import load
|
||||
@@ -86,7 +87,7 @@ class Session(_Session):
|
||||
self.trace = kwargs.get('trace', False)
|
||||
self._config_file = kwargs.get('config_file') or \
|
||||
os.environ.get(LOCAL_CONFIG_FILE_OVERRIDE_VAR) or LOCAL_CONFIG_FILES[0]
|
||||
self.api_client = APIClient(session=self, api_version="2.4")
|
||||
self.api_client = APIClient(session=self, api_version="2.5")
|
||||
# HACK make sure we have python version to execute,
|
||||
# if nothing was specific, use the one that runs us
|
||||
def_python = ConfigValue(self.config, "agent.default_python")
|
||||
@@ -111,6 +112,17 @@ class Session(_Session):
|
||||
# override with environment variables
|
||||
# cuda_version & cudnn_version are overridden with os.environ here, and normalized in the next section
|
||||
for config_key, env_config in ENVIRONMENT_CONFIG.items():
|
||||
# check if the propery is of a list:
|
||||
if config_key.endswith('.0'):
|
||||
if all(not i.get() for i in env_config.values()):
|
||||
continue
|
||||
parent = config_key.partition('.0')[0]
|
||||
if not self.config[parent]:
|
||||
self.config.put(parent, [])
|
||||
|
||||
self.config.put(parent, self.config[parent] + [ConfigTree((k, v.get()) for k, v in env_config.items())])
|
||||
continue
|
||||
|
||||
value = env_config.get()
|
||||
if not value:
|
||||
continue
|
||||
@@ -165,7 +177,11 @@ class Session(_Session):
|
||||
folder_keys = ('agent.venvs_dir', 'agent.vcs_cache.path',
|
||||
'agent.pip_download_cache.path',
|
||||
'agent.docker_pip_cache', 'agent.docker_apt_cache')
|
||||
singleton_folders = ('agent.venvs_dir', 'agent.vcs_cache.path',)
|
||||
singleton_folders = ('agent.venvs_dir', 'agent.vcs_cache.path', 'agent.docker_apt_cache')
|
||||
|
||||
if os.environ.get(ENV_TASK_EXECUTE_AS_USER):
|
||||
folder_keys = tuple(list(folder_keys) + ['sdk.storage.cache.default_base_dir'])
|
||||
singleton_folders = tuple(list(singleton_folders) + ['sdk.storage.cache.default_base_dir'])
|
||||
|
||||
for key in folder_keys:
|
||||
folder_key = ConfigValue(self.config, key)
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = '0.13.1'
|
||||
__version__ = '0.14.0'
|
||||
|
||||
Reference in New Issue
Block a user