mirror of
synced 2025-03-09 21:40:51 +00:00
Add enhanced pipeline support and examples
This commit is contained in:
@ -14,7 +14,7 @@ from attr import attrib, attrs
from .job import LocalClearmlJob
from ..automation import ClearmlJob
from ..backend_interface.task.populate import CreateFromFunction
from ..backend_interface.util import get_or_create_project
from ..backend_interface.util import get_or_create_project, exact_match_regex
from ..debugging.log import LoggerRoot
from ..model import BaseModel
from ..task import Task
@ -34,6 +34,7 @@ class PipelineController(object):
_config_section = 'Pipeline'
_args_section = 'Args'
_pipeline_step_ref = 'pipeline'
_runtime_property_hash = '_pipeline_hash'
_reserved_pipeline_names = (_pipeline_step_ref, )
_task_project_lookup = {}
_clearml_job_class = ClearmlJob
@ -64,7 +65,7 @@ class PipelineController(object):
add_pipeline_tags=False, # type: bool
target_project=None, # type: Optional[str]
# type: (...) -> ()
# type: (...) -> None
Create a new pipeline controller. The newly created object will launch and monitor the new experiments.
@ -82,7 +83,11 @@ class PipelineController(object):
self._start_time = None
self._pipeline_time_limit = None
self._default_execution_queue = None
self._version = str(version)
self._version = str(version).strip()
if not self._version or not all(i and i.isnumeric() for i in self._version.split('.')):
raise ValueError(
"Pipeline version has to be in a semantic version form, "
"examples: version='1.0.1', version='1.2', version='23'")
self._pool_frequency = pool_frequency * 60.
self._thread = None
self._pipeline_args = dict()
@ -104,6 +109,7 @@ class PipelineController(object):
task_name=name or 'Pipeline {}'.format(datetime.now()),
self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag])
@ -293,14 +299,16 @@ class PipelineController(object):
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
task_type=None, # type: Optional[str]
packages=None, # type: Optional[Sequence[str]]
packages=None, # type: Optional[Union[str, Sequence[str]]]
repo=None, # type: Optional[str]
repo_branch=None, # type: Optional[str]
repo_commit=None, # type: Optional[str]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
parents=None, # type: Optional[Sequence[str]],
execution_queue=None, # type: Optional[str]
time_limit=None, # type: Optional[float]
clone_base_task=True, # type: bool
pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
cache_executed_step=False, # type: bool
@ -343,8 +351,18 @@ class PipelineController(object):
:param task_name: Set the name of the remote task. Required if base_task_id is None.
:param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
:param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"]
:param packages: Manually specify a list of required packages or a local requirements.txt file.
Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt"
If not provided, packages are automatically added based on the imports used in the function.
:param repo: Optional, specify a repository to attach to the function, when remotely executing.
Allow users to execute the function inside the specified repository, enabling to load modules/script
from a repository Notice the execution work directory will be the repository root folder.
Supports both git repo url link, and local repository path.
Example remote url: 'https://github.com/user/repo.git'
Example local repo copy: './repo' -> will automatically store the remote
repo url and commit ID based on the locally cloned copy
:param repo_branch: Optional, specify the remote repository branch (Ignored, if local repo path is used)
:param repo_commit: Optional, specify the repository commit id (Ignored, if local repo path is used)
:param docker: Select the docker image to be executed in by the remote session
:param docker_args: Add docker arguments, pass a single string
:param docker_bash_setup_script: Add bash script to be executed
@ -356,8 +374,6 @@ class PipelineController(object):
If not provided, the task will be sent to the default execution queue, as defined on the class
:param time_limit: Default None, no time limit.
Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.
:param clone_base_task: If True (default) the pipeline will clone the base task, and modify/enqueue
the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created).
:param pre_execute_callback: Callback function, called when the step (Task) is created
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
@ -394,7 +410,6 @@ class PipelineController(object):
was already executed. If it was found, use it instead of launching a new Task.
Default: False, a new cloned copy of base_task is always used.
Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.
If `clone_base_task` is False there is no cloning, hence the base_task is used.
:return: True if successful
@ -433,7 +448,7 @@ class PipelineController(object):
task_definition = self._create_task_from_function(docker, docker_args, docker_bash_setup_script, function,
function_input_artifacts, function_kwargs,
function_return, packages, project_name, task_name,
task_type, repo, repo_branch, repo_commit)
# noinspection PyProtectedMember
name=name, config_type='json',
@ -457,7 +472,7 @@ class PipelineController(object):
name=name, base_task_id=None, parents=parents or [],
queue=execution_queue, timeout=time_limit,
@ -471,7 +486,7 @@ class PipelineController(object):
def _create_task_from_function(
self, docker, docker_args, docker_bash_setup_script,
function, function_input_artifacts, function_kwargs, function_return,
packages, project_name, task_name, task_type
packages, project_name, task_name, task_type, repo, branch, commit
task_definition = CreateFromFunction.create_task_from_function(
@ -481,6 +496,9 @@ class PipelineController(object):
@ -805,15 +823,15 @@ class PipelineController(object):
params, pipeline_dag = self._serialize_pipeline_task()
# deserialize back pipeline state
if not params['continue_pipeline']:
if not params['_continue_pipeline_']:
for k in pipeline_dag:
pipeline_dag[k]['executed'] = None
self._default_execution_queue = params['default_queue']
self._add_pipeline_tags = params['add_pipeline_tags']
self._target_project = params['target_project'] or ''
self._default_execution_queue = params['_default_queue_']
self._add_pipeline_tags = params['_add_pipeline_tags_']
self._target_project = params['_target_project_'] or ''
# if we continue the pipeline, make sure that we re-execute failed tasks
if params['continue_pipeline']:
if params['_continue_pipeline_']:
for node in self._nodes.values():
if node.executed is False:
node.executed = None
@ -833,29 +851,126 @@ class PipelineController(object):
:return: params, pipeline_dag
params = {'continue_pipeline': False,
'default_queue': self._default_execution_queue,
'add_pipeline_tags': self._add_pipeline_tags,
'target_project': self._target_project,
params = {
'_default_queue_': self._default_execution_queue,
'_add_pipeline_tags_': self._add_pipeline_tags,
'_target_project_': self._target_project,
pipeline_dag = self._serialize()
# serialize pipeline state
if self._task and self._auto_connect_task:
self._task.connect_configuration(pipeline_dag, name=self._config_section)
self._task.connect(params, name=self._config_section)
if self._task.running_locally():
# noinspection PyProtectedMember
name=self._config_section, config_type='dictionary',
config_text=json.dumps(pipeline_dag, indent=2))
# noinspection PyProtectedMember
{'{}/{}'.format(self._args_section, k): str(v) for k, v in self._pipeline_args.items()},
{'{}/{}'.format(self._args_section, k): str(v) for k, v in params.items()},
params['_continue_pipeline_'] = False
# make sure we have a unique version number (auto bump version if needed)
# only needed when manually (from code) creating pipelines
# noinspection PyProtectedMember
pipeline_hash = self._get_task_hash()
# noinspection PyProtectedMember
self._runtime_property_hash: "{}:{}".format(pipeline_hash, self._version),
self._task.connect_configuration(pipeline_dag, name=self._config_section)
self._task.connect(self._pipeline_args, name=self._args_section)
self._task.connect(params, name=self._args_section)
# noinspection PyProtectedMember
if self._task._get_runtime_properties().get(self._runtime_property_hash):
params['_continue_pipeline_'] = True
# noinspection PyProtectedMember
pipeline_hash = ClearmlJob._create_task_hash(self._task)
# noinspection PyProtectedMember
self._runtime_property_hash: "{}:{}".format(pipeline_hash, self._version),
params['_continue_pipeline_'] = False
return params, pipeline_dag
def _verify_pipeline_version(self):
# check if pipeline version exists, if it does increase version
pipeline_hash = self._get_task_hash()
# noinspection PyProtectedMember
existing_tasks = Task._query_tasks(
project=[self._task.project], task_name=exact_match_regex(self._task.name),
system_tags=['-{}'.format(Task.archived_tag), self._tag],
only_fields=['id', 'runtime'],
if existing_tasks:
# check if hash match the current version.
matched = True
for t in existing_tasks:
h, _, v = t.runtime.get(self._runtime_property_hash, '').partition(':')
if v == self._version:
matched = bool(h == pipeline_hash)
# if hash did not match, look for the highest version
if not matched:
# noinspection PyProtectedMember
existing_tasks = Task._query_tasks(
project=[self._task.project], task_name=exact_match_regex(self._task.name),
system_tags=['-{}'.format(Task.archived_tag), self._tag],
only_fields=['id', 'hyperparams', 'runtime'],
found_match_version = False
existing_versions = set([self._version])
for t in existing_tasks:
if not t.hyperparams:
v = t.hyperparams.get('properties', {}).get('version')
if v:
if t.runtime:
h, _, _ = t.runtime.get(self._runtime_property_hash, '').partition(':')
if h == pipeline_hash:
self._version = v.value
found_match_version = True
# match to the version we found:
if found_match_version:
'Existing Pipeline found, matching version to: {}'.format(self._version))
# if we did not find a matched pipeline version, get the max one and bump the version by 1
while True:
v = self._version.split('.')
self._version = '.'.join(v[:-1] + [str(int(v[-1]) + 1)])
if self._version not in existing_versions:
'Existing Pipeline version found, bump new version to: {}'.format(self._version))
def _get_task_hash(self):
params_override = dict(**(self._task.get_parameters() or {}))
params_override.pop('properties/version', None)
# noinspection PyProtectedMember
pipeline_hash = ClearmlJob._create_task_hash(self._task, params_override=params_override)
return pipeline_hash
def _serialize(self):
# type: () -> dict
@ -876,17 +991,18 @@ class PipelineController(object):
This will be used to create the DAG from the dict stored on the Task, when running remotely.
# make sure that we override nodes that we do not clone.
for name in self._nodes:
if self._nodes[name].clone_task and name in dag_dict and dag_dict[name].get('clone_task'):
dag_dict[name] = dict(
(k, v) for k, v in self._nodes[name].__dict__.items()
if k not in ('job', 'name', 'task_factory_func'))
# if we do not clone the Task, only merge the parts we can override.
for name in self._nodes:
if not self._nodes[name].clone_task and name in dag_dict and not dag_dict[name].get('clone_task'):
for k in ('queue', 'parents', 'timeout', 'parameters', 'task_overrides'):
setattr(self._nodes[name], k, dag_dict[name].get(k) or type(getattr(self._nodes[name], k))())
# if we do clone the Task deserialize everything, except the function creating
self._nodes = {
k: self.Node(name=k, **v)
if (k not in self._nodes or not self._nodes[k].task_factory_func) and (
not v.get('clone_task') or k not in self._nodes) else self._nodes[k]
if k not in self._nodes or (v.get('base_task_id') and v.get('clone_task'))
else self._nodes[k]
for k, v in dag_dict.items()}
def _has_stored_configuration(self):
@ -1228,7 +1344,8 @@ class PipelineController(object):
if self._task:
# noinspection PyProtectedMember
name=self._config_section, config_type='dictionary', config_dict=pipeline_dag)
name=self._config_section, config_type='dictionary',
config_text=json.dumps(pipeline_dag, indent=2))
def _daemon(self):
# type: () -> ()
@ -1732,7 +1849,7 @@ class PipelineDecorator(PipelineController):
def _create_task_from_function(
self, docker, docker_args, docker_bash_setup_script,
function, function_input_artifacts, function_kwargs, function_return,
packages, project_name, task_name, task_type
packages, project_name, task_name, task_type, repo, branch, commit,
def sanitize(function_source):
matched = re.match(r"[\s]*@PipelineDecorator.component[\s\\]*\(", function_source)
@ -1761,6 +1878,9 @@ class PipelineDecorator(PipelineController):
@ -1804,13 +1924,16 @@ class PipelineDecorator(PipelineController):
return_values=('return_object', ), # type: Union[str, List[str]]
name=None, # type: Optional[str]
cache=False, # type: bool
packages=None, # type: Optional[List[str]]
packages=None, # type: Optional[Union[str, Sequence[str]]]
parents=None, # type: Optional[List[str]]
execution_queue=None, # type: Optional[str]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
task_type=None, # type: Optional[str]
repo=None, # type: Optional[str]
repo_branch=None, # type: Optional[str]
repo_commit=None, # type: Optional[str]
# type: (...) -> Callable
@ -1825,8 +1948,8 @@ class PipelineDecorator(PipelineController):
was already executed. If it was found, use it instead of launching a new Task.
Default: False, a new cloned copy of base_task is always used.
Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.
If `clone_base_task` is False there is no cloning, hence the base_task is used.
:param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"]
:param packages: Manually specify a list of required packages or a local requirements.txt file.
Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt"
If not provided, packages are automatically added based on the imports used in the function.
:param parents: Optional list of parent nodes in the DAG.
The current step in the pipeline will be sent for execution only after all the parent nodes
@ -1839,6 +1962,15 @@ class PipelineDecorator(PipelineController):
inside the docker before setting up the Task's environment
:param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
:param repo: Optional, specify a repository to attach to the function, when remotely executing.
Allow users to execute the function inside the specified repository, enabling to load modules/script
from a repository Notice the execution work directory will be the repository root folder.
Supports both git repo url link, and local repository path.
Example remote url: 'https://github.com/user/repo.git'
Example local repo copy: './repo' -> will automatically store the remote
repo url and commit ID based on the locally cloned copy
:param repo_branch: Optional, specify the remote repository branch (Ignored, if local repo path is used)
:param repo_commit: Optional, specify the repository commit id (Ignored, if local repo path is used)
:return: function wrapper
@ -1868,6 +2000,9 @@ class PipelineDecorator(PipelineController):
if cls._singleton:
@ -1881,15 +2016,24 @@ class PipelineDecorator(PipelineController):
args = [v._remoteref() if isinstance(v, LazyEvalWrapper) else v for v in args]
kwargs = {k: v._remoteref() if isinstance(v, LazyEvalWrapper) else v for k, v in kwargs.items()}
def result_wrapper(return_name):
result = func(*args, **kwargs)
return result
func_return = []
return_w = [LazyEvalWrapper(
callback=functools.partial(result_wrapper, n),
remote_reference=functools.partial(result_wrapper, n))
for n in function_return]
return return_w[0] if len(return_w) == 1 else return_w
def result_wrapper(a_func_return, return_index):
if not a_func_return:
a_func_return.append(func(*args, **kwargs))
a_func_return = a_func_return[0]
return a_func_return if return_index is None else a_func_return[return_index]
if len(function_return) == 1:
return LazyEvalWrapper(
callback=functools.partial(result_wrapper, func_return, None),
remote_reference=functools.partial(result_wrapper, func_return, None))
return_w = [LazyEvalWrapper(
callback=functools.partial(result_wrapper, func_return, i),
remote_reference=functools.partial(result_wrapper, func_return, i))
for i, _ in enumerate(function_return)]
return return_w
# resolve all lazy objects if we have any:
kwargs_artifacts = {}
@ -466,7 +466,7 @@ class ClearmlJob(object):
if Session.check_min_api_version('2.13'):
# noinspection PyProtectedMember
potential_tasks = Task._query_tasks(
status=['completed', 'stopped', 'published'],
status=['completed', 'published'],
@ -475,7 +475,7 @@ class ClearmlJob(object):
# noinspection PyProtectedMember
potential_tasks = Task._query_tasks(
status=['completed', 'stopped', 'published'],
status=['completed', 'published'],
_all_=dict(fields=['comment'], pattern=cls._job_hash_description.format(task_hash)),
@ -156,6 +156,7 @@ class CreateAndPopulate(object):
# check if we have no repository and no requirements raise error
@ -495,7 +496,10 @@ if __name__ == '__main__':
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
task_type=None, # type: Optional[str]
packages=None, # type: Optional[Sequence[str]]
repo=None, # type: Optional[str]
branch=None, # type: Optional[str]
commit=None, # type: Optional[str]
packages=None, # type: Optional[Union[str, Sequence[str]]]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
@ -540,7 +544,13 @@ if __name__ == '__main__':
:param task_name: Set the name of the remote task. Required if base_task_id is None.
:param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
:param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"]
:param repo: Remote URL for the repository to use, OR path to local copy of the git repository
Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo'
:param branch: Select specific repository branch/tag (implies the latest commit from the branch)
:param commit: Select specific commit id to use (default: latest commit,
or when used with local repository matching the local commit id)
:param packages: Manually specify a list of required packages or a local requirements.txt file.
Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt"
If not provided, packages are automatically added based on the imports used in the function.
:param docker: Select the docker image to be executed in by the remote session
:param docker_args: Add docker arguments, pass a single string
@ -594,16 +604,26 @@ if __name__ == '__main__':
with tempfile.NamedTemporaryFile('w', suffix='.py') as temp_file:
temp_dir = repo if repo and Path(repo).is_dir() else None
with tempfile.NamedTemporaryFile('w', suffix='.py', dir=temp_dir) as temp_file:
requirements_file = None
if packages and not isinstance(packages, (list, tuple)) and Path(packages).is_file():
requirements_file = packages
packages = False
populate = CreateAndPopulate(
task_name=task_name or str(function_name),
packages=packages if packages is not None else True,
@ -614,7 +634,9 @@ if __name__ == '__main__':
task = populate.create_task(dry_run=dry_run)
if dry_run:
task['script']['diff'] = task_template
task['script']['entry_point'] = entry_point
task['script']['working_dir'] = '.'
task['hyperparams'] = {
cls.kwargs_section: {
k: dict(section=cls.kwargs_section, name=k, value=str(v))
@ -626,7 +648,9 @@ if __name__ == '__main__':
task.update_task(task_data={'script': {'entry_point': entry_point}})
'script': task.data.script.to_dict().update(
{'entry_point': entry_point, 'working_dir': '.', 'diff': task_template})})
hyper_parameters = {'{}/{}'.format(cls.kwargs_section, k): str(v) for k, v in function_kwargs} \
if function_kwargs else {}
@ -65,6 +65,7 @@ def executing_pipeline(pickle_url, mock_parameter='mock'):
print('pipeline args:', pickle_url, mock_parameter)
# Use the pipeline argument to start the pipeline and pass it ot the first step
print('launch step one')
data_frame = step_one(pickle_url)
# Use the returned data from the first step (`step_one`), and pass it to the next step (`step_two`)
@ -72,11 +73,13 @@ def executing_pipeline(pickle_url, mock_parameter='mock'):
# the pipeline logic does not actually load the artifact itself.
# When actually passing the `data_frame` object into a new step,
# It waits for the creating step/function (`step_one`) to complete the execution
print('launch step two')
processed_data = step_two(data_frame)
# Notice we can actually process/modify the returned values inside the pipeline logic context.
# This means the modified object will be stored on the pipeline Task.
processed_data = [processed_data[0], processed_data[1]*2, processed_data[2], processed_data[3]]
print('launch step three')
model = step_three(processed_data)
# Notice since we are "printing" the `model` object,
@ -1,45 +0,0 @@
from clearml import Task
from clearml.automation.controller import PipelineController
def pre_execute_callback_example(a_pipeline, a_node, current_param_override):
# type (PipelineController, PipelineController.Node, dict) -> bool
print('Cloning Task id={} with parameters: {}'.format(a_node.base_task_id, current_param_override))
# if we want to skip this node (and subtree of this node) we return False
# return True to continue DAG execution
return True
def post_execute_callback_example(a_pipeline, a_node):
# type (PipelineController, PipelineController.Node) -> None
print('Completed Task id={}'.format(a_node.executed))
# if we need the actual Task to change Task.get_task(task_id=a_node.executed)
# Connecting ClearML with the current process,
# from here on everything is logged automatically
task = Task.init(project_name='examples', task_name='pipeline demo',
task_type=Task.TaskTypes.controller, reuse_last_task_id=False)
pipe = PipelineController(default_execution_queue='default', add_pipeline_tags=False)
pipe.add_step(name='stage_data', base_task_project='examples', base_task_name='pipeline step 1 dataset artifact')
pipe.add_step(name='stage_process', parents=['stage_data', ],
base_task_project='examples', base_task_name='pipeline step 2 process dataset',
parameter_override={'General/dataset_url': '${stage_data.artifacts.dataset.url}',
'General/test_size': 0.25},
pipe.add_step(name='stage_train', parents=['stage_process', ],
base_task_project='examples', base_task_name='pipeline step 3 train model',
parameter_override={'General/dataset_task_id': '${stage_process.id}'})
# Starting the pipeline (in the background)
# Wait until pipeline terminates
# cleanup everything
Reference in New Issue
Block a user