Add Task.delete() support

allegroai 2021-01-10 12:46:02 +02:00
parent 3a7cf8af15
commit 3c00453bd4
3 changed files with 210 additions and 12 deletions

View File

@@ -8,9 +8,10 @@ import sys
from copy import copy
from enum import Enum
from multiprocessing import RLock
from operator import itemgetter
from tempfile import gettempdir
from threading import Thread
from typing import Optional, Any, Sequence, Callable, Mapping, Union, List
from typing import Optional, Any, Sequence, Callable, Mapping, Union, List, Set
from uuid import uuid4
from pathlib2 import Path
@@ -30,6 +31,7 @@ from ...utilities.attrs import readonly
from ...utilities.proxy_object import verify_basic_type
from ...binding.artifacts import Artifacts
from ...backend_interface.task.development.worker import DevWorker
from ...backend_interface.session import SendError
from ...backend_api import Session
from ...backend_api.services import tasks, models, events, projects
from ...backend_api.session.defs import ENV_OFFLINE_MODE
@@ -107,6 +109,9 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
completed = "completed"
unknown = "unknown"
class DeleteError(Exception):
pass
def __init__(self, session=None, task_id=None, log=None, project_name=None,
task_name=None, task_type=TaskTypes.training, log_to_backend=True,
raise_on_validation_errors=True, force_create=False):
@@ -579,6 +584,146 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
assert isinstance(resp.response, tasks.PublishResponse)
return resp
def _delete(
self,
delete_artifacts_and_models=True,
skip_models_used_by_other_tasks=True,
raise_on_error=False,
):
# type: (bool, bool, bool) -> bool
"""
Delete the task as well as its output models and artifacts.
Models and artifacts are deleted from their storage locations, each using its URI.
Note: in order to delete models and artifacts using their URI, make sure the proper storage credentials are
configured in your configuration file (e.g. if an artifact is stored in S3, make sure sdk.aws.s3.credentials
are properly configured and that you have delete permission in the related buckets).
:param delete_artifacts_and_models: If True, artifacts and models will also be deleted (default True)
:param skip_models_used_by_other_tasks: If True, models used by other tasks will not be deleted (default True)
:param raise_on_error: If True, an exception is raised when an error is encountered.
If False, the error is printed and no exception is raised.
:return: True if the task was deleted successfully.
"""
try:
res = self.send(tasks.GetByIdRequest(self.task_id))
task = res.response.task
if task.status == Task.TaskStatusEnum.published:
if raise_on_error:
raise self.DeleteError("Cannot delete published task {}".format(self.task_id))
self.log.error("Cannot delete published task {}".format(self.task_id))
return False
execution = {}
models_res = []
if delete_artifacts_and_models:
execution = task.execution.to_dict() if task.execution else {}
models_res = self.send(
models.GetAllRequest(
task=[task.id], only_fields=["id", "uri"]
)
).response.models
event_uris = list(self._get_all_events(
event_type="training_debug_image", unique_selector=itemgetter("url"), batch_size=10000
))
event_uris.extend(self._get_image_plot_uris())
task_deleted = self.send(tasks.DeleteRequest(self.task_id, force=True))
if not task_deleted:
if raise_on_error:
raise self.DeleteError("Failed deleting task {}".format(self.task_id))
self.log.error("Failed deleting task {}".format(self.task_id))
return False
except self.DeleteError:
raise
except Exception as ex:
if raise_on_error:
raise self.DeleteError("Task deletion failed: {}".format(ex))
self.log.error("Task deletion failed: {}".format(ex))
return False
failures = []
if delete_artifacts_and_models:
for e in execution["artifacts"]:
if e["mode"] == "output" and not self._delete_uri(e["uri"]):
failures.append(e["uri"])
for m in models_res:
# noinspection PyBroadException
try:
is_output_model = task.output and (m.id == task.output.model)
res = self.send(
models.DeleteRequest(m.id, force=(not skip_models_used_by_other_tasks)),
ignore_errors=is_output_model
)
# Delete the URI if the model was deleted, or if this was the output model (which was already
# deleted by the task DeleteRequest above, leaving its URI dangling)
should_delete = is_output_model or res.response.deleted
except SendError as ex:
if (ex.result.meta.result_code, ex.result.meta.result_subcode) == (400, 201):
# Model not found, already deleted by DeleteRequest
should_delete = True
else:
failures.append("model id: {}".format(m.id))
continue
except Exception as ex:
failures.append("model id: {}".format(m.id))
continue
if should_delete and not self._delete_uri(m.uri):
failures.append(m.uri)
for uri in event_uris:
if not self._delete_uri(uri):
failures.append(uri)
if len(failures):
error = "Failed deleting the following URIs:\n{}".format(
"\n".join(failures)
)
if raise_on_error:
raise self.DeleteError(error)
self.log.error(error)
return task_deleted
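A minimal caller-side sketch of the error-handling contract above (assuming an already-initialized Task object named task): with raise_on_error=True every failure path, whether a published task, a failed DeleteRequest, or leftover URIs, raises Task.DeleteError instead of only being logged.
try:
    deleted = task._delete(delete_artifacts_and_models=True, raise_on_error=True)
except Task.DeleteError as ex:
    # Published task, failed DeleteRequest, or URIs that could not be removed
    print("Deletion failed: {}".format(ex))
else:
    print("Task record deleted: {}".format(deleted))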
def _delete_uri(self, uri):
# type: (str) -> bool
# noinspection PyBroadException
try:
deleted = StorageHelper.get(uri).delete(uri)
if deleted:
self.log.debug("Deleted file: {}".format(uri))
return True
except Exception as ex:
self.log.error("Failed deleting {}: {}".format(uri, str(ex)))
return False
return False
def _get_image_plot_uris(self):
# type: () -> Set[str]
def image_source_selector(d):
plot = d.get("plot_str")
if plot:
# noinspection PyBroadException
try:
plot = json.loads(plot)
return next(
filter(None, (image.get("source") for image in plot.get("layout", {}).get("images", []))),
None
)
except Exception:
pass
return self._get_all_events(
event_type="plot",
unique_selector=image_source_selector,
batch_size=10000
)
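For reference, a sketch (with a hypothetical URL) of the Plotly figure JSON that image_source_selector above walks: image sources live under layout.images[].source of the event's plot_str field.
import json

sample_event = {
    "plot_str": json.dumps({
        "layout": {"images": [{"source": "https://files.example.com/plots/img_0.jpeg"}]},  # hypothetical URI
        "data": [],
    })
}
# image_source_selector(sample_event) -> "https://files.example.com/plots/img_0.jpeg"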
def update_model_desc(self, new_model_desc_file=None):
# type: (Optional[str]) -> ()
""" Change the Task's model description. """
@@ -1828,8 +1973,10 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
return None
return res.response.project.name
def _get_all_events(self, max_events=100):
# type: (int) -> Any
def _get_all_events(
self, max_events=100, batch_size=500, order='asc', event_type=None, unique_selector=itemgetter("url")
):
# type: (int, int, str, str, Callable[[dict], Any]) -> Union[List[Any], Set[Any]]
"""
Get a list of all reported events.
@@ -1837,28 +1984,46 @@ class Task(IdObjectBase, AccessMixin, SetupUploadMixin):
:param max_events: The maximum events the function will return. Pass None
to return all the reported events.
:return: A list of events from the task.
:param batch_size: The maximum number of events retrieved by each internal call performed by this method.
:param order: Events order (by timestamp) - "asc" for ascending, "desc" for descending.
:param event_type: Event type. Pass None to get all event types.
:param unique_selector: If provided, used to select a value from each event, only a unique set of these
values will be returned by this method.
:return: A list of events from the task. If unique_selector was provided, a set of values selected from events
of the task.
"""
batch_size = max_events or batch_size
log_events = self.send(events.GetTaskEventsRequest(
task=self.id,
order='asc',
batch_size=max_events,
order=order,
batch_size=batch_size,
event_type=event_type,
))
events_list = log_events.response.events
returned_count = log_events.response.returned
total_events = log_events.response.total
scroll = log_events.response.scroll_id
if unique_selector:
events_list = set(map(unique_selector, log_events.response.events))
else:
events_list = log_events.response.events
while len(events_list) < total_events and (max_events is None or len(events_list) < max_events):
while returned_count < total_events and (max_events is None or len(events_list) < max_events):
log_events = self.send(events.GetTaskEventsRequest(
task=self.id,
order='asc',
batch_size=max_events,
order=order,
batch_size=batch_size,
event_type=event_type,
scroll_id=scroll,
))
events_list.extend(log_events.response.events)
scroll = log_events.response.scroll_id
returned_count += log_events.response.returned
if unique_selector:
events_list.update(map(unique_selector, log_events.response.events))
else:
events_list.extend(log_events.response.events)
return events_list
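A usage sketch of the extended signature, mirroring the call made by _delete() above but fetching all events (task is assumed to be an initialized Task object): the unique_selector turns the result into a set of distinct debug-image URLs.
from operator import itemgetter

debug_image_urls = task._get_all_events(
    max_events=None,                    # no cap; keep following scroll_id until exhausted
    batch_size=10000,
    event_type="training_debug_image",
    unique_selector=itemgetter("url"),  # a set of unique URLs is returned
)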

View File

@@ -1067,7 +1067,7 @@ class _HttpDriver(_Driver):
res = container.session.delete(obj.url, headers=container.get_headers(obj.url))
if res.status_code != requests.codes.ok:
raise ValueError('Failed deleting object %s (%d): %s' % (obj.object_name, res.status_code, res.text))
return res
return True
def get_object(self, container_name, object_name, *args, **kwargs):
is_stream = kwargs.get('stream', True)
@@ -1325,7 +1325,14 @@ class _Boto3Driver(_Driver):
yield self.ListResult(name=res.key)
def delete_object(self, object, **kwargs):
from botocore.exceptions import ClientError
object.delete()
try:
# Try loading the file to verify deletion
object.load()
return False
except ClientError as e:
return int(e.response['Error']['Code']) == 404
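The same delete-then-verify pattern as a standalone boto3 sketch (bucket and key are placeholders): an S3 delete returns no authoritative status, so a follow-up HEAD via load() distinguishes a successful delete (404/NoSuchKey) from an object that is still there.
import boto3
from botocore.exceptions import ClientError

def s3_delete_and_verify(bucket_name, key):
    obj = boto3.resource("s3").Object(bucket_name, key)
    obj.delete()
    try:
        obj.load()  # HEAD request; succeeds only if the object still exists
        return False
    except ClientError as e:
        return e.response["Error"]["Code"] in ("404", "NoSuchKey")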
def get_object(self, container_name, object_name, *args, **kwargs):
full_container_name = 's3://' + container_name
@@ -1536,6 +1543,7 @@ class _GoogleCloudStorageDriver(_Driver):
def delete_object(self, object, **kwargs):
object.delete()
return not object.exists()
def get_object(self, container_name, object_name, *args, **kwargs):
full_container_name = str(furl(scheme=self.scheme, netloc=container_name))
@@ -1683,6 +1691,7 @@ class _AzureBlobServiceStorageDriver(_Driver):
container.name,
object.blob_name,
)
return not object.container.blob_service.exists(container.name, object.blob_name)
def get_object(self, container_name, object_name, *args, **kwargs):
container = self._containers.get(container_name)

View File

@@ -1314,6 +1314,30 @@ class Task(_Task):
if is_main:
self.__register_at_exit(None)
def delete(self, delete_artifacts_and_models=True, skip_models_used_by_other_tasks=True, raise_on_error=False):
# type: (bool, bool, bool) -> bool
"""
Delete the task as well as its output models and artifacts.
Models and artifacts are deleted from their storage locations, each using its URI.
Note: in order to delete models and artifacts using their URI, make sure the proper storage credentials are
configured in your configuration file (e.g. if an artifact is stored in S3, make sure sdk.aws.s3.credentials
are properly configured and that you have delete permission in the related buckets).
:param delete_artifacts_and_models: If True, artifacts and models will also be deleted (default True)
:param skip_models_used_by_other_tasks: If True, models used by other tasks will not be deleted (default True)
:param raise_on_error: If True, an exception is raised when an error is encountered.
If False, the error is printed and no exception is raised.
:return: True if the task was deleted successfully.
"""
if not running_remotely() or not self.is_main_task():
return super(Task, self)._delete(
delete_artifacts_and_models=delete_artifacts_and_models,
skip_models_used_by_other_tasks=skip_models_used_by_other_tasks,
raise_on_error=raise_on_error,
)
return False
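A minimal end-to-end sketch of the new public API (the import path assumes the clearml package name; the task ID is a placeholder):
from clearml import Task  # assumed package/import path

task = Task.get_task(task_id="<your-task-id>")  # placeholder ID
# Deletes the task plus its artifacts and output models, skipping models still
# used by other tasks; returns True on success, logs errors instead of raising.
success = task.delete(
    delete_artifacts_and_models=True,
    skip_models_used_by_other_tasks=True,
    raise_on_error=False,
)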
def register_artifact(self, name, artifact, metadata=None, uniqueness_columns=True):
# type: (str, pandas.DataFrame, Dict, Union[bool, Sequence[str]]) -> None
"""