Allow pipeline steps to return string paths without them being treated as a folder artifact and zipped (#780)

allegroai 2022-10-14 10:32:36 +03:00
parent a64b918c79
commit 6d6b54f5a1
2 changed files with 375 additions and 142 deletions
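The user-facing effect, as a hedged sketch (the component names, project, and paths below are illustrative, not taken from the commit): a pipeline step that returns a plain string, such as a filesystem path, now has that value stored as a `return/<name>` parameter on the step Task and passed to downstream steps by value, instead of being uploaded as an artifact, where a local folder path could previously be treated as a folder artifact and zipped.

from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(return_values=["dataset_path"])
def prepare_dataset():
    # a plain string return value; with this change it is stored on the step Task
    # as the parameter "return/dataset_path" instead of being uploaded (and zipped)
    # as a folder artifact
    return "/data/processed/v1"


@PipelineDecorator.component(return_values=["status"])
def train(dataset_path):
    print("training on", dataset_path)
    return "done"


@PipelineDecorator.pipeline(name="string-return-example", project="examples", version="1.0")
def run_pipeline():
    dataset_path = prepare_dataset()
    train(dataset_path=dataset_path)


if __name__ == "__main__":
    PipelineDecorator.run_locally()
    run_pipeline()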


@@ -691,112 +691,42 @@ class PipelineController(object):
:return: True if successful
"""
# always store callback functions (even when running remotely)
if pre_execute_callback:
self._pre_step_callbacks[name] = pre_execute_callback
if post_execute_callback:
self._post_step_callbacks[name] = post_execute_callback
self._verify_node_name(name)
function_kwargs = function_kwargs or {}
function_input_artifacts = {}
# go over function_kwargs, split it into string and input artifacts
for k, v in function_kwargs.items():
if v is None:
continue
if self._step_ref_pattern.match(str(v)):
# check for step artifacts
step, _, artifact = v[2:-1].partition('.')
if step in self._nodes and artifact in self._nodes[step].return_artifacts:
function_input_artifacts[k] = "${{{}.id}}.{}".format(step, artifact)
continue
# verify the reference only if we are running locally (when running remotely with multiple
# steps from tasks, _nodes is still empty; only after deserializing will we have the full DAG)
if self._task.running_locally():
self.__verify_step_reference(node=self.Node(name=name), step_ref_string=v)
elif not isinstance(v, (float, int, bool, six.string_types)):
function_input_artifacts[k] = "{}.{}.{}".format(self._task.id, name, k)
self._task.upload_artifact(
"{}.{}".format(name, k),
artifact_object=v,
wait_on_upload=True,
extension_name=".pkl" if isinstance(v, dict) else None,
)
default_kwargs = inspect.getfullargspec(function)
if default_kwargs and default_kwargs.args and default_kwargs.defaults:
for key, val in zip(default_kwargs.args[-len(default_kwargs.defaults):], default_kwargs.defaults):
function_kwargs.setdefault(key, val)
function_kwargs = {k: v for k, v in function_kwargs.items() if k not in function_input_artifacts}
parameters = {"{}/{}".format(CreateFromFunction.kwargs_section, k): v for k, v in function_kwargs.items()}
if function_input_artifacts:
parameters.update(
{"{}/{}".format(CreateFromFunction.input_artifact_section, k): str(v)
for k, v in function_input_artifacts.items()}
)
job_code_section = None
task_name = task_name or name or None
if self._mock_execution:
project_name = project_name or self._get_target_project() or self._task.get_project_name()
task_definition = self._create_task_from_function(
docker, docker_args, docker_bash_setup_script, function,
function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name,
task_type, repo, repo_branch, repo_commit, helper_functions)
elif self._task.running_locally() or self._task.get_configuration_object(name=name) is None:
project_name = project_name or self._get_target_project() or self._task.get_project_name()
task_definition = self._create_task_from_function(
docker, docker_args, docker_bash_setup_script, function,
function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name,
task_type, repo, repo_branch, repo_commit, helper_functions)
# update configuration with the task definitions
# noinspection PyProtectedMember
self._task._set_configuration(
name=name, config_type='json',
config_text=json.dumps(task_definition, indent=1)
)
job_code_section = name
else:
# load task definition from configuration
# noinspection PyProtectedMember
config_text = self._task._get_configuration_text(name=name)
task_definition = json.loads(config_text) if config_text else dict()
def _create_task(_):
a_task = Task.create(
project_name=project_name,
task_name=task_definition.get('name'),
task_type=task_definition.get('type'),
)
# replace reference
a_task.update_task(task_definition)
return a_task
self._nodes[name] = self.Node(
name=name, base_task_id=None, parents=parents or [],
queue=execution_queue, timeout=time_limit,
parameters=parameters,
clone_task=False,
cache_executed_step=cache_executed_step,
task_factory_func=_create_task,
continue_on_fail=continue_on_fail,
return_artifacts=function_return,
monitor_artifacts=monitor_artifacts,
return self._add_function_step(
name=name,
function=function,
function_kwargs=function_kwargs,
function_return=function_return,
project_name=project_name,
task_name=task_name,
task_type=task_type,
auto_connect_frameworks=auto_connect_frameworks,
auto_connect_arg_parser=auto_connect_arg_parser,
packages=packages,
repo=repo,
repo_branch=repo_branch,
repo_commit=repo_commit,
helper_functions=helper_functions,
docker=docker,
docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script,
parents=parents,
execution_queue=execution_queue,
monitor_metrics=monitor_metrics,
monitor_artifacts=monitor_artifacts,
monitor_models=monitor_models,
job_code_section=job_code_section,
time_limit=time_limit,
continue_on_fail=continue_on_fail,
pre_execute_callback=pre_execute_callback,
post_execute_callback=post_execute_callback,
cache_executed_step=cache_executed_step,
retry_on_failure=retry_on_failure,
)
self._retries[name] = 0
self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
(functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure)
if isinstance(retry_on_failure, int) else self._retry_on_failure_callback)
return True
def start(
self,
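For the non-decorator API, a hedged call-site sketch of the public `add_function_step`, which now delegates to the new private `_add_function_step` shown in the next hunk (the step names, the local run mode, and the `${step.artifact}` reference are illustrative):

from clearml import PipelineController


def make_dataset(sample_count=100):
    # returns a plain string path, now forwarded as a parameter rather than
    # treated as a folder artifact and zipped
    return "/tmp/dataset_{}".format(sample_count)


def use_dataset(dataset_path):
    print("loading", dataset_path)


pipe = PipelineController(name="demo-pipeline", project="examples", version="0.1")
pipe.add_function_step(
    name="make_dataset",
    function=make_dataset,
    function_kwargs=dict(sample_count=500),
    function_return=["dataset_path"],
)
pipe.add_function_step(
    name="use_dataset",
    function=use_dataset,
    function_kwargs=dict(dataset_path="${make_dataset.dataset_path}"),
    parents=["make_dataset"],
)
pipe.start_locally(run_pipeline_steps_locally=True)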
@@ -1659,6 +1589,289 @@ class PipelineController(object):
# return False if we did not cover all the nodes
return not bool(set(self._nodes.keys()) - visited)
def _add_function_step(
self,
name, # type: str
function, # type: Callable
function_kwargs=None, # type: Optional[Dict[str, Any]]
function_return=None, # type: Optional[List[str]]
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
task_type=None, # type: Optional[str]
auto_connect_frameworks=None, # type: Optional[dict]
auto_connect_arg_parser=None, # type: Optional[dict]
packages=None, # type: Optional[Union[str, Sequence[str]]]
repo=None, # type: Optional[str]
repo_branch=None, # type: Optional[str]
repo_commit=None, # type: Optional[str]
helper_functions=None, # type: Optional[Sequence[Callable]]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
parents=None, # type: Optional[Sequence[str]],
execution_queue=None, # type: Optional[str]
monitor_metrics=None, # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
monitor_artifacts=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
monitor_models=None, # type: Optional[List[Union[str, Tuple[str, str]]]]
time_limit=None, # type: Optional[float]
continue_on_fail=False, # type: bool
pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
cache_executed_step=False, # type: bool
retry_on_failure=None, # type: Optional[Union[int, Callable[[PipelineController, PipelineController.Node, int], bool]]] # noqa
):
# type: (...) -> bool
"""
Create a Task from a function, including wrapping the function input arguments
into the hyper-parameter section as kwargs, and storing function results as named artifacts
Example:
.. code-block:: py
def mock_func(a=6, b=9):
c = a*b
print(a, b, c)
return c, c**2
create_task_from_function(mock_func, function_return=['mul', 'square'])
Example arguments from other Tasks (artifact):
.. code-block:: py
def mock_func(matrix_np):
c = matrix_np*matrix_np
print(matrix_np, c)
return c
create_task_from_function(
mock_func,
function_kwargs={'matrix_np': 'aabb1122.previous_matrix'},
function_return=['square_matrix']
)
:param name: Unique name of the step. For example `stage1`
:param function: A global function to convert into a standalone Task
:param function_kwargs: Optional, provide a subset of the function arguments and default values to expose.
If not provided, all function arguments and their defaults are taken automatically.
This can also be used to pass input arguments to the function from another Task's output artifact.
Example, an argument named `numpy_matrix` taken from Task ID `aabbcc`, artifact name `answer`:
{'numpy_matrix': 'aabbcc.answer'}
:param function_return: Provide a list of names for all the results.
If not provided, no results will be stored as artifacts.
:param project_name: Set the project name for the task. If not provided, the pipeline's target project or the pipeline Task's project is used.
:param task_name: Set the name of the remote task, if not provided use `name` argument.
:param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
:param auto_connect_frameworks: Control the frameworks auto connect, see `Task.init` auto_connect_frameworks
:param auto_connect_arg_parser: Control the ArgParser auto connect, see `Task.init` auto_connect_arg_parser
:param packages: Manually specify a list of required packages or a local requirements.txt file.
Example: ["tqdm>=2.1", "scikit-learn"] or "./requirements.txt"
If not provided, packages are automatically added based on the imports used in the function.
:param repo: Optional, specify a repository to attach to the function, when remotely executing.
Allows the function to execute inside the specified repository, enabling it to load modules/scripts
from the repository. Notice the execution working directory will be the repository root folder.
Supports both git repo url link, and local repository path.
Example remote url: 'https://github.com/user/repo.git'
Example local repo copy: './repo' -> will automatically store the remote
repo url and commit ID based on the locally cloned copy
:param repo_branch: Optional, specify the remote repository branch (Ignored, if local repo path is used)
:param repo_commit: Optional, specify the repository commit id (Ignored, if local repo path is used)
:param helper_functions: Optional, a list of helper functions to make available
for the standalone function Task.
:param docker: Select the docker image to be executed in by the remote session
:param docker_args: Add docker arguments, pass a single string
:param docker_bash_setup_script: Add bash script to be executed
inside the docker before setting up the Task's environment
:param parents: Optional list of parent nodes in the DAG.
The current step in the pipeline will be sent for execution only after all the parent nodes
have been executed successfully.
:param execution_queue: Optional, the queue to use for executing this specific step.
If not provided, the task will be sent to the default execution queue, as defined on the class
:param monitor_metrics: Optional, log the step's metrics on the pipeline Task.
Format is a list of pairs metric (title, series) to log:
[(step_metric_title, step_metric_series), ]
Example: [('test', 'accuracy'), ]
Or a list of tuple pairs, to specify a different target metric to use on the pipeline Task:
[((step_metric_title, step_metric_series), (target_metric_title, target_metric_series)), ]
Example: [[('test', 'accuracy'), ('model', 'accuracy')], ]
:param monitor_artifacts: Optional, log the step's artifacts on the pipeline Task.
Provide a list of artifact names that exist on the step's Task; they will also appear on the pipeline Task itself.
Example: [('processed_data', 'final_processed_data'), ]
Alternatively, the user can provide a list of artifacts to monitor
(the target artifact name will be the same as the original artifact name)
Example: ['processed_data', ]
:param monitor_models: Optional, log the step's output models on the pipeline Task.
Provide a list of model names that exist on the step's Task; they will also appear on the pipeline Task itself.
Example: [('model_weights', 'final_model_weights'), ]
Alternatively, the user can provide a list of models to monitor
(the target model name will be the same as the original model name)
Example: ['model_weights', ]
To select the latest (lexicographic) model use "model_*", or the last created model with just "*"
Example: ['model_weights_*', ]
:param time_limit: Default None, no time limit.
Step execution time limit; if exceeded, the Task is aborted and the pipeline is stopped and marked failed.
:param continue_on_fail: (default False). If True, a failed step will not cause the pipeline to stop
(or be marked as failed). Notice that steps that are connected (or indirectly connected)
to the failed step will be skipped.
:param pre_execute_callback: Callback function, called when the step (Task) is created
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
`parameters` are the configuration arguments passed to the ClearmlJob.
If the callback's return value is `False`,
the node is skipped, and so is any node in the DAG that depends on it.
Notice that `parameters` are already parsed;
e.g. `${step1.parameters.Args/param}` is replaced with the relevant value.
.. code-block:: py
def step_created_callback(
pipeline, # type: PipelineController,
node, # type: PipelineController.Node,
parameters, # type: dict
):
pass
:param post_execute_callback: Callback function, called when a step (Task) is completed
and before other jobs are executed. Allows a user to modify the Task status after completion.
.. code-block:: py
def step_completed_callback(
pipeline, # type: PipelineController,
node, # type: PipelineController.Node,
):
pass
:param cache_executed_step: If True, before launching the new step,
after updating with the latest configuration, check if an exact Task with the same parameters/code
was already executed. If one is found, use it instead of launching a new Task.
Default: False, a new Task is always created and launched.
Notice: If the git repo reference does not have a specific commit ID, the Task will never be reused from the cache.
:param retry_on_failure: Integer (number of retries) or Callback function that returns True to allow a retry
- Integer: In case of node failure, retry the node the number of times indicated by this parameter.
- Callable: A function called on node failure. Takes as parameters:
the PipelineController instance, the PipelineController.Node that failed and an int
representing the number of previous retries for the node that failed.
The function must return a `bool`: True if the node should be retried and False otherwise.
If True, the node will be re-queued and the number of retries left will be decremented by 1.
By default, if this callback is not specified, the node will be retried the number of
times indicated by `retry_on_failure`.
.. code-block:: py
def example_retry_on_failure_callback(pipeline, node, retries):
print(node.name, ' failed')
# allow up to 5 retries (total of 6 runs)
return retries < 5
:return: True if successful
"""
# always store callback functions (even when running remotely)
if pre_execute_callback:
self._pre_step_callbacks[name] = pre_execute_callback
if post_execute_callback:
self._post_step_callbacks[name] = post_execute_callback
self._verify_node_name(name)
function_input_artifacts = {}
# go over function_kwargs, split it into string and input artifacts
for k, v in function_kwargs.items():
if v is None:
continue
if self._step_ref_pattern.match(str(v)):
# check for step artifacts
step, _, artifact = v[2:-1].partition('.')
if step in self._nodes and artifact in self._nodes[step].return_artifacts:
function_input_artifacts[k] = "${{{}.id}}.{}".format(step, artifact)
continue
# verify the reference only if we are running locally (when running remotely with multiple
# steps from tasks, _nodes is still empty; only after deserializing will we have the full DAG)
if self._task.running_locally():
self.__verify_step_reference(node=self.Node(name=name), step_ref_string=v)
elif not isinstance(v, (float, int, bool, six.string_types)):
function_input_artifacts[k] = "{}.{}.{}".format(self._task.id, name, k)
self._upload_pipeline_artifact(artifact_name="{}.{}".format(name, k), artifact_object=v)
function_kwargs = {k: v for k, v in function_kwargs.items() if k not in function_input_artifacts}
parameters = {"{}/{}".format(CreateFromFunction.kwargs_section, k): v for k, v in function_kwargs.items()}
if function_input_artifacts:
parameters.update(
{"{}/{}".format(CreateFromFunction.input_artifact_section, k): str(v)
for k, v in function_input_artifacts.items()}
)
job_code_section = None
task_name = task_name or name or None
if self._mock_execution:
project_name = project_name or self._get_target_project() or self._task.get_project_name()
task_definition = self._create_task_from_function(
docker, docker_args, docker_bash_setup_script, function,
function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name,
task_type, repo, repo_branch, repo_commit, helper_functions)
elif self._task.running_locally() or self._task.get_configuration_object(name=name) is None:
project_name = project_name or self._get_target_project() or self._task.get_project_name()
task_definition = self._create_task_from_function(
docker, docker_args, docker_bash_setup_script, function,
function_input_artifacts, function_kwargs, function_return,
auto_connect_frameworks, auto_connect_arg_parser,
packages, project_name, task_name,
task_type, repo, repo_branch, repo_commit, helper_functions)
# update configuration with the task definitions
# noinspection PyProtectedMember
self._task._set_configuration(
name=name, config_type='json',
config_text=json.dumps(task_definition, indent=1)
)
job_code_section = name
else:
# load task definition from configuration
# noinspection PyProtectedMember
config_text = self._task._get_configuration_text(name=name)
task_definition = json.loads(config_text) if config_text else dict()
def _create_task(_):
a_task = Task.create(
project_name=project_name,
task_name=task_definition.get('name'),
task_type=task_definition.get('type'),
)
# replace reference
a_task.update_task(task_definition)
return a_task
self._nodes[name] = self.Node(
name=name, base_task_id=None, parents=parents or [],
queue=execution_queue, timeout=time_limit,
parameters=parameters,
clone_task=False,
cache_executed_step=cache_executed_step,
task_factory_func=_create_task,
continue_on_fail=continue_on_fail,
return_artifacts=function_return,
monitor_artifacts=monitor_artifacts,
monitor_metrics=monitor_metrics,
monitor_models=monitor_models,
job_code_section=job_code_section,
)
self._retries[name] = 0
self._retries_callbacks[name] = retry_on_failure if callable(retry_on_failure) else \
(functools.partial(self._default_retry_on_failure_callback, max_retries=retry_on_failure)
if isinstance(retry_on_failure, int) else self._retry_on_failure_callback)
return True
def _relaunch_node(self, node):
if not node.job:
getLogger("clearml.automation.controller").warning(
@@ -1941,24 +2154,6 @@ class PipelineController(object):
)
return False
@classmethod
def _wait_for_node(cls, node):
pool_period = 5.0 if cls._debug_execute_step_process else 20.0
while True:
node.job.wait(pool_period=pool_period, aborted_nonresponsive_as_running=True)
job_status = str(node.job.status(force=True))
if (
(
job_status == str(Task.TaskStatusEnum.stopped)
and node.job.status_message() == cls._relaunch_status_message
)
or (job_status == str(Task.TaskStatusEnum.failed) and not cls._final_failure.get(node.name))
or not node.job.is_stopped()
):
sleep(pool_period)
else:
break
@classmethod
def _get_node_color(cls, node):
# type (self.Node) -> str
@@ -2637,6 +2832,14 @@ class PipelineController(object):
def _default_retry_on_failure_callback(self, _pipeline_controller, _node, retries, max_retries=None):
return retries < (self._def_max_retry_on_failure if max_retries is None else max_retries)
def _upload_pipeline_artifact(self, artifact_name, artifact_object):
self._task.upload_artifact(
name=artifact_name,
artifact_object=artifact_object,
wait_on_upload=True,
extension_name=".pkl" if isinstance(artifact_object, dict) else None,
)
class PipelineDecorator(PipelineController):
_added_decorator = [] # type: List[dict]
@@ -2722,7 +2925,7 @@ class PipelineDecorator(PipelineController):
PipelineDecorator._default_execution_queue)
for n in self._added_decorator:
self.add_function_step(**n)
self._add_function_step(**n)
self._added_decorator.clear()
PipelineDecorator._singleton = self
self._reference_callback = []
@@ -3001,6 +3204,24 @@ class PipelineDecorator(PipelineController):
return task_hash
@classmethod
def _wait_for_node(cls, node):
pool_period = 5.0 if cls._debug_execute_step_process else 20.0
while True:
node.job.wait(pool_period=pool_period, aborted_nonresponsive_as_running=True)
job_status = str(node.job.status(force=True))
if (
(
job_status == str(Task.TaskStatusEnum.stopped)
and node.job.status_message() == cls._relaunch_status_message
)
or (job_status == str(Task.TaskStatusEnum.failed) and not cls._final_failure.get(node.name))
or not node.job.is_stopped()
):
sleep(pool_period)
else:
break
@classmethod
def component(
cls,
@@ -3157,7 +3378,7 @@ class PipelineDecorator(PipelineController):
)
if cls._singleton:
cls._singleton.add_function_step(**add_step_spec)
cls._singleton._add_function_step(**add_step_spec)
else:
cls._added_decorator.append(add_step_spec)
@@ -3319,7 +3540,10 @@ class PipelineDecorator(PipelineController):
cls._evaluated_return_values[_tid] = []
cls._evaluated_return_values[_tid].append(_node.name)
return Task.get_task(_node.job.task_id()).artifacts[return_name].get()
task = Task.get_task(_node.job.task_id())
if return_name in task.artifacts:
return task.artifacts[return_name].get()
return task.get_parameters(cast=True)[CreateFromFunction.return_section + "/" + return_name]
return_w = [LazyEvalWrapper(
callback=functools.partial(result_wrapper, n),
@ -3562,11 +3786,8 @@ class PipelineDecorator(PipelineController):
waited = True
# store the pipeline result if we have any:
if return_value and pipeline_result is not None:
a_pipeline._task.upload_artifact(
name=str(return_value),
artifact_object=pipeline_result,
wait_on_upload=True,
extension_name=".pkl" if isinstance(pipeline_result, dict) else None,
a_pipeline._upload_pipeline_artifact(
artifact_name=str(return_value), artifact_object=pipeline_result
)
# now we can stop the pipeline
@ -3674,12 +3895,7 @@ class PipelineDecorator(PipelineController):
else:
# we need to create an artifact
artifact_name = 'result_{}_{}'.format(re.sub(r'\W+', '', _node.name), k)
cls._singleton._task.upload_artifact(
name=artifact_name,
artifact_object=v,
wait_on_upload=True,
extension_name=".pkl" if isinstance(v, dict) else None,
)
cls._singleton._upload_pipeline_artifact(artifact_name=artifact_name, artifact_object=v)
_node.parameters["{}/{}".format(CreateFromFunction.input_artifact_section, k)] = \
"{}.{}".format(cls._singleton._task.id, artifact_name)


@@ -471,10 +471,12 @@ class CreateAndPopulate(object):
class CreateFromFunction(object):
kwargs_section = 'kwargs'
input_artifact_section = 'kwargs_artifacts'
kwargs_section = "kwargs"
return_section = "return"
input_artifact_section = "kwargs_artifacts"
task_template = """from clearml import Task, TaskTypes
from clearml.automation.controller import PipelineDecorator
from clearml.utilities.proxy_object import get_basic_type
{function_source}
@@ -488,23 +490,36 @@ if __name__ == '__main__':
task.connect(kwargs, name='{kwargs_section}')
function_input_artifacts = {function_input_artifacts}
params = task.get_parameters() or dict()
return_section = '{return_section}'
for k, v in params.items():
if not v or not k.startswith('{input_artifact_section}/'):
continue
k = k.replace('{input_artifact_section}/', '', 1)
task_id, artifact_name = v.split('.', 1)
kwargs[k] = Task.get_task(task_id=task_id).artifacts[artifact_name].get()
parent_task = Task.get_task(task_id=task_id)
if artifact_name in parent_task.artifacts:
kwargs[k] = parent_task.artifacts[artifact_name].get()
else:
kwargs[k] = parent_task.get_parameters(cast=True)[return_section + '/' + artifact_name]
results = {function_name}(**kwargs)
result_names = {function_return}
if result_names:
if not isinstance(results, (tuple, list)) or len(result_names) == 1:
results = [results]
parameters = dict()
parameters_types = dict()
for name, artifact in zip(result_names, results):
task.upload_artifact(
name=name,
artifact_object=artifact,
extension_name='.pkl' if isinstance(artifact, dict) else None
)
if isinstance(artifact, (float, int, bool, str)):
parameters[return_section + '/' + name] = artifact
parameters_types[return_section + '/' + name] = get_basic_type(artifact)
else:
task.upload_artifact(
name=name,
artifact_object=artifact,
extension_name='.pkl' if isinstance(artifact, dict) else None
)
if parameters:
task._set_parameters(parameters, __parameters_types=parameters_types, __update=True)
"""
@classmethod
@@ -663,7 +678,9 @@ if __name__ == '__main__':
function_kwargs=function_kwargs,
function_input_artifacts=function_input_artifacts,
function_name=function_name,
function_return=function_return)
function_return=function_return,
return_section=cls.return_section,
)
temp_dir = repo if repo and os.path.isdir(repo) else None
with tempfile.NamedTemporaryFile('w', suffix='.py', dir=temp_dir) as temp_file:
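The result-handling branch added to the generated step script above decides, per returned value, between a parameter and an artifact. Below is a standalone, hedged sketch of that split; the helper name is hypothetical, and the real generated script additionally records get_basic_type(value) for each stored parameter and uploads the non-basic values with task.upload_artifact.

def split_step_results(result_names, results, return_section="return"):
    # mirror the generated script: a single return value (or a single declared
    # name) is wrapped in a list; basic types become "return/<name>" parameters,
    # everything else is left to be uploaded as an artifact
    if not isinstance(results, (tuple, list)) or len(result_names) == 1:
        results = [results]
    parameters, artifacts = {}, {}
    for name, value in zip(result_names, results):
        if isinstance(value, (float, int, bool, str)):
            parameters[return_section + "/" + name] = value
        else:
            artifacts[name] = value
    return parameters, artifacts


# e.g. split_step_results(["path", "matrix"], ("/data/out", [[1, 2], [3, 4]]))
# -> ({"return/path": "/data/out"}, {"matrix": [[1, 2], [3, 4]]})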