Commit Inicial

This commit is contained in:
Keven 2025-12-02 10:37:13 -03:00
commit 145ebce336
101 changed files with 5313 additions and 0 deletions

View file

@ -0,0 +1,9 @@
# Normaliza finais de linha
* text=auto
# Força Python e arquivos de configuração a usarem LF
*.py text eol=lf
*.sh text eol=lf
*.yml text eol=lf
*.yaml text eol=lf
*.env text eol=lf

46
RCVinculaPartesLivroAntigo/.gitignore vendored Normal file
View file

@ -0,0 +1,46 @@
# Ambiente virtual
venv/
.env
.env.*
# Bytecode compilado
__pycache__/
*.py[cod]
*$py.class
# Arquivos temporários do sistema
.DS_Store
Thumbs.db
# Logs e databases locais
*.log
*.sqlite3
# VSCode
.vscode/
# PyCharm
.idea/
# Arquivos de testes ou builds
*.coverage
htmlcov/
coverage.xml
dist/
build/
.eggs/
*.egg-info/
# Cache do pip
pip-wheel-metadata/
*.egg
.cache/
.tox/
# Arquivo s de conexão
config/database/firebird.json
storage/temp
storage/temp.json
# Ignorar arquivos storage
storage/

View file

@ -0,0 +1,15 @@
;TipoBanco=Firebird ou Oracle
[Geral]
;Palmenlo
;BaseDados=202A294B585F62033E343434D1DE15140E1F362B406F
;São Miguel
;BaseDados=202A294B242E780832343734D2DE15140E1F362B406F
;Santa Rita
BaseDados=202A294B293F641D3A2D3634D2DE15140E1F362B406F64BABDB7B7BFA05BB0A2BDADB1BB7A
Usuario=232C2B363138
Senha=015D0D575858
Key=265C425D5F461C0E564F4E58B5BB4B41394C31413F0E18B2BB
Versao='C:\OriusDekstop\Sistemas'

View file

@ -0,0 +1,162 @@
{
"folders": [
{
"path": "S:/Web/RCCasamentoLvAntigo"
}
],
"settings": {
// ============================================================
// 🔧 GERAL
// ============================================================
"editor.minimap.enabled": false,
"editor.formatOnSave": true,
"editor.formatOnPaste": false,
"editor.formatOnType": false,
"files.trimTrailingWhitespace": true,
"files.autoSave": "onFocusChange",
"telemetry.telemetryLevel": "off",
"update.mode": "manual",
"workbench.startupEditor": "none",
"workbench.editor.enablePreview": false,
// ============================================================
// ⚡ PERFORMANCE — Ignorar lixo
// ============================================================
"files.watcherExclude": {
"**/__pycache__/**": true,
"**/.pytest_cache/**": true,
"**/.mypy_cache/**": true,
"**/.git/objects/**": true
},
"search.exclude": {
"**/__pycache__": true,
"**/.pytest_cache": true,
"**/.mypy_cache": true,
"**/.git": true
},
// ============================================================
// 🐍 PYTHON
// ============================================================
"python.defaultInterpreterPath": "S:/Web/RCCasamentoLvAntigo/venv/Scripts/python.exe",
"python.languageServer": "Pylance",
"python.analysis.autoImportCompletions": true,
"python.analysis.indexing": true,
"python.analysis.typeCheckingMode": "basic",
"python.analysis.useLibraryCodeForTypes": true,
// ============================================================
// 🧹 FORMATADOR (Black)
// ============================================================
"python.formatting.provider": "black",
"python.formatting.blackArgs": [
"--line-length",
"100"
],
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.formatOnSave": true
},
// ============================================================
// 🔍 LINTING (Flake8)
// ============================================================
"python.linting.enabled": true,
"python.linting.flake8Enabled": true,
"python.linting.flake8Args": [
"--max-line-length=100"
],
"python.linting.pylintEnabled": false,
// ============================================================
// 🧠 GIT
// ============================================================
"git.enabled": true,
"git.autorefresh": true,
"git.fetchOnPull": true,
"git.confirmSync": false,
"git.postCommitCommand": "sync",
"git.openDiffOnClick": true,
// ============================================================
// 🔍 GITLENS
// ============================================================
"gitlens.codeLens.enabled": false,
"gitlens.currentLine.enabled": false,
"gitlens.hovers.enabled": true,
"gitlens.defaultDateFormat": "DD/MM/YYYY HH:mm",
"gitlens.views.repositories.autoRefresh": true,
"gitlens.views.repositories.location": "scm",
// ============================================================
// 💻 TERMINAL — Perfis úteis
// ============================================================
"terminal.integrated.profiles.windows": {
"Python Shell": {
"path": "cmd.exe",
"args": [
"/k",
"cd S:\\Web\\RCCasamentoLvAntigo && venv\\Scripts\\activate"
]
},
"Run Script": {
"path": "cmd.exe",
"args": [
"/k",
"cd S:\\Web\\RCCasamentoLvAntigo && venv\\Scripts\\activate && python main.py"
]
},
"Git Bash": {
"path": "C:\\Program Files\\Git\\bin\\bash.exe"
}
},
"terminal.integrated.defaultProfile.windows": "Python Shell",
"terminal.integrated.scrollback": 10000,
"terminal.integrated.enablePersistentSessions": false,
// ============================================================
// 🗂️ FILTROS DE ARQUIVOS
// ============================================================
"files.exclude": {
"**/.DS_Store": true,
"**/*.log": true
},
// ============================================================
// 🚫 DESATIVAR COPILOT (opcional)
// ============================================================
"github.copilot.enable": {
"*": false
},
"github.copilot.inlineSuggest.enable": false,
"editor.inlineSuggest.enabled": false
},
// ==============================================================
// 🎯 DEBUG CONFIG — Python Standalone (não FastAPI)
// ==============================================================
"launch": {
"version": "0.2.0",
"configurations": [
{
"name": "Debug Python — main.py",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/main.py",
"console": "integratedTerminal",
"justMyCode": true,
"cwd": "${workspaceFolder}",
"env": {
"PYTHONPATH": "${workspaceFolder}"
}
}
]
},
// ==============================================================
// 📦 Extensões recomendadas
// ==============================================================
"extensions": {
"recommendations": [
"ms-python.python",
"ms-python.pylance",
"ms-python.black-formatter",
"ms-python.flake8",
"eamodio.gitlens",
"mhutchie.git-graph",
"donjayamanne.githistory",
"formulahendry.code-runner",
"streetsidesoftware.code-spell-checker",
"tamasfe.even-better-toml"
]
}
}

View file

@ -0,0 +1,124 @@
# VinculaPartes Ajuste de Vínculo de Noivos no SIRC
Aplicação em Python (empacotada em `.exe`) criada para **corrigir e vincular os dados dos noivos** entre as tabelas:
- `V_CASAMENTO`
- `V_PESSOA_VINCULO` (que referencia `V_PESSOA`)
Essa correção é necessária porque, **na tela do SIRC**, os dados são **validados com base nos registros de `V_PESSOA_VINCULO`**.
Sem esse vínculo correto, os dados dos noivos podem não aparecer ou não ser validados adequadamente.
---
## 1. Como a aplicação funciona
1. **Lê os dados dos noivos em `V_CASAMENTO`**
Para cada casamento, a aplicação busca as informações dos noivos já cadastradas nessa view/tabela.
2. **Verifica se a pessoa já existe em `V_PESSOA`**
- Se **já existir** cadastro da pessoa:
- A aplicação **apenas cria o vínculo** correto em `V_PESSOA_VINCULO`.
- Se **não existir**:
- A aplicação **cria um novo registro em `V_PESSOA`** com base nos dados de `V_CASAMENTO`;
- Em seguida, **cria o vínculo** desse registro recém-criado com o casamento em `V_PESSOA_VINCULO`.
3. **Resultado esperado**
- Todos os noivos presentes em `V_CASAMENTO` passam a ter:
- Um registro correspondente em `V_PESSOA` (quando necessário);
- Um vínculo correto em `V_PESSOA_VINCULO`, permitindo a **validação correta no SIRC**.
---
## 2. Dependência do `config.ini`
A aplicação **deve ser executada na mesma pasta em que está o arquivo `config.ini`**.
- O `config.ini` contém os **dados de acesso ao banco de dados**.
- A aplicação **reutiliza essas mesmas credenciais** para conectar no banco (host, porta, usuário, senha, caminho da base, etc.).
- Sem o `config.ini` correto na mesma pasta, a aplicação **não conseguirá conectar** ao banco.
> 🔹 Em resumo:
> **`VinculaPartes.exe` (ou `main.py`) e `config.ini` precisam estar lado a lado na mesma pasta.**
---
## 3. Execução
### 3.1. Pré-requisitos (modo desenvolvimento Python)
- Python instalado
- Biblioteca `fdb` instalada (`pip install fdb`)
- `fbclient.dll` acessível (na pasta do projeto ou em local conhecido)
- Arquivo `config.ini` configurado com os dados do banco
Para executar via Python:
```bash
python main.py
```
Certifique-se de que:
- `main.py`
- `config.ini`
- `fbclient.dll`
estão na mesma pasta (ou com caminhos corretamente configurados).
### 3.2. Execução do executável (`.exe`)
Após gerado o executável:
- Copie para a mesma pasta:
- `VinculaPartes.exe`
- `config.ini`
- `fbclient.dll` (se necessário)
- Dê duplo clique no `VinculaPartes.exe` **ou** execute pelo Prompt de Comando:
```bash
VinculaPartes.exe
```
---
## 4. Build do executável com PyInstaller
Para gerar o executável em modo one-file com ícone e dependências embutidas, utilize o comando:
```bash
pyinstaller --onefile --name "VinculaPartes" --icon="images/icon.ico" --hidden-import=fdb --add-binary "fbclient.dll;." main.py
```
**Descrição dos parâmetros principais:**
- `--onefile`
Gera um único arquivo executável.
- `--name "VinculaPartes"`
Define o nome do executável gerado.
- `--icon="images/icon.ico"`
Define o ícone do executável.
- `--hidden-import=fdb`
Garante que o PyInstaller inclua a biblioteca `fdb` no build.
- `--add-binary "fbclient.dll;."`
Inclui o `fbclient.dll` e o disponibiliza na mesma pasta do executável.
- `main.py`
Arquivo principal da aplicação.
Após a compilação, o executável será gerado na pasta `dist/`:
```text
dist/
└── VinculaPartes.exe
```
---
## 5. Resumo
- A aplicação **corrige o vínculo dos noivos** entre `V_CASAMENTO`, `V_PESSOA` e `V_PESSOA_VINCULO`.
- É **fundamental** para que os dados sejam **validados corretamente no SIRC**.
- **Depende do `config.ini`** para conectar ao banco de dados.
- Pode ser executada via Python (`main.py`) ou via executável (`VinculaPartes.exe`) gerado com PyInstaller.
> Qualquer alteração futura na estrutura das tabelas ou nas regras de negócio
> deve ser refletida na lógica da aplicação antes de nova execução em produção.

View file

@ -0,0 +1,39 @@
# -*- mode: python ; coding: utf-8 -*-
a = Analysis(
['main.py'],
pathex=[],
binaries=[('fbclient.dll', '.')],
datas=[],
hiddenimports=['fdb'],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
noarchive=False,
optimize=0,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.datas,
[],
name='VinculaPartes',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon=['images\\icon.ico'],
)

View file

@ -0,0 +1,177 @@
from typing import Any, List, Optional, Literal, Union, overload
from types import SimpleNamespace
from datetime import datetime, date
from sqlalchemy import text
from sqlalchemy.engine import CursorResult
from sqlalchemy.exc import SQLAlchemyError
from actions.ui.ui import warn
from database.firebird import Firebird
class BaseRepositoryFirebird:
# ------------------------------------------------------------------
# Sobrecargas
# ------------------------------------------------------------------
@overload
def _execute(
self, sql: str, params: Optional[dict[str, Any]], fetch: Literal["all"]
) -> List[Any]: ...
@overload
def _execute(
self, sql: str, params: Optional[dict[str, Any]], fetch: Literal["one"]
) -> Optional[Any]: ...
@overload
def _execute(
self, sql: str, params: Optional[dict[str, Any]], fetch: Literal["none"]
) -> None: ...
@overload
def _execute(
self, sql: str, params: Optional[dict[str, Any]], fetch: Literal["result"]
) -> CursorResult[Any]: ...
# ------------------------------------------------------------------
# Sanitizador seguro de parâmetros (CORREÇÃO CRÍTICA)
# ------------------------------------------------------------------
def _sanitize_params(self, params: Optional[dict[str, Any]]) -> dict[str, Any]:
"""
Sanitiza parâmetros antes de enviar ao Firebird.
Trava datas inválidas (< 1900) e remove timezone para evitar overflow.
"""
if params is None:
return {}
safe: dict[str, Any] = {}
for key, value in params.items():
# Permite None normalmente
if value is None:
safe[key] = None
continue
# ---------------------------
# Tratamento de datetime
# ---------------------------
if isinstance(value, datetime):
# Firebird explode com datas muito antigas
if value.year < 1900:
warn(
f"⚠️ Data inválida detectada em '{key}': {value} "
f"(ano < 1900). Definido como NULL para evitar overflow."
)
safe[key] = None
continue
# Remove timezone se existir (evita timestamp negativo!)
if value.tzinfo is not None:
safe[key] = value.replace(tzinfo=None)
else:
safe[key] = value
continue
# ---------------------------
# Tratamento de date
# ---------------------------
if isinstance(value, date):
if value.year < 1900:
warn(
f"⚠️ Data de calendário inválida em '{key}': {value}. "
f"Convertido para NULL."
)
safe[key] = None
else:
safe[key] = value
continue
# Outros valores seguem direto
safe[key] = value
return safe
# ------------------------------------------------------------------
# Execução de SQL
# ------------------------------------------------------------------
def _execute(
self,
sql: str,
params: Optional[dict[str, Any]] = None,
fetch: Literal["all", "one", "none", "result"] = "result",
) -> Union[List[Any], Optional[Any], None, CursorResult[Any]]:
engine = Firebird.get_engine()
# 🔥 Sanitiza todos os parâmetros antes de enviar ao driver
safe_params = self._sanitize_params(params)
try:
with engine.begin() as conn:
result = conn.execute(text(sql), safe_params)
# Lê BLOBs com segurança
def _read_blob(value):
if hasattr(value, "read"):
try:
return value.read()
except Exception:
return b""
return value
# all
if fetch == "all":
rows = []
for row in result.mappings().all():
row_dict = {k.lower(): _read_blob(v) for k, v in row.items()}
rows.append(SimpleNamespace(**row_dict))
return rows
# one
elif fetch == "one":
row = result.mappings().first()
if row:
row_dict = {k.lower(): _read_blob(v) for k, v in row.items()}
return SimpleNamespace(**row_dict)
return None
# none
elif fetch == "none":
return None
# result
return result
except SQLAlchemyError as e:
warn("⚠️ [ERRO SQL]: execução falhou")
warn(f"SQL:\n{sql}")
warn(f"Parâmetros SANITIZADOS enviados ao banco:\n{safe_params}")
raise
# ------------------------------------------------------------------
# Métodos utilitários públicos
# ------------------------------------------------------------------
def query(
self, sql: str, params: Optional[dict[str, Any]] = None
) -> CursorResult[Any]:
return self._execute(sql, params, fetch="result")
def fetch_all(self, sql: str, params: Optional[dict[str, Any]] = None) -> List[Any]:
return self._execute(sql, params, fetch="all")
def fetch_one(
self, sql: str, params: Optional[dict[str, Any]] = None
) -> Optional[Any]:
return self._execute(sql, params, fetch="one")
def run(self, sql: str, params: Optional[dict[str, Any]] = None) -> None:
self._execute(sql, params, fetch="none")
def run_and_return(
self, sql: str, params: Optional[dict[str, Any]] = None
) -> Optional[Any]:
return self._execute(sql, params, fetch="one")

View file

@ -0,0 +1,91 @@
# Importa o módulo padrão do Python para ler arquivos INI (.ini)
import configparser
# Importa Path, que facilita o trabalho com caminhos de arquivos (mais seguro que strings)
from pathlib import Path
# Importa tipos para anotações — Dict (dicionário) e Any (qualquer tipo)
from typing import Dict, Any
# Define uma classe chamada ConfigIni
class ConfigIni:
@staticmethod
def read(path: str) -> Dict[str, Any]:
"""
um arquivo INI (ignorando comentários iniciados por ';')
e retorna um dicionário com suas seções e pares chave=valor.
Tenta múltiplas codificações comuns no Windows:
- utf-8
- utf-8-sig
- latin-1
- cp1252
"""
# Converte o caminho recebido (string) em um objeto Path (mais seguro e portável)
config_path = Path(path)
# Verifica se o arquivo realmente existe no caminho informado
if not config_path.exists():
# Caso não exista, lança uma exceção com uma mensagem explicativa
raise FileNotFoundError(f"Arquivo não encontrado: {path}")
# Lista de codificações a tentar, em ordem de preferência
encodings_to_try = ["utf-8", "utf-8-sig", "latin-1", "cp1252"]
last_error: Exception | None = None
config: configparser.ConfigParser | None = None
# Tenta ler o arquivo usando diferentes encodings
for enc in encodings_to_try:
try:
tmp_config = configparser.ConfigParser()
# Garante que as chaves mantenham o mesmo formato de maiúsculas/minúsculas
# Por padrão, o configparser converte tudo para minúsculas.
tmp_config.optionxform = str
# Tenta ler o arquivo com a codificação atual
tmp_config.read(config_path, encoding=enc)
# Se chegou aqui sem UnicodeDecodeError, consideramos que deu certo
config = tmp_config
break
except UnicodeDecodeError as e:
# Guarda o último erro para caso nenhuma codificação funcione
last_error = e
continue
# Se nenhuma codificação funcionou, levanta um erro mais amigável
if config is None:
msg = (
f"Não foi possível decodificar o arquivo INI '{path}' "
f"usando as codificações: {', '.join(encodings_to_try)}"
)
if last_error:
raise UnicodeDecodeError(
last_error.encoding or "utf-8",
last_error.object,
last_error.start,
last_error.end,
msg,
)
raise RuntimeError(msg)
# Cria um dicionário vazio que armazenará os dados lidos do INI
data: Dict[str, Dict[str, Any]] = {}
# Percorre cada seção do arquivo INI (exemplo: [Geral], [Banco], etc.)
for section in config.sections():
# Cria um dicionário interno para armazenar as chaves e valores dessa seção
data[section] = {}
# Percorre todas as chaves e valores da seção atual
for key, value in config.items(section):
# .strip() remove espaços no início/fim
# .strip("'\"") remove aspas simples ou duplas em volta do valor (se existirem)
data[section][key] = value.strip().strip("'\"")
# Retorna o dicionário completo contendo todas as seções e seus pares chave=valor
return data

View file

@ -0,0 +1,19 @@
from types import SimpleNamespace
from collections.abc import Mapping
class DictToObj:
def __new__(cls, x):
# dict (ou Mapping): vira SimpleNamespace com conversão recursiva
if isinstance(x, Mapping):
return SimpleNamespace(**{k: cls(v) for k, v in x.items()})
# listas: converte cada item
if isinstance(x, list):
return [cls(i) for i in x]
# (opcional) outras coleções comuns
if isinstance(x, tuple):
return tuple(cls(i) for i in x)
if isinstance(x, set):
return {cls(i) for i in x}
# primitivos: retornam como estão
return x

View file

@ -0,0 +1,24 @@
from actions.data.json_size import JsonSize
class EnsureBatchLimit:
# ------------------------------------------------------------------
# Função reutilizável: recebe o lote (array) e o item, respeita o limite
# Retorna (to_send, new_batch):
# - se couber: (None, batch_com_item)
# - se não couber: (batch_atual_para_envio, novo_batch_com_item)
# ------------------------------------------------------------------
def execute(self, batch: list, item, max_bytes: int):
batch_size = JsonSize(batch)
item_size = JsonSize(item)
if batch_size + item_size <= max_bytes:
batch.append(item)
return None, batch # nada a enviar agora
else:
# não coube: envia o lote atual e começa um novo com o item
to_send = batch[:] # cópia para envio
new_batch = [item]
return to_send, new_batch

View file

@ -0,0 +1,52 @@
from pydantic import BaseModel
from decimal import Decimal
def generate_insert_sql(table_name: str, data: BaseModel | dict) -> str:
"""
Gera um comando SQL INSERT seguro para Firebird.
- Aceita BaseModel (Pydantic) ou dict.
- Ignora *_ID None.
- Mantém colunas em MAIÚSCULAS e parâmetros em minúsculas (para bind funcionar).
"""
# Converte o model em dict limpo
if isinstance(data, BaseModel):
data_dict = data.model_dump(exclude_unset=True)
elif isinstance(data, dict):
data_dict = data
else:
raise TypeError("O parâmetro 'data' deve ser um BaseModel ou dict.")
columns = []
params = []
returning_fields = []
for key, value in data_dict.items():
column_name = key.upper()
# Converte Decimal → float
if isinstance(value, Decimal):
data_dict[key] = float(value)
# Campos válidos para retorno
if value is not None:
returning_fields.append(column_name)
# Coluna em maiúsculo, parâmetro em minúsculo
columns.append(column_name)
params.append(f":{key}")
columns_str = ", ".join(columns)
params_str = ", ".join(params)
returning_str = ", ".join(returning_fields) if returning_fields else "*"
sql = (
f"INSERT INTO {table_name} (\n"
f" {columns_str}\n"
f") VALUES (\n"
f" {params_str}\n"
f") RETURNING {returning_str};"
)
return sql

View file

@ -0,0 +1,12 @@
class IsBlank:
@staticmethod
def __new__(self, val) -> bool:
if val is None:
return True
if isinstance(val, (str, bytes)):
return len(val.strip()) == 0
return False

View file

@ -0,0 +1,33 @@
import json
from types import SimpleNamespace
class JsonSize:
@staticmethod
def _safe_default(o):
from datetime import datetime, date
from decimal import Decimal
if isinstance(o, (datetime, date)):
return o.isoformat()
if isinstance(o, Decimal):
return float(o)
if isinstance(o, SimpleNamespace):
return vars(o)
if hasattr(o, "__dict__"):
return vars(o)
return str(o)
def __new__(cls, obj):
try:
# 💡 ensure_ascii=False mantém acentos; surrogatepass evita falha com caracteres inválidos
json_str = json.dumps(
obj,
ensure_ascii=False,
default=cls._safe_default,
)
# 💡 errors='ignore' ou 'replace' evita que .encode() quebre
return len(json_str.encode("utf-8", errors="ignore"))
except Exception:
# fallback de segurança (nunca deve quebrar)
return len(str(obj).encode("utf-8", errors="ignore"))

View file

@ -0,0 +1,34 @@
from types import SimpleNamespace
from collections.abc import Mapping
from decimal import Decimal
from datetime import datetime, date
class ObjToDict:
def __new__(cls, x):
# SimpleNamespace -> dict (recursivo)
if isinstance(x, SimpleNamespace):
return {k: cls(v) for k, v in vars(x).items()}
# dict / Mapping -> dict (recursivo)
if isinstance(x, Mapping):
return {k: cls(v) for k, v in x.items()}
# listas/tuplas/conjuntos -> preserva tipo
if isinstance(x, list):
return [cls(i) for i in x]
if isinstance(x, tuple):
return tuple(cls(i) for i in x)
if isinstance(x, set):
return {cls(i) for i in x}
# Decimal -> float
if isinstance(x, Decimal):
return float(x)
# datetime / date -> string ISO (compatível com JSON)
if isinstance(x, (datetime, date)):
return x.isoformat()
# Outros tipos -> mantém
return x

View file

@ -0,0 +1,20 @@
from datetime import datetime
class FileNameGenerator:
"""
Gera nomes de arquivos únicos com base na data e hora atuais.
Exemplo: relatorio_2025-11-11_17-43-22.txt
"""
def __init__(
self,
prefix: str = "arquivo",
):
self.prefix = prefix
def generate(self, prefix="arquivo") -> str:
"""Gera o nome completo do arquivo com base na data e hora."""
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
filename = f"{prefix}_{timestamp}"
return str(filename)

View file

@ -0,0 +1,88 @@
import platform
from typing import Dict, Tuple
from actions.env.mirror_sync_env import MirrorSyncEnv
from actions.ui.ui import warn, fail
from packages.v1.parametros.repositories.g_config.g_config_show_by_nome_repository import (
GConfigShowByNomeRepository,
)
from packages.v1.parametros.schemas.g_config_schema import GConfigNomeSchema
class InputBaseResolver:
"""
Resolve dinamicamente caminhos base do GED sem precisar passar nada externo.
Apenas:
get_input_base(nome_variavel, sistema_id)
Tudo o resto carregar MirrorSyncEnv, decrypt, buscar LOCAL_IMAGEM
é feito internamente.
"""
def __init__(self):
# Carrega ENV automaticamente
self.env = MirrorSyncEnv().as_object()
# GConfig carregado internamente
self.g_config_repo = GConfigShowByNomeRepository()
# Cache por chave (env_var_name, sistema_id)
self._cache: Dict[Tuple[str, int], str] = {}
# -------------------------------------------------------
def get_input_base(self, env_var_name: str, sistema_id: int) -> str:
"""
Obtém dinamicamente o caminho do GED:
- Se Windows + ged_local=True retorna variável do .env
- Se Windows + ged_local=False busca LOCAL_IMAGEM no GConfig
- Se Linux sempre retorna variável do .env
Parâmetros:
env_var_name: nome da variável de ambiente (ex.: "ged_tabelionato")
sistema_id: ID do sistema (ex.: 2)
"""
cache_key = (env_var_name, sistema_id)
if cache_key in self._cache:
return self._cache[cache_key]
resolved = self._resolve_path(env_var_name, sistema_id)
self._cache[cache_key] = resolved or ""
return resolved
# -------------------------------------------------------
def _resolve_path(self, env_var_name: str, sistema_id: int) -> str:
system = platform.system()
ged_local = str(getattr(self.env, "ged_local", "false")).lower() == "true"
dynamic_path = getattr(self.env, env_var_name, "")
# ---------------- WINDOWS ----------------
if system == "Windows":
if ged_local:
# Usa o caminho vindo do .env (dinâmico)
return dynamic_path
# Busca LOCAL_IMAGEM no GConfig
return self._resolve_from_gconfig(sistema_id)
# ---------------- LINUX ------------------
if system == "Linux":
return dynamic_path
# ---------------- OUTROS -----------------
warn(f"Sistema operacional não suportado: {system}")
return ""
# -------------------------------------------------------
def _resolve_from_gconfig(self, sistema_id: int) -> str:
"""Busca LOCAL_IMAGEM no GConfig automaticamente."""
try:
result = self.g_config_repo.execute(
GConfigNomeSchema(nome="LOCAL_IMAGEM", sistema_id=sistema_id)
)
return result.valor
except Exception as e:
fail(f"Erro no GConfig (sistema_id={sistema_id}): {e}")
return ""

View file

@ -0,0 +1,69 @@
import json
from json import JSONDecodeError
from types import SimpleNamespace
from pathlib import Path
from typing import Any, Optional, Union
class JsonFileLoader:
"""
Carrega arquivos JSON e retorna como objeto (SimpleNamespace) ou dict.
Retorna None quando não houver arquivo ou dados.
"""
def __init__(self, file_path: Union[str, Path], encoding: str = "utf-8"):
self.path = Path(file_path)
self.encoding = encoding
def load(
self, as_namespace: bool = True, empty_as_none: bool = True
) -> Optional[Any]:
"""
o JSON do disco.
- as_namespace=True: retorna SimpleNamespace (com conversão recursiva).
- as_namespace=False: retorna dict/list nativos.
- empty_as_none=True: {}, [] ou None são tratados como "sem dados" e retornam None.
"""
# 1) Arquivo inexistente ou não regular
if not self.path.exists() or not self.path.is_file():
return None
# 2) Arquivo vazio
if self.path.stat().st_size == 0:
return None
# 3) Leitura + limpeza
try:
with self.path.open("r", encoding=self.encoding) as f:
raw = f.read()
except OSError:
return None
raw = (raw or "").strip()
if not raw:
return None
# 4) Parse do JSON
try:
data = json.loads(raw)
except JSONDecodeError:
return None
# 5) Tratar dados vazios
if empty_as_none and (data is None or data == {} or data == []):
return None
# 6) Converter (opcional) para SimpleNamespace
return self._to_namespace(data) if as_namespace else data
# ---------- helpers ----------
@classmethod
def _to_namespace(cls, value: Any) -> Any:
"""Converte recursivamente dict/list em SimpleNamespace/list."""
if isinstance(value, dict):
return SimpleNamespace(
**{k: cls._to_namespace(v) for k, v in value.items()}
)
if isinstance(value, list):
return [cls._to_namespace(v) for v in value]
return value

View file

@ -0,0 +1,180 @@
import json
import traceback
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Optional, Literal, Union
from pydantic import BaseModel
from actions.file.json_file_saver import JsonFileSaver
class JsonFileMerger:
"""
Classe utilitária para unir um novo JSON com um existente em disco,
aplicando merge profundo e salvando o resultado final.
Mantém o mesmo padrão seguro de conversão da classe JsonFileSaver.
"""
def __init__(self, saver: Optional[JsonFileSaver] = None):
self.saver = saver or JsonFileSaver()
# ------------------------------------------------------------
# Carrega JSON existente (se houver)
# ------------------------------------------------------------
def _load_existing(self, file_path: Path) -> Any:
if not file_path.exists():
return {}
try:
with open(file_path, "r", encoding=self.saver.encoding) as f:
return json.load(f)
except Exception:
# Em caso de erro, considera como vazio para não quebrar o fluxo
return {}
# ------------------------------------------------------------
# Merge de listas (não duplica itens)
# ------------------------------------------------------------
def _merge_lists(self, base_list: list, new_list: list) -> list:
combined = base_list[:]
for item in new_list:
if item not in combined:
combined.append(item)
return combined
# ------------------------------------------------------------
# Merge profundo (deep merge) para dicionários
# ------------------------------------------------------------
def _deep_merge(self, base: dict, new: dict) -> dict:
for key, value in new.items():
if key in base:
# dict + dict → merge recursivo
if isinstance(base[key], dict) and isinstance(value, dict):
base[key] = self._deep_merge(base[key], value)
# list + list → merge de listas
elif isinstance(base[key], list) and isinstance(value, list):
base[key] = self._merge_lists(base[key], value)
# tipos diferentes ou simples → substitui
else:
base[key] = value
else:
base[key] = value
return base
# ------------------------------------------------------------
# Estratégias de merge
# ------------------------------------------------------------
def _apply_strategy(self, existing: Any, new: Any, strategy: str) -> Any:
# Se ambos forem listas, usa merge de listas
if isinstance(existing, list) and isinstance(new, list):
return self._merge_lists(existing, new)
# Se tipos forem diferentes, substitui completamente
if type(existing) is not type(new):
return new
# replace → ignora o que existia
if strategy == "replace":
return new
# update → comportamento similar a dict.update (apenas para dicts)
if strategy == "update":
if isinstance(existing, dict) and isinstance(new, dict):
existing.update(new)
return existing
# para outros tipos, apenas substitui
return new
# default / "deep" → deep merge para dicts
if strategy == "deep":
if isinstance(existing, dict) and isinstance(new, dict):
return self._deep_merge(existing, new)
# se não forem dicts, substitui
return new
# fallback: se for estratégia desconhecida, substitui
return new
# ------------------------------------------------------------
# NOVO MÉTODO — merge sem salvar (apenas processa)
# ------------------------------------------------------------
def merge_data(
self,
existing_json: Any,
new_json: Union[dict, BaseModel, SimpleNamespace],
strategy: Literal["replace", "update", "deep"] = "deep",
) -> Any:
new_clean = self.saver._convert(new_json)
return self._apply_strategy(existing_json, new_clean, strategy)
# ------------------------------------------------------------
# MÉTODO PRINCIPAL — AGORA USANDO merge_data()
# ------------------------------------------------------------
def merge_and_save(
self,
new_json: Union[dict, BaseModel, SimpleNamespace],
file_path: str,
strategy: Literal["replace", "update", "deep"] = "deep",
add_timestamp: bool = False,
) -> dict:
"""
Faz merge entre existing.json new_json e salva o resultado final.
Parâmetros:
new_json:
- Dado novo que será mesclado ao JSON existente.
Pode ser dict, BaseModel ou SimpleNamespace.
file_path:
- Caminho do arquivo JSON em disco.
strategy:
- "replace" sobrescreve tudo
- "update" comportamento semelhante a dict.update
- "deep" merge recursivo (deep merge) para dicts
e merge de listas sem duplicação
add_timestamp:
- Se True, adiciona timestamp no nome do arquivo ao salvar.
Retorno:
dict com:
{
"success": bool,
"path": str,
"size": int,
"error": Optional[str],
}
"""
try:
path = Path(file_path)
path.parent.mkdir(parents=True, exist_ok=True)
# Carrega JSON existente
existing = self._load_existing(path)
# Novo merge utilizando merge_data()
merged = self.merge_data(existing, new_json, strategy)
# Salva em disco
return self.saver.save(
data=merged,
file_path=str(path),
as_json=True,
add_timestamp=add_timestamp,
mode="overwrite",
)
except Exception as e:
traceback.print_exc()
return {
"success": False,
"path": file_path,
"size": 0,
"error": str(e),
}

View file

@ -0,0 +1,133 @@
import json
import traceback
from datetime import datetime, date
from decimal import Decimal
from types import SimpleNamespace
from pathlib import Path
from typing import Any, Optional, Literal, Union
from pydantic import BaseModel
# ✔️ agora usando sua UI
from actions.ui.ui import ok, fail
class JsonFileSaver:
"""
Classe utilitária para salvar qualquer tipo de dado em disco,
automaticamente convertido para JSON se aplicável.
Suporta: dict, list, str, bytes, BaseModel, SimpleNamespace, Decimal, datetime, etc.
"""
def __init__(
self,
encoding: str = "utf-8",
indent: int = 4,
default_suffix: str = ".json",
):
self.encoding = encoding
self.indent = indent
self.default_suffix = default_suffix
# ------------------------------------------------------------
# Método público principal
# ------------------------------------------------------------
def save(
self,
data: Union[str, bytes, dict, list, Any],
file_path: str,
as_json: Optional[bool] = None,
add_timestamp: bool = False,
mode: Literal["overwrite", "append"] = "overwrite",
) -> dict:
"""
Salva o conteúdo em disco com serialização segura.
"""
try:
path = Path(file_path)
path.parent.mkdir(parents=True, exist_ok=True)
# Adiciona timestamp no nome, se solicitado
if add_timestamp:
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
stem = path.stem
suffix = path.suffix or self.default_suffix
path = path.with_name(f"{stem}_{timestamp}{suffix}")
# Detecta automaticamente se é JSON
if as_json is None:
as_json = isinstance(data, (dict, list, SimpleNamespace, BaseModel))
# Define o modo de escrita
write_mode = (
"wb" if isinstance(data, bytes) else "a" if mode == "append" else "w"
)
# Salva arquivo
if isinstance(data, bytes):
with open(path, write_mode) as f:
f.write(data)
elif as_json:
safe_data = self._convert(data)
with open(path, write_mode, encoding=self.encoding) as f:
json.dump(safe_data, f, ensure_ascii=False, indent=self.indent)
else:
if not isinstance(data, str):
data = str(data)
with open(path, write_mode, encoding=self.encoding) as f:
f.write(data)
file_size = path.stat().st_size if path.exists() else 0
# ✔️ substitui print() por UI
ok(f"Arquivo salvo: {path} ({file_size} bytes)")
return {
"success": True,
"path": str(path.resolve()),
"size": file_size,
"error": None,
}
except Exception as e:
error_message = (
f"Falha ao salvar arquivo '{file_path}': {e}\n"
f"{traceback.format_exc()}"
)
# ✔️ substitui print de erro por UI
fail(error_message)
return {
"success": False,
"path": str(file_path),
"size": 0,
"error": str(e),
}
# ------------------------------------------------------------
# Conversor recursivo interno
# ------------------------------------------------------------
def _convert(self, obj: Any) -> Any:
"""Converte qualquer tipo para formato serializável JSON."""
if obj is None:
return None
if isinstance(obj, (str, int, float, bool)):
return obj
if isinstance(obj, bytes):
return obj.decode("utf-8", errors="ignore")
if isinstance(obj, Decimal):
return float(obj)
if isinstance(obj, (datetime, date)):
return obj.isoformat()
if isinstance(obj, SimpleNamespace):
return {k: self._convert(v) for k, v in vars(obj).items()}
if isinstance(obj, BaseModel):
return obj.model_dump()
if isinstance(obj, list):
return [self._convert(i) for i in obj]
if isinstance(obj, dict):
return {k: self._convert(v) for k, v in obj.items()}
return str(obj)

View file

@ -0,0 +1,67 @@
import json
import threading
from pathlib import Path
from typing import Dict, List, Any
class JsonArrayPersistentStorage:
"""
Classe para gravar e atualizar um JSON de forma persistente,
com MERGE profundo e proteção contra escrita paralela.
"""
def __init__(self, caminho_arquivo: Path):
self.caminho = caminho_arquivo
self.lock = threading.Lock() # 🔒 proteção de escrita
self.caminho.parent.mkdir(parents=True, exist_ok=True)
self._cache = self._load_existing()
# ------------------------------------------------------------------
def _load_existing(self) -> Dict[str, Any]:
if not self.caminho.exists():
return {}
try:
with open(self.caminho, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
# ------------------------------------------------------------------
def _deep_merge(self, original: Any, novo: Any) -> Any:
if isinstance(original, dict) and isinstance(novo, dict):
for k, v in novo.items():
if k not in original:
original[k] = v
else:
original[k] = self._deep_merge(original[k], v)
return original
if isinstance(original, list) and isinstance(novo, list):
for item in novo:
if item not in original:
original.append(item)
return original
return novo
# ------------------------------------------------------------------
def _save(self):
with open(self.caminho, "w", encoding="utf-8") as f:
json.dump(self._cache, f, indent=4, ensure_ascii=False)
# ------------------------------------------------------------------
# MÉTODO QUE RESOLVE O ERRO - AGORA COM LOCK
# ------------------------------------------------------------------
def add(self, novos_dados: Dict[str, List[dict]]):
"""
Adiciona novos dados com merge profundo.
Protegido por lock para evitar race condition.
"""
with self.lock: # 🔒 trava escrita e merge até finalizar
self._cache = self._deep_merge(self._cache, novos_dados)
self._save()
# ------------------------------------------------------------------
def get_all(self) -> Dict[str, Any]:
return self._cache

View file

@ -0,0 +1,300 @@
from pathlib import Path
from typing import Optional, Iterable, Dict, Any, List, TypedDict
from actions.string.file_name_generator import FileNameGenerator
from actions.ui.ui import ok, warn, fail
# -------------------------------------------------------------------
# TRATATIVA DE ERRO SMB — WinError 64 (Rede indisponível)
# -------------------------------------------------------------------
def _exists_safe(path: Path, verbose: bool = True) -> bool:
"""
Verifica se o path existe sem explodir WinError 64 (rede indisponível).
"""
try:
return path.exists()
except OSError as e:
if getattr(e, "winerror", None) == 64:
if verbose:
fail(f"⚠️ Rede indisponível ao acessar: {path}")
return False
raise
# -------------------------------------------------------------------
# VERIFICAÇÃO REAL DE ACESSO AO ARQUIVO
# -------------------------------------------------------------------
def can_open_file(path: Path, verbose: bool = True) -> bool:
"""
Garante que o arquivo pode ser aberto diferentemente de .exists().
Captura erros reais de SMB/Permissão.
"""
try:
with open(path, "rb"):
return True
except PermissionError:
if verbose:
fail(f"❌ Permissão negada ao tentar abrir: {path}")
return False
except FileNotFoundError:
if verbose:
fail(f"❌ Arquivo não encontrado no momento da abertura: {path}")
return False
except OSError as e:
if verbose:
fail(f"⚠️ Erro ao abrir arquivo ({e}) → {path}")
return False
# ---------------------------------------------------------------
# TABELAS DE TIPOS
# ---------------------------------------------------------------
class ArquivoLocalizado(TypedDict):
caminho_arquivo: str
tipo_documento: str
nome_documento: str
TIPOS_PADRAO: Dict[str, Dict[str, str]] = {
"Ato": {"base": "Ato", "prefixo": "A_", "tipo_documento": "ATO_LAVRADO"},
"Cartao": {
"base": "Cartao",
"prefixo": "C_",
"tipo_documento": "DOCUMENTO_PESSOAL",
},
"Biometria": {
"base": "Biometria",
"prefixo": "Biometria_",
"tipo_documento": "DOCUMENTO_PESSOAL",
},
"Procuracao": {
"base": "Procuracao",
"prefixo": "P_",
"tipo_documento": "PROCURACAO",
},
"Documento": {
"base": "Documento",
"prefixo": "D_",
"tipo_documento": "DOCUMENTO_PESSOAL",
},
"Registro": {"base": "Registro", "prefixo": "R_", "tipo_documento": "MATRICULA"},
"Contrato": {"base": "Contrato", "prefixo": "T_", "tipo_documento": "CONTRATO"},
"Dut": {"base": "Dut", "prefixo": "U_", "tipo_documento": "OUTROS"},
"RegistroImoveis": {"base": "", "prefixo": "R_", "tipo_documento": "MATRICULA"},
}
PREFIXOS_RC: Dict[str, List[str]] = {
"nascimento": ["L_A_", "C_A_"],
"casamento": ["L_B_", "C_B_"],
"obito": ["L_C_", "C_C_"],
"estrangeiro": ["L_E_", "C_E_"],
"averbacao": ["R_A_"],
}
EXTS: tuple[str, ...] = (".spd", ".tif", ".tiff")
# ---------------------------------------------------------------
# FUNÇÕES AUXILIARES
# ---------------------------------------------------------------
def _nomes_possiveis(
prefixos: Iterable[str],
id6: str,
sufixos_extras: Optional[Iterable[str]] = None,
) -> List[str]:
simbolos = ("#", "_", "")
sufixos = list(sufixos_extras) if sufixos_extras is not None else [""]
nomes: List[str] = []
for prefixo in prefixos:
for extra in sufixos:
for simbolo in simbolos:
for ext in EXTS:
nomes.append(f"{prefixo}{id6}{extra}{simbolo}{ext}")
return nomes
def _calcular_pasta(arquivo_id: int) -> str:
if arquivo_id < 1000:
return f"{arquivo_id:03d}"
return str(arquivo_id)[:3]
def _gerar_nome_documento(caminho: Path, tipo_documento: str) -> str:
base_sem_ext = caminho.stem
return FileNameGenerator.generate(base_sem_ext)
def _montar_retorno(caminho: Path, tipo_documento: str) -> ArquivoLocalizado:
return ArquivoLocalizado(
caminho_arquivo=str(caminho),
tipo_documento=tipo_documento,
nome_documento=_gerar_nome_documento(caminho, tipo_documento),
)
# ---------------------------------------------------------------
# FUNÇÃO PRINCIPAL — agora com verificação REAL de acesso SMB
# ---------------------------------------------------------------
def localizar_arquivo_por_id(
arquivo_id: Optional[int | float],
pasta_origem: str,
natureza: Optional[str] = None,
tipos_permitidos: Optional[Iterable[str]] = None,
registro: Optional[str] = None,
livro: Optional[str] = None,
registro_anterior: Optional[str] = None,
verbose: bool = True,
) -> Optional[Dict[str, Any]]:
if arquivo_id is None:
if verbose:
warn("ID inválido (None) — Ignorando busca de arquivo.")
return None
try:
arquivo_id_int = int(arquivo_id)
except (TypeError, ValueError):
if verbose:
warn(f"ID inválido ({arquivo_id}) — Ignorando busca de arquivo.")
return None
id6 = f"{arquivo_id_int:06d}"
ddd = _calcular_pasta(arquivo_id_int)
base_path = Path(pasta_origem)
# -----------------------------------------------------------
# REGISTRO CIVIL
# -----------------------------------------------------------
if natureza in PREFIXOS_RC:
prefixos = PREFIXOS_RC[natureza]
pasta_ddd = base_path / ddd
if not _exists_safe(pasta_ddd, verbose):
return None
for nome in _nomes_possiveis(prefixos, id6):
caminho_path = pasta_ddd / nome
if _exists_safe(caminho_path, verbose) and can_open_file(
caminho_path, verbose
):
if verbose:
ok(
f"[RC/{natureza.upper()}] Arquivo encontrado e acessível: {caminho_path}"
)
return _montar_retorno(caminho_path, "CERTIDAO")
if verbose:
fail(
f"[RC/{natureza.upper()}] Nenhum arquivo acessível para ID {arquivo_id_int}"
)
return None
# -----------------------------------------------------------
# PROTESTO
# -----------------------------------------------------------
if natureza == "protesto":
sufixos_protesto = ("", "I")
prefixos = ("PRO_",)
for subpasta in ("", "Retirados"):
pasta = base_path / ddd if not subpasta else base_path / subpasta / ddd
if not _exists_safe(pasta, verbose):
continue
for nome in _nomes_possiveis(prefixos, id6, sufixos_protesto):
caminho_path = pasta / nome
if _exists_safe(caminho_path, verbose) and can_open_file(
caminho_path, verbose
):
if verbose:
ok(f"[PROTESTO] Arquivo encontrado e acessível: {caminho_path}")
return _montar_retorno(caminho_path, "OUTROS")
if verbose:
fail(f"[PROTESTO] Nenhum arquivo acessível para ID {arquivo_id_int}")
return None
# -----------------------------------------------------------
# RCPJ / RTD
# -----------------------------------------------------------
if registro in ("RCPJ", "RTD"):
pasta_ddd = base_path / ddd
if not _exists_safe(pasta_ddd, verbose):
return None
prefixo_base = f"{registro}_{(livro or 'A')}_"
sufixo = (
registro_anterior
if (registro == "RCPJ" and registro_anterior in ("N", "S", "U"))
else ""
)
for nome in _nomes_possiveis((prefixo_base,), id6, (sufixo,)):
caminho_path = pasta_ddd / nome
if _exists_safe(caminho_path, verbose) and can_open_file(
caminho_path, verbose
):
tipo_doc = "ESCRITURA" if registro == "RCPJ" else "OUTROS"
if verbose:
ok(f"[{registro}] Arquivo encontrado e acessível: {caminho_path}")
return _montar_retorno(caminho_path, tipo_doc)
if verbose:
fail(f"[{registro}] Nenhum arquivo acessível para ID {arquivo_id_int}")
return None
# -----------------------------------------------------------
# PADRÕES / REGISTRO IMÓVEIS
# -----------------------------------------------------------
tipos = list(tipos_permitidos) if tipos_permitidos else list(TIPOS_PADRAO.keys())
for tipo in tipos:
info_tipo = TIPOS_PADRAO.get(tipo)
if not info_tipo:
continue
tipo_doc = info_tipo["tipo_documento"]
# REGISTRO DE IMÓVEIS
if tipo == "RegistroImoveis":
for ri_dir in (p for p in base_path.glob("R_*") if p.is_dir()):
for nome in _nomes_possiveis((info_tipo["prefixo"],), id6):
for achado in ri_dir.rglob(nome):
if _exists_safe(achado, verbose) and can_open_file(
achado, verbose
):
if verbose:
ok(f"[RI] Arquivo encontrado e acessível: {achado}")
return _montar_retorno(achado, tipo_doc)
# TIPOS NORMAIS
base_tipo = info_tipo["base"]
pasta_tipo = base_path / base_tipo if base_tipo else base_path
pasta_ddd = pasta_tipo / ddd
if not _exists_safe(pasta_ddd, verbose):
continue
for nome in _nomes_possiveis((info_tipo["prefixo"],), id6):
caminho_path = pasta_ddd / nome
if _exists_safe(caminho_path, verbose) and can_open_file(
caminho_path, verbose
):
if verbose:
ok(f"Arquivo encontrado e acessível ({tipo}): {caminho_path}")
return _montar_retorno(caminho_path, tipo_doc)
if verbose:
fail(f"Nenhum arquivo acessível para ID {arquivo_id_int} em {pasta_origem}")
return None

View file

@ -0,0 +1,101 @@
import json
import traceback
from datetime import datetime, date
from decimal import Decimal
from types import SimpleNamespace
from pathlib import Path
from typing import Any, Optional, Literal, Union
from pydantic import BaseModel
def save_to_disk(
data: Union[str, bytes, dict, list, Any],
file_path: str,
as_json: Optional[bool] = None,
encoding: str = "utf-8",
indent: int = 4,
add_timestamp: bool = False,
mode: Literal["overwrite", "append"] = "overwrite",
) -> dict:
"""
Salva dados em disco local de forma segura, com tratamento automático
para objetos não serializáveis (ex.: SimpleNamespace, Decimal, datetime, BaseModel).
"""
def convert(obj: Any):
"""Converte qualquer objeto para formato serializável em JSON."""
if obj is None:
return None
if isinstance(obj, (str, int, float, bool)):
return obj
if isinstance(obj, bytes):
return obj.decode("utf-8", errors="ignore")
if isinstance(obj, Decimal):
return float(obj)
if isinstance(obj, (datetime, date)):
return obj.isoformat()
if isinstance(obj, SimpleNamespace):
return {k: convert(v) for k, v in vars(obj).items()}
if isinstance(obj, BaseModel):
return obj.model_dump()
if isinstance(obj, list):
return [convert(i) for i in obj]
if isinstance(obj, dict):
return {k: convert(v) for k, v in obj.items()}
# fallback final — stringifica o objeto
return str(obj)
try:
path = Path(file_path)
# Garante que o diretório existe
path.parent.mkdir(parents=True, exist_ok=True)
# Adiciona timestamp ao nome se necessário
if add_timestamp:
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
stem = path.stem
suffix = path.suffix or ".json"
path = path.with_name(f"{stem}_{timestamp}{suffix}")
# Detecta automaticamente se deve salvar como JSON
if as_json is None:
as_json = isinstance(data, (dict, list, SimpleNamespace, BaseModel))
write_mode = (
"wb" if isinstance(data, bytes) else "a" if mode == "append" else "w"
)
if isinstance(data, bytes):
with open(path, write_mode) as f:
f.write(data)
elif as_json:
# Converte antes de salvar
safe_data = convert(data)
with open(path, write_mode, encoding=encoding) as f:
json.dump(safe_data, f, ensure_ascii=False, indent=indent)
else:
if not isinstance(data, str):
data = str(data)
with open(path, write_mode, encoding=encoding) as f:
f.write(data)
file_size = path.stat().st_size if path.exists() else 0
print(f"[OK] Arquivo salvo: {path} ({file_size} bytes)")
return {
"success": True,
"path": str(path.resolve()),
"size": file_size,
"error": None,
}
except Exception as e:
error_message = f"[ERRO] Falha ao salvar arquivo '{file_path}': {e}\n{traceback.format_exc()}"
print(error_message)
return {
"success": False,
"path": str(file_path),
"size": 0,
"error": str(e),
}

View file

@ -0,0 +1,218 @@
from PIL import Image, UnidentifiedImageError
from pathlib import Path
from pypdf import PdfReader, PdfWriter
from reportlab.pdfgen import canvas
from io import BytesIO
from pypdf.constants import UserAccessPermissions
import base64
from typing import Optional
from actions.ui.ui import info, ok, fail, status, progress
# Alias para manter compatibilidade e usar "error" na interface
def error(message: str):
fail(message)
class TifToPdfConverter:
"""
Converte um arquivo TIFF/SPD específico em PDF protegido e confidencial.
Totalmente em memória, com marca d'água e feedback visual via UI.
"""
def __init__(
self, input_file: str, output_folder: Optional[str] = "storage/output"
):
self.input_file = Path(input_file)
if not self.input_file.exists():
raise FileNotFoundError(f"Arquivo não encontrado: {self.input_file}")
# Pasta de saída padrão
self.output_folder = Path(output_folder)
self.output_folder.mkdir(parents=True, exist_ok=True)
self.watermark_text = "DISPONÍVEL APENAS PARA ATIVIDADE CORRECIONAL ONLINE"
self.confidential_metadata = {
"/Title": "Documento Confidencial",
"/Author": "Sistema MirrorSYNC",
"/Subject": "Documento protegido",
"/Keywords": "Confidencial, Corregedoria, MirrorSYNC, Orius",
"/Producer": "Orius Tecnologia",
"/Creator": "MirrorSYNC Automação",
}
# =========================================================
# Helper 1 — Converte imagem em PDF (BytesIO)
# =========================================================
def _convert_image_to_pdf(self) -> Optional[BytesIO]:
try:
with Image.open(self.input_file) as img:
frames = []
# Converte todas as páginas do TIFF para RGB
for frame in range(getattr(img, "n_frames", 1)):
img.seek(frame)
frame_rgb = img.convert("RGB")
frames.append(frame_rgb.copy())
if not frames:
info(f"Nenhuma página encontrada em {self.input_file}")
return None
# Buffer em memória
buffer = BytesIO()
# Salva todas as páginas como um único PDF
frames[0].save(
buffer,
format="PDF",
resolution=150,
save_all=True,
append_images=frames[1:],
quality=80,
optimize=True,
)
buffer.seek(0)
ok(
f"Imagem convertida para PDF em memória "
f"({len(frames)} página(s))."
)
return buffer
except UnidentifiedImageError:
error(f"Arquivo não reconhecido como imagem: {self.input_file}")
except Exception as e:
error(f"Erro ao converter {self.input_file}: {e}")
return None
# =========================================================
# Helper 2 — Gera PDF de marca dágua
# =========================================================
def _create_watermark_pdf(self, page_width: float, page_height: float) -> BytesIO:
buffer = BytesIO()
c = canvas.Canvas(buffer, pagesize=(page_width, page_height))
c.saveState()
# Fonte adaptativa (limites 24..72)
base_font = max(24, min(72, max(page_width, page_height) * 0.04))
c.setFont("Helvetica-Bold", base_font)
# Cor vermelha com transparência (~30%)
try:
c.setFillColorRGB(1, 0, 0) # vermelho
c.setFillAlpha(0.3)
except Exception:
# Fallback sem alpha (ainda vermelho)
c.setFillColorRGB(1, 0, 0)
# Centro e rotação para deixar na diagonal
x_center = page_width / 2.0
y_center = page_height / 2.0
c.translate(x_center, y_center)
c.rotate(45) # ângulo da marca d'água (diagonal)
# Linhas igualmente espaçadas ao longo do eixo Y rotacionado
span = max(page_width, page_height) * 0.4
y_positions = [-span, -span * 0.5, 0, span * 0.5, span]
for y in y_positions:
c.drawCentredString(0, y, self.watermark_text)
c.restoreState()
c.save()
buffer.seek(0)
return buffer
# =========================================================
# Helper 3 — Aplica marca d'água e confidencialidade
# =========================================================
def _apply_watermark_and_security(self, pdf_buffer: BytesIO) -> BytesIO:
reader = PdfReader(pdf_buffer)
writer = PdfWriter()
for page in reader.pages:
width = float(page.mediabox.width)
height = float(page.mediabox.height)
watermark_stream = self._create_watermark_pdf(width, height)
watermark_pdf = PdfReader(watermark_stream)
watermark_page = watermark_pdf.pages[0]
page.merge_page(watermark_page)
writer.add_page(page)
# Aplica metadados confidenciais
writer.add_metadata(self.confidential_metadata)
# Bloqueia ações do usuário
block_permissions = (
UserAccessPermissions.PRINT
| UserAccessPermissions.MODIFY
| UserAccessPermissions.EXTRACT
| UserAccessPermissions.ASSEMBLE_DOC
| UserAccessPermissions.PRINT_TO_REPRESENTATION
)
writer.encrypt(
user_password="",
owner_password="correcional",
permissions_flag=int(block_permissions),
)
output_buffer = BytesIO()
writer.write(output_buffer)
output_buffer.seek(0)
ok("Marca dágua e proteção aplicadas em memória.")
return output_buffer
# =========================================================
# Helper 4 — Converte PDF final para Base64
# =========================================================
def _to_base64(self, pdf_buffer: BytesIO) -> str:
b64 = base64.b64encode(pdf_buffer.getvalue()).decode("ascii")
ok("Arquivo convertido para Base64.")
return b64
# =========================================================
# Método principal — Processamento completo
# =========================================================
def convert(self, return_base64: bool = False) -> Optional[str]:
"""
Executa a conversão e retorna:
- Base64 (string), se return_base64=True
- Caminho do arquivo PDF salvo em disco, se return_base64=False
"""
info(f"📄 Iniciando conversão de: {self.input_file.name}")
# Barra de progresso única para todo o fluxo
with progress(f"Processando {self.input_file.name}...", total=3) as step:
# 1) Converter imagem em PDF
with status("Convertendo imagem para PDF..."):
pdf_buffer = self._convert_image_to_pdf()
step()
if not pdf_buffer:
error("Falha na conversão da imagem. Processo interrompido.")
return None
# 2) Aplicar marca dágua e segurança
with status("Aplicando marca dágua e proteção..."):
protected_buffer = self._apply_watermark_and_security(pdf_buffer)
step()
# 3) Gerar Base64 ou salvar em disco
if return_base64:
with status("Gerando conteúdo Base64..."):
result = self._to_base64(protected_buffer)
ok("Conversão concluída com sucesso (Base64 gerado).")
step()
return result
with status("Salvando arquivo PDF em disco..."):
output_path = self.output_folder / f"{self.input_file.stem}.pdf"
with open(output_path, "wb") as f:
f.write(protected_buffer.getvalue())
ok(f"Arquivo PDF salvo em: {output_path}")
step()
return str(output_path)

View file

@ -0,0 +1,80 @@
import re
from typing import Any, Dict, Union
class ParseFirebirdPath:
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 3050
@classmethod
def execute(cls, value: Union[str, Dict[str, Any]]) -> Dict[str, Any]:
# -------------------------------------------
# Se for dicionário, já padroniza
# -------------------------------------------
if isinstance(value, dict):
return {
"host": value.get("host", cls.DEFAULT_HOST),
"port": int(value.get("port", cls.DEFAULT_PORT)),
"path": value.get("path"),
}
if not isinstance(value, str):
raise TypeError("BaseDados deve ser string ou dict.")
raw = value.strip().strip('"').strip("'")
if not raw:
raise ValueError("BaseDados está vazio.")
# -------------------------------------------
# 1) host/port:path
# -------------------------------------------
m = re.match(r"^(?P<host>[^:/]+)\/(?P<port>\d+)\:(?P<path>.+)$", raw)
if m:
return {
"host": m.group("host"),
"port": int(m.group("port")),
"path": m.group("path").strip(),
}
# -------------------------------------------
# 2) host:path (ex: 127.0.0.1:D:\Banco.fdb)
# -------------------------------------------
m = re.match(r"^(?P<host>[^:\/]+)\:(?P<path>[A-Za-z]:\\.+)$", raw)
if m:
return {
"host": m.group("host"),
"port": cls.DEFAULT_PORT,
"path": m.group("path"),
}
# -------------------------------------------
# 3) Caminho local absoluto (D:\, E:\)
# -------------------------------------------
if re.match(r"^[A-Za-z]:\\", raw):
return {
"host": cls.DEFAULT_HOST,
"port": cls.DEFAULT_PORT,
"path": raw,
}
# -------------------------------------------
# 4) UNC path (\\servidor\pasta\arquivo)
# -------------------------------------------
if raw.startswith("\\\\"):
return {
"host": cls.DEFAULT_HOST,
"port": cls.DEFAULT_PORT,
"path": raw,
}
# -------------------------------------------
# 5) Alias puro — retorna padrão
# -------------------------------------------
return {
"host": cls.DEFAULT_HOST,
"port": cls.DEFAULT_PORT,
"path": raw,
}

View file

@ -0,0 +1,124 @@
class HexCipher:
"""
Um cifrador simples e reversível baseado em:
- XOR com chave
- Inversão de texto
- Representação em hexadecimal
Inspirado no componente original Delphi TEvCriptografa.
"""
# -------------------------------------------------------------------------
# Construtor
# -------------------------------------------------------------------------
def __init__(self, key: str = "123", ignore_fields: list[str] | None = None):
"""Inicializa a classe com a chave fornecida."""
self._key = key
# nomes de campos que NÃO devem ser descriptografados (sempre minúsculos)
self._ignore_fields = (
{f.lower() for f in ignore_fields} if ignore_fields else set()
)
# -------------------------------------------------------------------------
# Métodos públicos principais
# -------------------------------------------------------------------------
def encrypt(self, text: str) -> str:
"""
Recebe um texto puro e retorna sua versão criptografada em hexadecimal.
"""
binary_cipher = self._text_to_binary_cipher(text)
hex_cipher = "".join(self._decimal_to_hex(ord(c)) for c in binary_cipher)
return hex_cipher
def decrypt(self, hex_string: str) -> str:
"""
Recebe um texto criptografado em hexadecimal e retorna o texto puro.
"""
chars = []
for i in range(0, len(hex_string), 2):
hex_pair = hex_string[i : i + 2]
chars.append(chr(self._hex_to_decimal(hex_pair)))
binary_str = "".join(chars)
return self._binary_to_text(binary_str)
def decrypt_safe(self, value: str, field: str | None = None) -> str:
"""
Descriptografa com segurança:
- Se o campo estiver em _ignore_fields, retorna o valor original.
- Se o valor não parecer HEX válido, retorna o valor original.
- Em qualquer erro na descriptografia, retorna o valor original.
"""
# 1) Campo explicitamente ignorado
if field and field.lower() in self._ignore_fields:
return value
# 2) Valor vazio / None
if not value:
return value
# 3) Verifica se é HEX puro (somente 0-9 A-F) e tamanho par
import re
if re.fullmatch(r"[0-9A-Fa-f]+", value) is None:
return value
if len(value) % 2 != 0:
return value
# 4) Tenta descriptografar
try:
return self.decrypt(value)
except Exception:
return value
# -------------------------------------------------------------------------
# Funções internas de criptografia/descriptografia
# -------------------------------------------------------------------------
def _text_to_binary_cipher(self, text: str) -> str:
"""
Criptografa um texto aplicando:
1. Inversão de caracteres.
2. Operação XOR entre cada caractere e um valor derivado da chave.
"""
text = self._reverse(text)
result = []
for position, char in enumerate(text, start=1):
key_char = self._key[position % len(self._key)]
key_value = ord(key_char) + position
result.append(chr(ord(char) ^ key_value))
return "".join(result)
def _binary_to_text(self, cipher_text: str) -> str:
"""
Descriptografa um texto binário cifrado por XOR.
O processo é simétrico: aplica o mesmo XOR e inverte novamente.
"""
result = []
for position, char in enumerate(cipher_text, start=1):
key_char = self._key[position % len(self._key)]
key_value = ord(key_char) + position
result.append(chr(ord(char) ^ key_value))
return self._reverse("".join(result))
# -------------------------------------------------------------------------
# Funções auxiliares
# -------------------------------------------------------------------------
def _reverse(self, text: str) -> str:
"""Inverte a ordem dos caracteres de uma string."""
return text[::-1]
def _decimal_to_hex(self, number: int) -> str:
"""Converte um número decimal (byte) em uma string hexadecimal de 2 dígitos."""
return f"{number:02X}"
def _hex_to_decimal(self, hex_str: str) -> int:
"""Converte uma string hexadecimal de 2 dígitos em seu valor decimal (byte)."""
return int(hex_str, 16)

View file

@ -0,0 +1,32 @@
from datetime import datetime, timedelta
from jose import jwt
from pytz import timezone
from actions.config.config_json import ConfigJson
class CreateToken:
def __init__(self):
# Busca as configurações da aplicação
self.config = ConfigJson.get("app.json")
# Cria o timedelta com base na config
self.access_token_expire = timedelta(
minutes=self.config.jwt.expire.minute,
hours=self.config.jwt.expire.hours,
days=self.config.jwt.expire.days,
)
def execute(self, tipo_token: str, data: str) -> str:
sp = timezone("America/Sao_Paulo")
agora = datetime.now(tz=sp)
expira = agora + self.access_token_expire
# Define os dados do token
payload = {"type": tipo_token, "exp": expira, "iat": agora, "data": str(data)}
# Retorna os dados codificados
return jwt.encode(
payload, self.config.jwt.token, algorithm=self.config.jwt.algorithm
)

View file

@ -0,0 +1,24 @@
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from actions.jwt.verify_token import VerifyToken # A classe que criamos anteriormente
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # Apenas requerido pelo FastAPI
def get_current_user(token: str = Depends(oauth2_scheme)):
# Ação que válida o tokne
verify_token = VerifyToken()
# Obtem o resultado da validação
result = verify_token.execute(token)
# Verifica se a resposta é diferente de inválida
if result['status'] != 'valid':
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=result.get('message', 'Token inválido ou expirado'),
headers={"WWW-Authenticate": "Bearer"},
)
# Retorna apenas os dados do token
return result['payload']

View file

@ -0,0 +1,55 @@
from jose import jwt, JWTError, ExpiredSignatureError
from actions.config.config_json import ConfigJson
class VerifyToken:
def __init__(self):
# Carrega configurações
self.config = ConfigJson.get('app.json')
def execute(self, token: str, expected_type: str = 'access-token') -> dict:
try:
# Decodifica o token
payload = jwt.decode(
token,
self.config.jwt.token,
algorithms=[self.config.jwt.algorithm]
)
# Valida expiração
exp_timestamp = payload.get("exp")
if exp_timestamp is None:
raise ValueError("O token não possui data de expiração.")
# Verifica o tipo de token
token_type = payload.get("type")
if token_type != expected_type:
raise ValueError("Tipo de token inválido.")
# Verificação opcional: validar campo "data"
if "data" not in payload:
raise ValueError("Token malformado: campo 'data' ausente.")
return {
"status": "valid",
"payload": payload
}
except ExpiredSignatureError:
return {
"status": "expired",
"message": "O token expirou."
}
except JWTError as e:
return {
"status": "invalid",
"message": f"Token inválido: {str(e)}"
}
except Exception as e:
return {
"status": "error",
"message": f"Erro na validação do token: {str(e)}"
}

View file

@ -0,0 +1,32 @@
import json
import os
class Log:
def register(self, data, caminho_arquivo='storage/temp.json'):
try:
# Garante que a pasta existe
os.makedirs(os.path.dirname(caminho_arquivo), exist_ok=True)
# Lê dados existentes (ou cria nova lista)
if os.path.exists(caminho_arquivo):
with open(caminho_arquivo, 'r', encoding='utf-8') as arquivo:
try:
dados_existentes = json.load(arquivo)
if not isinstance(dados_existentes, list):
dados_existentes = []
except json.JSONDecodeError:
dados_existentes = []
else:
dados_existentes = []
# Adiciona novo dado
dados_existentes.append(data)
# Salva novamente no arquivo com indentação
with open(caminho_arquivo, 'w', encoding='utf-8') as arquivo:
json.dump(dados_existentes, arquivo, indent=4, ensure_ascii=False)
except Exception as e:
print(f"❌ Erro ao salvar o dado: {e}")

View file

@ -0,0 +1,59 @@
import re
import unicodedata
from datetime import datetime
from typing import Optional
class FileNameGenerator:
"""
Gera nomes de arquivos seguros e consistentes a partir de uma string de entrada.
- Remove acentos, espaços e caracteres inválidos
- Pode adicionar timestamp e extensão
- Não grava em disco, apenas retorna a string final
"""
@staticmethod
def _sanitize(text: str) -> str:
"""Remove acentos, espaços e símbolos inválidos do nome do arquivo."""
text = (
unicodedata.normalize("NFKD", text)
.encode("ASCII", "ignore")
.decode("ASCII")
)
text = re.sub(r"[^a-zA-Z0-9_-]+", "_", text)
text = re.sub(r"_+", "_", text).strip("_")
return text.lower()
@classmethod
def generate(
cls,
base_name: str,
extension: str = ".txt",
include_timestamp: bool = False,
counter: Optional[int] = None,
) -> str:
"""
Retorna um nome de arquivo limpo, como string.
:param base_name: Texto base (ex: título, nome, descrição)
:param extension: Extensão desejada (ex: .pdf, .json)
:param include_timestamp: Se True, adiciona timestamp no nome
:param counter: Número opcional para diferenciar nomes repetidos
:return: String do nome final do arquivo
"""
if not base_name:
raise ValueError("O nome base não pode estar vazio.")
safe_name = cls._sanitize(base_name)
if include_timestamp:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
safe_name = f"{safe_name}_{timestamp}"
if counter is not None:
safe_name = f"{safe_name}_{counter}"
if not extension.startswith("."):
extension = f".{extension}"
return f"{safe_name}{extension}"

View file

@ -0,0 +1,141 @@
# Importa a biblioteca nativa 'zlib' usada para descompressão de dados binários.
import base64
import zlib
# Importa a função 'rtf_to_text' da biblioteca 'striprtf',
# responsável por converter documentos RTF em texto plano legível.
from striprtf.striprtf import rtf_to_text
# Define uma classe utilitária chamada 'String', contendo apenas métodos estáticos.
# Essa abordagem permite o uso direto sem necessidade de instanciar a classe.
class String:
@staticmethod
def decompress(vf_string):
"""
Descomprime e decodifica texto de origem WPTools/Firebird.
Finalidade:
Converter o conteúdo de campos BLOB ou strings compactadas (como no Delphi)
em texto legível, detectando automaticamente se o conteúdo está:
- Compactado com zlib
- Codificado em ISO-8859-1 (padrão ANSI)
- Representado como bytes puros
"""
# Verifica se o valor recebido é nulo, vazio ou None.
# Se for, retorna string vazia para evitar erros de processamento.
if not vf_string:
return ""
# Caso seja um objeto tipo stream (ex: campo BLOB do Firebird)
# Campos BLOB geralmente possuem o método `.read()` para leitura de bytes.
if hasattr(vf_string, "read"):
vf_string = vf_string.read() # Lê o conteúdo completo do stream
# Garante que o valor trabalhado é uma sequência de bytes (não string)
# Se o dado já for texto (str), converte para bytes em codificação Latin-1,
# que é compatível com ISO-8859-1 usado por sistemas Delphi/Firebird.
if isinstance(vf_string, str):
vf_bytes = vf_string.encode("latin1", errors="ignore")
else:
vf_bytes = vf_string # Já está em bytes, então apenas reaproveita
# Detecta se o conteúdo foi compactado com zlib.
# A assinatura padrão do formato zlib começa com bytes: 0x78 0x9C ou 0x78 0xDA.
is_zlib = (
len(vf_bytes) > 2 and vf_bytes[0] == 0x78 and vf_bytes[1] in (0x9C, 0xDA)
)
# Se a detecção confirmar que o conteúdo é zlib, tenta descompactar.
if is_zlib:
try:
# Descompacta os bytes e decodifica o texto usando ISO-8859-1 (ANSI),
# que preserva corretamente acentuação e caracteres especiais.
text = zlib.decompress(vf_bytes).decode("iso-8859-1", errors="ignore")
return text
except Exception:
# Caso falhe (por dados corrompidos ou não comprimidos de fato),
# o fluxo continua normalmente sem interromper a execução.
pass
# Se não for zlib, tenta tratar o conteúdo como texto puro (não compactado)
try:
# Decodifica os bytes diretamente de ISO-8859-1 (padrão usado pelo Delphi)
return vf_bytes.decode("iso-8859-1", errors="ignore")
except Exception:
# Como fallback, converte para string bruta para evitar falhas.
return str(vf_string)
# >>> NOVO MÉTODO <<<
@staticmethod
def compress(text, *, encoding: str = "iso-8859-1", as_base64: bool = True):
"""
Comprime texto/dados com zlib.
Parâmetros:
text: str | bytes | stream (com .read())
encoding: encoding usado quando 'text' for str (padrão: ISO-8859-1)
as_base64: se True, retorna string Base64 do conteúdo comprimido;
caso False, retorna bytes comprimidos.
Retorno:
- bytes (zlib) quando as_base64=False
- str (Base64) quando as_base64=True
Observações:
- Use o mesmo 'encoding' ao descomprimir para simetria.
- Ideal para armazenar em BLOB ou trafegar seguro (Base64).
"""
if text is None or text == "":
return "" if as_base64 else b""
# Se for stream (ex.: BLOB do Firebird)
if hasattr(text, "read"):
raw = text.read()
else:
raw = text
# Garante bytes
if isinstance(raw, str):
raw_bytes = raw.encode(encoding, errors="ignore")
else:
raw_bytes = bytes(raw)
# Comprime com zlib
comp = zlib.compress(raw_bytes)
# Opcional: codifica em Base64 para transporte/JSON
if as_base64:
return base64.b64encode(comp).decode("ascii")
return comp
@staticmethod
def to_text(raw_text: str) -> str:
"""
Converte o conteúdo RTF em texto simples e retorna como string.
Finalidade:
- Detectar automaticamente se o conteúdo está em formato RTF.
- Converter para texto plano usando a função 'rtf_to_text' (da striprtf).
- Retornar uma string limpa e pronta para ser usada em APIs, logs, etc.
"""
# Verifica se o texto recebido está vazio ou None — retorna vazio se sim.
if not raw_text:
return ""
# Verifica se o texto começa com o cabeçalho padrão de arquivos RTF.
# Exemplo: "{\\rtf1\\ansi..." indica conteúdo em formato RTF.
if raw_text.strip().startswith("{\\rtf"):
try:
# Converte o RTF em texto simples, preservando acentuação e quebras de linha.
text = rtf_to_text(raw_text)
# Remove espaços em branco extras nas extremidades.
return text.strip()
except Exception:
# Se ocorrer erro na conversão (ex: RTF inválido),
# retorna o conteúdo original sem alterações.
return raw_text
# Caso o texto não seja RTF, apenas remove espaços em branco extras e retorna.
return raw_text.strip()

View file

@ -0,0 +1,4 @@
# exceptions.py
class BusinessRuleException(Exception):
def __init__(self, message: str):
self.message = message

View file

@ -0,0 +1,85 @@
# handlers.py
import traceback
from fastapi import Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from starlette.exceptions import HTTPException as StarletteHTTPException
from actions.system.exceptions import BusinessRuleException
from actions.log.log import Log
def register_exception_handlers(app):
def __init__ (self):
log = Log()
@app.exception_handler(BusinessRuleException)
async def business_rule_exception_handler(request: Request, exc: BusinessRuleException):
response = {
"status": "422",
"error": "Regra de negócio",
"detail": exc.message
}
# Salva o log em disco
Log.register(response, 'storage/temp/business_rule_exception_handler.json')
return JSONResponse(
status_code=422,
content=response
)
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
response = {
"status": exc.status_code,
"error": "HTTP Error",
"detail": exc.detail
}
# Salva o log em disco
Log.register(response, 'storage/temp/http_exception_handler.json')
return JSONResponse(
status_code=exc.status_code,
content=response
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
response = {
"status": 400,
"error": "Erro de validação",
"detail": exc.errors()
}
# Salva o log em disco
Log.register(response, 'storage/temp/validation_exception_handler.json')
return JSONResponse(
status_code=400,
content=response
)
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
response = {
"status": 500,
"error": "Erro Interno do Servidor",
"type": type(exc).__name__,
"message": str(exc),
"trace": traceback.format_exc()
}
# Salva o log em disco
Log.register(response, 'storage/temp/validation_exception_handler.json')
return JSONResponse(
status_code=500,
content=response
)

View file

@ -0,0 +1,203 @@
# ui.py
from contextlib import asynccontextmanager
from contextlib import contextmanager
from typing import Iterable, Mapping
from rich.live import Live
from rich.console import Console
from rich.theme import Theme
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from rich.progress import (
Progress,
SpinnerColumn,
BarColumn,
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
)
from rich.markdown import Markdown
from rich.traceback import install
from rich.logging import RichHandler
import logging
import json
# ─────────────────────────────────────────────────────────────
# Tema global (cores nomeadas para facilitar padronização)
# ─────────────────────────────────────────────────────────────
theme = Theme(
{
"accent": "bright_magenta",
"info": "cyan",
"success": "green",
"warning": "yellow",
"error": "bold red",
"muted": "grey62",
}
)
# Console único para todo o sistema
console = Console(theme=theme)
install(show_locals=False) # Tracebacks bonitos
# ─────────────────────────
# BÁSICOS
# ─────────────────────────
def banner(titulo: str, subtitulo: str | None = None):
text = Text(titulo, style="bold accent")
if subtitulo:
text.append(f"\n{subtitulo}", style="muted")
console.print(Panel.fit(text, border_style="accent", padding=(1, 2)))
def rule(texto: str = ""):
console.rule(Text(texto, style="accent"))
def info(msg: str):
console.print(f" [info]{msg}[/]")
def ok(msg: str):
console.print(f"✅ [success]{msg}[/]")
def warn(msg: str):
console.print(f"⚠️ [warning]{msg}[/]")
def fail(msg: str):
console.print(f"❌ [error]{msg}[/]")
def md(markdown: str):
console.print(Markdown(markdown))
def json_pretty(data):
try:
console.print_json(data=json.loads(data) if isinstance(data, str) else data)
except Exception:
console.print(data)
# ─────────────────────────
# TABELA RÁPIDA
# ─────────────────────────
def table(rows: Iterable[Mapping], title: str | None = None):
rows = list(rows)
if not rows:
warn("Tabela vazia.")
return
t = Table(title=title, title_style="accent")
for col in rows[0].keys():
t.add_column(str(col), style="muted")
for r in rows:
t.add_row(*[str(r[k]) for k in rows[0].keys()])
console.print(t)
# ─────────────────────────
# STATUS / SPINNER
# ─────────────────────────
@contextmanager
def status(msg: str):
with console.status(f"[info]{msg}[/]"):
yield
# ─────────────────────────
# BARRA DE PROGRESSO
# ─────────────────────────
@contextmanager
def progress(task_desc: str, total: int | None = None):
with Progress(
SpinnerColumn(),
TextColumn("[accent]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
TimeRemainingColumn(),
transient=True,
console=console,
) as p:
task_id = p.add_task(task_desc, total=total)
yield lambda advance=1: p.update(task_id, advance=advance)
# ─────────────────────────
# LOGGING RICH
# ─────────────────────────
def setup_logging(level: int = logging.INFO):
logging.basicConfig(
level=level,
format="%(message)s",
handlers=[RichHandler(rich_tracebacks=True, console=console)],
)
# ─────────────────────────
# FIREBIRD / DB HELPERS
# ─────────────────────────
def db_info(msg: str):
console.print(f"🗄️ [info][DB INFO][/info] {msg}")
def db_ok(msg: str):
console.print(f"🗄️ [success][DB OK][/success] {msg}")
def db_fail(msg: str):
console.print(f"🗄️ [error][DB ERROR][/error] {msg}")
def db_warning(msg: str):
console.print(f"🗄️ [warning][DB WARNING][/warning] {msg}")
def db_dsn(dsn: str):
# Mascara senha com segurança
try:
before_at = dsn.split("@")[0]
user = before_at.split("//")[1].split(":")[0]
masked = dsn.replace(user, "******")
except Exception:
masked = dsn # fallback
console.print(
Panel(
Text(f"[accent]DSN de conexão[/accent]\n{masked}", style="muted"),
border_style="accent",
)
)
@asynccontextmanager
async def progress_manager():
# Criar progress com o console compartilhado
progress = Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("{task.completed}/{task.total}"),
TimeElapsedColumn(),
expand=True,
console=console, # fundamental!
)
# Live controla a tela inteira
live = Live(progress, refresh_per_second=10, console=console, transient=False)
live.start()
try:
yield progress
finally:
live.stop()

View file

@ -0,0 +1,76 @@
from typing import Optional
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import DBAPIError, OperationalError
from firebird.driver.types import DatabaseError as FbDatabaseError
from actions.config.config_ini import ConfigIni
from actions.firebird.parse_firebird_path import ParseFirebirdPath
from actions.hexCipher.hex_chipher import HexCipher
from actions.ui.ui import db_fail
class Firebird:
_engine: Optional[Engine] = None
hex_cipher: Optional[HexCipher] = None
@classmethod
def get_engine(cls) -> Engine:
# Cria a instância de descriptografia apenas uma vez
if cls.hex_cipher is None:
cls.hex_cipher = HexCipher(key="Wallace&Gromitt")
# Lê o arquivo INI com os parâmetros
database = ConfigIni.read("Config.ini")
# ----------------------------------------------------------------------
# Descriptografa o valor bruto do BaseDados ANTES do parser.
# Ex: "127.0.0.1/3050:S:\Bases\SANTARITA.FDB"
# ----------------------------------------------------------------------
base_raw = cls.hex_cipher.decrypt(database["Geral"]["BaseDados"])
# Parser converte "host/port:path" → dict {host, port, path}
base_info = ParseFirebirdPath.execute(base_raw)
# Descriptografa usuário e senha
user = cls.hex_cipher.decrypt(database["Geral"]["Usuario"])
passwd = cls.hex_cipher.decrypt(database["Geral"]["Senha"])
# Charset da conexão
charset = "ISO8859_1"
# Monta o DSN final no padrão aceito pelo firebird-driver
dsn = (
f"firebird+firebird://{user}:{passwd}"
f"@{base_info['host']}:{base_info['port']}/{base_info['path']}"
)
# Cria a engine apenas uma vez (singleton)
if cls._engine is None:
try:
cls._engine = create_engine(
dsn,
connect_args={"charset": charset},
pool_pre_ping=True,
pool_size=5,
max_overflow=10,
)
# Testa a conexão imediatamente para evitar erros silenciosos
with cls._engine.connect() as conn:
conn.execute(text("SELECT 1 FROM RDB$DATABASE"))
except (OperationalError, DBAPIError, FbDatabaseError) as e:
db_fail("Erro ao conectar ao Firebird:")
db_fail(str(e))
raise
return cls._engine
@classmethod
def dispose(cls):
if cls._engine:
cls._engine.dispose()
cls._engine = None

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5 KiB

View file

@ -0,0 +1,156 @@
import asyncio
import os
import platform
import sys
import traceback
from datetime import datetime
from pathlib import Path
from firebird.driver import driver_config
from actions.ui.ui import ok, rule, warn
from packages.v1.manutencao.controller.manutencao_v_casamento_controller import (
ManutencaoVCasamentoController,
)
# ==============================================================
# Corrige BASE_DIR (funciona no .exe, --onefile e ambiente normal)
# ==============================================================
if getattr(sys, "frozen", False):
BASE_DIR = Path(getattr(sys, "_MEIPASS", Path(sys.executable).parent))
else:
BASE_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(BASE_DIR))
# ==============================================================
# Prepara pasta de logs (100% seguro)
# ==============================================================
def get_log_path() -> Path:
"""Sempre retorna o caminho correto para logs + cria pasta storage."""
storage_dir = BASE_DIR / "storage"
storage_dir.mkdir(parents=True, exist_ok=True)
# arquivo de log diário
today = datetime.now().strftime("%Y-%m-%d")
return storage_dir / f"errors_{today}.log"
# ==============================================================
# Logging robusto para qualquer exceção
# ==============================================================
def registrar_erro(e: Exception, contexto: str = ""):
"""Grava traceback completo em storage/errors_YYYY-MM-DD.log."""
log_path = get_log_path()
try:
with open(log_path, "a", encoding="utf-8") as f:
f.write("\n" + "=" * 80 + "\n")
f.write(f"DATA: {datetime.now().isoformat()}\n")
if contexto:
f.write(f"CONTEXTO: {contexto}\n")
f.write("ERRO:\n")
traceback.print_exc(file=f)
f.write("=" * 80 + "\n")
warn(f"⚠️ Erro registrado em {log_path}")
except Exception as log_error:
print("❌ Falha ao escrever no arquivo de log:", log_error)
print("Erro original:", e)
input("\nPressione ENTER para sair...")
sys.exit(1)
# ==============================================================
# Configuração do Firebird
# ==============================================================
def configurar_firebird():
try:
system = platform.system()
if system == "Windows":
fb_client_local = BASE_DIR / "fbclient.dll"
if fb_client_local.exists():
os.add_dll_directory(str(BASE_DIR))
driver_config.fb_client_library.value = str(fb_client_local)
ok(f"🔗 Firebird DLL carregada: {fb_client_local}")
else:
raise FileNotFoundError(
f"fbclient.dll não encontrada em {fb_client_local}"
)
elif system == "Linux":
possible_paths = [
"/usr/lib/x86_64-linux-gnu/firebird/4.0/lib/libfbclient.so.2",
"/usr/lib/x86_64-linux-gnu/libfbclient.so.2",
"/usr/lib/x86_64-linux-gnu/libfbclient.so.4.0.5",
str(BASE_DIR / "libfbclient.so"),
]
for path in possible_paths:
if os.path.exists(path):
driver_config.fb_client_library.value = path
ok(f"🔗 Firebird client carregado: {path}")
break
else:
raise FileNotFoundError(
"❌ libfbclient.so não encontrada. "
"Instale o firebird-client no Linux."
)
else:
warn(f"⚠️ Sistema operacional não suportado: {system}")
except Exception as e:
registrar_erro(e, contexto="CONFIGURAÇÃO FIREBIRD")
raise
# ==============================================================
# Execução principal
# ==============================================================
async def main():
manutencao_v_casamento_controller = ManutencaoVCasamentoController()
response = manutencao_v_casamento_controller.VincularNoivosAtosAntigos()
print(response)
# ==============================================================
# Entry point
# ==============================================================
if __name__ == "__main__":
try:
configurar_firebird()
except Exception:
traceback.print_exc()
input("\nErro ao configurar o Firebird. Pressione ENTER para sair...")
sys.exit(1)
# Corrige event loop no Windows
if platform.system() == "Windows":
try:
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
except Exception:
pass
try:
asyncio.run(main())
input("\nProcesso concluído. Pressione ENTER para sair...")
except Exception as e:
registrar_erro(e, contexto="EXECUÇÃO PRINCIPAL")
traceback.print_exc()
input("\nOcorreu um erro. Pressione ENTER para sair...")
sys.exit(1)

View file

@ -0,0 +1,198 @@
import json
import re
from types import SimpleNamespace
from actions.file.file_name_generator import FileNameGenerator
from actions.file.json_file_saver import JsonFileSaver
import httpx
from actions.data.dict_to_obj import DictToObj
from actions.ui.ui import fail, ok, rule
class ApiAction:
def __init__(self) -> None:
self.file_name_generator = FileNameGenerator()
# Classe para obter as variaveis de ambiente
mirror_sync_env = SimpleNamespace(
url="https://mattermost.oriustecnologia.com/api/v4/",
timeout=10,
token="uzszwh6c7pbc8xyt1d1skope8y",
)
# Obtem as variaveis de ambiente em formato de objeto
self.mirror_sync_env = mirror_sync_env
pass
def sanitize_unicode(self, data):
"""
Remove caracteres inválidos (surrogates) de strings dentro de estruturas complexas.
"""
if isinstance(data, str):
# Remove caracteres Unicode no intervalo D800DFFF (não permitidos em UTF-8)
return re.sub(r"[\ud800-\udfff]", "", data)
elif isinstance(data, list):
return [self.sanitize_unicode(i) for i in data]
elif isinstance(data, dict):
return {k: self.sanitize_unicode(v) for k, v in data.items()}
return data
# Cria os Cabeçalhos de envio
def _headers(self, data):
headers = {
"Accept": getattr(data, "accept", "application/json"),
"Content-Type": getattr(data, "content_type", "application/json"),
}
token = getattr(data, "token", None)
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
# Trata os dados da respsota
async def _response(self, response: httpx.Response, data):
# Tenta JSON; se falhar, retorna texto bruto
try:
# Gerador de arquivo JSON
json_file_saver = JsonFileSaver()
# Obtem o código de requisição
status = response.status_code
# Converter o Json para objeto
response = DictToObj(response.json())
# Verifica o tipo de resposta
match status:
case 200:
rule("Dados enviados.:" + str(status))
ok("Verifique o log...")
json_file_saver.save(
response,
"storage/2XX/"
+ self.file_name_generator.generate("200")
+ ".json",
)
case 201:
rule("Dados criados.:" + str(status))
ok("Verifique o log...")
json_file_saver.save(
response,
"storage/2XX/"
+ self.file_name_generator.generate("201")
+ ".json",
)
case 400:
rule("Error.:" + str(status))
fail("Verifique o log...")
json_file_saver.save(
response,
"storage/4XX/"
+ self.file_name_generator.generate("400")
+ ".json",
)
case 404:
rule("Error.:" + str(status))
fail("Verifique o log...")
json_file_saver.save(
response,
"storage/4XX/"
+ self.file_name_generator.generate("404")
+ ".json",
)
case 422:
rule("Error.:" + str(status))
fail("Verifique o log....")
json_file_saver.save(
response,
"storage/4XX/"
+ self.file_name_generator.generate("422")
+ ".json",
)
return response
except ValueError:
return DictToObj(
{
"status": status,
"message": json.dumps(response, ensure_ascii=False, default=vars),
"data": None,
}
)
# Contrói a requisição
async def _request(self, client: httpx.AsyncClient, data):
"""
Constrói e envia a requisição com base no método HTTP definido em `data.method`.
Suporta:
- GET: usa params (query string)
- POST/PUT/PATCH: usa json ou form (data)
- DELETE: suporta params opcionais
"""
# Obtem o verbo de requisição
method = data.method.lower().strip()
# Obtem o endpoint
url = data.endpoint.lstrip("/")
# Prepara argumentos de envio
kwargs = {}
# Obtem o corpo da requisição
body = getattr(data, "body", None)
# Corpo JSON ou formulário (para POST/PUT/PATCH)
if body is not None:
# Sanitiza caracteres inválidos antes de enviar
body = self.sanitize_unicode(body)
# Guarda o corpo da requisição
kwargs["json"] = body
# Constrói o request (sem enviar ainda)
request = client.build_request(method.upper(), url, **kwargs)
# Envia a requisição (client.send respeita timeout e intercepta tudo)
response = await client.send(request)
# Se sucesso, trata normalmente
return await self._response(response, data)
# Prepara os dados para envio
async def send(self, data):
async with httpx.AsyncClient(
base_url=self.mirror_sync_env.url,
headers=self._headers(data),
timeout=getattr(data, "timeout", int(self.mirror_sync_env.timeout)),
follow_redirects=True,
) as client:
rule("Salvando Dados de Envio...")
if True:
json_file_saver = JsonFileSaver()
json_file_saver.save(
data,
"storage/data/" + self.file_name_generator.generate() + ".json",
)
response = await self._request(client, data)
# Pydantic v2 valida e ignora campos extras (ex.: userId)
return response

View file

@ -0,0 +1,29 @@
from types import SimpleNamespace
from actions.data.obj_to_dict import ObjToDict
from packages.v1.api.Mattermost.actions.api_action import ApiAction
from actions.ui.ui import status
class ApiSendToChannel:
async def execute(self, data):
# Converte o SimpleNamespace em um dicionário serializável
ato_dict = ObjToDict(data)
# Classe de requisição
api_action = ApiAction()
# Informa que esta enviandoos dados
with status("Mattermost..."):
# Envia os dados para a API
await api_action.send(
SimpleNamespace(
endpoint="posts",
method="post",
body=ato_dict,
timeout=None,
token="uzszwh6c7pbc8xyt1d1skope8y",
)
)

View file

@ -0,0 +1,194 @@
import json
import re
from actions.env.mirror_sync_env import MirrorSyncEnv
from actions.file.file_name_generator import FileNameGenerator
from actions.file.json_file_saver import JsonFileSaver
import httpx
from actions.data.dict_to_obj import DictToObj
from actions.ui.ui import fail, ok, rule
class ApiAction:
def __init__(self) -> None:
self.file_name_generator = FileNameGenerator()
# Classe para obter as variaveis de ambiente
mirror_sync_env = MirrorSyncEnv()
# Obtem as variaveis de ambiente em formato de objeto
self.mirror_sync_env = mirror_sync_env.as_object()
pass
def sanitize_unicode(self, data):
"""
Remove caracteres inválidos (surrogates) de strings dentro de estruturas complexas.
"""
if isinstance(data, str):
# Remove caracteres Unicode no intervalo D800DFFF (não permitidos em UTF-8)
return re.sub(r"[\ud800-\udfff]", "", data)
elif isinstance(data, list):
return [self.sanitize_unicode(i) for i in data]
elif isinstance(data, dict):
return {k: self.sanitize_unicode(v) for k, v in data.items()}
return data
# Cria os Cabeçalhos de envio
def _headers(self, data):
headers = {
"Accept": getattr(data, "accept", "application/json"),
"Content-Type": getattr(data, "content_type", "application/json"),
}
token = getattr(data, "token", None)
if token:
headers["Authorization"] = f"Bearer {token}"
return headers
# Trata os dados da respsota
async def _response(self, response: httpx.Response, data):
# Tenta JSON; se falhar, retorna texto bruto
try:
# Gerador de arquivo JSON
json_file_saver = JsonFileSaver()
# Obtem o código de requisição
status = response.status_code
# Converter o Json para objeto
response = DictToObj(response.json())
# Verifica o tipo de resposta
match status:
case 200:
rule("Dados enviados.:" + str(status))
ok("Verifique o log...")
json_file_saver.save(
response,
"storage/2XX/"
+ self.file_name_generator.generate("200")
+ ".json",
)
case 201:
rule("Dados criados.:" + str(status))
ok("Verifique o log...")
json_file_saver.save(
response,
"storage/2XX/"
+ self.file_name_generator.generate("201")
+ ".json",
)
case 400:
rule("Error.:" + str(status))
fail("Verifique o log...")
json_file_saver.save(
response,
"storage/4XX/"
+ self.file_name_generator.generate("400")
+ ".json",
)
case 404:
rule("Error.:" + str(status))
fail("Verifique o log...")
json_file_saver.save(
response,
"storage/4XX/"
+ self.file_name_generator.generate("404")
+ ".json",
)
case 422:
rule("Error.:" + str(status))
fail("Verifique o log....")
json_file_saver.save(
response,
"storage/4XX/"
+ self.file_name_generator.generate("422")
+ ".json",
)
return response
except ValueError:
return DictToObj(
{
"status": status,
"message": json.dumps(response, ensure_ascii=False, default=vars),
"data": None,
}
)
# Contrói a requisição
async def _request(self, client: httpx.AsyncClient, data):
"""
Constrói e envia a requisição com base no método HTTP definido em `data.method`.
Suporta:
- GET: usa params (query string)
- POST/PUT/PATCH: usa json ou form (data)
- DELETE: suporta params opcionais
"""
# Obtem o verbo de requisição
method = data.method.lower().strip()
# Obtem o endpoint
url = data.endpoint.lstrip("/")
# Prepara argumentos de envio
kwargs = {}
# Obtem o corpo da requisição
body = getattr(data, "body", None)
# Corpo JSON ou formulário (para POST/PUT/PATCH)
if body is not None:
# Sanitiza caracteres inválidos antes de enviar
body = self.sanitize_unicode(body)
# Guarda o corpo da requisição
kwargs["json"] = body
# Constrói o request (sem enviar ainda)
request = client.build_request(method.upper(), url, **kwargs)
# Envia a requisição (client.send respeita timeout e intercepta tudo)
response = await client.send(request)
# Se sucesso, trata normalmente
return await self._response(response, data)
# Prepara os dados para envio
async def send(self, data):
async with httpx.AsyncClient(
base_url=self.mirror_sync_env.api_url,
headers=self._headers(data),
timeout=getattr(data, "timeout", int(self.mirror_sync_env.timeout)),
follow_redirects=True,
) as client:
rule("Salvando Dados de Envio...")
if True:
json_file_saver = JsonFileSaver()
json_file_saver.save(
data,
"storage/data/" + self.file_name_generator.generate() + ".json",
)
response = await self._request(client, data)
# Pydantic v2 valida e ignora campos extras (ex.: userId)
return response

View file

@ -0,0 +1,17 @@
from ast import Dict
from dataclasses import dataclass
from typing import Optional, Dict, Any
@dataclass
class RequestData:
base_url: str
endpoint: str
method: str = "get"
accept: str = "application/json"
content_type: str = "application/json"
token: Optional[str] = None
params: Optional[Dict[str, Any]] = None
json: Optional[Dict[str, Any]] = None
form: Optional[Dict[str, Any]] = None
timeout: float = 10.0

View file

@ -0,0 +1,10 @@
from pydantic import BaseModel
class UsuarioAuthenticateSchema(BaseModel):
username: str
password: str
class Config:
from_attributes = True

View file

@ -0,0 +1,41 @@
from types import SimpleNamespace
from actions.env.mirror_sync_env import MirrorSyncEnv
from packages.v1.api.Mirror.actions.api_action import ApiAction
from actions.ui.ui import ok, rule, status
class ApiAuthenticateService:
def __init__(self):
# Classe de variaveis de ambiente
mirror_sync_env = MirrorSyncEnv()
# Obtem todas as variaveis que inicia com MIRROR_SYNC
self.mirror_sync_env = mirror_sync_env.as_object()
async def execute(self):
rule("Autenticação")
api_action = ApiAction()
# Informa que esta enviandoos dados
with status("Autenticando na plataforma..."):
# Realiza a autenticação
response = await api_action.send(
SimpleNamespace(
endpoint="usuario/authenticate",
method="post",
body={
"username": self.mirror_sync_env.username,
"password": self.mirror_sync_env.password,
},
)
)
ok("Autenticação realizada!")
return response

View file

@ -0,0 +1,29 @@
from types import SimpleNamespace
from actions.data.obj_to_dict import ObjToDict
from packages.v1.api.Mirror.actions.api_action import ApiAction
from actions.ui.ui import status
class ApiSendAtoBatchService:
async def execute(self, data, user_authenticated):
# Converte o SimpleNamespace em um dicionário serializável
ato_dict = ObjToDict(data)
# Classe de requisição
api_action = ApiAction()
# Informa que esta enviandoos dados
with status("Enviando dados..."):
# Envia os dados para a API
await api_action.send(
SimpleNamespace(
endpoint="ato/batch",
method="post",
token=user_authenticated.token,
body=ato_dict,
timeout=None,
)
)

View file

@ -0,0 +1,29 @@
from types import SimpleNamespace
from actions.data.obj_to_dict import ObjToDict
from packages.v1.api.Mirror.actions.api_action import ApiAction
from actions.ui.ui import status
class ApiSendAtoService:
async def execute(self, data, user_authenticated):
# Converte o SimpleNamespace em um dicionário serializável
ato_dict = ObjToDict(data)
# Classe de requisição
api_action = ApiAction()
# Informa que esta enviandoos dados
with status("Enviando dados..."):
# Envia os dados para a API
await api_action.send(
SimpleNamespace(
endpoint="ato/",
method="post",
token=user_authenticated.token,
body=ato_dict,
timeout=None,
)
)

View file

@ -0,0 +1,29 @@
from types import SimpleNamespace
from actions.data.obj_to_dict import ObjToDict
from packages.v1.api.Mirror.actions.api_action import ApiAction
from actions.ui.ui import status
class ApiSendGedBatchService:
async def execute(self, data, user_authenticated):
# Converte o SimpleNamespace em um dicionário serializável
ato_dict = ObjToDict(data)
# Classe de requisição
api_action = ApiAction()
# Informa que esta enviandoos dados
with status("Enviando dados..."):
# Envia os dados para a API
await api_action.send(
SimpleNamespace(
endpoint="ato_documento/batch",
method="post",
token=user_authenticated.token,
body=ato_dict,
timeout=None,
)
)

View file

@ -0,0 +1,12 @@
from packages.v1.manutencao.services.manutencao_vincular_noivos_atos_antigos import (
ManutencaoVincularNoivosAtosAntigos,
)
class ManutencaoVCasamentoController:
def VincularNoivosAtosAntigos(self):
manutencao_vincular_noivos_atos_antigos = ManutencaoVincularNoivosAtosAntigos()
return manutencao_vincular_noivos_atos_antigos.execute()

View file

@ -0,0 +1,172 @@
from types import SimpleNamespace
from actions.ui.ui import info, ok, rule, warn
from packages.v1.sequencia.schemas.g_sequencia import GSequenciaSchema
from packages.v1.sequencia.services.g_sequencia.generate_service import GenerateService
from packages.v1.serventias.controllers.v_casamento_controller import (
VCasamentoController,
)
from packages.v1.serventias.schemas.v_pessoa_schema import (
VPessoaSaveSchema,
VPessoaSearchSchema,
)
from packages.v1.serventias.schemas.v_pessoa_vinculo_schema import (
VPessoaVinculoIndexSchema,
VPessoaVinculoSaveSchema,
)
from packages.v1.serventias.services.v_pessoa.go.v_pessoa_index_service import (
VPessoaIndexService,
)
from packages.v1.serventias.services.v_pessoa.go.v_pessoa_save_service import (
VPessoaSaveService,
)
from packages.v1.serventias.services.v_pessoa_vinculo.go.v_pessoa_vinculo_index_service import (
VPessoaVinculoIndexService,
)
from packages.v1.serventias.services.v_pessoa_vinculo.go.v_pessoa_vinculo_save_service import (
VPessoaVInculoSaveService,
)
class ManutencaoVincularNoivosAtosAntigos:
def _gerar_sequencia(self, tabela):
# Cria o schema de sequência
sequencia_schema = GSequenciaSchema()
sequencia_schema.tabela = tabela
# Gera a sequência atualizada
generate = GenerateService()
response = generate.execute(sequencia_schema)
return response.sequencia
def _cadastrar_pessoa(self, pessoa):
info("Cadastro da pessoa.:" + pessoa.nome)
pessoa.pessoa_id = self._gerar_sequencia("V_PESSOA")
v_pessoa = VPessoaSaveService()
info("Criando sequência de pessoa..: " + pessoa.nome)
response = v_pessoa.execute(
VPessoaSaveSchema(
pessoa_id=pessoa.pessoa_id,
nome=pessoa.nome,
cpf_cnpj=pessoa.cpf,
nacionalidade=pessoa.nacionalidade,
naturalidade=pessoa.naturalidade,
data_nascimento=pessoa.data_nascimento,
nome_mae=pessoa.nome_mae,
nome_pai=pessoa.nome_pai,
)
)
return response
def _vincular_pessoa(self, nome, ato_antigo, sexo):
info("Buscando noivo(a)..: " + nome)
v_pessoa_search = VPessoaIndexService()
response_pessoa = v_pessoa_search.execute(VPessoaSearchSchema(nome=nome))
if not response_pessoa:
if sexo == "M":
response_pessoa = self._cadastrar_pessoa(
SimpleNamespace(
nome=nome,
sexo=sexo,
cpf=getattr(ato_antigo, "noivo_cpf", None),
nacionalidade=getattr(ato_antigo, "noivo_nacionalidade", None),
naturalidade=getattr(
ato_antigo, "lv_antigo_noivo_naturalidade", None
),
data_nascimento=getattr(
ato_antigo, "lv_antigo_noivo_nascimento", None
),
nome_mae=getattr(ato_antigo, "lv_antigo_noivo_mae", None),
nome_pai=getattr(ato_antigo, "lv_antigo_noivo_pai", None),
)
)
elif sexo == "F":
response_pessoa = self._cadastrar_pessoa(
SimpleNamespace(
nome=nome,
exo=sexo,
cpf=getattr(ato_antigo, "noiva_cpf", None),
nacionalidade=getattr(ato_antigo, "noiva_nacionalidade", None),
naturalidade=getattr(
ato_antigo, "lv_antigo_noiva_naturalidade", None
),
data_nascimento=getattr(
ato_antigo, "lv_antigo_noiva_nascimento", None
),
nome_mae=getattr(ato_antigo, "lv_antigo_noiva_mae", None),
nome_pai=getattr(ato_antigo, "lv_antigo_noiva_pai", None),
)
)
ok("Noivo(a) localizado..: " + nome)
info("Vinculando ao casamento..: " + nome)
info("Criando sequência de vinculo..: " + nome)
sequencia = self._gerar_sequencia("V_PESSOA_VINCULO")
v_pessoa_vinculo_save_service = VPessoaVInculoSaveService()
response = v_pessoa_vinculo_save_service.execute(
VPessoaVinculoSaveSchema(
pessoa_vinculo_id=sequencia,
registro_id=ato_antigo.casamento_id,
livro_natureza_id=4,
nome=getattr(response_pessoa, "nome", None),
pessoa_id=response_pessoa.pessoa_id,
tb_estadocivil_id=getattr(response_pessoa, "tb_estadocivil_id", None),
tb_profissao_id=getattr(response_pessoa, "tb_profissao_id", None),
tipo_cadastro_geral=7,
)
)
if response:
ok("Noivo(a) vinculado..: " + ato_antigo.noivo_nome_atual)
def execute(self):
v_casamento_controller = VCasamentoController()
response_atos_antigos = v_casamento_controller.AntigosIndex()
for ato_antigo in response_atos_antigos:
rule("Casamento.:" + str(ato_antigo.casamento_id))
v_pessoa_vinculo_service = VPessoaVinculoIndexService()
response_pessoas = v_pessoa_vinculo_service.execute(
VPessoaVinculoIndexSchema(
registro_id=ato_antigo.casamento_id, livro_natureza_id=4
)
)
if len(response_pessoas) == 0:
info("Casamento antigo sem partes vinculadas")
if ato_antigo.noivo_nome_atual:
self._vincular_pessoa(ato_antigo.noivo_nome_atual, ato_antigo, "M")
if ato_antigo.noivo_nome_atual:
self._vincular_pessoa(ato_antigo.noiva_nome_atual, ato_antigo, "F")

View file

@ -0,0 +1,18 @@
from packages.v1.parametros.repositories.g_config.g_config_show_by_nome_repository import (
GConfigShowByNomeRepository,
)
from packages.v1.parametros.schemas.g_config_schema import (
GConfigNomeSchema,
GConfigResponseSchema,
)
class GConfigShowByNomeAction:
def execute(self, g_config_nome_schema: GConfigNomeSchema) -> GConfigResponseSchema:
# Instânciamento de repositório
g_config_show_by_nome_repository = GConfigShowByNomeRepository()
# Execução do repositório
return g_config_show_by_nome_repository.execute(g_config_nome_schema)

View file

@ -0,0 +1,18 @@
from packages.v1.parametros.repositories.g_config.g_config_show_by_nome_repository import (
GConfigShowByNomeRepository,
)
from packages.v1.parametros.schemas.g_config_schema import (
GConfigNomeSchema,
GConfigResponseSchema,
)
class GConfigShowByNomeAction:
def execute(self, g_config_nome_schema: GConfigNomeSchema) -> GConfigResponseSchema:
# Instânciamento de repositório
g_config_show_by_nome_repository = GConfigShowByNomeRepository()
# Execução do repositório
return g_config_show_by_nome_repository.execute(g_config_nome_schema)

View file

@ -0,0 +1,31 @@
from abstracts.repository_firebird import BaseRepositoryFirebird
from packages.v1.parametros.schemas.g_config_schema import (
GConfigNomeSchema,
GConfigResponseSchema,
)
class GConfigShowByNomeRepository(BaseRepositoryFirebird):
def execute(self, g_config_nome_schema: GConfigNomeSchema) -> GConfigResponseSchema:
# Montagem da consulta sql
sql = """ SELECT
FIRST 1 GC.*
FROM G_CONFIG GC
JOIN G_CONFIG_GRUPO GCG ON GC.CONFIG_GRUPO_ID = GCG.CONFIG_GRUPO_ID
WHERE GC.NOME LIKE :nome
AND GCG.SISTEMA_ID = :sistema_id
"""
# Preenchimento dos parâmetros
params = {
"nome": g_config_nome_schema.nome,
"sistema_id": g_config_nome_schema.sistema_id,
}
# Execução do sql
response = self.fetch_one(sql, params)
# Transforma em dict associativo
return response

View file

@ -0,0 +1,29 @@
from typing import Optional
from pydantic import BaseModel, ConfigDict
class GConfigSchema(BaseModel):
config_id: Optional[float] = None
config_grupo_id: Optional[float] = None
config_padrao_id: Optional[float] = None
secao: Optional[str] = None
nome: Optional[str] = None
valor: Optional[str] = None
descricao: Optional[str] = None
texto: Optional[str] = None
terminal: Optional[str] = None
tipo_valor: Optional[str] = None
atualizado: Optional[str] = None
# substitui a antiga inner class Config
model_config = ConfigDict(from_attributes=True)
class GConfigResponseSchema(GConfigSchema):
model_config = ConfigDict(from_attributes=True)
class GConfigNomeSchema(BaseModel):
nome: str = None
sistema_id: float = None
model_config = ConfigDict(from_attributes=True)

View file

@ -0,0 +1,18 @@
from packages.v1.parametros.actions.g_config.g_config_show_by_nome_action import (
GConfigShowByNomeAction,
)
from packages.v1.parametros.schemas.g_config_schema import (
GConfigNomeSchema,
GConfigResponseSchema,
)
class GConfigShowByNomeService:
def execute(self, g_config_nome_schema: GConfigNomeSchema) -> GConfigResponseSchema:
# Instânciamento de Action
g_config_show_by_nome_action = GConfigShowByNomeAction()
# Execução da Ação
return g_config_show_by_nome_action.execute(g_config_nome_schema)

View file

@ -0,0 +1,9 @@
from types import SimpleNamespace
class GSeloLivroCreateObjectAction:
@staticmethod
def execute(data):
"""Cria um objeto SimpleNamespace para atos vinculados."""
return SimpleNamespace(**data)

View file

@ -0,0 +1,17 @@
from packages.v1.selos.repositories.g_selo_livro.g_selo_livro_index_repository import (
GSeloLivroIndexRepository,
)
from packages.v1.selos.schemas.g_selo_livro_schema import (
GSeloLivroIndexSchema,
)
class GSeloLivroIndexAction:
def execute(self, g_selo_livro_index_schema: GSeloLivroIndexSchema):
# Instânciamento de repositório
g_selo_livro_index_repository = GSeloLivroIndexRepository()
# Retorna todos produtos
return g_selo_livro_index_repository.execute(g_selo_livro_index_schema)

View file

@ -0,0 +1,22 @@
from packages.v1.serventias.repositories.t_ato_vinculoparte.t_ato_vinculoparte_index_repository import (
TAatoVinculoParteIndexRepository,
)
from packages.v1.serventias.schemas.t_ato_vinculoparte_schema import (
ResponseTAtoVinculoParteSchema,
TAtoVinculoParteIndexSchema,
)
class TAtoVinculoParteIndexAction(=):
def execute(
self, ato_vinculoparte_index_schema: TAtoVinculoParteIndexSchema
) -> ResponseTAtoVinculoParteSchema:
# Instânciamento de repositório
t_ato_vinculoparte_index_repository = TAatoVinculoParteIndexRepository()
# Retorna todos produtos
return t_ato_vinculoparte_index_repository.execute(
ato_vinculoparte_index_schema
)

View file

@ -0,0 +1,18 @@
# Importação de bibliotecas
from packages.v1.selos.services.g_selo_livro.go.g_selo_livro_index_service import (
GSeloLivroIndexService,
)
from packages.v1.selos.schemas.g_selo_livro_schema import (
GSeloLivroIndexSchema,
)
class GSeloLivroController:
def index(self, g_selo_livro_index_schema: GSeloLivroIndexSchema):
# Importação da classe desejad
g_selo_livro_index_service = GSeloLivroIndexService()
# Lista todos os produtos
return g_selo_livro_index_service.execute(g_selo_livro_index_schema)

View file

@ -0,0 +1,24 @@
from packages.v1.serventias.schemas.t_ato_vinculoparte_schema import (
ResponseTAtoVinculoParteSchema,
TAtoVinculoParteIndexSchema,
)
from packages.v1.serventias.services.t_ato_vinculoparte.go.t_ato_vinculoparte_index_service import (
TAtoVinculoParteIndexService,
)
class TAtoVinculoParteController:
def index(
self, ato_vinculoparte_index_schema: TAtoVinculoParteIndexSchema
) -> ResponseTAtoVinculoParteSchema:
# Importação da classe desejada
t_ato_vinculoparte_index_service = TAtoVinculoParteIndexService()
# Executa a classe importada
response = t_ato_vinculoparte_index_service.execute(
ato_vinculoparte_index_schema
)
return response

View file

@ -0,0 +1,52 @@
from abstracts.repository_firebird import BaseRepositoryFirebird
from packages.v1.selos.schemas.g_selo_livro_schema import (
GSeloLivroIndexSchema,
)
class GSeloLivroIndexRepository(BaseRepositoryFirebird):
"""
Repositório para a operação de listagem de todos os registros
na tabela t_censec_qualidade.
"""
def execute(self, g_selo_livro_index_schema: GSeloLivroIndexSchema):
"""
Executa a consulta SQL para buscar todos os registros.
Returns:
Uma lista de dicionários contendo os dados dos registros.
"""
# Montagem do SQL
sql = """ SELECT
COALESCE(NULLIF(TRIM(GU.NOME_COMPLETO), ''), 'Não informado') AS nome_serventuario_praticou_ato,
COALESCE(GSG.NUMERO, 0) AS tipo_ato,
COALESCE(NULLIF(TRIM(GST.NOTA_FISCAL), ''), 'Não informado') AS identificacao_pedido_cgj,
COALESCE(NULLIF(TRIM(GSL.NUMERO_SELO), ''), 'Não informado') AS codigo_selo,
COALESCE(NULLIF(TRIM(GSL.NUMERO_SELO), ''), 'Não informado') AS codigo_ato,
COALESCE(NULLIF(TRIM(GSL.APRESENTANTE), ''), 'Não informado') AS nome_civil_ato,
COALESCE(GSL.USUARIO_ID, 0) AS USUARIO_ID,
GSL.DATA_INFORMACAO as data_solicitacao,
COALESCE(NULLIF(TRIM(GSL.IP_MAQUINA), ''), 'Não informado') AS ip_maquina,
COALESCE(GSL.VALOR_EMOLUMENTO, 0) AS emolumento,
COALESCE(GSL.VALOR_TAXA_JUDICIARIA, 0) AS taxa_judiciaria,
COALESCE(GSL.VALOR_FUNDESP, 0) AS fundos_estaduais
FROM G_SELO_LIVRO GSL
JOIN G_SELO_LOTE GST ON GSL.SELO_LOTE_ID = GST.SELO_LOTE_ID
JOIN G_SELO_GRUPO GSG ON GST.SELO_GRUPO_ID = GSG.SELO_GRUPO_ID
JOIN G_USUARIO GU ON GSL.USUARIO_ID = GU.USUARIO_ID
WHERE GSL.CAMPO_ID = :campo_id
AND GSL.TABELA LIKE :tabela
ORDER BY GSG.NUMERO ASC; """
# Preenchimento dos parâmetros
params = {
"campo_id": g_selo_livro_index_schema.campo_id,
"tabela": g_selo_livro_index_schema.tabela,
}
# Execução do sql
response = self.fetch_all(sql, params)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,37 @@
from abstracts.repository_firebird import BaseRepositoryFirebird
from packages.v1.serventias.schemas.t_ato_vinculoparte_schema import (
ResponseTAtoVinculoParteSchema,
TAtoVinculoParteIndexSchema,
)
class TAatoVinculoParteIndexRepository(BaseRepositoryFirebird):
"""
Repositório para a operação de listagem de todos os registros
na tabela t_censec_qualidade.
"""
def execute(
self, ato_vinculoparte_index_schema: TAtoVinculoParteIndexSchema
) -> ResponseTAtoVinculoParteSchema:
"""
Executa a consulta SQL para buscar todos os registros.
Returns:
Uma lista de dicionários contendo os dados dos registros.
"""
# Montagem do SQL
sql = """ SELECT
TAV.PESSOA_NOME,
TAV.PESSOA_CPF
FROM T_ATO_VINCULOPARTE TAV
WHERE TAV.ATO_ID = :ato_id """
# Preenchimento dos parâmetros
params = {"ato_id": ato_vinculoparte_index_schema.ato_id}
# Execução do sql
response = self.fetch_all(sql, params)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,31 @@
from pydantic import BaseModel
class GSeloLivroIndexSchema(BaseModel):
campo_id: int
tabela: str
class Config:
from_attributes = True
class GSeloLivro(BaseModel):
selo_livro_id: int
numero_selo: str
apresentante: str
usuario_id: str
data_informacao: str
ip_maquina: str
valor_emolumento: str
valor_taxa_judiciaria: str
valor_fundesp: str
class Config:
from_attributes = True
class ResponseGSeloLivro(BaseModel):
data: list[GSeloLivro]
class Config:
from_attributes = True

View file

@ -0,0 +1,20 @@
from pydantic import BaseModel
class TAtoVinculoParteIndexSchema(BaseModel):
ato_id: int
class Config:
from_attributes = True
class TAtoVinculoParteSchema(BaseModel):
pessoa_nome: str
pessoa_cpf: str
class Config:
from_attributes = True
class ResponseTAtoVinculoParteSchema(BaseModel):
data: list[TAtoVinculoParteSchema]
class Config:
from_attributes = True

View file

@ -0,0 +1,39 @@
from packages.v1.selos.actions.g_selo_livro.g_selo_livro_index_action import (
GSeloLivroIndexAction,
)
from packages.v1.selos.schemas.g_selo_livro_schema import (
GSeloLivroIndexSchema,
)
class GSeloLivroIndexService:
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de listagem de registros na tabela G_GRAMATICA.
"""
def execute(self, g_selo_livro_index_schema: GSeloLivroIndexSchema):
"""
Executa a operação de busca de todos os registros no banco de dados.
Args:
g_cartorio_index_schema (GCartorioIndexSchema):
Esquema que pode conter filtros ou parâmetros de busca.
Returns:
A lista de registros encontrados.
"""
# ----------------------------------------------------
# Instanciamento da ação
# ----------------------------------------------------
g_selo_livro_index_action = GSeloLivroIndexAction()
# ----------------------------------------------------
# Execução da ação
# ----------------------------------------------------
data = g_selo_livro_index_action.execute(g_selo_livro_index_schema)
# ----------------------------------------------------
# Retorno da informação
# ----------------------------------------------------
return data

View file

@ -0,0 +1,42 @@
from packages.v1.serventias.actions.t_ato_vinculoparte.t_ato_vinculoparte_index_action import (
TAtoVinculoParteIndexAction,
)
from packages.v1.serventias.schemas.t_ato_vinculoparte_schema import (
ResponseTAtoVinculoParteSchema,
TAtoVinculoParteIndexSchema,
)
class TAtoVinculoParteIndexService:
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de listagem de registros na tabela G_GRAMATICA.
"""
def execute(
self, ato_vinculoparte_index_schema: TAtoVinculoParteIndexSchema
) -> ResponseTAtoVinculoParteSchema:
"""
Executa a operação de busca de todos os registros no banco de dados.
Args:
g_cartorio_index_schema (GCartorioIndexSchema):
Esquema que pode conter filtros ou parâmetros de busca.
Returns:
A lista de registros encontrados.
"""
# ----------------------------------------------------
# Instanciamento da ação
# ----------------------------------------------------
t_ato_vinculoparte_action = TAtoVinculoParteIndexAction()
# ----------------------------------------------------
# Execução da ação
# ----------------------------------------------------
data = t_ato_vinculoparte_action.execute(ato_vinculoparte_index_schema)
# ----------------------------------------------------
# Retorno da informação
# ----------------------------------------------------
return data

View file

@ -0,0 +1,13 @@
from packages.v1.sequencia.repositories.g_sequencia.checkout import Checkout
from packages.v1.sequencia.schemas.g_sequencia import GSequenciaSchema
class CheckoutAction:
def execute(self, sequencia_schema: GSequenciaSchema):
# Instânciamento de repositório
checkout = Checkout()
# Execução do repositório
return checkout.execute(sequencia_schema)

View file

@ -0,0 +1,13 @@
from packages.v1.sequencia.repositories.g_sequencia.get import Get
from packages.v1.sequencia.schemas.g_sequencia import GSequenciaSchema
class GetAction:
def execute(self, sequencia_schema: GSequenciaSchema):
# Instânciamento de repositório
get = Get()
# Execução do repositório
return get.execute(sequencia_schema)

View file

@ -0,0 +1,13 @@
from packages.v1.sequencia.repositories.g_sequencia.save import Save
from packages.v1.sequencia.schemas.g_sequencia import GSequenciaSchema
class SaveAction:
def execute(self, sequencia_schema: GSequenciaSchema):
# Instânciamento de repositório
save = Save()
# Execução do repositório
return save.execute(sequencia_schema)

View file

@ -0,0 +1,71 @@
from packages.v1.sequencia.schemas.g_sequencia import GSequenciaSchema
from abstracts.repository_firebird import BaseRepositoryFirebird
from fastapi import HTTPException, status
class Checkout(BaseRepositoryFirebird):
def execute(self, sequencia_schema: GSequenciaSchema):
# 1) Descobre o nome da PK a partir dos metadados
sql = """
SELECT
sg.RDB$FIELD_NAME AS primary_key
FROM RDB$RELATION_CONSTRAINTS rc
JOIN RDB$INDEX_SEGMENTS sg
ON rc.RDB$INDEX_NAME = sg.RDB$INDEX_NAME
WHERE rc.RDB$CONSTRAINT_TYPE = 'PRIMARY KEY'
AND rc.RDB$RELATION_NAME = UPPER(:tabela)
ORDER BY sg.RDB$FIELD_POSITION
"""
params = {"tabela": sequencia_schema.tabela}
pk_result = self.fetch_one(sql, params)
if not pk_result:
raise Exception(
f"Tabela {sequencia_schema.tabela} não possui chave primária"
)
# CORREÇÃO AQUI
pk_field = pk_result.primary_key.strip()
# 2) Monta dinamicamente a query para buscar o último ID
sql = f"SELECT MAX({pk_field}) AS last_id FROM {sequencia_schema.tabela}"
last_id_row = self.fetch_one(sql)
# CORREÇÃO: SimpleNamespace → acessar como atributo
last_id = last_id_row.last_id if last_id_row else None
if last_id is None:
last_id = 0 # tabela vazia
# 3) Verifica se a tabela foi cadastrada no G_SEQUENCIA
sql = """
SELECT *
FROM G_SEQUENCIA
WHERE TABELA = :tabela
"""
params = {"tabela": sequencia_schema.tabela}
g_seq = self.fetch_one(sql, params)
# 4) Se não houver registro no g_sequencia, cadastra um novo
if not g_seq:
sql = """
INSERT INTO G_SEQUENCIA (TABELA, SEQUENCIA)
VALUES (:tabela, :sequencia)
"""
params = {"tabela": sequencia_schema.tabela, "sequencia": last_id}
self.run(sql, params)
return last_id
# 5) Se a sequência estiver divergente
if g_seq.sequencia != last_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=(
f"A sequência atual ({g_seq.sequencia}) está divergente "
f"do último ID da tabela ({last_id})"
),
)
# Tudo ok → retorna o last_id
return last_id

View file

@ -0,0 +1,19 @@
from packages.v1.sequencia.schemas.g_sequencia import GSequenciaSchema
from abstracts.repository_firebird import BaseRepositoryFirebird
class Get(BaseRepositoryFirebird):
def execute(self, sequencia_schema: GSequenciaSchema):
# Montagem da consulta sql
sql = """ SELECT FIRST 1 * FROM G_SEQUENCIA gs WHERE gs.TABELA LIKE :tabela """
# Preenchimento dos parâmetros
params = {"tabela": sequencia_schema.tabela}
# Execução do sql
response = self.fetch_one(sql, params)
# Transforma em dict associativo
return response

View file

@ -0,0 +1,25 @@
from packages.v1.sequencia.schemas.g_sequencia import GSequenciaSchema
from abstracts.repository_firebird import BaseRepositoryFirebird
class Save(BaseRepositoryFirebird):
def execute(self, sequencia_schema: GSequenciaSchema):
# Construção do sql
sql = """ UPDATE G_SEQUENCIA
SET SEQUENCIA = :sequencia
WHERE TABELA LIKE :tabela
RETURNING TABELA, SEQUENCIA """
# Preenchimento de parâmetros
params = {
"sequencia": sequencia_schema.sequencia,
"tabela": sequencia_schema.tabela,
}
# Execução do sql
response = self.run_and_return(sql, params)
# Retorna como verdadeiro se for salvo com sucesso
return response

View file

@ -0,0 +1,11 @@
from typing import Optional
from pydantic import BaseModel
class GSequenciaSchema(BaseModel):
tabela: Optional[str] = None
sequencia: Optional[str] = None
class Config:
from_attributes = True

View file

@ -0,0 +1,25 @@
from packages.v1.sequencia.actions.g_sequencia.checkout_action import CheckoutAction
from packages.v1.sequencia.schemas.g_sequencia import GSequenciaSchema
from fastapi import HTTPException, status
class GenerateService:
def execute(self, sequencia_schema: GSequenciaSchema):
# Instânciamento de Action
checkoutAction = CheckoutAction()
# Atualiza a sequência atual
data = checkoutAction.execute(sequencia_schema)
# Verifica se foi localizado o registro
if not data:
# Retorna uma exceção
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Não foi possível localizar a tabela para verificação de sequência",
)
# Retorna a informação localizada
return data

View file

@ -0,0 +1,26 @@
from packages.v1.sequencia.actions.g_sequencia.get_action import GetAction
from packages.v1.sequencia.actions.g_sequencia.save_action import SaveAction
from packages.v1.sequencia.actions.g_sequencia.checkout_action import CheckoutAction
from packages.v1.sequencia.schemas.g_sequencia import GSequenciaSchema
class GenerateService:
def execute(self, sequencia_schema: GSequenciaSchema):
# Instânciamento de Action
getAction = GetAction()
saveAction = SaveAction()
checkoutAction = CheckoutAction()
# # Verifico se a tabela existe no G_SEQUENCIA e se a sequência está correta
# checkoutAction.execute(sequencia_schema)
# Busco a sequência atual
sequencia_result = getAction.execute(sequencia_schema)
# Incrementa a sequência atual
sequencia_schema.sequencia = sequencia_result.sequencia + 1
# Atualiza a sequência atual
return saveAction.execute(sequencia_schema)

View file

@ -0,0 +1,14 @@
from packages.v1.sequencia.actions.g_sequencia.save_action import \
SaveAction
from packages.v1.sequencia.schemas.g_sequencia import GSequenciaSchema
class SaveService:
def execute(self, sequencia_schema : GSequenciaSchema):
# Instânciamento de Action
saveAction = SaveAction()
# Execução da Ação
return saveAction.execute(sequencia_schema)

View file

@ -0,0 +1,14 @@
from packages.v1.serventias.repositories.v_casamento.v_casamento_antigo_index_repository import (
VCasamentoAntigoIndexRepository,
)
class VCasamentoAntigosIndexAction:
def execute(self):
# Instânciamento de repositório
v_casamento_index_repository = VCasamentoAntigoIndexRepository()
# Retorna todos produtos
return v_casamento_index_repository.execute()

View file

@ -0,0 +1,17 @@
from packages.v1.serventias.repositories.v_pessoa.v_pessoa_index_repository import (
VPessoaIndexRepository,
)
from packages.v1.serventias.schemas.v_pessoa_schema import (
VPessoaSaveSchema,
)
class VPessoaIndexAction:
def execute(self, v_pessoa_search: VPessoaSaveSchema):
# Instânciamento de repositório
v_pessoa_save = VPessoaIndexRepository()
# Retorna todos produtos
return v_pessoa_save.execute(v_pessoa_search)

View file

@ -0,0 +1,17 @@
from packages.v1.serventias.repositories.v_pessoa.v_pessoa_save_repository import (
VPessoaSaveRepository,
)
from packages.v1.serventias.schemas.v_pessoa_schema import (
VPessoaSaveSchema,
)
class VPessoaSaveAction:
def execute(self, v_pessoa_save_schema: VPessoaSaveSchema):
# Instânciamento de repositório
v_pessoa_save = VPessoaSaveRepository()
# Retorna todos produtos
return v_pessoa_save.execute(v_pessoa_save_schema)

View file

@ -0,0 +1,17 @@
from packages.v1.serventias.repositories.v_pessoa_vinculo.v_pessoa_vinculo_index_repository import (
VPessoaVinculoIndexRepository,
)
from packages.v1.serventias.schemas.v_pessoa_vinculo_schema import (
VPessoaVinculoIndexSchema,
)
class VPessoaVinculoIndexAction:
def execute(self, v_pessoa_vinculo_index_schema: VPessoaVinculoIndexSchema):
# Instânciamento de repositório
v_pessoa_vinculo_index_repository = VPessoaVinculoIndexRepository()
# Retorna todos produtos
return v_pessoa_vinculo_index_repository.execute(v_pessoa_vinculo_index_schema)

View file

@ -0,0 +1,17 @@
from packages.v1.serventias.repositories.v_pessoa_vinculo.v_pessoa_vinculo_save_repository import (
VPessoaVinculoSaveRepository,
)
from packages.v1.serventias.schemas.v_pessoa_vinculo_schema import (
VPessoaVinculoSaveSchema,
)
class VPessoaVinculoSaveAction:
def execute(self, v_pessoa_vinculo_save_schema: VPessoaVinculoSaveSchema):
# Instânciamento de repositório
v_pessoa_vinculo_save_repository = VPessoaVinculoSaveRepository()
# Retorna todos produtos
return v_pessoa_vinculo_save_repository.execute(v_pessoa_vinculo_save_schema)

View file

@ -0,0 +1,14 @@
from packages.v1.serventias.services.v_casamento.go.v_casamento_antigo_index_service import (
VCasamentosAntigosIndexService,
)
class VCasamentoController:
def AntigosIndex(self):
# Importação da classe desejad
antigos_index = VCasamentosAntigosIndexService()
# Lista todos os produtos
return antigos_index.execute()

View file

@ -0,0 +1,23 @@
# Importação de bibliotecas
from packages.v1.serventias.schemas.v_pessoa_vinculo_schema import (
VPessoaVinculoIndexSchema,
)
from packages.v1.serventias.services.v_pessoa_vinculo.go.v_pessoa_vinculo_index_service import (
VPessoaVinculoIndexService,
)
class VPessoaVinculoController:
def index(self, v_pessoa_vinculo_index_schema: VPessoaVinculoIndexSchema):
# Importação da classe desejad
v_pessoa_vinculo_index_service = VPessoaVinculoIndexService()
# Intânciamento da classe service
self.v_pessoa_vinculo_index_service = v_pessoa_vinculo_index_service
# Lista todos os produtos
return self.v_pessoa_vinculo_index_service.execute(
v_pessoa_vinculo_index_schema
)

View file

@ -0,0 +1,52 @@
import sys
from abstracts.repository_firebird import BaseRepositoryFirebird
class VCasamentoAntigoIndexRepository(BaseRepositoryFirebird):
"""
Repositório para a operação de listagem de todos os registros
na tabela t_censec_qualidade.
"""
def execute(self):
"""
Executa a consulta SQL para buscar todos os registros.
Returns:
Uma lista de dicionários contendo os dados dos registros.
"""
# Montagem do SQL
sql = """ SELECT
VCA.CASAMENTO_ID,
VCA.NOIVO_CPF,
VCA.NOIVO_NOME_ATUAL,
VCA.NOIVO_NACIONALIDADE,
VCA.NOIVA_CPF,
VCA.NOIVA_NOME_ATUAL,
VCA.NOIVA_NACIONALIDADE,
VCA.LV_ANTIGO_NOIVO_NASCIMENTO,
VCA.LV_ANTIGO_NOIVO_MAE,
VCA.LV_ANTIGO_NOIVO_PAI,
VCA.LV_ANTIGO_NOIVA_NASCIMENTO,
VCA.LV_ANTIGO_NOIVA_PAI,
VCA.LV_ANTIGO_NOIVA_MAE,
VCA.LV_ANTIGO_NOIVO_NATURALIDADE,
VCA.LV_ANTIGO_NOIVA_NATURALIDADE
FROM V_CASAMENTO VCA
WHERE VCA.LV_ANTIGO_TEXTO_NOIVO IS NOT NULL
OR VCA.LV_ANTIGO_TEXTO_NOIVA IS NOT NULL
OR VCA.LV_ANTIGO_NOIVO_NASCIMENTO IS NOT NULL
OR VCA.LV_ANTIGO_NOIVO_MAE IS NOT NULL
OR VCA.LV_ANTIGO_NOIVO_PAI IS NOT NULL
OR VCA.LV_ANTIGO_NOIVA_NASCIMENTO IS NOT NULL
OR VCA.LV_ANTIGO_NOIVA_PAI IS NOT NULL
OR VCA.LV_ANTIGO_NOIVA_MAE IS NOT NULL
OR VCA.LV_ANTIGO_NOIVO_NATURALIDADE IS NOT NULL
OR VCA.LV_ANTIGO_NOIVA_NATURALIDADE IS NOT NULL
ORDER BY CASAMENTO_ID ASC"""
# Execução do sql
response = self.fetch_all(sql)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,29 @@
from abstracts.repository_firebird import BaseRepositoryFirebird
from packages.v1.serventias.schemas.v_pessoa_schema import VPessoaSearchSchema
class VPessoaIndexRepository(BaseRepositoryFirebird):
"""
Repositório para a operação de listagem de todos os registros
na tabela t_censec_qualidade.
"""
def execute(self, v_pessoa_search: VPessoaSearchSchema):
"""
Executa a consulta SQL para buscar todos os registros.
Returns:
Uma lista de dicionários contendo os dados dos registros.
"""
# Montagem do SQL
sql = """ SELECT FIRST 1 * FROM V_PESSOA WHERE NOME LIKE :nome"""
params = {
"nome": v_pessoa_search.nome,
}
# Execução do sql
response = self.fetch_one(sql, params)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,27 @@
from abstracts.repository_firebird import BaseRepositoryFirebird
from actions.data.generate_insert_sql import generate_insert_sql
from packages.v1.serventias.schemas.v_pessoa_schema import VPessoaSaveSchema
class VPessoaSaveRepository(BaseRepositoryFirebird):
"""
Repositório para a operação de listagem de todos os registros
na tabela t_censec_qualidade.
"""
def execute(self, v_pessoa_save_schema: VPessoaSaveSchema):
# ----------------------------------------------------
# Preenchimento dos parâmetros
# ----------------------------------------------------
params = v_pessoa_save_schema.model_dump(exclude_unset=True)
# ----------------------------------------------------
# Montagem do SQL dinâmico
# ----------------------------------------------------
sql = generate_insert_sql("V_PESSOA", params)
# ----------------------------------------------------
# Execução do SQL e retorno do registro
# ----------------------------------------------------
return self.run_and_return(sql, params)

View file

@ -0,0 +1,38 @@
from abstracts.repository_firebird import BaseRepositoryFirebird
from packages.v1.serventias.schemas.v_pessoa_vinculo_schema import (
VPessoaVinculoIndexSchema,
)
class VPessoaVinculoIndexRepository(BaseRepositoryFirebird):
"""
Repositório para a operação de listagem de todos os registros
na tabela t_censec_qualidade.
"""
def execute(self, v_pessoa_vinculo_index_schema: VPessoaVinculoIndexSchema):
"""
Executa a consulta SQL para buscar todos os registros.
Returns:
Uma lista de dicionários contendo os dados dos registros.
"""
# Montagem do SQL
sql = """
SELECT
VPV.PESSOA_VINCULO_ID
FROM V_PESSOA_VINCULO VPV
WHERE VPV.REGISTRO_ID = :registro_id
AND VPV.LIVRO_NATUREZA_ID = :livro_natureza_id
"""
params = {
"registro_id": v_pessoa_vinculo_index_schema.registro_id,
"livro_natureza_id": v_pessoa_vinculo_index_schema.livro_natureza_id,
}
# Execução do sql
response = self.fetch_all(sql, params)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,30 @@
from abstracts.repository_firebird import BaseRepositoryFirebird
from actions.data.generate_insert_sql import generate_insert_sql
from packages.v1.serventias.schemas.v_pessoa_vinculo_schema import (
VPessoaVinculoIndexSchema,
VPessoaVinculoSaveSchema,
)
class VPessoaVinculoSaveRepository(BaseRepositoryFirebird):
"""
Repositório para a operação de listagem de todos os registros
na tabela t_censec_qualidade.
"""
def execute(self, v_pessoa_vinculo_index_schema: VPessoaVinculoSaveSchema):
# ----------------------------------------------------
# Preenchimento dos parâmetros
# ----------------------------------------------------
params = v_pessoa_vinculo_index_schema.model_dump(exclude_unset=True)
# ----------------------------------------------------
# Montagem do SQL dinâmico
# ----------------------------------------------------
sql = generate_insert_sql("V_PESSOA_VINCULO", params)
# ----------------------------------------------------
# Execução do SQL e retorno do registro
# ----------------------------------------------------
return self.run_and_return(sql, params)

View file

@ -0,0 +1,22 @@
from pydantic import BaseModel
class VCasamentoIndex(BaseModel):
casamento_id: int
class Config:
from_attributes = True
class VCasamento(BaseModel):
casamento_id: int
class Config:
from_attributes = True
class ResponseVCasamento(BaseModel):
data: list[VCasamento]
class Config:
from_attributes = True

View file

@ -0,0 +1,89 @@
from datetime import date
from pydantic import BaseModel
class VPessoaSearchSchema(BaseModel):
nome: str
class Config:
from_attributes = True
class VPessoaSaveSchema(BaseModel):
pessoa_id: int | None = None # PK opcional (create/update)
nome: str
sexo: str | None = None
data_nascimento: date | None = None
nacionalidade: str | None = None
naturalidade: str | None = None
cpf_cnpj: str | None = None
tb_profissao_id: int | None = None
tb_estadocivil_id: int | None = None
tb_documentotipo_id: int | None = None
documento: str | None = None
endereco: str | None = None
numero_endereco: str | None = None
bairro: str | None = None
cidade: str | None = None
cep: str | None = None
uf: str | None = None
telefone: str | None = None
ddd: str | None = None
email: str | None = None
pessoa_tipo: int | None = None
cpf_terceiro: str | None = None
nome_pai: str | None = None
nome_mae: str | None = None
nome_especial_pai: str | None = None
nome_especial_mae: str | None = None
tempo_residencia: int | None = None
naturalidade_cidade_id: int | None = None
uf_naturalidade: str | None = None
pais_nascimento: str | None = None
cidade_nat_id: int | None = None
cidade_res_id: int | None = None
data_falecimento: date | None = None
pai_falecido: bool | None = None
mae_falecida: bool | None = None
data_emissao_doc: date | None = None
uf_emissor_doc: str | None = None
serie_doc: str | None = None
orgao_emissor_id: int | None = None
texto_irmaos_gemeos: str | None = None
grupo_sanguineo: str | None = None
data_validade: date | None = None
zona_secao: str | None = None
cidade_emissao_id: int | None = None
cert_cartorio_id: int | None = None
cert_cidade_id: int | None = None
cert_folha: str | None = None
cert_livro: str | None = None
cert_termo: str | None = None
cert_uf: str | None = None
cert_data: date | None = None
pessoa_conjuge_cpf: str | None = None
pessoa_conjuge_id: int | None = None
pessoa_conjuge_nome: str | None = None
tabela_origem: str | None = None
chave_pessoa_imp: str | None = None
ignorar_documentos: bool | None = None
class Config:
from_attributes = True

View file

@ -0,0 +1,67 @@
from datetime import date
from pydantic import BaseModel
class VPessoaVinculoIndexSchema(BaseModel):
registro_id: int
livro_natureza_id: int
class Config:
from_attributes = True
class VPessoaVinculo(BaseModel):
pessoa_vinculo_id: int
livro_natureza_id: int
registro_id: str
nome: str
cpf_cnpj: str
telefone: str
class Config:
from_attributes = True
class ResponseVPessoaVinculo(BaseModel):
data: list[VPessoaVinculo]
class Config:
from_attributes = True
class VPessoaVinculoSaveSchema(BaseModel):
pessoa_vinculo_id: int | None = None # PK opcional (create/update)
nome: str
tb_estadocivil_id: int | None = None
tb_profissao_id: int | None = None
pessoa_id: int | None = None
falecido: bool | None = None
tipo_cadastro_familia: int | None = None
livro_natureza_id: int
registro_id: int # ajuste para str se o seu modelo/trilha usar string
tipo_cadastro_geral: int | None = None
cidade_residencia: str | None = None
idade: int | None = None
auxiliar: bool | None = None
ordem: int | None = None
tb_partelivroe_id: int | None = None
declarante: bool | None = None
nome_especial: str | None = None
cert_data: date | None = None
cert_folha: str | None = None
cert_livro: str | None = None
cert_termo: str | None = None
cert_uf: str | None = None
chave_importacao: str | None = None
tabela_vinculo: str | None = None
abandonador_pessoa_id: int | None = None
assinatura: bool | None = None
class Config:
from_attributes = True

View file

@ -0,0 +1,31 @@
from packages.v1.serventias.actions.v_casamento.v_casamento_antigo_index_action import (
VCasamentoAntigosIndexAction,
)
class VCasamentosAntigosIndexService:
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de listagem de registros na tabela G_GRAMATICA.
"""
def execute(self):
"""
Executa a operação de busca de todos os registros no banco de dados.
Args:
g_cartorio_index_schema (GCartorioIndexSchema):
Esquema que pode conter filtros ou parâmetros de busca.
Returns:
A lista de registros encontrados.
"""
# ----------------------------------------------------
# Instanciamento da ação
# ----------------------------------------------------
antigos_index = VCasamentoAntigosIndexAction()
# ----------------------------------------------------
# Retorno da informação
# ----------------------------------------------------
return antigos_index.execute()

View file

@ -0,0 +1,43 @@
from packages.v1.serventias.actions.v_pessoa.v_pessoa_index_action import (
VPessoaIndexAction,
)
from packages.v1.serventias.actions.v_pessoa_vinculo.v_pessoa_vinculo_index_action import (
VPessoaVinculoIndexAction,
)
from packages.v1.serventias.schemas.v_pessoa_schema import VPessoaSearchSchema
from packages.v1.serventias.schemas.v_pessoa_vinculo_schema import (
VPessoaVinculoIndexSchema,
)
class VPessoaIndexService:
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de listagem de registros na tabela G_GRAMATICA.
"""
def execute(self, v_pessoa_search: VPessoaSearchSchema):
"""
Executa a operação de busca de todos os registros no banco de dados.
Args:
g_cartorio_index_schema (GCartorioIndexSchema):
Esquema que pode conter filtros ou parâmetros de busca.
Returns:
A lista de registros encontrados.
"""
# ----------------------------------------------------
# Instanciamento da ação
# ----------------------------------------------------
v_pessoa_index_action = VPessoaIndexAction()
# ----------------------------------------------------
# Execução da ação
# ----------------------------------------------------
data = v_pessoa_index_action.execute(v_pessoa_search)
# ----------------------------------------------------
# Retorno da informação
# ----------------------------------------------------
return data

View file

@ -0,0 +1,37 @@
from packages.v1.serventias.actions.v_pessoa.v_pessoa_save_action import (
VPessoaSaveAction,
)
from packages.v1.serventias.schemas.v_pessoa_schema import VPessoaSaveSchema
class VPessoaSaveService:
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de listagem de registros na tabela G_GRAMATICA.
"""
def execute(self, v_pessoa_save_schema: VPessoaSaveSchema):
"""
Executa a operação de busca de todos os registros no banco de dados.
Args:
g_cartorio_index_schema (GCartorioIndexSchema):
Esquema que pode conter filtros ou parâmetros de busca.
Returns:
A lista de registros encontrados.
"""
# ----------------------------------------------------
# Instanciamento da ação
# ----------------------------------------------------
v_pessoa_save = VPessoaSaveAction()
# ----------------------------------------------------
# Execução da ação
# ----------------------------------------------------
data = v_pessoa_save.execute(v_pessoa_save_schema)
# ----------------------------------------------------
# Retorno da informação
# ----------------------------------------------------
return data

View file

@ -0,0 +1,39 @@
from packages.v1.serventias.actions.v_pessoa_vinculo.v_pessoa_vinculo_index_action import (
VPessoaVinculoIndexAction,
)
from packages.v1.serventias.schemas.v_pessoa_vinculo_schema import (
VPessoaVinculoIndexSchema,
)
class VPessoaVinculoIndexService:
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de listagem de registros na tabela G_GRAMATICA.
"""
def execute(self, v_pessoa_vinculo_index_schema: VPessoaVinculoIndexSchema):
"""
Executa a operação de busca de todos os registros no banco de dados.
Args:
g_cartorio_index_schema (GCartorioIndexSchema):
Esquema que pode conter filtros ou parâmetros de busca.
Returns:
A lista de registros encontrados.
"""
# ----------------------------------------------------
# Instanciamento da ação
# ----------------------------------------------------
v_pessoa_vinculo_index_action = VPessoaVinculoIndexAction()
# ----------------------------------------------------
# Execução da ação
# ----------------------------------------------------
data = v_pessoa_vinculo_index_action.execute(v_pessoa_vinculo_index_schema)
# ----------------------------------------------------
# Retorno da informação
# ----------------------------------------------------
return data

View file

@ -0,0 +1,39 @@
from packages.v1.serventias.actions.v_pessoa_vinculo.v_pessoa_vinculo_save_action import (
VPessoaVinculoSaveAction,
)
from packages.v1.serventias.schemas.v_pessoa_vinculo_schema import (
VPessoaVinculoSaveSchema,
)
class VPessoaVInculoSaveService:
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de listagem de registros na tabela G_GRAMATICA.
"""
def execute(self, v_pessoa_vinculo_save_schema: VPessoaVinculoSaveSchema):
"""
Executa a operação de busca de todos os registros no banco de dados.
Args:
g_cartorio_index_schema (GCartorioIndexSchema):
Esquema que pode conter filtros ou parâmetros de busca.
Returns:
A lista de registros encontrados.
"""
# ----------------------------------------------------
# Instanciamento da ação
# ----------------------------------------------------
v_pessoa_vinculo_index_action = VPessoaVinculoSaveAction()
# ----------------------------------------------------
# Execução da ação
# ----------------------------------------------------
data = v_pessoa_vinculo_index_action.execute(v_pessoa_vinculo_save_schema)
# ----------------------------------------------------
# Retorno da informação
# ----------------------------------------------------
return data

View file

@ -0,0 +1,90 @@
@echo off
setlocal EnableExtensions EnableDelayedExpansion
:: ===== Configuração/ajuda =====
if /I "%~1"=="-h" goto :help
if /I "%~1"=="/h" goto :help
if /I "%~1"=="--help" goto :help
:: Pasta raiz = 1º argumento ou pasta atual
set "ROOT=%~1"
if not defined ROOT set "ROOT=%cd%"
:: Checa flag /dry-run em qualquer argumento
set "DRYRUN="
for %%A in (%*) do (
if /I "%%~A"=="/dry-run" set "DRYRUN=1"
)
:: Normaliza ROOT removendo aspas extras
for %%# in ("%ROOT%") do set "ROOT=%%~f#"
if not exist "%ROOT%" (
echo [ERRO] Pasta nao encontrada: "%ROOT%"
exit /b 1
)
:: ===== Timestamp e log =====
set "TS=%date%_%time%"
set "TS=%TS:/=%"
set "TS=%TS::=%"
set "TS=%TS:.=%"
set "TS=%TS:,=%"
set "TS=%TS: =0%"
set "LOG=%ROOT%\cleanup_pycache_%TS%.log"
echo ================================================== > "%LOG%"
echo Limpeza de __pycache__ >> "%LOG%"
echo Pasta raiz: "%ROOT%" >> "%LOG%"
if defined DRYRUN (echo Modo: DRY-RUN (apenas listar) >> "%LOG%") else (echo Modo: EXECUTANDO REMOCOES >> "%LOG%")
echo Iniciado: %date% %time% >> "%LOG%"
echo ================================================== >> "%LOG%"
set /a FOUND=0, OK=0, ERR=0
:: ===== Procura e (opcionalmente) remove =====
for /d /r "%ROOT%" %%D in (__pycache__) do (
set /a FOUND+=1
if defined DRYRUN (
echo [LISTAR] "%%~fD"
>>"%LOG%" echo [LISTAR] "%%~fD"
) else (
echo [APAGAR] "%%~fD"
rd /s /q "%%~fD" 1>nul 2>nul
if exist "%%~fD" (
set /a ERR+=1
>>"%LOG%" echo [FALHA] "%%~fD"
) else (
set /a OK+=1
>>"%LOG%" echo [OK] "%%~fD"
)
)
)
echo.>>"%LOG%"
echo Pastas encontradas: %FOUND% >> "%LOG%"
echo Removidas com sucesso: %OK% >> "%LOG%"
echo Falhas: %ERR% >> "%LOG%"
echo Finalizado: %date% %time% >> "%LOG%"
:: ===== Resumo no console =====
echo.
echo ===== RESUMO =====
echo Pasta raiz: "%ROOT%"
if defined DRYRUN (echo Modo: DRY-RUN ^(nao removeu nada^))
echo Pastas encontradas: %FOUND%
if not defined DRYRUN (
echo Removidas com sucesso: %OK%
echo Falhas: %ERR%
)
echo Log salvo em: "%LOG%"
exit /b 0
:help
echo Uso:
echo cleanup_pycache.bat [PASTA_RAIZ] [/dry-run]
echo.
echo Ex.: cleanup_pycache.bat "D:\Projetos\MeuApp"
echo Ex.: cleanup_pycache.bat /dry-run
echo Ex.: cleanup_pycache.bat "D:\Repos" /dry-run
exit /b 0

Some files were not shown because too many files have changed in this diff Show more