Add support for setting output_uri in pipeline decorators and functions

This commit is contained in:
allegroai 2023-10-24 18:33:53 +03:00
parent 7b85555d0d
commit 97035dcf7b
3 changed files with 76 additions and 25 deletions

View File

@ -101,6 +101,7 @@ class PipelineController(object):
explicit_docker_image = attrib(type=str, default=None) # The Docker image the node uses, specified at creation explicit_docker_image = attrib(type=str, default=None) # The Docker image the node uses, specified at creation
recursively_parse_parameters = attrib(type=bool, default=False) # if True, recursively parse parameters in recursively_parse_parameters = attrib(type=bool, default=False) # if True, recursively parse parameters in
# lists, dicts, or tuples # lists, dicts, or tuples
output_uri = attrib(type=Union[bool, str], default=None) # The default location for output models and other artifacts
def __attrs_post_init__(self): def __attrs_post_init__(self):
if self.parents is None: if self.parents is None:
@ -155,7 +156,8 @@ class PipelineController(object):
repo_commit=None, # type: Optional[str] repo_commit=None, # type: Optional[str]
always_create_from_code=True, # type: bool always_create_from_code=True, # type: bool
artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]] artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
artifact_deserialization_function=None # type: Optional[Callable[[bytes], Any]] artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None # type: Optional[Union[str, bool]]
): ):
# type: (...) -> None # type: (...) -> None
""" """
@ -242,6 +244,9 @@ class PipelineController(object):
def deserialize(bytes_): def deserialize(bytes_):
import dill import dill
return dill.loads(bytes_) return dill.loads(bytes_)
:param output_uri: The storage / output url for this pipeline. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
The `output_uri` of this pipeline's steps will default to this value.
""" """
if auto_version_bump is not None: if auto_version_bump is not None:
warnings.warn("PipelineController.auto_version_bump is deprecated. It will be ignored", DeprecationWarning) warnings.warn("PipelineController.auto_version_bump is deprecated. It will be ignored", DeprecationWarning)
@ -316,6 +321,9 @@ class PipelineController(object):
project_id=self._task.project, system_tags=self._project_system_tags) project_id=self._task.project, system_tags=self._project_system_tags)
self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag]) self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag])
if output_uri is not None:
self._task.output_uri = output_uri
self._output_uri = output_uri
self._task.set_base_docker( self._task.set_base_docker(
docker_image=docker, docker_arguments=docker_args, docker_setup_bash_script=docker_bash_setup_script docker_image=docker, docker_arguments=docker_args, docker_setup_bash_script=docker_bash_setup_script
) )
@ -387,7 +395,8 @@ class PipelineController(object):
base_task_factory=None, # type: Optional[Callable[[PipelineController.Node], Task]] base_task_factory=None, # type: Optional[Callable[[PipelineController.Node], Task]]
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
recursively_parse_parameters=False # type: bool recursively_parse_parameters=False, # type: bool
output_uri=None # type: Optional[Union[str, bool]]
): ):
# type: (...) -> bool # type: (...) -> bool
""" """
@ -530,6 +539,8 @@ class PipelineController(object):
): ):
pass pass
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:return: True if successful :return: True if successful
""" """
@ -588,6 +599,7 @@ class PipelineController(object):
monitor_metrics=monitor_metrics or [], monitor_metrics=monitor_metrics or [],
monitor_artifacts=monitor_artifacts or [], monitor_artifacts=monitor_artifacts or [],
monitor_models=monitor_models or [], monitor_models=monitor_models or [],
output_uri=self._output_uri if output_uri is None else output_uri
) )
self._retries[name] = 0 self._retries[name] = 0
self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \ self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
@ -632,7 +644,8 @@ class PipelineController(object):
cache_executed_step=False, # type: bool cache_executed_step=False, # type: bool
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None # type: Optional[Union[str, Sequence[str]]] tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None # type: Optional[Union[str, bool]]
): ):
# type: (...) -> bool # type: (...) -> bool
""" """
@ -799,6 +812,8 @@ class PipelineController(object):
:param tags: A list of tags for the specific pipeline step. :param tags: A list of tags for the specific pipeline step.
When executing a Pipeline remotely When executing a Pipeline remotely
(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect. (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:return: True if successful :return: True if successful
""" """
@ -838,7 +853,8 @@ class PipelineController(object):
cache_executed_step=cache_executed_step, cache_executed_step=cache_executed_step,
retry_on_failure=retry_on_failure, retry_on_failure=retry_on_failure,
status_change_callback=status_change_callback, status_change_callback=status_change_callback,
tags=tags tags=tags,
output_uri=output_uri
) )
def start( def start(
@ -1014,8 +1030,8 @@ class PipelineController(object):
return cls._get_pipeline_task().get_logger() return cls._get_pipeline_task().get_logger()
@classmethod @classmethod
def upload_model(cls, model_name, model_local_path): def upload_model(cls, model_name, model_local_path, upload_uri=None):
# type: (str, str) -> OutputModel # type: (str, str, Optional[str]) -> OutputModel
""" """
Upload (add) a model to the main Pipeline Task object. Upload (add) a model to the main Pipeline Task object.
This function can be called from any pipeline component to directly add models into the main pipeline Task This function can be called from any pipeline component to directly add models into the main pipeline Task
@ -1028,12 +1044,16 @@ class PipelineController(object):
:param model_local_path: Path to the local model file or directory to be uploaded. :param model_local_path: Path to the local model file or directory to be uploaded.
If a local directory is provided the content of the folder (recursively) will be If a local directory is provided the content of the folder (recursively) will be
packaged into a zip file and uploaded packaged into a zip file and uploaded
:param upload_uri: The URI of the storage destination for model weights upload. The default value
is the previously used URI.
:return: The uploaded OutputModel
""" """
task = cls._get_pipeline_task() task = cls._get_pipeline_task()
model_name = str(model_name) model_name = str(model_name)
model_local_path = Path(model_local_path) model_local_path = Path(model_local_path)
out_model = OutputModel(task=task, name=model_name) out_model = OutputModel(task=task, name=model_name)
out_model.update_weights(weights_filename=model_local_path.as_posix()) out_model.update_weights(weights_filename=model_local_path.as_posix(), upload_uri=upload_uri)
return out_model return out_model
@classmethod @classmethod
@ -1457,7 +1477,7 @@ class PipelineController(object):
self, docker, docker_args, docker_bash_setup_script, self, docker, docker_args, docker_bash_setup_script,
function, function_input_artifacts, function_kwargs, function_return, function, function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser, auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name, task_type, repo, branch, commit, helper_functions packages, project_name, task_name, task_type, repo, branch, commit, helper_functions, output_uri=None
): ):
task_definition = CreateFromFunction.create_task_from_function( task_definition = CreateFromFunction.create_task_from_function(
a_function=function, a_function=function,
@ -1476,7 +1496,7 @@ class PipelineController(object):
docker=docker, docker=docker,
docker_args=docker_args, docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script, docker_bash_setup_script=docker_bash_setup_script,
output_uri=None, output_uri=output_uri,
helper_functions=helper_functions, helper_functions=helper_functions,
dry_run=True, dry_run=True,
task_template_header=self._task_template_header, task_template_header=self._task_template_header,
@ -1927,7 +1947,8 @@ class PipelineController(object):
cache_executed_step=False, # type: bool cache_executed_step=False, # type: bool
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None # type: Optional[Union[str, Sequence[str]]] tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None # type: Optional[Union[str, bool]]
): ):
# type: (...) -> bool # type: (...) -> bool
""" """
@ -2094,6 +2115,8 @@ class PipelineController(object):
:param tags: A list of tags for the specific pipeline step. :param tags: A list of tags for the specific pipeline step.
When executing a Pipeline remotely When executing a Pipeline remotely
(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect. (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:return: True if successful :return: True if successful
""" """
@ -2107,6 +2130,9 @@ class PipelineController(object):
self._verify_node_name(name) self._verify_node_name(name)
if output_uri is None:
output_uri = self._output_uri
function_input_artifacts = {} function_input_artifacts = {}
# go over function_kwargs, split it into string and input artifacts # go over function_kwargs, split it into string and input artifacts
for k, v in function_kwargs.items(): for k, v in function_kwargs.items():
@ -2145,7 +2171,7 @@ class PipelineController(object):
function_input_artifacts, function_kwargs, function_return, function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser, auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name, packages, project_name, task_name,
task_type, repo, repo_branch, repo_commit, helper_functions) task_type, repo, repo_branch, repo_commit, helper_functions, output_uri=output_uri)
elif self._task.running_locally() or self._task.get_configuration_object(name=name) is None: elif self._task.running_locally() or self._task.get_configuration_object(name=name) is None:
project_name = project_name or self._get_target_project() or self._task.get_project_name() project_name = project_name or self._get_target_project() or self._task.get_project_name()
@ -2155,7 +2181,7 @@ class PipelineController(object):
function_input_artifacts, function_kwargs, function_return, function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser, auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name, packages, project_name, task_name,
task_type, repo, repo_branch, repo_commit, helper_functions) task_type, repo, repo_branch, repo_commit, helper_functions, output_uri=output_uri)
# update configuration with the task definitions # update configuration with the task definitions
# noinspection PyProtectedMember # noinspection PyProtectedMember
self._task._set_configuration( self._task._set_configuration(
@ -2180,6 +2206,9 @@ class PipelineController(object):
if tags: if tags:
a_task.add_tags(tags) a_task.add_tags(tags)
if output_uri is not None:
a_task.output_uri = output_uri
return a_task return a_task
self._nodes[name] = self.Node( self._nodes[name] = self.Node(
@ -2195,7 +2224,8 @@ class PipelineController(object):
monitor_metrics=monitor_metrics, monitor_metrics=monitor_metrics,
monitor_models=monitor_models, monitor_models=monitor_models,
job_code_section=job_code_section, job_code_section=job_code_section,
explicit_docker_image=docker explicit_docker_image=docker,
output_uri=output_uri
) )
self._retries[name] = 0 self._retries[name] = 0
self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \ self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
@ -2284,6 +2314,7 @@ class PipelineController(object):
disable_clone_task=disable_clone_task, disable_clone_task=disable_clone_task,
task_overrides=task_overrides, task_overrides=task_overrides,
allow_caching=node.cache_executed_step, allow_caching=node.cache_executed_step,
output_uri=node.output_uri,
**extra_args **extra_args
) )
except Exception: except Exception:
@ -3261,7 +3292,8 @@ class PipelineDecorator(PipelineController):
repo_branch=None, # type: Optional[str] repo_branch=None, # type: Optional[str]
repo_commit=None, # type: Optional[str] repo_commit=None, # type: Optional[str]
artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]] artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
artifact_deserialization_function=None # type: Optional[Callable[[bytes], Any]] artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None # type: Optional[Union[str, bool]]
): ):
# type: (...) -> () # type: (...) -> ()
""" """
@ -3341,6 +3373,9 @@ class PipelineDecorator(PipelineController):
def deserialize(bytes_): def deserialize(bytes_):
import dill import dill
return dill.loads(bytes_) return dill.loads(bytes_)
:param output_uri: The storage / output url for this pipeline. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
The `output_uri` of this pipeline's steps will default to this value.
""" """
super(PipelineDecorator, self).__init__( super(PipelineDecorator, self).__init__(
name=name, name=name,
@ -3361,7 +3396,8 @@ class PipelineDecorator(PipelineController):
repo_commit=repo_commit, repo_commit=repo_commit,
always_create_from_code=False, always_create_from_code=False,
artifact_serialization_function=artifact_serialization_function, artifact_serialization_function=artifact_serialization_function,
artifact_deserialization_function=artifact_deserialization_function artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri
) )
# if we are in eager execution, make sure parent class knows it # if we are in eager execution, make sure parent class knows it
@ -3583,7 +3619,7 @@ class PipelineDecorator(PipelineController):
function, function_input_artifacts, function_kwargs, function_return, function, function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser, auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name, task_type, repo, branch, commit, packages, project_name, task_name, task_type, repo, branch, commit,
helper_functions helper_functions, output_uri=None
): ):
def sanitize(function_source): def sanitize(function_source):
matched = re.match(r"[\s]*@[\w]*.component[\s\\]*\(", function_source) matched = re.match(r"[\s]*@[\w]*.component[\s\\]*\(", function_source)
@ -3621,7 +3657,7 @@ class PipelineDecorator(PipelineController):
docker=docker, docker=docker,
docker_args=docker_args, docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script, docker_bash_setup_script=docker_bash_setup_script,
output_uri=None, output_uri=output_uri,
helper_functions=helper_functions, helper_functions=helper_functions,
dry_run=True, dry_run=True,
task_template_header=self._task_template_header, task_template_header=self._task_template_header,
@ -3703,7 +3739,8 @@ class PipelineDecorator(PipelineController):
pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa status_change_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, str], None]] # noqa
tags=None # type: Optional[Union[str, Sequence[str]]] tags=None, # type: Optional[Union[str, Sequence[str]]]
output_uri=None # type: Optional[Union[str, bool]]
): ):
# type: (...) -> Callable # type: (...) -> Callable
""" """
@ -3841,6 +3878,8 @@ class PipelineDecorator(PipelineController):
:param tags: A list of tags for the specific pipeline step. :param tags: A list of tags for the specific pipeline step.
When executing a Pipeline remotely When executing a Pipeline remotely
(i.e. launching the pipeline from the UI/enqueuing it), this method has no effect. (i.e. launching the pipeline from the UI/enqueuing it), this method has no effect.
:param output_uri: The storage / output url for this step. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:return: function wrapper :return: function wrapper
""" """
@ -3883,7 +3922,8 @@ class PipelineDecorator(PipelineController):
pre_execute_callback=pre_execute_callback, pre_execute_callback=pre_execute_callback,
post_execute_callback=post_execute_callback, post_execute_callback=post_execute_callback,
status_change_callback=status_change_callback, status_change_callback=status_change_callback,
tags=tags tags=tags,
output_uri=output_uri
) )
if cls._singleton: if cls._singleton:
@ -4109,7 +4149,8 @@ class PipelineDecorator(PipelineController):
repo_branch=None, # type: Optional[str] repo_branch=None, # type: Optional[str]
repo_commit=None, # type: Optional[str] repo_commit=None, # type: Optional[str]
artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]] artifact_serialization_function=None, # type: Optional[Callable[[Any], Union[bytes, bytearray]]]
artifact_deserialization_function=None # type: Optional[Callable[[bytes], Any]] artifact_deserialization_function=None, # type: Optional[Callable[[bytes], Any]]
output_uri=None # type: Optional[Union[str, bool]]
): ):
# type: (...) -> Callable # type: (...) -> Callable
""" """
@ -4220,6 +4261,9 @@ class PipelineDecorator(PipelineController):
def deserialize(bytes_): def deserialize(bytes_):
import dill import dill
return dill.loads(bytes_) return dill.loads(bytes_)
:param output_uri: The storage / output url for this pipeline. This is the default location for output
models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
The `output_uri` of this pipeline's steps will default to this value.
""" """
def decorator_wrap(func): def decorator_wrap(func):
@ -4265,7 +4309,8 @@ class PipelineDecorator(PipelineController):
repo_branch=repo_branch, repo_branch=repo_branch,
repo_commit=repo_commit, repo_commit=repo_commit,
artifact_serialization_function=artifact_serialization_function, artifact_serialization_function=artifact_serialization_function,
artifact_deserialization_function=artifact_deserialization_function artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri
) )
ret_val = func(**pipeline_kwargs) ret_val = func(**pipeline_kwargs)
LazyEvalWrapper.trigger_all_remote_references() LazyEvalWrapper.trigger_all_remote_references()
@ -4316,7 +4361,8 @@ class PipelineDecorator(PipelineController):
repo_branch=repo_branch, repo_branch=repo_branch,
repo_commit=repo_commit, repo_commit=repo_commit,
artifact_serialization_function=artifact_serialization_function, artifact_serialization_function=artifact_serialization_function,
artifact_deserialization_function=artifact_deserialization_function artifact_deserialization_function=artifact_deserialization_function,
output_uri=output_uri
) )
a_pipeline._args_map = args_map or {} a_pipeline._args_map = args_map or {}

View File

@ -522,6 +522,7 @@ class ClearmlJob(BaseJob):
disable_clone_task=False, # type: bool disable_clone_task=False, # type: bool
allow_caching=False, # type: bool allow_caching=False, # type: bool
target_project=None, # type: Optional[str] target_project=None, # type: Optional[str]
output_uri=None, # type: Optional[Union[str, bool]]
**kwargs # type: Any **kwargs # type: Any
): ):
# type: (...) -> () # type: (...) -> ()
@ -545,6 +546,8 @@ class ClearmlJob(BaseJob):
If True, use the base_task_id directly (base-task must be in draft-mode / created), If True, use the base_task_id directly (base-task must be in draft-mode / created),
:param bool allow_caching: If True, check if we have a previously executed Task with the same specification. :param bool allow_caching: If True, check if we have a previously executed Task with the same specification.
If we do, use it and set internal is_cached flag. Default False (always create new Task). If we do, use it and set internal is_cached flag. Default False (always create new Task).
:param Union[str, bool] output_uri: The storage / output url for this job. This is the default location for
output models and other artifacts. Check Task.init reference docs for more info (output_uri is a parameter).
:param str target_project: Optional, Set the target project name to create the cloned Task in. :param str target_project: Optional, Set the target project name to create the cloned Task in.
""" """
super(ClearmlJob, self).__init__() super(ClearmlJob, self).__init__()
@ -660,6 +663,8 @@ class ClearmlJob(BaseJob):
# noinspection PyProtectedMember # noinspection PyProtectedMember
self.task._edit(**sections) self.task._edit(**sections)
if output_uri is not None:
self.task.output_uri = output_uri
self._set_task_cache_hash(self.task, task_hash) self._set_task_cache_hash(self.task, task_hash)
self.task_started = False self.task_started = False
self._worker = None self._worker = None

View File

@ -178,7 +178,7 @@ class CreateAndPopulate(object):
project=Task.get_project_id(self.project_name), project=Task.get_project_id(self.project_name),
type=str(self.task_type or Task.TaskTypes.training), type=str(self.task_type or Task.TaskTypes.training),
) # type: dict ) # type: dict
if self.output_uri: if self.output_uri is not None:
task_state['output'] = dict(destination=self.output_uri) task_state['output'] = dict(destination=self.output_uri)
else: else:
task_state = dict(script={}) task_state = dict(script={})
@ -391,7 +391,7 @@ class CreateAndPopulate(object):
return task return task
def _set_output_uri(self, task): def _set_output_uri(self, task):
if self.output_uri: if self.output_uri is not None:
try: try:
task.output_uri = self.output_uri task.output_uri = self.output_uri
except ValueError: except ValueError: