From 99c7eecbeeb3beb91a72bebc11edd3d044c06e5e Mon Sep 17 00:00:00 2001
From: allegroai <>
Date: Fri, 4 Aug 2023 19:06:15 +0300
Subject: [PATCH] Fix pipeline should always use artifacts hash for checking
 cache hits

---
 clearml/automation/job.py | 48 +++++++++++++++------------------------
 1 file changed, 18 insertions(+), 30 deletions(-)

diff --git a/clearml/automation/job.py b/clearml/automation/job.py
index 3ebd9f7b..7b62f79e 100644
--- a/clearml/automation/job.py
+++ b/clearml/automation/job.py
@@ -384,10 +384,9 @@ class BaseJob(object):
         section_overrides=None,
         params_override=None,
         configurations_override=None,
-        explicit_docker_image=None,
-        account_for_artifacts_hashes=True
+        explicit_docker_image=None
     ):
-        # type: (Task, Optional[dict], Optional[dict], Optional[dict], Optional[str], bool) -> Optional[str]
+        # type: (Task, Optional[dict], Optional[dict], Optional[dict], Optional[str]) -> Optional[str]
         """
         Create Hash (str) representing the state of the Task

@@ -398,8 +397,6 @@ class BaseJob(object):
         :param configurations_override: dictionary of configuration override objects (tasks.ConfigurationItem)
         :param explicit_docker_image: The explicit docker image. Used to invalidate the hash when the docker image
            was explicitly changed
-        :param account_for_artifacts_hashes: Calculate the hash of the task by accounting for the hashes of the
-            artifacts in `kwargs_artifacts` (as opposed of the task ID/artifact name stored in this section)

        :return: str hash of the Task configuration
        """
@@ -420,22 +417,21 @@ class BaseJob(object):
             script.pop("requirements", None)

         hyper_params = deepcopy(task.get_parameters() if params_override is None else params_override)
-        if account_for_artifacts_hashes:
-            hyper_params_to_change = {}
-            task_cache = {}
-            for key, value in hyper_params.items():
-                if key.startswith("kwargs_artifacts/"):
-                    # noinspection PyBroadException
-                    try:
-                        # key format is <task_id>.<artifact_name>
-                        task_id, artifact = value.split(".", 1)
-                        task_ = task_cache.setdefault(task_id, Task.get_task(task_id))
-                        # set the value of the hyper parameter to the hash of the artifact
-                        # because the task ID might differ, but the artifact might be the same
-                        hyper_params_to_change[key] = task_.artifacts[artifact].hash
-                    except Exception:
-                        pass
-            hyper_params.update(hyper_params_to_change)
+        hyper_params_to_change = {}
+        task_cache = {}
+        for key, value in hyper_params.items():
+            if key.startswith("kwargs_artifacts/"):
+                # noinspection PyBroadException
+                try:
+                    # key format is <task_id>.<artifact_name>
+                    task_id, artifact = value.split(".", 1)
+                    task_ = task_cache.setdefault(task_id, Task.get_task(task_id))
+                    # set the value of the hyper parameter to the hash of the artifact
+                    # because the task ID might differ, but the artifact might be the same
+                    hyper_params_to_change[key] = task_.artifacts[artifact].hash
+                except Exception:
+                    pass
+        hyper_params.update(hyper_params_to_change)
         configs = task.get_configuration_objects() if configurations_override is None else configurations_override
         # currently we do not add the docker image to the hash (only args and setup script),
         # because default docker image will cause the step to change
@@ -604,14 +600,6 @@ class ClearmlJob(BaseJob):
         if allow_caching:
             # look for a cached copy of the Task
             # get parameters + task_overrides + as dict and hash it.
-            task_hash_legacy = self._create_task_hash(
-                base_temp_task,
-                section_overrides=sections,
-                params_override=task_params,
-                configurations_override=configuration_overrides or None,
-                explicit_docker_image=kwargs.get("explicit_docker_image"),
-                account_for_artifacts_hashes=False
-            )
             task_hash = self._create_task_hash(
                 base_temp_task,
                 section_overrides=sections,
@@ -619,7 +607,7 @@ class ClearmlJob(BaseJob):
                 params_override=task_params,
                 configurations_override=configuration_overrides or None,
                 explicit_docker_image=kwargs.get("explicit_docker_image")
             )
-            task = self._get_cached_task(task_hash_legacy) or self._get_cached_task(task_hash)
+            task = self._get_cached_task(task_hash)
             # if we found a task, just use
             if task:
                 if disable_clone_task and self.task and self.task.status == self.task.TaskStatusEnum.created:
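
For illustration only (not part of the patch), a minimal self-contained sketch of the caching rule the commit enforces: the value of each `kwargs_artifacts/*` parameter, a `<task_id>.<artifact_name>` string, is swapped for the hash of the artifact itself before the step hash is computed, so two runs fed identical artifacts by different producer tasks hit the same cache entry. `step_cache_hash` and `artifact_hashes` are hypothetical stand-ins for `BaseJob._create_task_hash` and `Task.get_task(task_id).artifacts[name].hash`, and the md5-of-JSON digest is an assumption, not clearml's exact serialization.

```python
import hashlib
import json


def step_cache_hash(hyper_params, artifact_hashes):
    # replace each "kwargs_artifacts/..." value ("<task_id>.<artifact_name>")
    # with the hash of the artifact content itself, so the producing task's ID
    # no longer influences the step hash
    resolved = dict(hyper_params)
    for key, value in hyper_params.items():
        if key.startswith("kwargs_artifacts/"):
            task_id, artifact = value.split(".", 1)
            resolved[key] = artifact_hashes.get((task_id, artifact), value)
    # hash a stable JSON rendering of the resolved parameters
    return hashlib.md5(json.dumps(resolved, sort_keys=True).encode("utf-8")).hexdigest()


# two pipeline runs: different upstream task IDs, identical artifact content
run_a = {"kwargs_artifacts/data": "task_111.dataset"}
run_b = {"kwargs_artifacts/data": "task_222.dataset"}
hashes = {("task_111", "dataset"): "abc123", ("task_222", "dataset"): "abc123"}
assert step_cache_hash(run_a, hashes) == step_cache_hash(run_b, hashes)
```

With the legacy ID-based hash (`account_for_artifacts_hashes=False`) removed, `_get_cached_task` is consulted once with the artifact-content hash, so stale cache hits keyed on task IDs can no longer shadow the content-based lookup.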