mirror of
https://github.com/clearml/clearml-server
synced 2025-06-26 23:15:47 +00:00
Refactor APICall and schema validation
This commit is contained in:
3
apiserver/schema/__init__.py
Normal file
3
apiserver/schema/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .schema_reader import EndpointSchema, EndpointVersionsGroup, SchemaReader, Schema
|
||||
|
||||
__all__ = [EndpointSchema, EndpointVersionsGroup, SchemaReader, Schema]
|
||||
@@ -248,9 +248,9 @@ def remove_description(dct):
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
def main(here: str):
|
||||
args = parse_args()
|
||||
meta = load_hocon(os.path.dirname(__file__) + "/meta.conf")
|
||||
meta = load_hocon(here + "/meta.conf")
|
||||
validator_for(meta).check_schema(meta)
|
||||
|
||||
driver = LazyDriver()
|
||||
@@ -300,4 +300,4 @@ def main():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main(here=os.path.dirname(__file__))
|
||||
|
||||
267
apiserver/schema/schema_reader.py
Normal file
267
apiserver/schema/schema_reader.py
Normal file
@@ -0,0 +1,267 @@
|
||||
"""
|
||||
Objects representing schema entities
|
||||
"""
|
||||
import json
|
||||
import re
|
||||
from operator import attrgetter
|
||||
from pathlib import Path
|
||||
from typing import Mapping, Sequence, Type
|
||||
|
||||
import attr
|
||||
from boltons.dictutils import subdict
|
||||
from pyhocon import ConfigFactory
|
||||
|
||||
from apiserver.config_repo import config
|
||||
from apiserver.utilities.partial_version import PartialVersion
|
||||
|
||||
|
||||
log = config.logger(__file__)
|
||||
|
||||
|
||||
ALL_ROLES = "*"
|
||||
|
||||
|
||||
class EndpointSchema:
|
||||
REQUEST_KEY = "request"
|
||||
RESPONSE_KEY = "response"
|
||||
BATCH_REQUEST_KEY = "batch_request"
|
||||
DEFINITIONS_KEY = "definitions"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
service_name: str,
|
||||
action_name: str,
|
||||
version: PartialVersion,
|
||||
schema: dict,
|
||||
definitions: dict = None,
|
||||
):
|
||||
"""
|
||||
Class for interacting with the schema of a single endpoint
|
||||
:param service_name: name of containing service
|
||||
:param action_name: name of action
|
||||
:param version: endpoint version
|
||||
:param schema: endpoint schema
|
||||
:param definitions: service definitions
|
||||
"""
|
||||
self.service_name = service_name
|
||||
self.action_name = action_name
|
||||
self.full_name = f"{service_name}.{action_name}"
|
||||
self.version = version
|
||||
self.definitions = definitions
|
||||
self.request_schema = None
|
||||
self.batch_request_schema = None
|
||||
if self.REQUEST_KEY in schema:
|
||||
self.request_schema = {
|
||||
**schema[self.REQUEST_KEY],
|
||||
self.DEFINITIONS_KEY: self.definitions,
|
||||
}
|
||||
elif self.BATCH_REQUEST_KEY in schema:
|
||||
self.batch_request_schema = {
|
||||
**schema[self.BATCH_REQUEST_KEY],
|
||||
self.DEFINITIONS_KEY: self.definitions,
|
||||
}
|
||||
else:
|
||||
raise RuntimeError(
|
||||
f"endpoint {self.full_name} version {self.version} "
|
||||
"has no request or batch_request schema",
|
||||
schema,
|
||||
)
|
||||
self.response_schema = {
|
||||
**schema[self.RESPONSE_KEY],
|
||||
"definitions": self.definitions,
|
||||
}
|
||||
|
||||
|
||||
class EndpointVersionsGroup:
|
||||
|
||||
endpoints: Sequence[EndpointSchema]
|
||||
allow_roles: Sequence[str]
|
||||
internal: bool
|
||||
authorize: bool
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
f"{type(self).__name__}<{self.full_name}, "
|
||||
f"versions={tuple(e.version for e in self.endpoints)}>"
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
service_name: str,
|
||||
action_name: str,
|
||||
conf: dict,
|
||||
definitions: dict = None,
|
||||
defaults: dict = None,
|
||||
):
|
||||
"""
|
||||
Represents multiple implementations of a single endpoint, discriminated by API version
|
||||
:param service_name: name of containing service
|
||||
:param action_name: name of action
|
||||
:param conf: mapping between minimum version to endpoint schema
|
||||
:param definitions: service definitions
|
||||
:param defaults: service defaults
|
||||
"""
|
||||
self.service_name = service_name
|
||||
self.action_name = action_name
|
||||
self.full_name = f"{service_name}.{action_name}"
|
||||
self.definitions = definitions or {}
|
||||
self.defaults = defaults or {}
|
||||
self.internal = self._pop_attr_with_default(conf, "internal")
|
||||
self.allow_roles = self._pop_attr_with_default(conf, "allow_roles")
|
||||
self.authorize = self._pop_attr_with_default(conf, "authorize")
|
||||
|
||||
def parse_version(version):
|
||||
if not re.match(r"^\d+\.\d+$", version):
|
||||
raise ValueError(
|
||||
f"Encountered unrecognized key {version!r} in {self.service_name}.{self.action_name}"
|
||||
)
|
||||
return PartialVersion(version)
|
||||
|
||||
self.endpoints = sorted(
|
||||
(
|
||||
SchemaReader.endpoint_schema_cls(
|
||||
service_name=self.service_name,
|
||||
action_name=self.action_name,
|
||||
version=parse_version(version),
|
||||
schema=endpoint_conf,
|
||||
definitions=self.definitions,
|
||||
)
|
||||
for version, endpoint_conf in conf.items()
|
||||
),
|
||||
key=attrgetter("version"),
|
||||
)
|
||||
|
||||
def allows(self, role):
|
||||
return ALL_ROLES in self.allow_roles or role in self.allow_roles
|
||||
|
||||
def _pop_attr_with_default(self, conf, attr):
|
||||
return conf.pop(attr, self.defaults[attr])
|
||||
|
||||
def get_for_version(self, min_version: PartialVersion):
|
||||
"""
|
||||
Return endpoint schema for version
|
||||
"""
|
||||
if not self.endpoints:
|
||||
raise ValueError(f"endpoint group {self} has no versions")
|
||||
for endpoint in self.endpoints:
|
||||
if min_version <= endpoint.version:
|
||||
return endpoint
|
||||
raise ValueError(
|
||||
f"min_version {min_version} is higher than highest version in group {self}"
|
||||
)
|
||||
|
||||
|
||||
class Service:
|
||||
|
||||
endpoint_groups: Mapping[str, EndpointVersionsGroup]
|
||||
|
||||
def __init__(self, name: str, conf: dict, api_defaults: dict):
|
||||
"""
|
||||
Represents schema of one service
|
||||
:param name: name of service
|
||||
:param conf: service configuration, containing endpoint groups and other details
|
||||
:param api_defaults: API-wide endpoint attributes default values
|
||||
"""
|
||||
self.name = name
|
||||
conf = subdict(conf, drop=("_description", "_references"))
|
||||
self.defaults = {**api_defaults, **conf.pop("_default", {})}
|
||||
self.definitions = conf.pop("_definitions", None)
|
||||
self.endpoint_groups: Mapping[str, EndpointVersionsGroup] = {
|
||||
endpoint_name: SchemaReader.endpoint_versions_group_cls(
|
||||
service_name=self.name,
|
||||
action_name=endpoint_name,
|
||||
conf=endpoint_conf,
|
||||
defaults=self.defaults,
|
||||
definitions=self.definitions,
|
||||
)
|
||||
for endpoint_name, endpoint_conf in conf.items()
|
||||
}
|
||||
|
||||
|
||||
class Schema:
|
||||
services: Mapping[str, Service]
|
||||
|
||||
def __init__(self, services: dict, api_defaults: dict):
|
||||
"""
|
||||
Represents the entire API schema
|
||||
:param services: services schema
|
||||
:param api_defaults: default values of service configuration
|
||||
"""
|
||||
self.api_defaults = api_defaults
|
||||
self.services = {
|
||||
name: SchemaReader.service_cls(name, conf, api_defaults=self.api_defaults)
|
||||
for name, conf in services.items()
|
||||
}
|
||||
|
||||
|
||||
@attr.s()
|
||||
class SchemaReader:
|
||||
service_cls: Type[Service] = Service
|
||||
endpoint_versions_group_cls: Type[EndpointVersionsGroup] = EndpointVersionsGroup
|
||||
endpoint_schema_cls: Type[EndpointSchema] = EndpointSchema
|
||||
|
||||
root: Path = Path(__file__).parent / "services"
|
||||
cache_path: Path = None
|
||||
|
||||
def __attrs_post_init__(self):
|
||||
if not self.cache_path:
|
||||
self.cache_path = self.root / "_cache.json"
|
||||
|
||||
@staticmethod
|
||||
def mod_time(path):
|
||||
"""
|
||||
return file modification time
|
||||
"""
|
||||
return path.stat().st_mtime
|
||||
|
||||
@staticmethod
|
||||
def read_file(path):
|
||||
return ConfigFactory.parse_file(path).as_plain_ordered_dict()
|
||||
|
||||
def get_schema(self):
|
||||
"""
|
||||
Parse the API schema to schema object.
|
||||
Load from config files and write to cache file if possible.
|
||||
"""
|
||||
services = [
|
||||
service
|
||||
for service in self.root.glob("*.conf")
|
||||
if not service.name.startswith("_")
|
||||
]
|
||||
|
||||
current_services_names = {path.stem for path in services}
|
||||
|
||||
try:
|
||||
if self.mod_time(self.cache_path) >= max(map(self.mod_time, services)):
|
||||
log.info("loading schema from cache")
|
||||
result = json.loads(self.cache_path.read_text())
|
||||
cached_services_names = set(result.pop("services_names", []))
|
||||
if cached_services_names == current_services_names:
|
||||
return Schema(**result)
|
||||
else:
|
||||
log.info(
|
||||
f"found services files changed: "
|
||||
f"added: {list(current_services_names - cached_services_names)}, "
|
||||
f"removed: {list(cached_services_names - current_services_names)}"
|
||||
)
|
||||
except (IOError, KeyError, TypeError, ValueError, AttributeError) as ex:
|
||||
log.warning(f"failed loading cache: {ex}")
|
||||
|
||||
log.info("regenerating schema cache")
|
||||
services = {path.stem: self.read_file(path) for path in services}
|
||||
api_defaults = self.read_file(self.root / "_api_defaults.conf")
|
||||
|
||||
try:
|
||||
self.cache_path.write_text(
|
||||
json.dumps(
|
||||
dict(
|
||||
services_names=list(current_services_names),
|
||||
services=services,
|
||||
api_defaults=api_defaults,
|
||||
)
|
||||
)
|
||||
)
|
||||
except IOError:
|
||||
log.exception(f"failed cache file to {self.cache_path}")
|
||||
|
||||
return Schema(services, api_defaults)
|
||||
Reference in New Issue
Block a user