Add fixed user validation

Fix the way a fixed user id is generated
This commit is contained in:
allegroai 2020-01-02 15:20:55 +02:00
parent ed910d5f6a
commit 5d279c8c5a
3 changed files with 29 additions and 3 deletions

View File

@ -3,6 +3,8 @@ from os import getenv
from pathlib import Path
from version import __version__
from config import config
root = Path(__file__).parent.parent
@ -35,3 +37,7 @@ def get_commit_number():
@lru_cache()
def get_deployment_type() -> str:
return _get("DEPLOY", env_suffix="DEPLOYMENT_TYPE", default="manual")
def get_default_company():
return config.get("apiserver.default_company")

View File

@ -11,6 +11,7 @@ from semantic_version import Version
import database.utils
from bll.queue import QueueBLL
from config import config
from config.info import get_default_company
from database import Database
from database.model.auth import Role
from database.model.auth import User as AuthUser, Credentials
@ -49,7 +50,7 @@ def init_es_data():
def _ensure_company():
company_id = config.get("apiserver.default_company")
company_id = get_default_company()
company = Company.objects(id=company_id).only("id").first()
if company:
return company_id
@ -211,6 +212,7 @@ def init_mongo_data():
if FixedUser.enabled():
log.info("Fixed users mode is enabled")
FixedUser.validate()
for user in FixedUser.from_config():
try:
_ensure_user(user, company_id)

View File

@ -5,27 +5,45 @@ from typing import Sequence, TypeVar
import attr
from config import config
from config.info import get_default_company
T = TypeVar("T", bound="FixedUser")
class FixedUsersError(Exception):
pass
@attr.s(auto_attribs=True)
class FixedUser:
username: str
password: str
name: str
company: str = get_default_company()
def __attrs_post_init__(self):
self.user_id = hashlib.md5(f"{self.username}:{self.password}".encode()).hexdigest()
self.user_id = hashlib.md5(f"{self.company}:{self.username}".encode()).hexdigest()
@classmethod
def enabled(cls):
return config.get("apiserver.auth.fixed_users.enabled", False)
@classmethod
def validate(cls):
if not cls.enabled():
return
users = cls.from_config()
if len({user.username for user in users}) < len(users):
raise FixedUsersError(
"Duplicate user names found in fixed users configuration"
)
@classmethod
@lru_cache()
def from_config(cls) -> Sequence[T]:
return [cls(**user) for user in config.get("apiserver.auth.fixed_users.users", [])]
return [
cls(**user) for user in config.get("apiserver.auth.fixed_users.users", [])
]
@classmethod
@lru_cache()