This commit is contained in:
Kenio 2025-11-16 10:13:07 -03:00
parent cad8a76289
commit 0017c231eb

View file

@ -2,17 +2,11 @@ from fastapi import HTTPException, status
import json
from typing import Dict, Any, List
# NOTE: O esquema de entrada para a ação de exibição de um único log
# precisaria de um esquema (Schema) que carregue o 'log_id'.
# Vamos assumir a necessidade de um LogIdSchema.
# NOTE: Manter imports necessários do seu projeto
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
from packages.v1.administrativo.actions.log.log_show_database_action import ShowDatabaseAction
import json
from typing import Dict, Any, List
# --- FUNÇÃO AUXILIAR PARA COMPARAÇÃO DE CAMPOS (AJUSTADA) ---
# --- FUNÇÃO AUXILIAR PARA COMPARAÇÃO DE CAMPOS (MANTIDA SEM CONTEÚDO) ---
def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table_name: str) -> List[Dict]:
"""
@ -22,7 +16,6 @@ def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table
discrepancies = []
try:
# 1. Mapear campos do cliente por nome para fácil lookup
client_fields_map = {field['FIELD_NAME']: field for field in client_fields}
except KeyError:
return [{
@ -31,52 +24,41 @@ def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table
"detalhe": "Um ou mais campos na tabela do cliente não possuem a chave 'FIELD_NAME'."
}]
# 2. Iterar sobre os campos padrão e verificar se existem no cliente
for standard_field in standard_fields:
field_name = standard_field.get('FIELD_NAME')
if not field_name:
continue
if not field_name: continue
if field_name not in client_fields_map:
# Campo que existe no padrão mas não no cliente
discrepancies.append({
"tabela": table_name,
"tipo": "Campo Faltando",
"campo": field_name, # Identificador simples
"campo": field_name,
"detalhe": f"Campo '{field_name}' ausente na tabela do cliente."
})
continue
# 3. Comparar atributos do campo (tipo, nulidade, default)
client_field = client_fields_map[field_name]
# Lista de atributos cruciais para comparação
attributes_to_check = ['DATA_TYPE', 'NULLABLE', 'DEFAULT']
for attr in attributes_to_check:
standard_value = standard_field.get(attr)
client_value = client_field.get(attr)
# Removemos a necessidade de 'padrao' e 'cliente' no JSON final
if standard_value != client_value:
discrepancies.append({
"tabela": table_name,
"tipo": f"Atributo Divergente ({attr})",
"campo": field_name, # Identificador simples
"campo": field_name,
"detalhe": f"O atributo '{attr}' do campo '{field_name}' diverge (Padrão: '{standard_value}' | Cliente: '{client_value}')."
})
return discrepancies
# --- FUNÇÃO PRINCIPAL DE COMPARAÇÃO DE ESTRUTURAS (AJUSTADA) ---
# --- FUNÇÃO PRINCIPAL DE COMPARAÇÃO DE ESTRUTURAS (MANTIDA SEM CONTEÚDO) ---
def compare_structures(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]:
"""
Compara a estrutura do cliente com a estrutura padrão, retornando apenas
os identificadores das discrepâncias.
"""
audit_log = {
"discrepancias_encontradas": False,
"detalhes": {
@ -92,7 +74,7 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
audit_log["discrepancias_encontradas"] = True
return audit_log
# Mapeamento seguro das tabelas do cliente para evitar KeyError
# Mapeamento seguro das tabelas (Correção do KeyError)
standard_tables_map = {t['TABLE_NAME']: t for t in standard_structure.get('tables', [])}
client_tables_map = {}
malformed_tables_data = []
@ -109,11 +91,10 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
audit_log["discrepancias_encontradas"] = True
audit_log["detalhes"]["estrutura_malformada_cliente"] = {
"descricao": "Alguns objetos na lista 'tables' do cliente não possuem a chave 'TABLE_NAME' e foram ignorados.",
"total_objetos_ignorados": len(malformed_tables_data) # Remove o conteúdo completo
"total_objetos_ignorados": len(malformed_tables_data)
}
# --- 1. AUDITORIA DE TABELAS E SEUS CAMPOS ---
# 1. AUDITORIA DE TABELAS
missing_tables = standard_tables_map.keys() - client_tables_map.keys()
if missing_tables:
audit_log["discrepancias_encontradas"] = True
@ -124,17 +105,17 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
standard_table = standard_tables_map[table_name]
client_table = client_tables_map[table_name]
standard_fields = standard_table.get('FIELDS', [])
client_fields = client_table.get('FIELDS', [])
field_discrepancies = compare_fields(standard_fields, client_fields, table_name)
field_discrepancies = compare_fields(
standard_table.get('FIELDS', []),
client_table.get('FIELDS', []),
table_name
)
if field_discrepancies:
audit_log["discrepancias_encontradas"] = True
audit_log["detalhes"]["divergencia_de_campos"].extend(field_discrepancies)
# --- 2. AUDITORIA DE OUTROS ELEMENTOS (PKs, FKs, etc.) ---
# 2. AUDITORIA DE OUTROS ELEMENTOS (PKs, FKs, etc.)
elements_to_check = ['primary_keys', 'foreign_keys', 'indexes', 'views', 'procedures', 'triggers']
for element_key in elements_to_check:
@ -156,114 +137,82 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
for missing_str in missing_elements:
element_dict = json.loads(missing_str)
identifier = element_dict.get('TABLE_NAME') or element_dict.get('CONSTRAINT_NAME') or str(element_dict)[:50]
# Tenta encontrar um identificador simples
identifier = element_dict.get('TABLE_NAME')
if not identifier:
identifier = element_dict.get('CONSTRAINT_NAME')
if not identifier:
identifier = element_dict.get('VIEW_NAME')
if not identifier:
identifier = element_dict.get('PROCEDURE_NAME')
if not identifier:
identifier = element_dict.get('TRIGGER_NAME')
# Caso não encontre chave específica, usa representação string truncada
if not identifier:
identifier = str(element_dict)
if len(identifier) > 50: # Limita a string
identifier = identifier[:50] + '...'
audit_log["detalhes"]["elementos_faltando"].append({
"tipo": element_key,
"identificador": identifier, # Identificador simples do elemento
"identificador": identifier,
"detalhe": f"Um elemento '{element_key}' com identificador '{identifier}' está ausente no cliente."
# Conteúdo completo do JSON removido
})
return audit_log
# --- CLASSE PRINCIPAL DE SERVIÇO (AJUSTADA PARA DEBUG) ---
class ShowDatabaseService:
# O método execute deve receber o esquema que contém a ID do log a ser buscado
def execute(self, client_id_schema: LogClientIdSchema):
# Instanciamento de ação com prefixo 'log'
log_show_database_action = ShowDatabaseAction()
# Executa a ação em questão (buscando pelo log_id)
dados = log_show_database_action.execute(client_id_schema)
# Executa a ação, que agora retorna os dados do log E o standard_structure_json
dados_completos = log_show_database_action.execute(client_id_schema)
# verifica se 'dados' não é None e contém 'file'
if dados and dados.get("file"):
# verifica se 'dados_completos' não é None e contém 'file'
if dados_completos and dados_completos.get("file"):
# Converte a string JSON armazenada no campo 'file' de volta para um dicionário
dados_json = json.loads(dados["file"])
# 1. Parsing do Log do Cliente
dados_json = json.loads(dados_completos["file"])
# 2. Extração da Estrutura Padrão (JSON em formato string)
standard_structure_json_string = dados_completos.get("standard_structure_json")
# Informações da estrutura de referência (JSON em formato string)
standard_structure_json_string = dados.get("standard_structure_json")
# Converte a string JSON da estrutura padrão para dicionário, se existir
standard_structure_data = {}
# 3. Parsing do JSON da Estrutura Padrão
standard_structure_data: Dict[str, Any] = {}
if standard_structure_json_string:
try:
# O 'structure' já é um JSON que foi salvo pelo FirebirdSchemaExtractor
standard_structure_data = json.loads(standard_structure_json_string)
except json.JSONDecodeError as e:
print(f"[ERRO] Falha ao decodificar JSON da estrutura padrão: {e}")
# Pode-se forçar um erro ou continuar com um dict vazio, dependendo da regra de negócio
"""
Separa as informações de database em:
- 'partition': informações de disco do database
- 'arquivos': lista com as demais pastas e suas quantidades
"""
# 1 Extrai o bloco principal de database do JSON
except json.JSONDecodeError:
# Se falhar, a comparação será feita com um dict vazio
pass
database_data = dados_json.get("database", {})
# 2 Separa o campo 'partition' das demais chaves
partition = database_data.get("partition", {})
# 3 Separa o campo 'file_size_mb' das demais chaves
file_size_mb = database_data.get("file_size_mb", {})
# 4 Separa o campo 'db_accessible' das demais chaves
db_accessible = database_data.get("db_accessible", {})
# 5 Separa o campo 'last_modified' das demais chaves
last_modified = database_data.get("last_modified", {})
# 6 Separa o campo 'structure' das demais chaves
client_structure = database_data.get("structure", {})
# 7 Efetua a auditoria comparando as duas estruturas
audit = compare_structures(
standard_structure_data, # Estrutura padrão (atualizada)
# 4. Extração da Estrutura do Cliente
client_structure: Dict[str, Any] = database_data.get("structure", {})
# 5. Efetua a auditoria comparando as duas estruturas
auditoria_do_banco = compare_structures(
standard_structure_data, # Estrutura padrão
client_structure # Estrutura do cliente
)
)
# 7 Monta o JSON final
# 6. Monta o JSON final de RETORNO (incluindo as estruturas para debug)
data = {
"cns": dados_json.get("cns"),
"cartorio": dados_json.get("cartorio"),
"data": dados_json.get("data"),
"hora": dados_json.get("hora"),
"database": {
"partition": partition,
"file_size_mb": file_size_mb,
"db_accessible": db_accessible,
"last_modified": last_modified,
"audit": audit
"partition": database_data.get("partition", {}),
"file_size_mb": database_data.get("file_size_mb", {}),
"db_accessible": database_data.get("db_accessible", {}),
"last_modified": database_data.get("last_modified", {}),
# --- CAMPOS DE DEBUG ---
"estrutura_cliente_debug": client_structure,
"estrutura_padrao_debug": standard_structure_data,
# -----------------------
"auditoria_do_banco": auditoria_do_banco
}
}
# Retorno da informação (log_id, client_id, date_post, file)
return data
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Nenhum dado de Database encontrado para o cliente fornecido."
)
)