Embed jsonmodels v2.4 package

This commit is contained in:
allegroai 2020-06-13 22:08:57 +03:00
parent 7173a16a16
commit 2784a48c47
11 changed files with 1382 additions and 4 deletions

View File

@ -6,7 +6,6 @@ furl>=2.0.0
future>=0.16.0
futures>=3.0.5 ; python_version < '3'
humanfriendly>=2.1
jsonmodels>=2.2
jsonschema>=2.6.0
numpy>=1.10
pathlib2>=2.3.0

View File

@ -0,0 +1,9 @@
# coding: utf-8
__author__ = 'Szczepan Cieślik'
__email__ = 'szczepan.cieslik@gmail.com'
__version__ = '2.4'
from . import models
from . import fields
from . import errors

View File

@ -0,0 +1,230 @@
"""Builders to generate in memory representation of model and fields tree."""
from __future__ import absolute_import
from collections import defaultdict
import six
from . import errors
from .fields import NotSet
class Builder(object):
def __init__(self, parent=None, nullable=False, default=NotSet):
self.parent = parent
self.types_builders = {}
self.types_count = defaultdict(int)
self.definitions = set()
self.nullable = nullable
self.default = default
@property
def has_default(self):
return self.default is not NotSet
def register_type(self, type, builder):
if self.parent:
return self.parent.register_type(type, builder)
self.types_count[type] += 1
if type not in self.types_builders:
self.types_builders[type] = builder
def get_builder(self, type):
if self.parent:
return self.parent.get_builder(type)
return self.types_builders[type]
def count_type(self, type):
if self.parent:
return self.parent.count_type(type)
return self.types_count[type]
@staticmethod
def maybe_build(value):
return value.build() if isinstance(value, Builder) else value
def add_definition(self, builder):
if self.parent:
return self.parent.add_definition(builder)
self.definitions.add(builder)
class ObjectBuilder(Builder):
def __init__(self, model_type, *args, **kwargs):
super(ObjectBuilder, self).__init__(*args, **kwargs)
self.properties = {}
self.required = []
self.type = model_type
self.register_type(self.type, self)
def add_field(self, name, field, schema):
_apply_validators_modifications(schema, field)
self.properties[name] = schema
if field.required:
self.required.append(name)
def build(self):
builder = self.get_builder(self.type)
if self.is_definition and not self.is_root:
self.add_definition(builder)
[self.maybe_build(value) for _, value in self.properties.items()]
return '#/definitions/{name}'.format(name=self.type_name)
else:
return builder.build_definition(nullable=self.nullable)
@property
def type_name(self):
module_name = '{module}.{name}'.format(
module=self.type.__module__,
name=self.type.__name__,
)
return module_name.replace('.', '_').lower()
def build_definition(self, add_defintitions=True, nullable=False):
properties = dict(
(name, self.maybe_build(value))
for name, value
in self.properties.items()
)
schema = {
'type': 'object',
'additionalProperties': False,
'properties': properties,
}
if self.required:
schema['required'] = self.required
if self.definitions and add_defintitions:
schema['definitions'] = dict(
(builder.type_name, builder.build_definition(False, False))
for builder in self.definitions
)
return schema
@property
def is_definition(self):
if self.count_type(self.type) > 1:
return True
elif self.parent:
return self.parent.is_definition
else:
return False
@property
def is_root(self):
return not bool(self.parent)
def _apply_validators_modifications(field_schema, field):
for validator in field.validators:
try:
validator.modify_schema(field_schema)
except AttributeError:
pass
class PrimitiveBuilder(Builder):
def __init__(self, type, *args, **kwargs):
super(PrimitiveBuilder, self).__init__(*args, **kwargs)
self.type = type
def build(self):
schema = {}
if issubclass(self.type, six.string_types):
obj_type = 'string'
elif issubclass(self.type, bool):
obj_type = 'boolean'
elif issubclass(self.type, int):
obj_type = 'number'
elif issubclass(self.type, float):
obj_type = 'number'
else:
raise errors.FieldNotSupported(
"Can't specify value schema!", self.type
)
if self.nullable:
obj_type = [obj_type, 'null']
schema['type'] = obj_type
if self.has_default:
schema["default"] = self.default
return schema
class ListBuilder(Builder):
def __init__(self, *args, **kwargs):
super(ListBuilder, self).__init__(*args, **kwargs)
self.schemas = []
def add_type_schema(self, schema):
self.schemas.append(schema)
def build(self):
schema = {'type': 'array'}
if self.nullable:
self.add_type_schema({'type': 'null'})
if self.has_default:
schema["default"] = [self.to_struct(i) for i in self.default]
schemas = [self.maybe_build(s) for s in self.schemas]
if len(schemas) == 1:
items = schemas[0]
else:
items = {'oneOf': schemas}
schema['items'] = items
return schema
@property
def is_definition(self):
return self.parent.is_definition
@staticmethod
def to_struct(item):
from .models import Base
if isinstance(item, Base):
return item.to_struct()
return item
class EmbeddedBuilder(Builder):
def __init__(self, *args, **kwargs):
super(EmbeddedBuilder, self).__init__(*args, **kwargs)
self.schemas = []
def add_type_schema(self, schema):
self.schemas.append(schema)
def build(self):
if self.nullable:
self.add_type_schema({'type': 'null'})
schemas = [self.maybe_build(schema) for schema in self.schemas]
if len(schemas) == 1:
schema = schemas[0]
else:
schema = {'oneOf': schemas}
if self.has_default:
# The default value of EmbeddedField is expected to be an instance
# of a subclass of models.Base, thus have `to_struct`
schema["default"] = self.default.to_struct()
return schema
@property
def is_definition(self):
return self.parent.is_definition

View File

@ -0,0 +1,21 @@
class ModelCollection(list):
"""`ModelCollection` is list which validates stored values.
Validation is made with use of field passed to `__init__` at each point,
when new value is assigned.
"""
def __init__(self, field):
self.field = field
def append(self, value):
self.field.validate_single_value(value)
super(ModelCollection, self).append(value)
def __setitem__(self, key, value):
self.field.validate_single_value(value)
super(ModelCollection, self).__setitem__(key, value)

View File

@ -0,0 +1,15 @@
class ValidationError(RuntimeError):
pass
class FieldNotFound(RuntimeError):
pass
class FieldNotSupported(ValueError):
pass

View File

@ -0,0 +1,488 @@
import datetime
import re
from weakref import WeakKeyDictionary
import six
from dateutil.parser import parse
from .errors import ValidationError
from .collections import ModelCollection
# unique marker for "no default value specified". None is not good enough since
# it is a completely valid default value.
NotSet = object()
class BaseField(object):
"""Base class for all fields."""
types = None
def __init__(
self,
required=False,
nullable=False,
help_text=None,
validators=None,
default=NotSet,
name=None):
self.memory = WeakKeyDictionary()
self.required = required
self.help_text = help_text
self.nullable = nullable
self._assign_validators(validators)
self.name = name
self._validate_name()
if default is not NotSet:
self.validate(default)
self._default = default
@property
def has_default(self):
return self._default is not NotSet
def _assign_validators(self, validators):
if validators and not isinstance(validators, list):
validators = [validators]
self.validators = validators or []
def __set__(self, instance, value):
self._finish_initialization(type(instance))
value = self.parse_value(value)
self.validate(value)
self.memory[instance._cache_key] = value
def __get__(self, instance, owner=None):
if instance is None:
self._finish_initialization(owner)
return self
self._finish_initialization(type(instance))
self._check_value(instance)
return self.memory[instance._cache_key]
def _finish_initialization(self, owner):
pass
def _check_value(self, obj):
if obj._cache_key not in self.memory:
self.__set__(obj, self.get_default_value())
def validate_for_object(self, obj):
value = self.__get__(obj)
self.validate(value)
def validate(self, value):
self._check_types()
self._validate_against_types(value)
self._check_against_required(value)
self._validate_with_custom_validators(value)
def _check_against_required(self, value):
if value is None and self.required:
raise ValidationError('Field is required!')
def _validate_against_types(self, value):
if value is not None and not isinstance(value, self.types):
raise ValidationError(
'Value is wrong, expected type "{types}"'.format(
types=', '.join([t.__name__ for t in self.types])
),
value,
)
def _check_types(self):
if self.types is None:
raise ValidationError(
'Field "{type}" is not usable, try '
'different field type.'.format(type=type(self).__name__))
def to_struct(self, value):
"""Cast value to Python structure."""
return value
def parse_value(self, value):
"""Parse value from primitive to desired format.
Each field can parse value to form it wants it to be (like string or
int).
"""
return value
def _validate_with_custom_validators(self, value):
if value is None and self.nullable:
return
for validator in self.validators:
try:
validator.validate(value)
except AttributeError:
validator(value)
def get_default_value(self):
"""Get default value for field.
Each field can specify its default.
"""
return self._default if self.has_default else None
def _validate_name(self):
if self.name is None:
return
if not re.match('^[A-Za-z_](([\w\-]*)?\w+)?$', self.name):
raise ValueError('Wrong name', self.name)
def structue_name(self, default):
return self.name if self.name is not None else default
class StringField(BaseField):
"""String field."""
types = six.string_types
class IntField(BaseField):
"""Integer field."""
types = (int,)
def parse_value(self, value):
"""Cast value to `int`, e.g. from string or long"""
parsed = super(IntField, self).parse_value(value)
if parsed is None:
return parsed
return int(parsed)
class FloatField(BaseField):
"""Float field."""
types = (float, int)
class BoolField(BaseField):
"""Bool field."""
types = (bool,)
def parse_value(self, value):
"""Cast value to `bool`."""
parsed = super(BoolField, self).parse_value(value)
return bool(parsed) if parsed is not None else None
class ListField(BaseField):
"""List field."""
types = (list,)
def __init__(self, items_types=None, *args, **kwargs):
"""Init.
`ListField` is **always not required**. If you want to control number
of items use validators.
"""
self._assign_types(items_types)
super(ListField, self).__init__(*args, **kwargs)
self.required = False
def get_default_value(self):
default = super(ListField, self).get_default_value()
if default is None:
return ModelCollection(self)
return default
def _assign_types(self, items_types):
if items_types:
try:
self.items_types = tuple(items_types)
except TypeError:
self.items_types = items_types,
else:
self.items_types = tuple()
types = []
for type_ in self.items_types:
if isinstance(type_, six.string_types):
types.append(_LazyType(type_))
else:
types.append(type_)
self.items_types = tuple(types)
def validate(self, value):
super(ListField, self).validate(value)
if len(self.items_types) == 0:
return
for item in value:
self.validate_single_value(item)
def validate_single_value(self, item):
if len(self.items_types) == 0:
return
if not isinstance(item, self.items_types):
raise ValidationError(
'All items must be instances '
'of "{types}", and not "{type}".'.format(
types=', '.join([t.__name__ for t in self.items_types]),
type=type(item).__name__,
))
def parse_value(self, values):
"""Cast value to proper collection."""
result = self.get_default_value()
if not values:
return result
if not isinstance(values, list):
return values
return [self._cast_value(value) for value in values]
def _cast_value(self, value):
if isinstance(value, self.items_types):
return value
else:
if len(self.items_types) != 1:
tpl = 'Cannot decide which type to choose from "{types}".'
raise ValidationError(
tpl.format(
types=', '.join([t.__name__ for t in self.items_types])
)
)
return self.items_types[0](**value)
def _finish_initialization(self, owner):
super(ListField, self)._finish_initialization(owner)
types = []
for type in self.items_types:
if isinstance(type, _LazyType):
types.append(type.evaluate(owner))
else:
types.append(type)
self.items_types = tuple(types)
def _elem_to_struct(self, value):
try:
return value.to_struct()
except AttributeError:
return value
def to_struct(self, values):
return [self._elem_to_struct(v) for v in values]
class EmbeddedField(BaseField):
"""Field for embedded models."""
def __init__(self, model_types, *args, **kwargs):
self._assign_model_types(model_types)
super(EmbeddedField, self).__init__(*args, **kwargs)
def _assign_model_types(self, model_types):
if not isinstance(model_types, (list, tuple)):
model_types = (model_types,)
types = []
for type_ in model_types:
if isinstance(type_, six.string_types):
types.append(_LazyType(type_))
else:
types.append(type_)
self.types = tuple(types)
def _finish_initialization(self, owner):
super(EmbeddedField, self)._finish_initialization(owner)
types = []
for type in self.types:
if isinstance(type, _LazyType):
types.append(type.evaluate(owner))
else:
types.append(type)
self.types = tuple(types)
def validate(self, value):
super(EmbeddedField, self).validate(value)
try:
value.validate()
except AttributeError:
pass
def parse_value(self, value):
"""Parse value to proper model type."""
if not isinstance(value, dict):
return value
embed_type = self._get_embed_type()
return embed_type(**value)
def _get_embed_type(self):
if len(self.types) != 1:
raise ValidationError(
'Cannot decide which type to choose from "{types}".'.format(
types=', '.join([t.__name__ for t in self.types])
)
)
return self.types[0]
def to_struct(self, value):
return value.to_struct()
class _LazyType(object):
def __init__(self, path):
self.path = path
def evaluate(self, base_cls):
module, type_name = _evaluate_path(self.path, base_cls)
return _import(module, type_name)
def _evaluate_path(relative_path, base_cls):
base_module = base_cls.__module__
modules = _get_modules(relative_path, base_module)
type_name = modules.pop()
module = '.'.join(modules)
if not module:
module = base_module
return module, type_name
def _get_modules(relative_path, base_module):
canonical_path = relative_path.lstrip('.')
canonical_modules = canonical_path.split('.')
if not relative_path.startswith('.'):
return canonical_modules
parents_amount = len(relative_path) - len(canonical_path)
parent_modules = base_module.split('.')
parents_amount = max(0, parents_amount - 1)
if parents_amount > len(parent_modules):
raise ValueError("Can't evaluate path '{}'".format(relative_path))
return parent_modules[:parents_amount * -1] + canonical_modules
def _import(module_name, type_name):
module = __import__(module_name, fromlist=[type_name])
try:
return getattr(module, type_name)
except AttributeError:
raise ValueError(
"Can't find type '{}.{}'.".format(module_name, type_name))
class TimeField(StringField):
"""Time field."""
types = (datetime.time,)
def __init__(self, str_format=None, *args, **kwargs):
"""Init.
:param str str_format: Format to cast time to (if `None` - casting to
ISO 8601 format).
"""
self.str_format = str_format
super(TimeField, self).__init__(*args, **kwargs)
def to_struct(self, value):
"""Cast `time` object to string."""
if self.str_format:
return value.strftime(self.str_format)
return value.isoformat()
def parse_value(self, value):
"""Parse string into instance of `time`."""
if value is None:
return value
if isinstance(value, datetime.time):
return value
return parse(value).timetz()
class DateField(StringField):
"""Date field."""
types = (datetime.date,)
default_format = '%Y-%m-%d'
def __init__(self, str_format=None, *args, **kwargs):
"""Init.
:param str str_format: Format to cast date to (if `None` - casting to
%Y-%m-%d format).
"""
self.str_format = str_format
super(DateField, self).__init__(*args, **kwargs)
def to_struct(self, value):
"""Cast `date` object to string."""
if self.str_format:
return value.strftime(self.str_format)
return value.strftime(self.default_format)
def parse_value(self, value):
"""Parse string into instance of `date`."""
if value is None:
return value
if isinstance(value, datetime.date):
return value
return parse(value).date()
class DateTimeField(StringField):
"""Datetime field."""
types = (datetime.datetime,)
def __init__(self, str_format=None, *args, **kwargs):
"""Init.
:param str str_format: Format to cast datetime to (if `None` - casting
to ISO 8601 format).
"""
self.str_format = str_format
super(DateTimeField, self).__init__(*args, **kwargs)
def to_struct(self, value):
"""Cast `datetime` object to string."""
if self.str_format:
return value.strftime(self.str_format)
return value.isoformat()
def parse_value(self, value):
"""Parse string into instance of `datetime`."""
if isinstance(value, datetime.datetime):
return value
if value:
return parse(value)
else:
return None

View File

@ -0,0 +1,154 @@
import six
from . import parsers, errors
from .fields import BaseField
from .errors import ValidationError
class JsonmodelMeta(type):
def __new__(cls, name, bases, attributes):
cls.validate_fields(attributes)
return super(cls, cls).__new__(cls, name, bases, attributes)
@staticmethod
def validate_fields(attributes):
fields = {
key: value for key, value in attributes.items()
if isinstance(value, BaseField)
}
taken_names = set()
for name, field in fields.items():
structue_name = field.structue_name(name)
if structue_name in taken_names:
raise ValueError('Name taken', structue_name, name)
taken_names.add(structue_name)
class Base(six.with_metaclass(JsonmodelMeta, object)):
"""Base class for all models."""
def __init__(self, **kwargs):
self._cache_key = _CacheKey()
self.populate(**kwargs)
def populate(self, **values):
"""Populate values to fields. Skip non-existing."""
values = values.copy()
fields = list(self.iterate_with_name())
for _, structure_name, field in fields:
if structure_name in values:
field.__set__(self, values.pop(structure_name))
for name, _, field in fields:
if name in values:
field.__set__(self, values.pop(name))
def get_field(self, field_name):
"""Get field associated with given attribute."""
for attr_name, field in self:
if field_name == attr_name:
return field
raise errors.FieldNotFound('Field not found', field_name)
def __iter__(self):
"""Iterate through fields and values."""
for name, field in self.iterate_over_fields():
yield name, field
def validate(self):
"""Explicitly validate all the fields."""
for name, field in self:
try:
field.validate_for_object(self)
except ValidationError as error:
raise ValidationError(
"Error for field '{name}'.".format(name=name),
error,
)
@classmethod
def iterate_over_fields(cls):
"""Iterate through fields as `(attribute_name, field_instance)`."""
for attr in dir(cls):
clsattr = getattr(cls, attr)
if isinstance(clsattr, BaseField):
yield attr, clsattr
@classmethod
def iterate_with_name(cls):
"""Iterate over fields, but also give `structure_name`.
Format is `(attribute_name, structue_name, field_instance)`.
Structure name is name under which value is seen in structure and
schema (in primitives) and only there.
"""
for attr_name, field in cls.iterate_over_fields():
structure_name = field.structue_name(attr_name)
yield attr_name, structure_name, field
def to_struct(self):
"""Cast model to Python structure."""
return parsers.to_struct(self)
@classmethod
def to_json_schema(cls):
"""Generate JSON schema for model."""
return parsers.to_json_schema(cls)
def __repr__(self):
attrs = {}
for name, _ in self:
try:
attr = getattr(self, name)
if attr is not None:
attrs[name] = repr(attr)
except ValidationError:
pass
return '{class_name}({fields})'.format(
class_name=self.__class__.__name__,
fields=', '.join(
'{0[0]}={0[1]}'.format(x) for x in sorted(attrs.items())
),
)
def __str__(self):
return '{name} object'.format(name=self.__class__.__name__)
def __setattr__(self, name, value):
try:
return super(Base, self).__setattr__(name, value)
except ValidationError as error:
raise ValidationError(
"Error for field '{name}'.".format(name=name),
error
)
def __eq__(self, other):
if type(other) is not type(self):
return False
for name, _ in self.iterate_over_fields():
try:
our = getattr(self, name)
except errors.ValidationError:
our = None
try:
their = getattr(other, name)
except errors.ValidationError:
their = None
if our != their:
return False
return True
def __ne__(self, other):
return not (self == other)
class _CacheKey(object):
"""Object to identify model in memory."""

View File

@ -0,0 +1,106 @@
"""Parsers to change model structure into different ones."""
import inspect
from . import fields, builders, errors
def to_struct(model):
"""Cast instance of model to python structure.
:param model: Model to be casted.
:rtype: ``dict``
"""
model.validate()
resp = {}
for _, name, field in model.iterate_with_name():
value = field.__get__(model)
if value is None:
continue
value = field.to_struct(value)
resp[name] = value
return resp
def to_json_schema(cls):
"""Generate JSON schema for given class.
:param cls: Class to be casted.
:rtype: ``dict``
"""
builder = build_json_schema(cls)
return builder.build()
def build_json_schema(value, parent_builder=None):
from .models import Base
cls = value if inspect.isclass(value) else value.__class__
if issubclass(cls, Base):
return build_json_schema_object(cls, parent_builder)
else:
return build_json_schema_primitive(cls, parent_builder)
def build_json_schema_object(cls, parent_builder=None):
builder = builders.ObjectBuilder(cls, parent_builder)
if builder.count_type(builder.type) > 1:
return builder
for _, name, field in cls.iterate_with_name():
if isinstance(field, fields.EmbeddedField):
builder.add_field(name, field, _parse_embedded(field, builder))
elif isinstance(field, fields.ListField):
builder.add_field(name, field, _parse_list(field, builder))
else:
builder.add_field(
name, field, _create_primitive_field_schema(field))
return builder
def _parse_list(field, parent_builder):
builder = builders.ListBuilder(
parent_builder, field.nullable, default=field._default)
for type in field.items_types:
builder.add_type_schema(build_json_schema(type, builder))
return builder
def _parse_embedded(field, parent_builder):
builder = builders.EmbeddedBuilder(
parent_builder, field.nullable, default=field._default)
for type in field.types:
builder.add_type_schema(build_json_schema(type, builder))
return builder
def build_json_schema_primitive(cls, parent_builder):
builder = builders.PrimitiveBuilder(cls, parent_builder)
return builder
def _create_primitive_field_schema(field):
if isinstance(field, fields.StringField):
obj_type = 'string'
elif isinstance(field, fields.IntField):
obj_type = 'number'
elif isinstance(field, fields.FloatField):
obj_type = 'float'
elif isinstance(field, fields.BoolField):
obj_type = 'boolean'
else:
raise errors.FieldNotSupported(
'Field {field} is not supported!'.format(
field=type(field).__class__.__name__))
if field.nullable:
obj_type = [obj_type, 'null']
schema = {'type': obj_type}
if field.has_default:
schema["default"] = field._default
return schema

View File

@ -0,0 +1,156 @@
from __future__ import absolute_import
import six
import re
from collections import namedtuple
SCALAR_TYPES = tuple(list(six.string_types) + [int, float, bool])
ECMA_TO_PYTHON_FLAGS = {
'i': re.I,
'm': re.M,
}
PYTHON_TO_ECMA_FLAGS = dict(
(value, key) for key, value in ECMA_TO_PYTHON_FLAGS.items()
)
PythonRegex = namedtuple('PythonRegex', ['regex', 'flags'])
def _normalize_string_type(value):
if isinstance(value, six.string_types):
return six.text_type(value)
else:
return value
def _compare_dicts(one, two):
if len(one) != len(two):
return False
for key, value in one.items():
if key not in one or key not in two:
return False
if not compare_schemas(one[key], two[key]):
return False
return True
def _compare_lists(one, two):
if len(one) != len(two):
return False
they_match = False
for first_item in one:
for second_item in two:
if they_match:
continue
they_match = compare_schemas(first_item, second_item)
return they_match
def _assert_same_types(one, two):
if not isinstance(one, type(two)) or not isinstance(two, type(one)):
raise RuntimeError('Types mismatch! "{type1}" and "{type2}".'.format(
type1=type(one).__name__, type2=type(two).__name__))
def compare_schemas(one, two):
"""Compare two structures that represents JSON schemas.
For comparison you can't use normal comparison, because in JSON schema
lists DO NOT keep order (and Python lists do), so this must be taken into
account during comparison.
Note this wont check all configurations, only first one that seems to
match, which can lead to wrong results.
:param one: First schema to compare.
:param two: Second schema to compare.
:rtype: `bool`
"""
one = _normalize_string_type(one)
two = _normalize_string_type(two)
_assert_same_types(one, two)
if isinstance(one, list):
return _compare_lists(one, two)
elif isinstance(one, dict):
return _compare_dicts(one, two)
elif isinstance(one, SCALAR_TYPES):
return one == two
elif one is None:
return one is two
else:
raise RuntimeError('Not allowed type "{type}"'.format(
type=type(one).__name__))
def is_ecma_regex(regex):
"""Check if given regex is of type ECMA 262 or not.
:rtype: bool
"""
parts = regex.split('/')
if len(parts) == 1:
return False
if len(parts) < 3:
raise ValueError('Given regex isn\'t ECMA regex nor Python regex.')
parts.pop()
parts.append('')
raw_regex = '/'.join(parts)
if raw_regex.startswith('/') and raw_regex.endswith('/'):
return True
return False
def convert_ecma_regex_to_python(value):
"""Convert ECMA 262 regex to Python tuple with regex and flags.
If given value is already Python regex it will be returned unchanged.
:param string value: ECMA regex.
:return: 2-tuple with `regex` and `flags`
:rtype: namedtuple
"""
if not is_ecma_regex(value):
return PythonRegex(value, [])
parts = value.split('/')
flags = parts.pop()
try:
result_flags = [ECMA_TO_PYTHON_FLAGS[f] for f in flags]
except KeyError:
raise ValueError('Wrong flags "{}".'.format(flags))
return PythonRegex('/'.join(parts[1:]), result_flags)
def convert_python_regex_to_ecma(value, flags=[]):
"""Convert Python regex to ECMA 262 regex.
If given value is already ECMA regex it will be returned unchanged.
:param string value: Python regex.
:param list flags: List of flags (allowed flags: `re.I`, `re.M`)
:return: ECMA 262 regex
:rtype: str
"""
if is_ecma_regex(value):
return value
result_flags = [PYTHON_TO_ECMA_FLAGS[f] for f in flags]
result_flags = ''.join(result_flags)
return '/{value}/{flags}'.format(value=value, flags=result_flags)

View File

@ -0,0 +1,202 @@
"""Predefined validators."""
import re
from six.moves import reduce
from .errors import ValidationError
from . import utilities
class Min(object):
"""Validator for minimum value."""
def __init__(self, minimum_value, exclusive=False):
"""Init.
:param minimum_value: Minimum value for validator.
:param bool exclusive: If `True`, then validated value must be strongly
lower than given threshold.
"""
self.minimum_value = minimum_value
self.exclusive = exclusive
def validate(self, value):
"""Validate value."""
if self.exclusive:
if value <= self.minimum_value:
tpl = "'{value}' is lower or equal than minimum ('{min}')."
raise ValidationError(
tpl.format(value=value, min=self.minimum_value))
else:
if value < self.minimum_value:
raise ValidationError(
"'{value}' is lower than minimum ('{min}').".format(
value=value, min=self.minimum_value))
def modify_schema(self, field_schema):
"""Modify field schema."""
field_schema['minimum'] = self.minimum_value
if self.exclusive:
field_schema['exclusiveMinimum'] = True
class Max(object):
"""Validator for maximum value."""
def __init__(self, maximum_value, exclusive=False):
"""Init.
:param maximum_value: Maximum value for validator.
:param bool exclusive: If `True`, then validated value must be strongly
bigger than given threshold.
"""
self.maximum_value = maximum_value
self.exclusive = exclusive
def validate(self, value):
"""Validate value."""
if self.exclusive:
if value >= self.maximum_value:
tpl = "'{val}' is bigger or equal than maximum ('{max}')."
raise ValidationError(
tpl.format(val=value, max=self.maximum_value))
else:
if value > self.maximum_value:
raise ValidationError(
"'{value}' is bigger than maximum ('{max}').".format(
value=value, max=self.maximum_value))
def modify_schema(self, field_schema):
"""Modify field schema."""
field_schema['maximum'] = self.maximum_value
if self.exclusive:
field_schema['exclusiveMaximum'] = True
class Regex(object):
"""Validator for regular expressions."""
FLAGS = {
'ignorecase': re.I,
'multiline': re.M,
}
def __init__(self, pattern, **flags):
"""Init.
Note, that if given pattern is ECMA regex, given flags will be
**completely ignored** and taken from given regex.
:param string pattern: Pattern of regex.
:param bool flags: Flags used for the regex matching.
Allowed flag names are in the `FLAGS` attribute. The flag value
does not matter as long as it evaluates to True.
Flags with False values will be ignored.
Invalid flags will be ignored.
"""
if utilities.is_ecma_regex(pattern):
result = utilities.convert_ecma_regex_to_python(pattern)
self.pattern, self.flags = result
else:
self.pattern = pattern
self.flags = [self.FLAGS[key] for key, value in flags.items()
if key in self.FLAGS and value]
def validate(self, value):
"""Validate value."""
flags = self._calculate_flags()
try:
result = re.search(self.pattern, value, flags)
except TypeError as te:
raise ValidationError(*te.args)
if not result:
raise ValidationError(
'Value "{value}" did not match pattern "{pattern}".'.format(
value=value, pattern=self.pattern
))
def _calculate_flags(self):
return reduce(lambda x, y: x | y, self.flags, 0)
def modify_schema(self, field_schema):
"""Modify field schema."""
field_schema['pattern'] = utilities.convert_python_regex_to_ecma(
self.pattern, self.flags)
class Length(object):
"""Validator for length."""
def __init__(self, minimum_value=None, maximum_value=None):
"""Init.
Note that if no `minimum_value` neither `maximum_value` will be
specified, `ValueError` will be raised.
:param int minimum_value: Minimum value (optional).
:param int maximum_value: Maximum value (optional).
"""
if minimum_value is None and maximum_value is None:
raise ValueError(
"Either 'minimum_value' or 'maximum_value' must be specified.")
self.minimum_value = minimum_value
self.maximum_value = maximum_value
def validate(self, value):
"""Validate value."""
len_ = len(value)
if self.minimum_value is not None and len_ < self.minimum_value:
tpl = "Value '{val}' length is lower than allowed minimum '{min}'."
raise ValidationError(tpl.format(
val=value, min=self.minimum_value
))
if self.maximum_value is not None and len_ > self.maximum_value:
raise ValidationError(
"Value '{val}' length is bigger than "
"allowed maximum '{max}'.".format(
val=value,
max=self.maximum_value,
))
def modify_schema(self, field_schema):
"""Modify field schema."""
if self.minimum_value:
field_schema['minLength'] = self.minimum_value
if self.maximum_value:
field_schema['maxLength'] = self.maximum_value
class Enum(object):
"""Validator for enums."""
def __init__(self, *choices):
"""Init.
:param [] choices: Valid choices for the field.
"""
self.choices = list(choices)
def validate(self, value):
if value not in self.choices:
tpl = "Value '{val}' is not a valid choice."
raise ValidationError(tpl.format(val=value))
def modify_schema(self, field_schema):
field_schema['enum'] = self.choices

View File

@ -1,10 +1,8 @@
import requests
import six
import jsonmodels.models
import jsonmodels.fields
import jsonmodels.errors
from . import jsonmodels
from .apimodel import ApiModel
from .datamodel import NonStrictDataModelMixin