monitoring-api/packages/v1/administrativo/schemas/log_schema.py
2025-10-07 13:39:43 -03:00

155 lines
No EOL
5.5 KiB
Python

from pydantic import BaseModel, constr, field_validator, model_validator
from fastapi import HTTPException, status
from typing import Optional, Any, List
from datetime import datetime
# Funções para sanitização de entradas (evitar XSS, SQLi etc.)
# Mantido por consistência com o arquivo original
from actions.validations.text import Text
# ----------------------------------------------------
# Schema base - Inclui todos os campos da DDL
# ----------------------------------------------------
class LogSchema(BaseModel):
log_id: Optional[int] = None
client_id: Optional[int] = None
date_post: Optional[datetime] = None
file: Optional[Any] = None # Mapeamento do campo JSON para Any ou dict
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para localizar um log especifico pelo CLIENT_ID (GET)
# ----------------------------------------------------
class LogClientIdSchema(BaseModel):
client_id: int
# ----------------------------------------------------
# Schema para localizar um log especifico pelo ID (GET/DELETE)
# ----------------------------------------------------
class LogIdSchema(BaseModel):
log_id: int
# ----------------------------------------------------
# Schema para localizar um log por critérios de filtro
# (Adaptado de UserAuthenticateSchema, usando client_id e data)
# ----------------------------------------------------
class LogAuthenticateSchema(BaseModel):
# Campos utilizados: client_id e date_post (para filtro de range, por exemplo)
client_id: Optional[int] = None
start_date_post: Optional[datetime] = None
end_date_post: Optional[datetime] = None
# Adicionando um campo genérico para buscar algo dentro do JSON 'file'
file_search_term: Optional[str] = None
@field_validator('file_search_term')
def sanitize_search_term(cls, v):
if v:
return Text.sanitize_input(v)
return v
@model_validator(mode='after')
def check_required_filter(self):
# Garante que pelo menos um critério de busca seja fornecido
if not any([self.client_id, self.start_date_post, self.end_date_post, self.file_search_term]):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Pelo menos um critério de busca (client_id, data inicial, data final ou termo de busca no arquivo) é obrigatório.'
)
return self
# ----------------------------------------------------
# Schema para localizar log pelo campo 'file' (Adaptado de UserEmailSchema)
# ----------------------------------------------------
class LogFileSchema(BaseModel):
# Usado para buscar logs que contenham esta string no campo JSON 'file'
file_content_search: Optional[str] = None
client_id: Optional[int] = None
# Sanitiza o input
@field_validator('file_content_search')
def sanitize_search(cls, v):
if v:
return Text.sanitize_input(v)
return v
# Valida se o campo não está vazio
@model_validator(mode='after')
def validate_search_fields(self):
if not self.file_content_search and not self.client_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Informe um termo de busca no arquivo ou o ID do cliente.'
)
return self
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para criação de novo log (POST)
# ----------------------------------------------------
class LogSaveSchema(BaseModel):
log_id: Optional[int] = None # Opcional para operações de upsert
client_id: int # NOT NULL
date_post: Optional[datetime] = None # Possui DEFAULT CURRENT_TIMESTAMP
file: Any # NOT NULL (Mapeamento do campo JSON)
# Sanitiza o input (apenas para campos string, mas mantido por consistência)
@field_validator('client_id', mode='before')
def validate_client_id(cls, v):
if v is not None:
# Converte para string para sanitizar e depois para int (se necessário)
return int(Text.sanitize_input(str(v)))
return v
# O campo 'file' sendo Any/dict não tem sanitização Text.sanitize_input direta.
# Verifica se os campos obrigatórios foram enviados
@model_validator(mode='after')
def validate_required_fields(self):
errors = []
# Validação do client_id
if not self.client_id or self.client_id <= 0:
errors.append({'input': 'client_id', 'message': 'O ID do cliente é obrigatório.'})
# Validação do file (JSON)
if self.file is None:
errors.append({'input': 'file', 'message': 'O campo de arquivo (JSON) é obrigatório.'})
# Se houver errors, lança uma única exceção
if errors:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=errors
)
return self
# ----------------------------------------------------
# Schema para atualizar log (PUT)
# ----------------------------------------------------
class LogUpdateSchema(BaseModel):
client_id: Optional[int] = None
file: Optional[Any] = None # Mapeamento do campo JSON
# Sanitiza o input (apenas para campos string, mas mantido por consistência)
@field_validator('client_id', mode='before')
def validate_client_id(cls, v):
if v is not None:
return int(Text.sanitize_input(str(v)))
return v