diff --git a/apiserver/schema/services/auth.conf b/apiserver/schema/services/auth.conf index 79d89d0..ec0369a 100644 --- a/apiserver/schema/services/auth.conf +++ b/apiserver/schema/services/auth.conf @@ -145,6 +145,7 @@ validate_token { create_user { "2.1" { + allow_roles: ["system", "root", "admin"] description: """Creates a new user auth entry. Intended for internal use. """ request { type: object diff --git a/apiserver/services/auth.py b/apiserver/services/auth.py index f075387..7cd479a 100644 --- a/apiserver/services/auth.py +++ b/apiserver/services/auth.py @@ -19,7 +19,7 @@ from apiserver.bll.auth import AuthBLL from apiserver.config import info from apiserver.config_repo import config from apiserver.database.errors import translate_errors_context -from apiserver.database.model.auth import User +from apiserver.database.model.auth import User, Role from apiserver.service_repo import APICall, endpoint from apiserver.service_repo.auth import Token from apiserver.service_repo.auth.fixed_user import FixedUser @@ -56,13 +56,18 @@ def logout(call: APICall, *_, **__): request_data_model=GetTokenForUserRequest, response_data_model=GetTokenResponse, ) -def get_token_for_user(call): +def get_token_for_user(call: APICall, _: str, request: GetTokenForUserRequest): """ Generates a token based on a requested user and company. INTERNAL. """ - assert isinstance(call, APICall) + if call.identity.role not in Role.get_system_roles(): + if call.identity.role != Role.admin and call.identity.user != request.user: + raise errors.bad_request.InvalidUserId("cannot generate token for another user") + if call.identity.company != request.company: + raise errors.bad_request.InvalidId("cannot generate token in another company") + call.result.data_model = AuthBLL.get_token_for_user( - user_id=call.data_model.user, - company_id=call.data_model.company, - expiration_sec=call.data_model.expiration_sec, + user_id=request.user, + company_id=request.company, + expiration_sec=request.expiration_sec, ) @@ -71,10 +76,8 @@ def get_token_for_user(call): request_data_model=ValidateTokenRequest, response_data_model=ValidateResponse, ) -def validate_token_endpoint(call): +def validate_token_endpoint(call: APICall, _, __): """ Validate a token and return identity if valid. INTERNAL. """ - assert isinstance(call, APICall) - try: # if invalid, decoding will fail token = Token.from_encoded_token(call.data_model.token) @@ -92,12 +95,18 @@ def validate_token_endpoint(call): ) def create_user(call: APICall, _, request: CreateUserRequest): """ Create a user from. INTERNAL. """ + if call.identity.role not in Role.get_system_roles() and request.company != call.identity.company: + raise errors.bad_request.InvalidId("cannot create user in another company") + user_id = AuthBLL.create_user(request=request, call=call) call.result.data_model = CreateUserResponse(id=user_id) @endpoint("auth.create_credentials", response_data_model=CreateCredentialsResponse) def create_credentials(call: APICall, _, __): + if _is_protected_user(call.identity.user): + raise errors.bad_request.InvalidUserId("protected identity") + credentials = AuthBLL.create_credentials( user_id=call.identity.user, company_id=call.identity.company, @@ -111,11 +120,13 @@ def create_credentials(call: APICall, _, __): request_data_model=RevokeCredentialsRequest, response_data_model=RevokeCredentialsResponse, ) -def revoke_credentials(call): - assert isinstance(call, APICall) +def revoke_credentials(call: APICall, _, __): identity = call.identity access_key = call.data_model.access_key + if _is_protected_user(call.identity.user): + raise errors.bad_request.InvalidUserId("protected identity") + with translate_errors_context(): query = dict( id=identity.user, company=identity.company, credentials__key=access_key @@ -130,9 +141,7 @@ def revoke_credentials(call): @endpoint("auth.get_credentials", response_data_model=GetCredentialsResponse) -def get_credentials(call): - """ Validate a user by his email. INTERNAL. """ - assert isinstance(call, APICall) +def get_credentials(call: APICall, _, __): identity = call.identity with translate_errors_context(): @@ -153,9 +162,7 @@ def get_credentials(call): @endpoint( "auth.edit_user", request_data_model=EditUserReq, response_data_model=UpdateResponse ) -def update(call, company_id, _): - assert isinstance(call, APICall) - +def update(call: APICall, company_id: str, _): fields = { k: v for k, v in call.data_model.to_struct().items() @@ -198,3 +205,7 @@ def fixed_users_mode(call: APICall, *_, **__): data["guest"]["password"] = guest_user.password call.result.data = data + + +def _is_protected_user(user_id): + return user_id.strip("_") in config.get("secure.credentials")