This commit is contained in:
Kenio 2025-11-16 09:59:15 -03:00
parent 78fdb706eb
commit cad8a76289

View file

@ -9,35 +9,33 @@ from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
from packages.v1.administrativo.actions.log.log_show_database_action import ShowDatabaseAction from packages.v1.administrativo.actions.log.log_show_database_action import ShowDatabaseAction
# --- FUNÇÃO AUXILIAR PARA COMPARAÇÃO DE CAMPOS (Fornecida e Mantida) --- import json
from typing import Dict, Any, List
# --- FUNÇÃO AUXILIAR PARA COMPARAÇÃO DE CAMPOS (AJUSTADA) ---
def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table_name: str) -> List[Dict]: def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table_name: str) -> List[Dict]:
""" """
Compara os campos de uma tabela entre as estruturas padrão e cliente. Compara os campos de uma tabela entre as estruturas padrão e cliente,
retornando apenas os nomes dos campos/atributos divergentes.
""" """
discrepancies = [] discrepancies = []
# Mapear campos do cliente por nome para fácil lookup
# Esta linha pode causar KeyError: 'FIELD_NAME' se os dados forem malformados.
# Assumimos que a lista de campos é mais confiável do que a lista de tabelas.
try: try:
# 1. Mapear campos do cliente por nome para fácil lookup
client_fields_map = {field['FIELD_NAME']: field for field in client_fields} client_fields_map = {field['FIELD_NAME']: field for field in client_fields}
except KeyError as e: except KeyError:
# Se ocorrer um erro aqui, a lista de campos está malformada.
# Reportamos isso como uma discrepância na tabela.
return [{ return [{
"tabela": table_name, "tabela": table_name,
"tipo": "Erro de Estrutura Interna", "tipo": "Erro de Estrutura Interna",
"detalhe": f"Um ou mais campos na tabela do cliente não possuem a chave {e}.", "detalhe": "Um ou mais campos na tabela do cliente não possuem a chave 'FIELD_NAME'."
"padrao": None,
"cliente": client_fields
}] }]
# 2. Iterar sobre os campos padrão e verificar se existem no cliente # 2. Iterar sobre os campos padrão e verificar se existem no cliente
for standard_field in standard_fields: for standard_field in standard_fields:
field_name = standard_field.get('FIELD_NAME') field_name = standard_field.get('FIELD_NAME')
if not field_name: # Verificação de segurança adicional if not field_name:
continue continue
if field_name not in client_fields_map: if field_name not in client_fields_map:
@ -45,9 +43,8 @@ def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table
discrepancies.append({ discrepancies.append({
"tabela": table_name, "tabela": table_name,
"tipo": "Campo Faltando", "tipo": "Campo Faltando",
"detalhe": f"Campo '{field_name}' ausente na tabela do cliente.", "campo": field_name, # Identificador simples
"padrao": standard_field, "detalhe": f"Campo '{field_name}' ausente na tabela do cliente."
"cliente": None
}) })
continue continue
@ -61,14 +58,13 @@ def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table
standard_value = standard_field.get(attr) standard_value = standard_field.get(attr)
client_value = client_field.get(attr) client_value = client_field.get(attr)
# Nota: Normalize strings/tipos, se necessário, antes desta comparação final # Removemos a necessidade de 'padrao' e 'cliente' no JSON final
if standard_value != client_value: if standard_value != client_value:
discrepancies.append({ discrepancies.append({
"tabela": table_name, "tabela": table_name,
"tipo": f"Atributo Divergente ({attr})", "tipo": f"Atributo Divergente ({attr})",
"detalhe": f"Atributo '{attr}' do campo '{field_name}' não coincide.", "campo": field_name, # Identificador simples
"padrao": standard_value, "detalhe": f"O atributo '{attr}' do campo '{field_name}' diverge (Padrão: '{standard_value}' | Cliente: '{client_value}')."
"cliente": client_value
}) })
return discrepancies return discrepancies
@ -78,10 +74,8 @@ def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table
def compare_structures(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]: def compare_structures(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]:
""" """
Compara a estrutura do cliente com a estrutura padrão e lista as discrepâncias. Compara a estrutura do cliente com a estrutura padrão, retornando apenas
os identificadores das discrepâncias.
Ajuste aplicado: Mapeamento seguro das tabelas do cliente para evitar KeyError
se algum item na lista 'tables' estiver sem a chave 'TABLE_NAME'.
""" """
audit_log = { audit_log = {
"discrepancias_encontradas": False, "discrepancias_encontradas": False,
@ -89,7 +83,7 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
"tabelas_faltando": [], "tabelas_faltando": [],
"elementos_faltando": [], "elementos_faltando": [],
"divergencia_de_campos": [], "divergencia_de_campos": [],
"estrutura_malformada_cliente": None # Novo campo para registrar erros de TABLE_NAME "estrutura_malformada_cliente": None
} }
} }
@ -98,47 +92,41 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
audit_log["discrepancias_encontradas"] = True audit_log["discrepancias_encontradas"] = True
return audit_log return audit_log
# 1.1 Mapear tabelas padrão (Assumindo que o JSON padrão é sempre correto) # Mapeamento seguro das tabelas do cliente para evitar KeyError
standard_tables_map = {t['TABLE_NAME']: t for t in standard_structure.get('tables', [])} standard_tables_map = {t['TABLE_NAME']: t for t in standard_structure.get('tables', [])}
# 1.2 Mapear tabelas cliente (COM TRATAMENTO DE ERRO: correção para 'KeyError: TABLE_NAME')
client_tables_map = {} client_tables_map = {}
malformed_tables_data = [] malformed_tables_data = []
for t in client_structure.get('tables', []): for t in client_structure.get('tables', []):
table_name = t.get('TABLE_NAME') # Acesso seguro com .get() table_name = t.get('TABLE_NAME')
if table_name: if table_name:
client_tables_map[table_name] = t client_tables_map[table_name] = t
else: else:
# Captura objetos que não têm a chave TABLE_NAME para log
malformed_tables_data.append(t) malformed_tables_data.append(t)
# 1.3 Se houve dados malformados, registre na auditoria
if malformed_tables_data: if malformed_tables_data:
audit_log["discrepancias_encontradas"] = True audit_log["discrepancias_encontradas"] = True
audit_log["detalhes"]["estrutura_malformada_cliente"] = { audit_log["detalhes"]["estrutura_malformada_cliente"] = {
"descricao": "Alguns objetos na lista 'tables' do cliente não possuem a chave 'TABLE_NAME' e foram ignorados.", "descricao": "Alguns objetos na lista 'tables' do cliente não possuem a chave 'TABLE_NAME' e foram ignorados.",
"objetos_problematicos": malformed_tables_data "total_objetos_ignorados": len(malformed_tables_data) # Remove o conteúdo completo
} }
# --- 1. AUDITORIA DE TABELAS E SEUS CAMPOS --- # --- 1. AUDITORIA DE TABELAS E SEUS CAMPOS ---
# 1.4 Tabelas Faltando (No padrão, mas não no cliente)
missing_tables = standard_tables_map.keys() - client_tables_map.keys() missing_tables = standard_tables_map.keys() - client_tables_map.keys()
if missing_tables: if missing_tables:
audit_log["discrepancias_encontradas"] = True audit_log["discrepancias_encontradas"] = True
audit_log["detalhes"]["tabelas_faltando"].extend(sorted(list(missing_tables))) audit_log["detalhes"]["tabelas_faltando"].extend(sorted(list(missing_tables)))
# 1.5 Tabelas Correspondentes (Verificação de Campos) for table_name in standard_tables_map.keys():
for table_name, standard_table in standard_tables_map.items():
if table_name in client_tables_map: if table_name in client_tables_map:
standard_table = standard_tables_map[table_name]
client_table = client_tables_map[table_name] client_table = client_tables_map[table_name]
standard_fields = standard_table.get('FIELDS', []) standard_fields = standard_table.get('FIELDS', [])
client_fields = client_table.get('FIELDS', []) client_fields = client_table.get('FIELDS', [])
# Chama a função compare_fields
field_discrepancies = compare_fields(standard_fields, client_fields, table_name) field_discrepancies = compare_fields(standard_fields, client_fields, table_name)
if field_discrepancies: if field_discrepancies:
@ -153,27 +141,44 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
standard_elements = standard_structure.get(element_key, []) standard_elements = standard_structure.get(element_key, [])
client_elements = client_structure.get(element_key, []) client_elements = client_structure.get(element_key, [])
# Converte em strings JSON para comparação em conjunto (Set)
try: try:
standard_set = {json.dumps(item, sort_keys=True) for item in standard_elements} standard_set = {json.dumps(item, sort_keys=True) for item in standard_elements}
client_set = {json.dumps(item, sort_keys=True) for item in client_elements} client_set = {json.dumps(item, sort_keys=True) for item in client_elements}
except TypeError as e: except TypeError:
# Caso algum elemento na lista não seja serializável (ex: None ou tipo misto)
audit_log["discrepancias_encontradas"] = True audit_log["discrepancias_encontradas"] = True
audit_log["detalhes"]["erro_serializacao"] = f"Erro ao comparar {element_key}: {e}. Os dados podem estar malformados." audit_log["detalhes"]["erro_serializacao"] = f"Erro ao comparar {element_key}. Os dados podem estar malformados."
continue continue
# Encontra elementos que estão no padrão mas não no cliente
missing_elements = standard_set - client_set missing_elements = standard_set - client_set
if missing_elements: if missing_elements:
audit_log["discrepancias_encontradas"] = True audit_log["discrepancias_encontradas"] = True
for missing_str in missing_elements: for missing_str in missing_elements:
element_dict = json.loads(missing_str)
# Tenta encontrar um identificador simples
identifier = element_dict.get('TABLE_NAME')
if not identifier:
identifier = element_dict.get('CONSTRAINT_NAME')
if not identifier:
identifier = element_dict.get('VIEW_NAME')
if not identifier:
identifier = element_dict.get('PROCEDURE_NAME')
if not identifier:
identifier = element_dict.get('TRIGGER_NAME')
# Caso não encontre chave específica, usa representação string truncada
if not identifier:
identifier = str(element_dict)
if len(identifier) > 50: # Limita a string
identifier = identifier[:50] + '...'
audit_log["detalhes"]["elementos_faltando"].append({ audit_log["detalhes"]["elementos_faltando"].append({
"tipo": element_key, "tipo": element_key,
"detalhe": "Elemento ausente", "identificador": identifier, # Identificador simples do elemento
"json_padrao": json.loads(missing_str) "detalhe": f"Um elemento '{element_key}' com identificador '{identifier}' está ausente no cliente."
# Conteúdo completo do JSON removido
}) })
return audit_log return audit_log