From 0cca7396e7b81c967012f7b0853b915c605fd39b Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Sat, 26 Feb 2022 13:39:45 +0200 Subject: [PATCH] Add callback support to Task.delete() --- clearml/backend_interface/task/task.py | 129 +++++++++++++++---------- clearml/task.py | 17 +++- 2 files changed, 92 insertions(+), 54 deletions(-) diff --git a/clearml/backend_interface/task/task.py b/clearml/backend_interface/task/task.py index 6c802e2f..4e564041 100644 --- a/clearml/backend_interface/task/task.py +++ b/clearml/backend_interface/task/task.py @@ -632,8 +632,9 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): delete_artifacts_and_models=True, skip_models_used_by_other_tasks=True, raise_on_error=False, + callback=None, ): - # type: (bool, bool, bool) -> bool + # type: (bool, bool, bool, Callable[[str, str], bool]) -> bool """ Delete the task as well as it's output models and artifacts. Models and artifacts are deleted from their storage locations, each using its URI. @@ -642,13 +643,16 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): configured in your configuration file (e.g. if an artifact is stored in S3, make sure sdk.aws.s3.credentials are properly configured and that you have delete permission in the related buckets). - :param delete_artifacts_and_models: If True, artifacts and models would also be deleted (default True) + :param delete_artifacts_and_models: If True, artifacts and models would also be deleted (default True). + If callback is provided, this argument is ignored. :param skip_models_used_by_other_tasks: If True, models used by other tasks would not be deleted (default True) :param raise_on_error: If True an exception will be raised when encountering an error. If False an error would be printed and no exception will be raised. + :param callback: An optional callback accepting a uri type (string) and a uri (string) that will be called + for each artifact and model. If provided, the delete_artifacts_and_models is ignored. + Return True to indicate the artifact/model should be deleted or False otherwise. :return: True if the task was deleted successfully. """ - try: res = self.send(tasks.GetByIdRequest(self.task_id)) task = res.response.task @@ -659,19 +663,48 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): return False execution = {} - models_res = [] - if delete_artifacts_and_models: - execution = task.execution.to_dict() if task.execution else {} - models_res = self.send( - models.GetAllRequest( - task=[task.id], only_fields=["id", "uri"] - ) - ).response.models - event_uris = list(self._get_all_events( - event_type="training_debug_image", unique_selector=itemgetter("url"), batch_size=10000 - )) - event_uris.extend(self._get_image_plot_uris()) + models_res = [] + if delete_artifacts_and_models or callback: + execution = task.execution.to_dict() if task.execution else {} + models_res = self.send(models.GetAllRequest(task=[task.id], only_fields=["id", "uri"])).response.models + models_res = [ + m + for m in models_res + if not callback + or callback( + "output_model" if task.output and (m.id == task.output.model) else "model", + m.uri, + ) + ] + + event_uris = [] + event_uris.extend( + [ + x + for x in filter( + None, + self._get_all_events( + event_type="training_debug_image", + unique_selector=itemgetter("url"), + batch_size=10000, + ), + ) + if not callback or callback("debug_images", x) + ] + ) + + event_uris.extend( + [x for x in filter(None, self._get_image_plot_uris()) if not callback or callback("image_plot", x)] + ) + + artifact_uris = [] + if delete_artifacts_and_models or callback: + artifact_uris = [ + e["uri"] + for e in execution["artifacts"] + if e["mode"] == "output" and (not callback or callback("artifact", e["uri"])) + ] task_deleted = self.send(tasks.DeleteRequest(self.task_id, force=True)) if not task_deleted: @@ -689,45 +722,44 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): return False failures = [] - if delete_artifacts_and_models: - for e in execution["artifacts"]: - if e["mode"] == "output" and not self._delete_uri(e["uri"]): - failures.append(e["uri"]) + for uri in artifact_uris: + if not self._delete_uri(uri): + failures.append(uri) - for m in models_res: - # noinspection PyBroadException - try: - is_output_model = task.output and (m.id == task.output.model) - res = self.send( - models.DeleteRequest(m.id, force=(not skip_models_used_by_other_tasks)), - ignore_errors=is_output_model - ) - # Should delete if model was deleted or if this was the output model (which was already deleted - # by DeleteRequest, and it's URI is dangling - should_delete = is_output_model or res.response.deleted - except SendError as ex: - if (ex.result.meta.result_code, ex.result.meta.result_subcode) == (400, 201): - # Model not found, already deleted by DeleteRequest - should_delete = True - else: - failures.append("model id: {}".format(m.id)) - continue - except Exception: + for m in models_res: + # noinspection PyBroadException + try: + is_output_model = task.output and (m.id == task.output.model) + res = self.send( + models.DeleteRequest(m.id, force=(not skip_models_used_by_other_tasks)), + ignore_errors=is_output_model, + ) + # Should delete if model was deleted or if this was the output model (which was already deleted + # by DeleteRequest, and it's URI is dangling + should_delete = is_output_model or res.response.deleted + except SendError as ex: + if (ex.result.meta.result_code, ex.result.meta.result_subcode) == ( + 400, + 201, + ): + # Model not found, already deleted by DeleteRequest + should_delete = True + else: failures.append("model id: {}".format(m.id)) continue - if should_delete and not self._delete_uri(m.uri): - failures.append(m.uri) + except Exception: + failures.append("model id: {}".format(m.id)) + continue + if should_delete and not self._delete_uri(m.uri): + failures.append(m.uri) - event_uris = list(filter(None, event_uris)) for uri in event_uris: if not self._delete_uri(uri): failures.append(uri) failures = list(filter(None, failures)) if len(failures): - error = "Failed deleting the following URIs:\n{}".format( - "\n".join(failures) - ) + error = "Failed deleting the following URIs:\n{}".format("\n".join(failures)) if raise_on_error: raise self.DeleteError(error) self.log.error(error) @@ -757,17 +789,12 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin): try: plot = json.loads(plot) return next( - filter(None, (image.get("source") for image in plot.get("layout", {}).get("images", []))), - None + filter(None, (image.get("source") for image in plot.get("layout", {}).get("images", []))), None ) except Exception: pass - return self._get_all_events( - event_type="plot", - unique_selector=image_source_selector, - batch_size=10000 - ) + return self._get_all_events(event_type="plot", unique_selector=image_source_selector, batch_size=10000) def update_model_desc(self, new_model_desc_file=None): # type: (Optional[str]) -> () diff --git a/clearml/task.py b/clearml/task.py index 3d9fca69..14adb97f 100644 --- a/clearml/task.py +++ b/clearml/task.py @@ -1549,8 +1549,14 @@ class Task(_Task): # noinspection PyProtectedMember Logger._remove_std_logger() - def delete(self, delete_artifacts_and_models=True, skip_models_used_by_other_tasks=True, raise_on_error=False): - # type: (bool, bool, bool) -> bool + def delete( + self, + delete_artifacts_and_models=True, + skip_models_used_by_other_tasks=True, + raise_on_error=False, + callback=None, + ): + # type: (bool, bool, bool, Callable[[str, str], bool]) -> bool """ Delete the task as well as it's output models and artifacts. Models and artifacts are deleted from their storage locations, each using its URI. @@ -1559,10 +1565,14 @@ class Task(_Task): configured in your configuration file (e.g. if an artifact is stored in S3, make sure sdk.aws.s3.credentials are properly configured and that you have delete permission in the related buckets). - :param delete_artifacts_and_models: If True, artifacts and models would also be deleted (default True) + :param delete_artifacts_and_models: If True, artifacts and models would also be deleted (default True). + If callback is provided, this argument is ignored. :param skip_models_used_by_other_tasks: If True, models used by other tasks would not be deleted (default True) :param raise_on_error: If True an exception will be raised when encountering an error. If False an error would be printed and no exception will be raised. + :param callback: An optional callback accepting a uri type (string) and a uri (string) that will be called + for each artifact and model. If provided, the delete_artifacts_and_models is ignored. + Return True to indicate the artifact/model should be deleted or False otherwise. :return: True if the task was deleted successfully. """ if not running_remotely() or not self.is_main_task(): @@ -1570,6 +1580,7 @@ class Task(_Task): delete_artifacts_and_models=delete_artifacts_and_models, skip_models_used_by_other_tasks=skip_models_used_by_other_tasks, raise_on_error=raise_on_error, + callback=callback, ) return False