fix(): Refeito classe de descompactação de inteiro teor e aplicado no arquivo repository.py

This commit is contained in:
Kenio 2025-11-17 12:02:14 -03:00
parent 9ace6448bd
commit 46a3e3bd1d
2 changed files with 82 additions and 70 deletions

View file

@ -1,91 +1,99 @@
# Importa a biblioteca nativa 'zlib' usada para descompressão de dados binários.
import base64 import base64
import zlib import zlib
# Define uma classe utilitária chamada 'String', contendo apenas métodos estáticos. class DataCompressor:
# Essa abordagem permite o uso direto sem necessidade de instanciar a classe. """
class String: Classe utilitária para compressão e descompressão de dados.
@staticmethod
def decompress(vf_string):
"""
Descomprime e decodifica texto de origem WPTools/Firebird.
Finalidade: Funções:
Converter o conteúdo de campos BLOB ou strings compactadas (como no Delphi) - compress(): compacta texto/bytes com zlib e opcional Base64
em texto legível, detectando automaticamente se o conteúdo está: - decompress(): decodifica Base64, descompacta e retorna texto legível
- Compactado com zlib """
- Codificado em ISO-8859-1 (padrão ANSI)
- Representado como bytes puros @staticmethod
def decompress(data):
""" """
# Verifica se o valor recebido é nulo, vazio ou None. Descomprime dados vindos de Firebird/Delphi/strings compactadas.
# Se for, retorna string vazia para evitar erros de processamento.
if not vf_string: Processos executados automaticamente:
1. Se vier Base64 faz base64.b64decode()
2. Se vier stream/BLOB com .read()
3. Se for string converte para bytes
4. Detecta se é zlib descompacta
5. Retorna texto ISO-8859-1
Retorna:
Texto descompactado (str)
"""
# Caso seja None ou vazio
if not data:
return "" return ""
# Caso seja um objeto tipo stream (ex: campo BLOB do Firebird) # 1. Se vier como stream (ex.: campo BLOB do Firebird)
# Campos BLOB geralmente possuem o método `.read()` para leitura de bytes. if hasattr(data, "read"):
if hasattr(vf_string, "read"): data = data.read()
vf_string = vf_string.read() # Lê o conteúdo completo do stream
# Garante que o valor trabalhado é uma sequência de bytes (não string) # 2. Se for string, pode ser Base64 → tentamos primeiro decodificar base64
# Se o dado já for texto (str), converte para bytes em codificação Latin-1, if isinstance(data, str):
# que é compatível com ISO-8859-1 usado por sistemas Delphi/Firebird.
if isinstance(vf_string, str):
vf_bytes = vf_string.encode("latin1", errors="ignore")
else:
vf_bytes = vf_string # Já está em bytes, então apenas reaproveita
# Detecta se o conteúdo foi compactado com zlib.
# A assinatura padrão do formato zlib começa com bytes: 0x78 0x9C ou 0x78 0xDA.
is_zlib = (
len(vf_bytes) > 2 and vf_bytes[0] == 0x78 and vf_bytes[1] in (0x9C, 0xDA)
)
# Se a detecção confirmar que o conteúdo é zlib, tenta descompactar.
if is_zlib:
try: try:
# Descompacta os bytes e decodifica o texto usando ISO-8859-1 (ANSI), # Tentamos decodificar Base64
# que preserva corretamente acentuação e caracteres especiais. data_bytes = base64.b64decode(data, validate=False)
text = zlib.decompress(vf_bytes).decode("iso-8859-1", errors="ignore") except Exception:
return text # Não era Base64, convertemos string para bytes ISO-8859-1
data_bytes = data.encode("latin1", errors="ignore")
else:
# Já são bytes
data_bytes = data
# Tentativa extra: bytes podem ter vindo em Base64 também
try:
data_bytes = base64.b64decode(data_bytes, validate=False)
except Exception: except Exception:
# Caso falhe (por dados corrompidos ou não comprimidos de fato),
# o fluxo continua normalmente sem interromper a execução.
pass pass
# Se não for zlib, tenta tratar o conteúdo como texto puro (não compactado) # 3. Detectar assinatura zlib (0x78 0x9C ou 0x78 0xDA)
try: is_zlib = (
# Decodifica os bytes diretamente de ISO-8859-1 (padrão usado pelo Delphi) len(data_bytes) > 2
return vf_bytes.decode("iso-8859-1", errors="ignore") and data_bytes[0] == 0x78
except Exception: and data_bytes[1] in (0x9C, 0xDA)
# Como fallback, converte para string bruta para evitar falhas. )
return str(vf_string)
if is_zlib:
try:
text = zlib.decompress(data_bytes).decode("iso-8859-1", errors="ignore")
return text
except Exception:
pass
# 4. Se não era zlib, tentamos decodificar direto
try:
return data_bytes.decode("iso-8859-1", errors="ignore")
except Exception:
return str(data)
# =====================================================================
# >>> NOVO MÉTODO <<<
@staticmethod @staticmethod
def compress(text, *, encoding: str = "iso-8859-1", as_base64: bool = True): def compress(text, *, encoding="iso-8859-1", as_base64=True):
""" """
Comprime texto/dados com zlib. Compacta texto/bytes usando zlib.
Parâmetros: Parâmetros:
text: str | bytes | stream (com .read()) text -> str | bytes | stream
encoding: encoding usado quando 'text' for str (padrão: ISO-8859-1) encoding -> usado ao converter str para bytes
as_base64: se True, retorna string Base64 do conteúdo comprimido; as_base64 -> retorna Base64 (True) ou bytes puros (False)
caso False, retorna bytes comprimidos.
Retorno: Retorno:
- bytes (zlib) quando as_base64=False bytes (quando as_base64=False)
- str (Base64) quando as_base64=True str Base64 (quando as_base64=True)
Observações:
- Use o mesmo 'encoding' ao descomprimir para simetria.
- Ideal para armazenar em BLOB ou trafegar seguro (Base64).
""" """
if text is None or text == "": if text is None or text == "":
return "" if as_base64 else b"" return "" if as_base64 else b""
# Se for stream (ex.: BLOB do Firebird) # Se for stream/BLOB
if hasattr(text, "read"): if hasattr(text, "read"):
raw = text.read() raw = text.read()
else: else:
@ -97,10 +105,10 @@ class String:
else: else:
raw_bytes = bytes(raw) raw_bytes = bytes(raw)
# Comprime com zlib # Compressão zlib
comp = zlib.compress(raw_bytes) comp = zlib.compress(raw_bytes)
# Opcional: codifica em Base64 para transporte/JSON # Retorna Base64 se solicitado
if as_base64: if as_base64:
return base64.b64encode(comp).decode("ascii") return base64.b64encode(comp).decode("ascii")

View file

@ -2,7 +2,7 @@ from typing import Optional, Dict, Any, List
from sqlalchemy import func from sqlalchemy import func
from fastapi import HTTPException, status from fastapi import HTTPException, status
from database.mysql import SessionLocal, get_database_settings from database.mysql import SessionLocal, get_database_settings
from actions.compress_decompress.compress_decompress import decompress from actions.compress_decompress.compress_decompress import DataCompressor
from packages.v1.administrativo.models.ato_principal_model import AtoPrincipal from packages.v1.administrativo.models.ato_principal_model import AtoPrincipal
from packages.v1.administrativo.models.ato_parte_model import AtoParte from packages.v1.administrativo.models.ato_parte_model import AtoParte
from packages.v1.administrativo.models.ato_documento_model import AtoDocumento from packages.v1.administrativo.models.ato_documento_model import AtoDocumento
@ -232,8 +232,12 @@ class ShowAtosRepository:
atos_vinculados_list.append( atos_vinculados_list.append(
{ {
"inteiro_teor": ( "inteiro_teor": DataCompressor.decompress(
av.inteiro_teor.decode("utf-8") if av.inteiro_teor else None (
av.inteiro_teor.decode("utf-8")
if av.inteiro_teor
else None
)
), ),
"identificacao_pedido_na_cgj": av.identificacao_pedido_cgj, "identificacao_pedido_na_cgj": av.identificacao_pedido_cgj,
"tipo_de_ato": av.tipo_ato, "tipo_de_ato": av.tipo_ato,
@ -268,7 +272,7 @@ class ShowAtosRepository:
# === 5. Montar JSON final === # === 5. Montar JSON final ===
return { return {
"inteiro_teor": decompress( "inteiro_teor": DataCompressor.decompress(
(ato.inteiro_teor.decode("utf-8") if ato.inteiro_teor else None) (ato.inteiro_teor.decode("utf-8") if ato.inteiro_teor else None)
), ),
"identificacao_pedido_na_cgj": ato.identificacao_pedido_cgj, "identificacao_pedido_na_cgj": ato.identificacao_pedido_cgj,