Add PipelineController v2 (notice new constructor is not backwards compatible)

This commit is contained in:
allegroai 2021-09-03 03:27:33 +03:00
parent dbfe45d005
commit e491f4352f
4 changed files with 924 additions and 288 deletions

View File

@ -1,18 +1,21 @@
import json
import re
from copy import copy
from datetime import datetime
from logging import getLogger
from threading import Thread, Event, RLock
from time import time
from typing import Sequence, Optional, Mapping, Callable, Any, Union, List, Dict
from attr import attrib, attrs
from typing import Sequence, Optional, Mapping, Callable, Any, Union, List
from .job import LocalClearmlJob
from ..automation import ClearmlJob
from ..backend_interface.task.populate import CreateFromFunction
from ..backend_interface.util import get_or_create_project
from ..debugging.log import LoggerRoot
from ..task import Task
from ..automation import ClearmlJob
from ..model import BaseModel
from ..task import Task
from ..utilities.process.mp import leave_process
@ -26,69 +29,61 @@ class PipelineController(object):
_tag = 'pipeline'
_step_pattern = r"\${[^}]*}"
_config_section = 'Pipeline'
_args_section = 'PipelineArgs'
_pipeline_step_ref = 'pipeline'
_reserved_pipeline_names = (_pipeline_step_ref, )
_task_project_lookup = {}
_clearml_job_class = ClearmlJob
@attrs
class Node(object):
name = attrib(type=str)
base_task_id = attrib(type=str)
queue = attrib(type=str, default=None)
parents = attrib(type=list, default=[])
timeout = attrib(type=float, default=None)
parameters = attrib(type=dict, default={})
task_overrides = attrib(type=dict, default={})
executed = attrib(type=str, default=None)
clone_task = attrib(type=bool, default=True)
job = attrib(type=ClearmlJob, default=None)
skip_job = attrib(type=bool, default=False)
cache_executed_step = attrib(type=bool, default=False)
name = attrib(type=str) # pipeline step name
base_task_id = attrib(type=str, default=None) # base Task ID to be cloned and launched
task_factory_func = attrib(type=Callable, default=None) # alternative to base_task_id, function creating a Task
queue = attrib(type=str, default=None) # execution queue name to use
parents = attrib(type=list, default=[]) # list of parent DAG steps
timeout = attrib(type=float, default=None) # execution timeout limit
parameters = attrib(type=dict, default={}) # Task hyper parameters to change
task_overrides = attrib(type=dict, default={}) # Task overrides to change
executed = attrib(type=str, default=None) # The actual executed Task ID (None if not executed yet)
clone_task = attrib(type=bool, default=True)  # If True clone the base_task_id, then execute the cloned Task
job = attrib(type=ClearmlJob, default=None) # ClearMLJob object
skip_job = attrib(type=bool, default=False) # if True, this step should be skipped
cache_executed_step = attrib(type=bool, default=False) # if True this pipeline step should be cached
return_artifacts = attrib(type=list, default=[]) # List of artifact names returned by the step
def __init__(
self,
name, # type: str
project, # type: str
version, # type: str
pool_frequency=0.2, # type: float
default_execution_queue=None, # type: Optional[str]
pipeline_time_limit=None, # type: Optional[float]
auto_connect_task=True, # type: Union[bool, Task]
always_create_task=False, # type: bool
add_pipeline_tags=False, # type: bool
target_project=None, # type: Optional[str]
pipeline_name=None, # type: Optional[str]
pipeline_project=None, # type: Optional[str]
):
# type: (...) -> ()
"""
Create a new pipeline controller. The newly created object will launch and monitor the new experiments.
:param name: Provide pipeline name (if main Task exists it overrides its name)
:param project: Provide project storing the pipeline (if main Task exists it overrides its project)
:param version: Must provide pipeline version. This version allows you to uniquely identify the pipeline
template execution. Examples for semantic versions: version='1.0.1', version='23', version='1.2'
:param float pool_frequency: The polling frequency (in minutes) for monitoring experiments / states.
:param str default_execution_queue: The execution queue to use if no execution queue is provided
:param float pipeline_time_limit: The maximum time (minutes) for the entire pipeline process. The
default is ``None``, indicating no time limit.
:param bool auto_connect_task: Store pipeline arguments and configuration in the Task
- ``True`` - The pipeline argument and configuration will be stored in the current Task. All arguments will
be under the hyper-parameter section ``Pipeline``, and the pipeline DAG will be stored as a
Task configuration object named ``Pipeline``.
Notice that when running remotely the DAG definitions will be taken from the Task itself (e.g. editing
the configuration in the UI will be reflected in the actual DAG created).
- ``False`` - Do not store DAG configuration on the Task.
In remote execution the DAG will always be created from code.
- ``Task`` - A specific Task object to connect the pipeline with.
:param bool always_create_task: Always create a new Task
- ``True`` - No current Task initialized. Create a new task named ``Pipeline`` in the ``base_task_id``
project.
- ``False`` - Use the :py:meth:`task.Task.current_task` (if exists) to report statistics.
:param bool add_pipeline_tags: (default: False) if True, add `pipe: <pipeline_task_id>` tag to all
steps (Tasks) created by this pipeline.
:param str target_project: If provided, all pipeline steps are cloned into the target project
:param pipeline_name: Optional, provide pipeline name if main Task is not present (default current date)
:param pipeline_project: Optional, provide project storing the pipeline if main Task is not present
"""
self._nodes = {}
self._running_nodes = []
self._start_time = None
self._pipeline_time_limit = pipeline_time_limit * 60. if pipeline_time_limit else None
self._default_execution_queue = default_execution_queue
self._pipeline_time_limit = None
self._default_execution_queue = None
self._version = str(version)
self._pool_frequency = pool_frequency * 60.
self._thread = None
self._pipeline_args = dict()
self._pipeline_args_desc = dict()
self._stop_event = None
self._experiment_created_cb = None
self._experiment_completed_cb = None
@ -96,23 +91,44 @@ class PipelineController(object):
self._post_step_callbacks = {}
self._target_project = target_project or ''
self._add_pipeline_tags = add_pipeline_tags
self._task = auto_connect_task if isinstance(auto_connect_task, Task) else Task.current_task()
self._task = Task.current_task()
self._step_ref_pattern = re.compile(self._step_pattern)
self._reporting_lock = RLock()
self._pipeline_task_status_failed = None
if not self._task and always_create_task:
if not self._task:
self._task = Task.init(
project_name=pipeline_project or 'Pipelines',
task_name=pipeline_name or 'Pipeline {}'.format(datetime.now()),
project_name=project or 'Pipelines',
task_name=name or 'Pipeline {}'.format(datetime.now()),
task_type=Task.TaskTypes.controller,
auto_resource_monitoring=False,
)
self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag])
self._task.set_user_properties(version=self._version)
self._auto_connect_task = bool(auto_connect_task) and bool(self._task)
self._auto_connect_task = bool(self._task)
# make sure we add to the main Task the pipeline tag
if self._task:
self._task.add_tags([self._tag])
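For reference, a minimal sketch of the new (non-backwards-compatible) constructor in use; the project, name and version values are placeholders, mirroring the example file added at the end of this commit:

from clearml import PipelineController

pipe = PipelineController(
    name='pipeline demo',
    project='examples',
    version='1.1',
    add_pipeline_tags=False,
)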
def set_default_execution_queue(self, default_execution_queue):
# type: (Optional[str]) -> None
"""
Set the default execution queue to use when a pipeline step does not specify an execution queue
:param default_execution_queue: The execution queue to use if no execution queue is provided
"""
self._default_execution_queue = str(default_execution_queue) if default_execution_queue else None
def set_pipeline_execution_time_limit(self, max_execution_minutes):
# type: (Optional[float]) -> None
"""
Set maximum execution time (minutes) for the entire pipeline. Pass None or 0 to disable execution time limit.
:param float max_execution_minutes: The maximum time (minutes) for the entire pipeline process. The
default is ``None``, indicating no time limit.
"""
self._pipeline_time_limit = max_execution_minutes * 60. if max_execution_minutes else None
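A short usage sketch of the two new setters, which replace the removed constructor arguments (the queue name and time limit below are placeholders):

pipe.set_default_execution_queue('default')       # used whenever a step does not specify its own queue
pipe.set_pipeline_execution_time_limit(6 * 60)    # abort the entire pipeline after 6 hours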
def add_step(
self,
name, # type: str
@ -128,19 +144,20 @@ class PipelineController(object):
pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
cache_executed_step=False, # type: bool
base_task_factory=None, # type: Optional[Callable[[PipelineController.Node], Task]]
):
# type: (...) -> bool
"""
Add a step to the pipeline execution DAG.
Each step must have a unique name (this name will later be used to address the step)
:param str name: Unique of the step. For example `stage1`
:param str base_task_id: The Task ID to use for the step. Each time the step is executed,
:param name: Unique name of the step. For example `stage1`
:param base_task_id: The Task ID to use for the step. Each time the step is executed,
the base Task is cloned, then the cloned task will be sent for execution.
:param list parents: Optional list of parent nodes in the DAG.
:param parents: Optional list of parent nodes in the DAG.
The current step in the pipeline will be sent for execution only after all the parent nodes
have been executed successfully.
:param dict parameter_override: Optional parameter overriding dictionary.
:param parameter_override: Optional parameter overriding dictionary.
The dict values can reference a previously executed step using the following form '${step_name}'
Examples:
- Artifact access
@ -151,7 +168,7 @@ class PipelineController(object):
parameter_override={'Args/input_file': '${stage3.parameters.Args/input_file}' }
- Task ID
parameter_override={'Args/input_file': '${stage3.id}' }
:param dict task_overrides: Optional task section overriding dictionary.
:param task_overrides: Optional task section overriding dictionary.
The dict values can reference a previously executed step using the following form '${step_name}'
Examples:
- clear git repository commit ID
@ -160,17 +177,17 @@ class PipelineController(object):
parameter_override={'script.branch': '${stage1.script.branch}' }
- container image
parameter_override={'container.image': '${stage1.container.image}' }
:param str execution_queue: Optional, the queue to use for executing this specific step.
:param execution_queue: Optional, the queue to use for executing this specific step.
If not provided, the task will be sent to the default execution queue, as defined on the class
:param float time_limit: Default None, no time limit.
:param time_limit: Default None, no time limit.
Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.
:param str base_task_project: If base_task_id is not given,
:param base_task_project: If base_task_id is not given,
use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.
:param str base_task_name: If base_task_id is not given,
:param base_task_name: If base_task_id is not given,
use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step.
:param bool clone_base_task: If True (default) the pipeline will clone the base task, and modify/enqueue
:param clone_base_task: If True (default) the pipeline will clone the base task, and modify/enqueue
the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created).
:param Callable pre_execute_callback: Callback function, called when the step (Task) is created
:param pre_execute_callback: Callback function, called when the step (Task) is created
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
`parameters` are the configuration arguments passed to the ClearmlJob.
@ -190,7 +207,7 @@ class PipelineController(object):
):
pass
:param Callable post_execute_callback: Callback function, called when a step (Task) is completed
:param post_execute_callback: Callback function, called when a step (Task) is completed
and before other jobs are executed. Allows a user to modify the Task status after completion.
.. code-block:: py
@ -207,6 +224,8 @@ class PipelineController(object):
Default: False, a new cloned copy of base_task is always used.
Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.
If `clone_base_task` is False there is no cloning, hence the base_task is used.
:param base_task_factory: Optional, instead of providing a pre-existing Task,
provide a Callable function to create the Task (returns Task object)
:return: True if successful
"""
@ -219,13 +238,12 @@ class PipelineController(object):
# when running remotely do nothing, we will deserialize ourselves when we start
# if we are not cloning a Task, we assume this step is created from code, not from the configuration
if clone_base_task and self._has_stored_configuration():
if not base_task_factory and clone_base_task and self._has_stored_configuration():
return True
if name in self._nodes:
raise ValueError('Node named \'{}\' already exists in the pipeline dag'.format(name))
self._verify_node_name(name)
if not base_task_id:
if not base_task_factory and not base_task_id:
if not base_task_project or not base_task_name:
raise ValueError('Either base_task_id or base_task_project/base_task_name must be provided')
base_task = Task.get_task(
@ -255,6 +273,202 @@ class PipelineController(object):
clone_task=clone_base_task,
task_overrides=task_overrides,
cache_executed_step=cache_executed_step,
task_factory_func=base_task_factory,
)
if self._task and not self._task.running_locally():
self.update_execution_plot()
return True
def add_function_step(
self,
name, # type: str
function, # type: Callable
function_kwargs=None, # type: Optional[Dict[str, Any]]
function_return=None, # type: Optional[List[str]]
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
task_type=None, # type: Optional[str]
packages=None, # type: Optional[Sequence[str]]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
parents=None, # type: Optional[Sequence[str]],
execution_queue=None, # type: Optional[str]
time_limit=None, # type: Optional[float]
clone_base_task=True, # type: bool
pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
cache_executed_step=False, # type: bool
):
# type: (...) -> bool
"""
Create a Task from a function, including wrapping the function input arguments
into the hyper-parameter section as kwargs, and storing function results as named artifacts
Example:
def mock_func(a=6, b=9):
c = a*b
print(a, b, c)
return c, c**2
create_task_from_function(mock_func, function_return=['mul', 'square'])
Example arguments from other Tasks (artifact):
def mock_func(matrix_np):
c = matrix_np*matrix_np
print(matrix_np, c)
return c
create_task_from_function(
mock_func,
function_input_artifacts={'matrix_np': 'aabb1122.previous_matrix'},
function_return=['square_matrix']
)
:param name: Unique name of the step. For example `stage1`
:param function: A global function to convert into a standalone Task
:param function_kwargs: Optional, provide a subset of the function arguments and default values to expose.
If not provided, all function arguments and their default values are taken automatically.
A value may also reference a previous step's output artifact using the step-reference form,
for example passing the artifact `data_frame` returned by step `step_one`:
{'data_frame': '${step_one.data_frame}'}
:param function_return: Provide a list of names for all the results.
If not provided no results will be stored as artifacts.
:param project_name: Optional, set the project name for the step Task. If not provided, the pipeline's project is used.
:param task_name: Optional, set the name of the remote step Task. If not provided, the function name is used.
:param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
:param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"]
If not provided, packages are automatically added based on the imports used in the function.
:param docker: Select the docker image to be executed in by the remote session
:param docker_args: Add docker arguments, pass a single string
:param docker_bash_setup_script: Add bash script to be executed
inside the docker before setting up the Task's environment
:param parents: Optional list of parent nodes in the DAG.
The current step in the pipeline will be sent for execution only after all the parent nodes
have been executed successfully.
:param execution_queue: Optional, the queue to use for executing this specific step.
If not provided, the task will be sent to the default execution queue, as defined on the class
:param time_limit: Default None, no time limit.
Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed.
:param clone_base_task: If True (default) the pipeline will clone the base task, and modify/enqueue
the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created).
:param pre_execute_callback: Callback function, called when the step (Task) is created
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
`parameters` are the configuration arguments passed to the ClearmlJob.
If the callback returned value is `False`,
the Node is skipped and so is any node in the DAG that relies on this node.
Notice the `parameters` are already parsed,
e.g. `${step1.parameters.Args/param}` is replaced with relevant value.
.. code-block:: py
def step_created_callback(
pipeline, # type: PipelineController,
node, # type: PipelineController.Node,
parameters, # type: dict
):
pass
:param post_execute_callback: Callback function, called when a step (Task) is completed
and before other jobs are executed. Allows a user to modify the Task status after completion.
.. code-block:: py
def step_completed_callback(
pipeline, # type: PipelineController,
node, # type: PipelineController.Node,
):
pass
:param cache_executed_step: If True, before launching the new step,
after updating with the latest configuration, check if an exact Task with the same parameter/code
was already executed. If it was found, use it instead of launching a new Task.
Default: False, a new cloned copy of base_task is always used.
Notice: If the git repo reference does not have a specific commit ID, the Task will never be used.
If `clone_base_task` is False there is no cloning, hence the base_task is used.
:return: True if successful
"""
# always store callback functions (even when running remotely)
if pre_execute_callback:
self._pre_step_callbacks[name] = pre_execute_callback
if post_execute_callback:
self._post_step_callbacks[name] = post_execute_callback
self._verify_node_name(name)
function_kwargs = function_kwargs or {}
function_input_artifacts = {}
# go over function_kwargs, split it into string and input artifacts
for k, v in function_kwargs.items():
if self._step_ref_pattern.match(v):
# check for step artifacts
step, _, artifact = v[2:-1].partition('.')
if step in self._nodes and artifact in self._nodes[step].return_artifacts:
function_input_artifacts[k] = "${{{}.id}}.{}".format(step, artifact)
continue
# verify the reference
self.__verify_step_reference(node=self.Node(name=name), step_ref_string=v)
function_kwargs = {k: v for k, v in function_kwargs.items() if k not in function_input_artifacts}
parameters = {"{}/{}".format(CreateFromFunction.kwargs_section, k): v for k, v in function_kwargs.items()}
if function_input_artifacts:
parameters.update(
{"{}/{}".format(CreateFromFunction.input_artifact_section, k): v
for k, v in function_input_artifacts.items()}
)
if self._task.running_locally():
project_name = project_name or self._target_project or self._task.get_project_name()
task_definition = CreateFromFunction.create_task_from_function(
a_function=function,
function_kwargs=function_kwargs or None,
function_input_artifacts=function_input_artifacts,
function_return=function_return,
project_name=project_name,
task_name=task_name,
task_type=task_type,
packages=packages,
docker=docker,
docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script,
output_uri=None,
dry_run=True,
)
# noinspection PyProtectedMember
self._task._set_configuration(
name=name, config_type='json',
config_text=json.dumps(task_definition, indent=1)
)
else:
# noinspection PyProtectedMember
task_definition = json.loads(self._task._get_configuration_text(name=name))
def _create_task(_):
a_task = Task.create(
project_name=project_name,
task_name=task_definition.get('name'),
task_type=task_definition.get('type'),
)
# replace reference
a_task.update_task(task_definition)
return a_task
self._nodes[name] = self.Node(
name=name, base_task_id=None, parents=parents or [],
queue=execution_queue, timeout=time_limit,
parameters=parameters,
clone_task=clone_base_task,
cache_executed_step=cache_executed_step,
task_factory_func=_create_task,
return_artifacts=function_return,
)
if self._task and not self._task.running_locally():
@ -264,14 +478,17 @@ class PipelineController(object):
def start(
self,
queue='services',
step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
step_task_completed_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
wait=True,
):
# type: (...) -> bool
"""
Start the pipeline controller.
If the calling process is stopped, then the controller stops as well.
Start the current pipeline remotely (on the selected services queue).
The current process will be stopped once the pipeline is launched.
:param queue: queue name to launch the pipeline on
:param Callable step_task_created_callback: Callback function, called when a step (Task) is created
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
@ -302,57 +519,11 @@ class PipelineController(object):
node, # type: PipelineController.Node,
):
pass
:param wait: If True (default), start the pipeline controller, return only
after the pipeline is done (completed/aborted/failed)
:return: True, if the controller started. False, if the controller did not start.
"""
if self._thread:
return True
params, pipeline_dag = self._serialize_pipeline_task()
# deserialize back pipeline state
if not params['continue_pipeline']:
for k in pipeline_dag:
pipeline_dag[k]['executed'] = None
self._default_execution_queue = params['default_queue']
self._add_pipeline_tags = params['add_pipeline_tags']
self._target_project = params['target_project'] or ''
self._deserialize(pipeline_dag)
# if we continue the pipeline, make sure that we re-execute failed tasks
if params['continue_pipeline']:
for node in self._nodes.values():
if node.executed is False:
node.executed = None
if not self._verify():
raise ValueError("Failed verifying pipeline execution graph, "
"it has either inaccessible nodes, or contains cycles")
self.update_execution_plot()
self._start_time = time()
self._stop_event = Event()
self._experiment_created_cb = step_task_created_callback
self._experiment_completed_cb = step_task_completed_callback
self._thread = Thread(target=self._daemon)
self._thread.daemon = True
self._thread.start()
return True
def start_remotely(self, queue='services', exit_process=True):
# type: (str, bool) -> Task
"""
Start the current pipeline remotely (on the selected services queue)
The current process will be stopped if exit_process is True.
:param queue: queue name to launch the pipeline on
:param exit_process: If True exit the current process after launching on the enqueuing on the queue
:return: The remote Task object
"""
if not self._task:
raise ValueError(
@ -361,19 +532,47 @@ class PipelineController(object):
# serialize state only if we are running locally
if Task.running_locally() or not self._task.is_main_task():
self._verify()
self._serialize_pipeline_task()
self.update_execution_plot()
# stop current Task and execute remotely or no-op
self._task.execute_remotely(queue_name=queue, exit_process=exit_process, clone=False)
self._task.execute_remotely(queue_name=queue, exit_process=True, clone=False)
if not Task.running_locally() and self._task.is_main_task():
self.start()
self.wait()
self.stop()
self._start(
step_task_created_callback=step_task_created_callback,
step_task_completed_callback=step_task_completed_callback,
wait=wait
)
leave_process(0)
else:
return self._task
return True
def start_locally(self, run_pipeline_steps_locally=True):
# type: (bool) -> None
"""
Start the current pipeline locally, in most cases for debug purposes.
By default the pipeline steps themselves are also executed locally, as sub-processes.
Notice: running the DAG locally assumes local code execution (i.e. it will not clone & apply a git diff)
:param run_pipeline_steps_locally: If True, run the pipeline steps locally as a subprocess
"""
if not self._task:
raise ValueError(
"Could not find main Task, "
"PipelineController must be created with `always_create_task=True`")
if run_pipeline_steps_locally:
self._clearml_job_class = LocalClearmlJob
self._default_execution_queue = self._default_execution_queue or 'mock'
# serialize state only if we are running locally
self._verify()
self._serialize_pipeline_task()
self.update_execution_plot()
self._start(wait=True)
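A brief sketch contrasting the two launch modes added here; the queue name is a placeholder:

# debug: run the controller and every step as local sub-processes
pipe.start_locally(run_pipeline_steps_locally=True)

# or: launch remotely on a queue and block until the pipeline finishes
# pipe.start(queue='services', wait=True)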
def stop(self, timeout=None):
# type: (Optional[float]) -> ()
@ -495,6 +694,111 @@ class PipelineController(object):
with self._reporting_lock:
self._update_execution_plot()
def add_parameter(self, name, default=None, description=None):
# type: (str, Optional[Any], Optional[str]) -> None
"""
Add a parameter to the pipeline Task.
The parameter can be used as input parameter for any step in the pipeline.
Notice all parameters will appear under the PipelineController Task's Hyper-parameters -> Pipeline section
Example: pipeline.add_parameter(name='dataset', description='dataset ID to process the pipeline')
Then in one of the steps we can refer to the value of the parameter with '${pipeline.dataset}'
:param name: String name of the parameter.
:param default: Default value to be put as the default value (can be later changed in the UI)
:param description: String description of the parameter and its usage in the pipeline
"""
self._pipeline_args[str(name)] = str(default or '')
if description:
self._pipeline_args_desc[str(name)] = str(description)
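For illustration, a pipeline-level parameter and a step referencing it through the reserved 'pipeline' prefix (the default value and task names are placeholders):

pipe.add_parameter(name='dataset', default='<dataset_id>', description='dataset ID to process')
pipe.add_step(
    name='stage_data',
    base_task_project='examples',
    base_task_name='pipeline step 1 dataset artifact',
    parameter_override={'Args/dataset_id': '${pipeline.dataset}'},
)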
def _start(
self,
step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa
step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa
wait=True,
):
# type: (...) -> bool
"""
Start the pipeline controller.
If the calling process is stopped, then the controller stops as well.
:param Callable step_task_created_callback: Callback function, called when a step (Task) is created
and before it is sent for execution. Allows a user to modify the Task before launch.
Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object.
`parameters` are the configuration arguments passed to the ClearmlJob.
If the callback returned value is `False`,
the Node is skipped and so is any node in the DAG that relies on this node.
Notice the `parameters` are already parsed,
e.g. `${step1.parameters.Args/param}` is replaced with relevant value.
.. code-block:: py
def step_created_callback(
pipeline, # type: PipelineController,
node, # type: PipelineController.Node,
parameters, # type: dict
):
pass
:param Callable step_task_completed_callback: Callback function, called when a step (Task) is completed
and before other jobs are executed. Allows a user to modify the Task status after completion.
.. code-block:: py
def step_completed_callback(
pipeline, # type: PipelineController,
node, # type: PipelineController.Node,
):
pass
:param wait: If True (default), start the pipeline controller, return only
after the pipeline is done (completed/aborted/failed)
:return: True, if the controller started. False, if the controller did not start.
"""
if self._thread:
return True
params, pipeline_dag = self._serialize_pipeline_task()
# deserialize back pipeline state
if not params['continue_pipeline']:
for k in pipeline_dag:
pipeline_dag[k]['executed'] = None
self._default_execution_queue = params['default_queue']
self._add_pipeline_tags = params['add_pipeline_tags']
self._target_project = params['target_project'] or ''
self._deserialize(pipeline_dag)
# if we continue the pipeline, make sure that we re-execute failed tasks
if params['continue_pipeline']:
for node in self._nodes.values():
if node.executed is False:
node.executed = None
if not self._verify():
raise ValueError("Failed verifying pipeline execution graph, "
"it has either inaccessible nodes, or contains cycles")
self.update_execution_plot()
self._start_time = time()
self._stop_event = Event()
self._experiment_created_cb = step_task_created_callback
self._experiment_completed_cb = step_task_completed_callback
self._thread = Thread(target=self._daemon)
self._thread.daemon = True
self._thread.start()
if wait:
self.wait()
self.stop()
return True
def _serialize_pipeline_task(self):
# type: () -> (dict, dict)
"""
@ -513,6 +817,15 @@ class PipelineController(object):
if self._task and self._auto_connect_task:
self._task.connect_configuration(pipeline_dag, name=self._config_section)
self._task.connect(params, name=self._config_section)
if self._task.running_locally():
# noinspection PyProtectedMember
self._task._set_parameters(
{'{}/{}'.format(self._args_section, k): v for k, v in self._pipeline_args.items()},
__parameters_descriptions=self._pipeline_args_desc,
__update=True,
)
else:
self._task.connect(self._pipeline_args, name=self._args_section)
return params, pipeline_dag
@ -523,7 +836,8 @@ class PipelineController(object):
This dictionary will be used to store the DAG as a configuration on the Task
:return:
"""
dag = {name: dict((k, v) for k, v in node.__dict__.items() if k not in ('job', 'name'))
dag = {name: dict((k, v) for k, v in node.__dict__.items()
if k not in ('job', 'name', 'task_factory_func'))
for name, node in self._nodes.items()}
return dag
@ -539,10 +853,13 @@ class PipelineController(object):
for name in self._nodes:
if self._nodes[name].clone_task and name in dag_dict and dag_dict[name].get('clone_task'):
dag_dict[name] = dict(
(k, v) for k, v in self._nodes[name].__dict__.items() if k not in ('job', 'name'))
(k, v) for k, v in self._nodes[name].__dict__.items()
if k not in ('job', 'name', 'task_factory_func'))
self._nodes = {
k: self.Node(name=k, **v) if not v.get('clone_task') or k not in self._nodes else self._nodes[k]
k: self.Node(name=k, **v)
if (k not in self._nodes or not self._nodes[k].task_factory_func) and (
not v.get('clone_task') or k not in self._nodes) else self._nodes[k]
for k, v in dag_dict.items()}
def _has_stored_configuration(self):
@ -581,23 +898,40 @@ class PipelineController(object):
:return: Return True iff the specific node is verified
"""
if not node.base_task_id:
if not node.base_task_id and not node.task_factory_func:
raise ValueError("Node '{}', base_task_id is empty".format(node.name))
if not self._default_execution_queue and not node.queue:
raise ValueError("Node '{}' missing execution queue, "
"no default queue defined and no specific node queue defined".format(node.name))
task = Task.get_task(task_id=node.base_task_id)
task = node.task_factory_func or Task.get_task(task_id=node.base_task_id)
if not task:
raise ValueError("Node '{}', base_task_id={} is invalid".format(node.name, node.base_task_id))
pattern = self._step_ref_pattern
parents = set()
for v in node.parameters.values():
if isinstance(v, str):
ref_matched = False
for g in pattern.findall(v):
self.__verify_step_reference(node, g)
ref_matched = True
ref_step = self.__verify_step_reference(node, g)
if ref_step:
parents.add(ref_step)
# verify we have a section name
if not ref_matched and '/' not in v:
raise ValueError(
"Section name is missing in parameter \"{}\", "
"parameters should be in the form of "
"\"`section-name`/parameter\", example: \"Args/param\"".format(v))
if parents != set(node.parents or []):
parents = parents - set(node.parents or [])
getLogger('clearml.automation.controller').info(
'Node "{}" missing parent reference, adding: {}'.format(node.name, parents))
node.parents = (node.parents or []) + list(parents)
return True
@ -650,11 +984,20 @@ class PipelineController(object):
node.skip_job = True
return True
node.job = ClearmlJob(
base_task_id=node.base_task_id, parameter_override=updated_hyper_parameters,
task_id = node.base_task_id
disable_clone_task = not node.clone_task
task_factory_func_task = None
if node.task_factory_func:
# create Task
task_factory_func_task = node.task_factory_func(node)
task_id = task_factory_func_task.id
disable_clone_task = True
node.job = self._clearml_job_class(
base_task_id=task_id, parameter_override=updated_hyper_parameters,
tags=['pipe: {}'.format(self._task.id)] if self._add_pipeline_tags and self._task else None,
parent=self._task.id if self._task else None,
disable_clone_task=not node.clone_task,
disable_clone_task=disable_clone_task,
task_overrides=task_overrides,
allow_caching=node.cache_executed_step,
**extra_args
@ -672,6 +1015,8 @@ class PipelineController(object):
node.skip_job = True
elif node.job.is_cached_task():
node.executed = node.job.task_id()
if task_factory_func_task:
task_factory_func_task.delete(raise_on_error=False)
else:
return node.job.launch(queue_name=node.queue or self._default_execution_queue)
@ -1009,13 +1354,22 @@ class PipelineController(object):
return updated_overrides
def _verify_node_name(self, name):
# type: (str) -> None
if name in self._nodes:
raise ValueError('Node named \'{}\' already exists in the pipeline dag'.format(name))
if name in self._reserved_pipeline_names:
raise ValueError('Node named \'{}\' is a reserved keyword, use a different name'.format(name))
def __verify_step_reference(self, node, step_ref_string):
# type: (PipelineController.Node, str) -> bool
# type: (PipelineController.Node, str) -> Optional[str]
"""
Verify the step reference. For example "${step1.parameters.Args/param}"
Raise ValueError on misconfiguration
:param Node node: calling reference node (used for logging)
:param str step_ref_string: For example "${step1.parameters.Args/param}"
:return: True if valid reference
:return: If step reference is used, return the pipeline step name, otherwise return None
"""
parts = step_ref_string[2:-1].split('.')
v = step_ref_string
@ -1023,6 +1377,13 @@ class PipelineController(object):
raise ValueError("Node '{}', parameter '{}' is invalid".format(node.name, v))
prev_step = parts[0]
input_type = parts[1]
# check if we reference the pipeline arguments themselves
if prev_step == self._pipeline_step_ref:
if input_type not in self._pipeline_args:
raise ValueError("Node '{}', parameter '{}', step name '{}' is invalid".format(node.name, v, prev_step))
return None
if prev_step not in self._nodes:
raise ValueError("Node '{}', parameter '{}', step name '{}' is invalid".format(node.name, v, prev_step))
if input_type not in ('artifacts', 'parameters', 'models', 'id'):
@ -1067,7 +1428,7 @@ class PipelineController(object):
raise ValueError(
"Node '{}', parameter '{}', input type '{}', model property is invalid {}".format(
node.name, v, input_type, parts[4]))
return True
return prev_step
def __parse_step_reference(self, step_ref_string):
"""
@ -1080,6 +1441,14 @@ class PipelineController(object):
raise ValueError("Could not parse reference '{}'".format(step_ref_string))
prev_step = parts[0]
input_type = parts[1].lower()
# check if we reference the pipeline arguments themselves
if prev_step == self._pipeline_step_ref:
if parts[1] not in self._pipeline_args:
raise ValueError("Could not parse reference '{}', "
"pipeline argument '{}' could not be found".format(step_ref_string, parts[1]))
return self._pipeline_args[parts[1]]
if prev_step not in self._nodes or (
not self._nodes[prev_step].job and
not self._nodes[prev_step].executed and

View File

@ -1,11 +1,18 @@
import hashlib
import os
import subprocess
import sys
import tempfile
import warnings
from datetime import datetime
from logging import getLogger
from time import time, sleep
from typing import Optional, Mapping, Sequence, Any
from ..backend_interface.util import get_or_create_project
from pathlib2 import Path
from ..backend_api import Session
from ..backend_interface.util import get_or_create_project, exact_match_regex
from ..storage.util import hash_dict
from ..task import Task
from ..backend_api.services import tasks as tasks_service
@ -16,6 +23,7 @@ logger = getLogger('clearml.automation.job')
class ClearmlJob(object):
_job_hash_description = 'job_hash={}'
_job_hash_property = 'pipeline_job_hash'
def __init__(
self,
@ -80,7 +88,7 @@ class ClearmlJob(object):
# check cached task
self._is_cached_task = False
task_hash = None
if allow_caching and not disable_clone_task and not self.task:
if allow_caching:
# look for a cached copy of the Task
# get parameters + task_overrides + as dict and hash it.
task_hash = self._create_task_hash(
@ -88,6 +96,11 @@ class ClearmlJob(object):
task = self._get_cached_task(task_hash)
# if we found a task, just use
if task:
if disable_clone_task and self.task and self.task.status == self.task.TaskStatusEnum.created:
# if the base task is in draft mode, and we are using a cached task,
# we assume the base Task was created ad hoc and we can delete it.
pass # self.task.delete()
self._is_cached_task = True
self.task = task
self.task_started = True
@ -379,11 +392,13 @@ class ClearmlJob(object):
# type: (Task, Optional[dict], Optional[dict]) -> Optional[str]
"""
Create Hash (str) representing the state of the Task
:param task: A Task to hash
:param section_overrides: optional dict (keys are Task's section names) with task overrides.
:param params_override: Alternative to the entire Task's hyper parameters section
(notice this should not be a nested dict but a flat key/value)
:return: str crc32 of the Task configuration
:return: str hash of the Task configuration
"""
if not task:
return None
@ -410,15 +425,20 @@ class ClearmlJob(object):
docker = dict(**(task.data.container or dict()))
docker.pop('image', None)
hash_func = 'md5' if Session.check_min_api_version('2.13') else 'crc32'
# make sure that if we only have docker args/bash script we still encode it,
# otherwise we revert to the original encoding (excluding docker altogether)
if docker:
return hash_dict(
dict(script=script, hyper_params=hyper_params, configs=configs, docker=docker),
hash_func='crc32'
hash_func=hash_func
)
return hash_dict(dict(script=script, hyper_params=hyper_params, configs=configs), hash_func='crc32')
return hash_dict(
dict(script=script, hyper_params=hyper_params, configs=configs),
hash_func=hash_func
)
@classmethod
def _get_cached_task(cls, task_hash):
@ -430,13 +450,23 @@ class ClearmlJob(object):
"""
if not task_hash:
return None
# noinspection PyProtectedMember
potential_tasks = Task._query_tasks(
status=['completed', 'stopped', 'published'],
system_tags=['-{}'.format(Task.archived_tag)],
_all_=dict(fields=['comment'], pattern=cls._job_hash_description.format(task_hash)),
only_fields=['id'],
)
if Session.check_min_api_version('2.13'):
# noinspection PyProtectedMember
potential_tasks = Task._query_tasks(
status=['completed', 'stopped', 'published'],
system_tags=['-{}'.format(Task.archived_tag)],
_all_=dict(fields=['runtime.{}'.format(cls._job_hash_property)],
pattern=exact_match_regex(task_hash)),
only_fields=['id'],
)
else:
# noinspection PyProtectedMember
potential_tasks = Task._query_tasks(
status=['completed', 'stopped', 'published'],
system_tags=['-{}'.format(Task.archived_tag)],
_all_=dict(fields=['comment'], pattern=cls._job_hash_description.format(task_hash)),
only_fields=['id'],
)
for obj in potential_tasks:
task = Task.get_task(task_id=obj.id)
if task_hash == cls._create_task_hash(task):
@ -455,11 +485,99 @@ class ClearmlJob(object):
return
if not task_hash:
task_hash = cls._create_task_hash(task=task)
hash_comment = cls._job_hash_description.format(task_hash) + '\n'
task.set_comment(task.comment + '\n' + hash_comment if task.comment else hash_comment)
if Session.check_min_api_version('2.13'):
# noinspection PyProtectedMember
task._set_runtime_properties(runtime_properties={cls._job_hash_property: str(task_hash)})
else:
hash_comment = cls._job_hash_description.format(task_hash) + '\n'
task.set_comment(task.comment + '\n' + hash_comment if task.comment else hash_comment)
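A rough sketch of the caching flow from the job side, using only calls visible in this diff; the base Task ID and parameter value are placeholders:

job = ClearmlJob(
    base_task_id='<base_task_id>',
    parameter_override={'Args/input_file': 'data.csv'},
    allow_caching=True,  # look for a previously executed Task with the same hash
)
if job.is_cached_task():
    print('reusing cached task', job.task_id())
else:
    job.launch(queue_name='default')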
class LocalClearmlJob(ClearmlJob):
"""
Run jobs locally as a sub-process, use for debugging purposes only
"""
def __init__(self, *args, **kwargs):
super(LocalClearmlJob, self).__init__(*args, **kwargs)
self._job_process = None
self._local_temp_file = None
def launch(self, queue_name=None):
# type: (str) -> bool
"""
Launch job as a subprocess, ignores "queue_name"
:param queue_name: Ignored
:return: True if successful
"""
if self._is_cached_task:
return False
# check if standalone
diff = self.task.data.script.diff
if diff and not diff.lstrip().startswith('diff '):
# standalone script, we need to create it
fd, local_filename = tempfile.mkstemp(suffix='.py')
os.close(fd)
with open(local_filename, 'wt') as f:
f.write(diff)
self._local_temp_file = local_filename
else:
local_filename = self.task.data.script.entry_point
cwd = os.path.join(os.getcwd(), self.task.data.script.working_dir)
# try to check based on current root repo + entrypoint
if Task.current_task() and not (Path(cwd)/local_filename).is_file():
working_dir = Task.current_task().data.script.working_dir or ''
working_dir = working_dir.strip('.')
levels = 0
if working_dir:
levels = 1 + sum(1 for c in working_dir if c == '/')
if levels:
cwd = os.path.abspath(os.path.join(cwd, os.sep.join(['..'] * levels)))
cwd = os.path.join(cwd, self.task.data.script.working_dir)
python = sys.executable
env = dict(**os.environ)
env.pop('CLEARML_PROC_MASTER_ID', None)
env.pop('TRAINS_PROC_MASTER_ID', None)
env['CLEARML_TASK_ID'] = env['TRAINS_TASK_ID'] = str(self.task.id)
env['CLEARML_LOG_TASK_TO_BACKEND'] = '1'
env['CLEARML_SIMULATE_REMOTE_TASK'] = '1'
self._job_process = subprocess.Popen(args=[python, local_filename], cwd=cwd, env=env)
return True
def wait_for_process(self, timeout=None):
# type: (Optional[int]) -> Optional[int]
"""
Wait until Job subprocess completed/exited
:param timeout: Timeout in seconds to wait for the subprocess to finish. Default None==infinite
:return: Sub-process exit code. 0 is success; None if the subprocess is not running or the timeout expired
"""
if not self._job_process:
return None
try:
exit_code = self._job_process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
return None
self._job_process = None
if self._local_temp_file:
# noinspection PyBroadException
try:
Path(self._local_temp_file).unlink()
except Exception:
pass
self._local_temp_file = None
return exit_code
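A short sketch of how the controller's local-debug mode is expected to drive this class (start_locally() does this internally); the draft Task ID is a placeholder:

local_job = LocalClearmlJob(base_task_id='<draft_task_id>', disable_clone_task=True)
if local_job.launch():                           # queue_name is ignored, runs as a sub-process
    exit_code = local_job.wait_for_process()     # 0 on success, None on timeout
    print('step finished with exit code', exit_code)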
class TrainsJob(ClearmlJob):
"""
Deprecated, use ClearmlJob
"""
def __init__(self, **kwargs):
super(TrainsJob, self).__init__(**kwargs)

View File

@ -126,11 +126,12 @@ class CreateAndPopulate(object):
self.raise_on_missing_entries = raise_on_missing_entries
self.verbose = verbose
def create_task(self):
# type: () -> Task
def create_task(self, dry_run=False):
# type: (bool) -> Union[Task, Dict]
"""
Create the new populated Task
:param dry_run: Optional, if True do not create an actual Task; instead return the Task definition as a dict
:return: The newly created Task object, or the Task definition dict if `dry_run` is True
"""
local_entry_file = None
@ -163,30 +164,41 @@ class CreateAndPopulate(object):
not repo_info or not repo_info.script or not repo_info.script.get('repository')):
raise ValueError("Standalone script detected \'{}\', but no requirements provided".format(self.script))
if self.base_task_id:
if self.verbose:
print('Cloning task {}'.format(self.base_task_id))
task = Task.clone(source_task=self.base_task_id, project=Task.get_project_id(self.project_name))
self._set_output_uri(task)
if dry_run:
task = None
task_state = dict(
name=self.task_name,
project=Task.get_project_id(self.project_name),
type=str(self.task_type or Task.TaskTypes.training),
)
if self.output_uri:
task_state['output'] = dict(destination=self.output_uri)
else:
# noinspection PyProtectedMember
task = Task._create(
task_name=self.task_name, project_name=self.project_name,
task_type=self.task_type or Task.TaskTypes.training)
task_state = dict(script={})
self._set_output_uri(task)
if self.base_task_id:
if self.verbose:
print('Cloning task {}'.format(self.base_task_id))
task = Task.clone(source_task=self.base_task_id, project=Task.get_project_id(self.project_name))
# if there is nothing to populate, return
if not any([
self.folder, self.commit, self.branch, self.repo, self.script, self.cwd,
self.packages, self.requirements_file, self.base_task_id] + (list(self.docker.values()))
):
return task
self._set_output_uri(task)
else:
# noinspection PyProtectedMember
task = Task._create(
task_name=self.task_name, project_name=self.project_name,
task_type=self.task_type or Task.TaskTypes.training)
task_state = task.export_task()
if 'script' not in task_state:
task_state['script'] = {}
self._set_output_uri(task)
# if there is nothing to populate, return
if not any([
self.folder, self.commit, self.branch, self.repo, self.script, self.cwd,
self.packages, self.requirements_file, self.base_task_id] + (list(self.docker.values()))
):
return task
# clear the script section
task_state['script'] = {}
if repo_info:
task_state['script']['repository'] = repo_info.script['repository']
@ -303,11 +315,18 @@ class CreateAndPopulate(object):
# set base docker image if provided
if self.docker:
task.set_base_docker(
docker_cmd=self.docker.get('image'),
docker_arguments=self.docker.get('args'),
docker_setup_bash_script=self.docker.get('bash_script'),
)
if dry_run:
task_state['container'] = dict(
image=self.docker.get('image') or '',
arguments=self.docker.get('args') or '',
setup_shell_script=self.docker.get('bash_script') or '',
)
else:
task.set_base_docker(
docker_image=self.docker.get('image'),
docker_arguments=self.docker.get('args'),
docker_setup_bash_script=self.docker.get('bash_script'),
)
if self.verbose:
if task_state['script']['repository']:
@ -328,6 +347,9 @@ class CreateAndPopulate(object):
if self.docker:
print('Base docker image: {}'.format(self.docker))
if dry_run:
return task_state
# update the Task
task.update_task(task_state)
self.task = task
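A hedged sketch of the new dry_run flow: with dry_run=True the populated Task definition is returned as a dict and nothing is created on the server (the script path is a placeholder):

populate = CreateAndPopulate(
    project_name='examples',
    task_name='standalone step',
    script='my_step.py',
    packages=True,
    add_task_init_call=False,
)
task_definition = populate.create_task(dry_run=True)  # dict, no Task is created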
@ -434,137 +456,177 @@ class CreateAndPopulate(object):
return found_index if found_index < 0 else lines[found_index][0]
def create_task_from_function(
a_function, # type: Callable
function_kwargs=None, # type: Optional[Dict[str, Any]]
function_input_artifacts=None, # type: Optional[Dict[str, str]]
function_results=None, # type: Optional[List[str]]
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
task_type=None, # type: Optional[str]
packages=None, # type: Optional[Sequence[str]]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
output_uri=None, # type: Optional[str]
):
# type: (...) -> Optional[Task]
"""
Create a Task from a function, including wrapping the function input arguments
into the hyper-parameter section as kwargs, and storing function results as named artifacts
class CreateFromFunction(object):
kwargs_section = 'kwargs'
input_artifact_section = 'kwargs_artifacts'
task_template = """from clearml import Task
Example:
def mock_func(a=6, b=9):
c = a*b
print(a, b, c)
return c, c**2
create_task_from_function(mock_func, function_results=['mul', 'square'])
Example arguments from other Tasks (artifact):
def mock_func(matrix_np):
c = matrix_np*matrix_np
print(matrix_np, c)
return c
create_task_from_function(
mock_func,
function_input_artifacts={'matrix_np': 'aabb1122.previous_matrix'},
function_results=['square_matrix']
)
:param a_function: A global function to convert into a standalone Task
:param function_kwargs: Optional, provide subset of function arguments and default values to expose.
If not provided automatically take all function arguments & defaults
:param function_input_artifacts: Optional, pass input arguments to the function from other Tasks's output artifact.
Example argument named `numpy_matrix` from Task ID `aabbcc` artifact name `answer`:
{'numpy_matrix': 'aabbcc.answer'}
:param function_results: Provide a list of names for all the results.
If not provided no results will be stored as artifacts.
:param project_name: Set the project name for the task. Required if base_task_id is None.
:param task_name: Set the name of the remote task. Required if base_task_id is None.
:param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
:param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"]
If not provided, packages are automatically added based on the imports used in the function.
:param docker: Select the docker image to be executed in by the remote session
:param docker_args: Add docker arguments, pass a single string
:param docker_bash_setup_script: Add bash script to be executed
inside the docker before setting up the Task's environment
:param output_uri: Optional, set the Tasks's output_uri (Storage destination).
examples: 's3://bucket/folder', 'https://server/' , 'gs://bucket/folder', 'azure://bucket', '/folder/'
:return: Newly created Task object
"""
function_name = str(a_function.__name__)
function_source = inspect.getsource(a_function)
function_input_artifacts = function_input_artifacts or dict()
# verify artifact kwargs:
if not all(len(v.split('.', 1)) == 2 for v in function_input_artifacts.values()):
raise ValueError(
'function_input_artifacts={}, it must in the format: '
'{{"argument": "task_id.artifact_name"}}'.format(function_input_artifacts)
)
if function_kwargs is None:
function_kwargs = dict()
inspect_args = inspect.getfullargspec(a_function)
if inspect_args and inspect_args.args:
inspect_defaults = inspect_args.defaults
if inspect_defaults and len(inspect_defaults) != len(inspect_args.args):
getLogger().warning(
'Ignoring default argument values: '
'could not find all default valued for: \'{}\''.format(function_name))
inspect_defaults = []
function_kwargs = {str(k): v for k, v in zip(inspect_args.args, inspect_defaults)} \
if inspect_defaults else {str(k): None for k in inspect_args.args}
task_template = """
from clearml import Task
{function_source}
if __name__ == '__main__':
task = Task.init()
kwargs = {function_kwargs}
task.connect(kwargs, name='kwargs')
task.connect(kwargs, name='{kwargs_section}')
function_input_artifacts = {function_input_artifacts}
if function_input_artifacts:
task.connect(function_input_artifacts, name='kwargs_artifacts')
task.connect(function_input_artifacts, name='{input_artifact_section}')
for k, v in function_input_artifacts.items():
if not v:
continue
task_id, artifact_name = v.split('.', 1)
kwargs[k] = Task.get_task(task_id=task_id).artifact[artifact_name].get()
kwargs[k] = Task.get_task(task_id=task_id).artifacts[artifact_name].get()
results = {function_name}(**kwargs)
result_names = {function_results}
if results and result_names:
for name, artifact in zip(results, result_names):
result_names = {function_return}
if result_names:
if not isinstance(results, (tuple, list)) or (len(result_names)==1 and len(results) != 1):
results = [results]
for name, artifact in zip(result_names, results):
task.upload_artifact(name=name, artifact_object=artifact)
"""
""".format(
function_source=function_source,
function_kwargs=function_kwargs,
function_input_artifacts=function_input_artifacts,
function_name=function_name,
function_results=function_results)
@classmethod
def create_task_from_function(
cls,
a_function, # type: Callable
function_kwargs=None, # type: Optional[Dict[str, Any]]
function_input_artifacts=None, # type: Optional[Dict[str, str]]
function_return=None, # type: Optional[List[str]]
project_name=None, # type: Optional[str]
task_name=None, # type: Optional[str]
task_type=None, # type: Optional[str]
packages=None, # type: Optional[Sequence[str]]
docker=None, # type: Optional[str]
docker_args=None, # type: Optional[str]
docker_bash_setup_script=None, # type: Optional[str]
output_uri=None, # type: Optional[str]
dry_run=False, # type: bool
):
# type: (...) -> Optional[Dict, Task]
"""
Create a Task from a function, including wrapping the function input arguments
into the hyper-parameter section as kwargs, and storing function results as named artifacts
with tempfile.NamedTemporaryFile('w', suffix='.py') as temp_file:
temp_file.write(task_template)
temp_file.flush()
Example:
def mock_func(a=6, b=9):
c = a*b
print(a, b, c)
return c, c**2
populate = CreateAndPopulate(
project_name=project_name,
task_name=task_name,
task_type=task_type,
script=temp_file.name,
packages=packages if packages is not None else True,
docker=docker,
docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script,
output_uri=output_uri,
add_task_init_call=False,
)
task = populate.create_task()
task.update_task(task_data={'script': {'entry_point': '{}.py'.format(function_name)}})
return task
create_task_from_function(mock_func, function_return=['mul', 'square'])
Example arguments from other Tasks (artifact):
def mock_func(matrix_np):
c = matrix_np*matrix_np
print(matrix_np, c)
return c
create_task_from_function(
mock_func,
function_input_artifacts={'matrix_np': 'aabb1122.previous_matrix'},
function_return=['square_matrix']
)
:param a_function: A global function to convert into a standalone Task
:param function_kwargs: Optional, provide a subset of the function arguments and default values to expose.
If not provided, all function arguments and their default values are taken automatically
:param function_input_artifacts: Optional, pass input arguments to the function from other Tasks' output artifacts.
Example argument named `numpy_matrix` from Task ID `aabbcc` artifact name `answer`:
{'numpy_matrix': 'aabbcc.answer'}
:param function_return: Provide a list of names for all the results.
If not provided no results will be stored as artifacts.
:param project_name: Set the project name for the task. Required if base_task_id is None.
:param task_name: Set the name of the remote task. Required if base_task_id is None.
:param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference',
'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom'
:param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"]
If not provided, packages are automatically added based on the imports used in the function.
:param docker: Select the docker image to be executed in by the remote session
:param docker_args: Add docker arguments, pass a single string
:param docker_bash_setup_script: Add bash script to be executed
inside the docker before setting up the Task's environment
:param output_uri: Optional, set the Tasks's output_uri (Storage destination).
examples: 's3://bucket/folder', 'https://server/' , 'gs://bucket/folder', 'azure://bucket', '/folder/'
:param dry_run: If True, do not create the Task; instead return a dict of the Task's definitions
:return: Newly created Task object, or the Task definition dict if `dry_run` is True
"""
function_name = str(a_function.__name__)
function_source = inspect.getsource(a_function)
function_input_artifacts = function_input_artifacts or dict()
# verify artifact kwargs:
if not all(len(v.split('.', 1)) == 2 for v in function_input_artifacts.values()):
raise ValueError(
'function_input_artifacts={}, it must be in the format: '
'{{"argument": "task_id.artifact_name"}}'.format(function_input_artifacts)
)
if function_kwargs is None:
function_kwargs = dict()
inspect_args = inspect.getfullargspec(a_function)
if inspect_args and inspect_args.args:
inspect_defaults_vals = inspect_args.defaults
inspect_defaults_args = inspect_args.args
if inspect_defaults_vals and len(inspect_defaults_vals) != len(inspect_defaults_args):
inspect_defaults_args = [a for a in inspect_defaults_args if a not in function_input_artifacts]
if inspect_defaults_vals and len(inspect_defaults_vals) != len(inspect_defaults_args):
getLogger().warning(
'Ignoring default argument values: '
'could not find all default values for: \'{}\''.format(function_name))
inspect_defaults_vals = []
function_kwargs = {str(k): v for k, v in zip(inspect_defaults_args, inspect_defaults_vals)} \
if inspect_defaults_vals else {str(k): None for k in inspect_defaults_args}
task_template = cls.task_template.format(
kwargs_section=cls.kwargs_section,
input_artifact_section=cls.input_artifact_section,
function_source=function_source,
function_kwargs=function_kwargs,
function_input_artifacts=function_input_artifacts,
function_name=function_name,
function_return=function_return)
with tempfile.NamedTemporaryFile('w', suffix='.py') as temp_file:
temp_file.write(task_template)
temp_file.flush()
populate = CreateAndPopulate(
project_name=project_name,
task_name=task_name or str(function_name),
task_type=task_type,
script=temp_file.name,
packages=packages if packages is not None else True,
docker=docker,
docker_args=docker_args,
docker_bash_setup_script=docker_bash_setup_script,
output_uri=output_uri,
add_task_init_call=False,
)
entry_point = '{}.py'.format(function_name)
task = populate.create_task(dry_run=dry_run)
if dry_run:
task['script']['entry_point'] = entry_point
task['hyperparams'] = {
cls.kwargs_section: {
k: dict(section=cls.kwargs_section, name=k, value=str(v))
for k, v in (function_kwargs or {}).items()
},
cls.input_artifact_section: {
k: dict(section=cls.input_artifact_section, name=k, value=str(v))
for k, v in (function_input_artifacts or {}).items()
}
}
else:
task.update_task(task_data={'script': {'entry_point': entry_point}})
hyper_parameters = {'{}/{}'.format(cls.kwargs_section, k): str(v) for k, v in function_kwargs.items()} \
if function_kwargs else {}
hyper_parameters.update(
{'{}/{}'.format(cls.input_artifact_section, k): str(v) for k, v in function_input_artifacts.items()}
if function_input_artifacts else {}
)
task.set_parameters(hyper_parameters)
return task
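For completeness, a hedged example of calling the new classmethod directly; `mock_func` refers to the function from the docstring example above:

task_definition = CreateFromFunction.create_task_from_function(
    a_function=mock_func,
    function_return=['mul', 'square'],
    project_name='examples',
    task_name='mock_func step',
    dry_run=True,  # return the Task definition as a dict instead of creating it
)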

View File

@ -0,0 +1,87 @@
from clearml import PipelineController
def step_one(pickle_data_url):
import pickle
import pandas as pd
from clearml import StorageManager
pickle_data_url = \
pickle_data_url or \
'https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
with open(local_iris_pkl, 'rb') as f:
iris = pickle.load(f)
data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
data_frame.columns += ['target']
data_frame['target'] = iris['target']
return data_frame
def step_two(data_frame, test_size=0.2, random_state=42):
from sklearn.model_selection import train_test_split
y = data_frame['target']
X = data_frame[(c for c in data_frame.columns if c != 'target')]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state)
return X_train, X_test, y_train, y_test
def step_three(data):
from sklearn.linear_model import LogisticRegression
X_train, X_test, y_train, y_test = data
model = LogisticRegression(solver='liblinear', multi_class='auto')
model.fit(X_train, y_train)
return model
def debug_testing_our_pipeline(pickle_url):
data_frame = step_one(pickle_url)
processed_data = step_two(data_frame)
model = step_three(processed_data)
print(model)
pipe = PipelineController(
project='examples',
name='pipeline demo',
version='1.1',
add_pipeline_tags=False,
)
pipe.add_parameter(
name='url',
description='url to pickle file',
default='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
)
pipe.add_function_step(
name='step_one',
function=step_one,
function_kwargs=dict(pickle_data_url='${pipeline.url}'),
function_return=['data_frame'],
cache_executed_step=True,
)
pipe.add_function_step(
name='step_two',
# parents=['step_one'], # the pipeline will automatically detect the dependencies based on the kwargs inputs
function=step_two,
function_kwargs=dict(data_frame='${step_one.data_frame}'),
function_return=['processed_data'],
cache_executed_step=True,
)
pipe.add_function_step(
name='step_three',
# parents=['step_two'], # the pipeline will automatically detect the dependencies based on the kwargs inputs
function=step_three,
function_kwargs=dict(data='${step_two.processed_data}'),
function_return=['model'],
cache_executed_step=True,
)
# for debugging purposes use local jobs
pipe.start_locally()
# Starting the pipeline on the services queue (remote machine, default on the clearml-server)
# pipe.start()
print('pipeline done')