[BE-01] feat: Implementado firebird + sqlAlchemy. Criado o pacote de sequências

This commit is contained in:
Keven Willian 2025-07-02 13:34:22 -03:00
parent f353cf630f
commit a3492cd245
21 changed files with 413 additions and 55 deletions

View file

@ -1,12 +1,13 @@
from core.base.base_action import BaseAction
from api.v1.packages.administrative.repositories.c_caixa_item.index import Index
from api.v1.packages.administrative.schemas.c_caixa_item_schema import CaixaItemSearchSchema
class IndexAction(BaseAction):
def execute(self):
def execute(self, search : CaixaItemSearchSchema):
# Instânciamento de repositório
index = Index()
# Retorna todos produtos
return index.execute()
return index.execute(search)

View file

@ -1,5 +1,5 @@
from api.v1.packages.administrative.schemas.c_caixa_item_schema import CaixaItemSchema
from api.v1.packages.administrative.repositories.c_caixa_item.create import Create
from api.v1.packages.administrative.repositories.c_caixa_item.save import Save
from core.base.base_action import BaseAction
class SaveAction(BaseAction):
@ -7,7 +7,7 @@ class SaveAction(BaseAction):
def execute(self, caixa_item_schema : CaixaItemSchema):
# Instância o repositório desejado
create = Create()
save = Save()
# Executa o respositório desejado
return create.execute(caixa_item_schema)
return save.execute(caixa_item_schema)

View file

@ -1,10 +1,11 @@
# Importação de bibliotecas
from core.utils.dynamic_import import DynamicImport
from api.v1.packages.administrative.schemas.c_caixa_item_schema import CaixaItemSchema
from api.v1.packages.administrative.schemas.c_caixa_item_schema import CaixaItemSearchSchema
class CCaixaItemController:
def index(self):
def index(self, search : CaixaItemSearchSchema):
# Importação da classe desejad
indexService = DynamicImport.service("administrative", "c_caixa_item", "index_service", "IndexService")
@ -13,7 +14,7 @@ class CCaixaItemController:
self.indexService = indexService()
# Lista todos os produtos
return self.indexService.execute()
return self.indexService.execute(search)
def create(self, caixa_item_schema: CaixaItemSchema):

View file

@ -1,7 +1,9 @@
# Importação de bibliotecas
from fastapi import APIRouter, status, Depends
from fastapi import APIRouter, status, Depends, Body
from typing import Optional
from api.v1.packages.administrative.controllers.c_caixa_item_controller import CCaixaItemController
from api.v1.packages.administrative.schemas.c_caixa_item_schema import CaixaItemSchema
from api.v1.packages.administrative.schemas.c_caixa_item_schema import CaixaItemSearchSchema
# Inicializar o roteaodr para as rotas de produtos
router = APIRouter()
@ -9,17 +11,23 @@ router = APIRouter()
# Instãnciamento do controller desejado
cCaixaItemController = CCaixaItemController()
@router.get("/", status_code=status.HTTP_200_OK)
async def index():
@router.get("/",
status_code=status.HTTP_200_OK,
summary="Busca itens com filtros opcionais",
response_description="Lista de itens encontrados com base nos critérios de busca.")
async def index(search : Optional[CaixaItemSchema] = Body(default=None)):
# Busca todos os produtos cadastrados
response = cCaixaItemController.index()
response = cCaixaItemController.index(search)
# Retornar os dados localizados
return {
"data": response
}
@router.post('/', status_code=status.HTTP_201_CREATED)
@router.post('/',
status_code=status.HTTP_201_CREATED,
summary="Cadastrar uma nova receita ou despesa no sistema",
response_description="Confirmação do cadastro da receita ou despesa, incluindo detalhes do item criado.")
async def save(caixa_item_schema: CaixaItemSchema):
# Salva o produto desejado

View file

@ -1,29 +0,0 @@
# Importação de bibliotecas
from core.base.base_repository import BaseRepository
from api.v1.packages.administrative.schemas.c_caixa_item_schema import CaixaItemSchema
class Create(BaseRepository):
def execute(self, caixa_item : CaixaItemSchema):
# Realiza a inserção do registro
self.cursor.execute(
"""
INSERT INTO C_CAIXA_ITEM (
CAIXA_ITEM_ID,
CAIXA_SERVICO_ID
) VALUES (
?,
?
);
""",
(
caixa_item.caixa_item_id, caixa_item.caixa_servico_id,
),
)
# Comita a transação
self.commit()
# Retorna como verdadeiro se for salvo com sucesso
return True

View file

@ -1,14 +1,57 @@
# Importação de bibliotecas
from core.base.base_repository import BaseRepository
from api.v1.packages.administrative.schemas.c_caixa_item_schema import CaixaItemSearchSchema
from pydantic import BaseModel
from typing import Optional
class Index(BaseRepository):
def execute(self):
def execute(self, search: Optional[CaixaItemSearchSchema]):
print("### Iniciando método execute ###")
if search is None:
print("⚠️ search é None — retornando lista vazia")
return []
where_clauses = []
params = []
for campo, intervalo in search.__dict__.items():
print(f"🔍 Campo: {campo}, Valor: {intervalo}, Tipo: {type(intervalo)}")
if isinstance(intervalo, BaseModel):
date_start = getattr(intervalo, "date_start", None)
date_end = getattr(intervalo, "date_end", None)
if date_start and date_end:
where_clauses.append(f"cci.{campo} BETWEEN ? AND ?")
params.extend([date_start, date_end])
elif date_start:
where_clauses.append(f"cci.{campo} >= ?")
params.append(date_start)
elif date_end:
where_clauses.append(f"cci.{campo} <= ?")
params.append(date_end)
# Montagem da SQL
sql = """
SELECT FIRST 100 *
FROM c_caixa_item cci \
"""
if where_clauses:
sql += " WHERE " + " AND ".join(where_clauses)
sql += " ORDER BY caixa_item_id DESC"
print("✅ SQL Final:", sql)
print("📦 Params:", params)
# Executa a query
self.cursor.execute(sql, params)
columns = [col[0] for col in self.cursor.description]
results = [
{columns[i]: row[i] for i in range(len(columns))}
for row in self.cursor.fetchall()
]
self.cursor.execute("""SELECT * FROM c_caixa_item cci""")
columns = [col[0] for col in self.cursor.description] # lista dos nomes das colunas
results = []
for row in self.cursor.fetchall():
results.append({columns[i]: row[i] for i in range(len(columns))})
self.commit()
return results

View file

@ -0,0 +1,79 @@
# Importação de bibliotecas
from sqlalchemy import create_engine, text
from api.v1.packages.administrative.schemas.c_caixa_item_schema import CaixaItemSchema
from core.connections.firebird_4 import Firebird4
class Save():
def __init__(self):
firebird = Firebird4()
self.engine = firebird.connect()
def execute(self, caixa_item : CaixaItemSchema):
# SQL para update ou insert
sql = """ UPDATE OR INSERT INTO C_CAIXA_ITEM (
ESPECIE_PAGAMENTO,
CAIXA_ITEM_ID,
CAIXA_SERVICO_ID,
USUARIO_SERVICO_ID,
USUARIO_CAIXA_ID,
DESCRICAO,
DATA_PAGAMENTO,
SITUACAO,
TIPO_DOCUMENTO,
TIPO_TRANSACAO,
VALOR_SERVICO,
VALOR_PAGO,
OBSERVACAO,
HORA_PAGAMENTO,
TIPO_SERVICO,
REGISTRADO
) VALUES (
:especie_pagamento,
:caixa_item_id,
:caixa_servico_id,
:usuario_servico_id,
:usuario_caixa_id,
:descricao,
:data_pagamento,
:situacao,
:tipo_documento,
:tipo_transacao,
:valor_servico,
:valor_pago,
:observacao,
:hora_pagamento,
:tipo_servico,
:registrado
)
MATCHING (CAIXA_ITEM_ID); """
# Preenchimento de Parâmetros
params = {
"especie_pagamento" : caixa_item.especie_pagamento,
"caixa_item_id" : caixa_item.caixa_item_id,
"caixa_servico_id" : caixa_item.caixa_servico_id,
"usuario_servico_id" : caixa_item.usuario_servico_id,
"usuario_caixa_id" : caixa_item.usuario_caixa_id,
"descricao" : caixa_item.descricao,
"data_pagamento" : caixa_item.data_pagamento,
"situacao" : caixa_item.situacao,
"tipo_documento" : caixa_item.tipo_documento,
"tipo_transacao" : caixa_item.tipo_transacao,
"valor_servico" : caixa_item.valor_servico,
"valor_pago" : caixa_item.valor_pago,
"observacao" : caixa_item.observacao,
"hora_pagamento" : caixa_item.hora_pagamento,
"tipo_servico" : caixa_item.tipo_servico,
"registrado" : caixa_item.registrado
}
# Conexão e execução
with self.engine.begin() as conn: # garante commit automático
conn.execute(text(sql), params)
# Retorna os dados registrados
return caixa_item

View file

@ -8,7 +8,7 @@ class CaixaItemSchema(BaseModel):
caixa_servico_id: Optional[int] = None
usuario_servico_id: Optional[int] = None
usuario_caixa_id: Optional[int] = None
chave_servico: Optional[str] = None
chave_servico: Optional[int] = None
descricao: Optional[str] = None
data_pagamento: Optional[date] = None
situacao: Optional[str] = None
@ -18,7 +18,7 @@ class CaixaItemSchema(BaseModel):
valor_pago: Optional[float] = None
observacao: Optional[str] = None
caixa_cheque_id: Optional[int] = None
hora_pagamento: Optional[datetime] = None
hora_pagamento: Optional[str] = None
caixa_id: Optional[int] = None
recibo_id: Optional[int] = None
tipo_servico: Optional[str] = None
@ -26,7 +26,7 @@ class CaixaItemSchema(BaseModel):
apresentante: Optional[str] = None
mensalista_id: Optional[int] = None
quitado_caixa_id: Optional[int] = None
registrado: Optional[bool] = None
registrado: Optional[int] = None
emolumento: Optional[float] = None
taxa_judiciaria: Optional[float] = None
fundesp: Optional[float] = None
@ -50,3 +50,10 @@ class CaixaItemSchema(BaseModel):
class Config:
from_attributes = True
class IntervaloDatas(BaseModel):
date_start: Optional[str] = None
date_end: Optional[str] = None
class CaixaItemSearchSchema(BaseModel):
data_pagamento: Optional[IntervaloDatas] = None

View file

@ -1,11 +1,12 @@
from api.v1.packages.administrative.actions.c_caixa_item.index_action import IndexAction
from api.v1.packages.administrative.schemas.c_caixa_item_schema import CaixaItemSearchSchema
class IndexService:
def execute(self):
def execute(self, search : CaixaItemSearchSchema):
# Instânciamento de ações
indexAction = IndexAction()
# Retorna todos produtos desejados
return indexAction.execute()
return indexAction.execute(search)

View file

@ -1,11 +1,28 @@
from api.v1.packages.administrative.actions.c_caixa_item.save_action import SaveAction
from api.v1.packages.administrative.schemas.c_caixa_item_schema import CaixaItemSchema
from api.v1.packages.sequencia.schemas.g_sequencia import GSequenciaSchema
from api.v1.packages.sequencia.services.g_sequencia.generate_service import GenerateService
class SaveService:
def execute(self, caixa_item_schema: CaixaItemSchema):
# Crio um objeto de sequencia
sequencia_schema = GSequenciaSchema()
# Define os dados para atualizar a sequencia
sequencia_schema.tabela = 'C_CAIXA_ITEM'
# Busco a sequência atualizada
generate = GenerateService()
# Busco a sequência atualizada
sequencia = generate.execute(sequencia_schema)
# Atualiza os dados da chave primária
caixa_item_schema.caixa_item_id = sequencia.sequencia
# Instânciamento de ações
saveAction = SaveAction()

View file

@ -0,0 +1,14 @@
from api.v1.packages.sequencia.schemas.g_sequencia import GSequenciaSchema
from api.v1.packages.sequencia.repositories.g_sequencia.get import Get
from core.base.base_action import BaseAction
class GetAction(BaseAction):
def execute(self, sequencia_schema : GSequenciaSchema):
# Instânciamento de repositório
get = Get()
# Execução do repositório
return get.execute(sequencia_schema)

View file

@ -0,0 +1,14 @@
from api.v1.packages.sequencia.schemas.g_sequencia import GSequenciaSchema
from api.v1.packages.sequencia.repositories.g_sequencia.save import Save
from core.base.base_action import BaseAction
class SaveAction(BaseAction):
def execute(self, sequencia_schema : GSequenciaSchema):
# Instânciamento de repositório
save = Save()
# Execução do repositório
return save.execute(sequencia_schema)

View file

@ -0,0 +1,19 @@
from api.v1.packages.sequencia.schemas.g_sequencia import GSequenciaSchema
from core.base.base_repository import BaseRepository
class Get(BaseRepository):
def execute(self, sequencia_schema : GSequenciaSchema):
self.cursor.execute(""" SELECT * FROM G_SEQUENCIA gs WHERE gs.TABELA LIKE ? """,
(sequencia_schema.tabela,))
row = self.cursor.fetchone()
if not row:
return None
# Transforma em dict associativo
columns = [desc[0].lower() for desc in self.cursor.description]
return dict(zip(columns, row))

View file

@ -0,0 +1,16 @@
from api.v1.packages.sequencia.schemas.g_sequencia import GSequenciaSchema
from core.base.base_repository import BaseRepository
class Save(BaseRepository):
def execute(self, sequencia_schema : GSequenciaSchema):
self.cursor.execute(""" UPDATE G_SEQUENCIA SET SEQUENCIA = ? WHERE TABELA LIKE ? """,
(sequencia_schema.sequencia, sequencia_schema.tabela))
# Comita a transação
self.commit()
# Retorna como verdadeiro se for salvo com sucesso
return sequencia_schema

View file

@ -0,0 +1,9 @@
from typing import Optional
from pydantic import BaseModel
class GSequenciaSchema(BaseModel):
tabela: Optional[str] = None
sequencia: Optional[str] = None
class Config:
from_attributes = True

View file

@ -0,0 +1,21 @@
from api.v1.packages.sequencia.actions.g_sequencia.get_action import GetAction
from api.v1.packages.sequencia.actions.g_sequencia.save_action import SaveAction
from api.v1.packages.sequencia.schemas.g_sequencia import GSequenciaSchema
class GenerateService:
def execute(self, sequencia_schema : GSequenciaSchema):
# Instânciamento de Action
getAction = GetAction()
saveAction = SaveAction()
# Busco a sequência atual
sequencia_result = getAction.execute(sequencia_schema)
# Incrementa a sequência atual
sequencia_schema.sequencia = sequencia_result['sequencia'] + 1
# Atualiza a sequência atual
return saveAction.execute(sequencia_schema)

View file

@ -0,0 +1,13 @@
from api.v1.packages.sequencia.actions.g_sequencia.save_action import SaveAction
from api.v1.packages.sequencia.schemas.g_sequencia import GSequenciaSchema
class SaveService:
def execute(self, sequencia_schema : GSequenciaSchema):
# Instânciamento de Action
saveAction = SaveAction()
# Execução da Ação
return saveAction.execute(sequencia_schema)

View file

@ -1,10 +1,10 @@
{
"firebird": {
"host": "localhost",
"name": "C:\\Users\\keven\\OneDrive\\Desktop\\Orius\\CALCILANDIA.FDB",
"name": "C:/Users/keven/OneDrive/Desktop/Orius/CAIAPONIA.FDB",
"port": 3050,
"user": "SYSDBA",
"password": "masterkey",
"password": "302b3c",
"charset": "UTF8"
}
}

View file

@ -0,0 +1,74 @@
from abc import ABC, abstractmethod
from core.connections.firebird_4 import Firebird4
import traceback
class BaseRepositoryAlchemy(ABC):
"""
Classe base para repositórios Firebird com SQL puro.
Suporte a bind params nomeados (ex: :id), simula uso estilo PDO.
"""
def __init__(self):
self.conn = Firebird4().connect()
self.cursor = self.conn.cursor()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
if exc_type:
print("⚠️ Erro durante execução do repositório:")
print(f"Tipo: {exc_type.__name__}")
print(f"Erro: {exc_value}")
print("Traceback:")
traceback.print_tb(tb)
self.rollback()
else:
self.commit()
self.close()
@abstractmethod
def execute(self, *args, **kwargs):
pass
def commit(self):
if self.conn:
try:
self.conn.commit()
except Exception as e:
print(f"❌ Erro ao fazer commit: {e}")
self.rollback()
def rollback(self):
if self.conn:
try:
self.conn.rollback()
except Exception as e:
print(f"❌ Erro ao fazer rollback: {e}")
def close(self):
try:
if self.cursor and not self.cursor.closed:
self.cursor.close()
except Exception as e:
print(f"❌ Erro ao fechar cursor: {e}")
try:
if self.conn and not self.conn.closed:
self.conn.close()
except Exception as e:
print(f"❌ Erro ao fechar conexão: {e}")
def fetch_one(self, query: str, params: dict = {}):
self.cursor.execute(query, params)
row = self.cursor.fetchone()
return self._row_to_dict(row) if row else None
def fetch_all(self, query: str, params: dict = {}):
self.cursor.execute(query, params)
return [self._row_to_dict(row) for row in self.cursor.fetchall()]
def execute_query(self, query: str, params: dict = {}):
self.cursor.execute(query, params)
def _row_to_dict(self, row):
return {desc[0]: value for desc, value in zip(self.cursor.description, row)}

View file

@ -0,0 +1,22 @@
import fdb
from core.utils.config import Config
def get_connection():
"""
Constrói e retorna uma conexão com o banco MySQL
utilizando os dados da URL definida nas configurações.
"""
# Obtem as configurações de banco de dados
database = Config.get()
# Constrói o DSN no formato 'hostname/port:database_path'
# E essa string é passada como o PRIMEIRO ARGUMENTO POSICIONAL
connection_dsn = f"{database.firebird.host}/{database.firebird.port}:{database.firebird.name}"
return fdb.connect(
connection_dsn, # Este é o DSN completo que o driver espera
user=database.firebird.user,
password=database.firebird.password,
charset=database.firebird.charset
)

View file

@ -0,0 +1,28 @@
from sqlalchemy import create_engine, text
from core.utils.config import Config
class Firebird4:
def connect(self):
"""
Constrói e retorna uma conexão com o banco Firebird
utilizando os dados da URL definida nas configurações.
"""
# Obtem as configurações de banco de dados
database = Config.get()
# Caminho da conexão com o banco de dados
dsn = (
f"firebird://{database.firebird.user}:"
f"{database.firebird.password}@"
f"{database.firebird.host}:"
f"{database.firebird.port}/"
f"{database.firebird.name}"
)
# Retorna a conexão com o banco
return create_engine(
dsn,
echo=False, # Não exibe as query nos logs
)