Add events.clear_task_log

This commit is contained in:
allegroai 2022-05-17 16:09:23 +03:00
parent 67cd461145
commit 4941ac70e0
5 changed files with 121 additions and 11 deletions

View File

@ -26,6 +26,9 @@
23: ["invalid_domain_name", "malformed domain name"]
24: ["not_public_object", "object is not public"]
# Auth / Login
75: ["invalid_access_key", "access key not found for user"]
# Tasks
100: ["task_error", "general task error"]
101: ["invalid_task_id", "invalid task id"]

View File

@ -141,3 +141,9 @@ class TaskPlotsRequest(Base):
class ClearScrollRequest(Base):
    """Request model carrying the id of a scroll to clear."""

    # Optional string field (declared without required=True)
    scroll_id: str = StringField()
class ClearTaskLogRequest(Base):
    """Request model for the events.clear_task_log endpoint."""

    # ID of the task whose log events are cleared; the only required field
    task: str = StringField(required=True)
    # Age threshold in seconds: log events older than this are removed.
    # When omitted or 0, all log events of the task are removed.
    threshold_sec: int = IntField()
    # When False (the default), tasks whose status is in
    # LOCKED_TASK_STATUSES are rejected by the task state validation.
    allow_locked: bool = BoolField(default=False)

View File

@ -965,18 +965,23 @@ class EventBLL(object):
for tb in es_res["aggregations"]["tasks"]["buckets"]
}
@staticmethod
def _validate_task_state(company_id: str, task_id: str, allow_locked: bool = False):
    """
    Verify that the task exists in the company and is in an acceptable state.

    :param company_id: company that must own the task
    :param task_id: ID of the task to look up
    :param allow_locked: when False, tasks whose status is listed in
        LOCKED_TASK_STATUSES are treated as not found
    :raise errors.bad_request.InvalidTaskId: if no matching task is found
    """
    task_filter = Q(id=task_id, company=company_id)
    hint = None
    if not allow_locked:
        # Locked tasks are excluded from the lookup, so the error text
        # mentions that the task may simply be published
        task_filter &= Q(status__nin=LOCKED_TASK_STATUSES)
        hint = "or task published"

    if Task.objects(task_filter).only("id").first():
        return

    raise errors.bad_request.InvalidTaskId(hint, company=company_id, id=task_id)
def delete_task_events(self, company_id, task_id, allow_locked=False):
with translate_errors_context():
extra_msg = None
query = Q(id=task_id, company=company_id)
if not allow_locked:
query &= Q(status__nin=LOCKED_TASK_STATUSES)
extra_msg = "or task published"
res = Task.objects(query).only("id").first()
if not res:
raise errors.bad_request.InvalidTaskId(
extra_msg, company=company_id, id=task_id
)
self._validate_task_state(
company_id=company_id, task_id=task_id, allow_locked=allow_locked
)
es_req = {"query": {"term": {"task": task_id}}}
with translate_errors_context(), TimingContext("es", "delete_task_events"):
@ -990,6 +995,52 @@ class EventBLL(object):
return es_res.get("deleted", 0)
def clear_task_log(
    self,
    company_id: str,
    task_id: str,
    allow_locked: bool = False,
    threshold_sec: int = None,
):
    """
    Delete log events of a task, optionally keeping the most recent ones.

    :param company_id: company that owns the task
    :param task_id: ID of the task whose log events are deleted
    :param allow_locked: passed to the task state validation; when False
        locked tasks are rejected
    :param threshold_sec: when truthy, only events whose timestamp is lower
        than es_factory.get_timestamp_millis() minus this many seconds are
        deleted. When None or 0, all log events of the task are deleted
    :return: the "deleted" count from the delete response (0 if missing),
        or 0 when check_empty_data reports no task_log data for the company
    """
    self._validate_task_state(
        company_id=company_id, task_id=task_id, allow_locked=allow_locked
    )

    # Nothing to delete if there is no log data for this company
    no_log_data = check_empty_data(
        self.es, company_id=company_id, event_type=EventType.task_log
    )
    if no_log_data:
        return 0

    with translate_errors_context(), TimingContext("es", "clear_task_log"):
        conditions = [{"term": {"task": task_id}}]
        ordering = None
        if threshold_sec:
            age_ms = int(threshold_sec * 1000)
            cutoff_ms = es_factory.get_timestamp_millis() - age_ms
            conditions.append({"range": {"timestamp": {"lt": cutoff_ms}}})
            # The sort clause is sent only together with the age filter
            ordering = {"timestamp": {"order": "desc"}}

        es_req = {"query": {"bool": {"must": conditions}}}
        if ordering:
            es_req["sort"] = ordering

        es_res = delete_company_events(
            es=self.es,
            company_id=company_id,
            event_type=EventType.task_log,
            body=es_req,
            routing=task_id,
            refresh=True,
        )
        return es_res.get("deleted", 0)
def delete_multi_task_events(self, company_id: str, task_ids: Sequence[str]):
"""
Delete multiple task events. No check is done for tasks write access

View File

@ -1324,4 +1324,38 @@ clear_scroll {
additionalProperties: false
}
}
}
clear_task_log {
"2.19" {
description: Remove old logs from task
request {
type: object
required: [task]
properties {
task {
description: Task ID
type: string
}
allow_locked {
type: boolean
description: Allow deleting events even if the task is locked
default: false
}
threshold_sec {
description: Age threshold in seconds. Log records older than this number of seconds will be deleted. If not passed or 0 then all the log records for the task will be deleted
type: integer
}
}
}
response {
type: object
properties {
deleted {
description: The number of deleted log records
type: integer
}
}
}
}
}

View File

@ -26,6 +26,7 @@ from apiserver.apimodels.events import (
TaskEventsRequest,
ScalarMetricsIterRawRequest,
ClearScrollRequest,
ClearTaskLogRequest,
)
from apiserver.bll.event import EventBLL
from apiserver.bll.event.event_common import EventType, MetricVariants
@ -797,6 +798,21 @@ def delete_for_task(call, company_id, req_model):
)
@endpoint("events.clear_task_log")
def clear_task_log(call: APICall, company_id: str, request: ClearTaskLogRequest):
    """
    Handle events.clear_task_log: delete log events of the requested task.

    Asserts that the task exists for the company, delegates the deletion to
    event_bll.clear_task_log and stores the number of deleted log events in
    the call result under the "deleted" key.
    """
    task_id = request.task
    task_bll.assert_exists(company_id, task_id, return_tasks=False)

    deleted_count = event_bll.clear_task_log(
        company_id=company_id,
        task_id=task_id,
        allow_locked=request.allow_locked,
        threshold_sec=request.threshold_sec,
    )
    call.result.data = dict(deleted=deleted_count)
def _get_top_iter_unique_events_per_task(events, max_iters, tasks):
key = itemgetter("metric", "variant", "task", "iter")