clearml-server/apiserver/database/utils.py

240 lines
8.0 KiB
Python
Raw Normal View History

2019-06-10 21:24:35 +00:00
import hashlib
from inspect import ismethod, getmembers
2022-09-29 16:30:57 +00:00
from typing import Sequence, Tuple, Set, Optional, Callable, Any, Mapping
2019-06-10 21:24:35 +00:00
from uuid import uuid4
from mongoengine import EmbeddedDocumentField, ListField, Document, Q
from mongoengine.base import BaseField
from .errors import translate_errors_context, ParseCallError
def get_fields(cls, of_type=BaseField, return_instance=False, subfields=False):
    """
    Collect the fields declared on a class that exposes a mongoengine style `_fields` mapping.
    :param cls: The class to inspect
    :param of_type: Only fields that are instances of this type are collected
    :param return_instance: If set, (name, field) tuples are returned instead of plain names
    :param subfields: If set, the fields of embedded documents are collected as well
    :return: A list of field names, or of (name, field) tuples when return_instance is set
    """

    def pick(name, field):
        if return_instance:
            return name, field
        return name

    return _get_fields(cls, of_type=of_type, subfields=subfields, selector=pick)
2019-06-10 21:24:35 +00:00
def get_fields_attr(cls, attr):
    """ map each field name to the value of `attr`, for the fields of cls that have this attribute """

    def name_and_value(name, field):
        return name, getattr(field, attr)

    pairs = _get_fields(cls, with_attr=attr, selector=name_and_value)
    return dict(pairs)
2019-06-10 21:24:35 +00:00
def get_fields_choices(cls, attr):
    """
    Map each field name (embedded document fields included) to the field choices.
    Only fields that have the `attr` attribute are considered. For list fields
    the choices are taken from the inner (element) field.
    """

    def get_choices(field_name: str, field: BaseField) -> Tuple:
        # a list field keeps its choices on the field describing its elements
        source = field.field if isinstance(field, ListField) else field
        return field_name, source.choices

    collected = _get_fields(cls, with_attr=attr, subfields=True, selector=get_choices)
    return dict(collected)
2019-06-10 21:24:35 +00:00
def _get_fields(
    cls,
    with_attr=None,
    of_type=BaseField,
    subfields=False,
    selector: Optional[Callable[[str, BaseField], Any]] = None,
    path: Tuple[str, ...] = (),
):
    """
    Walk over cls._fields and collect the matching fields.
    :param cls: Class exposing a `_fields` mapping of field name to field instance
    :param with_attr: If passed, only fields that have this attribute are collected
    :param of_type: Only fields that are instances of this type are collected
    :param subfields: If set, recurse into the document type of embedded document fields
    :param selector: Optional callable receiving (full name, field) whose result is collected
        in place of the full name
    :param path: The names of the parent fields leading to cls (used by the recursion)
    :return: A list with one entry per matching field. The full name of a field is its path
        joined with '__'
    """
    collected = []
    for name, field in cls._fields.items():
        current_path = path + (name,)

        if isinstance(field, of_type):
            if not with_attr or hasattr(field, with_attr):
                full_name = "__".join(current_path)
                collected.append(full_name if not selector else selector(full_name, field))

        if not (subfields and isinstance(field, EmbeddedDocumentField)):
            continue

        # descend into the embedded document, prefixing its fields with the current path
        nested = _get_fields(
            field.document_type,
            with_attr=with_attr,
            of_type=of_type,
            subfields=subfields,
            selector=selector,
            path=current_path,
        )
        collected.extend(nested)

    return collected
def get_items(cls):
    """ collect the name/value pairs of an enum-like class, skipping '_' prefixed names and methods """
    items = {}
    for name, value in getmembers(cls):
        if name.startswith("_") or ismethod(value):
            continue
        items[name] = value
    return items
def get_options(cls):
    """ list the values of an enum-like class (see get_items for the members that are considered) """
    members = get_items(cls)
    return list(members.values())
def parse_from_call(call_data, fields, cls_fields, discard_none_values=True):
    """
    Return a dictionary of the items which:
      1. are in the call_data
      2. are in the fields dictionary, and their value in the call_data passes the check
         described by the matching entry in fields
      3. are in the cls_fields
    :param call_data: The call data (queried using `get` and `in`)
    :param fields: Either a dict of field name => descriptor, or an iterable of field names
        (no validation). A descriptor can be:
        - a falsy value: the value is taken as is
        - a Document subclass: the value should be the id of an existing document
        - any other callable: called with the value, an exception means the value is invalid
    :param cls_fields: Container of the field names that are allowed in the result
    :param discard_none_values: If False, a None value that is explicitly present in
        the call data is kept in the result
    :return: dictionary of field name => value
    :raise ParseCallError: if a value fails its descriptor check
    """
    if not isinstance(fields, dict):
        # fields should be key=>type dict
        fields = dict.fromkeys(fields)
    fields = {k: v for k, v in fields.items() if k in cls_fields}
    res = {}
    with translate_errors_context("parsing call data"):
        for field, desc in fields.items():
            value = call_data.get(field)
            if value is None:
                if not discard_none_values and field in call_data:
                    # we'll keep the None value in case the field actually exists in the call data
                    res[field] = None
                continue
            if desc:
                # issubclass() raises TypeError when its first argument is not a class,
                # so plain functions (or other callable objects) must be ruled out first
                if isinstance(desc, type) and issubclass(desc, Document):
                    if not desc.objects(id=value).only("id"):
                        raise ParseCallError(
                            "expecting %s id" % desc.__name__, id=value, field=field
                        )
                elif callable(desc):
                    try:
                        desc(value)
                    except TypeError as ex:
                        # not every callable has a __name__ (e.g. functools.partial objects)
                        expected = getattr(desc, "__name__", repr(desc))
                        raise ParseCallError(f"expecting {expected}", field=field) from ex
                    except Exception as ex:
                        raise ParseCallError(str(ex), field=field) from ex
            res[field] = value
    return res
def init_cls_from_base(cls, instance):
    """
    Create a cls object from the data of instance.
    The instance is converted using to_mongo(use_db_field=False).to_dict() and every key
    that does not start with '_' is passed to cls as a keyword argument.
    """
    raw = instance.to_mongo(use_db_field=False).to_dict()
    init_args = {}
    for key, value in raw.items():
        if key[0] != "_":
            init_args[key] = value
    return cls(**init_args)
2019-06-10 21:24:35 +00:00
def get_company_or_none_constraint(company=None):
    """
    Creates a query object matching documents whose company is the passed company,
    None or an empty string, or that have no company field at all.
    """
    company_or_empty = Q(company__in=(company, None, ""))
    company_missing = Q(company__exists=False)
    return company_or_empty | company_missing
2019-06-10 21:24:35 +00:00
def field_does_not_exist(field: str, empty_value=None, is_list=False) -> Q:
    """
    Creates a query object used for finding a field that doesn't exist, or has None or an empty value.
    :param field: Field name
    :param empty_value: The empty value to test for (None means no specific empty value will be used).
        Unhashable values (e.g. {} or []) are supported
    :param is_list: Is this a list (array) field. In this case, in addition to testing for an empty value,
        the length of the array will be used (len==0 means empty)
    :return: The query object
    """
    # A list is used (and not a set) since the empty value is not necessarily hashable
    empty_values = [None] if empty_value is None else [empty_value, None]
    query = Q(**{f"{field}__exists": False}) | Q(**{f"{field}__in": empty_values})
    if is_list:
        query |= Q(**{f"{field}__size": 0})
    return query
def field_exists(field: str, empty_value=None, is_list=False) -> Q:
    """
    Creates a query object used for finding a field that exists and is not None or empty.
    :param field: Field name
    :param empty_value: The empty value to test for (None means no specific empty value will be used).
        Unhashable values (e.g. {} or []) are supported
    :param is_list: Is this a list (array) field. In this case, in addition to testing for an empty value,
        the length of the array will be used (len==0 means empty)
    :return: The query object
    """
    # A list is used (and not a set) since the empty value is not necessarily hashable
    empty_values = [None] if empty_value is None else [empty_value, None]
    query = Q(**{f"{field}__exists": True}) & Q(**{f"{field}__nin": empty_values})
    if is_list:
        query &= Q(**{f"{field}__not__size": 0})
    return query
2019-06-10 21:24:35 +00:00
def get_subkey(d, key_path, default=None):
    """ Get a value from a nested dictionary. key_path is a '.' separated string of keys used to
        traverse the nested dictionary. The default is returned as soon as a key is missing or
        holds None. A KeyError is raised if a non-dict value is met while there are keys left.
    """
    parts = key_path.split(".")
    current = d
    for depth, part in enumerate(parts):
        if not isinstance(current, dict):
            # report the path walked so far (nothing was walked if the input itself is not a dict)
            location = ".".join(parts[:depth]) if depth else "bad input"
            raise KeyError("Expecting a dict (%s)" % location)

        current = current.get(part)
        if current is None:
            return default

    return current
def id():
    """ Generate a new random (uuid4 based) id: a 32 characters hex string with no dashes """
    return uuid4().hex
def hash_field_name(s):
    """ Turn a field name into a safe string: the md5 hex digest of the encoded name """
    digest = hashlib.md5()
    digest.update(s.encode())
    return digest.hexdigest()
def merge_dicts(*dicts):
    """ Merge the passed dictionaries into a new one (on key clash the later dictionary wins) """
    merged = dict()
    for current in dicts:
        merged.update(current)
    return merged
def filter_fields(cls, fields):
    """
    From the fields dictionary return only the fields that match cls fields
    :param cls: The class whose field names (as returned by get_fields) are used for filtering
    :param fields: Dictionary of field name => value
    :return: A new dictionary holding only the keys that are cls field names
    """
    # Collect the class field names once (as a set) instead of rebuilding
    # and scanning the names list for every key
    cls_fields = set(get_fields(cls))
    return {key: fields[key] for key in fields if key in cls_fields}
def _names_set(*names: str) -> Set[str]:
    """
    Build a set holding each of the given names together with its '-' prefixed form
    """
    result = set()
    for name in names:
        result.add(name)
        result.add(f"-{name}")
    return result
2022-09-29 16:30:57 +00:00
# Per entity: the tag names that are considered system tags.
# Each name is registered both as is and with a '-' prefix (see _names_set)
_system_tag_names = {
    "model": _names_set("active", "archived"),
    "project": _names_set("archived", "public", "default"),
    "task": _names_set("active", "archived", "development"),
    "queue": _names_set("default"),
}

# Per entity: a tag starting with one of these prefixes is considered a system tag
_system_tag_prefixes = {"task": _names_set("annotat")}
def partition_tags(
    entity: str,
    tags: Sequence[str],
    system_tags: Optional[Sequence[str]] = (),
    system_tag_names: Mapping = _system_tag_names,
    system_tag_prefixes: Mapping = _system_tag_prefixes,
) -> Tuple[Sequence[str], Sequence[str]]:
    """
    Partition the given tags sequence into system and user-defined tags
    :param entity: The name of the entity that defines the list of the system tags
    :param tags: The tags to partition
    :param system_tags: Optional. If passed then these tags are considered system together
        with those defined for the entity. None is treated as no additional system tags
    :param system_tag_names: Mapping of entity name to the collection of its system tag names
    :param system_tag_prefixes: Mapping of entity name to the prefixes of its system tags.
        A tag is matched against the prefixes after being converted to lower case
    :return: a tuple where the first element is the sequence of user-defined tags and
        the second element is the sequence of system tags. Both hold no duplicates and keep
        the order in which the tags were passed (passed system_tags come first)
    :raise KeyError: if entity is not a key of system_tag_names
    """
    # dict keys are used as ordered sets so that the result order is deterministic
    unique_tags = list(dict.fromkeys(tags or ()))
    # system_tags is declared Optional, so an explicit None must not fail
    system = dict.fromkeys(system_tags or ())

    entity_names = system_tag_names[entity]
    # str.startswith accepts a tuple of prefixes (an empty tuple never matches)
    prefixes = tuple(system_tag_prefixes.get(entity, ()))
    for tag in unique_tags:
        if tag in entity_names or tag.lower().startswith(prefixes):
            system.setdefault(tag)

    user_tags = [tag for tag in unique_tags if tag not in system]
    return user_tags, list(system)