From 56230faa5bad32c95f095fd2e7e99d27566953ad Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Sat, 2 Oct 2021 21:44:48 +0300
Subject: [PATCH] Add PipelineController add_step configuration_overrides
 allowing to override Task configuration objects

---
 clearml/automation/controller.py | 41 ++++++++++++++++++++++------
 clearml/automation/job.py        | 47 ++++++++++++++++++++++++++++----
 2 files changed, 74 insertions(+), 14 deletions(-)

diff --git a/clearml/automation/controller.py b/clearml/automation/controller.py
index 6de69ee0..d8bbccaf 100644
--- a/clearml/automation/controller.py
+++ b/clearml/automation/controller.py
@@ -51,6 +51,7 @@ class PipelineController(object):
         parents = attrib(type=list, default=[])  # list of parent DAG steps
         timeout = attrib(type=float, default=None)  # execution timeout limit
         parameters = attrib(type=dict, default={})  # Task hyper parameters to change
+        configurations = attrib(type=dict, default={})  # Task configuration objects to change
         task_overrides = attrib(type=dict, default={})  # Task overrides to change
         executed = attrib(type=str, default=None)  # The actual executed Task ID (None if not executed yet)
         clone_task = attrib(type=bool, default=True)  # If True, clone the base_task_id, then execute the cloned Task
@@ -172,6 +173,7 @@ class PipelineController(object):
             base_task_id=None,  # type: Optional[str]
             parents=None,  # type: Optional[Sequence[str]]
             parameter_override=None,  # type: Optional[Mapping[str, Any]]
+            configuration_overrides=None,  # type: Optional[Mapping[str, Union[str, Mapping]]]
             task_overrides=None,  # type: Optional[Mapping[str, Any]]
             execution_queue=None,  # type: Optional[str]
             monitor_metrics=None,  # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
@@ -208,15 +210,23 @@ class PipelineController(object):
               parameter_override={'Args/input_file': '${stage3.parameters.Args/input_file}' }
             - Task ID
               parameter_override={'Args/input_file': '${stage3.id}' }
+        :param configuration_overrides: Optional, override Task configuration objects.
+            Expected dictionary of configuration object name and configuration object content.
+            Examples:
+                {'General': dict(key='value')}
+                {'General': 'configuration file content'}
+                {'OmegaConf': YAML.dumps(full_hydra_dict)}
         :param task_overrides: Optional task section overriding dictionary.
             The dict values can reference a previously executed step using the following form: '${step_name}'
             Examples:
-            - clear git repository commit ID
-              parameter_override={'script.version_num': '' }
-            - git repository commit branch
-              parameter_override={'script.branch': '${stage1.script.branch}' }
-            - container image
-              parameter_override={'container.image': '${stage1.container.image}' }
+            - get the latest commit from a specific branch
+              task_overrides={'script.version_num': '', 'script.branch': 'main'}
+            - match the git repository branch to a previous step
+              task_overrides={'script.branch': '${stage1.script.branch}', 'script.version_num': ''}
+            - change the container image
+              task_overrides={'container.image': 'nvidia/cuda'}
+            - match the container image to a previous step
+              task_overrides={'container.image': '${stage1.container.image}'}
         :param execution_queue: Optional, the queue to use for executing this specific step.
             If not provided, the task will be sent to the default execution queue, as defined on the class
         :param monitor_metrics: Optional, log the step's metrics on the pipeline Task.
@@ -327,10 +337,18 @@ class PipelineController(object):
                     base_task_project, base_task_name))
             base_task_id = base_task.id
 
+        if configuration_overrides is not None:
+            # verify that all override values are either dicts or strings
+            if not isinstance(configuration_overrides, dict) or \
+                    not all(isinstance(v, (str, dict)) for v in configuration_overrides.values()):
+                raise ValueError("configuration_overrides must be a dictionary, with all values "
+                                 "either dicts or strings, got '{}' instead".format(configuration_overrides))
+
         self._nodes[name] = self.Node(
             name=name, base_task_id=base_task_id, parents=parents or [],
             queue=execution_queue, timeout=time_limit,
             parameters=parameter_override or {},
+            configurations=configuration_overrides,
             clone_task=clone_base_task,
             task_overrides=task_overrides,
             cache_executed_step=cache_executed_step,
@@ -1191,7 +1209,7 @@ class PipelineController(object):
         # if we do not clone the Task, only merge the parts we can override.
         for name in list(self._nodes.keys()):
             if not self._nodes[name].clone_task and name in dag_dict and not dag_dict[name].get('clone_task'):
-                for k in ('queue', 'parents', 'timeout', 'parameters', 'task_overrides'):
+                for k in ('queue', 'parents', 'timeout', 'parameters', 'configurations', 'task_overrides'):
                     setattr(self._nodes[name], k, dag_dict[name].get(k) or type(getattr(self._nodes[name], k))())
 
         # if we do clone the Task deserialize everything, except the function creating
@@ -1250,6 +1268,10 @@ class PipelineController(object):
         pattern = self._step_ref_pattern
 
+        # verify that the original node parents are valid step names
+        if node.parents and not all(isinstance(p, str) and p in self._nodes for p in node.parents):
+            raise ValueError("Node '{}', parents={} is invalid".format(node.name, node.parents))
+
         parents = set()
         for k, v in node.parameters.items():
             if isinstance(v, str):
@@ -1371,7 +1393,9 @@ class PipelineController(object):
             disable_clone_task = True
 
         node.job = self._clearml_job_class(
-            base_task_id=task_id, parameter_override=updated_hyper_parameters,
+            base_task_id=task_id,
+            parameter_override=updated_hyper_parameters,
+            configuration_overrides=node.configurations,
             tags=['pipe: {}'.format(self._task.id)] if self._add_pipeline_tags and self._task else None,
             parent=self._task.id if self._task else None,
             disable_clone_task=disable_clone_task,
@@ -1673,6 +1697,7 @@ class PipelineController(object):
                     print('Parameters:\n{}'.format(
                         self._nodes[name].job.task_parameter_override
                         if self._nodes[name].job else self._nodes[name].parameters))
+                    print('Configurations:\n{}'.format(self._nodes[name].configurations))
                     print('Overrides:\n{}'.format(self._nodes[name].task_overrides))
                     launched_nodes.add(name)
                 # if the node is cached, do not wait for the event, run the loop again
diff --git a/clearml/automation/job.py b/clearml/automation/job.py
index 0aad9c21..e8ff416f 100644
--- a/clearml/automation/job.py
+++ b/clearml/automation/job.py
@@ -1,4 +1,5 @@
 import hashlib
+import json
 import os
 import subprocess
 import sys
@@ -299,8 +300,8 @@ class BaseJob(object):
         cls._hashing_callback = a_function
 
     @classmethod
-    def _create_task_hash(cls, task, section_overrides=None, params_override=None):
-        # type: (Task, Optional[dict], Optional[dict]) -> Optional[str]
+    def _create_task_hash(cls, task, section_overrides=None, params_override=None, configurations_override=None):
+        # type: (Task, Optional[dict], Optional[dict], Optional[dict]) -> Optional[str]
         """
         Create Hash (str) representing the state of the Task
 
         :param section_overrides: optional dict (keys are Task's section names) with task overrides.
         :param params_override: Alternative to the entire Task's hyper parameters section
             (notice this should not be a nested dict but a flat key/value)
+        :param configurations_override: dictionary of configuration override objects (tasks.ConfigurationItem)
         :return: str hash of the Task configuration
         """
@@ -328,7 +330,7 @@ class BaseJob(object):
         script.pop("requirements", None)
         hyper_params = task.get_parameters() if params_override is None else params_override
-        configs = task.get_configuration_objects()
+        configs = task.get_configuration_objects() if configurations_override is None else configurations_override
         # currently we do not add the docker image to the hash (only args and setup script),
         # because default docker image will cause the step to change
         docker = None
@@ -340,8 +342,9 @@ class BaseJob(object):
 
         # make sure that if we only have docker args/bash,
         # we encode them; otherwise we revert to the original encoding (excluding docker altogether)
-        repr_dict = dict(script=script, hyper_params=hyper_params, configs=configs, docker=docker) \
-            if docker else dict(script=script, hyper_params=hyper_params, configs=configs)
+        repr_dict = dict(script=script, hyper_params=hyper_params, configs=configs)
+        if docker:
+            repr_dict['docker'] = docker
 
         # callback for modifying the representation dict
         if cls._hashing_callback:
@@ -408,6 +411,7 @@ class ClearmlJob(BaseJob):
             base_task_id,  # type: str
             parameter_override=None,  # type: Optional[Mapping[str, str]]
             task_overrides=None,  # type: Optional[Mapping[str, str]]
+            configuration_overrides=None,  # type: Optional[Mapping[str, Union[str, Mapping]]]
             tags=None,  # type: Optional[Sequence[str]]
             parent=None,  # type: Optional[str]
             disable_clone_task=False,  # type: bool
@@ -423,6 +427,12 @@ class ClearmlJob(BaseJob):
         :param dict parameter_override: dictionary of parameters and values to set for the cloned task
         :param dict task_overrides: Task object specific overrides,
             for example {'script.version_num': None, 'script.branch': 'main'}
+        :param configuration_overrides: Optional, override Task configuration objects.
+            Expected dictionary of configuration object name and configuration object content.
+            Examples:
+                {'config_section': dict(key='value')}
+                {'config_file': 'configuration file content'}
+                {'OmegaConf': YAML.dumps(full_hydra_dict)}
         :param list tags: additional tags to add to the newly cloned task
         :param str parent: Set the newly created Task parent task field, default: base_task_id.
        :param dict kwargs: additional Task creation parameters
@@ -454,6 +464,22 @@ class ClearmlJob(BaseJob):
                 task_params.update(parameter_override)
             self.task_parameter_override = dict(**parameter_override)
 
+        task_configurations = None
+        if configuration_overrides:
+            task_configurations = deepcopy(base_temp_task.data.configuration or {})
+            for k, v in configuration_overrides.items():
+                if not isinstance(v, (str, dict)):
+                    raise ValueError('Configuration override dictionary value must be either str or dict, '
+                                     'got {} instead'.format(type(v)))
+                value = v if isinstance(v, str) else json.dumps(v)
+                if k in task_configurations:
+                    task_configurations[k].value = value
+                else:
+                    task_configurations[k] = tasks_service.ConfigurationItem(
+                        name=str(k), value=value, description=None, type='json' if isinstance(v, dict) else None
+                    )
+            configuration_overrides = {k: v.value for k, v in task_configurations.items()}
+
         sections = {}
         if task_overrides:
             # set values inside the Task
@@ -471,7 +497,11 @@ class ClearmlJob(BaseJob):
         # look for a cached copy of the Task
         # get parameters + task_overrides as a dict and hash it
         task_hash = self._create_task_hash(
-            base_temp_task, section_overrides=sections, params_override=task_params)
+            base_temp_task,
+            section_overrides=sections,
+            params_override=task_params,
+            configurations_override=configuration_overrides or None,
+        )
         task = self._get_cached_task(task_hash)
         # if we found a cached task, just use it
         if task:
@@ -510,6 +540,11 @@ class ClearmlJob(BaseJob):
         if task_params:
             self.task.set_parameters(task_params)
 
+        # store back the Task configuration objects into the backend
+        if task_configurations:
+            # noinspection PyProtectedMember
+            self.task._edit(configuration=task_configurations)
+
         if task_overrides and sections:
             # store back Task parameters into backend
             # noinspection PyProtectedMember
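
Below is a minimal usage sketch of the new add_step argument. It is illustrative only: the project name, task names, and the 'General' section content are placeholders, and the PipelineController constructor arguments shown may differ between clearml versions.

    from clearml import PipelineController

    # 'examples/train task' is assumed to be an existing Task on the server
    pipe = PipelineController(default_execution_queue='default', add_pipeline_tags=True)
    pipe.add_step(
        name='stage_train',
        base_task_project='examples',
        base_task_name='train task',
        # new in this patch: replace the step Task's 'General' configuration object;
        # dict values are serialized to JSON, str values are stored verbatim
        configuration_overrides={'General': dict(lr=0.01, epochs=10)},
    )
    pipe.start()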
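
The same override is also exposed directly on ClearmlJob, where it is folded into the step-caching hash via _create_task_hash, so two jobs whose configuration contents differ will not reuse each other's cached Task. A sketch, assuming an existing base task (the task ID and queue name are placeholders):

    from clearml.automation.job import ClearmlJob

    job = ClearmlJob(
        base_task_id='<base_task_id>',
        parameter_override={'Args/batch_size': '64'},
        # a str value replaces the configuration object's content as-is;
        # a dict value would be stored as JSON instead
        configuration_overrides={'OmegaConf': 'lr: 0.01\nepochs: 10'},
    )
    job.launch(queue_name='default')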