Add programmatic pipeline clone using PipelineController.create() and PipelineController.clone() (#1353)

This commit is contained in:
clearml 2024-12-03 10:02:47 +02:00
parent 0efdd35b00
commit a494a926f2

View File

@ -59,6 +59,7 @@ class PipelineController(object):
_update_execution_plot_interval = 5.*60
_update_progress_interval = 10.
_monitor_node_interval = 5.*60
_pipeline_as_sub_project = bool(Session.check_min_api_server_version("2.17"))
_report_plot_execution_flow = dict(title='Pipeline', series='Execution Flow')
_report_plot_execution_details = dict(title='Pipeline Details', series='Execution Details')
_evaluated_return_values = {} # TID: pipeline_name
@ -328,7 +329,7 @@ class PipelineController(object):
self._version = str(version).strip() if version else None
if self._version and not Version.is_valid_version_string(self._version):
raise ValueError(
"Setting non-semantic dataset version '{}'".format(self._version)
"Setting non-semantic pipeline version '{}'".format(self._version)
)
self._pool_frequency = pool_frequency * 60.
self._thread = None
@ -348,19 +349,12 @@ class PipelineController(object):
self._reporting_lock = RLock()
self._pipeline_task_status_failed = None
self._mock_execution = False # used for nested pipelines (eager execution)
self._pipeline_as_sub_project = bool(Session.check_min_api_server_version("2.17"))
self._last_progress_update_time = 0
self._artifact_serialization_function = artifact_serialization_function
self._artifact_deserialization_function = artifact_deserialization_function
self._skip_global_imports = skip_global_imports
if not self._task:
task_name = name or project or '{}'.format(datetime.now())
if self._pipeline_as_sub_project:
parent_project = (project + "/" if project else "") + self._project_section
project_name = "{}/{}".format(parent_project, task_name)
else:
parent_project = None
project_name = project or 'Pipelines'
pipeline_project_args = self._create_pipeline_project_args(name, project)
# if user disabled the auto-repo, we force local script storage (repo="" or repo=False)
set_force_local_repo = False
@ -369,8 +363,8 @@ class PipelineController(object):
set_force_local_repo = True
self._task = Task.init(
project_name=project_name,
task_name=task_name,
project_name=pipeline_project_args["project_name"],
task_name=pipeline_project_args["task_name"],
task_type=Task.TaskTypes.controller,
auto_resource_monitoring=False,
reuse_last_task_id=False
@ -382,15 +376,13 @@ class PipelineController(object):
self._task._wait_for_repo_detection(timeout=300.)
Task.force_store_standalone_script(force=False)
# make sure project is hidden
if self._pipeline_as_sub_project:
get_or_create_project(
self._task.session, project_name=parent_project, system_tags=["hidden"])
get_or_create_project(
self._task.session, project_name=project_name,
project_id=self._task.project, system_tags=self._project_system_tags)
self._create_pipeline_projects(
task=self._task,
parent_project=pipeline_project_args["parent_project"],
project_name=pipeline_project_args["project_name"],
)
self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag])
if output_uri is not None:
self._task.output_uri = output_uri
self._output_uri = output_uri
@ -1440,6 +1432,170 @@ class PipelineController(object):
"""
return self._pipeline_args
@classmethod
def _create_pipeline_project_args(cls, name, project):
    """
    Compute the task name, parent project and full project path for a pipeline.

    :param name: Requested pipeline (task) name, may be None
    :param project: Requested project name, may be None
    :return: dict with keys "task_name", "parent_project" and "project_name"
    """
    # fall back to the project name, then to a timestamp, if no name was given
    task_name = name or project or '{}'.format(datetime.now())
    if not cls._pipeline_as_sub_project:
        # flat layout: no hidden parent, pipelines live in a single project
        return {"task_name": task_name, "parent_project": None, "project_name": project or 'Pipelines'}
    # nested layout: "<project>/<pipelines section>/<task name>"
    parent_project = "{}{}".format(project + "/" if project else "", cls._project_section)
    return {
        "task_name": task_name,
        "parent_project": parent_project,
        "project_name": "{}/{}".format(parent_project, task_name),
    }
@classmethod
def _create_pipeline_projects(cls, task, parent_project, project_name):
    """
    Ensure the pipeline's projects exist, creating them if necessary.

    The parent project is tagged "hidden" so it does not clutter the UI;
    the pipeline project itself gets the class-level project system tags.

    :param task: The pipeline controller Task (its ``project`` id is reused)
    :param parent_project: Name of the hidden parent project
    :param project_name: Full name of the pipeline project
    :return: The pipeline project (or None when pipelines are not sub-projects)
    """
    if not cls._pipeline_as_sub_project:
        # server too old for sub-project pipelines - nothing to create
        return
    # make sure project is hidden
    get_or_create_project(Task._get_default_session(), project_name=parent_project, system_tags=["hidden"])
    return get_or_create_project(
        Task._get_default_session(),
        project_name=project_name,
        project_id=task.project,
        system_tags=cls._project_system_tags,
    )
@classmethod
def create(
    cls,
    project_name,  # type: str
    task_name,  # type: str
    repo=None,  # type: str
    branch=None,  # type: Optional[str]
    commit=None,  # type: Optional[str]
    script=None,  # type: Optional[str]
    working_directory=None,  # type: Optional[str]
    packages=None,  # type: Optional[Union[bool, Sequence[str]]]
    requirements_file=None,  # type: Optional[Union[str, Path]]
    docker=None,  # type: Optional[str]
    docker_args=None,  # type: Optional[str]
    docker_bash_setup_script=None,  # type: Optional[str]
    argparse_args=None,  # type: Optional[Sequence[Tuple[str, str]]]
    force_single_script_file=False,  # type: bool
    version=None,  # type: Optional[str]
    add_run_number=True,  # type: bool
):
    # type: (...) -> PipelineController
    """
    Manually create and populate a new Pipeline in the system.
    Supports pipelines from functions, decorators and tasks.

    :param project_name: Set the project name for the pipeline.
    :param task_name: Set the name of the remote pipeline.
    :param repo: Remote URL for the repository to use, or path to local copy of the git repository.
        Example: 'https://github.com/allegroai/clearml.git' or '~/project/repo'. If ``repo`` is specified, then
        the ``script`` parameter must also be specified
    :param branch: Select specific repository branch/tag (implies the latest commit from the branch)
    :param commit: Select specific commit ID to use (default: latest commit,
        or when used with local repository matching the local commit ID)
    :param script: Specify the entry point script for the remote execution. When used in tandem with
        remote git repository the script should be a relative path inside the repository,
        for example: './source/train.py' . When used with local repository path it supports a
        direct path to a file inside the local repository itself, for example: '~/project/source/train.py'
    :param working_directory: Working directory to launch the script from. Default: repository root folder.
        Relative to repo root or local folder.
    :param packages: Manually specify a list of required packages. Example: ``["tqdm>=2.1", "scikit-learn"]``
        or `True` to automatically create requirements
        based on locally installed packages (repository must be local).
    :param requirements_file: Specify requirements.txt file to install when setting the session.
        If not provided, the requirements.txt from the repository will be used.
    :param docker: Select the docker image to be executed in by the remote session
    :param docker_args: Add docker arguments, pass a single string
    :param docker_bash_setup_script: Add bash script to be executed
        inside the docker before setting up the Task's environment
    :param argparse_args: Arguments to pass to the remote execution, list of string pairs (argument, value)
        Notice, only supported if the codebase itself uses argparse.ArgumentParser
    :param force_single_script_file: If True, do not auto-detect local repository
    :param version: Set the version of the new pipeline. If not set,
        the default pipeline version is used
    :param add_run_number: If True (default), append a running number (" #<num>") to the
        pipeline name when a pipeline with the same name already exists in the project

    :return: The newly created PipelineController
    """
    pipeline_project_args = cls._create_pipeline_project_args(
        name=task_name, project=project_name
    )
    # create the controller task itself, without injecting a Task.init call into the script
    pipeline_controller = Task.create(
        project_name=pipeline_project_args["project_name"],
        task_name=pipeline_project_args["task_name"],
        task_type=Task.TaskTypes.controller,
        repo=repo,
        branch=branch,
        commit=commit,
        script=script,
        working_directory=working_directory,
        packages=packages,
        requirements_file=requirements_file,
        docker=docker,
        docker_args=docker_args,
        docker_bash_setup_script=docker_bash_setup_script,
        argparse_args=argparse_args,
        add_task_init_call=False,
        force_single_script_file=force_single_script_file
    )
    # make sure the (hidden) parent project and the pipeline project exist
    cls._create_pipeline_projects(
        task=pipeline_controller,
        parent_project=pipeline_project_args["parent_project"],
        project_name=pipeline_project_args["project_name"],
    )
    # mark the task as a pipeline so the UI displays it accordingly
    pipeline_controller.set_system_tags((pipeline_controller.get_system_tags() or []) + [cls._tag])
    pipeline_controller.set_user_properties(version=version or cls._default_pipeline_version)
    if add_run_number:
        cls._add_pipeline_name_run_number(pipeline_controller)
    print(pipeline_controller.get_output_log_web_page())
    return cls._create_pipeline_controller_from_task(pipeline_controller)
@classmethod
def clone(
    cls,
    pipeline_controller,  # type: Union[PipelineController, str]
    name=None,  # type: Optional[str]
    comment=None,  # type: Optional[str]
    parent=None,  # type: Optional[str]
    project=None,  # type: Optional[str]
    version=None  # type: Optional[str]
):
    # type: (...) -> PipelineController
    """
    Create a duplicate (a clone) of a pipeline (experiment). The status of the cloned pipeline is ``Draft``
    and modifiable.

    :param str pipeline_controller: The pipeline to clone. Specify a PipelineController object or an ID.
    :param str name: The name of the new cloned pipeline.
    :param str comment: A comment / description for the new cloned pipeline.
    :param str parent: The ID of the parent Task of the new pipeline.

      - If ``parent`` is not specified, then ``parent`` is set to ``source_task.parent``.
      - If ``parent`` is not specified and ``source_task.parent`` is not available,
        then ``parent`` set to ``source_task``.

    :param str project: The project name in which to create the new pipeline.
        If ``None``, the clone inherits the original pipeline's project
    :param str version: The version of the new cloned pipeline. If ``None``, the clone
        inherits the original pipeline's version

    :return: The new cloned PipelineController
    """
    # normalize the input into the underlying controller Task
    if isinstance(pipeline_controller, six.string_types):
        pipeline_controller = Task.get_task(task_id=pipeline_controller)
    elif isinstance(pipeline_controller, PipelineController):
        pipeline_controller = pipeline_controller.task

    if project or name:
        # a new project and/or name was requested - recompute the pipeline
        # project layout and make sure the target projects exist
        project_args = cls._create_pipeline_project_args(
            name=name or pipeline_controller.name,
            project=project or pipeline_controller.get_project_name(),
        )
        project = cls._create_pipeline_projects(
            task=pipeline_controller,
            parent_project=project_args["parent_project"],
            project_name=project_args["project_name"],
        )
        name = project_args["task_name"]

    cloned_controller = Task.clone(
        source_task=pipeline_controller, name=name, comment=comment, parent=parent, project=project
    )
    if version:
        cloned_controller.set_user_properties(version=version)
    return cls._create_pipeline_controller_from_task(cloned_controller)
@classmethod
def enqueue(cls, pipeline_controller, queue_name=None, queue_id=None, force=False):
# type: (Union[PipelineController, str], Optional[str], Optional[str], bool) -> Any
@ -1555,6 +1711,10 @@ class PipelineController(object):
error_msg += ", pipeline_version={}".format(pipeline_version)
raise ValueError(error_msg)
pipeline_task = Task.get_task(task_id=pipeline_id)
return cls._create_pipeline_controller_from_task(pipeline_task)
@classmethod
def _create_pipeline_controller_from_task(cls, pipeline_task):
pipeline_object = cls.__new__(cls)
pipeline_object._task = pipeline_task
pipeline_object._nodes = {}
@ -1566,6 +1726,11 @@ class PipelineController(object):
pass
return pipeline_object
@property
def task(self):
    # type: () -> Task
    """
    The underlying controller Task object of this pipeline.

    :return: The Task instance backing this PipelineController
    """
    return self._task
@property
def id(self):
# type: () -> str
@ -3194,23 +3359,24 @@ class PipelineController(object):
session=self._task.session if self._task else Task.default_session,
project_name=self._target_project)
def _add_pipeline_name_run_number(self):
@classmethod
def _add_pipeline_name_run_number(cls, task):
# type: () -> None
if not self._task:
if not task:
return
# if we were already executed, do not rename (meaning aborted pipeline that was continued)
# noinspection PyProtectedMember
if self._task._get_runtime_properties().get(self._runtime_property_hash):
if task._get_runtime_properties().get(cls._runtime_property_hash):
return
# remove the #<num> suffix if we have one:
task_name = re.compile(r" #\d+$").split(self._task.name or "", 1)[0]
task_name = re.compile(r" #\d+$").split(task.name or "", 1)[0]
page_size = 100
# find exact name or " #<num>" extension
prev_pipelines_ids = self._task.query_tasks(
prev_pipelines_ids = task.query_tasks(
task_name=r"^{}(| #\d+)$".format(task_name),
task_filter=dict(
project=[self._task.project], system_tags=[self._tag],
project=[task.project], system_tags=[cls._tag],
order_by=['-created'],
page_size=page_size,
fetch_only_first_page=True,
@ -3223,8 +3389,8 @@ class PipelineController(object):
# worst case fail to auto increment
try:
# we assume we are the latest so let's take a few (last 10) and check the max number
last_task_name = self._task.query_tasks(
task_filter=dict(task_ids=prev_pipelines_ids[:10], project=[self._task.project]),
last_task_name = task.query_tasks(
task_filter=dict(task_ids=prev_pipelines_ids[:10], project=[task.project]),
additional_return_fields=['name'],
) # type: List[Dict]
# let's parse the names
@ -3243,7 +3409,7 @@ class PipelineController(object):
max_value = 0
if max_value > 1:
self._task.set_name(task_name + " #{}".format(max_value))
task.set_name(task_name + " #{}".format(max_value))
@classmethod
def _get_pipeline_task(cls):