Fix pipeline should always use artifacts hash for checking cache hits

allegroai 2023-08-04 19:06:15 +03:00
parent 46c6d2bf0f
commit 99c7eecbee


@@ -384,10 +384,9 @@ class BaseJob(object):
             section_overrides=None,
             params_override=None,
             configurations_override=None,
-            explicit_docker_image=None,
-            account_for_artifacts_hashes=True
+            explicit_docker_image=None
     ):
-        # type: (Task, Optional[dict], Optional[dict], Optional[dict], Optional[str], bool) -> Optional[str]
+        # type: (Task, Optional[dict], Optional[dict], Optional[dict], Optional[str]) -> Optional[str]
         """
         Create Hash (str) representing the state of the Task
@@ -398,8 +397,6 @@ class BaseJob(object):
         :param configurations_override: dictionary of configuration override objects (tasks.ConfigurationItem)
         :param explicit_docker_image: The explicit docker image. Used to invalidate the hash when the docker image
             was explicitly changed
-        :param account_for_artifacts_hashes: Calculate the hash of the task by accounting for the hashes of the
-            artifacts in `kwargs_artifacts` (as opposed to the task ID/artifact name stored in this section)
         :return: str hash of the Task configuration
         """
@@ -420,22 +417,21 @@ class BaseJob(object):
         script.pop("requirements", None)
         hyper_params = deepcopy(task.get_parameters() if params_override is None else params_override)
-        if account_for_artifacts_hashes:
-            hyper_params_to_change = {}
-            task_cache = {}
-            for key, value in hyper_params.items():
-                if key.startswith("kwargs_artifacts/"):
-                    # noinspection PyBroadException
-                    try:
-                        # key format is <task_id>.<artifact_name>
-                        task_id, artifact = value.split(".", 1)
-                        task_ = task_cache.setdefault(task_id, Task.get_task(task_id))
-                        # set the value of the hyper parameter to the hash of the artifact
-                        # because the task ID might differ, but the artifact might be the same
-                        hyper_params_to_change[key] = task_.artifacts[artifact].hash
-                    except Exception:
-                        pass
-            hyper_params.update(hyper_params_to_change)
+        hyper_params_to_change = {}
+        task_cache = {}
+        for key, value in hyper_params.items():
+            if key.startswith("kwargs_artifacts/"):
+                # noinspection PyBroadException
+                try:
+                    # key format is <task_id>.<artifact_name>
+                    task_id, artifact = value.split(".", 1)
+                    task_ = task_cache.setdefault(task_id, Task.get_task(task_id))
+                    # set the value of the hyper parameter to the hash of the artifact
+                    # because the task ID might differ, but the artifact might be the same
+                    hyper_params_to_change[key] = task_.artifacts[artifact].hash
+                except Exception:
+                    pass
+        hyper_params.update(hyper_params_to_change)
         configs = task.get_configuration_objects() if configurations_override is None else configurations_override
         # currently we do not add the docker image to the hash (only args and setup script),
         # because default docker image will cause the step to change
@@ -604,14 +600,6 @@ class ClearmlJob(BaseJob):
         if allow_caching:
             # look for a cached copy of the Task
             # get parameters + task_overrides + as dict and hash it.
-            task_hash_legacy = self._create_task_hash(
-                base_temp_task,
-                section_overrides=sections,
-                params_override=task_params,
-                configurations_override=configuration_overrides or None,
-                explicit_docker_image=kwargs.get("explicit_docker_image"),
-                account_for_artifacts_hashes=False
-            )
             task_hash = self._create_task_hash(
                 base_temp_task,
                 section_overrides=sections,
@@ -619,7 +607,7 @@ class ClearmlJob(BaseJob):
                 configurations_override=configuration_overrides or None,
                 explicit_docker_image=kwargs.get("explicit_docker_image")
             )
-            task = self._get_cached_task(task_hash_legacy) or self._get_cached_task(task_hash)
+            task = self._get_cached_task(task_hash)
             # if we found a task, just use it
             if task:
                 if disable_clone_task and self.task and self.task.status == self.task.TaskStatusEnum.created:
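
Why the artifact-hash substitution matters: a `kwargs_artifacts/` parameter stores a reference of the form <task_id>.<artifact_name>, so two otherwise identical steps fed by different upstream tasks would hash differently even when the artifact contents are identical. Replacing the reference with the artifact's content hash makes the cache key depend on what the artifact contains, not on which task produced it. Below is a minimal, self-contained sketch of that idea; the function names and the md5-over-sorted-JSON scheme are illustrative assumptions, not ClearML's actual implementation:

# Toy sketch only: names and hashing scheme are assumptions for illustration
import hashlib
import json


def cache_key(hyper_params, artifact_hashes):
    # replace each "kwargs_artifacts/" value ("<task_id>.<artifact_name>")
    # with the referenced artifact's content hash
    normalized = dict(hyper_params)
    for key, value in hyper_params.items():
        if key.startswith("kwargs_artifacts/"):
            task_id, artifact_name = value.split(".", 1)
            normalized[key] = artifact_hashes[(task_id, artifact_name)]
    # deterministic serialization -> stable cache key
    payload = json.dumps(normalized, sort_keys=True)
    return hashlib.md5(payload.encode("utf-8")).hexdigest()


# two runs reference the same artifact content, produced by different tasks
hashes = {("task_a", "dataset"): "c0ffee", ("task_b", "dataset"): "c0ffee"}
run_1 = {"kwargs_artifacts/data": "task_a.dataset", "General/epochs": "10"}
run_2 = {"kwargs_artifacts/data": "task_b.dataset", "General/epochs": "10"}
assert cache_key(run_1, hashes) == cache_key(run_2, hashes)  # same key, cache hit

Once the hash is always computed this way, a second lookup against the legacy task-ID based hash adds nothing, which is why the task_hash_legacy fallback is removed in the last two hunks.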