From 88a828cd78eb8c62bd2ead6eb89452c74afb4e90 Mon Sep 17 00:00:00 2001 From: clearml <> Date: Fri, 18 Apr 2025 16:02:30 +0300 Subject: [PATCH] Fix P\pipelines ran via clearml-task do not appear in the UI --- clearml/automation/controller.py | 8 +++ clearml/cli/task/__main__.py | 113 +++++++++++++++++++++---------- clearml/task.py | 8 +++ 3 files changed, 93 insertions(+), 36 deletions(-) diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 1022fad3..61bcdc28 100755 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -1589,6 +1589,8 @@ class PipelineController(object): force_single_script_file: bool = False, version: Optional[str] = None, add_run_number: bool = True, + binary: Optional[str] = None, + module: Optional[str] = None ) -> "PipelineController": """ Manually create and populate a new Pipeline in the system. @@ -1620,6 +1622,10 @@ class PipelineController(object): :param argparse_args: Arguments to pass to the remote execution, list of string pairs (argument, value) Notice, only supported if the codebase itself uses argparse.ArgumentParser :param force_single_script_file: If True, do not auto-detect local repository + :param binary: Binary used to launch the pipeline + :param module: If specified instead of executing `script`, a module named `module` is executed. + Implies script is empty. Module can contain multiple argument for execution, + for example: module="my.module arg1 arg2" :return: The newly created PipelineController """ @@ -1641,6 +1647,8 @@ class PipelineController(object): argparse_args=argparse_args, add_task_init_call=False, force_single_script_file=force_single_script_file, + binary=binary, + module=module ) cls._create_pipeline_projects( task=pipeline_controller, diff --git a/clearml/cli/task/__main__.py b/clearml/cli/task/__main__.py index bb817bf3..cc8c5ccd 100644 --- a/clearml/cli/task/__main__.py +++ b/clearml/cli/task/__main__.py @@ -4,7 +4,7 @@ from argparse import ArgumentParser from pathlib2 import Path import clearml.backend_api.session -from clearml import Task +from clearml import Task, PipelineController from clearml.backend_interface.task.populate import CreateAndPopulate from clearml.version import __version__ @@ -158,7 +158,7 @@ def setup_parser(parser: ArgumentParser) -> None: default=None, help="Set the Task type, optional values: " "training, testing, inference, data_processing, application, monitor, " - "controller, optimizer, service, qc, custom", + "optimizer, service, qc, custom. Will be ignored if '--pipeline' is used.", ) parser.add_argument( "--skip-task-init", @@ -180,6 +180,22 @@ def setup_parser(parser: ArgumentParser) -> None: default=None, help="Specify the path to the offline session you want to import.", ) + parser.add_argument( + "--pipeline", + action="store_true", + help="If specified, indicate that the created object is a pipeline instead of a regular task", + ) + parser.add_argument( + "--pipeline-version", + type=str, + default=None, + help="Specify the pipeline version. Will be ignored if '--pipeline' is not specified", + ) + parser.add_argument( + "--pipeline-dont-add-run-number", + action="store_true", + help="If specified, don't add the run number to the pipeline. Will be ignored if '--pipeline' is not specified", + ) def cli() -> None: @@ -217,49 +233,74 @@ def cli() -> None: else: if args.script and args.script.endswith(".sh") and not args.binary: print("Detected shell script. Binary will be set to '/bin/bash'") - create_populate = CreateAndPopulate( - project_name=args.project, - task_name=args.name, - task_type=args.task_type, - repo=args.repo or args.folder, - branch=args.branch, - commit=args.commit, - script=args.script, - module=args.module, - working_directory=args.cwd, - packages=args.packages, - requirements_file=args.requirements, - docker=args.docker, - docker_args=args.docker_args, - docker_bash_setup_script=bash_setup_script, - output_uri=args.output_uri, - base_task_id=args.base_task_id, - add_task_init_call=not args.skip_task_init, - raise_on_missing_entries=True, - verbose=True, - binary=args.binary, - ) - # verify args before creating the Task - create_populate.update_task_args(args.args) - print("Creating new task") - create_populate.create_task() + if args.pipeline: + pipeline = PipelineController.create( + project_name=args.project, + task_name=args.name, + repo=args.repo or args.folder, + branch=args.branch, + commit=args.commit, + script=args.script, + module=args.module, + working_directory=args.cwd, + packages=args.packages, + requirements_file=args.requirements, + docker=args.docker, + docker_args=args.docker_args, + docker_bash_setup_script=bash_setup_script, + version=args.pipeline_version, + add_run_number=False if args.pipeline_dont_add_run_number else True, + binary=args.binary + ) + created_task = pipeline._task + else: + created_task = CreateAndPopulate( + project_name=args.project, + task_name=args.name, + task_type=args.task_type, + repo=args.repo or args.folder, + branch=args.branch, + commit=args.commit, + script=args.script, + module=args.module, + working_directory=args.cwd, + packages=args.packages, + requirements_file=args.requirements, + docker=args.docker, + docker_args=args.docker_args, + docker_bash_setup_script=bash_setup_script, + output_uri=args.output_uri, + base_task_id=args.base_task_id, + add_task_init_call=not args.skip_task_init, + raise_on_missing_entries=True, + verbose=True, + binary=args.binary + ) + # verify args before creating the Task + created_task.update_task_args(args.args) + print("Creating new task") + created_task.create_task() # update Task args - create_populate.update_task_args(args.args) + created_task.update_task_args(args.args) # set tags if args.tags: - create_populate.task.add_tags(args.tags) + created_task.task.add_tags(args.tags) # noinspection PyProtectedMember - create_populate.task._set_runtime_properties({"_CLEARML_TASK": True}) + created_task.task._set_runtime_properties({"_CLEARML_TASK": True}) - print("New task created id={}".format(create_populate.get_id())) + print("New {} created id={}".format("pipeline" if args.pipeline else "task", created_task.get_id())) if not args.queue: - print("Warning: No queue was provided, leaving task in draft-mode.") + print("Warning: No queue was provided, leaving {} in draft-mode.", "pipeline" if args.pipeline else "task") exit(0) - Task.enqueue(create_populate.task, queue_name=args.queue) - print("Task id={} sent for execution on queue {}".format(create_populate.get_id(), args.queue)) - print("Execution log at: {}".format(create_populate.task.get_output_log_web_page())) + Task.enqueue(created_task.task, queue_name=args.queue) + print( + "{} id={} sent for execution on queue {}".format( + "Pipeline" if args.pipeline else "task", created_task.get_id(), args.queue + ) + ) + print("Execution log at: {}".format(created_task.task.get_output_log_web_page())) def main() -> None: diff --git a/clearml/task.py b/clearml/task.py index 9d9982df..f6b9004e 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -1212,6 +1212,8 @@ class Task(_Task): base_task_id: Optional[str] = None, add_task_init_call: bool = True, force_single_script_file: bool = False, + binary: Optional[str] = None, + module: Optional[str] = None ) -> TaskInstance: """ Manually create and populate a new Task (experiment) in the system. @@ -1254,6 +1256,10 @@ class Task(_Task): Essentially clones an existing task and overrides arguments/requirements. :param add_task_init_call: If True, a 'Task.init()' call is added to the script entry point in remote execution. :param force_single_script_file: If True, do not auto-detect local repository + :param binary: Binary used to launch the entry point + :param module: If specified instead of executing `script`, a module named `module` is executed. + Implies script is empty. Module can contain multiple argument for execution, + for example: module="my.module arg1 arg2" :return: The newly created Task (experiment) :rtype: Task @@ -1287,6 +1293,8 @@ class Task(_Task): add_task_init_call=add_task_init_call, force_single_script_file=force_single_script_file, raise_on_missing_entries=False, + module=module, + binary=binary ) task = manual_populate.create_task() if task and argparse_args: