MirrorAPI/packages/v1/administrativo/schemas/usuario_schema.py
2025-11-03 15:18:26 -03:00

223 lines
7 KiB
Python

from pydantic import BaseModel, EmailStr, constr, field_validator, model_validator
from pydantic_core import PydanticCustomError
from fastapi import HTTPException, status
from typing import Optional
from datetime import datetime
# Funções utilitárias para segurança (hash e verificação de senha)
# Supondo que você ajustará as importações conforme seu projeto
from actions.security.security import Security
# Funções para sanitização de entradas (evitar XSS, SQLi etc.)
from actions.validations.text import Text
# Funções para validar E-mail
from actions.validations.email import Email
# Funções para validar cpf (Não está na DDL, mas mantido como opcional no Save e Update)
# from actions.validations.cpf import CPF # Não utilizado neste novo escopo
# ----------------------------------------------------
# Schema base
# ----------------------------------------------------
class UsuarioSchema(BaseModel):
usuario_id: Optional[int] = None
nome: Optional[str] = None
email: Optional[EmailStr] = None
username: Optional[str] = None
password: Optional[str] = None
status: Optional[str] = None # Corresponde ao 'status' na DDL
date_register: Optional[datetime] = None
date_update: Optional[datetime] = None
user_id_create: Optional[int] = None
user_id_update: Optional[int] = None
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para acesso ao sistema (Autenticação)
# ----------------------------------------------------
class UsuarioAuthenticateSchema(BaseModel):
# Campos utilizados
username: str # Usando username para login, é mais comum em DDLs simples
password: str # Corresponde ao 'password' na DDL
status: Optional[str] = None
usuario_id: Optional[int] = None
nome: Optional[str] = None
email: Optional[str] = None
# Validação e sanitização do email
@field_validator("username")
def validar_e_sanitizar_password(cls, v):
if not v:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Informe o username"
)
# Sanitiza e-mail
return Text.sanitize_input(v)
# Validação e sanitização da senha
@field_validator("password")
def validar_e_sanitizar_senha(cls, v):
if not v:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="Informe a password"
)
# Sanitiza a senha
return Text.sanitize_input(v)
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para localizar um usuário especifico pelo ID (GET)
# ----------------------------------------------------
class UsuarioIdSchema(BaseModel):
usuario_id: int
# ----------------------------------------------------
# Schema para criação de novo usuário (POST)
# ----------------------------------------------------
class UsuarioSaveSchema(BaseModel):
usuario_id: Optional[int] = None
nome: constr(min_length=1)
email: EmailStr
username: constr(min_length=1)
password: constr(min_length=1)
status: Optional[str] = "A"
user_id_create: Optional[int] = None
# Sanitiza os inputs enviados
@field_validator("nome", "email", "username", "password", "status")
def validate_and_sanitize_fields(cls, v):
if v is not None:
return Text.sanitize_input(v)
return v
# Verifica se os campos obrigatórios foram enviados e hash da senha
@model_validator(mode="after")
def validate_all_fields_and_hash_password(self):
errors = []
# Validação do nome
if not self.nome or len(self.nome.strip()) == 0:
errors.append({"input": "nome", "message": "O nome é obrigatório."})
# Validação do email (EmailStr do Pydantic já faz a validação de formato)
if not self.email or len(self.email.strip()) == 0:
errors.append({"input": "email", "message": "O e-mail é obrigatório."})
# Validação da username
if not self.username or len(self.username.strip()) == 0:
errors.append({"input": "username", "message": "O username é obrigatório."})
# Validação da senha
if not self.password or len(self.password.strip()) == 0:
errors.append({"input": "password", "message": "A senha é obrigatória."})
# Criptografa a senha
if (
self.password and not errors
): # Hash somente se a senha estiver presente e sem erros anteriores
self.password = Security.hash_password(self.password)
# Se houver errors, lança uma única exceção
if errors:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=errors
)
return self
# ----------------------------------------------------
# Schema para atualizar usuário (PUT)
# ----------------------------------------------------
class UsuarioUpdateSchema(BaseModel):
nome: Optional[str] = None
email: Optional[EmailStr] = None
username: Optional[str] = None
password: Optional[str] = None
status: Optional[str] = None
user_id_update: Optional[int] = None
# Sanitiza os inputs enviados
@field_validator(
"nome",
"email",
"username",
"password",
"status",
)
def validate_and_sanitize_fields(cls, v):
if v is not None:
return Text.sanitize_input(v)
return v
# Hash da senha se ela for fornecida
@model_validator(mode="after")
def hash_password_if_provided(self):
# Hash da nova senha, se fornecida.
if self.password:
self.password = Security.hash_password(self.password)
# Não estamos forçando a obrigatoriedade de todos os campos aqui (PUT é parcial),
# mas garantimos a sanitização e o hash da senha se ela existir.
return self
# ----------------------------------------------------
# Schema para localizar usuário pelo e-mail
# ----------------------------------------------------
class UsuarioEmailSchema(BaseModel):
email: Optional[EmailStr] = None
# Sanitiza o e-mail informado
@field_validator("email", mode="before")
def sanitize_email(cls, v: Optional[str]):
if v is None:
return v
v = v.strip()
return v
# Verifica se o e-mail foi informado e se é válido
@field_validator("email")
def validate_email(cls, v: str):
errors = []
# vazio
if not v:
errors.append({"input": "e-mail", "message": "Informe um e-mail."})
# inválido pelo seu helper
if not Email.is_valid_email(v):
errors.append({"input": "e-mail", "message": "E-mail inválido."})
# Se houver errors, lança uma única exceção
if errors:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=errors
)
return v
class Config:
from_attributes = True