clearml-server/apiserver/services/queues.py

329 lines
10 KiB
Python
Raw Normal View History

2023-05-25 16:17:40 +00:00
from typing import Union, Sequence
2022-09-29 16:20:24 +00:00
from mongoengine import Q
2021-01-05 14:28:49 +00:00
from apiserver.apimodels.base import UpdateResponse
from apiserver.apimodels.queues import (
GetDefaultResp,
CreateRequest,
DeleteRequest,
UpdateRequest,
MoveTaskRequest,
MoveTaskResponse,
TaskRequest,
QueueRequest,
GetMetricsRequest,
GetMetricsResponse,
QueueMetrics,
AddOrUpdateMetadataRequest,
DeleteMetadataRequest,
GetNextTaskRequest,
GetByIdRequest,
2022-09-29 16:20:24 +00:00
GetAllRequest,
)
from apiserver.bll.model import Metadata
2021-01-05 14:28:49 +00:00
from apiserver.bll.queue import QueueBLL
from apiserver.bll.queue.queue_bll import MOVE_FIRST, MOVE_LAST
2021-01-05 14:28:49 +00:00
from apiserver.bll.workers import WorkerBLL
2022-09-29 16:20:24 +00:00
from apiserver.config_repo import config
from apiserver.database.model.task.task import Task
2021-01-05 14:28:49 +00:00
from apiserver.service_repo import APICall, endpoint
from apiserver.services.utils import (
conform_tag_fields,
conform_output_tags,
conform_tags,
escape_metadata,
unescape_metadata,
)
2021-05-03 14:26:44 +00:00
from apiserver.utilities import extract_properties_to_lists
worker_bll = WorkerBLL()
queue_bll = QueueBLL(worker_bll)
2023-05-25 16:17:40 +00:00
def conform_queue_data(call: APICall, queue_data: Union[Sequence[dict], dict]):
conform_output_tags(call, queue_data)
unescape_metadata(call, queue_data)
@endpoint("queues.get_by_id", min_version="2.4", request_data_model=GetByIdRequest)
def get_by_id(call: APICall, company_id, request: GetByIdRequest):
queue = queue_bll.get_by_id(
company_id, request.queue, max_task_entries=request.max_task_entries
)
queue_dict = queue.to_proper_dict()
2023-05-25 16:17:40 +00:00
conform_queue_data(call, queue_dict)
call.result.data = {"queue": queue_dict}
@endpoint("queues.get_default", min_version="2.4", response_data_model=GetDefaultResp)
def get_by_id(call: APICall):
queue = queue_bll.get_default(call.identity.company)
call.result.data_model = GetDefaultResp(id=queue.id, name=queue.name)
2022-09-29 16:20:24 +00:00
def _hidden_query(data: dict) -> Q:
"""
1. Add only non-hidden queues search condition (unless specifically specified differently)
"""
hidden_tags = config.get("services.queues.hidden_tags", [])
if (
not hidden_tags
or data.get("search_hidden")
or data.get("id")
or data.get("name")
):
return Q()
return Q(system_tags__nin=hidden_tags)
@endpoint("queues.get_all_ex", min_version="2.4")
2022-09-29 16:20:24 +00:00
def get_all_ex(call: APICall, company: str, request: GetAllRequest):
conform_tag_fields(call, call.data)
2022-02-13 17:23:29 +00:00
ret_params = {}
call_data = Metadata.escape_query_parameters(call.data)
queues = queue_bll.get_queue_infos(
2022-09-29 16:20:24 +00:00
company_id=company,
query_dict=call_data,
query=_hidden_query(call_data),
2022-09-29 16:20:24 +00:00
max_task_entries=request.max_task_entries,
ret_params=ret_params,
)
2023-05-25 16:17:40 +00:00
conform_queue_data(call, queues)
2022-02-13 17:23:29 +00:00
call.result.data = {"queues": queues, **ret_params}
@endpoint("queues.get_all", min_version="2.4")
2022-09-29 16:20:24 +00:00
def get_all(call: APICall, company: str, request: GetAllRequest):
conform_tag_fields(call, call.data)
2022-02-13 17:23:29 +00:00
ret_params = {}
call_data = Metadata.escape_query_parameters(call.data)
2022-02-13 17:23:29 +00:00
queues = queue_bll.get_all(
2022-09-29 16:20:24 +00:00
company_id=company,
query_dict=call_data,
query=_hidden_query(call_data),
2022-09-29 16:20:24 +00:00
max_task_entries=request.max_task_entries,
ret_params=ret_params,
2022-02-13 17:23:29 +00:00
)
2023-05-25 16:17:40 +00:00
conform_queue_data(call, queues)
2022-02-13 17:23:29 +00:00
call.result.data = {"queues": queues, **ret_params}
@endpoint("queues.create", min_version="2.4", request_data_model=CreateRequest)
def create(call: APICall, company_id, request: CreateRequest):
tags, system_tags = conform_tags(
call, request.tags, request.system_tags, validate=True
)
queue = queue_bll.create(
company_id=company_id,
name=request.name,
tags=tags,
system_tags=system_tags,
metadata=Metadata.metadata_from_api(request.metadata),
)
call.result.data = {"id": queue.id}
@endpoint(
"queues.update",
min_version="2.4",
request_data_model=UpdateRequest,
response_data_model=UpdateResponse,
)
def update(call: APICall, company_id, req_model: UpdateRequest):
data = call.data_model_for_partial_update
conform_tag_fields(call, data, validate=True)
escape_metadata(data)
updated, fields = queue_bll.update(
company_id=company_id, queue_id=req_model.queue, **data
)
2023-05-25 16:17:40 +00:00
conform_queue_data(call, fields)
call.result.data_model = UpdateResponse(updated=updated, fields=fields)
@endpoint("queues.delete", min_version="2.4", request_data_model=DeleteRequest)
def delete(call: APICall, company_id, req_model: DeleteRequest):
queue_bll.delete(
company_id=company_id,
user_id=call.identity.user,
queue_id=req_model.queue,
force=req_model.force,
)
call.result.data = {"deleted": 1}
@endpoint("queues.add_task", min_version="2.4", request_data_model=TaskRequest)
def add_task(call: APICall, company_id, req_model: TaskRequest):
call.result.data = {
"added": queue_bll.add_task(
company_id=company_id, queue_id=req_model.queue, task_id=req_model.task
)
}
@endpoint("queues.get_next_task", request_data_model=GetNextTaskRequest)
2022-09-29 16:29:36 +00:00
def get_next_task(call: APICall, company_id, request: GetNextTaskRequest):
entry = queue_bll.get_next_task(
company_id=company_id, queue_id=request.queue, task_id=request.task
)
if entry:
data = {"entry": entry.to_proper_dict()}
2022-09-29 16:29:36 +00:00
if request.get_task_info:
task = Task.objects(id=entry.task).only("company", "user").first()
if task:
data["task_info"] = {"company": task.company, "user": task.user}
call.result.data = data
@endpoint("queues.remove_task", min_version="2.4", request_data_model=TaskRequest)
def remove_task(call: APICall, company_id, req_model: TaskRequest):
call.result.data = {
"removed": queue_bll.remove_task(
company_id=company_id, queue_id=req_model.queue, task_id=req_model.task
)
}
@endpoint(
"queues.move_task_forward",
min_version="2.4",
request_data_model=MoveTaskRequest,
response_data_model=MoveTaskResponse,
)
def move_task_forward(call: APICall, company_id, req_model: MoveTaskRequest):
call.result.data_model = MoveTaskResponse(
position=queue_bll.reposition_task(
company_id=company_id,
queue_id=req_model.queue,
task_id=req_model.task,
move_count=-req_model.count,
)
)
@endpoint(
"queues.move_task_backward",
min_version="2.4",
request_data_model=MoveTaskRequest,
response_data_model=MoveTaskResponse,
)
def move_task_backward(call: APICall, company_id, req_model: MoveTaskRequest):
call.result.data_model = MoveTaskResponse(
position=queue_bll.reposition_task(
company_id=company_id,
queue_id=req_model.queue,
task_id=req_model.task,
move_count=req_model.count,
)
)
@endpoint(
"queues.move_task_to_front",
min_version="2.4",
request_data_model=TaskRequest,
response_data_model=MoveTaskResponse,
)
def move_task_to_front(call: APICall, company_id, req_model: TaskRequest):
call.result.data_model = MoveTaskResponse(
position=queue_bll.reposition_task(
company_id=company_id,
queue_id=req_model.queue,
task_id=req_model.task,
move_count=MOVE_FIRST,
)
)
@endpoint(
"queues.move_task_to_back",
min_version="2.4",
request_data_model=TaskRequest,
response_data_model=MoveTaskResponse,
)
def move_task_to_back(call: APICall, company_id, req_model: TaskRequest):
call.result.data_model = MoveTaskResponse(
position=queue_bll.reposition_task(
company_id=company_id,
queue_id=req_model.queue,
task_id=req_model.task,
move_count=MOVE_LAST,
)
)
@endpoint(
"queues.get_queue_metrics",
min_version="2.4",
request_data_model=GetMetricsRequest,
response_data_model=GetMetricsResponse,
)
def get_queue_metrics(
call: APICall, company_id, request: GetMetricsRequest
) -> GetMetricsResponse:
ret = queue_bll.metrics.get_queue_metrics(
company_id=company_id,
from_date=request.from_date,
to_date=request.to_date,
interval=request.interval,
queue_ids=request.queue_ids,
refresh=request.refresh,
)
queue_dicts = {
queue: extract_properties_to_lists(
["date", "avg_waiting_time", "queue_length"], data
)
for queue, data in ret.items()
}
return GetMetricsResponse(
queues=[
QueueMetrics(
queue=queue,
dates=data["date"],
avg_waiting_times=data["avg_waiting_time"],
queue_lengths=data["queue_length"],
)
if data
else QueueMetrics(queue=queue)
for queue, data in queue_dicts.items()
]
)
@endpoint("queues.add_or_update_metadata", min_version="2.13")
def add_or_update_metadata(
call: APICall, company_id: str, request: AddOrUpdateMetadataRequest
):
queue_id = request.queue
queue = queue_bll.get_by_id(company_id=company_id, queue_id=queue_id, only=("id",))
return {
"updated": Metadata.edit_metadata(
queue, items=request.metadata, replace_metadata=request.replace_metadata
)
}
@endpoint("queues.delete_metadata", min_version="2.13")
def delete_metadata(call: APICall, company_id: str, request: DeleteMetadataRequest):
queue_id = request.queue
queue = queue_bll.get_by_id(company_id=company_id, queue_id=queue_id, only=("id",))
return {"updated": Metadata.delete_metadata(queue, keys=request.keys)}
@endpoint("queues.peek_task", min_version="2.15")
def peek_task(call: APICall, company_id: str, request: QueueRequest):
queue_id = request.queue
queue = queue_bll.get_by_id(
company_id=company_id, queue_id=queue_id, max_task_entries=1
)
return {"task": queue.entries[0].task if queue.entries else None}
@endpoint("queues.get_num_entries", min_version="2.15")
def get_num_entries(call: APICall, company_id: str, request: QueueRequest):
return {"num": queue_bll.count_entries(company=company_id, queue_id=request.queue)}