diff --git a/clearml/automation/job.py b/clearml/automation/job.py
index 65803198..6d47e490 100644
--- a/clearml/automation/job.py
+++ b/clearml/automation/job.py
@@ -20,12 +20,12 @@
 from ..task import Task
 from ..backend_api.services import tasks as tasks_service
 
-logger = getLogger('clearml.automation.job')
+logger = getLogger("clearml.automation.job")
 
 
 class BaseJob(object):
-    _job_hash_description = 'job_hash={}'
-    _job_hash_property = 'pipeline_job_hash'
+    _job_hash_description = "job_hash={}"
+    _job_hash_property = "pipeline_job_hash"
     _hashing_callback = None
     _last_batch_status_update_ts = 0
@@ -58,19 +58,22 @@ class BaseJob(object):
                 id=[self.task.id],
                 page=0,
                 page_size=1,
-                only_fields=['id', ] + metrics
+                only_fields=[
+                    "id",
+                ]
+                + metrics,
             )
         )
         response = res.wait()
-        return tuple(response.response_data['tasks'][0]['last_metrics'][title][series][v] for v in values)
+        return tuple(response.response_data["tasks"][0]["last_metrics"][title][series][v] for v in values)
 
     @staticmethod
     def get_metric_req_params(title, series):
-        title = hashlib.md5(str(title).encode('utf-8')).hexdigest()
-        series = hashlib.md5(str(series).encode('utf-8')).hexdigest()
-        metric = 'last_metrics.{}.{}.'.format(title, series)
-        values = ['min_value', 'max_value', 'value']
+        title = hashlib.md5(str(title).encode("utf-8")).hexdigest()
+        series = hashlib.md5(str(series).encode("utf-8")).hexdigest()
+        metric = "last_metrics.{}.{}.".format(title, series)
+        values = ["min_value", "max_value", "value"]
         metrics = [metric + v for v in values]
         return metrics, title, series, values
@@ -89,7 +92,7 @@ class BaseJob(object):
             Task.enqueue(task=self.task, queue_name=queue_name)
             return True
         except Exception as ex:
-            logger.warning('Error enqueuing Task {} to {}: {}'.format(self.task, queue_name, ex))
+            logger.warning("Error enqueuing Task {} to {}: {}".format(self.task, queue_name, ex))
             return False
 
     def abort(self):
@@ -156,7 +159,7 @@ class BaseJob(object):
 
         :return: Task status Task.TaskStatusEnum in string.
         """
-        if self._last_status and not force and time() - self._last_status_ts < 1.:
+        if self._last_status and not force and time() - self._last_status_ts < 1.0:
             return self._last_status
 
         self._last_status = self.task.status
@@ -205,7 +208,7 @@ class BaseJob(object):
                 # noinspection PyProtectedMember
                 id_map[task_id]._last_status_ts = last_batch_update_ts
 
-    def wait(self, timeout=None, pool_period=30., aborted_nonresponsive_as_running=False):
+    def wait(self, timeout=None, pool_period=30.0, aborted_nonresponsive_as_running=False):
         # type: (Optional[float], float, bool) -> bool
         """
         Wait until the task is fully executed (i.e., aborted/completed/failed)
@@ -218,7 +221,7 @@ class BaseJob(object):
         :return: True, if Task finished.
""" tic = time() - while timeout is None or time() - tic < timeout * 60.: + while timeout is None or time() - tic < timeout * 60.0: if self.is_stopped(aborted_nonresponsive_as_running=aborted_nonresponsive_as_running): return True sleep(pool_period) @@ -274,16 +277,23 @@ class BaseJob(object): """ task_status = self.status() # check if we are Not in any of the non-running states - if task_status not in (Task.TaskStatusEnum.stopped, Task.TaskStatusEnum.completed, - Task.TaskStatusEnum.failed, Task.TaskStatusEnum.published): + if task_status not in ( + Task.TaskStatusEnum.stopped, + Task.TaskStatusEnum.completed, + Task.TaskStatusEnum.failed, + Task.TaskStatusEnum.published, + ): return False # notice the status update also refresh the "status_message" field on the Task # if we are stopped but the message says "non-responsive" it means for some reason the # Task's instance was killed, we should ignore it if requested because we assume someone will bring it back - if aborted_nonresponsive_as_running and task_status == Task.TaskStatusEnum.stopped and \ - str(self.task.data.status_message).lower() == "forced stop (non-responsive)": + if ( + aborted_nonresponsive_as_running + and task_status == Task.TaskStatusEnum.stopped + and str(self.task.data.status_message).lower() == "forced stop (non-responsive)" + ): # if we are here it means the state is "stopped" but we should ignore it # because the non-responsive watchdog set it. We assume someone (autoscaler) will relaunch it. return False @@ -298,7 +308,7 @@ class BaseJob(object): :return: True the task is currently in failed state """ - return self.status() in (Task.TaskStatusEnum.failed, ) + return self.status() in (Task.TaskStatusEnum.failed,) def is_completed(self): # type: () -> bool @@ -316,7 +326,7 @@ class BaseJob(object): :return: True the task is currently in aborted state """ - return self.status() in (Task.TaskStatusEnum.stopped, ) + return self.status() in (Task.TaskStatusEnum.stopped,) def is_pending(self): # type: () -> bool @@ -334,8 +344,7 @@ class BaseJob(object): :return: False, if the task is currently in draft mode or pending. 
""" - if not self.task_started and self.task.status in ( - Task.TaskStatusEnum.in_progress, Task.TaskStatusEnum.created): + if not self.task_started and self.task.status in (Task.TaskStatusEnum.in_progress, Task.TaskStatusEnum.created): return False self.task_started = True @@ -378,12 +387,12 @@ class BaseJob(object): @classmethod def _create_task_hash( - cls, - task, - section_overrides=None, - params_override=None, - configurations_override=None, - explicit_docker_image=None + cls, + task, + section_overrides=None, + params_override=None, + configurations_override=None, + explicit_docker_image=None, ): # type: (Task, Optional[dict], Optional[dict], Optional[dict], Optional[str]) -> Optional[str] """ @@ -466,22 +475,21 @@ class BaseJob(object): """ if not task_hash: return None - if Session.check_min_api_version('2.13'): + if Session.check_min_api_version("2.13"): # noinspection PyProtectedMember potential_tasks = Task._query_tasks( - status=['completed', 'published'], - system_tags=['-{}'.format(Task.archived_tag)], - _all_=dict(fields=['runtime.{}'.format(cls._job_hash_property)], - pattern=exact_match_regex(task_hash)), - only_fields=['id'], + status=["completed", "published"], + system_tags=["-{}".format(Task.archived_tag)], + _all_=dict(fields=["runtime.{}".format(cls._job_hash_property)], pattern=exact_match_regex(task_hash)), + only_fields=["id"], ) else: # noinspection PyProtectedMember potential_tasks = Task._query_tasks( - status=['completed', 'published'], - system_tags=['-{}'.format(Task.archived_tag)], - _all_=dict(fields=['comment'], pattern=cls._job_hash_description.format(task_hash)), - only_fields=['id'], + status=["completed", "published"], + system_tags=["-{}".format(Task.archived_tag)], + _all_=dict(fields=["comment"], pattern=cls._job_hash_description.format(task_hash)), + only_fields=["id"], ) for obj in potential_tasks: task = Task.get_task(task_id=obj.id) @@ -500,30 +508,29 @@ class BaseJob(object): return if not task_hash: task_hash = cls._create_task_hash(task=task) - if Session.check_min_api_version('2.13'): + if Session.check_min_api_version("2.13"): # noinspection PyProtectedMember task._set_runtime_properties(runtime_properties={cls._job_hash_property: str(task_hash)}) else: - hash_comment = cls._job_hash_description.format(task_hash) + '\n' - task.set_comment(task.comment + '\n' + hash_comment if task.comment else hash_comment) + hash_comment = cls._job_hash_description.format(task_hash) + "\n" + task.set_comment(task.comment + "\n" + hash_comment if task.comment else hash_comment) class ClearmlJob(BaseJob): - def __init__( - self, - base_task_id, # type: str - parameter_override=None, # type: Optional[Mapping[str, str]] - task_overrides=None, # type: Optional[Mapping[str, str]] - configuration_overrides=None, # type: Optional[Mapping[str, Union[str, Mapping]]] - tags=None, # type: Optional[Sequence[str]] - parent=None, # type: Optional[str] - disable_clone_task=False, # type: bool - allow_caching=False, # type: bool - target_project=None, # type: Optional[str] - output_uri=None, # type: Optional[Union[str, bool]] - enable_local_imports=True, # type: bool - **kwargs # type: Any + self, + base_task_id, # type: str + parameter_override=None, # type: Optional[Mapping[str, str]] + task_overrides=None, # type: Optional[Mapping[str, str]] + configuration_overrides=None, # type: Optional[Mapping[str, Union[str, Mapping]]] + tags=None, # type: Optional[Sequence[str]] + parent=None, # type: Optional[str] + disable_clone_task=False, # type: bool + allow_caching=False, 
-            target_project=None,  # type: Optional[str]
-            output_uri=None,  # type: Optional[Union[str, bool]]
-            enable_local_imports=True,  # type: bool
-            **kwargs  # type: Any
+        self,
+        base_task_id,  # type: str
+        parameter_override=None,  # type: Optional[Mapping[str, str]]
+        task_overrides=None,  # type: Optional[Mapping[str, str]]
+        configuration_overrides=None,  # type: Optional[Mapping[str, Union[str, Mapping]]]
+        tags=None,  # type: Optional[Sequence[str]]
+        parent=None,  # type: Optional[str]
+        disable_clone_task=False,  # type: bool
+        allow_caching=False,  # type: bool
+        target_project=None,  # type: Optional[str]
+        output_uri=None,  # type: Optional[Union[str, bool]]
+        enable_local_imports=True,  # type: bool
+        **kwargs  # type: Any
     ):
         # type: (...) -> ()
         """
@@ -561,8 +568,10 @@ class ClearmlJob(BaseJob):
             self.task = base_temp_task
             task_status = self.task.status
             if task_status != Task.TaskStatusEnum.created:
-                logger.warning('Task cloning disabled but requested Task [{}] status={}. '
-                               'Reverting to clone Task'.format(base_task_id, task_status))
+                logger.warning(
+                    "Task cloning disabled but requested Task [{}] status={}. "
+                    "Reverting to clone Task".format(base_task_id, task_status)
+                )
                 disable_clone_task = False
                 self.task = None
         elif parent:
@@ -582,14 +591,16 @@ class ClearmlJob(BaseJob):
             task_configurations = deepcopy(base_temp_task.data.configuration or {})
             for k, v in configuration_overrides.items():
                 if not isinstance(v, (str, dict)):
-                    raise ValueError('Configuration override dictionary value must be wither str or dict, '
-                                     'got {} instead'.format(type(v)))
+                    raise ValueError(
+                        "Configuration override dictionary value must be either str or dict, "
+                        "got {} instead".format(type(v))
+                    )
                 value = v if isinstance(v, str) else json.dumps(v)
                 if k in task_configurations:
                     task_configurations[k].value = value
                 else:
                     task_configurations[k] = tasks_service.ConfigurationItem(
-                        name=str(k), value=value, description=None, type='json' if isinstance(v, dict) else None
+                        name=str(k), value=value, description=None, type="json" if isinstance(v, dict) else None
                     )
             configuration_overrides = {k: v.value for k, v in task_configurations.items()}
@@ -600,7 +611,7 @@ class ClearmlJob(BaseJob):
                 # notice we can allow ourselves to change the base-task object as we will not use it any further
                 # noinspection PyProtectedMember
                 base_temp_task._set_task_property(k, v, raise_on_error=False, log_on_error=True)
-                section = k.split('.')[0]
+                section = k.split(".")[0]
                 sections[section] = getattr(base_temp_task.data, section, None)
 
         # check cached task
@@ -614,7 +625,7 @@ class ClearmlJob(BaseJob):
                 section_overrides=sections,
                 params_override=task_params,
                 configurations_override=configuration_overrides or None,
-                explicit_docker_image=kwargs.get("explicit_docker_image")
+                explicit_docker_image=kwargs.get("explicit_docker_image"),
             )
             task = self._get_cached_task(task_hash)
             # if we found a task, just use
@@ -631,20 +642,23 @@ class ClearmlJob(BaseJob):
                 return
 
         # if we have target_project, remove project from kwargs if we have it.
-        if target_project and 'project' in kwargs:
+        if target_project and "project" in kwargs:
             logger.info(
-                'target_project={} and project={} passed, using target_project.'.format(
-                    target_project, kwargs['project']))
-            kwargs.pop('project', None)
+                "target_project={} and project={} passed, using target_project.".format(
+                    target_project, kwargs["project"]
+                )
+            )
+            kwargs.pop("project", None)
 
         # check again if we need to clone the Task
         if not disable_clone_task:
             # noinspection PyProtectedMember
             self.task = Task.clone(
-                base_task_id, parent=parent or base_task_id,
-                project=get_or_create_project(
-                    session=Task._get_default_session(), project_name=target_project
-                ) if target_project else kwargs.pop('project', None),
+                base_task_id,
+                parent=parent or base_task_id,
+                project=get_or_create_project(session=Task._get_default_session(), project_name=target_project)
+                if target_project
+                else kwargs.pop("project", None),
                 **kwargs
             )
@@ -676,6 +690,7 @@ class LocalClearmlJob(ClearmlJob):
     Run jobs locally as a sub-process, use only when no agents are available (this will not use queues)
     or for debug purposes.
     """
+
     def __init__(self, *args, **kwargs):
         super(LocalClearmlJob, self).__init__(*args, **kwargs)
         self._job_process = None
@@ -695,34 +710,34 @@ class LocalClearmlJob(ClearmlJob):
         # check if standalone
         diff = self.task.data.script.diff
-        if diff and not diff.lstrip().startswith('diff '):
+        if diff and not diff.lstrip().startswith("diff "):
             # standalone, we need to create if
-            fd, local_filename = tempfile.mkstemp(suffix='.py')
+            fd, local_filename = tempfile.mkstemp(suffix=".py")
             os.close(fd)
-            with open(local_filename, 'wt') as f:
+            with open(local_filename, "wt") as f:
                 f.write(diff)
             self._local_temp_file = local_filename
         else:
             local_filename = self.task.data.script.entry_point
 
-        cwd = os.path.join(os.getcwd(), self.task.data.script.working_dir or '')
+        cwd = os.path.join(os.getcwd(), self.task.data.script.working_dir or "")
 
         # try to check based on current root repo + entrypoint
-        if Task.current_task() and not (Path(cwd)/local_filename).is_file():
-            working_dir = Task.current_task().data.script.working_dir or ''
-            working_dir = working_dir.strip('.')
+        if Task.current_task() and not (Path(cwd) / local_filename).is_file():
+            working_dir = Task.current_task().data.script.working_dir or ""
+            working_dir = working_dir.strip(".")
             levels = 0
             if working_dir:
-                levels = 1 + sum(1 for c in working_dir if c == '/')
-            cwd = os.path.abspath(os.path.join(os.getcwd(), os.sep.join(['..'] * levels))) if levels else os.getcwd()
+                levels = 1 + sum(1 for c in working_dir if c == "/")
+            cwd = os.path.abspath(os.path.join(os.getcwd(), os.sep.join([".."] * levels))) if levels else os.getcwd()
             cwd = os.path.join(cwd, working_dir)
 
         python = sys.executable
         env = dict(**os.environ)
-        env.pop('CLEARML_PROC_MASTER_ID', None)
-        env.pop('TRAINS_PROC_MASTER_ID', None)
-        env['CLEARML_TASK_ID'] = env['TRAINS_TASK_ID'] = str(self.task.id)
-        env['CLEARML_LOG_TASK_TO_BACKEND'] = '1'
-        env['CLEARML_SIMULATE_REMOTE_TASK'] = '1'
+        env.pop("CLEARML_PROC_MASTER_ID", None)
+        env.pop("TRAINS_PROC_MASTER_ID", None)
+        env["CLEARML_TASK_ID"] = env["TRAINS_TASK_ID"] = str(self.task.id)
+        env["CLEARML_LOG_TASK_TO_BACKEND"] = "1"
+        env["CLEARML_SIMULATE_REMOTE_TASK"] = "1"
         try:
             if self._enable_local_imports:
                 current_python_path = env.get("PYTHONPATH")
@@ -765,7 +780,7 @@ class LocalClearmlJob(ClearmlJob):
         user_aborted = False
         if self.task.status == Task.TaskStatusEnum.stopped:
             self.task.reload()
-            if str(self.task.data.status_reason).lower().startswith('user aborted'):
+            if str(self.task.data.status_reason).lower().startswith("user aborted"):
                 user_aborted = True
 
         if not user_aborted:
@@ -826,13 +841,13 @@ class _JobStub(object):
     """
 
     def __init__(
-            self,
-            base_task_id,  # type: str
-            parameter_override=None,  # type: Optional[Mapping[str, str]]
-            task_overrides=None,  # type: Optional[Mapping[str, str]]
-            tags=None,  # type: Optional[Sequence[str]]
-            **kwargs  # type: Any
-    ):
+        self,
+        base_task_id,  # type: str
+        parameter_override=None,  # type: Optional[Mapping[str, str]]
+        task_overrides=None,  # type: Optional[Mapping[str, str]]
+        tags=None,  # type: Optional[Sequence[str]]
+        **kwargs  # type: Any
+    ):
         # type: (...) -> ()
         self.task = None
         self.base_task_id = base_task_id
@@ -846,7 +861,7 @@ class _JobStub(object):
         # type: (str) -> ()
         self.iteration = 0
         self.task_started = time()
-        print('launching', self.parameter_override, 'in', queue_name)
+        print("launching", self.parameter_override, "in", queue_name)
 
     def abort(self):
         # type: () -> ()
@@ -886,13 +901,13 @@ class _JobStub(object):
 
     def task_id(self):
         # type: () -> str
-        return 'stub'
+        return "stub"
 
     def status(self):
         # type: () -> str
-        return 'in_progress'
+        return "in_progress"
 
-    def wait(self, timeout=None, pool_period=30.):
+    def wait(self, timeout=None, pool_period=30.0):
         # type: (Optional[float], float) -> bool
         """
         Wait for the task to be processed (i.e., aborted/completed/failed)