2023-11-19 08:13:59 +00:00
|
|
|
from fastapi import Response
|
|
|
|
from fastapi import Depends, FastAPI, HTTPException, status
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
from typing import List, Union, Optional
|
|
|
|
|
|
|
|
from fastapi import APIRouter
|
|
|
|
from pydantic import BaseModel
|
|
|
|
import time
|
|
|
|
import uuid
|
|
|
|
|
|
|
|
from apps.web.models.users import UserModel, UserRoleUpdateForm, Users
|
2023-12-29 07:24:51 +00:00
|
|
|
from apps.web.models.auths import Auths
|
|
|
|
|
2023-11-19 08:13:59 +00:00
|
|
|
|
|
|
|
from utils.utils import (
|
|
|
|
get_password_hash,
|
|
|
|
bearer_scheme,
|
|
|
|
create_token,
|
|
|
|
)
|
|
|
|
from constants import ERROR_MESSAGES
|
|
|
|
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
|
|
############################
|
|
|
|
# GetUsers
|
|
|
|
############################
|
|
|
|
|
|
|
|
|
|
|
|
@router.get("/", response_model=List[UserModel])
async def get_users(skip: int = 0, limit: int = 50, cred=Depends(bearer_scheme)):
    """List users on behalf of an authenticated admin.

    The bearer token carried by ``cred`` is resolved to a user through
    ``Users.get_user_by_token``. ``skip`` and ``limit`` are forwarded
    unchanged to ``Users.get_users``.

    Raises:
        HTTPException: 401 when the token does not resolve to a user,
            403 when the resolved user's role is not ``"admin"``.
    """
    requester = Users.get_user_by_token(cred.credentials)

    # Reject callers whose token did not resolve to a user.
    if not requester:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.INVALID_TOKEN,
        )

    # Only admins may list users.
    if requester.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    return Users.get_users(skip, limit)
|
|
|
|
|
|
|
|
|
|
|
|
############################
|
|
|
|
# UpdateUserRole
|
|
|
|
############################
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("/update/role", response_model=Optional[UserModel])
async def update_user_role(form_data: UserRoleUpdateForm, cred=Depends(bearer_scheme)):
    """Change another user's role on behalf of an authenticated admin.

    The bearer token carried by ``cred`` is resolved to a user through
    ``Users.get_user_by_token``. On success the result of
    ``Users.update_user_role_by_id(form_data.id, form_data.role)`` is
    returned as-is.

    Raises:
        HTTPException: 401 when the token does not resolve to a user,
            403 (ACCESS_PROHIBITED) when the caller is not an admin,
            403 (ACTION_PROHIBITED) when the target id is the caller's own.
    """
    requester = Users.get_user_by_token(cred.credentials)

    # Reject callers whose token did not resolve to a user.
    if not requester:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.INVALID_TOKEN,
        )

    # Only admins may change roles.
    if requester.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    # An admin is not allowed to change their own role through this route.
    if requester.id == form_data.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=ERROR_MESSAGES.ACTION_PROHIBITED,
        )

    return Users.update_user_role_by_id(form_data.id, form_data.role)
|
2023-12-29 07:02:49 +00:00
|
|
|
|
|
|
|
|
|
|
|
############################
|
2023-12-29 07:24:51 +00:00
|
|
|
# DeleteUserById
|
2023-12-29 07:02:49 +00:00
|
|
|
############################
|
|
|
|
|
|
|
|
|
|
|
|
@router.delete("/{user_id}", response_model=bool)
async def delete_user_by_id(user_id: str, cred=Depends(bearer_scheme)):
    """Delete the user identified by ``user_id`` on behalf of an admin.

    The bearer token carried by ``cred`` is resolved to a user through
    ``Users.get_user_by_token``. The deletion itself is delegated to
    ``Auths.delete_auth_by_id``; ``True`` is returned when that call
    yields a truthy result.

    Raises:
        HTTPException: 401 when the token does not resolve to a user,
            403 (ACCESS_PROHIBITED) when the caller is not an admin,
            403 (ACTION_PROHIBITED) when ``user_id`` is the caller's own id,
            500 when ``Auths.delete_auth_by_id`` returns a falsy result.
    """
    requester = Users.get_user_by_token(cred.credentials)

    # Reject callers whose token did not resolve to a user.
    if not requester:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.INVALID_TOKEN,
        )

    # Only admins may delete users.
    if requester.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    # An admin is not allowed to delete their own account through this route.
    if requester.id == user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=ERROR_MESSAGES.ACTION_PROHIBITED,
        )

    deleted = Auths.delete_auth_by_id(user_id)
    if not deleted:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=ERROR_MESSAGES.DELETE_USER_ERROR,
        )

    return True
|