From 97035dcf7b2e97c7a286a6924fa801b2561f2c4b Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Tue, 24 Oct 2023 18:33:53 +0300
Subject: [PATCH] Add support for setting `output_uri` in pipeline decorators
 and functions

---
 clearml/automation/controller.py           | 92 ++++++++++++++++------
 clearml/automation/job.py                  |  5 ++
 clearml/backend_interface/task/populate.py |  4 +-
 3 files changed, 76 insertions(+), 25 deletions(-)

diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py
index acb5b94e..f61c32d6 100644
--- a/clearml/automation/controller.py
+++ b/clearml/automation/controller.py
@@ -101,6 +101,7 @@ class PipelineController(object):
         explicit_docker_image = attrib(type=str, default=None)  # The Docker image the node uses, specified at creation
         recursively_parse_parameters = attrib(type=bool, default=False)  # if True, recursively parse parameters in
         # lists, dicts, or tuples
+        output_uri = attrib(type=Union[bool, str], default=None)  # The default location for output models and other artifacts

         def __attrs_post_init__(self):
             if self.parents is None:
@@ -155,7 +156,8 @@ class PipelineController(object):
         repo_commit=None,  # type: Optional[str]
         always_create_from_code=True,  # type: bool
         artifact_serialization_function=None,  # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
-        artifact_deserialization_function=None  # type: Optional[Callable[[bytes], Any]]
+        artifact_deserialization_function=None,  # type: Optional[Callable[[bytes], Any]]
+        output_uri=None  # type: Optional[Union[str, bool]]
     ):
         # type: (...) -> None
         """
@@ -242,6 +244,9 @@ class PipelineController(object):
             def deserialize(bytes_):
                 import dill
                 return dill.loads(bytes_)
+        :param output_uri: The storage / output url for this pipeline. This is the default location for output
+            models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
+            The `output_uri` of this pipeline's steps will default to this value.
         """
         if auto_version_bump is not None:
             warnings.warn("PipelineController.auto_version_bump is deprecated. It will be ignored", DeprecationWarning)
@@ -316,6 +321,9 @@ class PipelineController(object):
                 project_id=self._task.project, system_tags=self._project_system_tags)
             self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag])
+        if output_uri is not None:
+            self._task.output_uri = output_uri
+        self._output_uri = output_uri
         self._task.set_base_docker(
             docker_image=docker, docker_arguments=docker_args, docker_setup_bash_script=docker_bash_setup_script
         )
@@ -387,7 +395,8 @@ class PipelineController(object):
         base_task_factory=None,  # type: Optional[Callable[[PipelineController.Node], Task]]
         retry_on_failure=None,  # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]]  # noqa
         status_change_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]]  # noqa
-        recursively_parse_parameters=False  # type: bool
+        recursively_parse_parameters=False,  # type: bool
+        output_uri=None  # type: Optional[Union[str, bool]]
     ):
         # type: (...) -> bool
         """
@@ -529,7 +538,9 @@ class PipelineController(object):
                 previous_status  # type: str
             ):
                 pass
-
+
+        :param output_uri: The storage / output url for this step. This is the default location for output
+            models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).

         :return: True if successful
         """
@@ -588,6 +599,7 @@ class PipelineController(object):
             monitor_metrics=monitor_metrics or [],
             monitor_artifacts=monitor_artifacts or [],
             monitor_models=monitor_models or [],
+            output_uri=self._output_uri if output_uri is None else output_uri
         )
         self._retries[name] = 0
         self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
@@ -632,7 +644,8 @@ class PipelineController(object):
         cache_executed_step=False,  # type: bool
         retry_on_failure=None,  # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]]  # noqa
         status_change_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]]  # noqa
-        tags=None  # type: Optional[Union[str, Sequence[str]]]
+        tags=None,  # type: Optional[Union[str, Sequence[str]]]
+        output_uri=None  # type: Optional[Union[str, bool]]
     ):
         # type: (...) -> bool
         """
@@ -799,6 +812,8 @@ class PipelineController(object):
         :param tags: A list of tags for the specific pipeline step.
             When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
+        :param output_uri: The storage / output url for this step. This is the default location for output
+            models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).

         :return: True if successful
         """
@@ -838,7 +853,8 @@ class PipelineController(object):
             cache_executed_step=cache_executed_step,
             retry_on_failure=retry_on_failure,
             status_change_callback=status_change_callback,
-            tags=tags
+            tags=tags,
+            output_uri=output_uri
         )

     def start(
@@ -1014,8 +1030,8 @@ class PipelineController(object):
         return cls._get_pipeline_task().get_logger()

     @classmethod
-    def upload_model(cls, model_name, model_local_path):
-        # type: (str, str) -> OutputModel
+    def upload_model(cls, model_name, model_local_path, upload_uri=None):
+        # type: (str, str, Optional[str]) -> OutputModel
         """
         Upload (add) a model to the main Pipeline Task object.
         This function can be called from any pipeline component to directly add models into the main pipeline Task
@@ -1028,12 +1044,16 @@ class PipelineController(object):
         :param model_local_path: Path to the local model file or directory to be uploaded. If a local directory is
             provided the content of the folder (recursively) will be packaged into a zip file and uploaded
+        :param upload_uri: The URI of the storage destination for model weights upload. The default value
+            is the previously used URI.
+
+        :return: The uploaded OutputModel
         """
         task = cls._get_pipeline_task()
         model_name = str(model_name)
         model_local_path = Path(model_local_path)
         out_model = OutputModel(task=task, name=model_name)
-        out_model.update_weights(weights_filename=model_local_path.as_posix())
+        out_model.update_weights(weights_filename=model_local_path.as_posix(), upload_uri=upload_uri)
         return out_model

     @classmethod
@@ -1457,7 +1477,7 @@ class PipelineController(object):
             self, docker, docker_args, docker_bash_setup_script,
             function, function_input_artifacts, function_kwargs, function_return,
             auto_connect_frameworks, auto_connect_arg_parser,
-            packages, project_name, task_name, task_type, repo, branch, commit, helper_functions
+            packages, project_name, task_name, task_type, repo, branch, commit, helper_functions, output_uri=None
     ):
         task_definition = CreateFromFunction.create_task_from_function(
             a_function=function,
@@ -1476,7 +1496,7 @@ class PipelineController(object):
             docker=docker,
             docker_args=docker_args,
             docker_bash_setup_script=docker_bash_setup_script,
-            output_uri=None,
+            output_uri=output_uri,
             helper_functions=helper_functions,
             dry_run=True,
             task_template_header=self._task_template_header,
@@ -1927,7 +1947,8 @@ class PipelineController(object):
         cache_executed_step=False,  # type: bool
         retry_on_failure=None,  # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]]  # noqa
         status_change_callback=None,  # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]]  # noqa
-        tags=None  # type: Optional[Union[str, Sequence[str]]]
+        tags=None,  # type: Optional[Union[str, Sequence[str]]]
+        output_uri=None  # type: Optional[Union[str, bool]]
     ):
         # type: (...) -> bool
         """
@@ -2094,6 +2115,8 @@ class PipelineController(object):
         :param tags: A list of tags for the specific pipeline step.
             When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
+        :param output_uri: The storage / output url for this step. This is the default location for output
+            models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).

         :return: True if successful
         """
@@ -2107,6 +2130,9 @@ class PipelineController(object):

         self._verify_node_name(name)

+        if output_uri is None:
+            output_uri = self._output_uri
+
         function_input_artifacts = {}
         # go over function_kwargs, split it into string and input artifacts
         for k, v in function_kwargs.items():
@@ -2145,7 +2171,7 @@ class PipelineController(object):
                 function_input_artifacts, function_kwargs, function_return,
                 auto_connect_frameworks, auto_connect_arg_parser,
                 packages, project_name, task_name,
-                task_type, repo, repo_branch, repo_commit, helper_functions)
+                task_type, repo, repo_branch, repo_commit, helper_functions, output_uri=output_uri)

         elif self._task.running_locally() or self._task.get_configuration_object(name=name) is None:
             project_name = project_name or self._get_target_project() or self._task.get_project_name()
@@ -2155,7 +2181,7 @@ class PipelineController(object):
                 function_input_artifacts, function_kwargs, function_return,
                 auto_connect_frameworks, auto_connect_arg_parser,
                 packages, project_name, task_name,
-                task_type, repo, repo_branch, repo_commit, helper_functions)
+                task_type, repo, repo_branch, repo_commit, helper_functions, output_uri=output_uri)
             # update configuration with the task definitions
             # noinspection PyProtectedMember
             self._task._set_configuration(
@@ -2180,6 +2206,9 @@ class PipelineController(object):
                 if tags:
                     a_task.add_tags(tags)

+                if output_uri is not None:
+                    a_task.output_uri = output_uri
+
                 return a_task

         self._nodes[name] = self.Node(
@@ -2195,7 +2224,8 @@ class PipelineController(object):
             monitor_metrics=monitor_metrics,
             monitor_models=monitor_models,
             job_code_section=job_code_section,
-            explicit_docker_image=docker
+            explicit_docker_image=docker,
+            output_uri=output_uri
         )
         self._retries[name] = 0
         self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
@@ -2284,6 +2314,7 @@ class PipelineController(object):
                 disable_clone_task=disable_clone_task,
                 task_overrides=task_overrides,
                 allow_caching=node.cache_executed_step,
+                output_uri=node.output_uri,
                 **extra_args
             )
         except Exception:
@@ -3261,7 +3292,8 @@ class PipelineDecorator(PipelineController):
         repo_branch=None,  # type: Optional[str]
         repo_commit=None,  # type: Optional[str]
         artifact_serialization_function=None,  # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
-        artifact_deserialization_function=None  # type: Optional[Callable[[bytes], Any]]
+        artifact_deserialization_function=None,  # type: Optional[Callable[[bytes], Any]]
+        output_uri=None  # type: Optional[Union[str, bool]]
     ):
         # type: (...) -> ()
         """
@@ -3341,6 +3373,9 @@ class PipelineDecorator(PipelineController):
             def deserialize(bytes_):
                 import dill
                 return dill.loads(bytes_)
+        :param output_uri: The storage / output url for this pipeline. This is the default location for output
+            models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
+            The `output_uri` of this pipeline's steps will default to this value.
""" super(PipelineDecorator, self).__init__( name=name, @@ -3361,7 +3396,8 @@ class PipelineDecorator(PipelineController): repo_commit=repo_commit, always_create_from_code=False, artifact_serialization_function=artifact_serialization_function, - artifact_deserialization_function=artifact_deserialization_function + artifact_deserialization_function=artifact_deserialization_function, + output_uri=output_uri ) # if we are in eager execution, make sure parent class knows it @@ -3583,7 +3619,7 @@ class PipelineDecorator(PipelineController): function, function_input_artifacts, function_kwargs, function_return, auto_connect_frameworks, auto_connect_arg_parser, packages, project_name, task_name, task_type, repo, branch, commit, - helper_functions + helper_functions, output_uri=None ): def sanitize(function_source): matched = re.match(r"[\s]*@[\w]*.component[\s\\]*\(", function_source) @@ -3621,7 +3657,7 @@ class PipelineDecorator(PipelineController): docker=docker, docker_args=docker_args, docker_bash_setup_script=docker_bash_setup_script, - output_uri=None, + output_uri=output_uri, helper_functions=helper_functions, dry_run=True, task_template_header=self._task_template_header, @@ -3703,7 +3739,8 @@ class PipelineDecorator(PipelineController): pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa - tags=None # type: Optional[Union[str, Sequence[str]]] + tags=None, # type: Optional[Union[str, Sequence[str]]] + output_uri=None # type: Optional[Union[str, bool]] ): # type: (...) -> Callable """ @@ -3841,6 +3878,8 @@ class PipelineDecorator(PipelineController): :param tags: A list of tags for the specific pipeline step. When executing a Pipeline remotely (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect. + :param output_uri: The storage / output url for this step. This is the default location for output + models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter). :return: function wrapper """ @@ -3883,7 +3922,8 @@ class PipelineDecorator(PipelineController): pre_execute_callback=pre_execute_callback, post_execute_callback=post_execute_callback, status_change_callback=status_change_callback, - tags=tags + tags=tags, + output_uri=output_uri ) if cls._singleton: @@ -4109,7 +4149,8 @@ class PipelineDecorator(PipelineController): repo_branch=None, # type: Optional[str] repo_commit=None, # type: Optional[str] artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]] - artifact_deserialization_function=None # type: Optional[Callable[[bytes], Any]] + artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]] + output_uri=None # type: Optional[Union[str, bool]] ): # type: (...) -> Callable """ @@ -4220,6 +4261,9 @@ class PipelineDecorator(PipelineController): def deserialize(bytes_): import dill return dill.loads(bytes_) + :param output_uri: The storage / output url for this pipeline. This is the default location for output + models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter). + The `output_uri` of this pipeline's steps will default to this value. 
""" def decorator_wrap(func): @@ -4265,7 +4309,8 @@ class PipelineDecorator(PipelineController): repo_branch=repo_branch, repo_commit=repo_commit, artifact_serialization_function=artifact_serialization_function, - artifact_deserialization_function=artifact_deserialization_function + artifact_deserialization_function=artifact_deserialization_function, + output_uri=output_uri ) ret_val = func(**pipeline_kwargs) LazyEvalWrapper.trigger_all_remote_references() @@ -4316,7 +4361,8 @@ class PipelineDecorator(PipelineController): repo_branch=repo_branch, repo_commit=repo_commit, artifact_serialization_function=artifact_serialization_function, - artifact_deserialization_function=artifact_deserialization_function + artifact_deserialization_function=artifact_deserialization_function, + output_uri=output_uri ) a_pipeline._args_map = args_map or {} diff --git a/clearml/automation/job.py b/clearml/automation/job.py index dde8eb34..dcd52ce3 100644 --- a/clearml/automation/job.py +++ b/clearml/automation/job.py @@ -522,6 +522,7 @@ class ClearmlJob(BaseJob): disable_clone_task=False, # type: bool allow_caching=False, # type: bool target_project=None, # type: Optional[str] + output_uri=None, # type: Optional[Union[str, bool]] **kwargs # type: Any ): # type: (...) -> () @@ -545,6 +546,8 @@ class ClearmlJob(BaseJob): If True, use the base_task_id directly (base-task must be in draft-mode / created), :param bool allow_caching: If True, check if we have a previously executed Task with the same specification. If we do, use it and set internal is_cached flag. Default False (always create new Task). + :param Union[str, bool] output_uri: The storage / output url for this job. This is the default location for + output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter). :param str target_project: Optional, Set the target project name to create the cloned Task in. """ super(ClearmlJob, self).__init__() @@ -660,6 +663,8 @@ class ClearmlJob(BaseJob): # noinspection PyProtectedMember self.task._edit(**sections) + if output_uri is not None: + self.task.output_uri = output_uri self._set_task_cache_hash(self.task, task_hash) self.task_started = False self._worker = None diff --git a/clearml/backend_interface/task/populate.py b/clearml/backend_interface/task/populate.py index a4361365..0f1fe53c 100644 --- a/clearml/backend_interface/task/populate.py +++ b/clearml/backend_interface/task/populate.py @@ -178,7 +178,7 @@ class CreateAndPopulate(object): project=Task.get_project_id(self.project_name), type=str(self.task_type or Task.TaskTypes.training), ) # type: dict - if self.output_uri: + if self.output_uri is not None: task_state['output'] = dict(destination=self.output_uri) else: task_state = dict(script={}) @@ -391,7 +391,7 @@ class CreateAndPopulate(object): return task def _set_output_uri(self, task): - if self.output_uri: + if self.output_uri is not None: try: task.output_uri = self.output_uri except ValueError: