From 75dfed602268e58bd1f37deb8db78dbb43d8299e Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Thu, 9 Sep 2021 22:01:58 +0300 Subject: [PATCH] Add enhanced pipeline support and examples --- clearml/automation/controller.py | 236 +++++++++++++++++---- clearml/automation/job.py | 4 +- clearml/backend_interface/task/populate.py | 32 ++- examples/pipeline/full_custom_pipeline.py | 3 + examples/pipeline/pipeline_controller.py | 45 ---- 5 files changed, 223 insertions(+), 97 deletions(-) delete mode 100644 examples/pipeline/pipeline_controller.py diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 9477fbb7..6b2cf897 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -14,7 +14,7 @@ from attr import attrib, attrs from .job import LocalClearmlJob from ..automation import ClearmlJob from ..backend_interface.task.populate import CreateFromFunction -from ..backend_interface.util import get_or_create_project +from ..backend_interface.util import get_or_create_project, exact_match_regex from ..debugging.log import LoggerRoot from ..model import BaseModel from ..task import Task @@ -34,6 +34,7 @@ class PipelineController(object): _config_section = 'Pipeline' _args_section = 'Args' _pipeline_step_ref = 'pipeline' + _runtime_property_hash = '_pipeline_hash' _reserved_pipeline_names = (_pipeline_step_ref, ) _task_project_lookup = {} _clearml_job_class = ClearmlJob @@ -64,7 +65,7 @@ class PipelineController(object): add_pipeline_tags=False, # type: bool target_project=None, # type: Optional[str] ): - # type: (...) -> () + # type: (...) -> None """ Create a new pipeline controller. The newly created object will launch and monitor the new experiments. @@ -82,7 +83,11 @@ class PipelineController(object): self._start_time = None self._pipeline_time_limit = None self._default_execution_queue = None - self._version = str(version) + self._version = str(version).strip() + if not self._version or not all(i and i.isnumeric() for i in self._version.split('.')): + raise ValueError( + "Pipeline version has to be in a semantic version form, " + "examples: version='1.0.1', version='1.2', version='23'") self._pool_frequency = pool_frequency * 60. 
self._thread = None self._pipeline_args = dict() @@ -104,6 +109,7 @@ class PipelineController(object): task_name=name or 'Pipeline {}'.format(datetime.now()), task_type=Task.TaskTypes.controller, auto_resource_monitoring=False, + reuse_last_task_id=False ) self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag]) self._task.set_user_properties(version=self._version) @@ -293,14 +299,16 @@ class PipelineController(object): project_name=None, # type: Optional[str] task_name=None, # type: Optional[str] task_type=None, # type: Optional[str] - packages=None, # type: Optional[Sequence[str]] + packages=None, # type: Optional[Union[str, Sequence[str]]] + repo=None, # type: Optional[str] + repo_branch=None, # type: Optional[str] + repo_commit=None, # type: Optional[str] docker=None, # type: Optional[str] docker_args=None, # type: Optional[str] docker_bash_setup_script=None, # type: Optional[str] parents=None, # type: Optional[Sequence[str]], execution_queue=None, # type: Optional[str] time_limit=None, # type: Optional[float] - clone_base_task=True, # type: bool pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa cache_executed_step=False, # type: bool @@ -343,8 +351,18 @@ class PipelineController(object): :param task_name: Set the name of the remote task. Required if base_task_id is None. :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference', 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom' - :param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"] + :param packages: Manually specify a list of required packages or a local requirements.txt file. + Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt". If not provided, packages are automatically added based on the imports used in the function. + :param repo: Optional, specify a repository to attach to the function, when remotely executing. + Allow users to execute the function inside the specified repository, enabling loading modules/scripts + from the repository. Notice: the execution working directory will be the repository root folder. + Supports both git repo url link, and local repository path. + Example remote url: 'https://github.com/user/repo.git' + Example local repo copy: './repo' -> will automatically store the remote + repo url and commit ID based on the locally cloned copy + :param repo_branch: Optional, specify the remote repository branch (Ignored, if local repo path is used) + :param repo_commit: Optional, specify the repository commit id (Ignored, if local repo path is used) :param docker: Select the docker image to be executed in by the remote session :param docker_args: Add docker arguments, pass a single string :param docker_bash_setup_script: Add bash script to be executed :param parents: Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully. :param execution_queue: Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class :param time_limit: Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed. - :param clone_base_task: If True (default) the pipeline will clone the base task, and modify/enqueue the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created).
:param pre_execute_callback: Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object. @@ -394,7 +410,6 @@ class PipelineController(object): was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used. - If `clone_base_task` is False there is no cloning, hence the base_task is used. :return: True if successful """ @@ -433,7 +448,7 @@ class PipelineController(object): task_definition = self._create_task_from_function(docker, docker_args, docker_bash_setup_script, function, function_input_artifacts, function_kwargs, function_return, packages, project_name, task_name, - task_type) + task_type, repo, repo_branch, repo_commit) # noinspection PyProtectedMember self._task._set_configuration( name=name, config_type='json', @@ -457,7 +472,7 @@ class PipelineController(object): name=name, base_task_id=None, parents=parents or [], queue=execution_queue, timeout=time_limit, parameters=parameters, - clone_task=clone_base_task, + clone_task=False, cache_executed_step=cache_executed_step, task_factory_func=_create_task, return_artifacts=function_return, @@ -471,7 +486,7 @@ class PipelineController(object): def _create_task_from_function( self, docker, docker_args, docker_bash_setup_script, function, function_input_artifacts, function_kwargs, function_return, - packages, project_name, task_name, task_type + packages, project_name, task_name, task_type, repo, branch, commit ): task_definition = CreateFromFunction.create_task_from_function( a_function=function, @@ -481,6 +496,9 @@ class PipelineController(object): project_name=project_name, task_name=task_name, task_type=task_type, + repo=repo, + branch=branch, + commit=commit, packages=packages, docker=docker, docker_args=docker_args, @@ -805,15 +823,15 @@ class PipelineController(object): params, pipeline_dag = self._serialize_pipeline_task() # deserialize back pipeline state - if not params['continue_pipeline']: + if not params['_continue_pipeline_']: for k in pipeline_dag: pipeline_dag[k]['executed'] = None - self._default_execution_queue = params['default_queue'] - self._add_pipeline_tags = params['add_pipeline_tags'] - self._target_project = params['target_project'] or '' + self._default_execution_queue = params['_default_queue_'] + self._add_pipeline_tags = params['_add_pipeline_tags_'] + self._target_project = params['_target_project_'] or '' self._deserialize(pipeline_dag) # if we continue the pipeline, make sure that we re-execute failed tasks - if params['continue_pipeline']: + if params['_continue_pipeline_']: for node in self._nodes.values(): if node.executed is False: node.executed = None @@ -833,29 +851,126 @@ class PipelineController(object): :return: params, pipeline_dag """ - params = {'continue_pipeline': False, - 'default_queue': self._default_execution_queue, - 'add_pipeline_tags': self._add_pipeline_tags, - 'target_project': self._target_project, - } + params = { + '_default_queue_': self._default_execution_queue, + '_add_pipeline_tags_': self._add_pipeline_tags, + '_target_project_': self._target_project, + } pipeline_dag = self._serialize() # serialize pipeline state if self._task and self._auto_connect_task: - self._task.connect_configuration(pipeline_dag, 
name=self._config_section) - self._task.connect(params, name=self._config_section) if self._task.running_locally(): + # noinspection PyProtectedMember + self._task._set_configuration( + name=self._config_section, config_type='dictionary', + config_text=json.dumps(pipeline_dag, indent=2)) + params.update(self._pipeline_args) # noinspection PyProtectedMember self._task._set_parameters( - {'{}/{}'.format(self._args_section, k): str(v) for k, v in self._pipeline_args.items()}, + {'{}/{}'.format(self._args_section, k): str(v) for k, v in params.items()}, __parameters_descriptions=self._pipeline_args_desc, __update=True, ) + params['_continue_pipeline_'] = False + + # make sure we have a unique version number (auto bump version if needed) + # only needed when manually (from code) creating pipelines + self._verify_pipeline_version() + + # noinspection PyProtectedMember + pipeline_hash = self._get_task_hash() + + # noinspection PyProtectedMember + self._task._set_runtime_properties({ + self._runtime_property_hash: "{}:{}".format(pipeline_hash, self._version), + }) else: + self._task.connect_configuration(pipeline_dag, name=self._config_section) self._task.connect(self._pipeline_args, name=self._args_section) + self._task.connect(params, name=self._args_section) + # noinspection PyProtectedMember + if self._task._get_runtime_properties().get(self._runtime_property_hash): + params['_continue_pipeline_'] = True + else: + # noinspection PyProtectedMember + pipeline_hash = ClearmlJob._create_task_hash(self._task) + # noinspection PyProtectedMember + self._task._set_runtime_properties({ + self._runtime_property_hash: "{}:{}".format(pipeline_hash, self._version), + }) + params['_continue_pipeline_'] = False return params, pipeline_dag + def _verify_pipeline_version(self): + # check if pipeline version exists, if it does increase version + pipeline_hash = self._get_task_hash() + # noinspection PyProtectedMember + existing_tasks = Task._query_tasks( + project=[self._task.project], task_name=exact_match_regex(self._task.name), + type=[str(self._task.task_type)], + system_tags=['-{}'.format(Task.archived_tag), self._tag], + _all_=dict(fields=['runtime.{}'.format(self._runtime_property_hash)], + pattern=":{}".format(self._version)), + only_fields=['id', 'runtime'], + ) + if existing_tasks: + # check if hash match the current version. 
+ matched = True + for t in existing_tasks: + h, _, v = t.runtime.get(self._runtime_property_hash, '').partition(':') + if v == self._version: + matched = bool(h == pipeline_hash) + break + # if hash did not match, look for the highest version + if not matched: + # noinspection PyProtectedMember + existing_tasks = Task._query_tasks( + project=[self._task.project], task_name=exact_match_regex(self._task.name), + type=[str(self._task.task_type)], + system_tags=['-{}'.format(Task.archived_tag), self._tag], + only_fields=['id', 'hyperparams', 'runtime'], + ) + found_match_version = False + existing_versions = set([self._version]) + for t in existing_tasks: + if not t.hyperparams: + continue + v = t.hyperparams.get('properties', {}).get('version') + if v: + existing_versions.add(v.value) + if t.runtime: + h, _, _ = t.runtime.get(self._runtime_property_hash, '').partition(':') + if h == pipeline_hash: + self._version = v.value + found_match_version = True + break + + # match to the version we found: + if found_match_version: + getLogger('clearml.automation.controller').info( + 'Existing Pipeline found, matching version to: {}'.format(self._version)) + else: + # if we did not find a matched pipeline version, get the max one and bump the version by 1 + while True: + v = self._version.split('.') + self._version = '.'.join(v[:-1] + [str(int(v[-1]) + 1)]) + if self._version not in existing_versions: + break + + getLogger('clearml.automation.controller').info( + 'Existing Pipeline version found, bump new version to: {}'.format(self._version)) + + self._task.set_user_properties(version=self._version) + + def _get_task_hash(self): + params_override = dict(**(self._task.get_parameters() or {})) + params_override.pop('properties/version', None) + # noinspection PyProtectedMember + pipeline_hash = ClearmlJob._create_task_hash(self._task, params_override=params_override) + return pipeline_hash + def _serialize(self): # type: () -> dict """ @@ -876,17 +991,18 @@ class PipelineController(object): This will be used to create the DAG from the dict stored on the Task, when running remotely. :return: """ - # make sure that we override nodes that we do not clone. - for name in self._nodes: - if self._nodes[name].clone_task and name in dag_dict and dag_dict[name].get('clone_task'): - dag_dict[name] = dict( - (k, v) for k, v in self._nodes[name].__dict__.items() - if k not in ('job', 'name', 'task_factory_func')) + # if we do not clone the Task, only merge the parts we can override. 
+ for name in self._nodes: + if not self._nodes[name].clone_task and name in dag_dict and not dag_dict[name].get('clone_task'): + for k in ('queue', 'parents', 'timeout', 'parameters', 'task_overrides'): + setattr(self._nodes[name], k, dag_dict[name].get(k) or type(getattr(self._nodes[name], k))()) + + # if we do clone the Task deserialize everything, except the function creating self._nodes = { k: self.Node(name=k, **v) - if (k not in self._nodes or not self._nodes[k].task_factory_func) and ( - not v.get('clone_task') or k not in self._nodes) else self._nodes[k] + if k not in self._nodes or (v.get('base_task_id') and v.get('clone_task')) + else self._nodes[k] for k, v in dag_dict.items()} def _has_stored_configuration(self): @@ -1228,7 +1344,8 @@ class PipelineController(object): if self._task: # noinspection PyProtectedMember self._task._set_configuration( - name=self._config_section, config_type='dictionary', config_dict=pipeline_dag) + name=self._config_section, config_type='dictionary', + config_text=json.dumps(pipeline_dag, indent=2)) def _daemon(self): # type: () -> () @@ -1732,7 +1849,7 @@ class PipelineDecorator(PipelineController): def _create_task_from_function( self, docker, docker_args, docker_bash_setup_script, function, function_input_artifacts, function_kwargs, function_return, - packages, project_name, task_name, task_type + packages, project_name, task_name, task_type, repo, branch, commit, ): def sanitize(function_source): matched = re.match(r"[\s]*@PipelineDecorator.component[\s\\]*\(", function_source) @@ -1761,6 +1878,9 @@ class PipelineDecorator(PipelineController): project_name=project_name, task_name=task_name, task_type=task_type, + repo=repo, + branch=branch, + commit=commit, packages=packages, docker=docker, docker_args=docker_args, @@ -1804,13 +1924,16 @@ class PipelineDecorator(PipelineController): return_values=('return_object', ), # type: Union[str, List[str]] name=None, # type: Optional[str] cache=False, # type: bool - packages=None, # type: Optional[List[str]] + packages=None, # type: Optional[Union[str, Sequence[str]]] parents=None, # type: Optional[List[str]] execution_queue=None, # type: Optional[str] docker=None, # type: Optional[str] docker_args=None, # type: Optional[str] docker_bash_setup_script=None, # type: Optional[str] task_type=None, # type: Optional[str] + repo=None, # type: Optional[str] + repo_branch=None, # type: Optional[str] + repo_commit=None, # type: Optional[str] ): # type: (...) -> Callable """ @@ -1825,8 +1948,8 @@ class PipelineDecorator(PipelineController): was already executed. If it was found, use it instead of launching a new Task. Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used. - If `clone_base_task` is False there is no cloning, hence the base_task is used. - :param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"] + :param packages: Manually specify a list of required packages or a local requirements.txt file. + Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt" If not provided, packages are automatically added based on the imports used in the function. :param parents: Optional list of parent nodes in the DAG. 
The current step in the pipeline will be sent for execution only after all the parent nodes @@ -1839,6 +1962,15 @@ class PipelineDecorator(PipelineController): inside the docker before setting up the Task's environment :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference', 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom' + :param repo: Optional, specify a repository to attach to the function, when remotely executing. + Allow users to execute the function inside the specified repository, enabling to load modules/script + from a repository Notice the execution work directory will be the repository root folder. + Supports both git repo url link, and local repository path. + Example remote url: 'https://github.com/user/repo.git' + Example local repo copy: './repo' -> will automatically store the remote + repo url and commit ID based on the locally cloned copy + :param repo_branch: Optional, specify the remote repository branch (Ignored, if local repo path is used) + :param repo_commit: Optional, specify the repository commit id (Ignored, if local repo path is used) :return: function wrapper """ @@ -1868,6 +2000,9 @@ class PipelineDecorator(PipelineController): docker_args=docker_args, docker_bash_setup_script=docker_bash_setup_script, task_type=task_type, + repo=repo, + repo_branch=repo_branch, + repo_commit=repo_commit, ) if cls._singleton: @@ -1881,15 +2016,24 @@ class PipelineDecorator(PipelineController): args = [v._remoteref() if isinstance(v, LazyEvalWrapper) else v for v in args] kwargs = {k: v._remoteref() if isinstance(v, LazyEvalWrapper) else v for k, v in kwargs.items()} - def result_wrapper(return_name): - result = func(*args, **kwargs) - return result + func_return = [] - return_w = [LazyEvalWrapper( - callback=functools.partial(result_wrapper, n), - remote_reference=functools.partial(result_wrapper, n)) - for n in function_return] - return return_w[0] if len(return_w) == 1 else return_w + def result_wrapper(a_func_return, return_index): + if not a_func_return: + a_func_return.append(func(*args, **kwargs)) + a_func_return = a_func_return[0] + return a_func_return if return_index is None else a_func_return[return_index] + + if len(function_return) == 1: + return LazyEvalWrapper( + callback=functools.partial(result_wrapper, func_return, None), + remote_reference=functools.partial(result_wrapper, func_return, None)) + else: + return_w = [LazyEvalWrapper( + callback=functools.partial(result_wrapper, func_return, i), + remote_reference=functools.partial(result_wrapper, func_return, i)) + for i, _ in enumerate(function_return)] + return return_w # resolve all lazy objects if we have any: kwargs_artifacts = {} diff --git a/clearml/automation/job.py b/clearml/automation/job.py index 6d311d68..ab9277ac 100644 --- a/clearml/automation/job.py +++ b/clearml/automation/job.py @@ -466,7 +466,7 @@ class ClearmlJob(object): if Session.check_min_api_version('2.13'): # noinspection PyProtectedMember potential_tasks = Task._query_tasks( - status=['completed', 'stopped', 'published'], + status=['completed', 'published'], system_tags=['-{}'.format(Task.archived_tag)], _all_=dict(fields=['runtime.{}'.format(cls._job_hash_property)], pattern=exact_match_regex(task_hash)), @@ -475,7 +475,7 @@ class ClearmlJob(object): else: # noinspection PyProtectedMember potential_tasks = Task._query_tasks( - status=['completed', 'stopped', 'published'], + status=['completed', 'published'], 
system_tags=['-{}'.format(Task.archived_tag)], _all_=dict(fields=['comment'], pattern=cls._job_hash_description.format(task_hash)), only_fields=['id'], diff --git a/clearml/backend_interface/task/populate.py b/clearml/backend_interface/task/populate.py index d961aba9..706f2e56 100644 --- a/clearml/backend_interface/task/populate.py +++ b/clearml/backend_interface/task/populate.py @@ -156,6 +156,7 @@ class CreateAndPopulate(object): uncommitted_from_remote=True, detect_jupyter_notebook=False, add_missing_installed_packages=True, + detailed_req_report=False, ) # check if we have no repository and no requirements raise error @@ -495,7 +496,10 @@ if __name__ == '__main__': project_name=None, # type: Optional[str] task_name=None, # type: Optional[str] task_type=None, # type: Optional[str] - packages=None, # type: Optional[Sequence[str]] + repo=None, # type: Optional[str] + branch=None, # type: Optional[str] + commit=None, # type: Optional[str] + packages=None, # type: Optional[Union[str, Sequence[str]]] docker=None, # type: Optional[str] docker_args=None, # type: Optional[str] docker_bash_setup_script=None, # type: Optional[str] @@ -540,7 +544,13 @@ if __name__ == '__main__': :param task_name: Set the name of the remote task. Required if base_task_id is None. :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference', 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom' - :param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"] + :param repo: Remote URL for the repository to use, OR path to local copy of the git repository + Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo' + :param branch: Select specific repository branch/tag (implies the latest commit from the branch) + :param commit: Select specific commit id to use (default: latest commit, + or when used with local repository matching the local commit id) + :param packages: Manually specify a list of required packages or a local requirements.txt file. + Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt" If not provided, packages are automatically added based on the imports used in the function. :param docker: Select the docker image to be executed in by the remote session :param docker_args: Add docker arguments, pass a single string @@ -594,16 +604,26 @@ if __name__ == '__main__': function_name=function_name, function_return=function_return) - with tempfile.NamedTemporaryFile('w', suffix='.py') as temp_file: + temp_dir = repo if repo and Path(repo).is_dir() else None + with tempfile.NamedTemporaryFile('w', suffix='.py', dir=temp_dir) as temp_file: temp_file.write(task_template) temp_file.flush() + requirements_file = None + if packages and not isinstance(packages, (list, tuple)) and Path(packages).is_file(): + requirements_file = packages + packages = False + populate = CreateAndPopulate( project_name=project_name, task_name=task_name or str(function_name), task_type=task_type, script=temp_file.name, packages=packages if packages is not None else True, + requirements_file=requirements_file, + repo=repo, + branch=branch, + commit=commit, docker=docker, docker_args=docker_args, docker_bash_setup_script=docker_bash_setup_script, @@ -614,7 +634,9 @@ if __name__ == '__main__': task = populate.create_task(dry_run=dry_run) if dry_run: + task['script']['diff'] = task_template task['script']['entry_point'] = entry_point + task['script']['working_dir'] = '.' 
task['hyperparams'] = { cls.kwargs_section: { k: dict(section=cls.kwargs_section, name=k, value=str(v)) for k, v in function_kwargs or {} } } else: - task.update_task(task_data={'script': {'entry_point': entry_point}}) + task.update_task(task_data={ + 'script': task.data.script.to_dict().update( + {'entry_point': entry_point, 'working_dir': '.', 'diff': task_template})}) hyper_parameters = {'{}/{}'.format(cls.kwargs_section, k): str(v) for k, v in function_kwargs} \ if function_kwargs else {} hyper_parameters.update( diff --git a/examples/pipeline/full_custom_pipeline.py b/examples/pipeline/full_custom_pipeline.py index 2adbc75b..b1008c4e 100644 --- a/examples/pipeline/full_custom_pipeline.py +++ b/examples/pipeline/full_custom_pipeline.py @@ -65,6 +65,7 @@ def executing_pipeline(pickle_url, mock_parameter='mock'): print('pipeline args:', pickle_url, mock_parameter) # Use the pipeline argument to start the pipeline and pass it to the first step + print('launch step one') data_frame = step_one(pickle_url) # Use the returned data from the first step (`step_one`), and pass it to the next step (`step_two`) @@ -72,11 +73,13 @@ def executing_pipeline(pickle_url, mock_parameter='mock'): # the pipeline logic does not actually load the artifact itself. # When actually passing the `data_frame` object into a new step, # It waits for the creating step/function (`step_one`) to complete the execution + print('launch step two') processed_data = step_two(data_frame) # Notice we can actually process/modify the returned values inside the pipeline logic context. # This means the modified object will be stored on the pipeline Task. processed_data = [processed_data[0], processed_data[1]*2, processed_data[2], processed_data[3]] + print('launch step three') model = step_three(processed_data) # Notice since we are "printing" the `model` object, diff --git a/examples/pipeline/pipeline_controller.py b/examples/pipeline/pipeline_controller.py deleted file mode 100644 index d2839315..00000000 --- a/examples/pipeline/pipeline_controller.py +++ /dev/null @@ -1,45 +0,0 @@ -from clearml import Task -from clearml.automation.controller import PipelineController - - -def pre_execute_callback_example(a_pipeline, a_node, current_param_override): - # type (PipelineController, PipelineController.Node, dict) -> bool - print('Cloning Task id={} with parameters: {}'.format(a_node.base_task_id, current_param_override)) - # if we want to skip this node (and subtree of this node) we return False - # return True to continue DAG execution - return True - - -def post_execute_callback_example(a_pipeline, a_node): - # type (PipelineController, PipelineController.Node) -> None - print('Completed Task id={}'.format(a_node.executed)) - # if we need the actual Task to change Task.get_task(task_id=a_node.executed) - return - - -# Connecting ClearML with the current process, -# from here on everything is logged automatically -task = Task.init(project_name='examples', task_name='pipeline demo', - task_type=Task.TaskTypes.controller, reuse_last_task_id=False) - -pipe = PipelineController(default_execution_queue='default', add_pipeline_tags=False) -pipe.add_step(name='stage_data', base_task_project='examples', base_task_name='pipeline step 1 dataset artifact') -pipe.add_step(name='stage_process', parents=['stage_data', ], - base_task_project='examples', base_task_name='pipeline step 2 process dataset', - parameter_override={'General/dataset_url': '${stage_data.artifacts.dataset.url}', - 'General/test_size': 0.25},
pre_execute_callback=pre_execute_callback_example, - post_execute_callback=post_execute_callback_example - ) -pipe.add_step(name='stage_train', parents=['stage_process', ], - base_task_project='examples', base_task_name='pipeline step 3 train model', - parameter_override={'General/dataset_task_id': '${stage_process.id}'}) - -# Starting the pipeline (in the background) -pipe.start() -# Wait until pipeline terminates -pipe.wait() -# cleanup everything -pipe.stop() - -print('done')
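Illustrative sketch (not taken from the patch itself): how the new repo / repo_branch / repo_commit arguments and the file-based packages value of add_function_step might be used. The step function, project, queue and repository URL are placeholders, and the controller constructor arguments assume the name/project/version signature referenced above.

from clearml.automation.controller import PipelineController


def step_one(pickle_data_url='', extra=43):
    # placeholder step body; when executed remotely it runs from the attached repository root
    return extra


pipe = PipelineController(name='pipeline demo', project='examples', version='1.0.0')
pipe.add_function_step(
    name='step_one',
    function=step_one,
    function_kwargs=dict(pickle_data_url='https://example.com/data.pkl'),
    function_return=['processed_data'],
    packages='./requirements.txt',            # a requirements file path, or a list such as ["tqdm>=2.1"]
    repo='https://github.com/user/repo.git',  # placeholder; './repo' would reuse a local clone's url and commit
    repo_branch='main',
    execution_queue='default',
    cache_executed_step=True,
)
pipe.start()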
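A similar sketch for the decorator flavour, assuming the usual PipelineDecorator.pipeline wrapper (not shown in this patch): component now accepts repo / repo_branch / repo_commit and a requirements file for packages, and a component declaring several return_values hands back one lazily evaluated reference per value, matching the rewritten result_wrapper above. Project, version, repository and commit values are placeholders.

from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(
    return_values=['x_train', 'x_test'],      # two return objects -> two lazy references
    cache=True,
    packages='./requirements.txt',
    repo='https://github.com/user/repo.git',  # placeholder repository
    repo_commit='abcdef12',                   # placeholder commit id
)
def split_data(ratio=0.25):
    data = list(range(100))
    cut = int(len(data) * (1 - ratio))
    return data[:cut], data[cut:]


@PipelineDecorator.pipeline(name='decorated pipeline', project='examples', version='1.0.0')
def run_pipeline(ratio=0.25):
    # unpacking works because the component call returns one wrapper per declared return value
    x_train, x_test = split_data(ratio)
    print('split created:', x_train is not None, x_test is not None)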
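The constructor change above requires version to be a dot-separated numeric string; in isolation the check behaves like this standalone sketch (the helper name is illustrative):

def validate_version(version):
    # same validation the PipelineController constructor performs in this patch
    version = str(version).strip()
    if not version or not all(i and i.isnumeric() for i in version.split('.')):
        raise ValueError(
            "Pipeline version has to be in a semantic version form, "
            "examples: version='1.0.1', version='1.2', version='23'")
    return version


print(validate_version('1.0.1'))  # ok -> '1.0.1'
print(validate_version(23))       # ok -> '23'
# validate_version('v1.0')        # raises ValueError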
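When _verify_pipeline_version finds an existing pipeline with the same name but a different hash, it bumps the last numeric segment of the version until an unused value is found; the loop is equivalent to this standalone helper (the function name is illustrative):

def bump_version(version, existing_versions):
    # keep incrementing the last segment until the resulting version is not already taken
    while True:
        parts = version.split('.')
        version = '.'.join(parts[:-1] + [str(int(parts[-1]) + 1)])
        if version not in existing_versions:
            return version


print(bump_version('1.0.1', {'1.0.1', '1.0.2'}))  # -> '1.0.3'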