diff --git a/apiserver/bll/event/event_bll.py b/apiserver/bll/event/event_bll.py index 9c5ead6..fa2e880 100644 --- a/apiserver/bll/event/event_bll.py +++ b/apiserver/bll/event/event_bll.py @@ -41,7 +41,7 @@ from apiserver.bll.event.event_metrics import EventMetrics from apiserver.bll.task import TaskBLL from apiserver.config_repo import config from apiserver.database.errors import translate_errors_context -from apiserver.database.model.task.task import Task, TaskStatus +from apiserver.database.model.task.task import TaskStatus from apiserver.redis_manager import redman from apiserver.service_repo.auth import Identity from apiserver.utilities.dicts import nested_get @@ -1149,34 +1149,6 @@ class EventBLL(object): for tb in es_res["aggregations"]["tasks"]["buckets"] } - @staticmethod - def _validate_model_state( - company_id: str, model_id: str, allow_locked: bool = False - ): - extra_msg = None - query = Q(id=model_id, company=company_id) - if not allow_locked: - query &= Q(ready__ne=True) - extra_msg = "or model published" - res = Model.objects(query).only("id").first() - if not res: - raise errors.bad_request.InvalidModelId( - extra_msg, company=company_id, id=model_id - ) - - @staticmethod - def _validate_task_state(company_id: str, task_id: str, allow_locked: bool = False): - extra_msg = None - query = Q(id=task_id, company=company_id) - if not allow_locked: - query &= Q(status__nin=LOCKED_TASK_STATUSES) - extra_msg = "or task published" - res = Task.objects(query).only("id").first() - if not res: - raise errors.bad_request.InvalidTaskId( - extra_msg, company=company_id, id=task_id - ) - @staticmethod def _get_events_deletion_params(async_delete: bool) -> dict: if async_delete: @@ -1189,51 +1161,47 @@ class EventBLL(object): return {"refresh": True} - def delete_task_events(self, company_id, task_id, allow_locked=False, model=False): - if model: - self._validate_model_state( - company_id=company_id, - model_id=task_id, - allow_locked=allow_locked, - ) - else: - self._validate_task_state( - company_id=company_id, task_id=task_id, allow_locked=allow_locked - ) - async_delete = async_task_events_delete - if async_delete: - total = self.events_iterator.count_task_events( - event_type=EventType.all, - company_id=company_id, - task_ids=[task_id], - ) - if total <= async_delete_threshold: - async_delete = False - es_req = {"query": {"term": {"task": task_id}}} + def delete_task_events(self, company_id, task_ids: Union[str, Sequence[str]], model=False): + """ + Delete task events. No check is done for tasks write access + so it should be checked by the calling code + """ + if isinstance(task_ids, str): + task_ids = [task_ids] + deleted = 0 with translate_errors_context(): - es_res = delete_company_events( - es=self.es, - company_id=company_id, - event_type=EventType.all, - body=es_req, - **self._get_events_deletion_params(async_delete), - ) + async_delete = async_task_events_delete + if async_delete and len(task_ids) < 100: + total = self.events_iterator.count_task_events( + event_type=EventType.all, + company_id=company_id, + task_ids=task_ids, + ) + if total <= async_delete_threshold: + async_delete = False + for tasks in chunked_iter(task_ids, 100): + es_req = {"query": {"terms": {"task": tasks}}} + es_res = delete_company_events( + es=self.es, + company_id=company_id, + event_type=EventType.all, + body=es_req, + **self._get_events_deletion_params(async_delete), + ) + if not async_delete: + deleted += es_res.get("deleted", 0) if not async_delete: - return es_res.get("deleted", 0) + return deleted def clear_task_log( self, company_id: str, task_id: str, - allow_locked: bool = False, threshold_sec: int = None, include_metrics: Sequence[str] = None, exclude_metrics: Sequence[str] = None, ): - self._validate_task_state( - company_id=company_id, task_id=task_id, allow_locked=allow_locked - ) if check_empty_data( self.es, company_id=company_id, event_type=EventType.task_log ): @@ -1275,39 +1243,6 @@ class EventBLL(object): ) return es_res.get("deleted", 0) - def delete_multi_task_events( - self, company_id: str, task_ids: Sequence[str], model=False - ): - """ - Delete multiple task events. No check is done for tasks write access - so it should be checked by the calling code - """ - deleted = 0 - with translate_errors_context(): - async_delete = async_task_events_delete - if async_delete and len(task_ids) < 100: - total = self.events_iterator.count_task_events( - event_type=EventType.all, - company_id=company_id, - task_ids=task_ids, - ) - if total <= async_delete_threshold: - async_delete = False - for tasks in chunked_iter(task_ids, 100): - es_req = {"query": {"terms": {"task": tasks}}} - es_res = delete_company_events( - es=self.es, - company_id=company_id, - event_type=EventType.all, - body=es_req, - **self._get_events_deletion_params(async_delete), - ) - if not async_delete: - deleted += es_res.get("deleted", 0) - - if not async_delete: - return deleted - def clear_scroll(self, scroll_id: str): if scroll_id == self.empty_scroll: return diff --git a/apiserver/bll/model/__init__.py b/apiserver/bll/model/__init__.py index 3dc1ffe..b3951f6 100644 --- a/apiserver/bll/model/__init__.py +++ b/apiserver/bll/model/__init__.py @@ -6,7 +6,6 @@ from mongoengine import Q from apiserver.apierrors import errors from apiserver.apimodels.models import ModelTaskPublishResponse from apiserver.bll.task.utils import deleted_prefix, get_last_metric_updates -from apiserver.config_repo import config from apiserver.database.model import EntityVisibility from apiserver.database.model.model import Model from apiserver.database.model.task.task import Task, TaskStatus @@ -15,8 +14,6 @@ from .metadata import Metadata class ModelBLL: - event_bll = None - @classmethod def get_company_model_by_id( cls, company_id: str, model_id: str, only_fields=None @@ -94,7 +91,7 @@ class ModelBLL: @classmethod def delete_model( - cls, model_id: str, company_id: str, user_id: str, force: bool, delete_external_artifacts: bool = True, + cls, model_id: str, company_id: str, user_id: str, force: bool ) -> Tuple[int, Model]: model = cls.get_company_model_by_id( company_id=company_id, @@ -147,34 +144,6 @@ class ModelBLL: set__last_changed_by=user_id, ) - delete_external_artifacts = delete_external_artifacts and config.get( - "services.async_urls_delete.enabled", True - ) - if delete_external_artifacts: - from apiserver.bll.task.task_cleanup import ( - collect_debug_image_urls, - collect_plot_image_urls, - _schedule_for_delete, - ) - urls = set() - urls.update(collect_debug_image_urls(company_id, model_id)) - urls.update(collect_plot_image_urls(company_id, model_id)) - if model.uri: - urls.add(model.uri) - if urls: - _schedule_for_delete( - task_id=model_id, - company=company_id, - user=user_id, - urls=urls, - can_delete_folders=False, - ) - - if not cls.event_bll: - from apiserver.bll.event import EventBLL - cls.event_bll = EventBLL() - - cls.event_bll.delete_task_events(company_id, model_id, allow_locked=True, model=True) del_count = Model.objects(id=model_id, company=company_id).delete() return del_count, model diff --git a/apiserver/bll/project/project_cleanup.py b/apiserver/bll/project/project_cleanup.py index d10fe7c..8386cc0 100644 --- a/apiserver/bll/project/project_cleanup.py +++ b/apiserver/bll/project/project_cleanup.py @@ -8,10 +8,9 @@ from mongoengine import Q from apiserver.apierrors import errors from apiserver.bll.event import EventBLL from apiserver.bll.task.task_cleanup import ( - collect_debug_image_urls, - collect_plot_image_urls, TaskUrls, - _schedule_for_delete, + schedule_for_delete, + delete_task_events_and_collect_urls, ) from apiserver.config_repo import config from apiserver.database.model import EntityVisibility @@ -192,7 +191,7 @@ def delete_project( ) event_urls = task_event_urls | model_event_urls if delete_external_artifacts: - scheduled = _schedule_for_delete( + scheduled = schedule_for_delete( task_id=project_id, company=company, user=user, @@ -206,7 +205,6 @@ def delete_project( deleted_models=deleted_models, urls=TaskUrls( model_urls=list(model_urls), - event_urls=list(event_urls), artifact_urls=list(artifact_urls), ), ) @@ -243,9 +241,6 @@ def _delete_tasks( last_changed_by=user, ) - event_urls = collect_debug_image_urls(company, task_ids) | collect_plot_image_urls( - company, task_ids - ) artifact_urls = set() for task in tasks: if task.execution and task.execution.artifacts: @@ -257,8 +252,11 @@ def _delete_tasks( } ) - event_bll.delete_multi_task_events(company, task_ids) + event_urls = delete_task_events_and_collect_urls( + company=company, task_ids=task_ids + ) deleted = tasks.delete() + return deleted, event_urls, artifact_urls @@ -317,11 +315,10 @@ def _delete_models( set__last_changed_by=user, ) - event_urls = collect_debug_image_urls(company, model_ids) | collect_plot_image_urls( - company, model_ids - ) model_urls = {m.uri for m in models if m.uri} - - event_bll.delete_multi_task_events(company, model_ids, model=True) + event_urls = delete_task_events_and_collect_urls( + company=company, task_ids=model_ids, model=True + ) deleted = models.delete() + return deleted, event_urls, model_urls diff --git a/apiserver/bll/task/task_cleanup.py b/apiserver/bll/task/task_cleanup.py index 59b73b2..57f78c5 100644 --- a/apiserver/bll/task/task_cleanup.py +++ b/apiserver/bll/task/task_cleanup.py @@ -31,8 +31,8 @@ event_bll = EventBLL() @attr.s(auto_attribs=True) class TaskUrls: model_urls: Sequence[str] - event_urls: Sequence[str] artifact_urls: Sequence[str] + event_urls: Sequence[str] = [] # left here is in order not to break the api def __add__(self, other: "TaskUrls"): if not other: @@ -40,7 +40,6 @@ class TaskUrls: return TaskUrls( model_urls=list(set(self.model_urls) | set(other.model_urls)), - event_urls=list(set(self.event_urls) | set(other.event_urls)), artifact_urls=list(set(self.artifact_urls) | set(other.artifact_urls)), ) @@ -54,8 +53,23 @@ class CleanupResult: updated_children: int updated_models: int deleted_models: int + deleted_model_ids: Set[str] urls: TaskUrls = None + def to_res_dict(self, return_file_urls: bool) -> dict: + remove_fields = ["deleted_model_ids"] + if not return_file_urls: + remove_fields.append("urls") + + # noinspection PyTypeChecker + res = attr.asdict( + self, filter=lambda attrib, value: attrib.name not in remove_fields + ) + if not return_file_urls: + res["urls"] = None + + return res + def __add__(self, other: "CleanupResult"): if not other: return self @@ -65,6 +79,16 @@ class CleanupResult: updated_models=self.updated_models + other.updated_models, deleted_models=self.deleted_models + other.deleted_models, urls=self.urls + other.urls if self.urls else other.urls, + deleted_model_ids=self.deleted_model_ids | other.deleted_model_ids + ) + + @staticmethod + def empty(): + return CleanupResult( + updated_children=0, + updated_models=0, + deleted_models=0, + deleted_model_ids=set(), ) @@ -130,7 +154,7 @@ supported_storage_types.update( ) -def _schedule_for_delete( +def schedule_for_delete( company: str, user: str, task_id: str, @@ -197,15 +221,25 @@ def _schedule_for_delete( return processed_urls +def delete_task_events_and_collect_urls( + company: str, task_ids: Sequence[str], model=False +) -> Set[str]: + event_urls = collect_debug_image_urls( + company, task_ids + ) | collect_plot_image_urls(company, task_ids) + + event_bll.delete_task_events(company, task_ids, model=model) + + return event_urls + + def cleanup_task( company: str, user: str, task: Task, force: bool = False, update_children=True, - return_file_urls=False, delete_output_models=True, - delete_external_artifacts=True, ) -> CleanupResult: """ Validate task deletion and delete/modify all its output. @@ -216,22 +250,14 @@ def cleanup_task( published_models, draft_models, in_use_model_ids = verify_task_children_and_ouptuts( task, force ) - delete_external_artifacts = delete_external_artifacts and config.get( - "services.async_urls_delete.enabled", True - ) - event_urls, artifact_urls, model_urls = set(), set(), set() - if return_file_urls or delete_external_artifacts: - event_urls = collect_debug_image_urls(task.company, task.id) - event_urls.update(collect_plot_image_urls(task.company, task.id)) - if task.execution and task.execution.artifacts: - artifact_urls = { - a.uri - for a in task.execution.artifacts.values() - if a.mode == ArtifactModes.output and a.uri - } - model_urls = { - m.uri for m in draft_models if m.uri and m.id not in in_use_model_ids - } + artifact_urls = { + a.uri + for a in task.execution.artifacts.values() + if a.mode == ArtifactModes.output and a.uri + } if task.execution and task.execution.artifacts else {} + model_urls = { + m.uri for m in draft_models if m.uri and m.id not in in_use_model_ids + } deleted_task_id = f"{deleted_prefix}{task.id}" updated_children = 0 @@ -245,22 +271,15 @@ def cleanup_task( deleted_models = 0 updated_models = 0 + deleted_model_ids = set() for models, allow_delete in ((draft_models, True), (published_models, False)): if not models: continue if delete_output_models and allow_delete: model_ids = list({m.id for m in models if m.id not in in_use_model_ids}) if model_ids: - if return_file_urls or delete_external_artifacts: - event_urls.update(collect_debug_image_urls(task.company, model_ids)) - event_urls.update(collect_plot_image_urls(task.company, model_ids)) - - event_bll.delete_multi_task_events( - task.company, - model_ids, - model=True, - ) deleted_models += Model.objects(id__in=model_ids).delete() + deleted_model_ids.update(model_ids) if in_use_model_ids: Model.objects(id__in=list(in_use_model_ids)).update( @@ -283,30 +302,15 @@ def cleanup_task( set__last_changed_by=user, ) - event_bll.delete_task_events(task.company, task.id, allow_locked=force) - - if delete_external_artifacts: - scheduled = _schedule_for_delete( - task_id=task.id, - company=company, - user=user, - urls=event_urls | model_urls | artifact_urls, - can_delete_folders=not in_use_model_ids and not published_models, - ) - for urls in (event_urls, model_urls, artifact_urls): - urls.difference_update(scheduled) - return CleanupResult( deleted_models=deleted_models, updated_children=updated_children, updated_models=updated_models, urls=TaskUrls( - event_urls=list(event_urls), artifact_urls=list(artifact_urls), model_urls=list(model_urls), - ) - if return_file_urls - else None, + ), + deleted_model_ids=deleted_model_ids, ) diff --git a/apiserver/bll/task/task_operations.py b/apiserver/bll/task/task_operations.py index c8a2e3c..0378f77 100644 --- a/apiserver/bll/task/task_operations.py +++ b/apiserver/bll/task/task_operations.py @@ -295,11 +295,9 @@ def delete_task( identity: Identity, move_to_trash: bool, force: bool, - return_file_urls: bool, delete_output_models: bool, status_message: str, status_reason: str, - delete_external_artifacts: bool, include_pipeline_steps: bool, ) -> Tuple[int, Task, CleanupResult]: user_id = identity.user @@ -319,7 +317,7 @@ def delete_task( current=task.status, ) - def delete_task_core(task_: Task, force_: bool): + def delete_task_core(task_: Task, force_: bool) -> CleanupResult: try: TaskBLL.dequeue_and_change_status( task_, @@ -338,9 +336,7 @@ def delete_task( user=user_id, task=task_, force=force_, - return_file_urls=return_file_urls, delete_output_models=delete_output_models, - delete_external_artifacts=delete_external_artifacts, ) if move_to_trash: @@ -354,11 +350,12 @@ def delete_task( return res task_ids = [task.id] + cleanup_res = CleanupResult.empty() if include_pipeline_steps and ( step_tasks := _get_pipeline_steps_for_controller_task(task, company_id) ): for step in step_tasks: - delete_task_core(step, True) + cleanup_res += delete_task_core(step, True) task_ids.append(step.id) cleanup_res = delete_task_core(task, force) @@ -374,10 +371,8 @@ def reset_task( company_id: str, identity: Identity, force: bool, - return_file_urls: bool, delete_output_models: bool, clear_all: bool, - delete_external_artifacts: bool, ) -> Tuple[dict, CleanupResult, dict]: user_id = identity.user task = get_task_with_write_access( @@ -404,9 +399,7 @@ def reset_task( task=task, force=force, update_children=False, - return_file_urls=return_file_urls, delete_output_models=delete_output_models, - delete_external_artifacts=delete_external_artifacts, ) updates.update( diff --git a/apiserver/services/events.py b/apiserver/services/events.py index 724a937..b52b89a 100644 --- a/apiserver/services/events.py +++ b/apiserver/services/events.py @@ -42,6 +42,7 @@ from apiserver.apimodels.events import ( LegacyMultiTaskEventsRequest, ) from apiserver.bll.event import EventBLL +from apiserver.bll.event.event_bll import LOCKED_TASK_STATUSES from apiserver.bll.event.event_common import EventType, MetricVariants, TaskCompanies from apiserver.bll.event.events_iterator import Scroll from apiserver.bll.event.scalar_key import ScalarKeyEnum, ScalarKey @@ -52,6 +53,7 @@ from apiserver.config_repo import config from apiserver.database.model.model import Model from apiserver.database.model.task.task import Task from apiserver.service_repo import APICall, endpoint +from apiserver.service_repo.auth import Identity from apiserver.utilities import json, extract_properties_to_lists task_bll = TaskBLL() @@ -1001,30 +1003,64 @@ def get_multi_task_metrics(call: APICall, company_id, request: MultiTaskMetricsR call.result.data = {"metrics": sorted(res, key=itemgetter("metric"))} +def _validate_task_for_events_update( + company_id: str, task_id: str, identity: Identity, allow_locked: bool +): + task = get_task_with_write_access( + task_id=task_id, + company_id=company_id, + identity=identity, + only=("id", "status"), + ) + if not allow_locked and task.status in LOCKED_TASK_STATUSES: + raise errors.bad_request.InvalidTaskId( + replacement_msg="Cannot update events for a published task", + company=company_id, + id=task_id, + ) + + @endpoint("events.delete_for_task") def delete_for_task(call, company_id, request: TaskRequest): task_id = request.task allow_locked = call.data.get("allow_locked", False) - get_task_with_write_access( - task_id=task_id, company_id=company_id, identity=call.identity, only=("id",) + _validate_task_for_events_update( + company_id=company_id, + task_id=task_id, + identity=call.identity, + allow_locked=allow_locked, ) + call.result.data = dict( - deleted=event_bll.delete_task_events( - company_id, task_id, allow_locked=allow_locked - ) + deleted=event_bll.delete_task_events(company_id, task_id) ) +def _validate_model_for_events_update( + company_id: str, model_id: str, allow_locked: bool +): + model = model_bll.assert_exists(company_id, model_id, only=("id", "ready"))[0] + if not allow_locked and model.ready: + raise errors.bad_request.InvalidModelId( + replacement_msg="Cannot update events for a published model", + company=company_id, + id=model_id, + ) + + @endpoint("events.delete_for_model") def delete_for_model(call: APICall, company_id: str, request: ModelRequest): model_id = request.model allow_locked = call.data.get("allow_locked", False) - model_bll.assert_exists(company_id, model_id, return_models=False) + _validate_model_for_events_update( + company_id=company_id, model_id=model_id, allow_locked=allow_locked + ) + call.result.data = dict( deleted=event_bll.delete_task_events( - company_id, model_id, allow_locked=allow_locked, model=True + company_id, model_id, model=True ) ) @@ -1033,14 +1069,17 @@ def delete_for_model(call: APICall, company_id: str, request: ModelRequest): def clear_task_log(call: APICall, company_id: str, request: ClearTaskLogRequest): task_id = request.task - get_task_with_write_access( - task_id=task_id, company_id=company_id, identity=call.identity, only=("id",) + _validate_task_for_events_update( + company_id=company_id, + task_id=task_id, + identity=call.identity, + allow_locked=request.allow_locked, ) + call.result.data = dict( deleted=event_bll.clear_task_log( company_id=company_id, task_id=task_id, - allow_locked=request.allow_locked, threshold_sec=request.threshold_sec, exclude_metrics=request.exclude_metrics, include_metrics=request.include_metrics, diff --git a/apiserver/services/models.py b/apiserver/services/models.py index b476bd6..afb3b29 100644 --- a/apiserver/services/models.py +++ b/apiserver/services/models.py @@ -27,10 +27,15 @@ from apiserver.apimodels.models import ( UpdateModelRequest, ) from apiserver.apimodels.tasks import UpdateTagsRequest +from apiserver.bll.event import EventBLL from apiserver.bll.model import ModelBLL, Metadata from apiserver.bll.organization import OrgBLL, Tags from apiserver.bll.project import ProjectBLL from apiserver.bll.task import TaskBLL +from apiserver.bll.task.task_cleanup import ( + schedule_for_delete, + delete_task_events_and_collect_urls, +) from apiserver.bll.task.task_operations import publish_task from apiserver.bll.task.utils import get_task_with_write_access from apiserver.bll.util import run_batch_operation @@ -64,6 +69,7 @@ from apiserver.services.utils import ( log = config.logger(__file__) org_bll = OrgBLL() project_bll = ProjectBLL() +event_bll = EventBLL() def conform_model_data(call: APICall, model_data: Union[Sequence[dict], dict]): @@ -555,16 +561,59 @@ def publish_many(call: APICall, company_id, request: ModelsPublishManyRequest): ) +def _delete_model_events( + company_id: str, + user_id: str, + models: Sequence[Model], + delete_external_artifacts: bool, +): + model_ids = [m.id for m in models] + delete_external_artifacts = delete_external_artifacts and config.get( + "services.async_urls_delete.enabled", True + ) + if delete_external_artifacts: + for m in models: + if not m.uri: + continue + schedule_for_delete( + task_id=m.id, + company=company_id, + user=user_id, + urls=m.uri, + can_delete_folders=False, + ) + + event_urls = delete_task_events_and_collect_urls( + company=company_id, task_ids=model_ids, model=True + ) + if event_urls: + schedule_for_delete( + task_id=model_ids[0], + company=company_id, + user=user_id, + urls=event_urls, + can_delete_folders=False, + ) + + event_bll.delete_task_events(company_id, model_ids, model=True) + + @endpoint("models.delete", request_data_model=DeleteModelRequest) def delete(call: APICall, company_id, request: DeleteModelRequest): + user_id = call.identity.user del_count, model = ModelBLL.delete_model( model_id=request.model, company_id=company_id, - user_id=call.identity.user, + user_id=user_id, force=request.force, - delete_external_artifacts=request.delete_external_artifacts, ) if del_count: + _delete_model_events( + company_id=company_id, + user_id=user_id, + models=[model], + delete_external_artifacts=request.delete_external_artifacts, + ) _reset_cached_tags( company_id, projects=[model.project] if model.project else [] ) @@ -578,26 +627,36 @@ def delete(call: APICall, company_id, request: DeleteModelRequest): response_data_model=BatchResponse, ) def delete(call: APICall, company_id, request: ModelsDeleteManyRequest): + user_id = call.identity.user + results, failures = run_batch_operation( func=partial( ModelBLL.delete_model, company_id=company_id, user_id=call.identity.user, force=request.force, - delete_external_artifacts=request.delete_external_artifacts, ), ids=request.ids, ) - if results: - projects = set(model.project for _, (_, model) in results) + succeeded = [] + deleted_models = [] + for _id, (deleted, model) in results: + succeeded.append(dict(id=_id, deleted=bool(deleted), url=model.uri)) + deleted_models.append(model) + + if deleted_models: + _delete_model_events( + company_id=company_id, + user_id=user_id, + models=deleted_models, + delete_external_artifacts=request.delete_external_artifacts, + ) + projects = set(model.project for model in deleted_models) _reset_cached_tags(company_id, projects=list(projects)) call.result.data_model = BatchResponse( - succeeded=[ - dict(id=_id, deleted=bool(deleted), url=model.uri) - for _id, (deleted, model) in results - ], + succeeded=succeeded, failed=failures, ) diff --git a/apiserver/services/pipelines.py b/apiserver/services/pipelines.py index 6791ac8..a735db2 100644 --- a/apiserver/services/pipelines.py +++ b/apiserver/services/pipelines.py @@ -1,8 +1,6 @@ import re from functools import partial -import attr - from apiserver.apierrors.errors.bad_request import CannotRemoveAllRuns from apiserver.apimodels.pipelines import ( StartPipelineRequest, @@ -18,6 +16,7 @@ from apiserver.database.model.project import Project from apiserver.database.model.task.task import Task, TaskType from apiserver.service_repo import APICall, endpoint from apiserver.utilities.dicts import nested_get +from .tasks import _delete_task_events org_bll = OrgBLL() project_bll = ProjectBLL() @@ -62,21 +61,30 @@ def delete_runs(call: APICall, company_id: str, request: DeleteRunsRequest): identity=call.identity, move_to_trash=False, force=True, - return_file_urls=False, delete_output_models=True, status_message="", status_reason="Pipeline run deleted", - delete_external_artifacts=True, include_pipeline_steps=True, ), ids=list(ids), ) succeeded = [] + tasks = {} if results: for _id, (deleted, task, cleanup_res) in results: + if deleted: + tasks[_id] = cleanup_res succeeded.append( - dict(id=_id, deleted=bool(deleted), **attr.asdict(cleanup_res)) + dict(id=_id, deleted=bool(deleted), **cleanup_res.to_res_dict(False)) + ) + + if tasks: + _delete_task_events( + company_id=company_id, + user_id=call.identity.user, + tasks=tasks, + delete_external_artifacts=True ) call.result.data = dict(succeeded=succeeded, failed=failures) diff --git a/apiserver/services/projects.py b/apiserver/services/projects.py index 5d45519..2510151 100644 --- a/apiserver/services/projects.py +++ b/apiserver/services/projects.py @@ -370,6 +370,7 @@ def delete(call: APICall, company_id: str, request: DeleteRequest): delete_external_artifacts=request.delete_external_artifacts, ) _reset_cached_tags(company_id, projects=list(affected_projects)) + # noinspection PyTypeChecker call.result.data = {**attr.asdict(res)} diff --git a/apiserver/services/tasks.py b/apiserver/services/tasks.py index 7d02662..4e3f1c1 100644 --- a/apiserver/services/tasks.py +++ b/apiserver/services/tasks.py @@ -1,9 +1,9 @@ +import itertools from copy import deepcopy from datetime import datetime from functools import partial -from typing import Sequence, Union, Tuple +from typing import Sequence, Union, Tuple, Mapping -import attr from mongoengine import EmbeddedDocument, Q from mongoengine.queryset.transform import COMPARISON_OPERATORS from pymongo import UpdateOne @@ -80,6 +80,11 @@ from apiserver.bll.task import ( TaskBLL, ChangeStatusRequest, ) +from apiserver.bll.task.task_cleanup import ( + delete_task_events_and_collect_urls, + schedule_for_delete, + CleanupResult, +) from apiserver.bll.task.artifacts import ( artifacts_prepare_for_save, artifacts_unprepare_from_saved, @@ -109,6 +114,7 @@ from apiserver.bll.task.utils import ( get_task_with_write_access, ) from apiserver.bll.util import run_batch_operation, update_project_time +from apiserver.config_repo import config from apiserver.database.errors import translate_errors_context from apiserver.database.model import EntityVisibility from apiserver.database.model.project import Project @@ -295,9 +301,7 @@ def get_types(call: APICall, company_id, request: GetTypesRequest): } -@endpoint( - "tasks.stop", response_data_model=UpdateResponse -) +@endpoint("tasks.stop", response_data_model=UpdateResponse) def stop(call: APICall, company_id, request: StopRequest): """ stop @@ -1016,21 +1020,88 @@ def dequeue_many(call: APICall, company_id, request: DequeueManyRequest): ) +def _delete_task_events( + company_id: str, + user_id: str, + tasks: Mapping[str, CleanupResult], + delete_external_artifacts: bool, +): + task_ids = list(tasks) + deleted_model_ids = set( + itertools.chain.from_iterable( + cr.deleted_model_ids for cr in tasks.values() if cr.deleted_model_ids + ) + ) + + delete_external_artifacts = delete_external_artifacts and config.get( + "services.async_urls_delete.enabled", True + ) + if delete_external_artifacts: + for t_id, cleanup_res in tasks.items(): + urls = set(cleanup_res.urls.model_urls) | set( + cleanup_res.urls.artifact_urls + ) + if urls: + schedule_for_delete( + task_id=t_id, + company=company_id, + user=user_id, + urls=urls, + can_delete_folders=False, + ) + + event_urls = delete_task_events_and_collect_urls( + company=company_id, task_ids=task_ids + ) + if deleted_model_ids: + event_urls.update( + delete_task_events_and_collect_urls( + company=company_id, + task_ids=list(deleted_model_ids), + model=True, + ) + ) + + if event_urls: + schedule_for_delete( + task_id=task_ids[0], + company=company_id, + user=user_id, + urls=event_urls, + can_delete_folders=False, + ) + else: + event_bll.delete_task_events(company_id, task_ids) + if deleted_model_ids: + event_bll.delete_task_events( + company_id, list(deleted_model_ids), model=True + ) + + @endpoint( "tasks.reset", request_data_model=ResetRequest, response_data_model=ResetResponse ) def reset(call: APICall, company_id, request: ResetRequest): + task_id = request.task dequeued, cleanup_res, updates = reset_task( - task_id=request.task, + task_id=task_id, company_id=company_id, identity=call.identity, force=request.force, - return_file_urls=request.return_file_urls, delete_output_models=request.delete_output_models, clear_all=request.clear_all, + ) + _delete_task_events( + company_id=company_id, + user_id=call.identity.user, + tasks={task_id: cleanup_res}, delete_external_artifacts=request.delete_external_artifacts, ) - res = ResetResponse(**updates, **attr.asdict(cleanup_res), dequeued=dequeued) + res = ResetResponse( + **updates, + **cleanup_res.to_res_dict(request.return_file_urls), + dequeued=dequeued, + ) call.result.data_model = res @@ -1046,25 +1117,32 @@ def reset_many(call: APICall, company_id, request: ResetManyRequest): company_id=company_id, identity=call.identity, force=request.force, - return_file_urls=request.return_file_urls, delete_output_models=request.delete_output_models, clear_all=request.clear_all, - delete_external_artifacts=request.delete_external_artifacts, ), ids=request.ids, ) succeeded = [] + tasks = {} for _id, (dequeued, cleanup, res) in results: + tasks[_id] = cleanup succeeded.append( ResetBatchItem( id=_id, dequeued=bool(dequeued.get("removed")) if dequeued else False, - **attr.asdict(cleanup), + **cleanup.to_res_dict(request.return_file_urls), **res, ) ) + _delete_task_events( + company_id=company_id, + user_id=call.identity.user, + tasks=tasks, + delete_external_artifacts=request.delete_external_artifacts, + ) + call.result.data_model = ResetManyResponse( succeeded=succeeded, failed=failures, @@ -1160,16 +1238,22 @@ def delete(call: APICall, company_id, request: DeleteRequest): identity=call.identity, move_to_trash=request.move_to_trash, force=request.force, - return_file_urls=request.return_file_urls, delete_output_models=request.delete_output_models, status_message=request.status_message, status_reason=request.status_reason, - delete_external_artifacts=request.delete_external_artifacts, include_pipeline_steps=request.include_pipeline_steps, ) if deleted: + _delete_task_events( + company_id=company_id, + user_id=call.identity.user, + tasks={request.task: cleanup_res}, + delete_external_artifacts=request.delete_external_artifacts, + ) _reset_cached_tags(company_id, projects=[task.project] if task.project else []) - call.result.data = dict(deleted=bool(deleted), **attr.asdict(cleanup_res)) + call.result.data = dict( + deleted=bool(deleted), **cleanup_res.to_res_dict(request.return_file_urls) + ) @endpoint("tasks.delete_many", request_data_model=DeleteManyRequest) @@ -1181,25 +1265,41 @@ def delete_many(call: APICall, company_id, request: DeleteManyRequest): identity=call.identity, move_to_trash=request.move_to_trash, force=request.force, - return_file_urls=request.return_file_urls, delete_output_models=request.delete_output_models, status_message=request.status_message, status_reason=request.status_reason, - delete_external_artifacts=request.delete_external_artifacts, include_pipeline_steps=request.include_pipeline_steps, ), ids=request.ids, ) + succeeded = [] + tasks = {} if results: - projects = set(task.project for _, (_, task, _) in results) + projects = set() + for _id, (deleted, task, cleanup_res) in results: + if deleted: + projects.add(task.project) + tasks[_id] = cleanup_res + succeeded.append( + dict( + id=_id, + deleted=bool(deleted), + **cleanup_res.to_res_dict(request.return_file_urls), + ) + ) + + if tasks: + _delete_task_events( + company_id=company_id, + user_id=call.identity.user, + tasks=tasks, + delete_external_artifacts=request.delete_external_artifacts, + ) _reset_cached_tags(company_id, projects=list(projects)) call.result.data = dict( - succeeded=[ - dict(id=_id, deleted=bool(deleted), **attr.asdict(cleanup_res)) - for _id, (deleted, _, cleanup_res) in results - ], + succeeded=succeeded, failed=failures, ) diff --git a/apiserver/tests/automated/test_tasks_delete.py b/apiserver/tests/automated/test_tasks_delete.py index 18533bc..a657402 100644 --- a/apiserver/tests/automated/test_tasks_delete.py +++ b/apiserver/tests/automated/test_tasks_delete.py @@ -59,7 +59,7 @@ class TestTasksResetDelete(TestService): event_urls.update(self.send_model_events(model)) res = self.assert_delete_task(task, force=True, return_file_urls=True) self.assertEqual(set(res.urls.model_urls), draft_model_urls) - self.assertEqual(set(res.urls.event_urls), event_urls) + self.assertFalse(set(res.urls.event_urls)) # event urls are not returned anymore self.assertEqual(set(res.urls.artifact_urls), artifact_urls) def test_reset(self): @@ -84,7 +84,7 @@ class TestTasksResetDelete(TestService): ) = self.create_task_with_data() res = self.api.tasks.reset(task=task, force=True, return_file_urls=True) self.assertEqual(set(res.urls.model_urls), draft_model_urls) - self.assertEqual(set(res.urls.event_urls), event_urls) + self.assertFalse(res.urls.event_urls) # event urls are not returned anymore self.assertEqual(set(res.urls.artifact_urls), artifact_urls) def test_model_delete(self): @@ -124,7 +124,7 @@ class TestTasksResetDelete(TestService): self.assertEqual(res.disassociated_tasks, 0) self.assertEqual(res.deleted_tasks, 1) self.assertEqual(res.deleted_models, 2) - self.assertEqual(set(res.urls.event_urls), event_urls) + self.assertFalse(set(res.urls.event_urls)) # event urls are not returned anymore self.assertEqual(set(res.urls.artifact_urls), artifact_urls) with self.api.raises(errors.bad_request.InvalidTaskId): self.api.tasks.get_by_id(task=task)