feat(): Criação de endpoint que cadastra multiplos documentos junto ao banco de dados. Criado função global que grava os arquivos em disco

This commit is contained in:
Kenio 2025-11-13 18:02:08 -03:00
parent 03caa0ba01
commit 918a7eac71
8 changed files with 328 additions and 54 deletions

View file

@ -0,0 +1,54 @@
from fastapi import HTTPException, status
import base64
# Salva o arquivo Base64 em disco e retorna o caminho completo
def save_file_from_base64(
base64_content: str, file_name: str, ato_id: int, upload_dir: str, group_size: int
) -> str:
"""
Decodifica o Base64, salva o arquivo em disco dentro de uma hierarquia
baseada no 'ato_id', e retorna o caminho completo do arquivo salvo.
Estrutura de diretórios:
storage/
100/
57/
documento.pdf
"""
# Garante que o diretório base de uploads existe
upload_dir.mkdir(parents=True, exist_ok=True)
# --- NOVO BLOCO: define a faixa de agrupamento ---
# Exemplo: IDs 1100 vão para pasta 100; IDs 101200 → 200
faixa_superior = ((ato_id - 1) // group_size + 1) * group_size
# Define o caminho hierárquico: storage/100/57/
target_dir = upload_dir / str(faixa_superior) / str(ato_id)
target_dir.mkdir(parents=True, exist_ok=True)
# Caminho completo do arquivo final
file_path = target_dir / file_name
# Caminho para visualização via url (relativo ao storage)
# Exemplo: "100/57/documento.pdf"
file_url = f"{faixa_superior}/{ato_id}/{file_name}"
try:
# Decodifica o Base64
file_bytes = base64.b64decode(base64_content)
# Grava o arquivo em disco
with open(file_path, "wb") as f:
f.write(file_bytes)
# Retorna o caminho completo do arquivo salvo para visualização via URl
return str(file_url)
except base64.binascii.Error:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"O conteúdo do arquivo '{file_name}' não é um Base64 válido.",
)
except Exception as e:
raise Exception(f"Erro inesperado ao salvar arquivo '{file_name}': {e}")

View file

@ -0,0 +1,19 @@
from typing import List
from packages.v1.administrativo.schemas.ato_documento_schema import (
AtoDocumentoSaveMultipleSchema,
)
from packages.v1.administrativo.repositories.ato_documento.ato_documento_save_multiple_repository import (
SaveMultipleRepository,
)
class SaveMultipleAction:
def execute(self, ato_documento: List[AtoDocumentoSaveMultipleSchema]):
save_repository = SaveMultipleRepository()
# A lista completa é passada diretamente para o Repository.
# O Repository é a única camada que deve iterar.
results = save_repository.execute(ato_documento)
return results

View file

@ -5,6 +5,9 @@ from packages.v1.administrativo.services.ato_documento.ato_documento_index_servi
from packages.v1.administrativo.services.ato_documento.ato_documento_save_service import (
SaveService,
)
from packages.v1.administrativo.services.ato_documento.ato_documento_save_multiple_service import (
SaveMultipleService,
)
from packages.v1.administrativo.services.ato_documento.ato_documento_show_service import (
ShowService,
)
@ -22,6 +25,7 @@ from packages.v1.administrativo.schemas.ato_documento_schema import (
AtoDocumentoSaveSchema,
AtoDocumentoUpdateSchema,
AtoDocumentoIdSchema,
AtoDocumentoSaveMultipleSchema,
)
@ -86,6 +90,19 @@ class AtoDocumentoController:
"data": save_service.execute(ato_documento_schema),
}
# Cadastra múltiplos itens
def save_multiple(self, ato_documento_schema: list[AtoDocumentoSaveMultipleSchema]):
# A lista completa é passada diretamente para o Service.
# O Service e o Action também devem ser corrigidos para parar de iterar.
save_service = SaveMultipleService()
responses = save_service.execute(ato_documento_schema)
return {
"message": "Processamento de múltiplos documentos concluído",
"results": responses, # O Service já retorna a lista de resultados
}
# Atualiza os dados de um documento
def update(
self, ato_documento_id: int, ato_documento_schema: AtoDocumentoUpdateSchema

View file

@ -10,6 +10,7 @@ from packages.v1.administrativo.schemas.ato_documento_schema import (
AtoDocumentoSaveSchema,
AtoDocumentoUpdateSchema,
AtoDocumentoIdSchema,
AtoDocumentoSaveMultipleSchema,
)
# Inicializa o roteador para as rotas de ato_documento
@ -94,6 +95,25 @@ async def save(
return response
# Cadastro de múltiplos itens
@router.post(
"/batch",
status_code=status.HTTP_200_OK,
summary="Cadastra múltiplos documentos",
response_description="Cadastra vários documentos de uma vez",
)
async def save_multiple(
ato_documento_schema: List[AtoDocumentoSaveMultipleSchema],
current_user: dict = Depends(get_current_user),
):
# A lista completa (List[AtoDocumentoSaveMultipleSchema]) é passada
# DIRETAMENTE para o controller (que a passará ao Service, Action e Repository).
# O loop de iteração deve estar APENAS no Repository.
responses = ato_documento_controller.save_multiple(ato_documento_schema)
return {"success": True, "data": responses}
# Atualiza os dados de documento
@router.put(
"/{ato_documento_id}",

View file

@ -0,0 +1,159 @@
from typing import List, Optional
from fastapi import HTTPException, status
import traceback
from sqlalchemy import func
from sqlalchemy.orm import Session
from database.mysql import SessionLocal, get_database_settings
from actions.file.save_file_from_base64 import save_file_from_base64
from packages.v1.administrativo.models.ato_principal_model import AtoPrincipal
from packages.v1.administrativo.models.ato_documento_model import AtoDocumento
from packages.v1.administrativo.schemas.ato_documento_schema import (
AtoDocumentoSaveMultipleSchema,
)
import os
from pathlib import Path
import uuid
import datetime
# --- CONFIGURAÇÕES GLOBAIS ---
DB_SETTINGS = get_database_settings()
AES_KEY = getattr(DB_SETTINGS, "aeskey", None)
GROUP_SIZE = int(getattr(DB_SETTINGS, "group_size", None))
UPLOAD_DIR = Path(os.environ.get("STORAGE", "./storage"))
class SaveMultipleDocumentosRepository:
"""
Repositório responsável por salvar múltiplos documentos (AtoDocumento)
associados a Atos Principais existentes (identificados pelo código do selo).
"""
def _save_documento(
self, db: Session, doc_schema: AtoDocumentoSaveMultipleSchema
) -> AtoDocumento:
"""
Salva um único documento após localizar o Ato Principal correspondente
via código de selo.
"""
codigo_selo = getattr(doc_schema, "codigo_selo", None)
# 1 Verifica se o campo 'codigo_selo' foi informado
if not codigo_selo or codigo_selo.strip() == "":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="O campo 'codigo_selo' é obrigatório para associar o documento.",
)
# 2 Busca o ato principal correspondente ao código do selo
ato_principal = (
db.query(AtoPrincipal)
.filter(AtoPrincipal.codigo_selo == codigo_selo)
.first()
)
if not ato_principal:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Nenhum Ato Principal encontrado para o código de selo '{codigo_selo}'.",
)
ato_principal_id = ato_principal.ato_principal_id
# 3 Extrai e prepara os dados do schema
doc_data = doc_schema.model_dump(
exclude_unset=True,
exclude={"arquivo"}, # não envia base64 ao banco
)
# 4 Processa o arquivo Base64 (se informado)
base64_content = getattr(doc_schema, "arquivo", None)
file_url_path = None
if base64_content:
# Gera nome de arquivo, se não informado
if not doc_schema.nome_documento:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
unique_id = uuid.uuid4().hex[:8]
file_name = f"documento_{timestamp}_{unique_id}.pdf"
else:
file_name = doc_schema.nome_documento
# Salva o arquivo físico
file_url_path = save_file_from_base64(
base64_content, file_name, ato_principal_id, UPLOAD_DIR, GROUP_SIZE
)
# 5 Atribui o caminho salvo (ou None)
doc_data["url"] = file_url_path
# 6 Criptografa os metadados do documento
campos_criptografar = ["url", "nome_documento", "tipo_documento"]
for campo in campos_criptografar:
valor = doc_data.get(campo)
if isinstance(valor, str) and valor.strip():
doc_data[campo] = func.aes_encrypt(valor, AES_KEY)
else:
doc_data[campo] = None
# 7 Cria o registro no banco
new_documento = AtoDocumento(**doc_data, ato_principal_id=ato_principal_id)
db.add(new_documento)
return new_documento
# -------------------------------------------------------
# Método principal — grava múltiplos documentos
# -------------------------------------------------------
def execute(self, documentos: List[AtoDocumentoSaveMultipleSchema]):
db = SessionLocal()
results = []
if not AES_KEY:
db.close()
raise Exception("A chave AES (aeskey) não está configurada.")
for doc_schema in documentos:
codigo_selo_log = getattr(doc_schema, "codigo_selo", "SELO_INDISPONÍVEL")
try:
new_doc = self._save_documento(db, doc_schema)
db.commit()
results.append(
{
"success": True,
"message": "Documento salvo com sucesso.",
"data": {
"codigo_selo": codigo_selo_log,
"ato_principal_id": new_doc.ato_principal_id,
},
}
)
except HTTPException as he:
db.rollback()
results.append(
{
"success": False,
"error": he.detail,
"data": {"codigo_selo": codigo_selo_log},
}
)
except Exception as e:
db.rollback()
print(f"ERRO AO SALVAR DOCUMENTO (SELO {codigo_selo_log}): {e}")
traceback.print_exc()
results.append(
{
"success": False,
"error": "Erro interno ao salvar o documento.",
"data": {"codigo_selo": codigo_selo_log},
}
)
db.close()
return results

View file

@ -4,6 +4,7 @@ import traceback
from sqlalchemy import func
from sqlalchemy.orm import Session # Importação para tipagem da session
from database.mysql import SessionLocal, get_database_settings
from actions.file.save_file_from_base64 import save_file_from_base64
from packages.v1.administrativo.models.ato_principal_model import AtoPrincipal
from packages.v1.administrativo.models.ato_parte_model import AtoParte
from packages.v1.administrativo.models.ato_documento_model import AtoDocumento
@ -12,7 +13,6 @@ from packages.v1.administrativo.schemas.ato_principal_schema import (
)
# NOVAS IMPORTAÇÕES para lidar com Base64 e Arquivos
import base64
import os
from pathlib import Path
import uuid
@ -32,56 +32,6 @@ GROUP_SIZE = int(
UPLOAD_DIR = Path(os.environ.get("STORAGE", "./storage"))
# Salva o arquivo Base64 em disco e retorna o caminho completo
def _save_file_from_base64(base64_content: str, file_name: str, ato_id: int) -> str:
"""
Decodifica o Base64, salva o arquivo em disco dentro de uma hierarquia
baseada no 'ato_id', e retorna o caminho completo do arquivo salvo.
Estrutura de diretórios:
storage/
100/
57/
documento.pdf
"""
# Garante que o diretório base de uploads existe
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# --- NOVO BLOCO: define a faixa de agrupamento ---
# Exemplo: IDs 1100 vão para pasta 100; IDs 101200 → 200
faixa_superior = ((ato_id - 1) // GROUP_SIZE + 1) * GROUP_SIZE
# Define o caminho hierárquico: storage/100/57/
target_dir = UPLOAD_DIR / str(faixa_superior) / str(ato_id)
target_dir.mkdir(parents=True, exist_ok=True)
# Caminho completo do arquivo final
file_path = target_dir / file_name
# Caminho para visualização via url (relativo ao storage)
# Exemplo: "100/57/documento.pdf"
file_url = f"{faixa_superior}/{ato_id}/{file_name}"
try:
# Decodifica o Base64
file_bytes = base64.b64decode(base64_content)
# Grava o arquivo em disco
with open(file_path, "wb") as f:
f.write(file_bytes)
# Retorna o caminho completo do arquivo salvo para visualização via URl
return str(file_url)
except base64.binascii.Error:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"O conteúdo do arquivo '{file_name}' não é um Base64 válido.",
)
except Exception as e:
raise Exception(f"Erro inesperado ao salvar arquivo '{file_name}': {e}")
class SaveMultipleRepository:
"""
Repositório para salvar múltiplos atos principais com suas partes e documentos,
@ -192,8 +142,8 @@ class SaveMultipleRepository:
else:
file_name = doc.nome_documento
file_url_path = _save_file_from_base64(
base64_content, file_name, new_ato_id
file_url_path = save_file_from_base64(
base64_content, file_name, new_ato_id, UPLOAD_DIR, GROUP_SIZE
)
# 2. Atribui o caminho do arquivo salvo (ou None, se não foi enviado) ao campo 'url' do banco

View file

@ -1,4 +1,4 @@
from pydantic import BaseModel, constr, field_validator, validator
from pydantic import BaseModel, constr, field_validator, model_validator
from fastapi import HTTPException, status
from typing import Optional
from datetime import datetime
@ -68,6 +68,39 @@ class AtoDocumentoSaveSchema(BaseModel):
return Text.sanitize_input(v)
# ----------------------------------------------------
# Schema para Criação (SAVE MULTIPLE): Campos obrigatórios e sem ID
# ----------------------------------------------------
class AtoDocumentoSaveMultipleSchema(BaseModel):
# Campos opcionais
arquivo: Optional[str] = None
nome_documento: Optional[str] = None
tipo_documento: Optional[str] = None
codigo_selo: Optional[str] = None
# ----------------------------------------------------
# Sanitização dos campos opcionais (remove HTML, espaços, etc.)
# ----------------------------------------------------
@field_validator(
"arquivo", "nome_documento", "tipo_documento", "codigo_selo", mode="before"
)
def sanitize_optional_fields(cls, v: Optional[str]):
if v is None:
return None
return Text.sanitize_input(v)
# ----------------------------------------------------
# Validação final do schema: exige codigo_selo informado
# ----------------------------------------------------
@model_validator(mode="after")
def validate_codigo_selo(cls, values):
# Caso o código do selo não tenha sido informado, gera erro
if not values.codigo_selo or values.codigo_selo.strip() == "":
raise ValueError("O campo 'codigo_selo' é obrigatório.")
return values
# ----------------------------------------------------
# Schema para Atualização (UPDATE): Todos opcionais (parciais)
# ----------------------------------------------------

View file

@ -0,0 +1,22 @@
from fastapi import status, HTTPException
# Assumindo a existência dos Schemas com o prefixo 'ato_principal'
from packages.v1.administrativo.schemas.ato_documento_schema import (
AtoDocumentoSaveMultipleSchema,
)
# Assumindo a existência da Action de salvamento com o novo prefixo
from packages.v1.administrativo.actions.ato_documento.ato_documento_save_multiple_action import (
SaveMultipleAction,
)
class SaveMultipleService:
def execute(self, ato_documento: list[AtoDocumentoSaveMultipleSchema]):
save_action = SaveMultipleAction()
# A lista completa é passada diretamente para a Action.
results = save_action.execute(ato_documento)
return results