369 lines
13 KiB
Python
369 lines
13 KiB
Python
from pydantic import BaseModel, EmailStr, constr, field_validator, model_validator
|
|
from fastapi import HTTPException, status
|
|
from typing import Optional
|
|
from datetime import datetime
|
|
|
|
# Funções utilitárias para segurança (hash e verificação de senha)
|
|
from actions.security.security import Security
|
|
|
|
# Funções para sanitização de entradas (evitar XSS, SQLi etc.)
|
|
from actions.validations.text import Text
|
|
|
|
# Funções para validar E-mail
|
|
from actions.validations.email import Email
|
|
|
|
# Funções para validar cpf
|
|
from actions.validations.cpf import CPF
|
|
|
|
|
|
# ----------------------------------------------------
|
|
# Schema base
|
|
# ----------------------------------------------------
|
|
class GUsuarioIndexSchema(BaseModel):
|
|
class Config:
|
|
extra = "allow" # permite parâmetros dinâmicos
|
|
|
|
|
|
# ----------------------------------------------------
|
|
# Schema base
|
|
# ----------------------------------------------------
|
|
class GUsuarioSchema(BaseModel):
|
|
usuario_id: Optional[int] = None
|
|
trocarsenha: Optional[str] = None
|
|
login: str
|
|
senha: Optional[str] = None
|
|
situacao: Optional[str] = None
|
|
nome_completo: Optional[str] = None
|
|
funcao: Optional[str] = None
|
|
assina: Optional[str] = None
|
|
sigla: Optional[str] = None
|
|
usuario_tab: Optional[str] = None
|
|
ultimo_login: Optional[datetime] = None
|
|
ultimo_login_regs: Optional[datetime] = None
|
|
data_expiracao: Optional[datetime] = None
|
|
senha_anterior: Optional[str] = None
|
|
andamento_padrao: Optional[str] = None
|
|
lembrete_pergunta: Optional[str] = None
|
|
lembrete_resposta: Optional[str] = None
|
|
andamento_padrao2: Optional[str] = None
|
|
receber_mensagem_arrolamento: Optional[str] = None
|
|
email: Optional[EmailStr] = None
|
|
assina_certidao: Optional[str] = None
|
|
receber_email_penhora: Optional[str] = None
|
|
foto: Optional[str] = None
|
|
nao_receber_chat_todos: Optional[str] = None
|
|
pode_alterar_caixa: Optional[str] = None
|
|
receber_chat_certidao_online: Optional[str] = None
|
|
receber_chat_cancelamento: Optional[str] = None
|
|
cpf: Optional[str] = None
|
|
somente_leitura: Optional[str] = None
|
|
receber_chat_envio_onr: Optional[str] = None
|
|
tipo_usuario: Optional[str] = None
|
|
senha_api: str
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
# ----------------------------------------------------
|
|
# Schema para acesso ao sistema
|
|
# ----------------------------------------------------
|
|
class GUsuarioAuthenticateSchema(BaseModel):
|
|
|
|
# Campos utilizados
|
|
login: str # login obrigatório
|
|
senha_api: str # senha_api obrigatório
|
|
|
|
# Validação e sanitização do login
|
|
@field_validator("login")
|
|
def validar_e_sanitizar_login(cls, v):
|
|
|
|
# Verifica se o login foi informado
|
|
if not v:
|
|
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST, detail="Informe o login"
|
|
)
|
|
|
|
# Sanitiza o login para evitar XSS e SQL Injection
|
|
return Text.sanitize_input(v)
|
|
|
|
# Validação e sanitização da senha
|
|
@field_validator("senha_api")
|
|
def validar_e_sanitizar_senha(cls, v):
|
|
|
|
# Verifica se a senha foi informada
|
|
if not v:
|
|
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST, detail="Informe a senha"
|
|
)
|
|
|
|
# Sanitiza a senha para evitar XSS e SQL Injection
|
|
return Text.sanitize_input(v)
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
# ----------------------------------------------------
|
|
# Schema para localizar um usuário especifico pelo ID (GET)
|
|
# ----------------------------------------------------
|
|
class GUsuarioIdSchema(BaseModel):
|
|
usuario_id: int
|
|
|
|
|
|
# ----------------------------------------------------
|
|
# Schema para criação de novo usuário (POST)
|
|
# ----------------------------------------------------
|
|
class GUsuarioSaveSchema(BaseModel):
|
|
usuario_id: Optional[int] = None
|
|
trocarsenha: Optional[str] = None
|
|
login: str
|
|
situacao: str
|
|
nome_completo: str
|
|
funcao: str
|
|
email: EmailStr
|
|
cpf: str
|
|
senha_api: str
|
|
|
|
# Sanitiza os inputs enviados
|
|
@field_validator("login", "email", "cpf", "situacao", "nome_completo", "funcao")
|
|
def validate_required_fields(cls, v):
|
|
if not v or len(v.strip()) == 0:
|
|
# Não lança exceção aqui. Apenas retorna para o model_validator.
|
|
# O Pydantic já faria a checagem básica.
|
|
# Este validador individual é mais para sanitização.
|
|
pass
|
|
return Text.sanitize_input(v)
|
|
|
|
# Verifica se os campos foram enviados
|
|
@model_validator(mode="after")
|
|
def validate_all_fields(self):
|
|
|
|
# Variavel responsavel em armaezar os erros
|
|
errors = []
|
|
|
|
# Validação do login
|
|
if not self.login or len(self.login.strip()) == 0:
|
|
errors.append({"input": "login", "message": "O login é obrigatório."})
|
|
|
|
# Validação da situação
|
|
if not self.situacao or len(self.situacao.strip()) == 0:
|
|
errors.append({"input": "situacao", "message": "A situação é obrigatória."})
|
|
|
|
# Validação do nome_completo
|
|
if not self.nome_completo or len(self.nome_completo.strip()) == 0:
|
|
errors.append(
|
|
{"input": "nome_completo", "message": "O nome completo é obrigatório."}
|
|
)
|
|
|
|
# Validação da função
|
|
if not self.funcao or len(self.funcao.strip()) == 0:
|
|
errors.append({"input": "funcao", "message": "A função é obrigatória."})
|
|
|
|
# Validação do email
|
|
if not self.email or len(self.email.strip()) == 0:
|
|
errors.append({"input": "email", "message": "O e-mail é obrigatório."})
|
|
|
|
# Validação do cpf
|
|
if not self.cpf or len(self.cpf.strip()) == 0:
|
|
errors.append({"input": "cpf", "message": "O CPF é obrigatório."})
|
|
|
|
# Validação da senha
|
|
if not self.senha_api or len(self.senha_api.strip()) == 0:
|
|
errors.append({"input": "senha_api", "message": "A senha é obrigatória."})
|
|
|
|
# Criptografa a senha
|
|
self.senha_api = Security.hash_senha_api(self.senha_api)
|
|
|
|
# Se houver errors, lança uma única exceção
|
|
if errors:
|
|
# Lança uma exceção do FastAPI para um tratamento limpo
|
|
# O `detail` da exceção será a lista de errors que criamos
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=errors
|
|
)
|
|
|
|
return self
|
|
|
|
|
|
# ----------------------------------------------------
|
|
# Schema para atualizar usuário (PUT)
|
|
# ----------------------------------------------------
|
|
class GUsuarioUpdateSchema(BaseModel):
|
|
|
|
trocarsenha: Optional[str] = None
|
|
login: Optional[str] = None
|
|
situacao: Optional[str] = None
|
|
nome_completo: Optional[str] = None
|
|
funcao: Optional[str] = None
|
|
email: Optional[str] = None
|
|
cpf: Optional[str] = None
|
|
senha_api: Optional[str] = None
|
|
|
|
# Sanitiza os inputs enviados
|
|
@field_validator("login", "email", "cpf", "situacao", "nome_completo", "funcao")
|
|
def validate_required_fields(cls, v):
|
|
if not v or len(v.strip()) == 0:
|
|
# Não lança exceção aqui. Apenas retorna para o model_validator.
|
|
# O Pydantic já faria a checagem básica.
|
|
# Este validador individual é mais para sanitização.
|
|
pass
|
|
return Text.sanitize_input(v)
|
|
|
|
# Verifica se os campos foram enviados
|
|
@model_validator(mode="after")
|
|
def validate_all_fields(self):
|
|
|
|
# Variavel responsavel em armaezar os erros
|
|
errors = []
|
|
|
|
# Validação do login
|
|
if not self.login or len(self.login.strip()) == 0:
|
|
errors.append({"input": "login", "message": "O login é obrigatório."})
|
|
|
|
# Validação da situação
|
|
if not self.situacao or len(self.situacao.strip()) == 0:
|
|
errors.append({"input": "situacao", "message": "A situação é obrigatória."})
|
|
|
|
# Validação do nome_completo
|
|
if not self.nome_completo or len(self.nome_completo.strip()) == 0:
|
|
errors.append(
|
|
{"input": "nome_completo", "message": "O nome completo é obrigatório."}
|
|
)
|
|
|
|
# Validação da função
|
|
if not self.funcao or len(self.funcao.strip()) == 0:
|
|
errors.append({"input": "funcao", "message": "A função é obrigatória."})
|
|
|
|
# Validação do email
|
|
if not self.email or len(self.email.strip()) == 0:
|
|
errors.append({"input": "email", "message": "O e-mail é obrigatório."})
|
|
|
|
# Validação do cpf
|
|
if not self.cpf or len(self.cpf.strip()) == 0:
|
|
errors.append({"input": "cpf", "message": "O CPF é obrigatório."})
|
|
|
|
# Validação da senha
|
|
if not self.senha_api or len(self.senha_api.strip()) == 0:
|
|
errors.append({"input": "senha_api", "message": "A senha é obrigatória."})
|
|
|
|
# Se houver errors, lança uma única exceção
|
|
if errors:
|
|
# Lança uma exceção do FastAPI para um tratamento limpo
|
|
# O `detail` da exceção será a lista de errors que criamos
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=errors
|
|
)
|
|
|
|
return self
|
|
|
|
|
|
# ----------------------------------------------------
|
|
# Schema para localizar usuário pelo e-mail
|
|
# ----------------------------------------------------
|
|
class GUsuarioEmailSchema(BaseModel):
|
|
# Use EmailStr para garantir a validação automática do formato
|
|
email: Optional[EmailStr] = None
|
|
|
|
# Sanitiza o input
|
|
@field_validator("email")
|
|
def sanitize_email(cls, v):
|
|
# A sanitização é feita apenas se o valor não for None ou vazio
|
|
if v:
|
|
return Text.sanitize_input(v)
|
|
return v
|
|
|
|
# Verifica se o e-mail é válido
|
|
@field_validator("email")
|
|
def check_email(cls, v):
|
|
# A verificação é feita apenas se o valor não for None ou vazio
|
|
if not Email.is_valid_email(v):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail="Informe um e-mail válido",
|
|
)
|
|
return v
|
|
|
|
# Valida se o campo não está vazio
|
|
@model_validator(mode="after")
|
|
def validate_email(self):
|
|
if not self.email or len(self.email.strip()) == 0:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail="Informe um e-mail",
|
|
)
|
|
return self
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
# ----------------------------------------------------
|
|
# Schema para localizar usuário pelo CPF
|
|
# ----------------------------------------------------
|
|
class GUsuarioCpfSchema(BaseModel):
|
|
# Use EmailStr para garantir a validação automática do formato
|
|
cpf: Optional[str] = None
|
|
|
|
# Sanitiza o input
|
|
@field_validator("cpf")
|
|
def sanitize_cpf(cls, v):
|
|
# A sanitização é feita apenas se o valor não for None ou vazio
|
|
if v:
|
|
return Text.sanitize_input(v)
|
|
return v
|
|
|
|
# Verifica se o e-mail é válido
|
|
@field_validator("cpf")
|
|
def check_email(cls, v):
|
|
# A verificação é feita apenas se o valor não for None ou vazio
|
|
if not CPF.is_valid_cpf(v):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail="Informe um CPF válido",
|
|
)
|
|
return v
|
|
|
|
# Valida se o campo não está vazio
|
|
@model_validator(mode="after")
|
|
def validate_cpf(self):
|
|
if not self.cpf or len(self.cpf.strip()) == 0:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail="Informe um CPF",
|
|
)
|
|
return self
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
# ----------------------------------------------------
|
|
# Schema para localizar usuário pelo Login
|
|
# ----------------------------------------------------
|
|
class GUsuarioLoginSchema(BaseModel):
|
|
# Use EmailStr para garantir a validação automática do formato
|
|
login: Optional[str] = None
|
|
|
|
# Sanitiza o input
|
|
@field_validator("login")
|
|
def sanitize_login(cls, v):
|
|
# A sanitização é feita apenas se o valor não for None ou vazio
|
|
if v:
|
|
return Text.sanitize_input(v)
|
|
return v
|
|
|
|
# Valida se o campo não está vazio
|
|
@model_validator(mode="after")
|
|
def validate_login(self):
|
|
if not self.login or len(self.login.strip()) == 0:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
|
detail="Informe um Login",
|
|
)
|
|
return self
|
|
|
|
class Config:
|
|
from_attributes = True
|