From 23fc9260f883d858fc6f610a5ed8e5cf01e79ae3 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Sun, 5 Sep 2021 00:30:44 +0300
Subject: [PATCH] Add PipelineDecorator.pipeline & PipelineDecorator.component
decorators for full custom pipeline logic
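
Example usage (an illustrative sketch; the step names, project and dataset URL below are
placeholders, not part of this patch):

    from clearml.automation.controller import PipelineDecorator

    @PipelineDecorator.component(return_values=['data'], cache=True)
    def step_one(dataset_url):
        # executed remotely as a standalone Task when the pipeline runs
        data = {'source': dataset_url}
        return data

    @PipelineDecorator.component(return_values=['accuracy'])
    def step_two(data):
        accuracy = 0.9  # placeholder computation
        return accuracy

    @PipelineDecorator.pipeline(name='custom pipeline logic', project='examples', version='0.1')
    def pipeline_logic(dataset_url):
        data = step_one(dataset_url)
        accuracy = step_two(data)
        print('pipeline completed, accuracy={}'.format(accuracy))

    if __name__ == '__main__':
        # PipelineDecorator.debug_pipeline()  # uncomment to execute all steps locally
        pipeline_logic(dataset_url='s3://bucket/dataset.csv')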
---
clearml/automation/controller.py | 650 +++++++++++++++++++++++++++---
clearml/automation/job.py | 43 +-
clearml/utilities/proxy_object.py | 78 ++++
3 files changed, 700 insertions(+), 71 deletions(-)
diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py
index ec89da71..9477fbb7 100644
--- a/clearml/automation/controller.py
+++ b/clearml/automation/controller.py
@@ -1,3 +1,5 @@
+import functools
+import inspect
import json
import re
from copy import copy
@@ -5,7 +7,7 @@ from datetime import datetime
from logging import getLogger
from threading import Thread, Event, RLock
from time import time
-from typing import Sequence, Optional, Mapping, Callable, Any, Union, List, Dict
+from typing import Sequence, Optional, Mapping, Callable, Any, List, Dict, Union
from attr import attrib, attrs
@@ -17,6 +19,7 @@ from ..debugging.log import LoggerRoot
from ..model import BaseModel
from ..task import Task
from ..utilities.process.mp import leave_process
+from ..utilities.proxy_object import LazyEvalWrapper
class PipelineController(object):
@@ -29,7 +32,7 @@ class PipelineController(object):
_tag = 'pipeline'
_step_pattern = r"\${[^}]*}"
_config_section = 'Pipeline'
- _args_section = 'PipelineArgs'
+ _args_section = 'Args'
_pipeline_step_ref = 'pipeline'
_reserved_pipeline_names = (_pipeline_step_ref, )
_task_project_lookup = {}
@@ -407,7 +410,7 @@ class PipelineController(object):
function_input_artifacts = {}
# go over function_kwargs, split it into string and input artifacts
for k, v in function_kwargs.items():
- if self._step_ref_pattern.match(v):
+ if v and self._step_ref_pattern.match(str(v)):
# check for step artifacts
step, _, artifact = v[2:-1].partition('.')
if step in self._nodes and artifact in self._nodes[step].return_artifacts:
@@ -420,28 +423,17 @@ class PipelineController(object):
parameters = {"{}/{}".format(CreateFromFunction.kwargs_section, k): v for k, v in function_kwargs.items()}
if function_input_artifacts:
parameters.update(
- {"{}/{}".format(CreateFromFunction.input_artifact_section, k): v
+ {"{}/{}".format(CreateFromFunction.input_artifact_section, k): str(v)
for k, v in function_input_artifacts.items()}
)
if self._task.running_locally():
project_name = project_name or self._target_project or self._task.get_project_name()
- task_definition = CreateFromFunction.create_task_from_function(
- a_function=function,
- function_kwargs=function_kwargs or None,
- function_input_artifacts=function_input_artifacts,
- function_return=function_return,
- project_name=project_name,
- task_name=task_name,
- task_type=task_type,
- packages=packages,
- docker=docker,
- docker_args=docker_args,
- docker_bash_setup_script=docker_bash_setup_script,
- output_uri=None,
- dry_run=True,
- )
+ task_definition = self._create_task_from_function(docker, docker_args, docker_bash_setup_script, function,
+ function_input_artifacts, function_kwargs,
+ function_return, packages, project_name, task_name,
+ task_type)
# noinspection PyProtectedMember
self._task._set_configuration(
name=name, config_type='json',
@@ -476,6 +468,28 @@ class PipelineController(object):
return True
+ def _create_task_from_function(
+ self, docker, docker_args, docker_bash_setup_script,
+ function, function_input_artifacts, function_kwargs, function_return,
+ packages, project_name, task_name, task_type
+ ):
+ task_definition = CreateFromFunction.create_task_from_function(
+ a_function=function,
+ function_kwargs=function_kwargs or None,
+ function_input_artifacts=function_input_artifacts,
+ function_return=function_return,
+ project_name=project_name,
+ task_name=task_name,
+ task_type=task_type,
+ packages=packages,
+ docker=docker,
+ docker_args=docker_args,
+ docker_bash_setup_script=docker_bash_setup_script,
+ output_uri=None,
+ dry_run=True,
+ )
+ return task_definition
+
def start(
self,
queue='services',
@@ -582,6 +596,8 @@ class PipelineController(object):
:param float timeout: Wait timeout for the optimization thread to exit (minutes).
The default is ``None``, indicating do not wait terminate immediately.
"""
+ self._stop_event.set()
+
self.wait(timeout=timeout)
if self._task and self._pipeline_task_status_failed:
print('Setting pipeline controller Task as failed (due to failed steps) !')
@@ -711,6 +727,14 @@ class PipelineController(object):
if description:
self._pipeline_args_desc[str(name)] = str(description)
+ def get_parameters(self):
+ # type: () -> dict
+ """
+ Return the pipeline parameters dictionary
+ :return: Dictionary str -> str
+ """
+ return self._pipeline_args
+
def _start(
self,
step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
@@ -761,34 +785,7 @@ class PipelineController(object):
if self._thread:
return True
- params, pipeline_dag = self._serialize_pipeline_task()
-
- # deserialize back pipeline state
- if not params['continue_pipeline']:
- for k in pipeline_dag:
- pipeline_dag[k]['executed'] = None
-
- self._default_execution_queue = params['default_queue']
- self._add_pipeline_tags = params['add_pipeline_tags']
- self._target_project = params['target_project'] or ''
- self._deserialize(pipeline_dag)
-
- # if we continue the pipeline, make sure that we re-execute failed tasks
- if params['continue_pipeline']:
- for node in self._nodes.values():
- if node.executed is False:
- node.executed = None
-
- if not self._verify():
- raise ValueError("Failed verifying pipeline execution graph, "
- "it has either inaccessible nodes, or contains cycles")
-
- self.update_execution_plot()
-
- self._start_time = time()
- self._stop_event = Event()
- self._experiment_created_cb = step_task_created_callback
- self._experiment_completed_cb = step_task_completed_callback
+ self._prepare_pipeline(step_task_completed_callback, step_task_created_callback)
self._thread = Thread(target=self._daemon)
self._thread.daemon = True
self._thread.start()
@@ -799,6 +796,36 @@ class PipelineController(object):
return True
+ def _prepare_pipeline(
+ self,
+ step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
+ step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
+ ):
+ # type: (...) -> None
+
+ params, pipeline_dag = self._serialize_pipeline_task()
+ # deserialize back pipeline state
+ if not params['continue_pipeline']:
+ for k in pipeline_dag:
+ pipeline_dag[k]['executed'] = None
+ self._default_execution_queue = params['default_queue']
+ self._add_pipeline_tags = params['add_pipeline_tags']
+ self._target_project = params['target_project'] or ''
+ self._deserialize(pipeline_dag)
+ # if we continue the pipeline, make sure that we re-execute failed tasks
+ if params['continue_pipeline']:
+ for node in self._nodes.values():
+ if node.executed is False:
+ node.executed = None
+ if not self._verify():
+ raise ValueError("Failed verifying pipeline execution graph, "
+ "it has either inaccessible nodes, or contains cycles")
+ self.update_execution_plot()
+ self._start_time = time()
+ self._stop_event = Event()
+ self._experiment_created_cb = step_task_created_callback
+ self._experiment_completed_cb = step_task_completed_callback
+
def _serialize_pipeline_task(self):
# type: () -> (dict, dict)
"""
@@ -820,7 +847,7 @@ class PipelineController(object):
if self._task.running_locally():
# noinspection PyProtectedMember
self._task._set_parameters(
- {'{}/{}'.format(self._args_section, k): v for k, v in self._pipeline_args.items()},
+ {'{}/{}'.format(self._args_section, k): str(v) for k, v in self._pipeline_args.items()},
__parameters_descriptions=self._pipeline_args_desc,
__update=True,
)
@@ -912,22 +939,20 @@ class PipelineController(object):
pattern = self._step_ref_pattern
parents = set()
- for v in node.parameters.values():
+ for k, v in node.parameters.items():
if isinstance(v, str):
- ref_matched = False
for g in pattern.findall(v):
- ref_matched = True
ref_step = self.__verify_step_reference(node, g)
if ref_step:
parents.add(ref_step)
- # verify we have a section name
- if not ref_matched and '/' not in v:
- raise ValueError(
- "Section name is missing in parameter \"{}\", "
- "parameters should be in the form of "
- "\"`section-name`/parameter\", example: \"Args/param\"".format(v))
+ # verify we have a section name
+ if '/' not in k:
+ raise ValueError(
+ "Section name is missing in parameter \"{}\", "
+ "parameters should be in the form of "
+ "\"`section-name`/parameter\", example: \"Args/param\"".format(k))
- if parents != set(node.parents or []):
+ if parents and parents != set(node.parents or []):
parents = parents - set(node.parents or [])
getLogger('clearml.automation.controller').info(
'Node "{}" missing parent reference, adding: {}'.format(node.name, parents))
@@ -947,6 +972,9 @@ class PipelineController(object):
for k, node in self._nodes.items():
if k in visited:
continue
+ if any(p == node.name for p in node.parents or []):
+ # node cannot have itself as parent
+ return False
if not all(p in visited for p in node.parents or []):
continue
visited.add(k)
@@ -1020,6 +1048,8 @@ class PipelineController(object):
else:
return node.job.launch(queue_name=node.queue or self._default_execution_queue)
+ self._running_nodes.append(node.name)
+
return True
def _update_execution_plot(self):
@@ -1285,7 +1315,6 @@ class PipelineController(object):
self._nodes[name].job.task_parameter_override if self._nodes[name].job
else self._nodes[name].parameters))
print('Overrides:\n{}'.format(self._nodes[name].task_overrides))
- self._running_nodes.append(name)
launched_nodes.add(name)
# check if node is cached do not wait for event but run the loop again
if self._nodes[name].executed:
@@ -1556,3 +1585,506 @@ class PipelineController(object):
return ''
return ' {} '.format(task_link_template.format(project=project_id, task=task_id), task_id)
+
+
+class PipelineDecorator(PipelineController):
+ _added_decorator = [] # type: List[dict]
+ _singleton = None # type: Optional[PipelineDecorator]
+ _debug_execute_step_process = False
+ _debug_execute_step_function = False
+ _default_execution_queue = None
+
+ def __init__(
+ self,
+ name, # type: str
+ project, # type: str
+ version, # type: str
+ pool_frequency=0.2, # type: float
+ add_pipeline_tags=False, # type: bool
+ target_project=None, # type: Optional[str]
+ ):
+ # type: (...) -> ()
+ """
+ Create a new pipeline controller. The newly created object will launch and monitor the new experiments.
+
+ :param name: Provide pipeline name (if main Task exists it overrides its name)
+ :param project: Provide project storing the pipeline (if main Task exists it overrides its project)
+ :param version: Must provide pipeline version. This version allows uniquely identifying the pipeline
+ template execution. Examples for semantic versions: version='1.0.1', version='23', version='1.2'
+ :param float pool_frequency: The polling frequency (in minutes) for monitoring experiments / states.
+ :param bool add_pipeline_tags: (default: False) if True, add `pipe: ` tag to all
+ steps (Tasks) created by this pipeline.
+ :param str target_project: If provided, all pipeline steps are cloned into the target project
+ """
+ super(PipelineDecorator, self).__init__(
+ name=name,
+ project=project,
+ version=version,
+ pool_frequency=pool_frequency,
+ add_pipeline_tags=add_pipeline_tags,
+ target_project=target_project,
+ )
+ if PipelineDecorator._default_execution_queue:
+ super(PipelineDecorator, self).set_default_execution_queue(
+ PipelineDecorator._default_execution_queue)
+
+ for n in self._added_decorator:
+ self.add_function_step(**n)
+ self._added_decorator.clear()
+ PipelineDecorator._singleton = self
+ self._reference_callback = []
+
+ def _daemon(self):
+ # type: () -> ()
+ """
+ The main pipeline execution loop. This loop is executed on its own dedicated thread.
+ Overrides the base-class daemon: here we only need to monitor the running steps and update their state.
+
+ :return:
+ """
+ pooling_counter = 0
+ launched_nodes = set()
+ last_plot_report = time()
+ while self._stop_event:
+ # stop request
+ if self._stop_event.wait(self._pool_frequency if pooling_counter else 0.01):
+ break
+
+ pooling_counter += 1
+
+ # check the pipeline time limit
+ if self._pipeline_time_limit and (time() - self._start_time) > self._pipeline_time_limit:
+ break
+
+ # check the state of all current jobs
+ # if no job ended, continue
+ completed_jobs = []
+ force_execution_plot_update = False
+ for j in self._running_nodes:
+ node = self._nodes[j]
+ if not node.job:
+ continue
+ if node.job.is_stopped():
+ completed_jobs.append(j)
+ node.executed = node.job.task_id() if not node.job.is_failed() else False
+ if j in launched_nodes:
+ launched_nodes.remove(j)
+ elif node.timeout:
+ started = node.job.task.data.started
+ if (datetime.now().astimezone(started.tzinfo) - started).total_seconds() > node.timeout:
+ node.job.abort()
+ completed_jobs.append(j)
+ node.executed = node.job.task_id()
+ elif j in launched_nodes and node.job.is_running():
+ # make sure to update the execution graph when the job starts running
+ # (otherwise it will still be marked queued)
+ launched_nodes.remove(j)
+ force_execution_plot_update = True
+
+ # update running jobs
+ self._running_nodes = [j for j in self._running_nodes if j not in completed_jobs]
+
+ # nothing changed, we can sleep
+ if not completed_jobs and self._running_nodes:
+ # force updating the pipeline state (plot) at least every 5 min.
+ if force_execution_plot_update or time()-last_plot_report > 5.*60:
+ last_plot_report = time()
+ self.update_execution_plot()
+ continue
+
+ # callback on completed jobs
+ if self._experiment_completed_cb or self._post_step_callbacks:
+ for job in completed_jobs:
+ job_node = self._nodes.get(job)
+ if not job_node:
+ continue
+ if self._experiment_completed_cb:
+ self._experiment_completed_cb(self, job_node)
+ if self._post_step_callbacks.get(job_node.name):
+ self._post_step_callbacks[job_node.name](self, job_node)
+
+ # update current state (in configuration, so that we could later continue an aborted pipeline)
+ self._force_task_configuration_update()
+
+ # visualize pipeline state (plot)
+ self.update_execution_plot()
+
+ # stop all currently running jobs:
+ for node in self._nodes.values():
+ if node.executed is False:
+ self._pipeline_task_status_failed = True
+ if node.job and node.executed and not node.job.is_stopped():
+ node.job.abort()
+ elif not node.job and not node.executed:
+ # mark Node as skipped if it has no Job object and it is not executed
+ node.skip_job = True
+
+ # visualize pipeline state (plot)
+ self.update_execution_plot()
+
+ if self._stop_event:
+ # noinspection PyBroadException
+ try:
+ self._stop_event.set()
+ except Exception:
+ pass
+
+ def _create_task_from_function(
+ self, docker, docker_args, docker_bash_setup_script,
+ function, function_input_artifacts, function_kwargs, function_return,
+ packages, project_name, task_name, task_type
+ ):
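+ # note: sanitize() below strips the leading "@PipelineDecorator.component(...)" call from the
+ # captured function source, so the generated standalone Task script contains only the plain
+ # function definition (the decorator is not re-applied when the step runs remotely)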
+ def sanitize(function_source):
+ matched = re.match(r"[\s]*@PipelineDecorator.component[\s\\]*\(", function_source)
+ if matched:
+ function_source = function_source[matched.span()[1]:]
+ # find the last ")"
+ open_parenthesis = 0
+ last_index = -1
+ for i, c in enumerate(function_source):
+ if not open_parenthesis and c == ')':
+ last_index = i
+ break
+ elif c == ')':
+ open_parenthesis -= 1
+ elif c == '(':
+ open_parenthesis += 1
+ if last_index >= 0:
+ function_source = function_source[last_index+1:].lstrip()
+ return function_source
+
+ task_definition = CreateFromFunction.create_task_from_function(
+ a_function=function,
+ function_kwargs=function_kwargs or None,
+ function_input_artifacts=function_input_artifacts,
+ function_return=function_return,
+ project_name=project_name,
+ task_name=task_name,
+ task_type=task_type,
+ packages=packages,
+ docker=docker,
+ docker_args=docker_args,
+ docker_bash_setup_script=docker_bash_setup_script,
+ output_uri=None,
+ dry_run=True,
+ _sanitize_function=sanitize,
+ )
+ return task_definition
+
+ def _find_executed_node_leaves(self):
+ # type: () -> List[str]
+ all_parents = set([p for n in self._nodes.values() if n.executed for p in n.parents])
+ executed_leaves = [name for name, n in self._nodes.items() if n.executed and name not in all_parents]
+ return executed_leaves
+
+ def _adjust_task_hashing(self, task_hash):
+ # type: (dict) -> dict
+ """
+ Fix the Task hashing so that parameters pointing to the current Task artifact are encoded using the
+ hash content of the artifact, instead of the Task.id
+ :param task_hash: Task representation dict
+ :return: Adjusted Task representation dict
+ """
+ if task_hash.get('hyper_params'):
+ updated_params = {}
+ for k, v in task_hash['hyper_params'].items():
+ if k.startswith("{}/".format(CreateFromFunction.input_artifact_section)) and \
+ str(v).startswith("{}.".format(self._task.id)):
+ task_id, artifact_name = str(v).split(".", 1)
+ if artifact_name in self._task.artifacts:
+ updated_params[k] = self._task.artifacts[artifact_name].hash
+ task_hash['hyper_params'].update(updated_params)
+
+ return task_hash
+
+ @classmethod
+ def component(
+ cls,
+ _func=None, *,
+ return_values=('return_object', ), # type: Union[str, List[str]]
+ name=None, # type: Optional[str]
+ cache=False, # type: bool
+ packages=None, # type: Optional[List[str]]
+ parents=None, # type: Optional[List[str]]
+ execution_queue=None, # type: Optional[str]
+ docker=None, # type: Optional[str]
+ docker_args=None, # type: Optional[str]
+ docker_bash_setup_script=None, # type: Optional[str]
+ task_type=None, # type: Optional[str]
+ ):
+ # type: (...) -> Callable
+ """
+ Pipeline component decorator: the decorated function will be executed remotely as a standalone pipeline step Task
+
+ :param _func: wrapper function
+ :param return_values: Provide a list of names for all the results.
+ Notice! If not provided, no results will be stored as artifacts.
+ :param name: Optional, set the name of the pipeline step (Task). If not provided, the decorated
+ function name is used.
+ :param cache: If True, before launching the new step, check if an exact Task with the same
+ parameters/code was already executed. If it was found, use it instead of launching a new Task.
+ Default: False, a new Task is always launched.
+ Notice: If the git repo reference does not have a specific commit ID, the step Task will never be cached.
+ :param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"]
+ If not provided, packages are automatically added based on the imports used in the function.
+ :param parents: Optional list of parent nodes in the DAG.
+ The current step in the pipeline will be sent for execution only after all the parent nodes
+ have been executed successfully.
+ :param execution_queue: Optional, the queue to use for executing this specific step.
+ If not provided, the task will be sent to the default execution queue, as defined on the class
+ :param docker: Select the docker image to be executed in by the remote session
+ :param docker_args: Add docker arguments, pass a single string
+ :param docker_bash_setup_script: Add bash script to be executed
+ inside the docker before setting up the Task's environment
+ :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
+ 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
+
+ :return: function wrapper
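+
+ Example (an illustrative sketch; the step name and argument are placeholders):
+
+ .. code-block:: py
+
+    @PipelineDecorator.component(return_values=['processed_data'], cache=True)
+    def preprocess(dataset_url):
+        # the function body is executed remotely, as a standalone pipeline step Task
+        return {'source': dataset_url}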
+ """
+ def decorator_wrap(func):
+ _name = name or str(func.__name__)
+ function_return = return_values if isinstance(return_values, (tuple, list)) else [return_values]
+
+ inspect_func = inspect.getfullargspec(func)
+ # add default argument values
+ if inspect_func.args:
+ default_values = list(inspect_func.defaults or [])
+ default_values = ([None] * (len(inspect_func.args)-len(default_values))) + default_values
+ function_kwargs = {k: v for k, v in zip(inspect_func.args, default_values)}
+ else:
+ function_kwargs = dict()
+
+ add_step_spec = dict(
+ name=_name,
+ function=func,
+ function_kwargs=function_kwargs,
+ function_return=function_return,
+ cache_executed_step=cache,
+ packages=packages,
+ parents=parents,
+ execution_queue=execution_queue,
+ docker=docker,
+ docker_args=docker_args,
+ docker_bash_setup_script=docker_bash_setup_script,
+ task_type=task_type,
+ )
+
+ if cls._singleton:
+ cls._singleton.add_function_step(**add_step_spec)
+ else:
+ cls._added_decorator.append(add_step_spec)
+
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ if cls._debug_execute_step_function:
+ args = [v._remoteref() if isinstance(v, LazyEvalWrapper) else v for v in args]
+ kwargs = {k: v._remoteref() if isinstance(v, LazyEvalWrapper) else v for k, v in kwargs.items()}
+
+ def result_wrapper(return_name):
+ result = func(*args, **kwargs)
+ return result
+
+ return_w = [LazyEvalWrapper(
+ callback=functools.partial(result_wrapper, n),
+ remote_reference=functools.partial(result_wrapper, n))
+ for n in function_return]
+ return return_w[0] if len(return_w) == 1 else return_w
+
+ # resolve all lazy objects if we have any:
+ kwargs_artifacts = {}
+ for i, v in enumerate(args):
+ kwargs[inspect_func.args[i]] = v
+
+ kwargs_artifacts.update(
+ {k: v._remoteref() for k, v in kwargs.items() if isinstance(v, LazyEvalWrapper)}
+ )
+ kwargs = {k: v for k, v in kwargs.items() if not isinstance(v, LazyEvalWrapper)}
+
+ _node = cls._singleton._nodes[_name]
+ # update artifacts kwargs
+ for k, v in kwargs_artifacts.items():
+ if k in kwargs:
+ kwargs.pop(k, None)
+ _node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = v
+ if v and '.' in str(v):
+ parent_id, _ = str(v).split('.', 1)
+ # find parent and push it into the _node.parents
+ for n, node in cls._singleton._nodes.items():
+ if n != _node.name and node.executed and node.executed == parent_id:
+ if n not in _node.parents:
+ _node.parents.append(n)
+ break
+ for k, v in kwargs.items():
+ if v is None or isinstance(v, (bool, int, float, str)):
+ _node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v
+ elif isinstance(v, (list, tuple)) and all(isinstance(i, (bool, int, float, str)) for i in v):
+ _node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v
+ else:
+ # we need to create an artifact
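+ # e.g. a DataFrame passed as kwarg "df" to step "process" is uploaded as the pipeline Task
+ # artifact "result_process_df" and referenced as "<pipeline_task_id>.result_process_df"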
+ artifact_name = 'result_{}_{}'.format(re.sub(r'\W+', '', _node.name), k)
+ cls._singleton._task.upload_artifact(
+ name=artifact_name, artifact_object=v, wait_on_upload=True)
+ _node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = \
+ "{}.{}".format(cls._singleton._task.id, artifact_name)
+ # add all the already executed leaf nodes of the DAG as parents
+ # (their own ancestors are implied), excluding this node itself
+ _node.parents = list(
+ set((_node.parents or []) + cls._singleton._find_executed_node_leaves())
+ - {_node.name})
+
+ cls._singleton._verify_node(_node)
+ cls._singleton._launch_node(_node)
+
+ def results_reference(return_name):
+ # wait until job is completed
+ _node.job.wait(pool_period=0.2)
+ if _node.job.is_failed():
+ raise ValueError(
+ 'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
+
+ _node.executed = _node.job.task_id()
+ return "{}.{}".format(_node.job.task_id(), return_name)
+
+ def result_wrapper(return_name):
+ # wait until job is completed
+ _node.job.wait(pool_period=0.2)
+ if _node.job.is_failed():
+ raise ValueError(
+ 'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
+
+ _node.executed = _node.job.task_id()
+ return Task.get_task(_node.job.task_id()).artifacts[return_name].get()
+
+ return_w = [LazyEvalWrapper(
+ callback=functools.partial(result_wrapper, n),
+ remote_reference=functools.partial(results_reference, n)) for n in function_return]
+
+ return return_w[0] if len(return_w) == 1 else return_w
+
+ return wrapper
+
+ return decorator_wrap if _func is None else decorator_wrap(_func)
+
+ @classmethod
+ def pipeline(
+ cls,
+ _func=None, *, # noqa
+ name, # type: str
+ project, # type: str
+ version, # type: str
+ default_queue=None, # type: Optional[str]
+ pool_frequency=0.2, # type: float
+ add_pipeline_tags=False, # type: bool
+ target_project=None, # type: Optional[str]
+ pipeline_execution_queue='services', # type: Optional[str]
+ ):
+ # type: (...) -> Callable
+ """
+ Decorate pipeline logic function.
+
+ :param name: Provide pipeline name (if main Task exists it overrides its name)
+ :param project: Provide project storing the pipeline (if main Task exists it overrides its project)
+ :param version: Must provide pipeline version. This version allows uniquely identifying the pipeline
+ template execution. Examples for semantic versions: version='1.0.1', version='23', version='1.2'
+ :param default_queue: The default execution queue to use for pipeline steps that do not specify one
+ :param float pool_frequency: The polling frequency (in minutes) for monitoring experiments / states.
+ :param bool add_pipeline_tags: (default: False) if True, add `pipe: ` tag to all
+ steps (Tasks) created by this pipeline.
+ :param str target_project: If provided, all pipeline steps are cloned into the target project
+ :param pipeline_execution_queue: remote pipeline execution queue (default 'services' queue).
+ If None is passed, execute the pipeline logic locally (pipeline steps are still executed remotely)
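+
+ Example (an illustrative sketch; names are placeholders, `preprocess` is a @PipelineDecorator.component step):
+
+ .. code-block:: py
+
+    @PipelineDecorator.pipeline(name='custom pipeline logic', project='examples', version='0.1')
+    def pipeline_flow(dataset_url):
+        # each step call launches (or reuses) a remote pipeline step Task
+        processed = preprocess(dataset_url)
+        print('pipeline completed: {}'.format(processed))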
+ """
+ def decorator_wrap(func):
+
+ def internal_decorator(*args, **kwargs):
+ pipeline_kwargs = dict(**(kwargs or {}))
+ inspect_func = inspect.getfullargspec(func)
+ if args:
+ if not inspect_func.args:
+ raise ValueError("Could not parse function arguments")
+
+ pipeline_kwargs.update({inspect_func.args[i]: v for i, v in enumerate(args)})
+
+ # add default argument values (None is used for arguments without an explicit default)
+ if inspect_func.args:
+ default_values = list(inspect_func.defaults or [])
+ default_values = ([None] * (len(inspect_func.args) - len(default_values))) + default_values
+ default_kwargs = {k: v for k, v in zip(inspect_func.args, default_values)}
+ default_kwargs.update(pipeline_kwargs)
+ pipeline_kwargs = default_kwargs
+
+ # run the entire pipeline locally, as python functions
+ if cls._debug_execute_step_function:
+ ret_val = func(**pipeline_kwargs)
+ LazyEvalWrapper.trigger_all_remote_references()
+ return ret_val
+
+ if default_queue:
+ cls.set_default_execution_queue(default_queue)
+
+ a_pipeline = PipelineDecorator(
+ name=name,
+ project=project,
+ version=version,
+ pool_frequency=pool_frequency,
+ add_pipeline_tags=add_pipeline_tags,
+ target_project=target_project,
+ )
+
+ if PipelineDecorator._debug_execute_step_process:
+ a_pipeline._clearml_job_class = LocalClearmlJob
+ a_pipeline._default_execution_queue = 'mock'
+
+ a_pipeline._clearml_job_class.register_hashing_callback(a_pipeline._adjust_task_hashing)
+
+ # add pipeline arguments
+ if pipeline_kwargs:
+ a_pipeline.get_parameters().update(pipeline_kwargs)
+
+ # serialize / deserialize state only if we are running locally
+ a_pipeline._start(wait=False)
+
+ # sync arguments back
+ for k in pipeline_kwargs.keys():
+ if k in a_pipeline.get_parameters():
+ pipeline_kwargs[k] = a_pipeline.get_parameters()[k]
+
+ # run the actual pipeline
+ if not PipelineDecorator._debug_execute_step_process and pipeline_execution_queue:
+ # rerun the pipeline on a remote machine
+ a_pipeline._task.execute_remotely(queue_name=pipeline_execution_queue)
+ # when we get here it means we are running remotely
+
+ # this time the pipeline is executed only on the remote machine
+ func(**pipeline_kwargs)
+ LazyEvalWrapper.trigger_all_remote_references()
+ a_pipeline.stop()
+ return
+
+ return internal_decorator
+
+ return decorator_wrap if _func is None else decorator_wrap(_func)
+
+ @classmethod
+ def set_default_execution_queue(cls, default_execution_queue):
+ # type: (Optional[str]) -> None
+ """
+ Set the default execution queue to use when a pipeline step does not specify an execution queue
+
+ :param default_execution_queue: The execution queue to use if no execution queue is provided
+ """
+ cls._default_execution_queue = str(default_execution_queue) if default_execution_queue else None
+
+ @classmethod
+ def debug_pipeline(cls, execute_steps_as_functions=False):
+ # type: (bool) -> ()
+ """
+ Set debugging mode: run the full pipeline DAG locally, where each step is executed as a local
+ sub-process Task (or, optionally, serially as plain functions, see `execute_steps_as_functions`).
+ Notice: running the DAG locally assumes local code execution (i.e. it will not clone & apply a git diff)
+
+ :param execute_steps_as_functions: If True, run the pipeline steps locally
+ as a function (no Task will be created). Default False.
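+
+ Example (illustrative; call it before invoking the decorated pipeline function):
+
+ .. code-block:: py
+
+    if __name__ == '__main__':
+        PipelineDecorator.debug_pipeline()  # steps will run as local sub-process Tasks
+        pipeline_flow(dataset_url='s3://bucket/data.csv')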
+ """
+ cls._debug_execute_step_process = True
+ cls._debug_execute_step_function = execute_steps_as_functions
diff --git a/clearml/automation/job.py b/clearml/automation/job.py
index 7723ec37..6d311d68 100644
--- a/clearml/automation/job.py
+++ b/clearml/automation/job.py
@@ -4,10 +4,11 @@ import subprocess
import sys
import tempfile
import warnings
+from copy import deepcopy
from datetime import datetime
from logging import getLogger
from time import time, sleep
-from typing import Optional, Mapping, Sequence, Any
+from typing import Optional, Mapping, Sequence, Any, Callable
from pathlib2 import Path
@@ -24,6 +25,7 @@ logger = getLogger('clearml.automation.job')
class ClearmlJob(object):
_job_hash_description = 'job_hash={}'
_job_hash_property = 'pipeline_job_hash'
+ _hashing_callback = None
def __init__(
self,
@@ -387,6 +389,19 @@ class ClearmlJob(object):
"""
return self._is_cached_task
+ @classmethod
+ def register_hashing_callback(cls, a_function):
+ # type: (Callable[[dict], dict]) -> None
+ """
+ Allow customizing the dict used for hashing the Task.
+ The provided function will be called with a dict representing a Task,
+ and should return a (possibly modified) version of that representation dict.
+
+ :param a_function: Function manipulating the representation dict of a Task
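+
+ Example (an illustrative sketch of a callback that removes the docker section before hashing):
+
+ .. code-block:: py
+
+    def drop_docker_section(repr_dict):
+        repr_dict.pop('docker', None)
+        return repr_dict
+
+    ClearmlJob.register_hashing_callback(drop_docker_section)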
+ """
+ assert callable(a_function)
+ cls._hashing_callback = a_function
+
@classmethod
def _create_task_hash(cls, task, section_overrides=None, params_override=None):
# type: (Task, Optional[dict], Optional[dict]) -> Optional[str]
@@ -429,16 +444,14 @@ class ClearmlJob(object):
# make sure that if we only have docker args/bash,
# we use encode it, otherwise we revert to the original encoding (excluding docker altogether)
- if docker:
- return hash_dict(
- dict(script=script, hyper_params=hyper_params, configs=configs, docker=docker),
- hash_func=hash_func
- )
+ repr_dict = dict(script=script, hyper_params=hyper_params, configs=configs, docker=docker) \
+ if docker else dict(script=script, hyper_params=hyper_params, configs=configs)
- return hash_dict(
- dict(script=script, hyper_params=hyper_params, configs=configs),
- hash_func=hash_func
- )
+ # callback for modifying the representation dict
+ if cls._hashing_callback:
+ repr_dict = cls._hashing_callback(deepcopy(repr_dict))
+
+ return hash_dict(repr_dict, hash_func=hash_func)
@classmethod
def _get_cached_task(cls, task_hash):
@@ -469,8 +482,7 @@ class ClearmlJob(object):
)
for obj in potential_tasks:
task = Task.get_task(task_id=obj.id)
- if task_hash == cls._create_task_hash(task):
- return task
+ return task
return None
@classmethod
@@ -545,6 +557,7 @@ class LocalClearmlJob(ClearmlJob):
env['CLEARML_TASK_ID'] = env['TRAINS_TASK_ID'] = str(self.task.id)
env['CLEARML_LOG_TASK_TO_BACKEND'] = '1'
env['CLEARML_SIMULATE_REMOTE_TASK'] = '1'
+ self.task.mark_started()
self._job_process = subprocess.Popen(args=[python, local_filename], cwd=cwd, env=env)
return True
@@ -571,6 +584,12 @@ class LocalClearmlJob(ClearmlJob):
except Exception:
pass
self._local_temp_file = None
+
+ if exit_code == 0:
+ self.task.mark_completed()
+ else:
+ self.task.mark_failed()
+
return exit_code
diff --git a/clearml/utilities/proxy_object.py b/clearml/utilities/proxy_object.py
index 98269738..ca5949b6 100644
--- a/clearml/utilities/proxy_object.py
+++ b/clearml/utilities/proxy_object.py
@@ -156,3 +156,81 @@ def naive_nested_from_flat_dictionary(flat_dict, sep='/'):
)
)
}
+
+
+class WrapperBase(type):
+
+ # This metaclass is heavily inspired by the Object Proxying python recipe
+ # (http://code.activestate.com/recipes/496741/). It adds special methods
+ # to the wrapper class so it can proxy the wrapped class. In addition, it
+ # adds a field __overrides__ in the wrapper class dictionary, containing
+ # all attributes decorated to be overridden.
+
+ _special_names = [
+ '__abs__', '__add__', '__and__', '__call__', '__cmp__', '__coerce__',
+ '__contains__', '__delitem__', '__delslice__', '__div__', '__divmod__',
+ '__eq__', '__float__', '__floordiv__', '__ge__', '__getitem__',
+ '__getslice__', '__gt__', '__hash__', '__hex__', '__iadd__', '__iand__',
+ '__idiv__', '__idivmod__', '__ifloordiv__', '__ilshift__', '__imod__',
+ '__imul__', '__int__', '__invert__', '__ior__', '__ipow__', '__irshift__',
+ '__isub__', '__iter__', '__itruediv__', '__ixor__', '__le__', '__len__',
+ '__long__', '__lshift__', '__lt__', '__mod__', '__mul__', '__ne__',
+ '__neg__', '__oct__', '__or__', '__pos__', '__pow__', '__radd__',
+ '__rand__', '__rdiv__', '__rdivmod__', '__reduce__', '__reduce_ex__',
+ '__repr__', '__reversed__', '__rfloordiv__', '__rlshift__', '__rmod__',
+ '__rmul__', '__ror__', '__rpow__', '__rrshift__', '__rshift__', '__rsub__',
+ '__rtruediv__', '__rxor__', '__setitem__', '__setslice__', '__sub__',
+ '__truediv__', '__xor__', 'next', '__str__',
+ ]
+
+ def __new__(mcs, classname, bases, attrs):
+ def make_method(name):
+ def method(self, *args, **kwargs):
+ obj = object.__getattribute__(self, "_wrapped")
+ if obj is None:
+ cb = object.__getattribute__(self, "_callback")
+ obj = cb()
+ object.__setattr__(self, '_wrapped', obj)
+ mtd = getattr(obj, name)
+ return mtd(*args, **kwargs)
+ return method
+
+ for name in mcs._special_names:
+ attrs[name] = make_method(name)
+
+ overrides = attrs.get('__overrides__', [])
+ # overrides.extend(k for k, v in attrs.items() if isinstance(v, lazy))
+ attrs['__overrides__'] = overrides
+ return type.__new__(mcs, classname, bases, attrs)
+
+
+class LazyEvalWrapper(six.with_metaclass(WrapperBase)):
+
+ # This class acts as a proxy for the wrapped instance it is passed. All
+ # access to its attributes is delegated to the wrapped class, except
+ # those contained in __overrides__.
+
+ __slots__ = ['_wrapped', '_callback', '_remote_reference', '__weakref__']
+
+ _remote_reference_calls = []
+
+ def __init__(self, callback, remote_reference=None):
+ object.__setattr__(self, '_wrapped', None)
+ object.__setattr__(self, '_callback', callback)
+ object.__setattr__(self, '_remote_reference', remote_reference)
+ if remote_reference:
+ LazyEvalWrapper._remote_reference_calls.append(remote_reference)
+
+ def _remoteref(self):
+ func = object.__getattribute__(self, "_remote_reference")
+ if func in LazyEvalWrapper._remote_reference_calls:
+ LazyEvalWrapper._remote_reference_calls.remove(func)
+
+ return func() if callable(func) else func
+
+ @classmethod
+ def trigger_all_remote_references(cls):
+ for func in cls._remote_reference_calls:
+ if callable(func):
+ func()
+ cls._remote_reference_calls = []
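+
+
+# Illustrative sketch (not part of the library API): LazyEvalWrapper defers the callback evaluation
+# until the wrapped value is first used, for example:
+#   lazy_value = LazyEvalWrapper(callback=lambda: 40 + 2)
+#   print(lazy_value)  # first use proxies __str__ to the evaluated result and prints "42"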