Fix P\pipelines ran via clearml-task do not appear in the UI

This commit is contained in:
clearml 2025-04-18 16:02:30 +03:00
parent 90d3578aeb
commit 88a828cd78
3 changed files with 93 additions and 36 deletions

View File

@ -1589,6 +1589,8 @@ class PipelineController(object):
force_single_script_file: bool = False, force_single_script_file: bool = False,
version: Optional[str] = None, version: Optional[str] = None,
add_run_number: bool = True, add_run_number: bool = True,
binary: Optional[str] = None,
module: Optional[str] = None
) -> "PipelineController": ) -> "PipelineController":
""" """
Manually create and populate a new Pipeline in the system. Manually create and populate a new Pipeline in the system.
@ -1620,6 +1622,10 @@ class PipelineController(object):
:param argparse_args: Arguments to pass to the remote execution, list of string pairs (argument, value) :param argparse_args: Arguments to pass to the remote execution, list of string pairs (argument, value)
Notice, only supported if the codebase itself uses argparse.ArgumentParser Notice, only supported if the codebase itself uses argparse.ArgumentParser
:param force_single_script_file: If True, do not auto-detect local repository :param force_single_script_file: If True, do not auto-detect local repository
:param binary: Binary used to launch the pipeline
:param module: If specified instead of executing `script`, a module named `module` is executed.
Implies script is empty. Module can contain multiple argument for execution,
for example: module="my.module arg1 arg2"
:return: The newly created PipelineController :return: The newly created PipelineController
""" """
@ -1641,6 +1647,8 @@ class PipelineController(object):
argparse_args=argparse_args, argparse_args=argparse_args,
add_task_init_call=False, add_task_init_call=False,
force_single_script_file=force_single_script_file, force_single_script_file=force_single_script_file,
binary=binary,
module=module
) )
cls._create_pipeline_projects( cls._create_pipeline_projects(
task=pipeline_controller, task=pipeline_controller,

View File

@ -4,7 +4,7 @@ from argparse import ArgumentParser
from pathlib2 import Path from pathlib2 import Path
import clearml.backend_api.session import clearml.backend_api.session
from clearml import Task from clearml import Task, PipelineController
from clearml.backend_interface.task.populate import CreateAndPopulate from clearml.backend_interface.task.populate import CreateAndPopulate
from clearml.version import __version__ from clearml.version import __version__
@ -158,7 +158,7 @@ def setup_parser(parser: ArgumentParser) -> None:
default=None, default=None,
help="Set the Task type, optional values: " help="Set the Task type, optional values: "
"training, testing, inference, data_processing, application, monitor, " "training, testing, inference, data_processing, application, monitor, "
"controller, optimizer, service, qc, custom", "optimizer, service, qc, custom. Will be ignored if '--pipeline' is used.",
) )
parser.add_argument( parser.add_argument(
"--skip-task-init", "--skip-task-init",
@ -180,6 +180,22 @@ def setup_parser(parser: ArgumentParser) -> None:
default=None, default=None,
help="Specify the path to the offline session you want to import.", help="Specify the path to the offline session you want to import.",
) )
parser.add_argument(
"--pipeline",
action="store_true",
help="If specified, indicate that the created object is a pipeline instead of a regular task",
)
parser.add_argument(
"--pipeline-version",
type=str,
default=None,
help="Specify the pipeline version. Will be ignored if '--pipeline' is not specified",
)
parser.add_argument(
"--pipeline-dont-add-run-number",
action="store_true",
help="If specified, don't add the run number to the pipeline. Will be ignored if '--pipeline' is not specified",
)
def cli() -> None: def cli() -> None:
@ -217,49 +233,74 @@ def cli() -> None:
else: else:
if args.script and args.script.endswith(".sh") and not args.binary: if args.script and args.script.endswith(".sh") and not args.binary:
print("Detected shell script. Binary will be set to '/bin/bash'") print("Detected shell script. Binary will be set to '/bin/bash'")
create_populate = CreateAndPopulate( if args.pipeline:
project_name=args.project, pipeline = PipelineController.create(
task_name=args.name, project_name=args.project,
task_type=args.task_type, task_name=args.name,
repo=args.repo or args.folder, repo=args.repo or args.folder,
branch=args.branch, branch=args.branch,
commit=args.commit, commit=args.commit,
script=args.script, script=args.script,
module=args.module, module=args.module,
working_directory=args.cwd, working_directory=args.cwd,
packages=args.packages, packages=args.packages,
requirements_file=args.requirements, requirements_file=args.requirements,
docker=args.docker, docker=args.docker,
docker_args=args.docker_args, docker_args=args.docker_args,
docker_bash_setup_script=bash_setup_script, docker_bash_setup_script=bash_setup_script,
output_uri=args.output_uri, version=args.pipeline_version,
base_task_id=args.base_task_id, add_run_number=False if args.pipeline_dont_add_run_number else True,
add_task_init_call=not args.skip_task_init, binary=args.binary
raise_on_missing_entries=True, )
verbose=True, created_task = pipeline._task
binary=args.binary, else:
) created_task = CreateAndPopulate(
# verify args before creating the Task project_name=args.project,
create_populate.update_task_args(args.args) task_name=args.name,
print("Creating new task") task_type=args.task_type,
create_populate.create_task() repo=args.repo or args.folder,
branch=args.branch,
commit=args.commit,
script=args.script,
module=args.module,
working_directory=args.cwd,
packages=args.packages,
requirements_file=args.requirements,
docker=args.docker,
docker_args=args.docker_args,
docker_bash_setup_script=bash_setup_script,
output_uri=args.output_uri,
base_task_id=args.base_task_id,
add_task_init_call=not args.skip_task_init,
raise_on_missing_entries=True,
verbose=True,
binary=args.binary
)
# verify args before creating the Task
created_task.update_task_args(args.args)
print("Creating new task")
created_task.create_task()
# update Task args # update Task args
create_populate.update_task_args(args.args) created_task.update_task_args(args.args)
# set tags # set tags
if args.tags: if args.tags:
create_populate.task.add_tags(args.tags) created_task.task.add_tags(args.tags)
# noinspection PyProtectedMember # noinspection PyProtectedMember
create_populate.task._set_runtime_properties({"_CLEARML_TASK": True}) created_task.task._set_runtime_properties({"_CLEARML_TASK": True})
print("New task created id={}".format(create_populate.get_id())) print("New {} created id={}".format("pipeline" if args.pipeline else "task", created_task.get_id()))
if not args.queue: if not args.queue:
print("Warning: No queue was provided, leaving task in draft-mode.") print("Warning: No queue was provided, leaving {} in draft-mode.", "pipeline" if args.pipeline else "task")
exit(0) exit(0)
Task.enqueue(create_populate.task, queue_name=args.queue) Task.enqueue(created_task.task, queue_name=args.queue)
print("Task id={} sent for execution on queue {}".format(create_populate.get_id(), args.queue)) print(
print("Execution log at: {}".format(create_populate.task.get_output_log_web_page())) "{} id={} sent for execution on queue {}".format(
"Pipeline" if args.pipeline else "task", created_task.get_id(), args.queue
)
)
print("Execution log at: {}".format(created_task.task.get_output_log_web_page()))
def main() -> None: def main() -> None:

View File

@ -1212,6 +1212,8 @@ class Task(_Task):
base_task_id: Optional[str] = None, base_task_id: Optional[str] = None,
add_task_init_call: bool = True, add_task_init_call: bool = True,
force_single_script_file: bool = False, force_single_script_file: bool = False,
binary: Optional[str] = None,
module: Optional[str] = None
) -> TaskInstance: ) -> TaskInstance:
""" """
Manually create and populate a new Task (experiment) in the system. Manually create and populate a new Task (experiment) in the system.
@ -1254,6 +1256,10 @@ class Task(_Task):
Essentially clones an existing task and overrides arguments/requirements. Essentially clones an existing task and overrides arguments/requirements.
:param add_task_init_call: If True, a 'Task.init()' call is added to the script entry point in remote execution. :param add_task_init_call: If True, a 'Task.init()' call is added to the script entry point in remote execution.
:param force_single_script_file: If True, do not auto-detect local repository :param force_single_script_file: If True, do not auto-detect local repository
:param binary: Binary used to launch the entry point
:param module: If specified instead of executing `script`, a module named `module` is executed.
Implies script is empty. Module can contain multiple argument for execution,
for example: module="my.module arg1 arg2"
:return: The newly created Task (experiment) :return: The newly created Task (experiment)
:rtype: Task :rtype: Task
@ -1287,6 +1293,8 @@ class Task(_Task):
add_task_init_call=add_task_init_call, add_task_init_call=add_task_init_call,
force_single_script_file=force_single_script_file, force_single_script_file=force_single_script_file,
raise_on_missing_entries=False, raise_on_missing_entries=False,
module=module,
binary=binary
) )
task = manual_populate.create_task() task = manual_populate.create_task()
if task and argparse_args: if task and argparse_args: