Add API version 2.4 with new trains-server capabilities including DevOps and scheduling

This commit is contained in:
allegroai
2019-10-25 15:36:58 +03:00
parent 2ea25e498f
commit 1a732ccd8e
54 changed files with 4964 additions and 341 deletions

View File

@@ -1,12 +1,12 @@
import re
from collections import namedtuple
from functools import reduce
from typing import Collection, Sequence
from typing import Collection, Sequence, Union
from boltons.iterutils import first
from dateutil.parser import parse as parse_datetime
from mongoengine import Q, Document
from six import string_types
from mongoengine import Q, Document, ListField, StringField
from pymongo.command_cursor import CommandCursor
from apierrors import errors
from config import config
@@ -16,9 +16,10 @@ from database.props import PropsMixin
from database.query import RegexQ, RegexWrapper
from database.utils import (
get_company_or_none_constraint,
get_fields_with_attr,
field_exists,
get_fields_choices,
field_does_not_exist,
field_exists,
get_fields,
)
log = config.logger("dbmodel")
@@ -62,6 +63,7 @@ class GetMixin(PropsMixin):
_text_score = "$text_score"
_ordering_key = "order_by"
_search_text_key = "search_text"
_multi_field_param_sep = "__"
_multi_field_param_prefix = {
@@ -221,6 +223,24 @@ class GetMixin(PropsMixin):
return get_company_or_none_constraint(company)
return Q(company=company)
@classmethod
def validate_order_by(cls, parameters, search_text) -> Sequence:
"""
Validate and extract order_by params as a list
"""
order_by = parameters.get(cls._ordering_key)
if not order_by:
return []
order_by = order_by if isinstance(order_by, list) else [order_by]
order_by = [cls._text_score if x == "@text_score" else x for x in order_by]
if not search_text and cls._text_score in order_by:
raise errors.bad_request.FieldsValueError(
"text score cannot be used in order_by when search text is not used"
)
return order_by
@classmethod
def validate_paging(
cls, parameters=None, default_page=None, default_page_size=None
@@ -267,7 +287,6 @@ class GetMixin(PropsMixin):
allow_public=False,
override_projection=None,
expand_reference_ids=True,
override_none_ordering=False,
):
"""
Fetch all documents matching a provided query with support for joining referenced documents according to the
@@ -303,7 +322,6 @@ class GetMixin(PropsMixin):
query=query,
query_options=query_options,
allow_public=allow_public,
override_none_ordering=override_none_ordering,
)
def projection_func(doc_type, projection, ids):
@@ -328,7 +346,6 @@ class GetMixin(PropsMixin):
allow_public=False,
override_projection: Collection[str] = None,
return_dicts=True,
override_none_ordering=False,
):
"""
Fetch all documents matching a provided query. Supported several built-in options
@@ -341,8 +358,9 @@ class GetMixin(PropsMixin):
`@text_score` keyword. A text index must be defined on the document type, otherwise an error will
be raised.
:param return_dicts: Return a list of dictionaries. If True, a list of dicts is returned (if projection was
requested, each contains only the requested projection).
If False, a QuerySet object is returned (lazy evaluated)
requested, each contains only the requested projection). If False, a QuerySet object is returned
(lazy evaluated). If return_dicts is requested then the entities with the None value in order_by field
are returned last in the ordering.
:param company: Company ID (required)
:param parameters: Parameters dict from which paging ordering and searching parameters are extracted.
:param query_dict: If provided, passed to prepare_query() along with all of the relevant arguments to produce
@@ -352,8 +370,6 @@ class GetMixin(PropsMixin):
:param override_projection: A list of projection fields overriding any projection specified in the `param_dict`
argument
:param allow_public: If True, objects marked as public (no associated company) are also queried.
:param override_none_ordering: If True, then items with the None values in the first ordered field
are always sorted in the end
:return: A list of objects matching the query.
"""
if query_dict is not None:
@@ -367,26 +383,19 @@ class GetMixin(PropsMixin):
q = cls._prepare_perm_query(company, allow_public=allow_public)
_query = (q & query) if query else q
if override_none_ordering:
if return_dicts:
return cls._get_many_override_none_ordering(
query=_query,
parameters=parameters,
query_dict=query_dict,
query_options=query_options,
override_projection=override_projection,
)
return cls._get_many_no_company(
query=_query,
parameters=parameters,
override_projection=override_projection,
return_dicts=return_dicts,
query=_query, parameters=parameters, override_projection=override_projection
)
@classmethod
def _get_many_no_company(
cls, query, parameters=None, override_projection=None, return_dicts=True
):
def _get_many_no_company(cls, query, parameters=None, override_projection=None):
"""
Fetch all documents matching a provided query.
This is a company-less version for internal uses. We assume the caller has either added any necessary
@@ -395,44 +404,25 @@ class GetMixin(PropsMixin):
NOTE: BE VERY CAREFUL WITH THIS CALL, as it allows returning data across companies.
:param query: Query object (mongoengine.Q)
:param return_dicts: Return a list of dictionaries. If True, a list of dicts is returned (if projection was
requested, each contains only the requested projection).
If False, a QuerySet object is returned (lazy evaluated)
:param parameters: Parameters dict from which paging ordering and searching parameters are extracted.
:param override_projection: A list of projection fields overriding any projection specified in the `param_dict`
argument
"""
parameters = parameters or {}
if not query:
raise ValueError("query or call_data must be provided")
parameters = parameters or {}
search_text = parameters.get(cls._search_text_key)
order_by = cls.validate_order_by(parameters=parameters, search_text=search_text)
page, page_size = cls.validate_paging(parameters=parameters)
order_by = parameters.get(cls._ordering_key)
if order_by:
order_by = order_by if isinstance(order_by, list) else [order_by]
order_by = [cls._text_score if x == "@text_score" else x for x in order_by]
search_text = parameters.get("search_text")
only = cls.get_projection(parameters, override_projection)
if not search_text and order_by and cls._text_score in order_by:
raise errors.bad_request.FieldsValueError(
"text score cannot be used in order_by when search text is not used"
)
qs = cls.objects(query)
if search_text:
qs = qs.search_text(search_text)
if order_by:
# add ordering
qs = (
qs.order_by(order_by)
if isinstance(order_by, string_types)
else qs.order_by(*order_by)
)
qs = qs.order_by(*order_by)
if only:
# add projection
qs = qs.only(*only)
@@ -444,17 +434,13 @@ class GetMixin(PropsMixin):
# add paging
qs = qs.skip(page * page_size).limit(page_size)
if return_dicts:
return [obj.to_proper_dict(only=only) for obj in qs]
return qs
@classmethod
def _get_many_override_none_ordering(
cls,
cls: Union[Document, "GetMixin"],
query: Q = None,
parameters: dict = None,
query_dict: dict = None,
query_options: QueryParameterOptions = None,
override_projection: Collection[str] = None,
) -> Sequence[dict]:
"""
@@ -467,57 +453,45 @@ class GetMixin(PropsMixin):
:param query: Query object (mongoengine.Q)
:param parameters: Parameters dict from which paging ordering and searching parameters are extracted.
:param query_dict: If provided, passed to prepare_query() along with all of the relevant arguments to produce
a query. The resulting query is AND'ed with the `query` parameter (if provided).
:param query_options: query parameters options (see ParametersOptions)
:param override_projection: A list of projection fields overriding any projection specified in the `param_dict`
argument
"""
if not query:
raise ValueError("query or call_data must be provided")
parameters = parameters or {}
search_text = parameters.get("search_text")
search_text = parameters.get(cls._search_text_key)
order_by = cls.validate_order_by(parameters=parameters, search_text=search_text)
page, page_size = cls.validate_paging(parameters=parameters)
only = cls.get_projection(parameters, override_projection)
query_sets = []
order_by = parameters.get(cls._ordering_key)
query_sets = [cls.objects(query)]
if order_by:
order_by = order_by if isinstance(order_by, list) else [order_by]
order_by = [cls._text_score if x == "@text_score" else x for x in order_by]
if not search_text and cls._text_score in order_by:
raise errors.bad_request.FieldsValueError(
"text score cannot be used in order_by when search text is not used"
)
order_field = first(
field for field in order_by if not field.startswith("$")
)
if (
order_field
and not order_field.startswith("-")
and (not query_dict or order_field not in query_dict)
and "[" not in order_field
):
empty_value = None
if order_field in query_options.list_fields:
empty_value = []
elif order_field in query_options.pattern_fields:
empty_value = ""
params = {}
mongo_field = order_field.replace(".", "__")
non_empty = query & field_exists(mongo_field, empty_value=empty_value)
empty = query & field_does_not_exist(
mongo_field, empty_value=empty_value
)
if mongo_field in get_fields(cls, of_type=ListField, subfields=True):
params["is_list"] = True
elif mongo_field in get_fields(
cls, of_type=StringField, subfields=True
):
params["empty_value"] = ""
non_empty = query & field_exists(mongo_field, **params)
empty = query & field_does_not_exist(mongo_field, **params)
query_sets = [cls.objects(non_empty), cls.objects(empty)]
if not query_sets:
query_sets = [cls.objects(query)]
query_sets = [qs.order_by(*order_by) for qs in query_sets]
if search_text:
query_sets = [qs.search_text(search_text) for qs in query_sets]
if order_by:
# add ordering
query_sets = [qs.order_by(*order_by) for qs in query_sets]
only = cls.get_projection(parameters, override_projection)
if only:
# add projection
query_sets = [qs.only(*only) for qs in query_sets]
@@ -583,8 +557,8 @@ class UpdateMixin(object):
def user_set_allowed(cls):
res = getattr(cls, "__user_set_allowed_fields", None)
if res is None:
res = cls.__user_set_allowed_fields = dict(
get_fields_with_attr(cls, "user_set_allowed")
res = cls.__user_set_allowed_fields = get_fields_choices(
cls, "user_set_allowed"
)
return res
@@ -622,7 +596,24 @@ class UpdateMixin(object):
class DbModelMixin(GetMixin, ProperDictMixin, UpdateMixin):
""" Provide convenience methods for a subclass of mongoengine.Document """
pass
@classmethod
def aggregate(
cls: Document, *pipeline: dict, allow_disk_use=None, **kwargs
) -> CommandCursor:
"""
Aggregate objects of this document class according to the provided pipeline.
:param pipeline: a list of dictionaries describing the pipeline stages
:param allow_disk_use: if True, allow the server to use disk space if aggregation query cannot fit in memory.
If None, default behavior will be used (see apiserver.conf/mongo/aggregate/allow_disk_use)
:param kwargs: additional keyword arguments passed to mongoengine
:return:
"""
kwargs.update(
allowDiskUse=allow_disk_use
if allow_disk_use is not None
else config.get("apiserver.mongo.aggregate.allow_disk_use", True)
)
return cls.objects.aggregate(*pipeline, **kwargs)
def validate_id(cls, company, **kwargs):

View File

@@ -0,0 +1,47 @@
from mongoengine import (
Document,
EmbeddedDocument,
StringField,
DateTimeField,
EmbeddedDocumentListField,
ListField,
)
from database import Database, strict
from database.fields import StrippedStringField
from database.model import DbModelMixin
from database.model.base import ProperDictMixin, GetMixin
from database.model.company import Company
from database.model.task.task import Task
class Entry(EmbeddedDocument, ProperDictMixin):
""" Entry representing a task waiting in the queue """
task = StringField(required=True, reference_field=Task)
''' Task ID '''
added = DateTimeField(required=True)
''' Added to the queue '''
class Queue(DbModelMixin, Document):
get_all_query_options = GetMixin.QueryParameterOptions(
pattern_fields=("name",),
list_fields=("tags", "system_tags", "id"),
)
meta = {
'db_alias': Database.backend,
'strict': strict,
}
id = StringField(primary_key=True)
name = StrippedStringField(
required=True, unique_with="company", min_length=3, user_set_allowed=True
)
company = StringField(required=True, reference_field=Company)
created = DateTimeField(required=True)
tags = ListField(StringField(required=True), default=list, user_set_allowed=True)
system_tags = ListField(StringField(required=True), user_set_allowed=True)
entries = EmbeddedDocumentListField(Entry, default=list)
last_update = DateTimeField()

View File

@@ -29,6 +29,7 @@ DEFAULT_LAST_ITERATION = 0
class TaskStatus(object):
created = "created"
queued = "queued"
in_progress = "in_progress"
stopped = "stopped"
publishing = "publishing"
@@ -85,7 +86,7 @@ class Execution(EmbeddedDocument):
model_labels = ModelLabels()
framework = StringField()
artifacts = EmbeddedDocumentSortedListField(Artifact)
docker_cmd = StringField()
queue = StringField()
""" Queue ID where task was queued """
@@ -150,6 +151,8 @@ class Task(AttributedDocument):
tags = ListField(StringField(required=True), user_set_allowed=True)
system_tags = ListField(StringField(required=True), user_set_allowed=True)
script = EmbeddedDocumentField(Script)
last_worker = StringField()
last_worker_report = DateTimeField()
last_update = DateTimeField()
last_iteration = IntField(default=DEFAULT_LAST_ITERATION)
last_metrics = SafeMapField(field=SafeMapField(EmbeddedDocumentField(MetricEvent)))