From bf0fcb59d5159b2fae9330fe6f29835a22e513ee Mon Sep 17 00:00:00 2001 From: Kenio de Souza Date: Mon, 8 Dec 2025 15:51:17 -0300 Subject: [PATCH] =?UTF-8?q?fix():=20Retornando=20ao=20padr=C3=A3o=20inicia?= =?UTF-8?q?l?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../services/log/log_show_database_service.py | 270 ------------------ 1 file changed, 270 deletions(-) diff --git a/packages/v1/administrativo/services/log/log_show_database_service.py b/packages/v1/administrativo/services/log/log_show_database_service.py index cd91345..c53f58d 100644 --- a/packages/v1/administrativo/services/log/log_show_database_service.py +++ b/packages/v1/administrativo/services/log/log_show_database_service.py @@ -6,254 +6,8 @@ from typing import Dict, Any, List from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema from packages.v1.administrativo.actions.log.log_show_database_action import ShowDatabaseAction -# --- CONSTANTE DE FILTRO --- -FILTER_SUBSTRING = "" - -def is_ignored(name: str) -> bool: - """Verifica se o nome contém a substring de filtro (case-insensitive).""" - if name is None: - return False - # Garante que a comparação é feita em caixa alta para robustez - return FILTER_SUBSTRING in str(name).upper() - -# --- NOVA FUNÇÃO HELPER: NORMALIZAÇÃO DE CHAVES --- - -def _normalize_keys_to_upper(data: Any) -> Any: - """ - Recursivamente converte todas as chaves de dicionário para UPPERCASE. - Mantém os valores (conteúdo) originais. - """ - if isinstance(data, dict): - new_dict = {} - for k, v in data.items(): - # Converte a chave para UPPERCASE se for string - new_k = k.upper() if isinstance(k, str) else k - # Chama recursivamente para o valor - new_dict[new_k] = _normalize_keys_to_upper(v) - return new_dict - elif isinstance(data, list): - # Itera sobre a lista - return [_normalize_keys_to_upper(item) for item in data] - else: - # Retorna o valor (string, int, etc.) - return data - -# --- FUNÇÃO HELPER PARA REMOÇÃO DE SOURCE_CODE --- - -def _remove_source_code(item: Dict) -> Dict: - """ - Remove o campo 'SOURCE_CODE' ou 'source_code' de um item de dicionário. - (Funciona após a normalização, pois a chave sempre será 'SOURCE_CODE'). - """ - item_copy = item.copy() - - # Após a normalização para UPPERCASE, só precisamos checar 'SOURCE_CODE' - if 'SOURCE_CODE' in item_copy: - del item_copy['SOURCE_CODE'] - - return item_copy - -# --- FUNÇÃO HELPER PARA OBTER IDENTIFICADOR --- -def get_element_identifier(item): - """Retorna o principal identificador de um elemento de schema (assume UPPERCASE).""" - return ( - item.get('TABLE_NAME') or - item.get('CONSTRAINT_NAME') or - item.get('INDEX_NAME') or - item.get('VIEW_NAME') or - item.get('PROCEDURE_NAME') or - item.get('TRIGGER_NAME') - ) - -# --- FUNÇÕES DE COMPARAÇÃO (Mantidas mas não chamadas para o retorno) --- -def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table_name: str) -> List[Dict]: - pass -def compare_structures(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]: - pass - -# --- NOVO: FUNÇÃO: ELEMENTOS SOMENTE NO PADRÃO (Padrão - Cliente) --- - -def find_standard_only_elements(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]: - """ - Identifica elementos presentes na estrutura PADRÃO mas ausentes na estrutura CLIENTE. - (Operação: Standard - Client) - Assume que as estruturas de entrada já estão com as chaves em UPPERCASE. - """ - standard_only = {} - - # --- 1. TABLES --- - - # 1a. Nomes de Tabelas Cliente (para comparação de unicidade) - client_tables_names = {t.get('TABLE_NAME') for t in client_structure.get('TABLES', []) - if t.get('TABLE_NAME') and not is_ignored(t.get('TABLE_NAME'))} - - # 1b. Agrupa Campos do Padrão por Nome de Tabela - standard_fields_by_table: Dict[str, List[Dict]] = {} - for field_item in standard_structure.get('TABLES', []): - table_name = field_item.get('TABLE_NAME') - if table_name and not is_ignored(table_name): - if table_name not in standard_fields_by_table: - standard_fields_by_table[table_name] = [] - standard_fields_by_table[table_name].append(field_item) - - # 1c. Encontra Tabelas Exclusivas do Padrão - standard_tables_names = set(standard_fields_by_table.keys()) - - # Diferença: Tabelas Padrão - Tabelas Cliente - unique_table_names = standard_tables_names - client_tables_names - - standard_only_tables = [] - for table_name in unique_table_names: - standard_only_tables.append({ - "TABLE_NAME": table_name, - "FIELDS": [_remove_source_code(f) for f in standard_fields_by_table[table_name]] - }) - - if standard_only_tables: - standard_only["TABELAS_UNICAS"] = standard_only_tables - - # --- 2. OUTROS ELEMENTOS (PKs, FKs, etc.) --- - - elements_to_check = ['PRIMARY_KEYS', 'FOREIGN_KEYS', 'INDEXES', 'VIEWS', 'PROCEDURES', 'TRIGGERS'] - - for element_key in elements_to_check: - standard_elements = standard_structure.get(element_key, []) - client_elements = client_structure.get(element_key, []) - - # Standard Set (Mapeamento para reobter o objeto limpo) - standard_set_normalized = {} - for item in standard_elements: - identifier = get_element_identifier(item) - if identifier and not is_ignored(identifier): - cleaned_item = _remove_source_code(item) - json_str = json.dumps(cleaned_item, sort_keys=True) - standard_set_normalized[json_str] = cleaned_item - - standard_set = set(standard_set_normalized.keys()) - - # Client Set (Basta o set dos JSON strings) - client_elements_filtered = [item for item in client_elements - if not is_ignored(get_element_identifier(item))] - client_set = {json.dumps(_remove_source_code(item), sort_keys=True) - for item in client_elements_filtered} - - # Calcula a diferença Padrão - Cliente - unique_standard_elements_str = standard_set - client_set - - if unique_standard_elements_str: - standard_only[element_key + "_UNICOS"] = [ - standard_set_normalized[json_str] for json_str in unique_standard_elements_str - ] - - return standard_only - - -# --- FUNÇÃO: ELEMENTOS SOMENTE NO CLIENTE (Agora usa chaves UPPERCASE) --- - -def find_client_only_elements(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]: - """ - Identifica elementos presentes na estrutura do cliente mas ausentes na estrutura padrão. - Assume que as estruturas de entrada já estão com as chaves em UPPERCASE. - """ - client_only = {} - - # --- 1. TABLES (Lidando com a estrutura achatada do cliente) --- - - # 1a. Normalize e Filtra Tabelas Padrão - standard_tables_names = {t.get('TABLE_NAME') for t in standard_structure.get('TABLES', []) - if t.get('TABLE_NAME') and not is_ignored(t.get('TABLE_NAME'))} - - # 1b. Agrupa e Filtra Campos do Cliente por Nome de Tabela - client_fields_by_table: Dict[str, List[Dict]] = {} - # A chave do nível superior agora é 'TABLES' (em UPPERCASE) - for field_item in client_structure.get('TABLES', []): - table_name = field_item.get('TABLE_NAME') - # Filtra a tabela e o elemento, se for CONVERSAO - if table_name and not is_ignored(table_name): - if table_name not in client_fields_by_table: - client_fields_by_table[table_name] = [] - client_fields_by_table[table_name].append(field_item) - - # 1c. Encontra Tabelas Exclusivas do Cliente - client_tables_names = set(client_fields_by_table.keys()) - - unique_table_names = client_tables_names - standard_tables_names - - client_only_tables = [] - for table_name in unique_table_names: - # Retorna a lista de campos agrupados sob o nome da tabela (limpa) - client_only_tables.append({ - "TABLE_NAME": table_name, - "FIELDS": [_remove_source_code(f) for f in client_fields_by_table[table_name]] - }) - - if client_only_tables: - client_only["TABELAS_UNICAS"] = client_only_tables - - # --- 2. OUTROS ELEMENTOS (PKs, FKs, etc.) --- - - # Chaves de nível superior agora em UPPERCASE - elements_to_check = ['PRIMARY_KEYS', 'FOREIGN_KEYS', 'INDEXES', 'VIEWS', 'PROCEDURES', 'TRIGGERS'] - - for element_key in elements_to_check: - standard_elements = standard_structure.get(element_key, []) - client_elements = client_structure.get(element_key, []) - - # Cria Sets baseados na representação JSON normalizada (limpa), filtrando IGNORED - - # Standard Set (Filtrado para IGNORED) - standard_elements_filtered = [item for item in standard_elements - if not is_ignored(get_element_identifier(item))] - # O _remove_source_code é chamado após a normalização. - standard_set = {json.dumps(_remove_source_code(item), sort_keys=True) - for item in standard_elements_filtered} - - # Client Set (Filtrado para IGNORED) - client_set_normalized = {} - for item in client_elements: - identifier = get_element_identifier(item) - if identifier and not is_ignored(identifier): # Filtra CONVERSAO - cleaned_item = _remove_source_code(item) - json_str = json.dumps(cleaned_item, sort_keys=True) - client_set_normalized[json_str] = cleaned_item - - client_set = set(client_set_normalized.keys()) - - # Calcula a diferença Cliente - Padrão - unique_client_elements_str = client_set - standard_set - - if unique_client_elements_str: - client_only[element_key + "_UNICOS"] = [ - client_set_normalized[json_str] for json_str in unique_client_elements_str - ] - - return client_only - - -# --- CLASSE PRINCIPAL DE SERVIÇO --- - class ShowDatabaseService: - def _clean_full_structure_for_output(self, structure: Dict[str, Any]) -> Dict[str, Any]: - """Cria uma cópia limpa da estrutura, removendo 'SOURCE_CODE' em todos os elementos relevantes. - Assume que a estrutura já está em UPPERCASE.""" - cleaned = json.loads(json.dumps(structure)) - - keys_to_clean = ['TABLES', 'VIEWS', 'PROCEDURES', 'TRIGGERS', 'PRIMARY_KEYS', 'FOREIGN_KEYS', 'INDEXES'] - - for key in keys_to_clean: - if key in cleaned and isinstance(cleaned[key], list): - # Aplica a limpeza a cada item da lista - cleaned[key] = [_remove_source_code(item) for item in cleaned[key]] - - # Garante que, se houver campos (FIELDS) aninhados, eles também sejam limpos - if 'TABLES' in cleaned and isinstance(cleaned['TABLES'], list): - for table in cleaned['TABLES']: - if 'FIELDS' in table and isinstance(table['FIELDS'], list): - table['FIELDS'] = [_remove_source_code(field) for field in table['FIELDS']] - - return cleaned - def execute(self, client_id_schema: LogClientIdSchema) -> Dict[str, Any]: log_show_database_action = ShowDatabaseAction() @@ -278,26 +32,6 @@ class ShowDatabaseService: database_data = dados_json.get("database", {}) client_structure: Dict[str, Any] = database_data.get("structure", {}) - # NOVO PASSO CRUCIAL: Normaliza as chaves para UPPERCASE em ambas as estruturas - standard_structure_data = _normalize_keys_to_upper(standard_structure_data) - client_structure = _normalize_keys_to_upper(client_structure) - - # Limpa ambas as estruturas de 'source_code' para o output de debug - debug_cliente = self._clean_full_structure_for_output(client_structure) - debug_padrao = self._clean_full_structure_for_output(standard_structure_data) - - # Encontra elementos exclusivos do cliente (Cliente - Padrão) - elementos_unicos_cliente = find_client_only_elements( - standard_structure_data, - client_structure - ) - - # NOVO: Encontra elementos exclusivos do padrão (Padrão - Cliente) - elementos_unicos_padrao = find_standard_only_elements( - standard_structure_data, - client_structure - ) - # Separa o campo 'partition' das demais chaves partition_info = database_data.get("partition", {}) @@ -318,10 +52,6 @@ class ShowDatabaseService: "hora": dados_json.get("hora"), "database": { "partition": partition_info, - # "default_schema": debug_padrao, - # "client_schema": debug_cliente, - # "client_only_items": elementos_unicos_cliente, - # "standard_only_items": elementos_unicos_padrao, "file_size_mb": file_size_mb, "db_accessible": db_accessible, "last_modified": last_modified