From 5bdbb05a475b084d0c1bbc728fd29fd338fac53a Mon Sep 17 00:00:00 2001 From: Kenio de Souza Date: Tue, 4 Nov 2025 11:50:05 -0300 Subject: [PATCH] fix(): Retorno da rotina original de envio de multiplos registros --- .../ato_principal_save_multiple_repository.py | 470 +++++------------- 1 file changed, 128 insertions(+), 342 deletions(-) diff --git a/packages/v1/administrativo/repositories/ato_principal/ato_principal_save_multiple_repository.py b/packages/v1/administrativo/repositories/ato_principal/ato_principal_save_multiple_repository.py index 19b3a8e..f5d73e3 100644 --- a/packages/v1/administrativo/repositories/ato_principal/ato_principal_save_multiple_repository.py +++ b/packages/v1/administrativo/repositories/ato_principal/ato_principal_save_multiple_repository.py @@ -1,11 +1,8 @@ -import hashlib -import logging -from typing import List, Optional, Dict, Any +from typing import List, Optional from fastapi import HTTPException, status -from sqlalchemy import text -from sqlalchemy.orm import Session -from sqlalchemy.exc import IntegrityError, OperationalError, SQLAlchemyError - +import traceback +from sqlalchemy import func +from sqlalchemy.orm import Session # Importação para tipagem da session from database.mysql import SessionLocal, get_database_settings from packages.v1.administrativo.models.ato_principal_model import AtoPrincipal from packages.v1.administrativo.models.ato_parte_model import AtoParte @@ -14,11 +11,6 @@ from packages.v1.administrativo.schemas.ato_principal_schema import ( AtoPrincipalSaveSchema, ) -# Configuração de logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) # Configuração da Chave AES DB_SETTINGS = get_database_settings() @@ -27,168 +19,11 @@ AES_KEY = getattr(DB_SETTINGS, "aeskey", None) class SaveMultipleRepository: """ - Repositório otimizado para salvar múltiplos atos principais com suas partes - e documentos, usando criptografia nativa do MySQL via AES_ENCRYPT. - - Melhorias implementadas: - - Criptografia correta via execução SQL - - Hash SHA256 para busca rápida de duplicidades - - Gerenciamento adequado de sessões (uma por transação) - - Logging profissional estruturado - - Tratamento de exceções específicas - - Validação robusta da chave AES + Repositório para salvar múltiplos atos principais com suas partes e documentos, + usando criptografia nativa do MySQL via AES_ENCRYPT. + Implementa lógica recursiva para atos vinculados. """ - def __init__(self): - """Inicializa o repositório e valida a configuração da chave AES""" - if not AES_KEY: - raise ValueError( - "A chave AES (aeskey) não está configurada. " - "Verifique as configurações do banco de dados." - ) - logger.info("SaveMultipleRepository inicializado com sucesso") - - @staticmethod - def _generate_hash(value: str) -> str: - """ - Gera hash SHA256 para busca rápida e verificação de duplicidade. - - Args: - value: Valor a ser hasheado - - Returns: - Hash SHA256 em formato hexadecimal - """ - if not value: - return None - return hashlib.sha256(value.encode("utf-8")).hexdigest() - - def _encrypt_field(self, db: Session, value: str) -> Optional[bytes]: - """ - Criptografa um campo usando AES_ENCRYPT do MySQL. - - Args: - db: Sessão do banco de dados - value: Valor a ser criptografado - - Returns: - Valor criptografado em bytes ou None se valor vazio - """ - if not value or (isinstance(value, str) and not value.strip()): - return None - - try: - result = db.execute( - text("SELECT AES_ENCRYPT(:val, :key) as encrypted"), - {"val": value.strip(), "key": AES_KEY}, - ).scalar() - return result - except Exception as e: - logger.error(f"Erro ao criptografar campo: {str(e)}") - raise - - def _check_duplicate_codigo_selo(self, db: Session, codigo_selo: str) -> bool: - """ - Verifica se o código do selo já existe usando hash para busca rápida. - - Args: - db: Sessão do banco de dados - codigo_selo: Código do selo a verificar - - Returns: - True se já existe, False caso contrário - """ - codigo_hash = self._generate_hash(codigo_selo) - - existing = ( - db.query(AtoPrincipal) - .filter(AtoPrincipal.codigo_selo_hash == codigo_hash) - .first() - ) - - return existing is not None - - def _prepare_encrypted_data( - self, db: Session, data: Dict[str, Any], fields_to_encrypt: List[str] - ) -> Dict[str, Any]: - """ - Prepara os dados criptografando os campos especificados. - - Args: - db: Sessão do banco de dados - data: Dicionário com os dados - fields_to_encrypt: Lista de campos a criptografar - - Returns: - Dicionário com campos criptografados - """ - processed_data = data.copy() - - for field in fields_to_encrypt: - value = data.get(field) - if value is not None and str(value).strip(): - processed_data[field] = self._encrypt_field(db, str(value)) - else: - processed_data[field] = None - - return processed_data - - def _save_ato_partes( - self, db: Session, partes: List, ato_principal_id: int - ) -> None: - """ - Salva as partes de um ato principal. - - Args: - db: Sessão do banco de dados - partes: Lista de schemas de partes - ato_principal_id: ID do ato principal - """ - campos_criptografar = ["nome", "telefone", "cpf_cnpj"] - - for parte in partes: - parte_data = parte.model_dump(exclude_unset=True) - - # Criptografa campos sensíveis - parte_data = self._prepare_encrypted_data( - db, parte_data, campos_criptografar - ) - - # Adiciona relacionamento - parte_data["ato_principal_id"] = ato_principal_id - - new_parte = AtoParte(**parte_data) - db.add(new_parte) - - logger.debug(f"Salvas {len(partes)} partes para ato {ato_principal_id}") - - def _save_ato_documentos( - self, db: Session, documentos: List, ato_principal_id: int - ) -> None: - """ - Salva os documentos de um ato principal. - - Args: - db: Sessão do banco de dados - documentos: Lista de schemas de documentos - ato_principal_id: ID do ato principal - """ - campos_criptografar = ["url", "nome_documento", "tipo_documento"] - - for doc in documentos: - doc_data = doc.model_dump(exclude_unset=True) - - # Criptografa campos sensíveis - doc_data = self._prepare_encrypted_data(db, doc_data, campos_criptografar) - - # Adiciona relacionamento - doc_data["ato_principal_id"] = ato_principal_id - - new_documento = AtoDocumento(**doc_data) - db.add(new_documento) - - logger.debug(f"Salvos {len(documentos)} documentos para ato {ato_principal_id}") - def _save_single_recursive_transaction( self, db: Session, @@ -196,76 +31,95 @@ class SaveMultipleRepository: parent_ato_principal_id: Optional[int] = None, ) -> AtoPrincipal: """ - Salva recursivamente um ato principal com todas suas dependências. - - Args: - db: Sessão do banco de dados - ato_schema: Schema do ato a salvar - parent_ato_principal_id: ID do ato pai (para atos vinculados) - - Returns: - Instância do AtoPrincipal salvo - - Raises: - HTTPException: Se código do selo já existe + Salva um único Ato Principal, seus filhos (partes/documentos) e + chama recursivamente o salvamento dos atos vinculados. """ codigo_selo = ato_schema.codigo_selo - # 1. Verificação de duplicidade usando hash - if self._check_duplicate_codigo_selo(db, codigo_selo): - logger.warning(f"Tentativa de cadastro duplicado: {codigo_selo}") - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=f"O Código do Selo '{codigo_selo}' já está cadastrado.", - ) - - # 2. Preparação dos dados do ato principal + # 1. Pré-processamento e Criptografia ato_data = ato_schema.model_dump( exclude_unset=True, exclude={"ato_partes", "ato_documentos", "atos_vinculados"}, ) - # Define relacionamento com ato pai + # Define o ID de origem se for um ato vinculado if parent_ato_principal_id is not None: ato_data["origem_ato_principal_id"] = parent_ato_principal_id - elif "origem_ato_principal_id" not in ato_data: + elif ato_data.get("origem_ato_principal_id") is None: ato_data["origem_ato_principal_id"] = None - # 3. Criptografia dos campos sensíveis + # Verifica duplicidade usando descriptografia + existing_ato = ( + db.query(AtoPrincipal) + .filter(func.aes_decrypt(AtoPrincipal.codigo_selo, AES_KEY) == codigo_selo) + .first() + ) + + if existing_ato: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"O Código do Selo '{codigo_selo}' já está cadastrado.", + ) + campos_criptografar = [ "nome_civil_ato", "nome_serventuario_praticou_ato", ] - ato_data = self._prepare_encrypted_data(db, ato_data, campos_criptografar) + # Criptografa os campos de texto necessários + for campo in campos_criptografar: + valor = ato_data.get(campo) + if isinstance(valor, str) and valor.strip(): + ato_data[campo] = func.aes_encrypt(valor, AES_KEY) + elif campo not in ato_data or valor is None: + ato_data[campo] = None - # Adiciona hash do código do selo para busca rápida - ato_data["codigo_selo_hash"] = self._generate_hash(codigo_selo) - - # 4. Criação e persistência do ato principal + # 2. Criação e Persistência do Ato Principal new_ato = AtoPrincipal(**ato_data) db.add(new_ato) - db.flush() # Obtém o ID sem fazer commit - + db.flush() new_ato_id = new_ato.ato_principal_id - logger.info(f"Ato principal criado: ID={new_ato_id}, Selo={codigo_selo}") - # 5. Salva partes do ato - if ato_schema.ato_partes: - self._save_ato_partes(db, ato_schema.ato_partes, new_ato_id) + # 3. Salva os Filhos Diretos: Ato Partes + for parte in ato_schema.ato_partes: + parte_data = parte.model_dump(exclude_unset=True) - # 6. Salva documentos do ato - if ato_schema.ato_documentos: - self._save_ato_documentos(db, ato_schema.ato_documentos, new_ato_id) + parte_campos_criptografar = ["nome", "telefone", "cpf_cnpj"] - # 7. Salvamento recursivo de atos vinculados + for campo in parte_campos_criptografar: + valor = parte_data.get(campo) + # A validação/conversão para string já foi feita pelo Pydantic, + # mas mantemos a checagem de tipo e strip para segurança antes da criptografia + if isinstance(valor, str) and valor.strip(): + parte_data[campo] = func.aes_encrypt(valor, AES_KEY) + else: + parte_data[campo] = None + + new_parte = AtoParte(**parte_data, ato_principal_id=new_ato_id) + db.add(new_parte) + + # 4. Salva os Filhos Diretos: Ato Documentos + for doc in ato_schema.ato_documentos: + doc_data = doc.model_dump(exclude_unset=True) + + doc_campos_criptografar = ["url", "nome_documento", "tipo_documento"] + + for campo in doc_campos_criptografar: + valor = doc_data.get(campo) + if isinstance(valor, str) and valor.strip(): + doc_data[campo] = func.aes_encrypt(valor, AES_KEY) + else: + doc_data[campo] = None + + new_documento = AtoDocumento(**doc_data, ato_principal_id=new_ato_id) + db.add(new_documento) + + # 5. Lógica Recursiva para Atos Vinculados (Estrutura de loop idêntica) if ato_schema.atos_vinculados: - logger.info( - f"Processando {len(ato_schema.atos_vinculados)} atos vinculados " - f"para selo {codigo_selo}" - ) + # A iteração é semelhante, mas o que é executado é a chamada recursiva for linked_ato_schema in ato_schema.atos_vinculados: + # O ato vinculado é persistido chamando a rotina de salvamento completa self._save_single_recursive_transaction( db, linked_ato_schema, @@ -274,138 +128,70 @@ class SaveMultipleRepository: return new_ato - def _process_single_ato(self, ato_schema: AtoPrincipalSaveSchema) -> Dict[str, Any]: - """ - Processa um único ato em uma transação isolada. - - Args: - ato_schema: Schema do ato a processar - - Returns: - Dicionário com resultado da operação - """ + # Método principal (chamado pela Action) + def execute(self, atos_principais: List[AtoPrincipalSaveSchema]): db = SessionLocal() - codigo_selo = getattr(ato_schema, "codigo_selo", "CÓDIGO_INDISPONÍVEL") - - try: - logger.info(f"Iniciando processamento do selo: {codigo_selo}") - - # Salva o ato com toda sua hierarquia - saved_ato = self._save_single_recursive_transaction( - db, ato_schema, parent_ato_principal_id=None - ) - - # Commit da transação completa - db.commit() - - logger.info( - f"Ato salvo com sucesso: ID={saved_ato.ato_principal_id}, " - f"Selo={codigo_selo}" - ) - - return { - "success": True, - "message": "Ato Principal e vinculados salvos com sucesso", - "data": { - "ato_principal_id": saved_ato.ato_principal_id, - "codigo_selo": codigo_selo, - "tipo_ato": saved_ato.tipo_ato, - }, - } - - except HTTPException as he: - db.rollback() - logger.warning(f"Erro HTTP ao processar selo {codigo_selo}: {he.detail}") - return { - "success": False, - "error": he.detail, - "data": {"codigo_selo": codigo_selo}, - } - - except IntegrityError as ie: - db.rollback() - logger.error( - f"Erro de integridade ao processar selo {codigo_selo}: {str(ie)}", - exc_info=True, - ) - return { - "success": False, - "error": "Erro de integridade: violação de constraint no banco de dados", - "data": {"codigo_selo": codigo_selo}, - } - - except OperationalError as oe: - db.rollback() - logger.error( - f"Erro operacional ao processar selo {codigo_selo}: {str(oe)}", - exc_info=True, - ) - return { - "success": False, - "error": "Erro de conexão ou operação no banco de dados", - "data": {"codigo_selo": codigo_selo}, - } - - except SQLAlchemyError as se: - db.rollback() - logger.error( - f"Erro SQLAlchemy ao processar selo {codigo_selo}: {str(se)}", - exc_info=True, - ) - return { - "success": False, - "error": "Erro ao interagir com o banco de dados", - "data": {"codigo_selo": codigo_selo}, - } - - except Exception as e: - db.rollback() - logger.error( - f"Erro inesperado ao processar selo {codigo_selo}: {str(e)}", - exc_info=True, - ) - return { - "success": False, - "error": f"Erro inesperado: {str(e)}", - "data": {"codigo_selo": codigo_selo}, - } - - finally: - db.close() - - def execute( - self, atos_principais: List[AtoPrincipalSaveSchema] - ) -> List[Dict[str, Any]]: - """ - Método principal para salvar múltiplos atos principais. - Cada ato é processado em uma transação independente. - - Args: - atos_principais: Lista de schemas de atos a salvar - - Returns: - Lista de dicionários com resultado de cada operação - """ - logger.info(f"Iniciando processamento de {len(atos_principais)} atos") - results = [] - successful_count = 0 - failed_count = 0 - for idx, ato_schema in enumerate(atos_principais, 1): - logger.info(f"Processando ato {idx}/{len(atos_principais)}") + # 1. Checa a chave AES + if not AES_KEY: + db.close() + raise Exception("A chave AES (aeskey) não está configurada.") - result = self._process_single_ato(ato_schema) - results.append(result) + # 2. Loop principal: Cada iteração é uma transação completa (incluindo recursão) + for ato_schema in atos_principais: + codigo_selo_log = getattr(ato_schema, "codigo_selo", "SELO_INDISPONÍVEL") - if result["success"]: - successful_count += 1 - else: - failed_count += 1 + try: + # A rotina completa de salvamento (incluindo filhos e recursão) é encapsulada + saved_ato = self._save_single_recursive_transaction( + db, + ato_schema, + parent_ato_principal_id=None, # Ato de nível superior + ) - logger.info( - f"Processamento concluído: {successful_count} sucessos, " - f"{failed_count} falhas" - ) + # ---------- COMMIT final para a transação completa (Ato Principal + Filhos + Atos Vinculados) ---------- + db.commit() + # Retorno de sucesso + ato_result = { + "success": True, + "message": "Ato Principal e vinculados salvos com sucesso", + "data": { + "ato_principal_id": saved_ato.ato_principal_id, + "codigo_selo": codigo_selo_log, + "tipo_ato": saved_ato.tipo_ato, + }, + } + results.append(ato_result) + + # Tratamento de erro específico para duplicidade ou HTTP + except HTTPException as he: + db.rollback() + results.append( + { + "success": False, + "error": he.detail, + "data": {"codigo_selo": codigo_selo_log}, + } + ) + + # Tratamento de erro genérico + except Exception as e: + db.rollback() + + # Log completo do erro + print(f"ERRO DE PERSISTÊNCIA NO SELO: {codigo_selo_log}") + traceback.print_exc() + print("--------------") + + results.append( + { + "success": False, + "error": "Erro ao salvar o registro. Verifique o formato dos dados ou os logs do servidor.", + "data": {"codigo_selo": codigo_selo_log}, + } + ) + + db.close() return results