From a494a926f2c3adcf6567d1a029a6037a587d7105 Mon Sep 17 00:00:00 2001 From: clearml <> Date: Tue, 3 Dec 2024 10:02:47 +0200 Subject: [PATCH] Add programmatic pipeline clone using `PipelineController.create()` and `PipelineController.clone()` (#1353) --- clearml/automation/controller.py | 222 +++++++++++++++++++++++++++---- 1 file changed, 194 insertions(+), 28 deletions(-) diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 08612282..630f2830 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -59,6 +59,7 @@ class PipelineController(object): _update_execution_plot_interval = 5.*60 _update_progress_interval = 10. _monitor_node_interval = 5.*60 + _pipeline_as_sub_project = bool(Session.check_min_api_server_version("2.17")) _report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow') _report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details') _evaluated_return_values = {} # TID: pipeline_name @@ -328,7 +329,7 @@ class PipelineController(object): self._version = str(version).strip() if version else None if self._version and not Version.is_valid_version_string(self._version): raise ValueError( - "Setting non-semantic dataset version '{}'".format(self._version) + "Setting non-semantic pipeline version '{}'".format(self._version) ) self._pool_frequency = pool_frequency * 60. 
self._thread = None @@ -348,19 +349,12 @@ class PipelineController(object): self._reporting_lock = RLock() self._pipeline_task_status_failed = None self._mock_execution = False # used for nested pipelines (eager execution) - self._pipeline_as_sub_project = bool(Session.check_min_api_server_version("2.17")) self._last_progress_update_time = 0 self._artifact_serialization_function = artifact_serialization_function self._artifact_deserialization_function = artifact_deserialization_function self._skip_global_imports = skip_global_imports if not self._task: - task_name = name or project or '{}'.format(datetime.now()) - if self._pipeline_as_sub_project: - parent_project = (project + "/" if project else "") + self._project_section - project_name = "{}/{}".format(parent_project, task_name) - else: - parent_project = None - project_name = project or 'Pipelines' + pipeline_project_args = self._create_pipeline_project_args(name, project) # if user disabled the auto-repo, we force local script storage (repo="" or repo=False) set_force_local_repo = False @@ -369,8 +363,8 @@ class PipelineController(object): set_force_local_repo = True self._task = Task.init( - project_name=project_name, - task_name=task_name, + project_name=pipeline_project_args["project_name"], + task_name=pipeline_project_args["task_name"], task_type=Task.TaskTypes.controller, auto_resource_monitoring=False, reuse_last_task_id=False @@ -382,15 +376,13 @@ class PipelineController(object): self._task._wait_for_repo_detection(timeout=300.) 
Task.force_store_standalone_script(force=False) - # make sure project is hidden - if self._pipeline_as_sub_project: - get_or_create_project( - self._task.session, project_name=parent_project, system_tags=["hidden"]) - get_or_create_project( - self._task.session, project_name=project_name, - project_id=self._task.project, system_tags=self._project_system_tags) - + self._create_pipeline_projects( + task=self._task, + parent_project=pipeline_project_args["parent_project"], + project_name=pipeline_project_args["project_name"], + ) self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag]) + if output_uri is not None: self._task.output_uri = output_uri self._output_uri = output_uri @@ -1440,6 +1432,170 @@ class PipelineController(object): """ return self._pipeline_args + @classmethod + def _create_pipeline_project_args(cls, name, project): + task_name = name or project or '{}'.format(datetime.now()) + if cls._pipeline_as_sub_project: + parent_project = (project + "/" if project else "") + cls._project_section + project_name = "{}/{}".format(parent_project, task_name) + else: + parent_project = None + project_name = project or 'Pipelines' + return {"task_name": task_name, "parent_project": parent_project, "project_name": project_name} + + @classmethod + def _create_pipeline_projects(cls, task, parent_project, project_name): + # make sure project is hidden + if not cls._pipeline_as_sub_project: + return + get_or_create_project(Task._get_default_session(), project_name=parent_project, system_tags=["hidden"]) + return get_or_create_project( + Task._get_default_session(), + project_name=project_name, + project_id=task.project, + system_tags=cls._project_system_tags, + ) + + @classmethod + def create( + cls, + project_name, # type: str + task_name, # type: str + repo=None, # type: str + branch=None, # type: Optional[str] + commit=None, # type: Optional[str] + script=None, # type: Optional[str] + working_directory=None, # type: Optional[str] + 
packages=None, # type: Optional[Union[bool, Sequence[str]]] + requirements_file=None, # type: Optional[Union[str, Path]] + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + argparse_args=None, # type: Optional[Sequence[Tuple[str, str]]] + force_single_script_file=False, # type: bool + version=None, # type: Optional[str] + add_run_number=True, # type: bool + ): + # type: (...) -> PipelineController + """ + Manually create and populate a new Pipeline in the system. + Supports pipelines from functions, decorators and tasks. + + :param project_name: Set the project name for the pipeline. + :param task_name: Set the name of the remote pipeline. + :param repo: Remote URL for the repository to use, or path to local copy of the git repository. + Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo'. If ``repo`` is specified, then + the ``script`` parameter must also be specified + :param branch: Select specific repository branch/tag (implies the latest commit from the branch) + :param commit: Select specific commit ID to use (default: latest commit, + or when used with local repository matching the local commit ID) + :param script: Specify the entry point script for the remote execution. When used in tandem with + remote git repository the script should be a relative path inside the repository, + for example: './source/train.py' . When used with local repository path it supports a + direct path to a file inside the local repository itself, for example: '~/project/source/train.py' + :param working_directory: Working directory to launch the script from. Default: repository root folder. + Relative to repo root or local folder. + :param packages: Manually specify a list of required packages. Example: ``["tqdm>=2.1", "scikit-learn"]`` + or `True` to automatically create requirements + based on locally installed packages (repository must be local). 
+ :param requirements_file: Specify requirements.txt file to install when setting the session. + If not provided, the requirements.txt from the repository will be used. + :param docker: Select the docker image to be executed in by the remote session + :param docker_args: Add docker arguments, pass a single string + :param docker_bash_setup_script: Add bash script to be executed + inside the docker before setting up the Task's environment + :param argparse_args: Arguments to pass to the remote execution, list of string pairs (argument, value) + Notice, only supported if the codebase itself uses argparse.ArgumentParser + :param force_single_script_file: If True, do not auto-detect local repository + + :return: The newly created PipelineController + """ + pipeline_project_args = cls._create_pipeline_project_args( + name=task_name, project=project_name + ) + pipeline_controller = Task.create( + project_name=pipeline_project_args["project_name"], + task_name=pipeline_project_args["task_name"], + task_type=Task.TaskTypes.controller, + repo=repo, + branch=branch, + commit=commit, + script=script, + working_directory=working_directory, + packages=packages, + requirements_file=requirements_file, + docker=docker, + docker_args=docker_args, + docker_bash_setup_script=docker_bash_setup_script, + argparse_args=argparse_args, + add_task_init_call=False, + force_single_script_file=force_single_script_file + ) + cls._create_pipeline_projects( + task=pipeline_controller, + parent_project=pipeline_project_args["parent_project"], + project_name=pipeline_project_args["project_name"], + ) + pipeline_controller.set_system_tags((pipeline_controller.get_system_tags() or []) + [cls._tag]) + pipeline_controller.set_user_properties(version=version or cls._default_pipeline_version) + if add_run_number: + cls._add_pipeline_name_run_number(pipeline_controller) + print(pipeline_controller.get_output_log_web_page()) + return cls._create_pipeline_controller_from_task(pipeline_controller) + + 
@classmethod + def clone( + cls, + pipeline_controller, # type: Union[PipelineController, str] + name=None, # type: Optional[str] + comment=None, # type: Optional[str] + parent=None, # type: Optional[str] + project=None, # type: Optional[str] + version=None # type: Optional[str] + ): + # type: (...) -> PipelineController + """ + Create a duplicate (a clone) of a pipeline (experiment). The status of the cloned pipeline is ``Draft`` + and modifiable. + + :param str pipeline_controller: The pipeline to clone. Specify a PipelineController object or an ID. + :param str name: The name of the new cloned pipeline. + :param str comment: A comment / description for the new cloned pipeline. + :param str parent: The ID of the parent Task of the new pipeline. + + - If ``parent`` is not specified, then ``parent`` is set to ``source_task.parent``. + - If ``parent`` is not specified and ``source_task.parent`` is not available, + then ``parent`` set to ``source_task``. + + :param str project: The project name in which to create the new pipeline. + If ``None``, the clone inherits the original pipeline's project + :param str version: The version of the new cloned pipeline. 
If ``None``, the clone + inherits the original pipeline's version + + :return: The new cloned PipelineController + """ + if isinstance(pipeline_controller, six.string_types): + pipeline_controller = Task.get_task(task_id=pipeline_controller) + elif isinstance(pipeline_controller, PipelineController): + pipeline_controller = pipeline_controller.task + + if project or name: + pipeline_project_args = cls._create_pipeline_project_args( + name=name or pipeline_controller.name, project=project or pipeline_controller.get_project_name() + ) + project = cls._create_pipeline_projects( + task=pipeline_controller, + parent_project=pipeline_project_args["parent_project"], + project_name=pipeline_project_args["project_name"], + ) + name = pipeline_project_args["task_name"] + cloned_controller = Task.clone( + source_task=pipeline_controller, name=name, comment=comment, parent=parent, project=project + ) + if version: + cloned_controller.set_user_properties(version=version) + return cls._create_pipeline_controller_from_task(cloned_controller) + @classmethod def enqueue(cls, pipeline_controller, queue_name=None, queue_id=None, force=False): # type: (Union[PipelineController, str], Optional[str], Optional[str], bool) -> Any @@ -1555,6 +1711,10 @@ class PipelineController(object): error_msg += ", pipeline_version={}".format(pipeline_version) raise ValueError(error_msg) pipeline_task = Task.get_task(task_id=pipeline_id) + return cls._create_pipeline_controller_from_task(pipeline_task) + + @classmethod + def _create_pipeline_controller_from_task(cls, pipeline_task): pipeline_object = cls.__new__(cls) pipeline_object._task = pipeline_task pipeline_object._nodes = {} @@ -1566,6 +1726,11 @@ class PipelineController(object): pass return pipeline_object + @property + def task(self): + # type: () -> Task + return self._task + @property def id(self): # type: () -> str @@ -3194,23 +3359,24 @@ class PipelineController(object): session=self._task.session if self._task else Task.default_session, 
project_name=self._target_project) - def _add_pipeline_name_run_number(self): + @classmethod + def _add_pipeline_name_run_number(cls, task): # type: () -> None - if not self._task: + if not task: return # if we were already executed, do not rename (meaning aborted pipeline that was continued) # noinspection PyProtectedMember - if self._task._get_runtime_properties().get(self._runtime_property_hash): + if task._get_runtime_properties().get(cls._runtime_property_hash): return # remove the # suffix if we have one: - task_name = re.compile(r" #\d+$").split(self._task.name or "", 1)[0] + task_name = re.compile(r" #\d+$").split(task.name or "", 1)[0] page_size = 100 # find exact name or " #" extension - prev_pipelines_ids = self._task.query_tasks( + prev_pipelines_ids = task.query_tasks( task_name=r"^{}(| #\d+)$".format(task_name), task_filter=dict( - project=[self._task.project], system_tags=[self._tag], + project=[task.project], system_tags=[cls._tag], order_by=['-created'], page_size=page_size, fetch_only_first_page=True, @@ -3223,8 +3389,8 @@ class PipelineController(object): # worst case fail to auto increment try: # we assume we are the latest so let's take a few (last 10) and check the max number - last_task_name = self._task.query_tasks( - task_filter=dict(task_ids=prev_pipelines_ids[:10], project=[self._task.project]), + last_task_name = task.query_tasks( + task_filter=dict(task_ids=prev_pipelines_ids[:10], project=[task.project]), additional_return_fields=['name'], ) # type: List[Dict] # let's parse the names @@ -3243,7 +3409,7 @@ class PipelineController(object): max_value = 0 if max_value > 1: - self._task.set_name(task_name + " #{}".format(max_value)) + task.set_name(task_name + " #{}".format(max_value)) @classmethod def _get_pipeline_task(cls):