mirror of
https://github.com/clearml/clearml-server
synced 2025-02-12 15:50:46 +00:00
baba8b5b73
Add initial support for project ordering; add support for sortable task duration (used by the UI in the experiment's table); add support for project name in worker's current task info; add support for results and artifacts in pre-populated examples; add demo server features
114 lines
3.9 KiB
Python
114 lines
3.9 KiB
Python
from typing import Optional, Tuple, Sequence
|
|
|
|
import attr
|
|
from elasticsearch import Elasticsearch
|
|
|
|
from bll.event.event_metrics import EventMetrics
|
|
from database.errors import translate_errors_context
|
|
from timing_context import TimingContext
|
|
|
|
|
|
@attr.s(auto_attribs=True)
class TaskEventsResult:
    """Container for one page of task events returned to the caller."""

    # Total number of matching events as reported by the search backend
    total_events: int = 0
    # Continuation identifier; stays None unless the producer sets it
    next_scroll_id: Optional[str] = None
    # Event documents of this page; a fresh list is created per instance
    events: list = attr.Factory(list)
|
|
|
|
|
|
class LogEventsIterator:
    """Page through the "log" events of a task stored in Elasticsearch.

    Pages are keyed on the event ``timestamp`` field: the caller passes the
    timestamp it stopped at and receives the next batch in the requested
    direction.
    """

    EVENT_TYPE = "log"

    # Upper bound on the number of events fetched for the boundary timestamp
    # NOTE(review): presumably matches Elasticsearch's default result window
    # limit - confirm before changing
    MAX_SAME_TIMESTAMP_EVENTS = 10000

    def __init__(self, es: Elasticsearch):
        self.es = es

    def get_task_events(
        self,
        company_id: str,
        task_id: str,
        batch_size: int,
        navigate_earlier: bool = True,
        from_timestamp: Optional[int] = None,
    ) -> TaskEventsResult:
        """
        Return one page of log events for the task.

        :param company_id: company used to resolve the events index name
        :param task_id: task whose events are retrieved
        :param batch_size: requested page size (may be exceeded, see _get_events)
        :param navigate_earlier: True to page towards earlier events,
            False to page towards later ones
        :param from_timestamp: timestamp to continue after; when not set the
            paging starts from the latest (or the earliest) event
        :return: TaskEventsResult with the events and the total hit count;
            an empty result if the events index does not exist
        """
        es_index = EventMetrics.get_index_name(company_id, self.EVENT_TYPE)
        if not self.es.indices.exists(es_index):
            return TaskEventsResult()

        events, total_events = self._get_events(
            es_index=es_index,
            task_id=task_id,
            batch_size=batch_size,
            navigate_earlier=navigate_earlier,
            from_timestamp=from_timestamp,
        )
        return TaskEventsResult(total_events=total_events, events=events)

    def _get_events(
        self,
        es_index,
        task_id: str,
        batch_size: int,
        navigate_earlier: bool,
        from_timestamp: Optional[int],
    ) -> Tuple[Sequence[dict], int]:
        """
        Return up to 'batch_size' events starting after 'from_timestamp', either in the
        direction of earlier events (navigate_earlier=True) or in the direction of later events.
        If 'from_timestamp' is not set (None or 0) then start either from the latest
        or from the earliest event.
        For the last timestamp of the batch all the events are brought (even if the
        resulting size exceeds batch_size) so that the events sharing this timestamp
        are not lost between the calls.

        :return: tuple of (events, total number of hits reported for the task query)
        """
        # retrieve the next batch of events
        es_req = {
            "size": batch_size,
            "query": {"term": {"task": task_id}},
            "sort": {"timestamp": "desc" if navigate_earlier else "asc"},
        }

        if from_timestamp:
            es_req["search_after"] = [from_timestamp]

        with translate_errors_context(), TimingContext("es", "get_task_events"):
            es_result = self.es.search(index=es_index, body=es_req)
            hits = es_result["hits"]["hits"]
            hits_total = es_result["hits"]["total"]["value"]
            if not hits:
                return [], hits_total

            events = [hit["_source"] for hit in hits]
            # the document id is hit-level metadata and is not part of '_source',
            # so it has to be collected from the hits and not from the events
            already_present_ids = {hit["_id"] for hit in hits}

            # retrieve the events that match the last event timestamp
            # but did not make it into the previous call due to batch_size limitation
            es_req = {
                "size": self.MAX_SAME_TIMESTAMP_EVENTS,
                "query": {
                    "bool": {
                        "must": [
                            {"term": {"task": task_id}},
                            {"term": {"timestamp": events[-1]["timestamp"]}},
                        ]
                    }
                },
            }
            es_result = self.es.search(index=es_index, body=es_req)
            last_hits = es_result["hits"]["hits"]
            if len(last_hits) < 2:
                # if at most one element is returned for the last timestamp
                # then it is already present in the events
                return events, hits_total

            # return the list merged from original query results +
            # leftovers from the last timestamp
            return (
                [
                    *events,
                    *(
                        hit["_source"]
                        for hit in last_hits
                        if hit["_id"] not in already_present_ids
                    ),
                ],
                hits_total,
            )
|