Add PipelineDecorator.pipeline & PipelineDecorator.component decorators for full custom pipeline logic

This commit is contained in:
allegroai 2021-09-05 00:30:44 +03:00
parent fca1aac93f
commit 23fc9260f8
3 changed files with 700 additions and 71 deletions

clearml/automation/controller.py

@ -1,3 +1,5 @@
import functools
import inspect
import json
import re
from copy import copy
@ -5,7 +7,7 @@ from datetime import datetime
from logging import getLogger
from threading import Thread, Event, RLock
from time import time
from typing import Sequence, Optional, Mapping, Callable, Any, Union, List, Dict
from typing import Sequence, Optional, Mapping, Callable, Any, List, Dict, Union
from attr import attrib, attrs
@ -17,6 +19,7 @@ from ..debugging.log import LoggerRoot
from ..model import BaseModel
from ..task import Task
from ..utilities.process.mp import leave_process
from ..utilities.proxy_object import LazyEvalWrapper
class PipelineController(object):
@ -29,7 +32,7 @@ class PipelineController(object):
_tag = 'pipeline'
_step_pattern = r"\${[^}]*}"
_config_section = 'Pipeline'
_args_section = 'PipelineArgs'
_args_section = 'Args'
_pipeline_step_ref = 'pipeline'
_reserved_pipeline_names = (_pipeline_step_ref, )
_task_project_lookup = {}
@ -407,7 +410,7 @@ class PipelineController(object):
function_input_artifacts = {}
# go over function_kwargs, split it into string and input artifacts
for k, v in function_kwargs.items():
if self._step_ref_pattern.match(v):
if v and self._step_ref_pattern.match(str(v)):
# check for step artifacts
step, _, artifact = v[2:-1].partition('.')
if step in self._nodes and artifact in self._nodes[step].return_artifacts:
@ -420,28 +423,17 @@ class PipelineController(object):
parameters = {"{}/{}".format(CreateFromFunction.kwargs_section, k): v for k, v in function_kwargs.items()}
if function_input_artifacts:
parameters.update(
{"{}/{}".format(CreateFromFunction.input_artifact_section, k): v
{"{}/{}".format(CreateFromFunction.input_artifact_section, k): str(v)
for k, v in function_input_artifacts.items()}
)
if self._task.running_locally():
project_name = project_name or self._target_project or self._task.get_project_name()
task_definition = CreateFromFunction.create_task_from_function(
a_function=function,
function_kwargs=function_kwargs or None,
function_input_artifacts=function_input_artifacts,
function_return=function_return,
project_name=project_name,
task_name=task_name,
task_type=task_type,
packages=packages,
docker=docker,
docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script,
output_uri=None,
dry_run=True,
)
task_definition = self._create_task_from_function(docker, docker_args, docker_bash_setup_script, function,
function_input_artifacts, function_kwargs,
function_return, packages, project_name, task_name,
task_type)
# noinspection PyProtectedMember
self._task._set_configuration(
name=name, config_type='json',
@ -476,6 +468,28 @@ class PipelineController(object):
return True
def _create_task_from_function(
self, docker, docker_args, docker_bash_setup_script,
function, function_input_artifacts, function_kwargs, function_return,
packages, project_name, task_name, task_type
):
task_definition = CreateFromFunction.create_task_from_function(
a_function=function,
function_kwargs=function_kwargs or None,
function_input_artifacts=function_input_artifacts,
function_return=function_return,
project_name=project_name,
task_name=task_name,
task_type=task_type,
packages=packages,
docker=docker,
docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script,
output_uri=None,
dry_run=True,
)
return task_definition
def start(
self,
queue='services',
@ -582,6 +596,8 @@ class PipelineController(object):
:param float timeout: Wait timeout for the optimization thread to exit (minutes).
The default is ``None``, indicating do not wait, terminate immediately.
"""
self._stop_event.set()
self.wait(timeout=timeout)
if self._task and self._pipeline_task_status_failed:
print('Setting pipeline controller Task as failed (due to failed steps) !')
@ -711,6 +727,14 @@ class PipelineController(object):
if description:
self._pipeline_args_desc[str(name)] = str(description)
def get_parameters(self):
# type: () -> dict
"""
Return the pipeline parameters dictionary
:return: Dictionary str -> str
"""
return self._pipeline_args
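For orientation, a minimal sketch of how add_parameter and get_parameters fit together (the names and values below are illustrative, not part of this commit):

# Hypothetical usage: add_parameter() registers a pipeline-level argument,
# get_parameters() returns the current str -> str mapping (values may be
# overridden in the UI or by the remote execution).
pipe = PipelineController(name='demo', project='examples', version='1.0')
pipe.add_parameter(name='dataset_url', default='s3://bucket/data.csv', description='input dataset')
print(pipe.get_parameters()['dataset_url'])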
def _start(
self,
step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
@ -761,34 +785,7 @@ class PipelineController(object):
if self._thread:
return True
params, pipeline_dag = self._serialize_pipeline_task()
# deserialize back pipeline state
if not params['continue_pipeline']:
for k in pipeline_dag:
pipeline_dag[k]['executed'] = None
self._default_execution_queue = params['default_queue']
self._add_pipeline_tags = params['add_pipeline_tags']
self._target_project = params['target_project'] or ''
self._deserialize(pipeline_dag)
# if we continue the pipeline, make sure that we re-execute failed tasks
if params['continue_pipeline']:
for node in self._nodes.values():
if node.executed is False:
node.executed = None
if not self._verify():
raise ValueError("Failed verifying pipeline execution graph, "
"it has either inaccessible nodes, or contains cycles")
self.update_execution_plot()
self._start_time = time()
self._stop_event = Event()
self._experiment_created_cb = step_task_created_callback
self._experiment_completed_cb = step_task_completed_callback
self._prepare_pipeline(step_task_completed_callback, step_task_created_callback)
self._thread = Thread(target=self._daemon)
self._thread.daemon = True
self._thread.start()
@ -799,6 +796,36 @@ class PipelineController(object):
return True
def _prepare_pipeline(
self,
step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
):
# type: (...) -> None
params, pipeline_dag = self._serialize_pipeline_task()
# deserialize back pipeline state
if not params['continue_pipeline']:
for k in pipeline_dag:
pipeline_dag[k]['executed'] = None
self._default_execution_queue = params['default_queue']
self._add_pipeline_tags = params['add_pipeline_tags']
self._target_project = params['target_project'] or ''
self._deserialize(pipeline_dag)
# if we continue the pipeline, make sure that we re-execute failed tasks
if params['continue_pipeline']:
for node in self._nodes.values():
if node.executed is False:
node.executed = None
if not self._verify():
raise ValueError("Failed verifying pipeline execution graph, "
"it has either inaccessible nodes, or contains cycles")
self.update_execution_plot()
self._start_time = time()
self._stop_event = Event()
self._experiment_created_cb = step_task_created_callback
self._experiment_completed_cb = step_task_completed_callback
def _serialize_pipeline_task(self):
# type: () -> (dict, dict)
"""
@ -820,7 +847,7 @@ class PipelineController(object):
if self._task.running_locally():
# noinspection PyProtectedMember
self._task._set_parameters(
{'{}/{}'.format(self._args_section, k): v for k, v in self._pipeline_args.items()},
{'{}/{}'.format(self._args_section, k): str(v) for k, v in self._pipeline_args.items()},
__parameters_descriptions=self._pipeline_args_desc,
__update=True,
)
@ -912,22 +939,20 @@ class PipelineController(object):
pattern = self._step_ref_pattern
parents = set()
for v in node.parameters.values():
for k, v in node.parameters.items():
if isinstance(v, str):
ref_matched = False
for g in pattern.findall(v):
ref_matched = True
ref_step = self.__verify_step_reference(node, g)
if ref_step:
parents.add(ref_step)
# verify we have a section name
if not ref_matched and '/' not in v:
raise ValueError(
"Section name is missing in parameter \"{}\", "
"parameters should be in the form of "
"\"`section-name`/parameter\", example: \"Args/param\"".format(v))
# verify we have a section name
if '/' not in k:
raise ValueError(
"Section name is missing in parameter \"{}\", "
"parameters should be in the form of "
"\"`section-name`/parameter\", example: \"Args/param\"".format(v))
if parents != set(node.parents or []):
if parents and parents != set(node.parents or []):
parents = parents - set(node.parents or [])
getLogger('clearml.automation.controller').info(
'Node "{}" missing parent reference, adding: {}'.format(node.name, parents))
@ -947,6 +972,9 @@ class PipelineController(object):
for k, node in self._nodes.items():
if k in visited:
continue
if any(p == node.name for p in node.parents or []):
# node cannot have itself as parent
return False
if not all(p in visited for p in node.parents or []):
continue
visited.add(k)
@ -1020,6 +1048,8 @@ class PipelineController(object):
else:
return node.job.launch(queue_name=node.queue or self._default_execution_queue)
self._running_nodes.append(node.name)
return True
def _update_execution_plot(self):
@ -1285,7 +1315,6 @@ class PipelineController(object):
self._nodes[name].job.task_parameter_override if self._nodes[name].job
else self._nodes[name].parameters))
print('Overrides:\n{}'.format(self._nodes[name].task_overrides))
self._running_nodes.append(name)
launched_nodes.add(name)
# check if node is cached do not wait for event but run the loop again
if self._nodes[name].executed:
@ -1556,3 +1585,506 @@ class PipelineController(object):
return ''
return '<a href="{}"> {} </a>'.format(task_link_template.format(project=project_id, task=task_id), task_id)
class PipelineDecorator(PipelineController):
_added_decorator = [] # type: List[dict]
_singleton = None # type: Optional[PipelineDecorator]
_debug_execute_step_process = False
_debug_execute_step_function = False
_default_execution_queue = None
def __init__(
self,
name, # type: str
project, # type: str
version, # type: str
pool_frequency=0.2, # type: float
add_pipeline_tags=False, # type: bool
target_project=None, # type: Optional[str]
):
# type: (...) -> ()
"""
Create a new pipeline controller. The newly created object will launch and monitor the new experiments.
:param name: Provide pipeline name (if main Task exists it overrides its name)
:param project: Provide project storing the pipeline (if main Task exists it overrides its project)
:param version: Must provide pipeline version. This version allows uniquely identifying the pipeline
template execution. Examples of semantic versions: version='1.0.1', version='23', version='1.2'
:param float pool_frequency: The polling frequency (in minutes) for monitoring experiments / states.
:param bool add_pipeline_tags: (default: False) if True, add `pipe: <pipeline_task_id>` tag to all
steps (Tasks) created by this pipeline.
:param str target_project: If provided, all pipeline steps are cloned into the target project
"""
super(PipelineDecorator, self).__init__(
name=name,
project=project,
version=version,
pool_frequency=pool_frequency,
add_pipeline_tags=add_pipeline_tags,
target_project=target_project,
)
if PipelineDecorator._default_execution_queue:
super(PipelineDecorator, self).set_default_execution_queue(
PipelineDecorator._default_execution_queue)
for n in self._added_decorator:
self.add_function_step(**n)
self._added_decorator.clear()
PipelineDecorator._singleton = self
self._reference_callback = []
def _daemon(self):
# type: () -> ()
"""
The main pipeline execution loop. This loop is executed on its own dedicated thread.
Overrides the base class daemon function; here we only need to monitor running steps and update the pipeline state.
:return:
"""
pooling_counter = 0
launched_nodes = set()
last_plot_report = time()
while self._stop_event:
# stop request
if self._stop_event.wait(self._pool_frequency if pooling_counter else 0.01):
break
pooling_counter += 1
# check the pipeline time limit
if self._pipeline_time_limit and (time() - self._start_time) > self._pipeline_time_limit:
break
# check the state of all current jobs
# if no job ended, continue
completed_jobs = []
force_execution_plot_update = False
for j in self._running_nodes:
node = self._nodes[j]
if not node.job:
continue
if node.job.is_stopped():
completed_jobs.append(j)
node.executed = node.job.task_id() if not node.job.is_failed() else False
if j in launched_nodes:
launched_nodes.remove(j)
elif node.timeout:
started = node.job.task.data.started
if (datetime.now().astimezone(started.tzinfo) - started).total_seconds() > node.timeout:
node.job.abort()
completed_jobs.append(j)
node.executed = node.job.task_id()
elif j in launched_nodes and node.job.is_running():
# make sure we update the execution graph when the job starts running
# (otherwise it will still be marked queued)
launched_nodes.remove(j)
force_execution_plot_update = True
# update running jobs
self._running_nodes = [j for j in self._running_nodes if j not in completed_jobs]
# nothing changed, we can sleep
if not completed_jobs and self._running_nodes:
# force updating the pipeline state (plot) at least every 5 min.
if force_execution_plot_update or time()-last_plot_report > 5.*60:
last_plot_report = time()
self.update_execution_plot()
continue
# callback on completed jobs
if self._experiment_completed_cb or self._post_step_callbacks:
for job in completed_jobs:
job_node = self._nodes.get(job)
if not job_node:
continue
if self._experiment_completed_cb:
self._experiment_completed_cb(self, job_node)
if self._post_step_callbacks.get(job_node.name):
self._post_step_callbacks[job_node.name](self, job_node)
# update current state (in configuration, so that we could later continue an aborted pipeline)
self._force_task_configuration_update()
# visualize pipeline state (plot)
self.update_execution_plot()
# stop all currently running jobs:
for node in self._nodes.values():
if node.executed is False:
self._pipeline_task_status_failed = True
if node.job and node.executed and not node.job.is_stopped():
node.job.abort()
elif not node.job and not node.executed:
# mark Node as skipped if it has no Job object and it is not executed
node.skip_job = True
# visualize pipeline state (plot)
self.update_execution_plot()
if self._stop_event:
# noinspection PyBroadException
try:
self._stop_event.set()
except Exception:
pass
def _create_task_from_function(
self, docker, docker_args, docker_bash_setup_script,
function, function_input_artifacts, function_kwargs, function_return,
packages, project_name, task_name, task_type
):
def sanitize(function_source):
matched = re.match(r"[\s]*@PipelineDecorator.component[\s\\]*\(", function_source)
if matched:
function_source = function_source[matched.span()[1]:]
# find the last ")"
open_parenthesis = 0
last_index = -1
for i, c in enumerate(function_source):
if not open_parenthesis and c == ')':
last_index = i
break
elif c == ')':
open_parenthesis -= 1
elif c == '(':
open_parenthesis += 1
if last_index >= 0:
function_source = function_source[last_index+1:].lstrip()
return function_source
task_definition = CreateFromFunction.create_task_from_function(
a_function=function,
function_kwargs=function_kwargs or None,
function_input_artifacts=function_input_artifacts,
function_return=function_return,
project_name=project_name,
task_name=task_name,
task_type=task_type,
packages=packages,
docker=docker,
docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script,
output_uri=None,
dry_run=True,
_sanitize_function=sanitize,
)
return task_definition
def _find_executed_node_leaves(self):
# type: () -> List[str]
all_parents = set([p for n in self._nodes.values() if n.executed for p in n.parents])
executed_leaves = [name for name, n in self._nodes.items() if n.executed and name not in all_parents]
return executed_leaves
def _adjust_task_hashing(self, task_hash):
# type: (dict) -> dict
"""
Fix the Task hashing so that parameters pointing to the current Task's artifacts are encoded using the
content hash of the artifact, instead of the Task.id
:param task_hash: Task representation dict
:return: Adjusted Task representation dict
"""
if task_hash.get('hyper_params'):
updated_params = {}
for k, v in task_hash['hyper_params'].items():
if k.startswith("{}/".format(CreateFromFunction.input_artifact_section)) and \
str(v).startswith("{}.".format(self._task.id)):
task_id, artifact_name = str(v).split(".", 1)
if artifact_name in self._task.artifacts:
updated_params[k] = self._task.artifacts[artifact_name].hash
task_hash['hyper_params'].update(updated_params)
return task_hash
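To illustrate what this adjustment does, a rough before/after sketch of the representation dict (the section name, task ID, and artifact name here are assumptions for the example):

# Before: an input-artifact parameter points at an artifact of the current pipeline Task.
task_hash = {
    'hyper_params': {
        'kwargs_artifacts/dataset': '<pipeline_task_id>.result_prepare_dataset',
    }
}
# After _adjust_task_hashing(task_hash), assuming 'result_prepare_dataset' is an uploaded
# artifact of the pipeline Task, the value is replaced by that artifact's content hash,
# so step caching depends on the data content rather than on the pipeline Task.id:
# {'hyper_params': {'kwargs_artifacts/dataset': '<artifact_content_hash>'}}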
@classmethod
def component(
cls,
_func=None, *,
return_values=('return_object', ), # type: Union[str, List[str]]
name=None, # type: Optional[str]
cache=False, # type: bool
packages=None, # type: Optional[List[str]]
parents=None, # type: Optional[List[str]]
execution_queue=None, # type: Optional[str]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
task_type=None, # type: Optional[str]
):
# type: (...) -> Callable
"""
Pipeline component function, to be executed remotely as a standalone pipeline step Task.
:param _func: wrapped function (passed when the decorator is used without arguments)
:param return_values: Provide a list of names for all the results.
Notice! If not provided, no results will be stored as artifacts.
:param name: Set the name of the remote task. If not provided, the function name is used.
:param cache: If True, before launching the new step,
after updating with the latest configuration, check if an exact Task with the same parameter/code
was already executed. If it was found, use it instead of launching a new Task.
Default: False, a new Task is always launched.
Notice: If the git repo reference does not have a specific commit ID, the Task will never be reused.
:param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"]
If not provided, packages are automatically added based on the imports used in the function.
:param parents: Optional list of parent nodes in the DAG.
The current step in the pipeline will be sent for execution only after all the parent nodes
have been executed successfully.
:param execution_queue: Optional, the queue to use for executing this specific step.
If not provided, the task will be sent to the default execution queue, as defined on the class
:param docker: Select the docker image in which the remote session will be executed
:param docker_args: Add docker arguments, pass a single string
:param docker_bash_setup_script: Add bash script to be executed
inside the docker before setting up the Task's environment
:param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
:return: function wrapper
"""
def decorator_wrap(func):
_name = name or str(func.__name__)
function_return = return_values if isinstance(return_values, (tuple, list)) else [return_values]
inspect_func = inspect.getfullargspec(func)
# add default argument values
if inspect_func.args:
default_values = list(inspect_func.defaults or [])
default_values = ([None] * (len(inspect_func.args)-len(default_values))) + default_values
function_kwargs = {k: v for k, v in zip(inspect_func.args, default_values)}
else:
function_kwargs = dict()
add_step_spec = dict(
name=_name,
function=func,
function_kwargs=function_kwargs,
function_return=function_return,
cache_executed_step=cache,
packages=packages,
parents=parents,
execution_queue=execution_queue,
docker=docker,
docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script,
task_type=task_type,
)
if cls._singleton:
cls._singleton.add_function_step(**add_step_spec)
else:
cls._added_decorator.append(add_step_spec)
@functools.wraps(func)
def wrapper(*args, **kwargs):
if cls._debug_execute_step_function:
args = [v._remoteref() if isinstance(v, LazyEvalWrapper) else v for v in args]
kwargs = {k: v._remoteref() if isinstance(v, LazyEvalWrapper) else v for k, v in kwargs.items()}
def result_wrapper(return_name):
result = func(*args, **kwargs)
return result
return_w = [LazyEvalWrapper(
callback=functools.partial(result_wrapper, n),
remote_reference=functools.partial(result_wrapper, n))
for n in function_return]
return return_w[0] if len(return_w) == 1 else return_w
# resolve all lazy objects if we have any:
kwargs_artifacts = {}
for i, v in enumerate(args):
kwargs[inspect_func.args[i]] = v
kwargs_artifacts.update(
{k: v._remoteref() for k, v in kwargs.items() if isinstance(v, LazyEvalWrapper)}
)
kwargs = {k: v for k, v in kwargs.items() if not isinstance(v, LazyEvalWrapper)}
_node = cls._singleton._nodes[_name]
# update artifacts kwargs
for k, v in kwargs_artifacts.items():
if k in kwargs:
kwargs.pop(k, None)
_node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = v
if v and '.' in str(v):
parent_id, _ = str(v).split('.', 1)
# find parent and push it into the _node.parents
for n, node in cls._singleton._nodes.items():
if n != _node.name and node.executed and node.executed == parent_id:
if n not in _node.parents:
_node.parents.append(n)
break
for k, v in kwargs.items():
if v is None or isinstance(v, (bool, int, float, str)):
_node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v
elif isinstance(v, (list, tuple)) and all(isinstance(i, (bool, int, float, str)) for i in v):
_node.parameters["{}/{}".format(CreateFromFunction.kwargs_section, k)] = v
else:
# we need to create an artifact
artifact_name = 'result_{}_{}'.format(re.sub(r'\W+', '', _node.name), k)
cls._singleton._task.upload_artifact(
name=artifact_name, artifact_object=v, wait_on_upload=True)
_node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = \
"{}.{}".format(cls._singleton._task.id, artifact_name)
# now add all the executed nodes as parents (only the DAG leaves, their ancestors are implied)
_node.parents = list(
set((_node.parents or []) + cls._singleton._find_executed_node_leaves())
- {_node.name})  # exclude this node itself from its own parents
cls._singleton._verify_node(_node)
cls._singleton._launch_node(_node)
def results_reference(return_name):
# wait until job is completed
_node.job.wait(pool_period=0.2)
if _node.job.is_failed():
raise ValueError(
'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
_node.executed = _node.job.task_id()
return "{}.{}".format(_node.job.task_id(), return_name)
def result_wrapper(return_name):
# wait until job is completed
_node.job.wait(pool_period=0.2)
if _node.job.is_failed():
raise ValueError(
'Pipeline step "{}", Task ID={} failed'.format(_node.name, _node.job.task_id()))
_node.executed = _node.job.task_id()
return Task.get_task(_node.job.task_id()).artifacts[return_name].get()
return_w = [LazyEvalWrapper(
callback=functools.partial(result_wrapper, n),
remote_reference=functools.partial(results_reference, n)) for n in function_return]
return return_w[0] if len(return_w) == 1 else return_w
return wrapper
return decorator_wrap if _func is None else decorator_wrap(_func)
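A minimal usage sketch of the component decorator (the step name, packages, and URL argument are illustrative):

# Hypothetical pipeline step: executed remotely as its own Task; the return value is
# stored as the 'data_frame' artifact and surfaced to the caller as a lazily evaluated reference.
@PipelineDecorator.component(return_values=['data_frame'], cache=True, packages=['pandas'])
def step_load_data(url):
    import pandas as pd
    return pd.read_csv(url)

Calling step_load_data(...) from inside a @PipelineDecorator.pipeline function launches the step and returns a LazyEvalWrapper; accessing the returned value waits for the step Task and fetches the 'data_frame' artifact.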
@classmethod
def pipeline(
cls,
_func=None, *, # noqa
name, # type: str
project, # type: str
version, # type: str
default_queue=None, # type: Optional[str]
pool_frequency=0.2, # type: float
add_pipeline_tags=False, # type: bool
target_project=None, # type: Optional[str]
pipeline_execution_queue='services', # type: Optional[str]
):
# type: (...) -> Callable
"""
Decorate the main pipeline logic function.
:param name: Provide pipeline name (if main Task exists it overrides its name)
:param project: Provide project storing the pipeline (if main Task exists it overrides its project)
:param version: Must provide pipeline version. This version allows uniquely identifying the pipeline
template execution. Examples of semantic versions: version='1.0.1', version='23', version='1.2'
:param default_queue: default pipeline step queue
:param float pool_frequency: The polling frequency (in minutes) for monitoring experiments / states.
:param bool add_pipeline_tags: (default: False) if True, add `pipe: <pipeline_task_id>` tag to all
steps (Tasks) created by this pipeline.
:param str target_project: If provided, all pipeline steps are cloned into the target project
:param pipeline_execution_queue: remote pipeline execution queue (default 'services' queue).
If None is passed, execute the pipeline logic locally (pipeline steps are still executed remotely)
"""
def decorator_wrap(func):
def internal_decorator(*args, **kwargs):
pipeline_kwargs = dict(**(kwargs or {}))
inspect_func = inspect.getfullargspec(func)
if args:
if not inspect_func.args:
raise ValueError("Could not parse function arguments")
pipeline_kwargs.update({inspect_func.args[i]: v for i, v in enumerate(args)})
# add default values for any function arguments that were not explicitly passed
if inspect_func.args:
default_values = list(inspect_func.defaults or [])
default_values = ([None] * (len(inspect_func.args) - len(default_values))) + default_values
default_kwargs = {k: v for k, v in zip(inspect_func.args, default_values)}
default_kwargs.update(pipeline_kwargs)
pipeline_kwargs = default_kwargs
# run the entire pipeline locally, as python functions
if cls._debug_execute_step_function:
ret_val = func(**pipeline_kwargs)
LazyEvalWrapper.trigger_all_remote_references()
return ret_val
if default_queue:
cls.set_default_execution_queue(default_queue)
a_pipeline = PipelineDecorator(
name=name,
project=project,
version=version,
pool_frequency=pool_frequency,
add_pipeline_tags=add_pipeline_tags,
target_project=target_project,
)
if PipelineDecorator._debug_execute_step_process:
a_pipeline._clearml_job_class = LocalClearmlJob
a_pipeline._default_execution_queue = 'mock'
a_pipeline._clearml_job_class.register_hashing_callback(a_pipeline._adjust_task_hashing)
# add pipeline arguments
if pipeline_kwargs:
a_pipeline.get_parameters().update(pipeline_kwargs)
# serialize / deserialize state only if we are running locally
a_pipeline._start(wait=False)
# sync arguments back
for k in pipeline_kwargs.keys():
if k in a_pipeline.get_parameters():
pipeline_kwargs[k] = a_pipeline.get_parameters()[k]
# run the actual pipeline
if not PipelineDecorator._debug_execute_step_process and pipeline_execution_queue:
# rerun the pipeline on a remote machine
a_pipeline._task.execute_remotely(queue_name=pipeline_execution_queue)
# when we get here it means we are running remotely
# this time the pipeline is executed only on the remote machine
func(**pipeline_kwargs)
LazyEvalWrapper.trigger_all_remote_references()
a_pipeline.stop()
return
return internal_decorator
return decorator_wrap if _func is None else decorator_wrap(_func)
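A minimal sketch of the pipeline decorator driving the component sketched above (queue names and arguments are illustrative):

# Hypothetical pipeline logic: the logic itself is re-launched on the 'services' queue,
# while each component step is sent to 'default' unless it specifies its own queue.
@PipelineDecorator.pipeline(name='demo pipeline', project='examples', version='0.1',
                            default_queue='default')
def run_pipeline(url='https://example.com/data.csv'):
    df = step_load_data(url)   # returns a lazy reference to the step's artifact
    print('rows:', len(df))    # accessing the value waits for the step to complete

if __name__ == '__main__':
    run_pipeline()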
@classmethod
def set_default_execution_queue(cls, default_execution_queue):
# type: (Optional[str]) -> None
"""
Set the default execution queue, used when a pipeline step does not specify an execution queue
:param default_execution_queue: The execution queue to use if no execution queue is provided
"""
cls._default_execution_queue = str(default_execution_queue) if default_execution_queue else None
@classmethod
def debug_pipeline(cls, execute_steps_as_functions=False):
# type: (bool) -> ()
"""
Set debugging mode: run the full pipeline DAG locally, with steps executed as sub-process Tasks
or serially as plain functions.
Notice: running the DAG locally assumes local code execution (i.e. it will not clone & apply a git diff)
:param execute_steps_as_functions: If True, run each pipeline step serially
as a plain function (no Task will be created). Default: False.
"""
cls._debug_execute_step_process = True
cls._debug_execute_step_function = execute_steps_as_functions
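A short sketch of the debugging helpers, assuming the run_pipeline function from the sketch above:

# Run the full DAG locally: each step becomes a sub-process Task (LocalClearmlJob).
PipelineDecorator.debug_pipeline()
# Or execute every step serially as a plain function call, creating no step Tasks:
# PipelineDecorator.debug_pipeline(execute_steps_as_functions=True)
run_pipeline()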

clearml/automation/job.py

@ -4,10 +4,11 @@ import subprocess
import sys
import tempfile
import warnings
from copy import deepcopy
from datetime import datetime
from logging import getLogger
from time import time, sleep
from typing import Optional, Mapping, Sequence, Any
from typing import Optional, Mapping, Sequence, Any, Callable
from pathlib2 import Path
@ -24,6 +25,7 @@ logger = getLogger('clearml.automation.job')
class ClearmlJob(object):
_job_hash_description = 'job_hash={}'
_job_hash_property = 'pipeline_job_hash'
_hashing_callback = None
def __init__(
self,
@ -387,6 +389,19 @@ class ClearmlJob(object):
"""
return self._is_cached_task
@classmethod
def register_hashing_callback(cls, a_function):
# type: (Callable[[dict], dict]) -> None
"""
Allow customizing the dict used for hashing the Task.
The provided function will be called with a dict representing a Task,
and should return a (possibly modified) version of that representation dict.
:param a_function: Function manipulating the Task representation dict
"""
assert callable(a_function)
cls._hashing_callback = a_function
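A minimal sketch of a hashing callback (dropping the docker section is an illustrative choice, not part of this commit):

def ignore_docker_in_hash(repr_dict):
    # repr_dict contains 'script', 'hyper_params', 'configs' and optionally 'docker';
    # removing 'docker' means changing the docker image will not invalidate cached Tasks.
    repr_dict.pop('docker', None)
    return repr_dict

ClearmlJob.register_hashing_callback(ignore_docker_in_hash)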
@classmethod
def _create_task_hash(cls, task, section_overrides=None, params_override=None):
# type: (Task, Optional[dict], Optional[dict]) -> Optional[str]
@ -429,16 +444,14 @@ class ClearmlJob(object):
# make sure that if we only have docker args/bash,
# we encode it, otherwise we revert to the original encoding (excluding docker altogether)
if docker:
return hash_dict(
dict(script=script, hyper_params=hyper_params, configs=configs, docker=docker),
hash_func=hash_func
)
repr_dict = dict(script=script, hyper_params=hyper_params, configs=configs, docker=docker) \
if docker else dict(script=script, hyper_params=hyper_params, configs=configs)
return hash_dict(
dict(script=script, hyper_params=hyper_params, configs=configs),
hash_func=hash_func
)
# callback for modifying the representation dict
if cls._hashing_callback:
repr_dict = cls._hashing_callback(deepcopy(repr_dict))
return hash_dict(repr_dict, hash_func=hash_func)
@classmethod
def _get_cached_task(cls, task_hash):
@ -469,8 +482,7 @@ class ClearmlJob(object):
)
for obj in potential_tasks:
task = Task.get_task(task_id=obj.id)
if task_hash == cls._create_task_hash(task):
return task
return task
return None
@classmethod
@ -545,6 +557,7 @@ class LocalClearmlJob(ClearmlJob):
env['CLEARML_TASK_ID'] = env['TRAINS_TASK_ID'] = str(self.task.id)
env['CLEARML_LOG_TASK_TO_BACKEND'] = '1'
env['CLEARML_SIMULATE_REMOTE_TASK'] = '1'
self.task.mark_started()
self._job_process = subprocess.Popen(args=[python, local_filename], cwd=cwd, env=env)
return True
@ -571,6 +584,12 @@ class LocalClearmlJob(ClearmlJob):
except Exception:
pass
self._local_temp_file = None
if exit_code == 0:
self.task.mark_completed()
else:
self.task.mark_failed()
return exit_code

clearml/utilities/proxy_object.py

@ -156,3 +156,81 @@ def naive_nested_from_flat_dictionary(flat_dict, sep='/'):
)
)
}
class WrapperBase(type):
# This metaclass is heavily inspired by the Object Proxying python recipe
# (http://code.activestate.com/recipes/496741/). It adds special methods
# to the wrapper class so it can proxy the wrapped class. In addition, it
# adds a field __overrides__ in the wrapper class dictionary, containing
# all attributes decorated to be overridden.
_special_names = [
'__abs__', '__add__', '__and__', '__call__', '__cmp__', '__coerce__',
'__contains__', '__delitem__', '__delslice__', '__div__', '__divmod__',
'__eq__', '__float__', '__floordiv__', '__ge__', '__getitem__',
'__getslice__', '__gt__', '__hash__', '__hex__', '__iadd__', '__iand__',
'__idiv__', '__idivmod__', '__ifloordiv__', '__ilshift__', '__imod__',
'__imul__', '__int__', '__invert__', '__ior__', '__ipow__', '__irshift__',
'__isub__', '__iter__', '__itruediv__', '__ixor__', '__le__', '__len__',
'__long__', '__lshift__', '__lt__', '__mod__', '__mul__', '__ne__',
'__neg__', '__oct__', '__or__', '__pos__', '__pow__', '__radd__',
'__rand__', '__rdiv__', '__rdivmod__', '__reduce__', '__reduce_ex__',
'__repr__', '__reversed__', '__rfloorfiv__', '__rlshift__', '__rmod__',
'__rmul__', '__ror__', '__rpow__', '__rrshift__', '__rshift__', '__rsub__',
'__rtruediv__', '__rxor__', '__setitem__', '__setslice__', '__sub__',
'__truediv__', '__xor__', 'next', '__str__', '__repr__',
]
def __new__(mcs, classname, bases, attrs):
def make_method(name):
def method(self, *args, **kwargs):
obj = object.__getattribute__(self, "_wrapped")
if obj is None:
cb = object.__getattribute__(self, "_callback")
obj = cb()
object.__setattr__(self, '_wrapped', obj)
mtd = getattr(obj, name)
return mtd(*args, **kwargs)
return method
for name in mcs._special_names:
attrs[name] = make_method(name)
overrides = attrs.get('__overrides__', [])
# overrides.extend(k for k, v in attrs.items() if isinstance(v, lazy))
attrs['__overrides__'] = overrides
return type.__new__(mcs, classname, bases, attrs)
class LazyEvalWrapper(six.with_metaclass(WrapperBase)):
# This class acts as a proxy for the wrapped instance it is passed. All
# access to its attributes is delegated to the wrapped class, except
# those contained in __overrides__.
__slots__ = ['_wrapped', '_callback', '_remote_reference', '__weakref__']
_remote_reference_calls = []
def __init__(self, callback, remote_reference=None):
object.__setattr__(self, '_wrapped', None)
object.__setattr__(self, '_callback', callback)
object.__setattr__(self, '_remote_reference', remote_reference)
if remote_reference:
LazyEvalWrapper._remote_reference_calls.append(remote_reference)
def _remoteref(self):
func = object.__getattribute__(self, "_remote_reference")
if func in LazyEvalWrapper._remote_reference_calls:
LazyEvalWrapper._remote_reference_calls.remove(func)
return func() if callable(func) else func
@classmethod
def trigger_all_remote_references(cls):
for func in cls._remote_reference_calls:
if callable(func):
func()
cls._remote_reference_calls = []
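A minimal sketch of LazyEvalWrapper behavior (the callbacks are illustrative):

# The callback runs on first attribute/operator access; remote_reference callables are
# collected so trigger_all_remote_references() can force any pending evaluations.
lazy_value = LazyEvalWrapper(
    callback=lambda: 40 + 2,                    # produces the real value on demand
    remote_reference=lambda: print('synced'),   # invoked by _remoteref() / trigger_all_remote_references()
)
print(lazy_value + 0)                            # proxies __add__, evaluates the callback, prints 42
LazyEvalWrapper.trigger_all_remote_references()  # prints 'synced'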