Add delete_from_storage (default true) to Task.delete_artifacts()

This commit is contained in:
allegroai 2022-12-23 22:22:16 +02:00
parent 79185487a8
commit 690a4b42c4
2 changed files with 52 additions and 6 deletions

View File

@ -37,6 +37,7 @@ from ...backend_api.services import tasks, models, events, projects
from ...backend_api.session.defs import ENV_OFFLINE_MODE
from ...utilities.pyhocon import ConfigTree, ConfigFactory
from ...utilities.config import config_dict_to_text, text_to_config_dict
from ...errors import ArtifactUriDeleteError
from ..base import IdObjectBase, InterfaceBase
from ..metrics import Metrics, Reporter
@ -1505,32 +1506,55 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
self._edit(execution=execution)
return self.data.execution.artifacts or []
def delete_artifacts(self, artifact_names, raise_on_errors=True):
# type: (Sequence[str], bool) -> bool
def delete_artifacts(self, artifact_names, raise_on_errors=True, delete_from_storage=True):
# type: (Sequence[str], bool, bool) -> bool
"""
Delete a list of artifacts, by artifact name, from the Task.
:param list artifact_names: list of artifact names
:param bool raise_on_errors: if True, do not suppress connectivity related exceptions
:param bool delete_from_storage: If True try to delete the actual
file from the external storage (e.g. S3, GS, Azure, File Server etc.)
:return: True if successful
"""
return self._delete_artifacts(artifact_names, raise_on_errors)
return self._delete_artifacts(artifact_names, raise_on_errors, delete_from_storage)
def _delete_artifacts(self, artifact_names, raise_on_errors=False):
# type: (Sequence[str], bool) -> bool
def _delete_artifacts(self, artifact_names, raise_on_errors=False, delete_from_storage=True):
# type: (Sequence[str], bool, bool) -> bool
"""
Delete a list of artifacts, by artifact name, from the Task.
:param list artifact_names: list of artifact names
:param bool raise_on_errors: if True, do not suppress connectivity related exceptions
:param bool delete_from_storage: If True try to delete the actual
file from the external storage (e.g. S3, GS, Azure, File Server etc.)
:return: True if successful
"""
if not Session.check_min_api_version('2.3'):
return False
if not artifact_names:
return True
if not isinstance(artifact_names, (list, tuple)):
raise ValueError('Expected artifact names as List[str]')
uris = []
with self._edit_lock:
if delete_from_storage:
if any(a not in self.artifacts for a in artifact_names):
self.reload()
for artifact in artifact_names:
# noinspection PyBroadException
try:
uri = self.artifacts[artifact].url
except Exception:
if raise_on_errors:
raise
uri = None
uris.append(uri)
if Session.check_min_api_version("2.13") and not self._offline_mode:
req = tasks.DeleteArtifactsRequest(
task=self.task_id, artifacts=[{"key": n, "mode": "output"} for n in artifact_names], force=True)
@ -1543,7 +1567,16 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
execution = self.data.execution
execution.artifacts = [a for a in execution.artifacts or [] if a.key not in artifact_names]
self._edit(execution=execution)
return self.data.execution.artifacts or []
# check if we need to remove the actual files from an external storage, it can also be our file server
if uris:
for i, (artifact, uri) in enumerate(zip(artifact_names, uris)):
# delete the actual file from storage, and raise if error and needed
if uri and not self._delete_uri(uri) and raise_on_errors:
remaining_uris = {name: uri for name, uri in zip(artifact_names[i + 1:], uris[i + 1:])}
raise ArtifactUriDeleteError(artifact=artifact, uri=uri, remaining_uris=remaining_uris)
return True
def _set_model_design(self, design=None):
# type: (str) -> ()

View File

@ -1,3 +1,16 @@
class UsageError(RuntimeError):
""" An exception raised for illegal usage of clearml objects"""
pass
class ArtifactUriDeleteError(ValueError):
def __init__(self, artifact, uri, remaining_uris):
super(ArtifactUriDeleteError, self).__init__("Failed deleting artifact {}: file {}".format(artifact, uri))
self.artifact = artifact
self.uri = uri
self._remaining_uris = remaining_uris
@property
def remaining_uris(self):
""" Remaining URIs to delete. Deletion of these URIs was aborted due to the error. """
return self._remaining_uris