Add callback support to Task.delete()

This commit is contained in:
allegroai 2022-02-26 13:39:45 +02:00
parent a5b16db95c
commit 0cca7396e7
2 changed files with 92 additions and 54 deletions

View File

@ -632,8 +632,9 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
delete_artifacts_and_models=True, delete_artifacts_and_models=True,
skip_models_used_by_other_tasks=True, skip_models_used_by_other_tasks=True,
raise_on_error=False, raise_on_error=False,
callback=None,
): ):
# type: (bool, bool, bool) -> bool # type: (bool, bool, bool, Callable[[str, str], bool]) -> bool
""" """
Delete the task as well as it's output models and artifacts. Delete the task as well as it's output models and artifacts.
Models and artifacts are deleted from their storage locations, each using its URI. Models and artifacts are deleted from their storage locations, each using its URI.
@ -642,13 +643,16 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
configured in your configuration file (e.g. if an artifact is stored in S3, make sure sdk.aws.s3.credentials configured in your configuration file (e.g. if an artifact is stored in S3, make sure sdk.aws.s3.credentials
are properly configured and that you have delete permission in the related buckets). are properly configured and that you have delete permission in the related buckets).
:param delete_artifacts_and_models: If True, artifacts and models would also be deleted (default True) :param delete_artifacts_and_models: If True, artifacts and models would also be deleted (default True).
If callback is provided, this argument is ignored.
:param skip_models_used_by_other_tasks: If True, models used by other tasks would not be deleted (default True) :param skip_models_used_by_other_tasks: If True, models used by other tasks would not be deleted (default True)
:param raise_on_error: If True an exception will be raised when encountering an error. :param raise_on_error: If True an exception will be raised when encountering an error.
If False an error would be printed and no exception will be raised. If False an error would be printed and no exception will be raised.
:param callback: An optional callback accepting a uri type (string) and a uri (string) that will be called
for each artifact and model. If provided, the delete_artifacts_and_models is ignored.
Return True to indicate the artifact/model should be deleted or False otherwise.
:return: True if the task was deleted successfully. :return: True if the task was deleted successfully.
""" """
try: try:
res = self.send(tasks.GetByIdRequest(self.task_id)) res = self.send(tasks.GetByIdRequest(self.task_id))
task = res.response.task task = res.response.task
@ -659,19 +663,48 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
return False return False
execution = {} execution = {}
models_res = []
if delete_artifacts_and_models:
execution = task.execution.to_dict() if task.execution else {}
models_res = self.send(
models.GetAllRequest(
task=[task.id], only_fields=["id", "uri"]
)
).response.models
event_uris = list(self._get_all_events( models_res = []
event_type="training_debug_image", unique_selector=itemgetter("url"), batch_size=10000 if delete_artifacts_and_models or callback:
)) execution = task.execution.to_dict() if task.execution else {}
event_uris.extend(self._get_image_plot_uris()) models_res = self.send(models.GetAllRequest(task=[task.id], only_fields=["id", "uri"])).response.models
models_res = [
m
for m in models_res
if not callback
or callback(
"output_model" if task.output and (m.id == task.output.model) else "model",
m.uri,
)
]
event_uris = []
event_uris.extend(
[
x
for x in filter(
None,
self._get_all_events(
event_type="training_debug_image",
unique_selector=itemgetter("url"),
batch_size=10000,
),
)
if not callback or callback("debug_images", x)
]
)
event_uris.extend(
[x for x in filter(None, self._get_image_plot_uris()) if not callback or callback("image_plot", x)]
)
artifact_uris = []
if delete_artifacts_and_models or callback:
artifact_uris = [
e["uri"]
for e in execution["artifacts"]
if e["mode"] == "output" and (not callback or callback("artifact", e["uri"]))
]
task_deleted = self.send(tasks.DeleteRequest(self.task_id, force=True)) task_deleted = self.send(tasks.DeleteRequest(self.task_id, force=True))
if not task_deleted: if not task_deleted:
@ -689,45 +722,44 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
return False return False
failures = [] failures = []
if delete_artifacts_and_models: for uri in artifact_uris:
for e in execution["artifacts"]: if not self._delete_uri(uri):
if e["mode"] == "output" and not self._delete_uri(e["uri"]): failures.append(uri)
failures.append(e["uri"])
for m in models_res: for m in models_res:
# noinspection PyBroadException # noinspection PyBroadException
try: try:
is_output_model = task.output and (m.id == task.output.model) is_output_model = task.output and (m.id == task.output.model)
res = self.send( res = self.send(
models.DeleteRequest(m.id, force=(not skip_models_used_by_other_tasks)), models.DeleteRequest(m.id, force=(not skip_models_used_by_other_tasks)),
ignore_errors=is_output_model ignore_errors=is_output_model,
) )
# Should delete if model was deleted or if this was the output model (which was already deleted # Should delete if model was deleted or if this was the output model (which was already deleted
# by DeleteRequest, and it's URI is dangling # by DeleteRequest, and it's URI is dangling
should_delete = is_output_model or res.response.deleted should_delete = is_output_model or res.response.deleted
except SendError as ex: except SendError as ex:
if (ex.result.meta.result_code, ex.result.meta.result_subcode) == (400, 201): if (ex.result.meta.result_code, ex.result.meta.result_subcode) == (
# Model not found, already deleted by DeleteRequest 400,
should_delete = True 201,
else: ):
failures.append("model id: {}".format(m.id)) # Model not found, already deleted by DeleteRequest
continue should_delete = True
except Exception: else:
failures.append("model id: {}".format(m.id)) failures.append("model id: {}".format(m.id))
continue continue
if should_delete and not self._delete_uri(m.uri): except Exception:
failures.append(m.uri) failures.append("model id: {}".format(m.id))
continue
if should_delete and not self._delete_uri(m.uri):
failures.append(m.uri)
event_uris = list(filter(None, event_uris))
for uri in event_uris: for uri in event_uris:
if not self._delete_uri(uri): if not self._delete_uri(uri):
failures.append(uri) failures.append(uri)
failures = list(filter(None, failures)) failures = list(filter(None, failures))
if len(failures): if len(failures):
error = "Failed deleting the following URIs:\n{}".format( error = "Failed deleting the following URIs:\n{}".format("\n".join(failures))
"\n".join(failures)
)
if raise_on_error: if raise_on_error:
raise self.DeleteError(error) raise self.DeleteError(error)
self.log.error(error) self.log.error(error)
@ -757,17 +789,12 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
try: try:
plot = json.loads(plot) plot = json.loads(plot)
return next( return next(
filter(None, (image.get("source") for image in plot.get("layout", {}).get("images", []))), filter(None, (image.get("source") for image in plot.get("layout", {}).get("images", []))), None
None
) )
except Exception: except Exception:
pass pass
return self._get_all_events( return self._get_all_events(event_type="plot", unique_selector=image_source_selector, batch_size=10000)
event_type="plot",
unique_selector=image_source_selector,
batch_size=10000
)
def update_model_desc(self, new_model_desc_file=None): def update_model_desc(self, new_model_desc_file=None):
# type: (Optional[str]) -> () # type: (Optional[str]) -> ()

View File

@ -1549,8 +1549,14 @@ class Task(_Task):
# noinspection PyProtectedMember # noinspection PyProtectedMember
Logger._remove_std_logger() Logger._remove_std_logger()
def delete(self, delete_artifacts_and_models=True, skip_models_used_by_other_tasks=True, raise_on_error=False): def delete(
# type: (bool, bool, bool) -> bool self,
delete_artifacts_and_models=True,
skip_models_used_by_other_tasks=True,
raise_on_error=False,
callback=None,
):
# type: (bool, bool, bool, Callable[[str, str], bool]) -> bool
""" """
Delete the task as well as it's output models and artifacts. Delete the task as well as it's output models and artifacts.
Models and artifacts are deleted from their storage locations, each using its URI. Models and artifacts are deleted from their storage locations, each using its URI.
@ -1559,10 +1565,14 @@ class Task(_Task):
configured in your configuration file (e.g. if an artifact is stored in S3, make sure sdk.aws.s3.credentials configured in your configuration file (e.g. if an artifact is stored in S3, make sure sdk.aws.s3.credentials
are properly configured and that you have delete permission in the related buckets). are properly configured and that you have delete permission in the related buckets).
:param delete_artifacts_and_models: If True, artifacts and models would also be deleted (default True) :param delete_artifacts_and_models: If True, artifacts and models would also be deleted (default True).
If callback is provided, this argument is ignored.
:param skip_models_used_by_other_tasks: If True, models used by other tasks would not be deleted (default True) :param skip_models_used_by_other_tasks: If True, models used by other tasks would not be deleted (default True)
:param raise_on_error: If True an exception will be raised when encountering an error. :param raise_on_error: If True an exception will be raised when encountering an error.
If False an error would be printed and no exception will be raised. If False an error would be printed and no exception will be raised.
:param callback: An optional callback accepting a uri type (string) and a uri (string) that will be called
for each artifact and model. If provided, the delete_artifacts_and_models is ignored.
Return True to indicate the artifact/model should be deleted or False otherwise.
:return: True if the task was deleted successfully. :return: True if the task was deleted successfully.
""" """
if not running_remotely() or not self.is_main_task(): if not running_remotely() or not self.is_main_task():
@ -1570,6 +1580,7 @@ class Task(_Task):
delete_artifacts_and_models=delete_artifacts_and_models, delete_artifacts_and_models=delete_artifacts_and_models,
skip_models_used_by_other_tasks=skip_models_used_by_other_tasks, skip_models_used_by_other_tasks=skip_models_used_by_other_tasks,
raise_on_error=raise_on_error, raise_on_error=raise_on_error,
callback=callback,
) )
return False return False