Add support for CLEARML_NO_DEFAULT_SERVER env var to prevent agent from using the demo server

Add support for FORCE_CLEARML_AGENT_REPO env var to allow installing agent from a repo url when executing a task
Implement skip venv installation on execute and allow custom binary
Fix services mode limit implementation in docker mode
This commit is contained in:
allegroai 2021-08-02 22:51:26 +03:00
parent fd068c0933
commit 5ed47d2d2c
9 changed files with 548 additions and 215 deletions

View File

@ -6,6 +6,8 @@ ENV_WEB_HOST = EnvEntry("CLEARML_WEB_HOST", "TRAINS_WEB_HOST")
ENV_FILES_HOST = EnvEntry("CLEARML_FILES_HOST", "TRAINS_FILES_HOST")
ENV_ACCESS_KEY = EnvEntry("CLEARML_API_ACCESS_KEY", "TRAINS_API_ACCESS_KEY")
ENV_SECRET_KEY = EnvEntry("CLEARML_API_SECRET_KEY", "TRAINS_API_SECRET_KEY")
ENV_AUTH_TOKEN = EnvEntry("CLEARML_AUTH_TOKEN")
ENV_VERBOSE = EnvEntry("CLEARML_API_VERBOSE", "TRAINS_API_VERBOSE", type=bool, default=False)
ENV_HOST_VERIFY_CERT = EnvEntry("CLEARML_API_HOST_VERIFY_CERT", "TRAINS_API_HOST_VERIFY_CERT", type=bool, default=True)
ENV_CONDA_ENV_PACKAGE = EnvEntry("CLEARML_CONDA_ENV_PACKAGE", "TRAINS_CONDA_ENV_PACKAGE")
ENV_NO_DEFAULT_SERVER = EnvEntry("CLEARML_NO_DEFAULT_SERVER", "TRAINS_NO_DEFAULT_SERVER", type=bool, default=False)

View File

@ -11,7 +11,8 @@ from pyhocon import ConfigTree
from requests.auth import HTTPBasicAuth
from .callresult import CallResult
from .defs import ENV_VERBOSE, ENV_HOST, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_WEB_HOST, ENV_FILES_HOST
from .defs import ENV_VERBOSE, ENV_HOST, ENV_ACCESS_KEY, ENV_SECRET_KEY, ENV_WEB_HOST, ENV_FILES_HOST, ENV_AUTH_TOKEN, \
ENV_NO_DEFAULT_SERVER
from .request import Request, BatchRequest
from .token_manager import TokenManager
from ..config import load
@ -45,6 +46,7 @@ class Session(TokenManager):
_write_session_timeout = (30.0, 30.)
api_version = '2.1'
feature_set = 'basic'
default_host = "https://demoapi.demo.clear.ml"
default_web = "https://demoapp.demo.clear.ml"
default_files = "https://demofiles.demo.clear.ml"
@ -103,13 +105,18 @@ class Session(TokenManager):
"auth.token_expiration_threshold_sec", 60
)
super(Session, self).__init__(
token_expiration_threshold_sec=token_expiration_threshold_sec, **kwargs
)
self._verbose = verbose if verbose is not None else ENV_VERBOSE.get()
self._logger = logger
self.__auth_token = None
if ENV_AUTH_TOKEN.get(
value_cb=lambda key, value: print("Using environment access token {}=********".format(key))
):
self._set_auth_token(ENV_AUTH_TOKEN.get())
# if we use a token we override make sure we are at least 3600 seconds (1 hour)
# away from the token expiration date, ask for a new one.
token_expiration_threshold_sec = max(token_expiration_threshold_sec, 3600)
else:
self.__access_key = api_key or ENV_ACCESS_KEY.get(
default=(self.config.get("api.credentials.access_key", None) or self.default_key),
value_cb=lambda key, value: print("Using environment access key {}={}".format(key, value))
@ -128,6 +135,13 @@ class Session(TokenManager):
"Missing secret_key. Please set in configuration file or pass in session init."
)
super(Session, self).__init__(
token_expiration_threshold_sec=token_expiration_threshold_sec, **kwargs
)
if self.access_key == self.default_key and self.secret_key == self.default_secret:
print("Using built-in ClearML default key/secret")
host = host or self.get_api_server_host(config=self.config)
if not host:
raise ValueError("host is required in init or config")
@ -163,6 +177,7 @@ class Session(TokenManager):
api_version = '2.2' if token_dict.get('env', '') == 'prod' else Session.api_version
Session.api_version = str(api_version)
Session.feature_set = str(token_dict.get('feature_set', self.feature_set) or "basic")
except (jwt.DecodeError, ValueError):
pass
@ -244,6 +259,14 @@ class Session(TokenManager):
headers[self._AUTHORIZATION_HEADER] = "Bearer {}".format(self.token)
return headers
def _set_auth_token(self, auth_token):
self.__access_key = self.__secret_key = None
self.__auth_token = auth_token
def set_auth_token(self, auth_token):
self._set_auth_token(auth_token)
self.refresh_token()
def send_request(
self,
service,
@ -441,8 +464,11 @@ class Session(TokenManager):
if not config:
return None
return ENV_HOST.get(default=(config.get("api.api_server", None) or
config.get("api.host", None) or cls.default_host))
default = config.get("api.api_server", None) or config.get("api.host", None)
if not ENV_NO_DEFAULT_SERVER.get():
default = default or cls.default_host
return ENV_HOST.get(default=default)
@classmethod
def get_app_server_host(cls, config=None):
@ -522,7 +548,13 @@ class Session(TokenManager):
)
)
auth = HTTPBasicAuth(self.access_key, self.secret_key)
headers = None
# use token only once (the second time the token is already built into the http session)
if self.__auth_token:
headers = dict(Authorization="Bearer {}".format(self.__auth_token))
self.__auth_token = None
auth = HTTPBasicAuth(self.access_key, self.secret_key) if self.access_key and self.secret_key else None
res = None
try:
data = {"expiration_sec": exp} if exp else {}
@ -531,6 +563,7 @@ class Session(TokenManager):
action="login",
auth=auth,
json=data,
headers=headers,
refresh_token_if_unauthorized=False,
)
try:

View File

@ -118,11 +118,13 @@ class ServiceCommandSection(BaseCommandSection):
""" The name of the REST service used by this command """
pass
def get(self, endpoint, *args, **kwargs):
return self._session.get(service=self.service, action=endpoint, *args, **kwargs)
def get(self, endpoint, *args, session=None, **kwargs):
session = session or self._session
return session.get(service=self.service, action=endpoint, *args, **kwargs)
def post(self, endpoint, *args, **kwargs):
return self._session.post(service=self.service, action=endpoint, *args, **kwargs)
def post(self, endpoint, *args, session=None, **kwargs):
session = session or self._session
return session.post(service=self.service, action=endpoint, *args, **kwargs)
def get_with_act_as(self, endpoint, *args, **kwargs):
return self._session.get_with_act_as(service=self.service, action=endpoint, *args, **kwargs)

View File

@ -21,14 +21,16 @@ class Events(ServiceCommandSection):
""" Events command service endpoint """
return 'events'
def send_events(self, list_events):
def send_events(self, list_events, session=None):
def send_packet(jsonlines):
if not jsonlines:
return 0
num_lines = len(jsonlines)
jsonlines = '\n'.join(jsonlines)
new_events = self.post('add_batch', data=jsonlines, headers={'Content-type': 'application/json-lines'})
new_events = self.post(
'add_batch', data=jsonlines, headers={'Content-type': 'application/json-lines'}, session=session
)
if new_events['added'] != num_lines:
print('Error (%s) sending events only %d of %d registered' %
(new_events['errors'], new_events['added'], num_lines))
@ -57,7 +59,7 @@ class Events(ServiceCommandSection):
# print('Sending events done: %d / %d events sent' % (sent_events, len(list_events)))
return sent_events
def send_log_events(self, worker_id, task_id, lines, level='DEBUG'):
def send_log_events(self, worker_id, task_id, lines, level='DEBUG', session=None):
log_events = []
base_timestamp = int(time.time() * 1000)
base_log_items = {
@ -94,4 +96,4 @@ class Events(ServiceCommandSection):
log_events.append(get_event(count))
# now send the events
return self.send_events(list_events=log_events)
return self.send_events(list_events=log_events, session=session)

File diff suppressed because it is too large Load Diff

View File

@ -3,11 +3,13 @@ from __future__ import unicode_literals
import abc
from collections import OrderedDict
from contextlib import contextmanager
from typing import Text, Iterable, Union, Optional, Dict, List
from pathlib2 import Path
from hashlib import md5
from typing import Text, Iterable, Union, Optional, Dict, List
import six
from pathlib2 import Path
from clearml_agent.definitions import ENV_VENV_CACHE_PATH
from clearml_agent.helper.base import mkstemp, safe_remove_file, join_lines, select_for_platform
from clearml_agent.helper.console import ensure_binary
from clearml_agent.helper.os.folder_cache import FolderCache
@ -252,7 +254,7 @@ class PackageManager(object):
def _get_cache_manager(self):
if not self._cache_manager:
cache_folder = self.session.config.get(self._config_cache_folder, None)
cache_folder = ENV_VENV_CACHE_PATH.get() or self.session.config.get(self._config_cache_folder, None)
if not cache_folder:
return None

View File

@ -1,6 +1,7 @@
import os
import sys
from itertools import chain
from pathlib import Path
from typing import Text, Optional
from clearml_agent.definitions import PIP_EXTRA_INDICES, PROGRAM_NAME
@ -19,7 +20,7 @@ class SystemPip(PackageManager):
Program interface to the system pip.
"""
super(SystemPip, self).__init__()
self._bin = interpreter or sys.executable
self._bin = Path(interpreter or sys.executable)
self.session = session
@property

View File

@ -2,6 +2,7 @@ from __future__ import unicode_literals, division
import logging
import os
import shlex
from collections import deque
from itertools import starmap
from threading import Thread, Event
@ -12,6 +13,7 @@ import attr
import psutil
from pathlib2 import Path
from clearml_agent.session import Session
from clearml_agent.definitions import ENV_WORKER_TAGS
try:
from .gpu import gpustat
@ -59,6 +61,7 @@ class ResourceMonitor(object):
sample_frequency_per_sec=2.0,
report_frequency_sec=30.0,
first_report_sec=None,
worker_tags=None,
):
self.session = session
self.queue = deque(maxlen=1)
@ -76,6 +79,9 @@ class ResourceMonitor(object):
self._gpustat_fail = 0
self._gpustat = gpustat
self._active_gpus = None
if not worker_tags and ENV_WORKER_TAGS.get():
worker_tags = shlex.split(ENV_WORKER_TAGS.get())
self._worker_tags = worker_tags
if os.environ.get('NVIDIA_VISIBLE_DEVICES') == 'none':
# NVIDIA_VISIBLE_DEVICES set to none, marks cpu_only flag
# active_gpus == False means no GPU reporting
@ -118,6 +124,7 @@ class ResourceMonitor(object):
machine_stats=stats,
timestamp=(int(time()) * 1000),
worker=self._worker_id,
tags=self._worker_tags,
**self.get_report().to_dict()
)
log.debug("sending report: %s", report)

View File

@ -83,6 +83,12 @@ DAEMON_ARGS = dict({
'type': int,
'default': None,
},
'--child-report-tags': {
'help': 'List of tags to send with the status reports from the worker that runs a task',
'nargs': '+',
'type': str,
'default': None,
},
'--create-queue': {
'help': 'Create requested queue if it does not exist already.',
'action': 'store_true',
@ -121,6 +127,10 @@ DAEMON_ARGS = dict({
'help': 'Print the worker\'s schedule (uptime properties, server\'s runtime properties and listening queues)',
'action': 'store_true',
},
'--use-owner-token': {
'help': 'Generate and use task owner token for the execution of the task',
'action': 'store_true',
}
}, **WORKER_ARGS)
COMMANDS = {