fix(): Retornando ao padrão inicial

This commit is contained in:
Kenio 2025-12-08 15:51:17 -03:00
parent c0f6bfd9a3
commit bf0fcb59d5

View file

@ -6,254 +6,8 @@ from typing import Dict, Any, List
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
from packages.v1.administrativo.actions.log.log_show_database_action import ShowDatabaseAction from packages.v1.administrativo.actions.log.log_show_database_action import ShowDatabaseAction
# --- CONSTANTE DE FILTRO ---
FILTER_SUBSTRING = ""
def is_ignored(name: str) -> bool:
"""Verifica se o nome contém a substring de filtro (case-insensitive)."""
if name is None:
return False
# Garante que a comparação é feita em caixa alta para robustez
return FILTER_SUBSTRING in str(name).upper()
# --- NOVA FUNÇÃO HELPER: NORMALIZAÇÃO DE CHAVES ---
def _normalize_keys_to_upper(data: Any) -> Any:
"""
Recursivamente converte todas as chaves de dicionário para UPPERCASE.
Mantém os valores (conteúdo) originais.
"""
if isinstance(data, dict):
new_dict = {}
for k, v in data.items():
# Converte a chave para UPPERCASE se for string
new_k = k.upper() if isinstance(k, str) else k
# Chama recursivamente para o valor
new_dict[new_k] = _normalize_keys_to_upper(v)
return new_dict
elif isinstance(data, list):
# Itera sobre a lista
return [_normalize_keys_to_upper(item) for item in data]
else:
# Retorna o valor (string, int, etc.)
return data
# --- FUNÇÃO HELPER PARA REMOÇÃO DE SOURCE_CODE ---
def _remove_source_code(item: Dict) -> Dict:
"""
Remove o campo 'SOURCE_CODE' ou 'source_code' de um item de dicionário.
(Funciona após a normalização, pois a chave sempre será 'SOURCE_CODE').
"""
item_copy = item.copy()
# Após a normalização para UPPERCASE, só precisamos checar 'SOURCE_CODE'
if 'SOURCE_CODE' in item_copy:
del item_copy['SOURCE_CODE']
return item_copy
# --- FUNÇÃO HELPER PARA OBTER IDENTIFICADOR ---
def get_element_identifier(item):
"""Retorna o principal identificador de um elemento de schema (assume UPPERCASE)."""
return (
item.get('TABLE_NAME') or
item.get('CONSTRAINT_NAME') or
item.get('INDEX_NAME') or
item.get('VIEW_NAME') or
item.get('PROCEDURE_NAME') or
item.get('TRIGGER_NAME')
)
# --- FUNÇÕES DE COMPARAÇÃO (Mantidas mas não chamadas para o retorno) ---
def compare_fields(standard_fields: List[Dict], client_fields: List[Dict], table_name: str) -> List[Dict]:
pass
def compare_structures(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]:
pass
# --- NOVO: FUNÇÃO: ELEMENTOS SOMENTE NO PADRÃO (Padrão - Cliente) ---
def find_standard_only_elements(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]:
"""
Identifica elementos presentes na estrutura PADRÃO mas ausentes na estrutura CLIENTE.
(Operação: Standard - Client)
Assume que as estruturas de entrada estão com as chaves em UPPERCASE.
"""
standard_only = {}
# --- 1. TABLES ---
# 1a. Nomes de Tabelas Cliente (para comparação de unicidade)
client_tables_names = {t.get('TABLE_NAME') for t in client_structure.get('TABLES', [])
if t.get('TABLE_NAME') and not is_ignored(t.get('TABLE_NAME'))}
# 1b. Agrupa Campos do Padrão por Nome de Tabela
standard_fields_by_table: Dict[str, List[Dict]] = {}
for field_item in standard_structure.get('TABLES', []):
table_name = field_item.get('TABLE_NAME')
if table_name and not is_ignored(table_name):
if table_name not in standard_fields_by_table:
standard_fields_by_table[table_name] = []
standard_fields_by_table[table_name].append(field_item)
# 1c. Encontra Tabelas Exclusivas do Padrão
standard_tables_names = set(standard_fields_by_table.keys())
# Diferença: Tabelas Padrão - Tabelas Cliente
unique_table_names = standard_tables_names - client_tables_names
standard_only_tables = []
for table_name in unique_table_names:
standard_only_tables.append({
"TABLE_NAME": table_name,
"FIELDS": [_remove_source_code(f) for f in standard_fields_by_table[table_name]]
})
if standard_only_tables:
standard_only["TABELAS_UNICAS"] = standard_only_tables
# --- 2. OUTROS ELEMENTOS (PKs, FKs, etc.) ---
elements_to_check = ['PRIMARY_KEYS', 'FOREIGN_KEYS', 'INDEXES', 'VIEWS', 'PROCEDURES', 'TRIGGERS']
for element_key in elements_to_check:
standard_elements = standard_structure.get(element_key, [])
client_elements = client_structure.get(element_key, [])
# Standard Set (Mapeamento para reobter o objeto limpo)
standard_set_normalized = {}
for item in standard_elements:
identifier = get_element_identifier(item)
if identifier and not is_ignored(identifier):
cleaned_item = _remove_source_code(item)
json_str = json.dumps(cleaned_item, sort_keys=True)
standard_set_normalized[json_str] = cleaned_item
standard_set = set(standard_set_normalized.keys())
# Client Set (Basta o set dos JSON strings)
client_elements_filtered = [item for item in client_elements
if not is_ignored(get_element_identifier(item))]
client_set = {json.dumps(_remove_source_code(item), sort_keys=True)
for item in client_elements_filtered}
# Calcula a diferença Padrão - Cliente
unique_standard_elements_str = standard_set - client_set
if unique_standard_elements_str:
standard_only[element_key + "_UNICOS"] = [
standard_set_normalized[json_str] for json_str in unique_standard_elements_str
]
return standard_only
# --- FUNÇÃO: ELEMENTOS SOMENTE NO CLIENTE (Agora usa chaves UPPERCASE) ---
def find_client_only_elements(standard_structure: Dict[str, Any], client_structure: Dict[str, Any]) -> Dict[str, Any]:
"""
Identifica elementos presentes na estrutura do cliente mas ausentes na estrutura padrão.
Assume que as estruturas de entrada estão com as chaves em UPPERCASE.
"""
client_only = {}
# --- 1. TABLES (Lidando com a estrutura achatada do cliente) ---
# 1a. Normalize e Filtra Tabelas Padrão
standard_tables_names = {t.get('TABLE_NAME') for t in standard_structure.get('TABLES', [])
if t.get('TABLE_NAME') and not is_ignored(t.get('TABLE_NAME'))}
# 1b. Agrupa e Filtra Campos do Cliente por Nome de Tabela
client_fields_by_table: Dict[str, List[Dict]] = {}
# A chave do nível superior agora é 'TABLES' (em UPPERCASE)
for field_item in client_structure.get('TABLES', []):
table_name = field_item.get('TABLE_NAME')
# Filtra a tabela e o elemento, se for CONVERSAO
if table_name and not is_ignored(table_name):
if table_name not in client_fields_by_table:
client_fields_by_table[table_name] = []
client_fields_by_table[table_name].append(field_item)
# 1c. Encontra Tabelas Exclusivas do Cliente
client_tables_names = set(client_fields_by_table.keys())
unique_table_names = client_tables_names - standard_tables_names
client_only_tables = []
for table_name in unique_table_names:
# Retorna a lista de campos agrupados sob o nome da tabela (limpa)
client_only_tables.append({
"TABLE_NAME": table_name,
"FIELDS": [_remove_source_code(f) for f in client_fields_by_table[table_name]]
})
if client_only_tables:
client_only["TABELAS_UNICAS"] = client_only_tables
# --- 2. OUTROS ELEMENTOS (PKs, FKs, etc.) ---
# Chaves de nível superior agora em UPPERCASE
elements_to_check = ['PRIMARY_KEYS', 'FOREIGN_KEYS', 'INDEXES', 'VIEWS', 'PROCEDURES', 'TRIGGERS']
for element_key in elements_to_check:
standard_elements = standard_structure.get(element_key, [])
client_elements = client_structure.get(element_key, [])
# Cria Sets baseados na representação JSON normalizada (limpa), filtrando IGNORED
# Standard Set (Filtrado para IGNORED)
standard_elements_filtered = [item for item in standard_elements
if not is_ignored(get_element_identifier(item))]
# O _remove_source_code é chamado após a normalização.
standard_set = {json.dumps(_remove_source_code(item), sort_keys=True)
for item in standard_elements_filtered}
# Client Set (Filtrado para IGNORED)
client_set_normalized = {}
for item in client_elements:
identifier = get_element_identifier(item)
if identifier and not is_ignored(identifier): # Filtra CONVERSAO
cleaned_item = _remove_source_code(item)
json_str = json.dumps(cleaned_item, sort_keys=True)
client_set_normalized[json_str] = cleaned_item
client_set = set(client_set_normalized.keys())
# Calcula a diferença Cliente - Padrão
unique_client_elements_str = client_set - standard_set
if unique_client_elements_str:
client_only[element_key + "_UNICOS"] = [
client_set_normalized[json_str] for json_str in unique_client_elements_str
]
return client_only
# --- CLASSE PRINCIPAL DE SERVIÇO ---
class ShowDatabaseService: class ShowDatabaseService:
def _clean_full_structure_for_output(self, structure: Dict[str, Any]) -> Dict[str, Any]:
"""Cria uma cópia limpa da estrutura, removendo 'SOURCE_CODE' em todos os elementos relevantes.
Assume que a estrutura está em UPPERCASE."""
cleaned = json.loads(json.dumps(structure))
keys_to_clean = ['TABLES', 'VIEWS', 'PROCEDURES', 'TRIGGERS', 'PRIMARY_KEYS', 'FOREIGN_KEYS', 'INDEXES']
for key in keys_to_clean:
if key in cleaned and isinstance(cleaned[key], list):
# Aplica a limpeza a cada item da lista
cleaned[key] = [_remove_source_code(item) for item in cleaned[key]]
# Garante que, se houver campos (FIELDS) aninhados, eles também sejam limpos
if 'TABLES' in cleaned and isinstance(cleaned['TABLES'], list):
for table in cleaned['TABLES']:
if 'FIELDS' in table and isinstance(table['FIELDS'], list):
table['FIELDS'] = [_remove_source_code(field) for field in table['FIELDS']]
return cleaned
def execute(self, client_id_schema: LogClientIdSchema) -> Dict[str, Any]: def execute(self, client_id_schema: LogClientIdSchema) -> Dict[str, Any]:
log_show_database_action = ShowDatabaseAction() log_show_database_action = ShowDatabaseAction()
@ -278,26 +32,6 @@ class ShowDatabaseService:
database_data = dados_json.get("database", {}) database_data = dados_json.get("database", {})
client_structure: Dict[str, Any] = database_data.get("structure", {}) client_structure: Dict[str, Any] = database_data.get("structure", {})
# NOVO PASSO CRUCIAL: Normaliza as chaves para UPPERCASE em ambas as estruturas
standard_structure_data = _normalize_keys_to_upper(standard_structure_data)
client_structure = _normalize_keys_to_upper(client_structure)
# Limpa ambas as estruturas de 'source_code' para o output de debug
debug_cliente = self._clean_full_structure_for_output(client_structure)
debug_padrao = self._clean_full_structure_for_output(standard_structure_data)
# Encontra elementos exclusivos do cliente (Cliente - Padrão)
elementos_unicos_cliente = find_client_only_elements(
standard_structure_data,
client_structure
)
# NOVO: Encontra elementos exclusivos do padrão (Padrão - Cliente)
elementos_unicos_padrao = find_standard_only_elements(
standard_structure_data,
client_structure
)
# Separa o campo 'partition' das demais chaves # Separa o campo 'partition' das demais chaves
partition_info = database_data.get("partition", {}) partition_info = database_data.get("partition", {})
@ -318,10 +52,6 @@ class ShowDatabaseService:
"hora": dados_json.get("hora"), "hora": dados_json.get("hora"),
"database": { "database": {
"partition": partition_info, "partition": partition_info,
# "default_schema": debug_padrao,
# "client_schema": debug_cliente,
# "client_only_items": elementos_unicos_cliente,
# "standard_only_items": elementos_unicos_padrao,
"file_size_mb": file_size_mb, "file_size_mb": file_size_mb,
"db_accessible": db_accessible, "db_accessible": db_accessible,
"last_modified": last_modified "last_modified": last_modified