# clearml-server/apiserver/service_repo/auth/fixed_user.py
# 2021-01-05 16:28:49 +02:00
#
# 93 lines
# 2.6 KiB
# Python

import hashlib
from functools import lru_cache
from typing import Sequence, Optional
import attr
from apiserver.config import config
from apiserver.config.info import get_default_company
class FixedUsersError(Exception):
    """Signals an invalid fixed users configuration (e.g. duplicate user names)."""
@attr.s(auto_attribs=True, hash=False)
class FixedUser:
    """
    A user defined statically in the server configuration ("fixed users").

    Regular users are read from ``apiserver.auth.fixed_users.users``; the
    optional guest user is read from ``services.auth.fixed_users.guest``.

    ``hash=False`` tells attrs to leave ``__hash__`` alone. With the attrs
    defaults (``eq=True``, not frozen) the ``__hash__`` defined on this class
    would be replaced by ``None``, making instances unhashable.
    """

    username: str
    password: str
    name: str
    # NOTE: this default is evaluated once, when the class body is executed
    company: str = get_default_company()
    is_guest: bool = False

    def __attrs_post_init__(self):
        """Derive a deterministic ``user_id`` from the company and the username."""
        # The MD5 hex digest only serves as a stable identifier string
        self.user_id = hashlib.md5(
            f"{self.company}:{self.username}".encode()
        ).hexdigest()

    @classmethod
    def enabled(cls):
        """Return the ``apiserver.auth.fixed_users.enabled`` setting (default False)."""
        return config.get("apiserver.auth.fixed_users.enabled", False)

    @classmethod
    def guest_enabled(cls):
        """
        Return whether the guest user is enabled.

        Requires fixed users to be enabled as well as the
        ``services.auth.fixed_users.guest.enabled`` setting.
        """
        return cls.enabled() and config.get(
            "services.auth.fixed_users.guest.enabled", False
        )

    @classmethod
    def validate(cls):
        """
        Validate the fixed users configuration (no-op when fixed users are disabled).

        :raise FixedUsersError: if two configured users share the same username
        """
        if not cls.enabled():
            return
        users = cls.from_config()
        # A set of usernames smaller than the user list means duplicates exist
        if len({user.username for user in users}) < len(users):
            raise FixedUsersError(
                "Duplicate user names found in fixed users configuration"
            )

    @classmethod
    def from_config(cls) -> Sequence["FixedUser"]:
        """
        Build the list of fixed users from the configuration.

        This method is not memoized: the configuration is read on every call.

        :return: the configured users, preceded by the guest user when the
            guest user is enabled
        """
        users = [
            cls(**user) for user in config.get("apiserver.auth.fixed_users.users", [])
        ]
        # get_guest_user() returns None when the guest user is disabled
        guest = cls.get_guest_user()
        if guest is not None:
            users.insert(0, guest)
        return users

    @classmethod
    @lru_cache()
    def get_by_username(cls, username) -> Optional["FixedUser"]:
        """
        Return the first fixed user matching the username, or None if there is none.

        Results (including None) are memoized by ``lru_cache``, so later
        configuration changes are not reflected for already-queried usernames.
        """
        return next(
            (user for user in cls.from_config() if user.username == username), None
        )

    @classmethod
    @lru_cache()
    def is_guest_endpoint(cls, service, action):
        """
        Return True if the "<service>.<action>" endpoint appears in the
        ``services.auth.fixed_users.guest.allow_endpoints`` setting.

        This only checks the endpoint against the allow list; it does not
        verify the identity of the calling user. Results are memoized by
        ``lru_cache``.
        """
        return any(
            ep == ".".join((service, action))
            for ep in config.get("services.auth.fixed_users.guest.allow_endpoints", [])
        )

    @classmethod
    def get_guest_user(cls) -> Optional["FixedUser"]:
        """
        Build the guest user from the ``services.auth.fixed_users.guest`` settings.

        :return: the guest user, or None when the guest user is not enabled
        """
        if not cls.guest_enabled():
            return None
        return cls(
            is_guest=True,
            username=config.get("services.auth.fixed_users.guest.username"),
            password=config.get("services.auth.fixed_users.guest.password"),
            name=config.get("services.auth.fixed_users.guest.name"),
            company=config.get("services.auth.fixed_users.guest.default_company"),
        )

    def __hash__(self):
        """Hash by ``user_id`` (derived from the company and the username)."""
        return hash(self.user_id)