From 3f882c37b952a69579e9dde0b870c3a1fa61b9d7 Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Mon, 10 Jul 2023 11:18:37 +0300
Subject: [PATCH] Fix pipeline steps are not cached when the arguments come from the pipeline controller

---
 clearml/automation/job.py | 48 ++++++++++++++++++++++++---------------
 1 file changed, 30 insertions(+), 18 deletions(-)

diff --git a/clearml/automation/job.py b/clearml/automation/job.py
index 7b62f79e..3ebd9f7b 100644
--- a/clearml/automation/job.py
+++ b/clearml/automation/job.py
@@ -384,9 +384,10 @@ class BaseJob(object):
         section_overrides=None,
         params_override=None,
         configurations_override=None,
-        explicit_docker_image=None
+        explicit_docker_image=None,
+        account_for_artifacts_hashes=True
     ):
-        # type: (Task, Optional[dict], Optional[dict], Optional[dict], Optional[str]) -> Optional[str]
+        # type: (Task, Optional[dict], Optional[dict], Optional[dict], Optional[str], bool) -> Optional[str]
         """
         Create Hash (str) representing the state of the Task

@@ -397,6 +398,8 @@ class BaseJob(object):
         :param configurations_override: dictionary of configuration override objects (tasks.ConfigurationItem)
         :param explicit_docker_image: The explicit docker image. Used to invalidate the hash when the docker
             image was explicitly changed
+        :param account_for_artifacts_hashes: Calculate the hash of the task by accounting for the hashes of the
+            artifacts in `kwargs_artifacts` (as opposed to the task ID/artifact name stored in this section)

         :return: str hash of the Task configuration
         """
@@ -417,21 +420,22 @@ class BaseJob(object):
             script.pop("requirements", None)

         hyper_params = deepcopy(task.get_parameters() if params_override is None else params_override)
-        hyper_params_to_change = {}
-        task_cache = {}
-        for key, value in hyper_params.items():
-            if key.startswith("kwargs_artifacts/"):
-                # noinspection PyBroadException
-                try:
-                    # key format is <task_id>.<artifact_name>
-                    task_id, artifact = value.split(".", 1)
-                    task_ = task_cache.setdefault(task_id, Task.get_task(task_id))
-                    # set the value of the hyper parameter to the hash of the artifact
-                    # because the task ID might differ, but the artifact might be the same
-                    hyper_params_to_change[key] = task_.artifacts[artifact].hash
-                except Exception:
-                    pass
-        hyper_params.update(hyper_params_to_change)
+        if account_for_artifacts_hashes:
+            hyper_params_to_change = {}
+            task_cache = {}
+            for key, value in hyper_params.items():
+                if key.startswith("kwargs_artifacts/"):
+                    # noinspection PyBroadException
+                    try:
+                        # key format is <task_id>.<artifact_name>
+                        task_id, artifact = value.split(".", 1)
+                        task_ = task_cache.setdefault(task_id, Task.get_task(task_id))
+                        # set the value of the hyper parameter to the hash of the artifact
+                        # because the task ID might differ, but the artifact might be the same
+                        hyper_params_to_change[key] = task_.artifacts[artifact].hash
+                    except Exception:
+                        pass
+            hyper_params.update(hyper_params_to_change)
         configs = task.get_configuration_objects() if configurations_override is None else configurations_override
         # currently we do not add the docker image to the hash (only args and setup script),
         # because default docker image will cause the step to change
@@ -600,6 +604,14 @@ class ClearmlJob(BaseJob):
         if allow_caching:
             # look for a cached copy of the Task
             # get parameters + task_overrides + as dict and hash it.
+            task_hash_legacy = self._create_task_hash(
+                base_temp_task,
+                section_overrides=sections,
+                params_override=task_params,
+                configurations_override=configuration_overrides or None,
+                explicit_docker_image=kwargs.get("explicit_docker_image"),
+                account_for_artifacts_hashes=False
+            )
             task_hash = self._create_task_hash(
                 base_temp_task,
                 section_overrides=sections,
@@ -607,7 +619,7 @@ class ClearmlJob(BaseJob):
                 configurations_override=configuration_overrides or None,
                 explicit_docker_image=kwargs.get("explicit_docker_image")
             )
-            task = self._get_cached_task(task_hash)
+            task = self._get_cached_task(task_hash_legacy) or self._get_cached_task(task_hash)
             # if we found a task, just use
             if task:
                 if disable_clone_task and self.task and self.task.status == self.task.TaskStatusEnum.created:
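
The in-code comments above give the rationale for the hash substitution: a "kwargs_artifacts/" value has the form "<task_id>.<artifact_name>", and the producing task ID can differ between runs even when the artifact content is identical, so _create_task_hash() replaces the raw value with the artifact's content hash. Below is a minimal, self-contained sketch of that idea; stable_task_hash, ArtifactStub, and artifact_lookup are hypothetical stand-ins, not clearml API, and only the value format and the substitution mirror the patched method.

    import hashlib
    import json

    class ArtifactStub:
        # Hypothetical stand-in for a clearml artifact; only the content hash matters here.
        def __init__(self, content_hash):
            self.hash = content_hash

    def stable_task_hash(hyper_params, artifact_lookup, account_for_artifacts_hashes=True):
        # hyper_params maps parameter names to string values, e.g.
        # {"kwargs_artifacts/dataset": "<task_id>.<artifact_name>", ...}
        # artifact_lookup(task_id, artifact_name) -> ArtifactStub
        params = dict(hyper_params)
        if account_for_artifacts_hashes:
            for key, value in params.items():
                if not key.startswith("kwargs_artifacts/"):
                    continue
                try:
                    # value format is <task_id>.<artifact_name>
                    task_id, artifact = value.split(".", 1)
                    # Substitute the artifact content hash for the raw value:
                    # a re-run produces a new task ID, but identical artifact
                    # content should still yield the same hash (a cache hit).
                    params[key] = artifact_lookup(task_id, artifact).hash
                except Exception:
                    pass  # keep the raw value if the artifact is unavailable
        return hashlib.md5(json.dumps(params, sort_keys=True).encode("utf-8")).hexdigest()

    # Two pipeline runs whose upstream task IDs differ but whose artifacts are identical:
    lookup = lambda task_id, name: ArtifactStub("e3b0c44298fc1c149afbf4c8996fb924")
    run1 = {"General/epochs": "10", "kwargs_artifacts/dataset": "task_aaa.dataset"}
    run2 = {"General/epochs": "10", "kwargs_artifacts/dataset": "task_bbb.dataset"}
    assert stable_task_hash(run1, lookup) == stable_task_hash(run2, lookup)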
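
Disabling the substitution (account_for_artifacts_hashes=False) reproduces the hash as it was computed before artifact hashes were taken into account, which is evidently why the final hunks try that legacy hash first: steps cached under the older hash form are still found, while new tasks carry the artifact-aware hash. A sketch of the fallback, reusing the hypothetical stable_task_hash above; find_cached_task stands in for _get_cached_task() and is assumed to return None on a miss.

    def lookup_cached_step(hyper_params, artifact_lookup, find_cached_task):
        # Hash computed the pre-fix way: raw "<task_id>.<artifact_name>" strings.
        task_hash_legacy = stable_task_hash(
            hyper_params, artifact_lookup, account_for_artifacts_hashes=False
        )
        # Hash computed the new way: artifact content hashes.
        task_hash = stable_task_hash(
            hyper_params, artifact_lookup, account_for_artifacts_hashes=True
        )
        # Older cache entries live under the legacy hash, newer ones under the
        # artifact-aware hash; checking both keeps both generations usable.
        return find_cached_task(task_hash_legacy) or find_cached_task(task_hash)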