From 4941ac70e0c72c213175c7233c154715c84e27fe Mon Sep 17 00:00:00 2001 From: allegroai <> Date: Tue, 17 May 2022 16:09:23 +0300 Subject: [PATCH] Add events.clear_task_log --- apiserver/apierrors/errors.conf | 3 ++ apiserver/apimodels/events.py | 6 +++ apiserver/bll/event/event_bll.py | 73 +++++++++++++++++++++++---- apiserver/schema/services/events.conf | 34 +++++++++++++ apiserver/services/events.py | 16 ++++++ 5 files changed, 121 insertions(+), 11 deletions(-) diff --git a/apiserver/apierrors/errors.conf b/apiserver/apierrors/errors.conf index b9c300d..b456f49 100644 --- a/apiserver/apierrors/errors.conf +++ b/apiserver/apierrors/errors.conf @@ -26,6 +26,9 @@ 23: ["invalid_domain_name", "malformed domain name"] 24: ["not_public_object", "object is not public"] + # Auth / Login + 75: ["invalid_access_key", "access key not found for user"] + # Tasks 100: ["task_error", "general task error"] 101: ["invalid_task_id", "invalid task id"] diff --git a/apiserver/apimodels/events.py b/apiserver/apimodels/events.py index 200059e..c0e02b6 100644 --- a/apiserver/apimodels/events.py +++ b/apiserver/apimodels/events.py @@ -141,3 +141,9 @@ class TaskPlotsRequest(Base): class ClearScrollRequest(Base): scroll_id: str = StringField() + + +class ClearTaskLogRequest(Base): + task: str = StringField(required=True) + threshold_sec = IntField() + allow_locked = BoolField(default=False) diff --git a/apiserver/bll/event/event_bll.py b/apiserver/bll/event/event_bll.py index ce808a5..3491c24 100644 --- a/apiserver/bll/event/event_bll.py +++ b/apiserver/bll/event/event_bll.py @@ -965,18 +965,23 @@ class EventBLL(object): for tb in es_res["aggregations"]["tasks"]["buckets"] } + @staticmethod + def _validate_task_state(company_id: str, task_id: str, allow_locked: bool = False): + extra_msg = None + query = Q(id=task_id, company=company_id) + if not allow_locked: + query &= Q(status__nin=LOCKED_TASK_STATUSES) + extra_msg = "or task published" + res = Task.objects(query).only("id").first() + 
if not res: + raise errors.bad_request.InvalidTaskId( + extra_msg, company=company_id, id=task_id + ) + def delete_task_events(self, company_id, task_id, allow_locked=False): - with translate_errors_context(): - extra_msg = None - query = Q(id=task_id, company=company_id) - if not allow_locked: - query &= Q(status__nin=LOCKED_TASK_STATUSES) - extra_msg = "or task published" - res = Task.objects(query).only("id").first() - if not res: - raise errors.bad_request.InvalidTaskId( - extra_msg, company=company_id, id=task_id - ) + self._validate_task_state( + company_id=company_id, task_id=task_id, allow_locked=allow_locked + ) es_req = {"query": {"term": {"task": task_id}}} with translate_errors_context(), TimingContext("es", "delete_task_events"): @@ -990,6 +995,52 @@ class EventBLL(object): return es_res.get("deleted", 0) + def clear_task_log( + self, + company_id: str, + task_id: str, + allow_locked: bool = False, + threshold_sec: int = None, + ): + self._validate_task_state( + company_id=company_id, task_id=task_id, allow_locked=allow_locked + ) + if check_empty_data( + self.es, company_id=company_id, event_type=EventType.task_log + ): + return 0 + + with translate_errors_context(), TimingContext("es", "clear_task_log"): + must = [{"term": {"task": task_id}}] + sort = None + if threshold_sec: + timestamp_ms = int(threshold_sec * 1000) + must.append( + { + "range": { + "timestamp": { + "lt": ( + es_factory.get_timestamp_millis() - timestamp_ms + ) + } + } + } + ) + sort = {"timestamp": {"order": "desc"}} + es_req = { + "query": {"bool": {"must": must}}, + **({"sort": sort} if sort else {}), + } + es_res = delete_company_events( + es=self.es, + company_id=company_id, + event_type=EventType.task_log, + body=es_req, + routing=task_id, + refresh=True, + ) + return es_res.get("deleted", 0) + def delete_multi_task_events(self, company_id: str, task_ids: Sequence[str]): """ Delete mutliple task events. 
No check is done for tasks write access diff --git a/apiserver/schema/services/events.conf b/apiserver/schema/services/events.conf index b681c01..ee7c5a7 100644 --- a/apiserver/schema/services/events.conf +++ b/apiserver/schema/services/events.conf @@ -1324,4 +1324,38 @@ clear_scroll { additionalProperties: false } } +} + +clear_task_log { + "2.19" { + description: Remove old logs from task + request { + type: object + required: [task] + properties { + task { + description: Task ID + type: string + } + allow_locked { + type: boolean + description: Allow deleting events even if the task is locked + default: false + } + threshold_sec { + description: The age threshold in seconds. Log records from the last threshold_sec seconds are retained and older log records are deleted. If not passed or 0 then all the log records for the task will be deleted + type: integer + } + } + } + response { + type: object + properties { + deleted { + description: The number of deleted log records + type: integer + } + } + } + } } \ No newline at end of file diff --git a/apiserver/services/events.py b/apiserver/services/events.py index e2bed39..7d88d02 100644 --- a/apiserver/services/events.py +++ b/apiserver/services/events.py @@ -26,6 +26,7 @@ from apiserver.apimodels.events import ( TaskEventsRequest, ScalarMetricsIterRawRequest, ClearScrollRequest, + ClearTaskLogRequest, ) from apiserver.bll.event import EventBLL from apiserver.bll.event.event_common import EventType, MetricVariants @@ -797,6 +798,21 @@ def delete_for_task(call, company_id, req_model): ) + +@endpoint("events.clear_task_log") +def clear_task_log(call: APICall, company_id: str, request: ClearTaskLogRequest): + task_id = request.task + + task_bll.assert_exists(company_id, task_id, return_tasks=False) + call.result.data = dict( + deleted=event_bll.clear_task_log( + company_id=company_id, + task_id=task_id, + allow_locked=request.allow_locked, + threshold_sec=request.threshold_sec, + ) + ) + + def _get_top_iter_unique_events_per_task(events, max_iters,
tasks): key = itemgetter("metric", "variant", "task", "iter")