This commit is contained in:
Kenio 2025-11-16 12:05:17 -03:00
parent df47dc3bbe
commit 809b256e5b

View file

@ -6,11 +6,22 @@ from typing import Dict, Any, List
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
from packages.v1.administrativo.actions.log.log_show_database_action import ShowDatabaseAction from packages.v1.administrativo.actions.log.log_show_database_action import ShowDatabaseAction
# --- FUNÇÃO HELPER PARA REMOÇÃO DE SOURCE_CODE (MANTIDA) --- # --- CONSTANTE DE FILTRO ---
FILTER_SUBSTRING = "CONVERSAO"
def is_ignored(name: str) -> bool:
"""Verifica se o nome contém a substring de filtro (case-insensitive)."""
if name is None:
return False
# Garante que a comparação é feita em caixa alta para robustez
return FILTER_SUBSTRING in str(name).upper()
# --- FUNÇÃO HELPER PARA REMOÇÃO DE SOURCE_CODE ---
def _remove_source_code(item: Dict) -> Dict: def _remove_source_code(item: Dict) -> Dict:
""" """
Remove o campo 'SOURCE_CODE' ou 'source_code' de um item de dicionário. Remove o campo 'SOURCE_CODE' ou 'source_code' de um item de dicionário.
(Case-insensitive para chaves comuns de código-fonte).
""" """
item_copy = item.copy() item_copy = item.copy()
@ -22,28 +33,122 @@ def _remove_source_code(item: Dict) -> Dict:
return item_copy return item_copy
# --- FUNÇÃO HELPER PARA OBTER IDENTIFICADOR ---
def get_element_identifier(item):
"""Retorna o principal identificador de um elemento de schema."""
return (
item.get('TABLE_NAME') or
item.get('CONSTRAINT_NAME') or
item.get('INDEX_NAME') or
item.get('VIEW_NAME') or
item.get('PROCEDURE_NAME') or
item.get('TRIGGER_NAME')
)
# --- FUNÇÃO AUXILIAR PARA COMPARAÇÃO DE CAMPOS (MANTIDA, MAS NÃO SERÁ EXECUTADA) --- # --- FUNÇÃO AUXILIAR PARA COMPARAÇÃO DE CAMPOS (Mantida) ---
# NOTE: As funções compare_fields e compare_structures não precisam ser modificadas,
# pois não serão chamadas no novo fluxo do 'execute', mas são mantidas
# no escopo para completude, embora irrelevantes para este retorno.
def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table_name: str) -> List[Dict]: def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table_name: str) -> List[Dict]:
# ... (código compare_fields omitido para foco na alteração) """
pass Compara os campos de uma tabela entre as estruturas padrão e cliente.
"""
# NOTE: Esta função não é usada no fluxo atual de retorno, mas é mantida por completude.
discrepancies = []
# ... (implementação omitida)
return discrepancies
# --- FUNÇÃO PRINCIPAL DE COMPARAÇÃO DE ESTRUTURAS (Mantida para fins de contexto, mas não chamada no execute) ---
def compare_structures(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]: def compare_structures(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]:
# ... (código compare_structures omitido para foco na alteração) # NOTE: Esta função não é usada no fluxo atual de retorno, mas é mantida por completude.
pass audit_log = {}
# ... (implementação omitida)
return audit_log
# --- CLASSE PRINCIPAL DE SERVIÇO (FLUXO DE EXECUÇÃO MODIFICADO) --- # --- NOVA FUNÇÃO: ELEMENTOS SOMENTE NO CLIENTE ---
def find_client_only_elements(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]:
"""
Identifica elementos presentes na estrutura do cliente mas ausentes na estrutura padrão.
Filtra elementos que contenham a substring CONVERSAO.
"""
client_only = {}
# --- 1. TABLES (Lidando com a estrutura achatada do cliente) ---
# 1a. Normalize e Filtra Tabelas Padrão
standard_tables_names = {t.get('TABLE_NAME') for t in standard_structure.get('tables', [])
if t.get('TABLE_NAME') and not is_ignored(t.get('TABLE_NAME'))}
# 1b. Agrupa e Filtra Campos do Cliente por Nome de Tabela
client_fields_by_table: Dict[str, List[Dict]] = {}
for field_item in client_structure.get('tables', []):
table_name = field_item.get('TABLE_NAME')
# Filtra a tabela e o elemento, se for CONVERSAO
if table_name and not is_ignored(table_name):
if table_name not in client_fields_by_table:
client_fields_by_table[table_name] = []
client_fields_by_table[table_name].append(field_item)
# 1c. Encontra Tabelas Exclusivas do Cliente
client_tables_names = set(client_fields_by_table.keys())
client_only_tables = []
unique_table_names = client_tables_names - standard_tables_names
for table_name in unique_table_names:
# Retorna a lista de campos agrupados sob o nome da tabela (limpa)
client_only_tables.append({
"TABLE_NAME": table_name,
"FIELDS": [_remove_source_code(f) for f in client_fields_by_table[table_name]]
})
if client_only_tables:
client_only["tabelas_unicas"] = client_only_tables
# --- 2. OUTROS ELEMENTOS (PKs, FKs, etc.) ---
elements_to_check = ['primary_keys', 'foreign_keys', 'indexes', 'views', 'procedures', 'triggers']
for element_key in elements_to_check:
standard_elements = standard_structure.get(element_key, [])
client_elements = client_structure.get(element_key, [])
# Cria Sets baseados na representação JSON normalizada (limpa), filtrando IGNORED
# Standard Set (Filtrado para IGNORED)
standard_elements_filtered = [item for item in standard_elements
if not is_ignored(get_element_identifier(item))]
standard_set = {json.dumps(_remove_source_code(item), sort_keys=True)
for item in standard_elements_filtered}
# Client Set (Filtrado para IGNORED)
client_set_normalized = {}
for item in client_elements:
identifier = get_element_identifier(item)
if identifier and not is_ignored(identifier): # Filtra CONVERSAO
cleaned_item = _remove_source_code(item)
json_str = json.dumps(cleaned_item, sort_keys=True)
client_set_normalized[json_str] = cleaned_item
client_set = set(client_set_normalized.keys())
# Calcula a diferença Cliente - Padrão
unique_client_elements_str = client_set - standard_set
if unique_client_elements_str:
client_only[element_key + "_unicos"] = [
client_set_normalized[json_str] for json_str in unique_client_elements_str
]
return client_only
# --- CLASSE PRINCIPAL DE SERVIÇO ---
class ShowDatabaseService: class ShowDatabaseService:
def _clean_full_structure_for_output(self, structure: Dict[str, Any]) -> Dict[str, Any]: def _clean_full_structure_for_output(self, structure: Dict[str, Any]) -> Dict[str, Any]:
"""Cria uma cópia limpa da estrutura, removendo 'SOURCE_CODE' em todos os elementos relevantes.""" """Cria uma cópia limpa da estrutura, removendo 'SOURCE_CODE' em todos os elementos relevantes."""
# Garante deep copy de toda a estrutura
cleaned = json.loads(json.dumps(structure)) cleaned = json.loads(json.dumps(structure))
keys_to_clean = ['tables', 'views', 'procedures', 'triggers', 'primary_keys', 'foreign_keys', 'indexes'] keys_to_clean = ['tables', 'views', 'procedures', 'triggers', 'primary_keys', 'foreign_keys', 'indexes']
@ -53,7 +158,7 @@ class ShowDatabaseService:
# Aplica a limpeza a cada item da lista # Aplica a limpeza a cada item da lista
cleaned[key] = [_remove_source_code(item) for item in cleaned[key]] cleaned[key] = [_remove_source_code(item) for item in cleaned[key]]
# Garante que, se houver campos (FIELDS) aninhados, eles também sejam limpos # Garante que, se houver campos (FIELDS) aninhados (no padrão), eles também sejam limpos
if 'tables' in cleaned and isinstance(cleaned['tables'], list): if 'tables' in cleaned and isinstance(cleaned['tables'], list):
for table in cleaned['tables']: for table in cleaned['tables']:
if 'FIELDS' in table and isinstance(table['FIELDS'], list): if 'FIELDS' in table and isinstance(table['FIELDS'], list):
@ -65,15 +170,11 @@ class ShowDatabaseService:
log_show_database_action = ShowDatabaseAction() log_show_database_action = ShowDatabaseAction()
# 1. Busca os dados completos do log
dados_completos = log_show_database_action.execute(client_id_schema) dados_completos = log_show_database_action.execute(client_id_schema)
if dados_completos and dados_completos.get("file"): if dados_completos and dados_completos.get("file"):
# 2. Converte a string JSON do 'file'
dados_json = json.loads(dados_completos["file"]) dados_json = json.loads(dados_completos["file"])
# 3. Extrai a string JSON da estrutura padrão e converte
standard_structure_json_string = dados_completos.get("standard_structure_json") standard_structure_json_string = dados_completos.get("standard_structure_json")
standard_structure_data: Dict[str, Any] = {} standard_structure_data: Dict[str, Any] = {}
if standard_structure_json_string: if standard_structure_json_string:
@ -83,14 +184,19 @@ class ShowDatabaseService:
pass pass
database_data = dados_json.get("database", {}) database_data = dados_json.get("database", {})
# 4. Extrai a estrutura do cliente
client_structure: Dict[str, Any] = database_data.get("structure", {}) client_structure: Dict[str, Any] = database_data.get("structure", {})
# 5. Limpa ambas as estruturas de 'source_code' para o output # Limpa ambas as estruturas de 'source_code' para o output de debug
debug_cliente = self._clean_full_structure_for_output(client_structure) debug_cliente = self._clean_full_structure_for_output(client_structure)
debug_padrao = self._clean_full_structure_for_output(standard_structure_data) debug_padrao = self._clean_full_structure_for_output(standard_structure_data)
# NOVO: Encontra elementos exclusivos do cliente (Cliente - Padrão)
elementos_unicos_cliente = find_client_only_elements(
standard_structure_data,
client_structure
)
# 6. Monta o JSON final como lista numerada, conforme solicitado # 1. Lista para Análise Visual (estruturas Padrão e Cliente)
data_list_for_visual_check = [ data_list_for_visual_check = [
{ {
"id": 1, "id": 1,
@ -104,10 +210,11 @@ class ShowDatabaseService:
} }
] ]
# Retorno da informação no formato esperado pelo cliente # 2. Monta o retorno final com as duas seções
return { return {
"message": "Estruturas de Banco de Dados para Análise Visual (Source Code Removido)", "message": "Estruturas de Banco de Dados para Análise Visual e Itens Exclusivos do Cliente",
"data": data_list_for_visual_check "data": data_list_for_visual_check,
"itens_nao_encontrados_no_padrao": elementos_unicos_cliente
} }
else: else: