mirror of
				https://github.com/clearml/clearml
				synced 2025-06-26 18:16:07 +00:00 
			
		
		
		
	Fix pipeline steps not being cached when the arguments come from the pipeline controller
This commit is contained in:
		
							parent
							
								
									c394046a2d
								
							
						
					
					
						commit
						3f882c37b9
					
				| @ -384,9 +384,10 @@ class BaseJob(object): | ||||
|             section_overrides=None, | ||||
|             params_override=None, | ||||
|             configurations_override=None, | ||||
|             explicit_docker_image=None | ||||
|             explicit_docker_image=None, | ||||
|             account_for_artifacts_hashes=True | ||||
|     ): | ||||
|         # type: (Task, Optional[dict], Optional[dict], Optional[dict], Optional[str]) -> Optional[str] | ||||
|         # type: (Task, Optional[dict], Optional[dict], Optional[dict], Optional[str], bool) -> Optional[str] | ||||
|         """ | ||||
|         Create Hash (str) representing the state of the Task | ||||
| 
 | ||||
| @ -397,6 +398,8 @@ class BaseJob(object): | ||||
|         :param configurations_override: dictionary of configuration override objects (tasks.ConfigurationItem) | ||||
|         :param explicit_docker_image: The explicit docker image. Used to invalidate the hash when the docker image | ||||
|             was explicitly changed | ||||
|         :param account_for_artifacts_hashes: Calculate the hash of the task by accounting for the hashes of the | ||||
|             artifacts in `kwargs_artifacts` (as opposed to the task ID/artifact name stored in this section) | ||||
| 
 | ||||
|         :return: str hash of the Task configuration | ||||
|         """ | ||||
| @ -417,21 +420,22 @@ class BaseJob(object): | ||||
|         script.pop("requirements", None) | ||||
| 
 | ||||
|         hyper_params = deepcopy(task.get_parameters() if params_override is None else params_override) | ||||
|         hyper_params_to_change = {} | ||||
|         task_cache = {} | ||||
|         for key, value in hyper_params.items(): | ||||
|             if key.startswith("kwargs_artifacts/"): | ||||
|                 # noinspection PyBroadException | ||||
|                 try: | ||||
|                     # key format is <task_id>.<artifact_name> | ||||
|                     task_id, artifact = value.split(".", 1) | ||||
|                     task_ = task_cache.setdefault(task_id, Task.get_task(task_id)) | ||||
|                     # set the value of the hyper parameter to the hash of the artifact | ||||
|                     # because the task ID might differ, but the artifact might be the same | ||||
|                     hyper_params_to_change[key] = task_.artifacts[artifact].hash | ||||
|                 except Exception: | ||||
|                     pass | ||||
|         hyper_params.update(hyper_params_to_change) | ||||
|         if account_for_artifacts_hashes: | ||||
|             hyper_params_to_change = {} | ||||
|             task_cache = {} | ||||
|             for key, value in hyper_params.items(): | ||||
|                 if key.startswith("kwargs_artifacts/"): | ||||
|                     # noinspection PyBroadException | ||||
|                     try: | ||||
|                         # value format is <task_id>.<artifact_name> | ||||
|                         task_id, artifact = value.split(".", 1) | ||||
|                         task_ = task_cache.setdefault(task_id, Task.get_task(task_id)) | ||||
|                         # set the value of the hyper parameter to the hash of the artifact | ||||
|                         # because the task ID might differ, but the artifact might be the same | ||||
|                         hyper_params_to_change[key] = task_.artifacts[artifact].hash | ||||
|                     except Exception: | ||||
|                         pass | ||||
|             hyper_params.update(hyper_params_to_change) | ||||
|         configs = task.get_configuration_objects() if configurations_override is None else configurations_override | ||||
|         # currently we do not add the docker image to the hash (only args and setup script), | ||||
|         # because default docker image will cause the step to change | ||||
| @ -600,6 +604,14 @@ class ClearmlJob(BaseJob): | ||||
|         if allow_caching: | ||||
|             # look for a cached copy of the Task | ||||
|             # get parameters + task_overrides + as dict and hash it. | ||||
|             task_hash_legacy = self._create_task_hash( | ||||
|                 base_temp_task, | ||||
|                 section_overrides=sections, | ||||
|                 params_override=task_params, | ||||
|                 configurations_override=configuration_overrides or None, | ||||
|                 explicit_docker_image=kwargs.get("explicit_docker_image"), | ||||
|                 account_for_artifacts_hashes=False | ||||
|             ) | ||||
|             task_hash = self._create_task_hash( | ||||
|                 base_temp_task, | ||||
|                 section_overrides=sections, | ||||
| @ -607,7 +619,7 @@ class ClearmlJob(BaseJob): | ||||
|                 configurations_override=configuration_overrides or None, | ||||
|                 explicit_docker_image=kwargs.get("explicit_docker_image") | ||||
|             ) | ||||
|             task = self._get_cached_task(task_hash) | ||||
|             task = self._get_cached_task(task_hash_legacy) or self._get_cached_task(task_hash) | ||||
|             # if we found a task, just use | ||||
|             if task: | ||||
|                 if disable_clone_task and self.task and self.task.status == self.task.TaskStatusEnum.created: | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user
	 allegroai
						allegroai