diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py index 831403da..ec89da71 100644 --- a/clearml/automation/controller.py +++ b/clearml/automation/controller.py @@ -1,18 +1,21 @@ +import json import re from copy import copy from datetime import datetime from logging import getLogger from threading import Thread, Event, RLock from time import time +from typing import Sequence, Optional, Mapping, Callable, Any, Union, List, Dict from attr import attrib, attrs -from typing import Sequence, Optional, Mapping, Callable, Any, Union, List +from .job import LocalClearmlJob +from ..automation import ClearmlJob +from ..backend_interface.task.populate import CreateFromFunction from ..backend_interface.util import get_or_create_project from ..debugging.log import LoggerRoot -from ..task import Task -from ..automation import ClearmlJob from ..model import BaseModel +from ..task import Task from ..utilities.process.mp import leave_process @@ -26,69 +29,61 @@ class PipelineController(object): _tag = 'pipeline' _step_pattern = r"\${[^}]*}" _config_section = 'Pipeline' + _args_section = 'PipelineArgs' + _pipeline_step_ref = 'pipeline' + _reserved_pipeline_names = (_pipeline_step_ref, ) _task_project_lookup = {} + _clearml_job_class = ClearmlJob @attrs class Node(object): - name = attrib(type=str) - base_task_id = attrib(type=str) - queue = attrib(type=str, default=None) - parents = attrib(type=list, default=[]) - timeout = attrib(type=float, default=None) - parameters = attrib(type=dict, default={}) - task_overrides = attrib(type=dict, default={}) - executed = attrib(type=str, default=None) - clone_task = attrib(type=bool, default=True) - job = attrib(type=ClearmlJob, default=None) - skip_job = attrib(type=bool, default=False) - cache_executed_step = attrib(type=bool, default=False) + name = attrib(type=str) # pipeline step name + base_task_id = attrib(type=str, default=None) # base Task ID to be cloned and launched + task_factory_func = attrib(type=Callable, default=None) # alternative to base_task_id, function creating a Task + queue = attrib(type=str, default=None) # execution queue name to use + parents = attrib(type=list, default=[]) # list of parent DAG steps + timeout = attrib(type=float, default=None) # execution timeout limit + parameters = attrib(type=dict, default={}) # Task hyper parameters to change + task_overrides = attrib(type=dict, default={}) # Task overrides to change + executed = attrib(type=str, default=None) # The actual executed Task ID (None if not executed yet) + clone_task = attrib(type=bool, default=True) # If True cline the base_task_id, then execute the cloned Task + job = attrib(type=ClearmlJob, default=None) # ClearMLJob object + skip_job = attrib(type=bool, default=False) # if True, this step should be skipped + cache_executed_step = attrib(type=bool, default=False) # if True this pipeline step should be cached + return_artifacts = attrib(type=list, default=[]) # List of artifact names returned by the step def __init__( self, + name, # type: str + project, # type: str + version, # type: str pool_frequency=0.2, # type: float - default_execution_queue=None, # type: Optional[str] - pipeline_time_limit=None, # type: Optional[float] - auto_connect_task=True, # type: Union[bool, Task] - always_create_task=False, # type: bool add_pipeline_tags=False, # type: bool target_project=None, # type: Optional[str] - pipeline_name=None, # type: Optional[str] - pipeline_project=None, # type: Optional[str] ): # type: (...) 
-> () """ Create a new pipeline controller. The newly created object will launch and monitor the new experiments. + :param name: Provide pipeline name (if main Task exists it overrides its name) + :param project: Provide project storing the pipeline (if main Task exists it overrides its project) + :param version: Must provide pipeline version. This version allows to uniquely identify the pipeline + template execution. Examples for semantic versions: version='1.0.1' , version='23', version='1.2' :param float pool_frequency: The pooling frequency (in minutes) for monitoring experiments / states. - :param str default_execution_queue: The execution queue to use if no execution queue is provided - :param float pipeline_time_limit: The maximum time (minutes) for the entire pipeline process. The - default is ``None``, indicating no time limit. - :param bool auto_connect_task: Store pipeline arguments and configuration in the Task - - ``True`` - The pipeline argument and configuration will be stored in the current Task. All arguments will - be under the hyper-parameter section ``Pipeline``, and the pipeline DAG will be stored as a - Task configuration object named ``Pipeline``. - Notice that when running remotely the DAG definitions will be taken from the Task itself (e.g. editing - the configuration in the UI will be reflected in the actual DAG created). - - ``False`` - Do not store DAG configuration on the Task. - In remote execution the DAG will always be created from code. - - ``Task`` - A specific Task object to connect the pipeline with. - :param bool always_create_task: Always create a new Task - - ``True`` - No current Task initialized. Create a new task named ``Pipeline`` in the ``base_task_id`` - project. - - ``False`` - Use the :py:meth:`task.Task.current_task` (if exists) to report statistics. :param bool add_pipeline_tags: (default: False) if True, add `pipe: ` tag to all steps (Tasks) created by this pipeline. :param str target_project: If provided, all pipeline steps are cloned into the target project - :param pipeline_name: Optional, provide pipeline name if main Task is not present (default current date) - :param pipeline_project: Optional, provide project storing the pipeline if main Task is not present """ self._nodes = {} self._running_nodes = [] self._start_time = None - self._pipeline_time_limit = pipeline_time_limit * 60. if pipeline_time_limit else None - self._default_execution_queue = default_execution_queue + self._pipeline_time_limit = None + self._default_execution_queue = None + self._version = str(version) self._pool_frequency = pool_frequency * 60. 
self._thread = None + self._pipeline_args = dict() + self._pipeline_args_desc = dict() self._stop_event = None self._experiment_created_cb = None self._experiment_completed_cb = None @@ -96,23 +91,44 @@ class PipelineController(object): self._post_step_callbacks = {} self._target_project = target_project or '' self._add_pipeline_tags = add_pipeline_tags - self._task = auto_connect_task if isinstance(auto_connect_task, Task) else Task.current_task() + self._task = Task.current_task() self._step_ref_pattern = re.compile(self._step_pattern) self._reporting_lock = RLock() self._pipeline_task_status_failed = None - if not self._task and always_create_task: + if not self._task: self._task = Task.init( - project_name=pipeline_project or 'Pipelines', - task_name=pipeline_name or 'Pipeline {}'.format(datetime.now()), + project_name=project or 'Pipelines', + task_name=name or 'Pipeline {}'.format(datetime.now()), task_type=Task.TaskTypes.controller, auto_resource_monitoring=False, ) + self._task.set_system_tags((self._task.get_system_tags() or []) + [self._tag]) + self._task.set_user_properties(version=self._version) - self._auto_connect_task = bool(auto_connect_task) and bool(self._task) + self._auto_connect_task = bool(self._task) # make sure we add to the main Task the pipeline tag if self._task: self._task.add_tags([self._tag]) + def set_default_execution_queue(self, default_execution_queue): + # type: (Optional[str]) -> None + """ + Set the default execution queue for if pipeline step does not specify an execution queue + + :param default_execution_queue: The execution queue to use if no execution queue is provided + """ + self._default_execution_queue = str(default_execution_queue) if default_execution_queue else None + + def set_pipeline_execution_time_limit(self, max_execution_minutes): + # type: (Optional[float]) -> None + """ + Set maximum execution time (minutes) for the entire pipeline. Pass None or 0 to disable execution time limit. + + :param float max_execution_minutes: The maximum time (minutes) for the entire pipeline process. The + default is ``None``, indicating no time limit. + """ + self._pipeline_time_limit = max_execution_minutes * 60. if max_execution_minutes else None + def add_step( self, name, # type: str @@ -128,19 +144,20 @@ class PipelineController(object): pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa cache_executed_step=False, # type: bool + base_task_factory=None, # type: Optional[Callable[[PipelineController.Node], Task]] ): # type: (...) -> bool """ Add a step to the pipeline execution DAG. Each step must have a unique name (this name will later be used to address the step) - :param str name: Unique of the step. For example `stage1` - :param str base_task_id: The Task ID to use for the step. Each time the step is executed, + :param name: Unique of the step. For example `stage1` + :param base_task_id: The Task ID to use for the step. Each time the step is executed, the base Task is cloned, then the cloned task will be sent for execution. - :param list parents: Optional list of parent nodes in the DAG. + :param parents: Optional list of parent nodes in the DAG. The current step in the pipeline will be sent for execution only after all the parent nodes have been executed successfully. - :param dict parameter_override: Optional parameter overriding dictionary. 
+ :param parameter_override: Optional parameter overriding dictionary. The dict values can reference a previously executed step using the following form '${step_name}' Examples: - Artifact access @@ -151,7 +168,7 @@ class PipelineController(object): parameter_override={'Args/input_file': '${stage3.parameters.Args/input_file}' } - Task ID parameter_override={'Args/input_file': '${stage3.id}' } - :param dict task_overrides: Optional task section overriding dictionary. + :param task_overrides: Optional task section overriding dictionary. The dict values can reference a previously executed step using the following form '${step_name}' Examples: - clear git repository commit ID @@ -160,17 +177,17 @@ class PipelineController(object): parameter_override={'script.branch': '${stage1.script.branch}' } - container image parameter_override={'container.image': '${stage1.container.image}' } - :param str execution_queue: Optional, the queue to use for executing this specific step. + :param execution_queue: Optional, the queue to use for executing this specific step. If not provided, the task will be sent to the default execution queue, as defined on the class - :param float time_limit: Default None, no time limit. + :param time_limit: Default None, no time limit. Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed. - :param str base_task_project: If base_task_id is not given, + :param base_task_project: If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step. - :param str base_task_name: If base_task_id is not given, + :param base_task_name: If base_task_id is not given, use the base_task_project and base_task_name combination to retrieve the base_task_id to use for the step. - :param bool clone_base_task: If True (default) the pipeline will clone the base task, and modify/enqueue + :param clone_base_task: If True (default) the pipeline will clone the base task, and modify/enqueue the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created). - :param Callable pre_execute_callback: Callback function, called when the step (Task) is created + :param pre_execute_callback: Callback function, called when the step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object. `parameters` are the configuration arguments passed to the ClearmlJob. @@ -190,7 +207,7 @@ class PipelineController(object): ): pass - :param Callable post_execute_callback: Callback function, called when a step (Task) is completed + :param post_execute_callback: Callback function, called when a step (Task) is completed and it other jobs are executed. Allows a user to modify the Task status after completion. .. code-block:: py @@ -207,6 +224,8 @@ class PipelineController(object): Default: False, a new cloned copy of base_task is always used. Notice: If the git repo reference does not have a specific commit ID, the Task will never be used. If `clone_base_task` is False there is no cloning, hence the base_task is used. 
+ :param base_task_factory: Optional, instead of providing a pre-existing Task, + provide a Callable function to create the Task (returns Task object) :return: True if successful """ @@ -219,13 +238,12 @@ class PipelineController(object): # when running remotely do nothing, we will deserialize ourselves when we start # if we are not cloning a Task, we assume this step is created from code, not from the configuration - if clone_base_task and self._has_stored_configuration(): + if not base_task_factory and clone_base_task and self._has_stored_configuration(): return True - if name in self._nodes: - raise ValueError('Node named \'{}\' already exists in the pipeline dag'.format(name)) + self._verify_node_name(name) - if not base_task_id: + if not base_task_factory and not base_task_id: if not base_task_project or not base_task_name: raise ValueError('Either base_task_id or base_task_project/base_task_name must be provided') base_task = Task.get_task( @@ -255,6 +273,202 @@ class PipelineController(object): clone_task=clone_base_task, task_overrides=task_overrides, cache_executed_step=cache_executed_step, + task_factory_func=base_task_factory, + ) + + if self._task and not self._task.running_locally(): + self.update_execution_plot() + + return True + + def add_function_step( + self, + name, # type: str + function, # type: Callable + function_kwargs=None, # type: Optional[Dict[str, Any]] + function_return=None, # type: Optional[List[str]] + project_name=None, # type: Optional[str] + task_name=None, # type: Optional[str] + task_type=None, # type: Optional[str] + packages=None, # type: Optional[Sequence[str]] + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + parents=None, # type: Optional[Sequence[str]], + execution_queue=None, # type: Optional[str] + time_limit=None, # type: Optional[float] + clone_base_task=True, # type: bool + pre_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa + post_execute_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa + cache_executed_step=False, # type: bool + ): + # type: (...) -> bool + """ + Create a Task from a function, including wrapping the function input arguments + into the hyper-parameter section as kwargs, and storing function results as named artifacts + + Example: + def mock_func(a=6, b=9): + c = a*b + print(a, b, c) + return c, c**2 + + create_task_from_function(mock_func, function_return=['mul', 'square']) + + Example arguments from other Tasks (artifact): + def mock_func(matrix_np): + c = matrix_np*matrix_np + print(matrix_np, c) + return c + + create_task_from_function( + mock_func, + function_input_artifacts={'matrix_np': 'aabb1122.previous_matrix'}, + function_return=['square_matrix'] + ) + + :param name: Unique of the step. For example `stage1` + :param function: A global function to convert into a standalone Task + :param function_kwargs: Optional, provide subset of function arguments and default values to expose. + If not provided automatically take all function arguments & defaults + Optional, pass input arguments to the function from other Tasks's output artifact. + Example argument named `numpy_matrix` from Task ID `aabbcc` artifact name `answer`: + {'numpy_matrix': 'aabbcc.answer'} + :param function_return: Provide a list of names for all the results. + If not provided no results will be stored as artifacts. 
+ :param project_name: Set the project name for the task. Required if base_task_id is None. + :param task_name: Set the name of the remote task. Required if base_task_id is None. + :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference', + 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom' + :param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"] + If not provided, packages are automatically added based on the imports used in the function. + :param docker: Select the docker image to be executed in by the remote session + :param docker_args: Add docker arguments, pass a single string + :param docker_bash_setup_script: Add bash script to be executed + inside the docker before setting up the Task's environment + :param parents: Optional list of parent nodes in the DAG. + The current step in the pipeline will be sent for execution only after all the parent nodes + have been executed successfully. + :param execution_queue: Optional, the queue to use for executing this specific step. + If not provided, the task will be sent to the default execution queue, as defined on the class + :param time_limit: Default None, no time limit. + Step execution time limit, if exceeded the Task is aborted and the pipeline is stopped and marked failed. + :param clone_base_task: If True (default) the pipeline will clone the base task, and modify/enqueue + the cloned Task. If False, the base-task is used directly, notice it has to be in draft-mode (created). + :param pre_execute_callback: Callback function, called when the step (Task) is created + and before it is sent for execution. Allows a user to modify the Task before launch. + Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object. + `parameters` are the configuration arguments passed to the ClearmlJob. + + If the callback returned value is `False`, + the Node is skipped and so is any node in the DAG that relies on this node. + + Notice the `parameters` are already parsed, + e.g. `${step1.parameters.Args/param}` is replaced with relevant value. + + .. code-block:: py + + def step_created_callback( + pipeline, # type: PipelineController, + node, # type: PipelineController.Node, + parameters, # type: dict + ): + pass + + :param post_execute_callback: Callback function, called when a step (Task) is completed + and it other jobs are executed. Allows a user to modify the Task status after completion. + + .. code-block:: py + + def step_completed_callback( + pipeline, # type: PipelineController, + node, # type: PipelineController.Node, + ): + pass + + :param cache_executed_step: If True, before launching the new step, + after updating with the latest configuration, check if an exact Task with the same parameter/code + was already executed. If it was found, use it instead of launching a new Task. + Default: False, a new cloned copy of base_task is always used. + Notice: If the git repo reference does not have a specific commit ID, the Task will never be used. + If `clone_base_task` is False there is no cloning, hence the base_task is used. 
+ + :return: True if successful + """ + # always store callback functions (even when running remotely) + if pre_execute_callback: + self._pre_step_callbacks[name] = pre_execute_callback + if post_execute_callback: + self._post_step_callbacks[name] = post_execute_callback + + self._verify_node_name(name) + + function_kwargs = function_kwargs or {} + function_input_artifacts = {} + # go over function_kwargs, split it into string and input artifacts + for k, v in function_kwargs.items(): + if self._step_ref_pattern.match(v): + # check for step artifacts + step, _, artifact = v[2:-1].partition('.') + if step in self._nodes and artifact in self._nodes[step].return_artifacts: + function_input_artifacts[k] = "${{{}.id}}.{}".format(step, artifact) + continue + # verify the reference + self.__verify_step_reference(node=self.Node(name=name), step_ref_string=v) + + function_kwargs = {k: v for k, v in function_kwargs.items() if k not in function_input_artifacts} + parameters = {"{}/{}".format(CreateFromFunction.kwargs_section, k): v for k, v in function_kwargs.items()} + if function_input_artifacts: + parameters.update( + {"{}/{}".format(CreateFromFunction.input_artifact_section, k): v + for k, v in function_input_artifacts.items()} + ) + + if self._task.running_locally(): + project_name = project_name or self._target_project or self._task.get_project_name() + + task_definition = CreateFromFunction.create_task_from_function( + a_function=function, + function_kwargs=function_kwargs or None, + function_input_artifacts=function_input_artifacts, + function_return=function_return, + project_name=project_name, + task_name=task_name, + task_type=task_type, + packages=packages, + docker=docker, + docker_args=docker_args, + docker_bash_setup_script=docker_bash_setup_script, + output_uri=None, + dry_run=True, + ) + # noinspection PyProtectedMember + self._task._set_configuration( + name=name, config_type='json', + config_text=json.dumps(task_definition, indent=1) + ) + else: + # noinspection PyProtectedMember + task_definition = json.loads(self._task._get_configuration_text(name=name)) + + def _create_task(_): + a_task = Task.create( + project_name=project_name, + task_name=task_definition.get('name'), + task_type=task_definition.get('type'), + ) + # replace reference + a_task.update_task(task_definition) + return a_task + + self._nodes[name] = self.Node( + name=name, base_task_id=None, parents=parents or [], + queue=execution_queue, timeout=time_limit, + parameters=parameters, + clone_task=clone_base_task, + cache_executed_step=cache_executed_step, + task_factory_func=_create_task, + return_artifacts=function_return, ) if self._task and not self._task.running_locally(): @@ -264,14 +478,17 @@ class PipelineController(object): def start( self, + queue='services', step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa - step_task_completed_callback=None # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa + step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa + wait=True, ): # type: (...) -> bool """ - Start the pipeline controller. - If the calling process is stopped, then the controller stops as well. + Start the current pipeline remotely (on the selected services queue) + The current process will be stopped if exit_process is True. 
+ :param queue: queue name to launch the pipeline on :param Callable step_task_created_callback: Callback function, called when a step (Task) is created and before it is sent for execution. Allows a user to modify the Task before launch. Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object. @@ -302,57 +519,11 @@ class PipelineController(object): node, # type: PipelineController.Node, ): pass - + :param wait: If True (default), start the pipeline controller, return only + after the pipeline is done (completed/aborted/failed) :return: True, if the controller started. False, if the controller did not start. - """ - if self._thread: - return True - - params, pipeline_dag = self._serialize_pipeline_task() - - # deserialize back pipeline state - if not params['continue_pipeline']: - for k in pipeline_dag: - pipeline_dag[k]['executed'] = None - - self._default_execution_queue = params['default_queue'] - self._add_pipeline_tags = params['add_pipeline_tags'] - self._target_project = params['target_project'] or '' - self._deserialize(pipeline_dag) - - # if we continue the pipeline, make sure that we re-execute failed tasks - if params['continue_pipeline']: - for node in self._nodes.values(): - if node.executed is False: - node.executed = None - - if not self._verify(): - raise ValueError("Failed verifying pipeline execution graph, " - "it has either inaccessible nodes, or contains cycles") - - self.update_execution_plot() - - self._start_time = time() - self._stop_event = Event() - self._experiment_created_cb = step_task_created_callback - self._experiment_completed_cb = step_task_completed_callback - self._thread = Thread(target=self._daemon) - self._thread.daemon = True - self._thread.start() - return True - - def start_remotely(self, queue='services', exit_process=True): - # type: (str, bool) -> Task - """ - Start the current pipeline remotely (on the selected services queue) - The current process will be stopped if exit_process is True. - - :param queue: queue name to launch the pipeline on - :param exit_process: If True exit the current process after launching on the enqueuing on the queue - - :return: The remote Task object """ if not self._task: raise ValueError( @@ -361,19 +532,47 @@ class PipelineController(object): # serialize state only if we are running locally if Task.running_locally() or not self._task.is_main_task(): + self._verify() self._serialize_pipeline_task() self.update_execution_plot() # stop current Task and execute remotely or no-op - self._task.execute_remotely(queue_name=queue, exit_process=exit_process, clone=False) + self._task.execute_remotely(queue_name=queue, exit_process=True, clone=False) if not Task.running_locally() and self._task.is_main_task(): - self.start() - self.wait() - self.stop() + self._start( + step_task_created_callback=step_task_created_callback, + step_task_completed_callback=step_task_completed_callback, + wait=wait + ) leave_process(0) - else: - return self._task + + return True + + def start_locally(self, run_pipeline_steps_locally=True): + # type: (bool) -> None + """ + Start the current pipeline locally, in most cases for debug purposes. + By default it will be running the DAG itself locally, as sub-processes. + Notice: running the DAG locally assumes the local code execution (i.e. 
it will not clone & apply git diff) + + :param run_pipeline_steps_locally: If True, run the pipeline steps locally as a subprocess + """ + if not self._task: + raise ValueError( + "Could not find main Task, " + "PipelineController must be created with `always_create_task=True`") + + if run_pipeline_steps_locally: + self._clearml_job_class = LocalClearmlJob + self._default_execution_queue = self._default_execution_queue or 'mock' + + # serialize state only if we are running locally + self._verify() + self._serialize_pipeline_task() + self.update_execution_plot() + + self._start(wait=True) def stop(self, timeout=None): # type: (Optional[float]) -> () @@ -495,6 +694,111 @@ class PipelineController(object): with self._reporting_lock: self._update_execution_plot() + def add_parameter(self, name, default=None, description=None): + # type: (str, Optional[Any], Optional[str]) -> None + """ + Add a parameter to the pipeline Task. + The parameter can be used as input parameter for any step in the pipeline. + Notice all parameters will appear under the PipelineController Task's Hyper-parameters -> Pipeline section + Example: pipeline.add_parameter(name='dataset', description='dataset ID to process the pipeline') + Then in one of the steps we can refer to the value of the parameter with '${pipeline.dataset}' + + :param name: String name of the parameter. + :param default: Default value to be put as the default value (can be later changed in the UI) + :param description: String description of the parameter and its usage in the pipeline + """ + self._pipeline_args[str(name)] = str(default or '') + if description: + self._pipeline_args_desc[str(name)] = str(description) + + def _start( + self, + step_task_created_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node, dict], bool]] # noqa + step_task_completed_callback=None, # type: Optional[Callable[[PipelineController, PipelineController.Node], None]] # noqa + wait=True, + ): + # type: (...) -> bool + """ + Start the pipeline controller. + If the calling process is stopped, then the controller stops as well. + + :param Callable step_task_created_callback: Callback function, called when a step (Task) is created + and before it is sent for execution. Allows a user to modify the Task before launch. + Use `node.job` to access the ClearmlJob object, or `node.job.task` to directly access the Task object. + `parameters` are the configuration arguments passed to the ClearmlJob. + + If the callback returned value is `False`, + the Node is skipped and so is any node in the DAG that relies on this node. + + Notice the `parameters` are already parsed, + e.g. `${step1.parameters.Args/param}` is replaced with relevant value. + + .. code-block:: py + + def step_created_callback( + pipeline, # type: PipelineController, + node, # type: PipelineController.Node, + parameters, # type: dict + ): + pass + + :param Callable step_task_completed_callback: Callback function, called when a step (Task) is completed + and it other jobs are executed. Allows a user to modify the Task status after completion. + + .. code-block:: py + + def step_completed_callback( + pipeline, # type: PipelineController, + node, # type: PipelineController.Node, + ): + pass + :param wait: If True (default), start the pipeline controller, return only + after the pipeline is done (completed/aborted/failed) + + :return: True, if the controller started. False, if the controller did not start. 
+ + """ + if self._thread: + return True + + params, pipeline_dag = self._serialize_pipeline_task() + + # deserialize back pipeline state + if not params['continue_pipeline']: + for k in pipeline_dag: + pipeline_dag[k]['executed'] = None + + self._default_execution_queue = params['default_queue'] + self._add_pipeline_tags = params['add_pipeline_tags'] + self._target_project = params['target_project'] or '' + self._deserialize(pipeline_dag) + + # if we continue the pipeline, make sure that we re-execute failed tasks + if params['continue_pipeline']: + for node in self._nodes.values(): + if node.executed is False: + node.executed = None + + if not self._verify(): + raise ValueError("Failed verifying pipeline execution graph, " + "it has either inaccessible nodes, or contains cycles") + + self.update_execution_plot() + + self._start_time = time() + self._stop_event = Event() + self._experiment_created_cb = step_task_created_callback + self._experiment_completed_cb = step_task_completed_callback + self._thread = Thread(target=self._daemon) + self._thread.daemon = True + self._thread.start() + + if wait: + self.wait() + self.stop() + + return True + def _serialize_pipeline_task(self): # type: () -> (dict, dict) """ @@ -513,6 +817,15 @@ class PipelineController(object): if self._task and self._auto_connect_task: self._task.connect_configuration(pipeline_dag, name=self._config_section) self._task.connect(params, name=self._config_section) + if self._task.running_locally(): + # noinspection PyProtectedMember + self._task._set_parameters( + {'{}/{}'.format(self._args_section, k): v for k, v in self._pipeline_args.items()}, + __parameters_descriptions=self._pipeline_args_desc, + __update=True, + ) + else: + self._task.connect(self._pipeline_args, name=self._args_section) return params, pipeline_dag @@ -523,7 +836,8 @@ class PipelineController(object): This dictionary will be used to store the DAG as a configuration on the Task :return: """ - dag = {name: dict((k, v) for k, v in node.__dict__.items() if k not in ('job', 'name')) + dag = {name: dict((k, v) for k, v in node.__dict__.items() + if k not in ('job', 'name', 'task_factory_func')) for name, node in self._nodes.items()} return dag @@ -539,10 +853,13 @@ class PipelineController(object): for name in self._nodes: if self._nodes[name].clone_task and name in dag_dict and dag_dict[name].get('clone_task'): dag_dict[name] = dict( - (k, v) for k, v in self._nodes[name].__dict__.items() if k not in ('job', 'name')) + (k, v) for k, v in self._nodes[name].__dict__.items() + if k not in ('job', 'name', 'task_factory_func')) self._nodes = { - k: self.Node(name=k, **v) if not v.get('clone_task') or k not in self._nodes else self._nodes[k] + k: self.Node(name=k, **v) + if (k not in self._nodes or not self._nodes[k].task_factory_func) and ( + not v.get('clone_task') or k not in self._nodes) else self._nodes[k] for k, v in dag_dict.items()} def _has_stored_configuration(self): @@ -581,23 +898,40 @@ class PipelineController(object): :return: Return True iff the specific node is verified """ - if not node.base_task_id: + if not node.base_task_id and not node.task_factory_func: raise ValueError("Node '{}', base_task_id is empty".format(node.name)) if not self._default_execution_queue and not node.queue: raise ValueError("Node '{}' missing execution queue, " "no default queue defined and no specific node queue defined".format(node.name)) - task = Task.get_task(task_id=node.base_task_id) + task = node.task_factory_func or Task.get_task(task_id=node.base_task_id) if 
not task: raise ValueError("Node '{}', base_task_id={} is invalid".format(node.name, node.base_task_id)) pattern = self._step_ref_pattern + parents = set() for v in node.parameters.values(): if isinstance(v, str): + ref_matched = False for g in pattern.findall(v): - self.__verify_step_reference(node, g) + ref_matched = True + ref_step = self.__verify_step_reference(node, g) + if ref_step: + parents.add(ref_step) + # verify we have a section name + if not ref_matched and '/' not in v: + raise ValueError( + "Section name is missing in parameter \"{}\", " + "parameters should be in the form of " + "\"`section-name`/parameter\", example: \"Args/param\"".format(v)) + + if parents != set(node.parents or []): + parents = parents - set(node.parents or []) + getLogger('clearml.automation.controller').info( + 'Node "{}" missing parent reference, adding: {}'.format(node.name, parents)) + node.parents = (node.parents or []) + list(parents) return True @@ -650,11 +984,20 @@ class PipelineController(object): node.skip_job = True return True - node.job = ClearmlJob( - base_task_id=node.base_task_id, parameter_override=updated_hyper_parameters, + task_id = node.base_task_id + disable_clone_task = not node.clone_task + task_factory_func_task = None + if node.task_factory_func: + # create Task + task_factory_func_task = node.task_factory_func(node) + task_id = task_factory_func_task.id + disable_clone_task = True + + node.job = self._clearml_job_class( + base_task_id=task_id, parameter_override=updated_hyper_parameters, tags=['pipe: {}'.format(self._task.id)] if self._add_pipeline_tags and self._task else None, parent=self._task.id if self._task else None, - disable_clone_task=not node.clone_task, + disable_clone_task=disable_clone_task, task_overrides=task_overrides, allow_caching=node.cache_executed_step, **extra_args @@ -672,6 +1015,8 @@ class PipelineController(object): node.skip_job = True elif node.job.is_cached_task(): node.executed = node.job.task_id() + if task_factory_func_task: + task_factory_func_task.delete(raise_on_error=False) else: return node.job.launch(queue_name=node.queue or self._default_execution_queue) @@ -1009,13 +1354,22 @@ class PipelineController(object): return updated_overrides + def _verify_node_name(self, name): + # type: (str) -> None + if name in self._nodes: + raise ValueError('Node named \'{}\' already exists in the pipeline dag'.format(name)) + if name in self._reserved_pipeline_names: + raise ValueError('Node named \'{}\' is a reserved keyword, use a different name'.format(name)) + def __verify_step_reference(self, node, step_ref_string): - # type: (PipelineController.Node, str) -> bool + # type: (PipelineController.Node, str) -> Optional[str] """ Verify the step reference. 
For example "${step1.parameters.Args/param}" + Raise ValueError on misconfiguration + :param Node node: calling reference node (used for logging) :param str step_ref_string: For example "${step1.parameters.Args/param}" - :return: True if valid reference + :return: If step reference is used, return the pipeline step name, otherwise return None """ parts = step_ref_string[2:-1].split('.') v = step_ref_string @@ -1023,6 +1377,13 @@ class PipelineController(object): raise ValueError("Node '{}', parameter '{}' is invalid".format(node.name, v)) prev_step = parts[0] input_type = parts[1] + + # check if we reference the pipeline arguments themselves + if prev_step == self._pipeline_step_ref: + if input_type not in self._pipeline_args: + raise ValueError("Node '{}', parameter '{}', step name '{}' is invalid".format(node.name, v, prev_step)) + return None + if prev_step not in self._nodes: raise ValueError("Node '{}', parameter '{}', step name '{}' is invalid".format(node.name, v, prev_step)) if input_type not in ('artifacts', 'parameters', 'models', 'id'): @@ -1067,7 +1428,7 @@ class PipelineController(object): raise ValueError( "Node '{}', parameter '{}', input type '{}', model property is invalid {}".format( node.name, v, input_type, parts[4])) - return True + return prev_step def __parse_step_reference(self, step_ref_string): """ @@ -1080,6 +1441,14 @@ class PipelineController(object): raise ValueError("Could not parse reference '{}'".format(step_ref_string)) prev_step = parts[0] input_type = parts[1].lower() + + # check if we reference the pipeline arguments themselves + if prev_step == self._pipeline_step_ref: + if parts[1] not in self._pipeline_args: + raise ValueError("Could not parse reference '{}', " + "pipeline argument '{}' could not be found".format(step_ref_string, parts[1])) + return self._pipeline_args[parts[1]] + if prev_step not in self._nodes or ( not self._nodes[prev_step].job and not self._nodes[prev_step].executed and diff --git a/clearml/automation/job.py b/clearml/automation/job.py index 10b28e2a..7723ec37 100644 --- a/clearml/automation/job.py +++ b/clearml/automation/job.py @@ -1,11 +1,18 @@ import hashlib +import os +import subprocess +import sys +import tempfile import warnings from datetime import datetime from logging import getLogger from time import time, sleep from typing import Optional, Mapping, Sequence, Any -from ..backend_interface.util import get_or_create_project +from pathlib2 import Path + +from ..backend_api import Session +from ..backend_interface.util import get_or_create_project, exact_match_regex from ..storage.util import hash_dict from ..task import Task from ..backend_api.services import tasks as tasks_service @@ -16,6 +23,7 @@ logger = getLogger('clearml.automation.job') class ClearmlJob(object): _job_hash_description = 'job_hash={}' + _job_hash_property = 'pipeline_job_hash' def __init__( self, @@ -80,7 +88,7 @@ class ClearmlJob(object): # check cached task self._is_cached_task = False task_hash = None - if allow_caching and not disable_clone_task and not self.task: + if allow_caching: # look for a cached copy of the Task # get parameters + task_overrides + as dict and hash it. 
task_hash = self._create_task_hash( @@ -88,6 +96,11 @@ class ClearmlJob(object): task = self._get_cached_task(task_hash) # if we found a task, just use if task: + if disable_clone_task and self.task and self.task.status == self.task.TaskStatusEnum.created: + # if the base task at is in draft mode, and we are using cached task + # we assume the base Task was created adhoc and we can delete it. + pass # self.task.delete() + self._is_cached_task = True self.task = task self.task_started = True @@ -379,11 +392,13 @@ class ClearmlJob(object): # type: (Task, Optional[dict], Optional[dict]) -> Optional[str] """ Create Hash (str) representing the state of the Task + :param task: A Task to hash :param section_overrides: optional dict (keys are Task's section names) with task overrides. :param params_override: Alternative to the entire Task's hyper parameters section (notice this should not be a nested dict but a flat key/value) - :return: str crc32 of the Task configuration + + :return: str hash of the Task configuration """ if not task: return None @@ -410,15 +425,20 @@ class ClearmlJob(object): docker = dict(**(task.data.container or dict())) docker.pop('image', None) + hash_func = 'md5' if Session.check_min_api_version('2.13') else 'crc32' + # make sure that if we only have docker args/bash, # we use encode it, otherwise we revert to the original encoding (excluding docker altogether) if docker: return hash_dict( dict(script=script, hyper_params=hyper_params, configs=configs, docker=docker), - hash_func='crc32' + hash_func=hash_func ) - return hash_dict(dict(script=script, hyper_params=hyper_params, configs=configs), hash_func='crc32') + return hash_dict( + dict(script=script, hyper_params=hyper_params, configs=configs), + hash_func=hash_func + ) @classmethod def _get_cached_task(cls, task_hash): @@ -430,13 +450,23 @@ class ClearmlJob(object): """ if not task_hash: return None - # noinspection PyProtectedMember - potential_tasks = Task._query_tasks( - status=['completed', 'stopped', 'published'], - system_tags=['-{}'.format(Task.archived_tag)], - _all_=dict(fields=['comment'], pattern=cls._job_hash_description.format(task_hash)), - only_fields=['id'], - ) + if Session.check_min_api_version('2.13'): + # noinspection PyProtectedMember + potential_tasks = Task._query_tasks( + status=['completed', 'stopped', 'published'], + system_tags=['-{}'.format(Task.archived_tag)], + _all_=dict(fields=['runtime.{}'.format(cls._job_hash_property)], + pattern=exact_match_regex(task_hash)), + only_fields=['id'], + ) + else: + # noinspection PyProtectedMember + potential_tasks = Task._query_tasks( + status=['completed', 'stopped', 'published'], + system_tags=['-{}'.format(Task.archived_tag)], + _all_=dict(fields=['comment'], pattern=cls._job_hash_description.format(task_hash)), + only_fields=['id'], + ) for obj in potential_tasks: task = Task.get_task(task_id=obj.id) if task_hash == cls._create_task_hash(task): @@ -455,11 +485,99 @@ class ClearmlJob(object): return if not task_hash: task_hash = cls._create_task_hash(task=task) - hash_comment = cls._job_hash_description.format(task_hash) + '\n' - task.set_comment(task.comment + '\n' + hash_comment if task.comment else hash_comment) + if Session.check_min_api_version('2.13'): + # noinspection PyProtectedMember + task._set_runtime_properties(runtime_properties={cls._job_hash_property: str(task_hash)}) + else: + hash_comment = cls._job_hash_description.format(task_hash) + '\n' + task.set_comment(task.comment + '\n' + hash_comment if task.comment else hash_comment) + + 
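# A minimal usage sketch (illustration only, not part of the patch) for the LocalClearmlJob
# class added below. The task ID and the parameter name are placeholders.
from clearml.automation.job import LocalClearmlJob

job = LocalClearmlJob(base_task_id='<draft_task_id>', parameter_override={'Args/batch_size': 32})
job.launch()                        # runs the (cloned) task as a local python subprocess; the queue name is ignored
exit_code = job.wait_for_process()  # blocks until the subprocess exits and returns its exit code (0 == success)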
+class LocalClearmlJob(ClearmlJob):
+    """
+    Run jobs locally as a sub-process, use for debugging purposes only
+    """
+    def __init__(self, *args, **kwargs):
+        super(LocalClearmlJob, self).__init__(*args, **kwargs)
+        self._job_process = None
+        self._local_temp_file = None
+
+    def launch(self, queue_name=None):
+        # type: (str) -> bool
+        """
+        Launch job as a subprocess, ignores "queue_name"
+
+        :param queue_name: Ignored
+
+        :return: True if successful
+        """
+        if self._is_cached_task:
+            return False
+
+        # check if standalone
+        diff = self.task.data.script.diff
+        if diff and not diff.lstrip().startswith('diff '):
+            # standalone script, we need to create it
+            fd, local_filename = tempfile.mkstemp(suffix='.py')
+            os.close(fd)
+            with open(local_filename, 'wt') as f:
+                f.write(diff)
+            self._local_temp_file = local_filename
+        else:
+            local_filename = self.task.data.script.entry_point
+
+        cwd = os.path.join(os.getcwd(), self.task.data.script.working_dir)
+        # try to check based on current root repo + entrypoint
+        if Task.current_task() and not (Path(cwd) / local_filename).is_file():
+            working_dir = Task.current_task().data.script.working_dir or ''
+            working_dir = working_dir.strip('.')
+            levels = 0
+            if working_dir:
+                levels = 1 + sum(1 for c in working_dir if c == '/')
+            if levels:
+                cwd = os.path.abspath(os.path.join(cwd, os.sep.join(['..'] * levels)))
+            cwd = os.path.join(cwd, self.task.data.script.working_dir)
+
+        python = sys.executable
+        env = dict(**os.environ)
+        env.pop('CLEARML_PROC_MASTER_ID', None)
+        env.pop('TRAINS_PROC_MASTER_ID', None)
+        env['CLEARML_TASK_ID'] = env['TRAINS_TASK_ID'] = str(self.task.id)
+        env['CLEARML_LOG_TASK_TO_BACKEND'] = '1'
+        env['CLEARML_SIMULATE_REMOTE_TASK'] = '1'
+        self._job_process = subprocess.Popen(args=[python, local_filename], cwd=cwd, env=env)
+        return True
+
+    def wait_for_process(self, timeout=None):
+        # type: (Optional[int]) -> Optional[int]
+        """
+        Wait until Job subprocess completed/exited
+
+        :param timeout: Timeout in seconds to wait for the subprocess to finish. Default None==infinite
+        :return: Sub-process exit code.
0 is success, None if subprocess is not running or timeout + """ + if not self._job_process: + return None + try: + exit_code = self._job_process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + return None + + self._job_process = None + if self._local_temp_file: + # noinspection PyBroadException + try: + Path(self._local_temp_file).unlink() + except Exception: + pass + self._local_temp_file = None + return exit_code class TrainsJob(ClearmlJob): + """ + Deprecated, use ClearmlJob + """ def __init__(self, **kwargs): super(TrainsJob, self).__init__(**kwargs) diff --git a/clearml/backend_interface/task/populate.py b/clearml/backend_interface/task/populate.py index 2479d235..332de8f5 100644 --- a/clearml/backend_interface/task/populate.py +++ b/clearml/backend_interface/task/populate.py @@ -126,11 +126,12 @@ class CreateAndPopulate(object): self.raise_on_missing_entries = raise_on_missing_entries self.verbose = verbose - def create_task(self): - # type: () -> Task + def create_task(self, dry_run=False): + # type: (bool) -> Union[Task, Dict] """ Create the new populated Task + :param dry_run: Optional, If True do not create an actual Task, instead return the Task definition as dict :return: newly created Task object """ local_entry_file = None @@ -163,30 +164,41 @@ class CreateAndPopulate(object): not repo_info or not repo_info.script or not repo_info.script.get('repository')): raise ValueError("Standalone script detected \'{}\', but no requirements provided".format(self.script)) - if self.base_task_id: - if self.verbose: - print('Cloning task {}'.format(self.base_task_id)) - task = Task.clone(source_task=self.base_task_id, project=Task.get_project_id(self.project_name)) - - self._set_output_uri(task) + if dry_run: + task = None + task_state = dict( + name=self.task_name, + project=Task.get_project_id(self.project_name), + type=str(self.task_type or Task.TaskTypes.training), + ) + if self.output_uri: + task_state['output'] = dict(destination=self.output_uri) else: - # noinspection PyProtectedMember - task = Task._create( - task_name=self.task_name, project_name=self.project_name, - task_type=self.task_type or Task.TaskTypes.training) + task_state = dict(script={}) - self._set_output_uri(task) + if self.base_task_id: + if self.verbose: + print('Cloning task {}'.format(self.base_task_id)) + task = Task.clone(source_task=self.base_task_id, project=Task.get_project_id(self.project_name)) - # if there is nothing to populate, return - if not any([ - self.folder, self.commit, self.branch, self.repo, self.script, self.cwd, - self.packages, self.requirements_file, self.base_task_id] + (list(self.docker.values())) - ): - return task + self._set_output_uri(task) + else: + # noinspection PyProtectedMember + task = Task._create( + task_name=self.task_name, project_name=self.project_name, + task_type=self.task_type or Task.TaskTypes.training) - task_state = task.export_task() - if 'script' not in task_state: - task_state['script'] = {} + self._set_output_uri(task) + + # if there is nothing to populate, return + if not any([ + self.folder, self.commit, self.branch, self.repo, self.script, self.cwd, + self.packages, self.requirements_file, self.base_task_id] + (list(self.docker.values())) + ): + return task + + # clear the script section + task_state['script'] = {} if repo_info: task_state['script']['repository'] = repo_info.script['repository'] @@ -303,11 +315,18 @@ class CreateAndPopulate(object): # set base docker image if provided if self.docker: - task.set_base_docker( - 
docker_cmd=self.docker.get('image'), - docker_arguments=self.docker.get('args'), - docker_setup_bash_script=self.docker.get('bash_script'), - ) + if dry_run: + task_state['container'] = dict( + image=self.docker.get('image') or '', + arguments=self.docker.get('args') or '', + setup_shell_script=self.docker.get('bash_script') or '', + ) + else: + task.set_base_docker( + docker_image=self.docker.get('image'), + docker_arguments=self.docker.get('args'), + docker_setup_bash_script=self.docker.get('bash_script'), + ) if self.verbose: if task_state['script']['repository']: @@ -328,6 +347,9 @@ class CreateAndPopulate(object): if self.docker: print('Base docker image: {}'.format(self.docker)) + if dry_run: + return task_state + # update the Task task.update_task(task_state) self.task = task @@ -434,137 +456,177 @@ class CreateAndPopulate(object): return found_index if found_index < 0 else lines[found_index][0] -def create_task_from_function( - a_function, # type: Callable - function_kwargs=None, # type: Optional[Dict[str, Any]] - function_input_artifacts=None, # type: Optional[Dict[str, str]] - function_results=None, # type: Optional[List[str]] - project_name=None, # type: Optional[str] - task_name=None, # type: Optional[str] - task_type=None, # type: Optional[str] - packages=None, # type: Optional[Sequence[str]] - docker=None, # type: Optional[str] - docker_args=None, # type: Optional[str] - docker_bash_setup_script=None, # type: Optional[str] - output_uri=None, # type: Optional[str] -): - # type: (...) -> Optional[Task] - """ - Create a Task from a function, including wrapping the function input arguments - into the hyper-parameter section as kwargs, and storing function results as named artifacts +class CreateFromFunction(object): + kwargs_section = 'kwargs' + input_artifact_section = 'kwargs_artifacts' + task_template = """from clearml import Task - Example: - def mock_func(a=6, b=9): - c = a*b - print(a, b, c) - return c, c**2 - - create_task_from_function(mock_func, function_results=['mul', 'square']) - - Example arguments from other Tasks (artifact): - def mock_func(matrix_np): - c = matrix_np*matrix_np - print(matrix_np, c) - return c - - create_task_from_function( - mock_func, - function_input_artifacts={'matrix_np': 'aabb1122.previous_matrix'}, - function_results=['square_matrix'] - ) - - :param a_function: A global function to convert into a standalone Task - :param function_kwargs: Optional, provide subset of function arguments and default values to expose. - If not provided automatically take all function arguments & defaults - :param function_input_artifacts: Optional, pass input arguments to the function from other Tasks's output artifact. - Example argument named `numpy_matrix` from Task ID `aabbcc` artifact name `answer`: - {'numpy_matrix': 'aabbcc.answer'} - :param function_results: Provide a list of names for all the results. - If not provided no results will be stored as artifacts. - :param project_name: Set the project name for the task. Required if base_task_id is None. - :param task_name: Set the name of the remote task. Required if base_task_id is None. - :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference', - 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom' - :param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"] - If not provided, packages are automatically added based on the imports used in the function. 
- :param docker: Select the docker image to be executed in by the remote session - :param docker_args: Add docker arguments, pass a single string - :param docker_bash_setup_script: Add bash script to be executed - inside the docker before setting up the Task's environment - :param output_uri: Optional, set the Tasks's output_uri (Storage destination). - examples: 's3://bucket/folder', 'https://server/' , 'gs://bucket/folder', 'azure://bucket', '/folder/' - :return: Newly created Task object - """ - function_name = str(a_function.__name__) - function_source = inspect.getsource(a_function) - function_input_artifacts = function_input_artifacts or dict() - # verify artifact kwargs: - if not all(len(v.split('.', 1)) == 2 for v in function_input_artifacts.values()): - raise ValueError( - 'function_input_artifacts={}, it must in the format: ' - '{{"argument": "task_id.artifact_name"}}'.format(function_input_artifacts) - ) - - if function_kwargs is None: - function_kwargs = dict() - inspect_args = inspect.getfullargspec(a_function) - if inspect_args and inspect_args.args: - inspect_defaults = inspect_args.defaults - if inspect_defaults and len(inspect_defaults) != len(inspect_args.args): - getLogger().warning( - 'Ignoring default argument values: ' - 'could not find all default valued for: \'{}\''.format(function_name)) - inspect_defaults = [] - - function_kwargs = {str(k): v for k, v in zip(inspect_args.args, inspect_defaults)} \ - if inspect_defaults else {str(k): None for k in inspect_args.args} - - task_template = """ -from clearml import Task {function_source} if __name__ == '__main__': task = Task.init() kwargs = {function_kwargs} - task.connect(kwargs, name='kwargs') + task.connect(kwargs, name='{kwargs_section}') function_input_artifacts = {function_input_artifacts} if function_input_artifacts: - task.connect(function_input_artifacts, name='kwargs_artifacts') + task.connect(function_input_artifacts, name='{input_artifact_section}') for k, v in function_input_artifacts.items(): if not v: continue task_id, artifact_name = v.split('.', 1) - kwargs[k] = Task.get_task(task_id=task_id).artifact[artifact_name].get() + kwargs[k] = Task.get_task(task_id=task_id).artifacts[artifact_name].get() results = {function_name}(**kwargs) - result_names = {function_results} - if results and result_names: - for name, artifact in zip(results, result_names): + result_names = {function_return} + if result_names: + if not isinstance(results, (tuple, list)) or (len(result_names)==1 and len(results) != 1): + results = [results] + for name, artifact in zip(result_names, results): task.upload_artifact(name=name, artifact_object=artifact) +""" - """.format( - function_source=function_source, - function_kwargs=function_kwargs, - function_input_artifacts=function_input_artifacts, - function_name=function_name, - function_results=function_results) + @classmethod + def create_task_from_function( + cls, + a_function, # type: Callable + function_kwargs=None, # type: Optional[Dict[str, Any]] + function_input_artifacts=None, # type: Optional[Dict[str, str]] + function_return=None, # type: Optional[List[str]] + project_name=None, # type: Optional[str] + task_name=None, # type: Optional[str] + task_type=None, # type: Optional[str] + packages=None, # type: Optional[Sequence[str]] + docker=None, # type: Optional[str] + docker_args=None, # type: Optional[str] + docker_bash_setup_script=None, # type: Optional[str] + output_uri=None, # type: Optional[str] + dry_run=False, # type: bool + ): + # type: (...) 
-> Optional[Dict, Task] + """ + Create a Task from a function, including wrapping the function input arguments + into the hyper-parameter section as kwargs, and storing function results as named artifacts - with tempfile.NamedTemporaryFile('w', suffix='.py') as temp_file: - temp_file.write(task_template) - temp_file.flush() + Example: + def mock_func(a=6, b=9): + c = a*b + print(a, b, c) + return c, c**2 - populate = CreateAndPopulate( - project_name=project_name, - task_name=task_name, - task_type=task_type, - script=temp_file.name, - packages=packages if packages is not None else True, - docker=docker, - docker_args=docker_args, - docker_bash_setup_script=docker_bash_setup_script, - output_uri=output_uri, - add_task_init_call=False, - ) - task = populate.create_task() - task.update_task(task_data={'script': {'entry_point': '{}.py'.format(function_name)}}) - return task + create_task_from_function(mock_func, function_return=['mul', 'square']) + + Example arguments from other Tasks (artifact): + def mock_func(matrix_np): + c = matrix_np*matrix_np + print(matrix_np, c) + return c + + create_task_from_function( + mock_func, + function_input_artifacts={'matrix_np': 'aabb1122.previous_matrix'}, + function_return=['square_matrix'] + ) + + :param a_function: A global function to convert into a standalone Task + :param function_kwargs: Optional, provide subset of function arguments and default values to expose. + If not provided automatically take all function arguments & defaults + :param function_input_artifacts: Optional, pass input arguments to the function from other Tasks's output artifact. + Example argument named `numpy_matrix` from Task ID `aabbcc` artifact name `answer`: + {'numpy_matrix': 'aabbcc.answer'} + :param function_return: Provide a list of names for all the results. + If not provided no results will be stored as artifacts. + :param project_name: Set the project name for the task. Required if base_task_id is None. + :param task_name: Set the name of the remote task. Required if base_task_id is None. + :param task_type: Optional, The task type to be created. Supported values: 'training', 'testing', 'inference', + 'data_processing', 'application', 'monitor', 'controller', 'optimizer', 'service', 'qc', 'custom' + :param packages: Manually specify a list of required packages. Example: ["tqdm>=2.1", "scikit-learn"] + If not provided, packages are automatically added based on the imports used in the function. + :param docker: Select the docker image to be executed in by the remote session + :param docker_args: Add docker arguments, pass a single string + :param docker_bash_setup_script: Add bash script to be executed + inside the docker before setting up the Task's environment + :param output_uri: Optional, set the Tasks's output_uri (Storage destination). 
+            examples: 's3://bucket/folder', 'https://server/', 'gs://bucket/folder', 'azure://bucket', '/folder/'
+        :param dry_run: If True, do not create the Task, but return a dict of the Task's definition
+        :return: The newly created Task object (or a dict of the Task's definition when dry_run is True)
+        """
+        function_name = str(a_function.__name__)
+        function_source = inspect.getsource(a_function)
+        function_input_artifacts = function_input_artifacts or dict()
+        # verify artifact kwargs:
+        if not all(len(v.split('.', 1)) == 2 for v in function_input_artifacts.values()):
+            raise ValueError(
+                'function_input_artifacts={}, it must be in the format: '
+                '{{"argument": "task_id.artifact_name"}}'.format(function_input_artifacts)
+            )
+
+        if function_kwargs is None:
+            function_kwargs = dict()
+            inspect_args = inspect.getfullargspec(a_function)
+            if inspect_args and inspect_args.args:
+                inspect_defaults_vals = inspect_args.defaults
+                inspect_defaults_args = inspect_args.args
+
+                if inspect_defaults_vals and len(inspect_defaults_vals) != len(inspect_defaults_args):
+                    inspect_defaults_args = [a for a in inspect_defaults_args if a not in function_input_artifacts]
+
+                if inspect_defaults_vals and len(inspect_defaults_vals) != len(inspect_defaults_args):
+                    getLogger().warning(
+                        'Ignoring default argument values: '
+                        'could not find all default values for: \'{}\''.format(function_name))
+                    inspect_defaults_vals = []
+
+                function_kwargs = {str(k): v for k, v in zip(inspect_defaults_args, inspect_defaults_vals)} \
+                    if inspect_defaults_vals else {str(k): None for k in inspect_defaults_args}
+
+        task_template = cls.task_template.format(
+            kwargs_section=cls.kwargs_section,
+            input_artifact_section=cls.input_artifact_section,
+            function_source=function_source,
+            function_kwargs=function_kwargs,
+            function_input_artifacts=function_input_artifacts,
+            function_name=function_name,
+            function_return=function_return)
+
+        with tempfile.NamedTemporaryFile('w', suffix='.py') as temp_file:
+            temp_file.write(task_template)
+            temp_file.flush()
+
+            populate = CreateAndPopulate(
+                project_name=project_name,
+                task_name=task_name or str(function_name),
+                task_type=task_type,
+                script=temp_file.name,
+                packages=packages if packages is not None else True,
+                docker=docker,
+                docker_args=docker_args,
+                docker_bash_setup_script=docker_bash_setup_script,
+                output_uri=output_uri,
+                add_task_init_call=False,
+            )
+            entry_point = '{}.py'.format(function_name)
+            task = populate.create_task(dry_run=dry_run)
+
+            if dry_run:
+                task['script']['entry_point'] = entry_point
+                task['hyperparams'] = {
+                    cls.kwargs_section: {
+                        k: dict(section=cls.kwargs_section, name=k, value=str(v))
+                        for k, v in (function_kwargs or {}).items()
+                    },
+                    cls.input_artifact_section: {
+                        k: dict(section=cls.input_artifact_section, name=k, value=str(v))
+                        for k, v in (function_input_artifacts or {}).items()
+                    }
+                }
+            else:
+                task.update_task(task_data={'script': {'entry_point': entry_point}})
+                hyper_parameters = {'{}/{}'.format(cls.kwargs_section, k): str(v) for k, v in function_kwargs.items()} \
+                    if function_kwargs else {}
+                hyper_parameters.update(
+                    {'{}/{}'.format(cls.input_artifact_section, k): str(v) for k, v in function_input_artifacts.items()}
+                    if function_input_artifacts else {}
+                )
+                task.set_parameters(hyper_parameters)
+
+            return task
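Editor's sketch (not part of the patch): one way the classmethod above could be exercised, assuming it lives on the CreateFromFunction helper in clearml.backend_interface.task.populate (the class imported earlier in this patch) and that clearml is configured for a workspace. With dry_run=True the call returns a plain dict describing the Task rather than creating it, which makes the generated entry point and hyper-parameter sections easy to inspect.

# Illustrative sketch only; CreateFromFunction and a configured clearml workspace are assumed.
from clearml.backend_interface.task.populate import CreateFromFunction


def add_numbers(a=1, b=2):
    # when the generated Task is executed, the return value is uploaded as the 'total' artifact
    return a + b


task_definition = CreateFromFunction.create_task_from_function(
    add_numbers,
    function_return=['total'],
    project_name='examples',
    task_name='add numbers step',
    dry_run=True,  # build the Task definition without registering it on the server
)
print(task_definition['script']['entry_point'])  # expected: 'add_numbers.py'
print(task_definition['hyperparams'])            # per-section dicts keyed by kwargs_section / input_artifact_section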
diff --git a/examples/pipeline/pipeline_self_contained.py b/examples/pipeline/pipeline_self_contained.py
new file mode 100644
index 00000000..4d9b11d3
--- /dev/null
+++ b/examples/pipeline/pipeline_self_contained.py
@@ -0,0 +1,87 @@
+from clearml import PipelineController
+
+
+def step_one(pickle_data_url):
+    import pickle
+    import pandas as pd
+    from clearml import StorageManager
+    pickle_data_url = \
+        pickle_data_url or \
+        'https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
+    local_iris_pkl = StorageManager.get_local_copy(remote_url=pickle_data_url)
+    with open(local_iris_pkl, 'rb') as f:
+        iris = pickle.load(f)
+    data_frame = pd.DataFrame(iris['data'], columns=iris['feature_names'])
+    data_frame['target'] = iris['target']
+    return data_frame
+
+
+def step_two(data_frame, test_size=0.2, random_state=42):
+    from sklearn.model_selection import train_test_split
+    y = data_frame['target']
+    X = data_frame[[c for c in data_frame.columns if c != 'target']]
+    X_train, X_test, y_train, y_test = train_test_split(
+        X, y, test_size=test_size, random_state=random_state)
+
+    return X_train, X_test, y_train, y_test
+
+
+def step_three(data):
+    from sklearn.linear_model import LogisticRegression
+    X_train, X_test, y_train, y_test = data
+    model = LogisticRegression(solver='liblinear', multi_class='auto')
+    model.fit(X_train, y_train)
+    return model
+
+
+def debug_testing_our_pipeline(pickle_url):
+    data_frame = step_one(pickle_url)
+    processed_data = step_two(data_frame)
+    model = step_three(processed_data)
+    print(model)
+
+
+pipe = PipelineController(
+    project='examples',
+    name='pipeline demo',
+    version='1.1',
+    add_pipeline_tags=False,
+)
+
+pipe.add_parameter(
+    name='url',
+    description='url to pickle file',
+    default='https://github.com/allegroai/events/raw/master/odsc20-east/generic/iris_dataset.pkl'
+)
+pipe.add_function_step(
+    name='step_one',
+    function=step_one,
+    function_kwargs=dict(pickle_data_url='${pipeline.url}'),
+    function_return=['data_frame'],
+    cache_executed_step=True,
+)
+pipe.add_function_step(
+    name='step_two',
+    # parents=['step_one'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
+    function=step_two,
+    function_kwargs=dict(data_frame='${step_one.data_frame}'),
+    function_return=['processed_data'],
+    cache_executed_step=True,
+)
+pipe.add_function_step(
+    name='step_three',
+    # parents=['step_two'],  # the pipeline will automatically detect the dependencies based on the kwargs inputs
+    function=step_three,
+    function_kwargs=dict(data='${step_two.processed_data}'),
+    function_return=['model'],
+    cache_executed_step=True,
+)
+
+# for debugging purposes use local jobs
+pipe.start_locally()
+
+# Starting the pipeline on the services queue (remote machine, default on the clearml-server)
+# pipe.start()
+
+print('pipeline done')
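As a closing illustration (not part of the patch): the artifact-reference syntax used in the example generalizes to any number of steps. Every name listed in function_return becomes an artifact that later steps can consume via '${step_name.artifact_name}', just as step_two consumes '${step_one.data_frame}'. A hypothetical evaluation step reusing the trained model could be registered before the pipe.start_locally() call like this:

# Hypothetical extra step (illustration only): evaluate the model trained in step_three
# on the held-out split produced by step_two
def step_four(model, data):
    X_train, X_test, y_train, y_test = data
    accuracy = model.score(X_test, y_test)
    print('test accuracy:', accuracy)
    return accuracy


pipe.add_function_step(
    name='step_four',
    function=step_four,
    # previous steps' artifacts are referenced as '${step_name.artifact_name}'
    function_kwargs=dict(model='${step_three.model}', data='${step_two.processed_data}'),
    function_return=['accuracy'],
    cache_executed_step=True,
)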