monitoring-api/packages/v1/administrativo/schemas/user_schema.py
2025-10-06 09:30:41 -03:00

201 lines
No EOL
6.6 KiB
Python

from pydantic import BaseModel, EmailStr, constr, field_validator, model_validator
from fastapi import HTTPException, status
from typing import Optional
from datetime import datetime
# Funções utilitárias para segurança (hash e verificação de senha)
# Supondo que você ajustará as importações conforme seu projeto
from actions.security.security import Security
# Funções para sanitização de entradas (evitar XSS, SQLi etc.)
from actions.validations.text import Text
# Funções para validar E-mail
from actions.validations.email import Email
# Funções para validar cpf (Não está na DDL, mas mantido como opcional no Save e Update)
# from actions.validations.cpf import CPF # Não utilizado neste novo escopo
# ----------------------------------------------------
# Schema base
# ----------------------------------------------------
class UserSchema(BaseModel):
user_id: Optional[int] = None
name: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None # Corresponde ao 'password' na DDL
password_temp: Optional[str] = None
password_temp_confirm: Optional[str] = None
position: Optional[str] = None # Corresponde ao 'position' na DDL
team: Optional[str] = None
status: Optional[str] = None # Corresponde ao 'status' na DDL
date_register: Optional[datetime] = None
date_update: Optional[datetime] = None
user_id_create: Optional[int] = None
user_id_update: Optional[int] = None
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para acesso ao sistema (Autenticação)
# ----------------------------------------------------
class UserAuthenticateSchema(BaseModel):
# Campos utilizados
email: EmailStr # Usando email para login, é mais comum em DDLs simples
password: str # Corresponde ao 'password' na DDL
# Validação e sanitização do email
@field_validator('email')
def validar_e_sanitizar_email(cls, v):
if not v:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Informe o e-mail'
)
# Sanitiza e-mail
return Text.sanitize_input(v)
# Validação e sanitização da senha
@field_validator('password')
def validar_e_sanitizar_senha(cls, v):
if not v:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Informe a senha'
)
# Sanitiza a senha
return Text.sanitize_input(v)
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para localizar um usuário especifico pelo ID (GET)
# ----------------------------------------------------
class UserIdSchema(BaseModel):
user_id: int
# ----------------------------------------------------
# Schema para criação de novo usuário (POST)
# ----------------------------------------------------
class UserSaveSchema(BaseModel):
user_id: Optional[int] = None
name: constr(min_length=1)
email: EmailStr
password: constr(min_length=1)
position: constr(min_length=1)
team: Optional[str] = None
status: Optional[str] = 'A'
user_id_create: Optional[int] = None
# Sanitiza os inputs enviados
@field_validator('name', 'email', 'position', 'team', 'status')
def validate_and_sanitize_fields(cls, v):
if v is not None:
return Text.sanitize_input(v)
return v
# Verifica se os campos obrigatórios foram enviados e hash da senha
@model_validator(mode='after')
def validate_all_fields_and_hash_password(self):
errors = []
# Validação do nome
if not self.name or len(self.name.strip()) == 0:
errors.append({'input': 'name', 'message': 'O nome é obrigatório.'})
# Validação do email (EmailStr do Pydantic já faz a validação de formato)
if not self.email or len(self.email.strip()) == 0:
errors.append({'input': 'email', 'message': 'O e-mail é obrigatório.'})
# Validação da senha
if not self.password or len(self.password.strip()) == 0:
errors.append({'input': 'password', 'message': 'A senha é obrigatória.'})
# Criptografa a senha
if self.password and not errors: # Hash somente se a senha estiver presente e sem erros anteriores
self.password = Security.hash_password(self.password)
# Se houver errors, lança uma única exceção
if errors:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=errors
)
return self
# ----------------------------------------------------
# Schema para atualizar usuário (PUT)
# ----------------------------------------------------
class UserUpdateSchema(BaseModel):
name: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None
password_temp: Optional[str] = None
password_temp_confirm: Optional[str] = None
position: Optional[str] = None
team: Optional[str] = None
status: Optional[str] = None
user_id_update: Optional[int] = None
# Sanitiza os inputs enviados
@field_validator('name', 'email', 'password_temp', 'password_temp_confirm', 'position', 'team', 'status')
def validate_and_sanitize_fields(cls, v):
if v is not None:
return Text.sanitize_input(v)
return v
# Hash da senha se ela for fornecida
@model_validator(mode='after')
def hash_password_if_provided(self):
# Hash da nova senha, se fornecida.
if self.password:
self.password = Security.hash_password(self.password)
# Não estamos forçando a obrigatoriedade de todos os campos aqui (PUT é parcial),
# mas garantimos a sanitização e o hash da senha se ela existir.
return self
# ----------------------------------------------------
# Schema para localizar usuário pelo e-mail
# ----------------------------------------------------
class UserEmailSchema(BaseModel):
email: Optional[EmailStr] = None
# Sanitiza o input
@field_validator('email')
def sanitize_email(cls, v):
if v:
return Text.sanitize_input(v)
return v
# Valida se o campo não está vazio
@model_validator(mode='after')
def validate_email(self):
if not self.email or len(self.email.strip()) == 0:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Informe um e-mail'
)
return self
class Config:
from_attributes = True