Add fixed user list support

This commit is contained in:
allegroai
2019-07-09 00:04:43 +03:00
parent a33c94e24f
commit 76962667a3
3 changed files with 57 additions and 2 deletions

View File

@@ -1,5 +1,6 @@
import base64
import jwt
from mongoengine import Q
from database.errors import translate_errors_context
from database.model.company import Company
@@ -11,6 +12,7 @@ from timing_context import TimingContext
from .payload import Payload, Token, Basic, AuthType
from .identity import Identity
from .fixed_user import FixedUser
log = config.logger(__file__)
@@ -54,8 +56,17 @@ def authorize_credentials(auth_data, service, action, call_data_items):
log.exception('malformed credentials')
raise errors.unauthorized.BadCredentials(str(e))
query = Q(credentials__match=Credentials(key=access_key, secret=secret_key))
if FixedUser.enabled():
fixed_user = FixedUser.get_by_username(access_key)
if fixed_user:
if secret_key != fixed_user.password:
raise errors.unauthorized.InvalidCredentials('bad username or password')
query = Q(id=fixed_user.user_id)
with TimingContext("mongo", "user_by_cred"), translate_errors_context('authorizing request'):
user = User.objects(credentials__match=Credentials(key=access_key, secret=secret_key)).first()
user = User.objects(query).first()
if not user:
raise errors.unauthorized.InvalidCredentials('failed to locate provided credentials')

View File

@@ -0,0 +1,38 @@
import hashlib
from functools import lru_cache
from typing import Sequence, TypeVar
import attr
from config import config
T = TypeVar("T", bound="FixedUser")
@attr.s(auto_attribs=True)
class FixedUser:
username: str
password: str
name: str
def __attrs_post_init__(self):
self.user_id = hashlib.md5(f"{self.username}:{self.password}".encode()).hexdigest()
@classmethod
def enabled(self):
return config.get("apiserver.auth.fixed_users.enabled", False)
@classmethod
@lru_cache()
def from_config(cls) -> Sequence[T]:
return [cls(**user) for user in config.get("apiserver.auth.fixed_users.users", [])]
@classmethod
@lru_cache()
def get_by_username(cls, username) -> T:
return next(
(user for user in cls.from_config() if user.username == username), None
)
def __hash__(self):
return hash(self.user_id)