From 52c0c4d4383e287f03516ae01e93199b61a48670 Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Fri, 8 Jul 2022 17:43:54 +0300 Subject: [PATCH] Add model task cleanup on tasks.reset --- apiserver/bll/task/task_cleanup.py | 156 +++++++++++------------------ 1 file changed, 59 insertions(+), 97 deletions(-) diff --git a/apiserver/bll/task/task_cleanup.py b/apiserver/bll/task/task_cleanup.py index df88855..c8aab83 100644 --- a/apiserver/bll/task/task_cleanup.py +++ b/apiserver/bll/task/task_cleanup.py @@ -1,10 +1,9 @@ from itertools import chain from operator import attrgetter -from typing import Sequence, Generic, Callable, Type, Iterable, TypeVar, List, Set +from typing import Sequence, Set, Tuple import attr from boltons.iterutils import partition -from mongoengine import QuerySet, Document from apiserver.apierrors import errors from apiserver.bll.event import EventBLL @@ -16,49 +15,6 @@ from apiserver.database.model.task.task import Task, TaskStatus, ArtifactModes from apiserver.timing_context import TimingContext event_bll = EventBLL() -T = TypeVar("T", bound=Document) - - -class DocumentGroup(List[T]): - """ - Operate on a list of documents as if they were a query result - """ - - def __init__(self, document_type: Type[T], documents: Iterable[T]): - super(DocumentGroup, self).__init__(documents) - self.type = document_type - - @property - def ids(self) -> Set[str]: - return {obj.id for obj in self} - - def objects(self, *args, **kwargs) -> QuerySet: - return self.type.objects(id__in=self.ids, *args, **kwargs) - - -class TaskOutputs(Generic[T]): - """ - Split task outputs of the same type by the ready state - """ - - published: DocumentGroup[T] - draft: DocumentGroup[T] - - def __init__( - self, - is_published: Callable[[T], bool], - document_type: Type[T], - children: Iterable[T], - ): - """ - :param is_published: predicate returning whether items is considered published - :param document_type: type of output - :param children: output documents - """ - self.published, self.draft = map( - lambda x: DocumentGroup(document_type, x), - partition(children, key=is_published), - ) @attr.s(auto_attribs=True) @@ -166,7 +122,9 @@ def cleanup_task( :param force: whether to delete task with published outputs :return: count of delete and modified items """ - models = verify_task_children_and_ouptuts(task, force) + published_models, draft_models, in_use_model_ids = verify_task_children_and_ouptuts( + task, force + ) event_urls, artifact_urls, model_urls = set(), set(), set() if return_file_urls: @@ -178,7 +136,9 @@ def cleanup_task( for a in task.execution.artifacts.values() if a.mode == ArtifactModes.output and a.uri } - model_urls = {m.uri for m in models.draft.objects().only("uri") if m.uri} + model_urls = { + m.uri for m in draft_models if m.uri and m.id not in in_use_model_ids + } deleted_task_id = f"{deleted_prefix}{task.id}" updated_children = 0 @@ -190,16 +150,23 @@ def cleanup_task( deleted_models = 0 updated_models = 0 - if models.draft: - if delete_output_models: - deleted_models = models.draft.objects().delete() - elif update_children: - updated_models = models.draft.objects().update(task=deleted_task_id) - else: - models.draft.objects().update(unset__task=1) + for models, allow_delete in ((draft_models, True), (published_models, False)): + if not models: + continue + if delete_output_models and allow_delete: + deleted_models += Model.objects( + id__in=[m.id for m in models if m.id not in in_use_model_ids] + ).delete() + if in_use_model_ids: + Model.objects(id__in=list(in_use_model_ids)).update(unset__task=1) + continue - if models.published and update_children: - updated_models += models.published.objects().update(task=deleted_task_id) + if update_children: + updated_models += Model.objects(id__in=[m.id for m in models]).update( + task=deleted_task_id + ) + else: + Model.objects(id__in=[m.id for m in models]).update(unset__task=1) event_bll.delete_task_events(task.company, task.id, allow_locked=force) @@ -217,7 +184,9 @@ def cleanup_task( ) -def verify_task_children_and_ouptuts(task: Task, force: bool) -> TaskOutputs[Model]: +def verify_task_children_and_ouptuts( + task, force: bool +) -> Tuple[Sequence[Model], Sequence[Model], Set[str]]: if not force: with TimingContext("mongo", "count_published_children"): published_children_count = Task.objects( @@ -230,49 +199,42 @@ def verify_task_children_and_ouptuts(task: Task, force: bool) -> TaskOutputs[Mod children=published_children_count, ) - with TimingContext("mongo", "get_task_models"): - models = TaskOutputs( - attrgetter("ready"), - Model, - Model.objects(task=task.id).only("id", "task", "ready"), + model_fields = ["id", "ready", "uri"] + published_models, draft_models = partition( + Model.objects(task=task.id).only(*model_fields), key=attrgetter("ready"), + ) + if not force and published_models: + raise errors.bad_request.TaskCannotBeDeleted( + "has output models, use force=True", + task=task.id, + models=len(published_models), ) - if not force and models.published: - raise errors.bad_request.TaskCannotBeDeleted( - "has output models, use force=True", - task=task.id, - models=len(models.published), - ) if task.models and task.models.output: - with TimingContext("mongo", "get_task_output_model"): - model_ids = [m.model for m in task.models.output] - for output_model in Model.objects(id__in=model_ids): - if output_model.ready: - if not force: - raise errors.bad_request.TaskCannotBeDeleted( - "has output model, use force=True", - task=task.id, - model=output_model.id, - ) - models.published.append(output_model) - else: - models.draft.append(output_model) + model_ids = [m.model for m in task.models.output] + for output_model in Model.objects(id__in=model_ids).only(*model_fields): + if output_model.ready: + if not force: + raise errors.bad_request.TaskCannotBeDeleted( + "has output model, use force=True", + task=task.id, + model=output_model.id, + ) + published_models.append(output_model) + else: + draft_models.append(output_model) - if models.draft: - with TimingContext("mongo", "get_execution_models"): - model_ids = models.draft.ids - dependent_tasks = Task.objects(models__input__model__in=model_ids).only( - "id", "models" + in_use_model_ids = {} + if draft_models: + model_ids = {m.id for m in draft_models} + dependent_tasks = Task.objects(models__input__model__in=list(model_ids)).only( + "id", "models" + ) + in_use_model_ids = model_ids & { + m.model + for m in chain.from_iterable( + t.models.input for t in dependent_tasks if t.models ) - input_models = { - m.model - for m in chain.from_iterable( - t.models.input for t in dependent_tasks if t.models - ) - } - if input_models: - models.draft = DocumentGroup( - Model, (m for m in models.draft if m.id not in input_models) - ) + } - return models + return published_models, draft_models, in_use_model_ids