from pydantic import BaseModel, EmailStr, constr, field_validator, model_validator from pydantic_core import PydanticCustomError from fastapi import HTTPException, status from typing import Optional from datetime import datetime # Funções utilitárias para segurança (hash e verificação de senha) # Supondo que você ajustará as importações conforme seu projeto from actions.security.security import Security # Funções para sanitização de entradas (evitar XSS, SQLi etc.) from actions.validations.text import Text # Funções para validar E-mail from actions.validations.email import Email # Funções para validar cpf (Não está na DDL, mas mantido como opcional no Save e Update) # from actions.validations.cpf import CPF # Não utilizado neste novo escopo # ---------------------------------------------------- # Schema base # ---------------------------------------------------- class UsuarioSchema(BaseModel): usuario_id: Optional[int] = None nome: Optional[str] = None email: Optional[EmailStr] = None username: Optional[str] = None password: Optional[str] = None status: Optional[str] = None # Corresponde ao 'status' na DDL date_register: Optional[datetime] = None date_update: Optional[datetime] = None user_id_create: Optional[int] = None user_id_update: Optional[int] = None class Config: from_attributes = True # ---------------------------------------------------- # Schema para acesso ao sistema (Autenticação) # ---------------------------------------------------- class UsuarioAuthenticateSchema(BaseModel): # Campos utilizados username: str # Usando username para login, é mais comum em DDLs simples password: str # Corresponde ao 'password' na DDL status: Optional[str] = None usuario_id: Optional[int] = None nome: Optional[str] = None email: Optional[str] = None # Validação e sanitização do email @field_validator("username") def validar_e_sanitizar_password(cls, v): if not v: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Informe o username" ) # Sanitiza e-mail return Text.sanitize_input(v) # Validação e sanitização da senha @field_validator("password") def validar_e_sanitizar_senha(cls, v): if not v: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Informe a password" ) # Sanitiza a senha return Text.sanitize_input(v) class Config: from_attributes = True # ---------------------------------------------------- # Schema para localizar um usuário especifico pelo ID (GET) # ---------------------------------------------------- class UsuarioIdSchema(BaseModel): usuario_id: int # ---------------------------------------------------- # Schema para criação de novo usuário (POST) # ---------------------------------------------------- class UsuarioSaveSchema(BaseModel): usuario_id: Optional[int] = None nome: constr(min_length=1) email: EmailStr username: constr(min_length=1) password: constr(min_length=1) status: Optional[str] = "A" user_id_create: Optional[int] = None # Sanitiza os inputs enviados @field_validator("nome", "email", "username", "password", "status") def validate_and_sanitize_fields(cls, v): if v is not None: return Text.sanitize_input(v) return v # Verifica se os campos obrigatórios foram enviados e hash da senha @model_validator(mode="after") def validate_all_fields_and_hash_password(self): errors = [] # Validação do nome if not self.nome or len(self.nome.strip()) == 0: errors.append({"input": "nome", "message": "O nome é obrigatório."}) # Validação do email (EmailStr do Pydantic já faz a validação de formato) if not self.email or len(self.email.strip()) == 0: errors.append({"input": "email", "message": "O e-mail é obrigatório."}) # Validação da username if not self.username or len(self.username.strip()) == 0: errors.append({"input": "username", "message": "O username é obrigatório."}) # Validação da senha if not self.password or len(self.password.strip()) == 0: errors.append({"input": "password", "message": "A senha é obrigatória."}) # Criptografa a senha if ( self.password and not errors ): # Hash somente se a senha estiver presente e sem erros anteriores self.password = Security.hash_password(self.password) # Se houver errors, lança uma única exceção if errors: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=errors ) return self # ---------------------------------------------------- # Schema para atualizar usuário (PUT) # ---------------------------------------------------- class UsuarioUpdateSchema(BaseModel): nome: Optional[str] = None email: Optional[EmailStr] = None username: Optional[str] = None password: Optional[str] = None status: Optional[str] = None user_id_update: Optional[int] = None # Sanitiza os inputs enviados @field_validator( "nome", "email", "username", "password", "status", ) def validate_and_sanitize_fields(cls, v): if v is not None: return Text.sanitize_input(v) return v # Hash da senha se ela for fornecida @model_validator(mode="after") def hash_password_if_provided(self): # Hash da nova senha, se fornecida. if self.password: self.password = Security.hash_password(self.password) # Não estamos forçando a obrigatoriedade de todos os campos aqui (PUT é parcial), # mas garantimos a sanitização e o hash da senha se ela existir. return self # ---------------------------------------------------- # Schema para localizar usuário pelo e-mail # ---------------------------------------------------- class UsuarioEmailSchema(BaseModel): email: Optional[EmailStr] = None # Sanitiza o e-mail informado @field_validator("email", mode="before") def sanitize_email(cls, v: Optional[str]): if v is None: return v v = v.strip() return v # Verifica se o e-mail foi informado e se é válido @field_validator("email") def validate_email(cls, v: str): errors = [] # vazio if not v: errors.append({"input": "e-mail", "message": "Informe um e-mail."}) # inválido pelo seu helper if not Email.is_valid_email(v): errors.append({"input": "e-mail", "message": "E-mail inválido."}) # Se houver errors, lança uma única exceção if errors: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=errors ) return v class Config: from_attributes = True