Add PipelineController add_step configuration_overrides, allowing Task configuration objects to be overridden

allegroai 2021-10-02 21:44:48 +03:00
parent a488c6a04c
commit 56230faa5b
2 changed files with 74 additions and 14 deletions

clearml/automation/controller.py

@@ -51,6 +51,7 @@ class PipelineController(object):
         parents = attrib(type=list, default=[])  # list of parent DAG steps
         timeout = attrib(type=float, default=None)  # execution timeout limit
         parameters = attrib(type=dict, default={})  # Task hyper parameters to change
+        configurations = attrib(type=dict, default={})  # Task configuration objects to change
         task_overrides = attrib(type=dict, default={})  # Task overrides to change
         executed = attrib(type=str, default=None)  # The actual executed Task ID (None if not executed yet)
         clone_task = attrib(type=bool, default=True)  # If True clone the base_task_id, then execute the cloned Task
@@ -172,6 +173,7 @@ class PipelineController(object):
            base_task_id=None,  # type: Optional[str]
            parents=None,  # type: Optional[Sequence[str]]
            parameter_override=None,  # type: Optional[Mapping[str, Any]]
+           configuration_overrides=None,  # type: Optional[Mapping[str, Union[str, Mapping]]]
            task_overrides=None,  # type: Optional[Mapping[str, Any]]
            execution_queue=None,  # type: Optional[str]
            monitor_metrics=None,  # type: Optional[List[Union[Tuple[str, str], Tuple[(str, str), (str, str)]]]]
@@ -208,15 +210,23 @@ class PipelineController(object):
                parameter_override={'Args/input_file': '${stage3.parameters.Args/input_file}' }
            - Task ID
                parameter_override={'Args/input_file': '${stage3.id}' }
+       :param configuration_overrides: Optional, override Task configuration objects.
+           Expected dictionary of configuration object name and configuration object content.
+           Examples:
+               {'General': dict(key='value')}
+               {'General': 'configuration file content'}
+               {'OmegaConf': YAML.dumps(full_hydra_dict)}
        :param task_overrides: Optional task section overriding dictionary.
            The dict values can reference a previously executed step using the following form '${step_name}'
            Examples:
-           - clear git repository commit ID
-               parameter_override={'script.version_num': '' }
-           - git repository commit branch
-               parameter_override={'script.branch': '${stage1.script.branch}' }
-           - container image
-               parameter_override={'container.image': '${stage1.container.image}' }
+           - get the latest commit from a specific branch
+               task_overrides={'script.version_num': '', 'script.branch': 'main'}
+           - match git repository branch to a previous step
+               task_overrides={'script.branch': '${stage1.script.branch}', 'script.version_num': ''}
+           - change container image
+               task_overrides={'container.image': '${stage1.container.image}'}
+           - match container image to a previous step
+               task_overrides={'container.image': '${stage1.container.image}'}
        :param execution_queue: Optional, the queue to use for executing this specific step.
            If not provided, the task will be sent to the default execution queue, as defined on the class
        :param monitor_metrics: Optional, log the step's metrics on the pipeline Task.
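
A minimal usage sketch of the new argument, assuming the v1.x-style PipelineController constructor; the project, task names, and queue below are hypothetical placeholders:

    from clearml import PipelineController

    pipe = PipelineController(name='config-override-demo', project='examples', version='1.0.0')

    pipe.add_step(
        name='stage_train',
        base_task_project='examples',      # hypothetical project
        base_task_name='train model',      # hypothetical base Task
        parameter_override={'General/epochs': '10'},
        # a dict value is serialized to JSON; a str value replaces the stored content verbatim
        configuration_overrides={'General': dict(learning_rate=0.01, batch_size=32)},
        execution_queue='default',
    )
    pipe.start()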
@@ -327,10 +337,18 @@ class PipelineController(object):
                    base_task_project, base_task_name))
            base_task_id = base_task.id

+       if configuration_overrides is not None:
+           # verify all override values are either dicts or strings
+           if not isinstance(configuration_overrides, dict) or \
+                   not all(isinstance(v, (str, dict)) for v in configuration_overrides.values()):
+               raise ValueError("configuration_overrides must be a dictionary, with all values "
+                                "either dicts or strings, got '{}' instead".format(configuration_overrides))
+
        self._nodes[name] = self.Node(
            name=name, base_task_id=base_task_id, parents=parents or [],
            queue=execution_queue, timeout=time_limit,
            parameters=parameter_override or {},
+           configurations=configuration_overrides,
            clone_task=clone_base_task,
            task_overrides=task_overrides,
            cache_executed_step=cache_executed_step,
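
The validation above fails fast on any value that is not a str or a dict. Continuing the hypothetical sketch from before:

    # raises ValueError: 42 is neither a str nor a dict
    pipe.add_step(
        name='bad_step',
        base_task_id='aabbccdd11223344',   # hypothetical Task ID
        configuration_overrides={'General': 42},
    )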
@@ -1191,7 +1209,7 @@ class PipelineController(object):
        # if we do not clone the Task, only merge the parts we can override.
        for name in list(self._nodes.keys()):
            if not self._nodes[name].clone_task and name in dag_dict and not dag_dict[name].get('clone_task'):
-               for k in ('queue', 'parents', 'timeout', 'parameters', 'task_overrides'):
+               for k in ('queue', 'parents', 'timeout', 'parameters', 'configurations', 'task_overrides'):
                    setattr(self._nodes[name], k, dag_dict[name].get(k) or type(getattr(self._nodes[name], k))())

        # if we do clone the Task deserialize everything, except the function creating
@@ -1250,6 +1268,10 @@ class PipelineController(object):
        pattern = self._step_ref_pattern

+       # verify the node's parents reference existing nodes
+       if node.parents and not all(isinstance(p, str) and p in self._nodes for p in node.parents):
+           raise ValueError("Node '{}', parents={} is invalid".format(node.name, node.parents))
+
        parents = set()
        for k, v in node.parameters.items():
            if isinstance(v, str):
@@ -1371,7 +1393,9 @@ class PipelineController(object):
            disable_clone_task = True

        node.job = self._clearml_job_class(
-           base_task_id=task_id, parameter_override=updated_hyper_parameters,
+           base_task_id=task_id,
+           parameter_override=updated_hyper_parameters,
+           configuration_overrides=node.configurations,
            tags=['pipe: {}'.format(self._task.id)] if self._add_pipeline_tags and self._task else None,
            parent=self._task.id if self._task else None,
            disable_clone_task=disable_clone_task,
@@ -1673,6 +1697,7 @@ class PipelineController(object):
                    print('Parameters:\n{}'.format(
                        self._nodes[name].job.task_parameter_override if self._nodes[name].job
                        else self._nodes[name].parameters))
+                   print('Configurations:\n{}'.format(self._nodes[name].configurations))
                    print('Overrides:\n{}'.format(self._nodes[name].task_overrides))
                launched_nodes.add(name)
            # check if node is cached; do not wait for event but run the loop again

clearml/automation/job.py

@@ -1,4 +1,5 @@
 import hashlib
+import json
 import os
 import subprocess
 import sys
@@ -299,8 +300,8 @@ class BaseJob(object):
        cls._hashing_callback = a_function

    @classmethod
-   def _create_task_hash(cls, task, section_overrides=None, params_override=None):
-       # type: (Task, Optional[dict], Optional[dict]) -> Optional[str]
+   def _create_task_hash(cls, task, section_overrides=None, params_override=None, configurations_override=None):
+       # type: (Task, Optional[dict], Optional[dict], Optional[dict]) -> Optional[str]
        """
        Create Hash (str) representing the state of the Task
@@ -308,6 +309,7 @@ class BaseJob(object):
        :param section_overrides: optional dict (keys are Task's section names) with task overrides.
        :param params_override: Alternative to the entire Task's hyper parameters section
            (notice this should not be a nested dict but a flat key/value)
+       :param configurations_override: dictionary of configuration override objects (tasks.ConfigurationItem)
        :return: str hash of the Task configuration
        """
@@ -328,7 +330,7 @@ class BaseJob(object):
            script.pop("requirements", None)
        hyper_params = task.get_parameters() if params_override is None else params_override
-       configs = task.get_configuration_objects()
+       configs = task.get_configuration_objects() if configurations_override is None else configurations_override
        # currently we do not add the docker image to the hash (only args and setup script),
        # because a default docker image would cause the step hash to change
        docker = None
@@ -340,8 +342,9 @@ class BaseJob(object):
        # make sure that if we only have docker args/bash we encode it,
        # otherwise we revert to the original encoding (excluding docker altogether)
-       repr_dict = dict(script=script, hyper_params=hyper_params, configs=configs, docker=docker) \
-           if docker else dict(script=script, hyper_params=hyper_params, configs=configs)
+       repr_dict = dict(script=script, hyper_params=hyper_params, configs=configs)
+       if docker:
+           repr_dict['docker'] = docker

        # callback for modifying the representation dict
        if cls._hashing_callback:
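
For intuition, the caching hash covers a deterministic representation of the script, hyper-parameters, and configuration objects, so passing configurations_override changes the hash and avoids a stale cache hit. A standalone sketch of the idea (not the library's exact hashing code):

    import hashlib
    import json

    def sketch_task_hash(script, hyper_params, configs, docker=None):
        # same shape as repr_dict above: docker is only included when non-empty
        repr_dict = dict(script=script, hyper_params=hyper_params, configs=configs)
        if docker:
            repr_dict['docker'] = docker
        # deterministic serialization (sorted keys) before hashing
        return hashlib.md5(
            json.dumps(repr_dict, sort_keys=True, default=str).encode('utf-8')).hexdigest()

    print(sketch_task_hash(
        script={'branch': 'main', 'entry_point': 'train.py'},
        hyper_params={'General/epochs': '10'},
        configs={'General': '{"learning_rate": 0.01}'},
    ))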
@@ -408,6 +411,7 @@ class ClearmlJob(BaseJob):
            base_task_id,  # type: str
            parameter_override=None,  # type: Optional[Mapping[str, str]]
            task_overrides=None,  # type: Optional[Mapping[str, str]]
+           configuration_overrides=None,  # type: Optional[Mapping[str, Union[str, Mapping]]]
            tags=None,  # type: Optional[Sequence[str]]
            parent=None,  # type: Optional[str]
            disable_clone_task=False,  # type: bool
@@ -423,6 +427,12 @@ class ClearmlJob(BaseJob):
        :param dict parameter_override: dictionary of parameters and values to set for the cloned task
        :param dict task_overrides: Task object specific overrides.
            for example {'script.version_num': None, 'script.branch': 'main'}
+       :param configuration_overrides: Optional, override Task configuration objects.
+           Expected dictionary of configuration object name and configuration object content.
+           Examples:
+               {'config_section': dict(key='value')}
+               {'config_file': 'configuration file content'}
+               {'OmegaConf': YAML.dumps(full_hydra_dict)}
        :param list tags: additional tags to add to the newly cloned task
        :param str parent: Set newly created Task parent task field, default: base_task_id.
        :param dict kwargs: additional Task creation parameters
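
A hedged sketch of using ClearmlJob directly with the new argument; the base Task ID and queue name are placeholders:

    from clearml.automation.job import ClearmlJob

    job = ClearmlJob(
        base_task_id='<base-task-id>',     # placeholder for an existing Task ID
        parameter_override={'General/epochs': '5'},
        # dict -> stored as JSON content, str -> stored verbatim
        configuration_overrides={'General': dict(dropout=0.25)},
    )
    job.launch(queue_name='default')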
@@ -454,6 +464,22 @@ class ClearmlJob(BaseJob):
            task_params.update(parameter_override)
            self.task_parameter_override = dict(**parameter_override)

+       task_configurations = None
+       if configuration_overrides:
+           task_configurations = deepcopy(base_temp_task.data.configuration or {})
+           for k, v in configuration_overrides.items():
+               if not isinstance(v, (str, dict)):
+                   raise ValueError('Configuration override dictionary value must be either str or dict, '
+                                    'got {} instead'.format(type(v)))
+               value = v if isinstance(v, str) else json.dumps(v)
+               if k in task_configurations:
+                   task_configurations[k].value = value
+               else:
+                   task_configurations[k] = tasks_service.ConfigurationItem(
+                       name=str(k), value=value, description=None, type='json' if isinstance(v, dict) else None
+                   )
+           configuration_overrides = {k: v.value for k, v in task_configurations.items()}
+
        sections = {}
        if task_overrides:
            # set values inside the Task
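
The loop above merges rather than replaces: configuration objects not named in the override are kept, matching names get their value replaced, and new names are appended (dict values JSON-encoded and marked type='json'). A plain-dict sketch of that rule with hypothetical names, ignoring the ConfigurationItem wrapper:

    import json

    def sketch_merge(existing, overrides):
        # existing: {name: content string}; overrides: {name: str or dict}
        merged = dict(existing)
        for k, v in overrides.items():
            if not isinstance(v, (str, dict)):
                raise ValueError('value must be either str or dict, got {}'.format(type(v)))
            merged[k] = v if isinstance(v, str) else json.dumps(v)
        return merged

    print(sketch_merge(
        existing={'General': 'lr = 0.1'},
        overrides={'General': dict(lr=0.01), 'Extra': 'verbose = true'},
    ))
    # -> {'General': '{"lr": 0.01}', 'Extra': 'verbose = true'}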
@@ -471,7 +497,11 @@ class ClearmlJob(BaseJob):
        # look for a cached copy of the Task
        # get parameters + task_overrides as dict and hash it
        task_hash = self._create_task_hash(
-           base_temp_task, section_overrides=sections, params_override=task_params)
+           base_temp_task,
+           section_overrides=sections,
+           params_override=task_params,
+           configurations_override=configuration_overrides or None,
+       )
        task = self._get_cached_task(task_hash)
        # if we found a task, just use it
        if task:
@@ -510,6 +540,11 @@ class ClearmlJob(BaseJob):
        if task_params:
            self.task.set_parameters(task_params)

+       # store back Task configuration objects into the backend
+       if task_configurations:
+           # noinspection PyProtectedMember
+           self.task._edit(configuration=task_configurations)
+
        if task_overrides and sections:
            # store back Task parameters into backend
            # noinspection PyProtectedMember