This commit is contained in:
Kenio 2025-11-16 11:47:28 -03:00
parent 11b73e179d
commit 743b213bc1

View file

@ -42,8 +42,10 @@ def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table
discrepancies = []
try:
# Mapeia os campos do cliente para acesso rápido
client_fields_map = {field['FIELD_NAME']: field for field in client_fields}
except KeyError:
# Isso pode acontecer se a estrutura achatada não tiver 'FIELD_NAME'
return [{
"tabela": table_name,
"tipo": "Erro de Estrutura Interna",
@ -70,6 +72,13 @@ def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table
standard_value = standard_field.get(attr)
client_value = client_field.get(attr)
# Normalização de valores nulos/vazios
if standard_value == 'NULL' or standard_value is None:
standard_value = None
if client_value == 'NULL' or client_value is None:
client_value = None
# Comparação
if standard_value != client_value:
discrepancies.append({
"tabela": table_name,
@ -81,7 +90,7 @@ def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table
return discrepancies
# --- FUNÇÃO PRINCIPAL DE COMPARAÇÃO DE ESTRUTURAS ---
# --- FUNÇÃO PRINCIPAL DE COMPARAÇÃO DE ESTRUTURAS (CORRIGIDA) ---
def compare_structures(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]:
@ -98,7 +107,6 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
"tabelas_cliente": [],
"outros_elementos": []
}
# Removido "elementos_encontrados_e_comparados" para simplificar o retorno da auditoria
}
if not standard_structure or not client_structure:
@ -120,6 +128,7 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
# --- 1. AUDITORIA DE TABELAS ---
# Mapeamento do Padrão (onde cada item de 'tables' é um objeto de tabela com 'FIELDS')
standard_tables_filtered = []
for t in standard_structure.get('tables', []):
table_name = t.get('TABLE_NAME')
@ -132,23 +141,34 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
standard_tables_filtered.append(t)
standard_tables_map = {t['TABLE_NAME']: t for t in standard_tables_filtered}
client_tables_map = {}
# -------------------------------------------------------------
# NOVO Mapeamento do Cliente (Reestrutura a lista achatada de campos)
# -------------------------------------------------------------
client_fields_by_table: Dict[str, List[Dict]] = {}
client_tables_names = set()
malformed_tables_data = []
for t in client_structure.get('tables', []):
table_name = t.get('TABLE_NAME')
for field_item in client_structure.get('tables', []):
table_name = field_item.get('TABLE_NAME')
if is_ignored(table_name):
audit_log["elementos_ignorados"]["tabelas_cliente"].append({
"identificador": table_name,
"estrutura_completa": _remove_source_code(t)
})
# O cliente pode ter campos CONVERSAO em tabelas não CONVERSAO, mas ignoramos só o elemento
if table_name:
audit_log["elementos_ignorados"]["tabelas_cliente"].append({
"identificador": table_name,
"estrutura_completa": _remove_source_code(field_item)
})
elif table_name:
client_tables_map[table_name] = t
client_tables_names.add(table_name)
if table_name not in client_fields_by_table:
client_fields_by_table[table_name] = []
# Adiciona o item (que é um campo) à lista de campos daquela tabela
client_fields_by_table[table_name].append(field_item)
else:
malformed_tables_data.append(t)
malformed_tables_data.append(field_item)
if malformed_tables_data:
audit_log["discrepancias_encontradas"] = True
audit_log["detalhes"]["estrutura_malformada_cliente"] = {
@ -156,19 +176,21 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
"total_objetos_ignorados": len(malformed_tables_data)
}
missing_tables = standard_tables_map.keys() - client_tables_map.keys()
# Verifica tabelas faltantes
missing_tables = standard_tables_map.keys() - client_tables_names
if missing_tables:
audit_log["discrepancias_encontradas"] = True
audit_log["detalhes"]["tabelas_faltando"].extend(sorted(list(missing_tables)))
# Compara campos para tabelas existentes
for table_name in standard_tables_map.keys():
if table_name in client_tables_map:
if table_name in client_tables_names:
standard_table = standard_tables_map[table_name]
client_table = client_tables_map[table_name]
client_fields = client_fields_by_table[table_name] # Lista de campos correta
field_discrepancies = compare_fields(
standard_table.get('FIELDS', []),
client_table.get('FIELDS', []),
client_fields, # Passando a lista de campos do cliente
table_name
)
@ -183,7 +205,6 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
for element_key in elements_to_check:
standard_elements = standard_structure.get(element_key, [])
# Filtra elementos ignorados no Padrão
standard_elements_filtered = []
for item in standard_elements:
identifier = get_element_identifier(item)
@ -197,7 +218,6 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
else:
standard_elements_filtered.append(item)
# Filtra elementos ignorados no Cliente
client_elements_filtered = []
client_elements = client_structure.get(element_key, [])
for item in client_elements:
@ -212,7 +232,6 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
else:
client_elements_filtered.append(item)
# Criação de Sets a partir dos elementos normalizados (sem SOURCE_CODE) para comparação
standard_elements_normalized = [_remove_source_code(item) for item in standard_elements_filtered]
client_elements_normalized = [_remove_source_code(item) for item in client_elements_filtered]
@ -248,7 +267,6 @@ class ShowDatabaseService:
def _clean_full_structure_for_output(self, structure: Dict[str, Any]) -> Dict[str, Any]:
"""Cria uma cópia limpa da estrutura, removendo 'SOURCE_CODE' em todos os elementos relevantes."""
# Garante deep copy de toda a estrutura
cleaned = json.loads(json.dumps(structure))
keys_to_clean = ['tables', 'views', 'procedures', 'triggers', 'primary_keys', 'foreign_keys', 'indexes']
@ -258,7 +276,7 @@ class ShowDatabaseService:
# Aplica a limpeza a cada item da lista
cleaned[key] = [_remove_source_code(item) for item in cleaned[key]]
# Garante que, se houver campos (FIELDS) aninhados, eles também sejam limpos
# Garante que, se houver campos (FIELDS) aninhados (no padrão), eles também sejam limpos
if 'tables' in cleaned and isinstance(cleaned['tables'], list):
for table in cleaned['tables']:
if 'FIELDS' in table and isinstance(table['FIELDS'], list):