This commit is contained in:
Kenio 2025-11-16 10:41:04 -03:00
parent 939a301fd1
commit cc8606ba98

View file

@ -7,16 +7,20 @@ from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
from packages.v1.administrativo.actions.log.log_show_database_action import ShowDatabaseAction
# --- CONSTANTE DE FILTRO ---
# Define os identificadores a serem ignorados em TODA a auditoria
IGNORE_ELEMENTS_SET = {"CONVERSAO_RI"}
FILTER_SUBSTRING = "CONVERSAO"
def is_ignored(name: str) -> bool:
"""Verifica se o nome contém a substring de filtro (case-insensitive)."""
if name is None:
return False
return FILTER_SUBSTRING in str(name).upper()
# --- FUNÇÃO AUXILIAR PARA COMPARAÇÃO DE CAMPOS ---
def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table_name: str) -> List[Dict]:
"""
Compara os campos de uma tabela entre as estruturas padrão e cliente.
(Esta função não precisa de filtro, pois é chamada apenas para tabelas FILTRADAS)
Compara os campos de uma tabela entre as estruturas padrão e cliente,
retornando apenas os nomes dos campos/atributos divergentes.
"""
discrepancies = []
@ -60,7 +64,7 @@ def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table
return discrepancies
# --- FUNÇÃO PRINCIPAL DE COMPARAÇÃO DE ESTRUTURAS (AJUSTADA COM FILTRO) ---
# --- FUNÇÃO PRINCIPAL DE COMPARAÇÃO DE ESTRUTURAS (AJUSTADA COM FILTRO E DEBUG) ---
def compare_structures(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]:
@ -71,6 +75,11 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
"elementos_faltando": [],
"divergencia_de_campos": [],
"estrutura_malformada_cliente": None
},
"elementos_ignorados": { # Novo campo para armazenar o que foi filtrado
"tabelas_padrao": [],
"tabelas_cliente": [],
"outros_elementos": []
}
}
@ -79,13 +88,32 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
audit_log["discrepancias_encontradas"] = True
return audit_log
# --- 1. AUDITORIA DE TABELAS E SEUS CAMPOS (APLICA FILTRO AQUI) ---
# --- FUNÇÃO HELPER PARA OBTER IDENTIFICADOR ---
def get_element_identifier(item):
"""Retorna o principal identificador de um elemento de schema."""
return (
item.get('TABLE_NAME') or
item.get('CONSTRAINT_NAME') or
item.get('INDEX_NAME') or
item.get('VIEW_NAME') or
item.get('PROCEDURE_NAME') or
item.get('TRIGGER_NAME')
)
# --- 1. AUDITORIA DE TABELAS ---
# 1.1 Filtragem e Mapeamento Padrão
standard_tables_filtered = [
t for t in standard_structure.get('tables', [])
if t.get('TABLE_NAME') not in IGNORE_ELEMENTS_SET
]
standard_tables_filtered = []
for t in standard_structure.get('tables', []):
table_name = t.get('TABLE_NAME')
if is_ignored(table_name):
audit_log["elementos_ignorados"]["tabelas_padrao"].append({
"identificador": table_name,
"estrutura_completa": t # Conteúdo completo para debug
})
elif table_name:
standard_tables_filtered.append(t)
standard_tables_map = {t['TABLE_NAME']: t for t in standard_tables_filtered}
# 1.2 Mapeamento Cliente (com filtro e tratamento de KeyError)
@ -95,10 +123,14 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
for t in client_structure.get('tables', []):
table_name = t.get('TABLE_NAME')
# Filtra TABLE_NAME = 'CONVERSAO_RI' E trata a falta da chave
if table_name and table_name not in IGNORE_ELEMENTS_SET:
if is_ignored(table_name):
audit_log["elementos_ignorados"]["tabelas_cliente"].append({
"identificador": table_name,
"estrutura_completa": t # Conteúdo completo para debug
})
elif table_name:
client_tables_map[table_name] = t
elif not table_name:
else:
malformed_tables_data.append(t)
if malformed_tables_data:
@ -108,7 +140,7 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
"total_objetos_ignorados": len(malformed_tables_data)
}
# 1.3 Compara as tabelas filtradas
# 1.3 Compara e busca divergências de campos nas tabelas filtradas
missing_tables = standard_tables_map.keys() - client_tables_map.keys()
if missing_tables:
audit_log["discrepancias_encontradas"] = True
@ -129,7 +161,7 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
audit_log["discrepancias_encontradas"] = True
audit_log["detalhes"]["divergencia_de_campos"].extend(field_discrepancies)
# --- 2. AUDITORIA DE OUTROS ELEMENTOS (PKs, FKs, etc.) (APLICA FILTRO AQUI) ---
# --- 2. AUDITORIA DE OUTROS ELEMENTOS (PKs, FKs, etc.) ---
elements_to_check = ['primary_keys', 'foreign_keys', 'indexes', 'views', 'procedures', 'triggers']
@ -137,15 +169,31 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
standard_elements = standard_structure.get(element_key, [])
client_elements = client_structure.get(element_key, [])
# FILTRAGEM DE ELEMENTOS (Ex: Se CONVERSAO_RI for um CONSTRAINT_NAME ou INDEX_NAME)
standard_elements_filtered = []
for item in standard_elements:
identifier = get_element_identifier(item)
if is_ignored(identifier):
audit_log["elementos_ignorados"]["outros_elementos"].append({
"origem": "padrao",
"tipo": element_key,
"identificador": identifier,
"estrutura_completa": item
})
else:
standard_elements_filtered.append(item)
def filter_element(item):
# Tenta encontrar um identificador primário para o filtro
identifier = item.get('TABLE_NAME') or item.get('CONSTRAINT_NAME') or item.get('INDEX_NAME')
return identifier not in IGNORE_ELEMENTS_SET
standard_elements_filtered = list(filter(filter_element, standard_elements))
client_elements_filtered = list(filter(filter_element, client_elements))
client_elements_filtered = []
for item in client_elements:
identifier = get_element_identifier(item)
if is_ignored(identifier):
audit_log["elementos_ignorados"]["outros_elementos"].append({
"origem": "cliente",
"tipo": element_key,
"identificador": identifier,
"estrutura_completa": item
})
else:
client_elements_filtered.append(item)
try:
standard_set = {json.dumps(item, sort_keys=True) for item in standard_elements_filtered}
@ -162,7 +210,7 @@ def compare_structures(standard_structure: Dict[str, Any], client_structure: Dic
for missing_str in missing_elements:
element_dict = json.loads(missing_str)
identifier = element_dict.get('TABLE_NAME') or element_dict.get('CONSTRAINT_NAME') or str(element_dict)[:50]
identifier = get_element_identifier(element_dict) or str(element_dict)[:50]
audit_log["detalhes"]["elementos_faltando"].append({
"tipo": element_key,
@ -181,6 +229,7 @@ class ShowDatabaseService:
log_show_database_action = ShowDatabaseAction()
# Executa a ação
dados_completos = log_show_database_action.execute(client_id_schema)
if dados_completos and dados_completos.get("file"):