fix(): Retorno da rotina original de envio de multiplos registros

This commit is contained in:
Kenio 2025-11-04 11:50:05 -03:00
parent 4cc6ad54a5
commit 5bdbb05a47

View file

@ -1,11 +1,8 @@
import hashlib
import logging
from typing import List, Optional, Dict, Any
from typing import List, Optional
from fastapi import HTTPException, status
from sqlalchemy import text
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError, OperationalError, SQLAlchemyError
import traceback
from sqlalchemy import func
from sqlalchemy.orm import Session # Importação para tipagem da session
from database.mysql import SessionLocal, get_database_settings
from packages.v1.administrativo.models.ato_principal_model import AtoPrincipal
from packages.v1.administrativo.models.ato_parte_model import AtoParte
@ -14,11 +11,6 @@ from packages.v1.administrativo.schemas.ato_principal_schema import (
AtoPrincipalSaveSchema,
)
# Configuração de logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
# Configuração da Chave AES
DB_SETTINGS = get_database_settings()
@ -27,168 +19,11 @@ AES_KEY = getattr(DB_SETTINGS, "aeskey", None)
class SaveMultipleRepository:
"""
Repositório otimizado para salvar múltiplos atos principais com suas partes
e documentos, usando criptografia nativa do MySQL via AES_ENCRYPT.
Melhorias implementadas:
- Criptografia correta via execução SQL
- Hash SHA256 para busca rápida de duplicidades
- Gerenciamento adequado de sessões (uma por transação)
- Logging profissional estruturado
- Tratamento de exceções específicas
- Validação robusta da chave AES
Repositório para salvar múltiplos atos principais com suas partes e documentos,
usando criptografia nativa do MySQL via AES_ENCRYPT.
Implementa lógica recursiva para atos vinculados.
"""
def __init__(self):
"""Inicializa o repositório e valida a configuração da chave AES"""
if not AES_KEY:
raise ValueError(
"A chave AES (aeskey) não está configurada. "
"Verifique as configurações do banco de dados."
)
logger.info("SaveMultipleRepository inicializado com sucesso")
@staticmethod
def _generate_hash(value: str) -> str:
"""
Gera hash SHA256 para busca rápida e verificação de duplicidade.
Args:
value: Valor a ser hasheado
Returns:
Hash SHA256 em formato hexadecimal
"""
if not value:
return None
return hashlib.sha256(value.encode("utf-8")).hexdigest()
def _encrypt_field(self, db: Session, value: str) -> Optional[bytes]:
"""
Criptografa um campo usando AES_ENCRYPT do MySQL.
Args:
db: Sessão do banco de dados
value: Valor a ser criptografado
Returns:
Valor criptografado em bytes ou None se valor vazio
"""
if not value or (isinstance(value, str) and not value.strip()):
return None
try:
result = db.execute(
text("SELECT AES_ENCRYPT(:val, :key) as encrypted"),
{"val": value.strip(), "key": AES_KEY},
).scalar()
return result
except Exception as e:
logger.error(f"Erro ao criptografar campo: {str(e)}")
raise
def _check_duplicate_codigo_selo(self, db: Session, codigo_selo: str) -> bool:
"""
Verifica se o código do selo existe usando hash para busca rápida.
Args:
db: Sessão do banco de dados
codigo_selo: Código do selo a verificar
Returns:
True se existe, False caso contrário
"""
codigo_hash = self._generate_hash(codigo_selo)
existing = (
db.query(AtoPrincipal)
.filter(AtoPrincipal.codigo_selo_hash == codigo_hash)
.first()
)
return existing is not None
def _prepare_encrypted_data(
self, db: Session, data: Dict[str, Any], fields_to_encrypt: List[str]
) -> Dict[str, Any]:
"""
Prepara os dados criptografando os campos especificados.
Args:
db: Sessão do banco de dados
data: Dicionário com os dados
fields_to_encrypt: Lista de campos a criptografar
Returns:
Dicionário com campos criptografados
"""
processed_data = data.copy()
for field in fields_to_encrypt:
value = data.get(field)
if value is not None and str(value).strip():
processed_data[field] = self._encrypt_field(db, str(value))
else:
processed_data[field] = None
return processed_data
def _save_ato_partes(
self, db: Session, partes: List, ato_principal_id: int
) -> None:
"""
Salva as partes de um ato principal.
Args:
db: Sessão do banco de dados
partes: Lista de schemas de partes
ato_principal_id: ID do ato principal
"""
campos_criptografar = ["nome", "telefone", "cpf_cnpj"]
for parte in partes:
parte_data = parte.model_dump(exclude_unset=True)
# Criptografa campos sensíveis
parte_data = self._prepare_encrypted_data(
db, parte_data, campos_criptografar
)
# Adiciona relacionamento
parte_data["ato_principal_id"] = ato_principal_id
new_parte = AtoParte(**parte_data)
db.add(new_parte)
logger.debug(f"Salvas {len(partes)} partes para ato {ato_principal_id}")
def _save_ato_documentos(
self, db: Session, documentos: List, ato_principal_id: int
) -> None:
"""
Salva os documentos de um ato principal.
Args:
db: Sessão do banco de dados
documentos: Lista de schemas de documentos
ato_principal_id: ID do ato principal
"""
campos_criptografar = ["url", "nome_documento", "tipo_documento"]
for doc in documentos:
doc_data = doc.model_dump(exclude_unset=True)
# Criptografa campos sensíveis
doc_data = self._prepare_encrypted_data(db, doc_data, campos_criptografar)
# Adiciona relacionamento
doc_data["ato_principal_id"] = ato_principal_id
new_documento = AtoDocumento(**doc_data)
db.add(new_documento)
logger.debug(f"Salvos {len(documentos)} documentos para ato {ato_principal_id}")
def _save_single_recursive_transaction(
self,
db: Session,
@ -196,76 +31,95 @@ class SaveMultipleRepository:
parent_ato_principal_id: Optional[int] = None,
) -> AtoPrincipal:
"""
Salva recursivamente um ato principal com todas suas dependências.
Args:
db: Sessão do banco de dados
ato_schema: Schema do ato a salvar
parent_ato_principal_id: ID do ato pai (para atos vinculados)
Returns:
Instância do AtoPrincipal salvo
Raises:
HTTPException: Se código do selo existe
Salva um único Ato Principal, seus filhos (partes/documentos) e
chama recursivamente o salvamento dos atos vinculados.
"""
codigo_selo = ato_schema.codigo_selo
# 1. Verificação de duplicidade usando hash
if self._check_duplicate_codigo_selo(db, codigo_selo):
logger.warning(f"Tentativa de cadastro duplicado: {codigo_selo}")
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"O Código do Selo '{codigo_selo}' já está cadastrado.",
)
# 2. Preparação dos dados do ato principal
# 1. Pré-processamento e Criptografia
ato_data = ato_schema.model_dump(
exclude_unset=True,
exclude={"ato_partes", "ato_documentos", "atos_vinculados"},
)
# Define relacionamento com ato pai
# Define o ID de origem se for um ato vinculado
if parent_ato_principal_id is not None:
ato_data["origem_ato_principal_id"] = parent_ato_principal_id
elif "origem_ato_principal_id" not in ato_data:
elif ato_data.get("origem_ato_principal_id") is None:
ato_data["origem_ato_principal_id"] = None
# 3. Criptografia dos campos sensíveis
# Verifica duplicidade usando descriptografia
existing_ato = (
db.query(AtoPrincipal)
.filter(func.aes_decrypt(AtoPrincipal.codigo_selo, AES_KEY) == codigo_selo)
.first()
)
if existing_ato:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"O Código do Selo '{codigo_selo}' já está cadastrado.",
)
campos_criptografar = [
"nome_civil_ato",
"nome_serventuario_praticou_ato",
]
ato_data = self._prepare_encrypted_data(db, ato_data, campos_criptografar)
# Criptografa os campos de texto necessários
for campo in campos_criptografar:
valor = ato_data.get(campo)
if isinstance(valor, str) and valor.strip():
ato_data[campo] = func.aes_encrypt(valor, AES_KEY)
elif campo not in ato_data or valor is None:
ato_data[campo] = None
# Adiciona hash do código do selo para busca rápida
ato_data["codigo_selo_hash"] = self._generate_hash(codigo_selo)
# 4. Criação e persistência do ato principal
# 2. Criação e Persistência do Ato Principal
new_ato = AtoPrincipal(**ato_data)
db.add(new_ato)
db.flush() # Obtém o ID sem fazer commit
db.flush()
new_ato_id = new_ato.ato_principal_id
logger.info(f"Ato principal criado: ID={new_ato_id}, Selo={codigo_selo}")
# 5. Salva partes do ato
if ato_schema.ato_partes:
self._save_ato_partes(db, ato_schema.ato_partes, new_ato_id)
# 3. Salva os Filhos Diretos: Ato Partes
for parte in ato_schema.ato_partes:
parte_data = parte.model_dump(exclude_unset=True)
# 6. Salva documentos do ato
if ato_schema.ato_documentos:
self._save_ato_documentos(db, ato_schema.ato_documentos, new_ato_id)
parte_campos_criptografar = ["nome", "telefone", "cpf_cnpj"]
# 7. Salvamento recursivo de atos vinculados
for campo in parte_campos_criptografar:
valor = parte_data.get(campo)
# A validação/conversão para string já foi feita pelo Pydantic,
# mas mantemos a checagem de tipo e strip para segurança antes da criptografia
if isinstance(valor, str) and valor.strip():
parte_data[campo] = func.aes_encrypt(valor, AES_KEY)
else:
parte_data[campo] = None
new_parte = AtoParte(**parte_data, ato_principal_id=new_ato_id)
db.add(new_parte)
# 4. Salva os Filhos Diretos: Ato Documentos
for doc in ato_schema.ato_documentos:
doc_data = doc.model_dump(exclude_unset=True)
doc_campos_criptografar = ["url", "nome_documento", "tipo_documento"]
for campo in doc_campos_criptografar:
valor = doc_data.get(campo)
if isinstance(valor, str) and valor.strip():
doc_data[campo] = func.aes_encrypt(valor, AES_KEY)
else:
doc_data[campo] = None
new_documento = AtoDocumento(**doc_data, ato_principal_id=new_ato_id)
db.add(new_documento)
# 5. Lógica Recursiva para Atos Vinculados (Estrutura de loop idêntica)
if ato_schema.atos_vinculados:
logger.info(
f"Processando {len(ato_schema.atos_vinculados)} atos vinculados "
f"para selo {codigo_selo}"
)
# A iteração é semelhante, mas o que é executado é a chamada recursiva
for linked_ato_schema in ato_schema.atos_vinculados:
# O ato vinculado é persistido chamando a rotina de salvamento completa
self._save_single_recursive_transaction(
db,
linked_ato_schema,
@ -274,138 +128,70 @@ class SaveMultipleRepository:
return new_ato
def _process_single_ato(self, ato_schema: AtoPrincipalSaveSchema) -> Dict[str, Any]:
"""
Processa um único ato em uma transação isolada.
Args:
ato_schema: Schema do ato a processar
Returns:
Dicionário com resultado da operação
"""
# Método principal (chamado pela Action)
def execute(self, atos_principais: List[AtoPrincipalSaveSchema]):
db = SessionLocal()
codigo_selo = getattr(ato_schema, "codigo_selo", "CÓDIGO_INDISPONÍVEL")
try:
logger.info(f"Iniciando processamento do selo: {codigo_selo}")
# Salva o ato com toda sua hierarquia
saved_ato = self._save_single_recursive_transaction(
db, ato_schema, parent_ato_principal_id=None
)
# Commit da transação completa
db.commit()
logger.info(
f"Ato salvo com sucesso: ID={saved_ato.ato_principal_id}, "
f"Selo={codigo_selo}"
)
return {
"success": True,
"message": "Ato Principal e vinculados salvos com sucesso",
"data": {
"ato_principal_id": saved_ato.ato_principal_id,
"codigo_selo": codigo_selo,
"tipo_ato": saved_ato.tipo_ato,
},
}
except HTTPException as he:
db.rollback()
logger.warning(f"Erro HTTP ao processar selo {codigo_selo}: {he.detail}")
return {
"success": False,
"error": he.detail,
"data": {"codigo_selo": codigo_selo},
}
except IntegrityError as ie:
db.rollback()
logger.error(
f"Erro de integridade ao processar selo {codigo_selo}: {str(ie)}",
exc_info=True,
)
return {
"success": False,
"error": "Erro de integridade: violação de constraint no banco de dados",
"data": {"codigo_selo": codigo_selo},
}
except OperationalError as oe:
db.rollback()
logger.error(
f"Erro operacional ao processar selo {codigo_selo}: {str(oe)}",
exc_info=True,
)
return {
"success": False,
"error": "Erro de conexão ou operação no banco de dados",
"data": {"codigo_selo": codigo_selo},
}
except SQLAlchemyError as se:
db.rollback()
logger.error(
f"Erro SQLAlchemy ao processar selo {codigo_selo}: {str(se)}",
exc_info=True,
)
return {
"success": False,
"error": "Erro ao interagir com o banco de dados",
"data": {"codigo_selo": codigo_selo},
}
except Exception as e:
db.rollback()
logger.error(
f"Erro inesperado ao processar selo {codigo_selo}: {str(e)}",
exc_info=True,
)
return {
"success": False,
"error": f"Erro inesperado: {str(e)}",
"data": {"codigo_selo": codigo_selo},
}
finally:
db.close()
def execute(
self, atos_principais: List[AtoPrincipalSaveSchema]
) -> List[Dict[str, Any]]:
"""
Método principal para salvar múltiplos atos principais.
Cada ato é processado em uma transação independente.
Args:
atos_principais: Lista de schemas de atos a salvar
Returns:
Lista de dicionários com resultado de cada operação
"""
logger.info(f"Iniciando processamento de {len(atos_principais)} atos")
results = []
successful_count = 0
failed_count = 0
for idx, ato_schema in enumerate(atos_principais, 1):
logger.info(f"Processando ato {idx}/{len(atos_principais)}")
# 1. Checa a chave AES
if not AES_KEY:
db.close()
raise Exception("A chave AES (aeskey) não está configurada.")
result = self._process_single_ato(ato_schema)
results.append(result)
# 2. Loop principal: Cada iteração é uma transação completa (incluindo recursão)
for ato_schema in atos_principais:
codigo_selo_log = getattr(ato_schema, "codigo_selo", "SELO_INDISPONÍVEL")
if result["success"]:
successful_count += 1
else:
failed_count += 1
try:
# A rotina completa de salvamento (incluindo filhos e recursão) é encapsulada
saved_ato = self._save_single_recursive_transaction(
db,
ato_schema,
parent_ato_principal_id=None, # Ato de nível superior
)
logger.info(
f"Processamento concluído: {successful_count} sucessos, "
f"{failed_count} falhas"
)
# ---------- COMMIT final para a transação completa (Ato Principal + Filhos + Atos Vinculados) ----------
db.commit()
# Retorno de sucesso
ato_result = {
"success": True,
"message": "Ato Principal e vinculados salvos com sucesso",
"data": {
"ato_principal_id": saved_ato.ato_principal_id,
"codigo_selo": codigo_selo_log,
"tipo_ato": saved_ato.tipo_ato,
},
}
results.append(ato_result)
# Tratamento de erro específico para duplicidade ou HTTP
except HTTPException as he:
db.rollback()
results.append(
{
"success": False,
"error": he.detail,
"data": {"codigo_selo": codigo_selo_log},
}
)
# Tratamento de erro genérico
except Exception as e:
db.rollback()
# Log completo do erro
print(f"ERRO DE PERSISTÊNCIA NO SELO: {codigo_selo_log}")
traceback.print_exc()
print("--------------")
results.append(
{
"success": False,
"error": "Erro ao salvar o registro. Verifique o formato dos dados ou os logs do servidor.",
"data": {"codigo_selo": codigo_selo_log},
}
)
db.close()
return results