feat(): Criação do serviço que retorna a estrutura do banco de dados firebird atual

This commit is contained in:
Kenio 2025-11-14 07:57:40 -03:00
parent 4673d8c271
commit 0382e7c383
6 changed files with 230 additions and 0 deletions

View file

@ -0,0 +1,11 @@
from packages.v1.administrativo.repositories.firebird.firebird_schema_repository import FirebirdSchemaExtractor
class FirebirdSchemaAction:
"""
Action: instancia o extractor e chama o método principal.
"""
@staticmethod
def execute():
extractor = FirebirdSchemaExtractor()
return extractor.extract_all()

View file

@ -0,0 +1,10 @@
from packages.v1.administrativo.services.firebird.firebird_schema_service import FirebirdSchemaService
class FirebirdSchemaController:
"""
Controller: recebe chamadas do endpoint e envia para o service.
"""
def extract_schema(self):
# Apenas delega para o service
return FirebirdSchemaService.execute()

View file

@ -0,0 +1,11 @@
from fastapi import APIRouter
from packages.v1.administrativo.controllers.firebird_schema_controller import FirebirdSchemaController
router = APIRouter(prefix="/firebird-schema", tags=["Firebird Schema"])
controller = FirebirdSchemaController()
# Apenas um endpoint essencial
@router.get("/", summary="Retorna estrutura completa do banco Firebird 4")
def extract_schema():
return controller.extract_schema()

View file

@ -0,0 +1,185 @@
import json
import fdb
from fastapi import HTTPException
from pathlib import Path
from typing import List, Dict, Any
class FirebirdSchemaExtractor:
"""
Classe responsável por extrair toda a estrutura de um banco Firebird 4.
"""
def __init__(self, config_path: str = "config/database/firebird.conf"):
"""
o arquivo de configuração e abre conexão com o banco.
"""
try:
# Lê o JSON de configuração
config_file = Path(config_path)
if not config_file.exists():
raise HTTPException(status_code=500, detail="Arquivo firebird.conf não encontrado")
with open(config_file, "r", encoding="utf-8") as f:
config = json.load(f)
# Abre conexão com Firebird
self.conn = fdb.connect(
host=config["host"],
database=config["name"],
user=config["user"],
password=config["password"],
port=config.get("port", 3050),
charset=config.get("charset", "UTF8")
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erro ao conectar no Firebird: {e}")
def query(self, sql: str) -> List[Dict[str, Any]]:
try:
cur = self.conn.cursor()
cur.execute(sql)
columns = [col[0].strip() for col in cur.description]
return [dict(zip(columns, row)) for row in cur.fetchall()]
except Exception as e:
raise HTTPException(status_code=500, detail=f"Erro ao executar SQL: {e}")
# -------------------- 1) TABELAS E CAMPOS --------------------
def get_tables(self):
sql = """
SELECT
TRIM(r.RDB$RELATION_NAME) AS TABLE_NAME,
TRIM(f.RDB$FIELD_NAME) AS FIELD_NAME,
CASE f.RDB$NULL_FLAG WHEN 1 THEN 'NOT NULL' ELSE 'NULL' END AS NULLABLE,
CASE t.RDB$FIELD_TYPE
WHEN 7 THEN 'SMALLINT'
WHEN 8 THEN 'INTEGER'
WHEN 9 THEN 'QUAD'
WHEN 10 THEN 'FLOAT'
WHEN 12 THEN 'DATE'
WHEN 13 THEN 'TIME'
WHEN 14 THEN 'CHAR(' || t.RDB$FIELD_LENGTH || ')'
WHEN 16 THEN
CASE t.RDB$FIELD_SUB_TYPE
WHEN 0 THEN 'BIGINT'
WHEN 1 THEN 'NUMERIC(' || t.RDB$FIELD_PRECISION || ',' || ABS(t.RDB$FIELD_SCALE) || ')'
WHEN 2 THEN 'DECIMAL(' || t.RDB$FIELD_PRECISION || ',' || ABS(t.RDB$FIELD_SCALE) || ')'
ELSE 'BIGINT'
END
WHEN 27 THEN 'DOUBLE PRECISION'
WHEN 35 THEN 'TIMESTAMP'
WHEN 37 THEN 'VARCHAR(' || t.RDB$FIELD_LENGTH || ')'
WHEN 261 THEN 'BLOB SUB_TYPE ' || COALESCE(t.RDB$FIELD_SUB_TYPE, 0)
ELSE 'UNKNOWN (' || t.RDB$FIELD_TYPE || ')'
END AS FIELD_TYPE,
COALESCE(t.RDB$DEFAULT_SOURCE, f.RDB$DEFAULT_SOURCE) AS DEFAULT_VALUE,
f.RDB$DESCRIPTION AS DESCRIPTION
FROM RDB$RELATION_FIELDS f
JOIN RDB$RELATIONS r ON f.RDB$RELATION_NAME = r.RDB$RELATION_NAME
JOIN RDB$FIELDS t ON f.RDB$FIELD_SOURCE = t.RDB$FIELD_NAME
WHERE r.RDB$SYSTEM_FLAG = 0
ORDER BY TABLE_NAME, f.RDB$FIELD_POSITION
"""
return self.query(sql)
# -------------------- 2) PRIMARY KEYS --------------------
def get_primary_keys(self):
sql = """
SELECT
TRIM(rc.RDB$RELATION_NAME) AS TABLE_NAME,
TRIM(isg.RDB$FIELD_NAME) AS FIELD_NAME,
TRIM(rc.RDB$CONSTRAINT_NAME) AS CONSTRAINT_NAME
FROM RDB$RELATION_CONSTRAINTS rc
JOIN RDB$INDEX_SEGMENTS isg ON rc.RDB$INDEX_NAME = isg.RDB$INDEX_NAME
WHERE rc.RDB$CONSTRAINT_TYPE = 'PRIMARY KEY'
ORDER BY TABLE_NAME, FIELD_NAME
"""
return self.query(sql)
# -------------------- 3) FOREIGN KEYS --------------------
def get_foreign_keys(self):
sql = """
SELECT
TRIM(rc.RDB$RELATION_NAME) AS TABLE_NAME,
TRIM(rc.RDB$CONSTRAINT_NAME) AS CONSTRAINT_NAME,
TRIM(isg.RDB$FIELD_NAME) AS FIELD_NAME,
TRIM(refc.RDB$CONST_NAME_UQ) AS REF_CONSTRAINT_NAME,
TRIM(id2.RDB$RELATION_NAME) AS REFERENCED_TABLE
FROM RDB$RELATION_CONSTRAINTS rc
JOIN RDB$REF_CONSTRAINTS refc ON rc.RDB$CONSTRAINT_NAME = refc.RDB$CONSTRAINT_NAME
JOIN RDB$INDEX_SEGMENTS isg ON rc.RDB$INDEX_NAME = isg.RDB$INDEX_NAME
JOIN RDB$INDICES id2 ON refc.RDB$CONST_NAME_UQ = id2.RDB$INDEX_NAME
WHERE rc.RDB$CONSTRAINT_TYPE = 'FOREIGN KEY'
ORDER BY TABLE_NAME, CONSTRAINT_NAME
"""
return self.query(sql)
# -------------------- 4) ÍNDICES --------------------
def get_indexes(self):
sql = """
SELECT
TRIM(i.RDB$RELATION_NAME) AS TABLE_NAME,
TRIM(i.RDB$INDEX_NAME) AS INDEX_NAME,
TRIM(isg.RDB$FIELD_NAME) AS FIELD_NAME,
CASE WHEN i.RDB$UNIQUE_FLAG = 1 THEN 'UNIQUE' ELSE '' END AS UNIQUENESS,
CASE WHEN i.RDB$INDEX_TYPE = 1 THEN 'DESC' ELSE 'ASC' END AS SORT_ORDER,
i.RDB$DESCRIPTION AS DESCRIPTION
FROM RDB$INDICES i
JOIN RDB$INDEX_SEGMENTS isg ON i.RDB$INDEX_NAME = isg.RDB$INDEX_NAME
WHERE i.RDB$SYSTEM_FLAG = 0
ORDER BY TABLE_NAME, INDEX_NAME
"""
return self.query(sql)
# -------------------- 5) VIEWS --------------------
def get_views(self):
sql = """
SELECT
TRIM(r.RDB$RELATION_NAME) AS VIEW_NAME,
r.RDB$VIEW_SOURCE AS VIEW_DEFINITION
FROM RDB$RELATIONS r
WHERE r.RDB$VIEW_BLR IS NOT NULL
ORDER BY VIEW_NAME
"""
return self.query(sql)
# -------------------- 6) PROCEDURES --------------------
def get_procedures(self):
sql = """
SELECT
TRIM(p.RDB$PROCEDURE_NAME) AS PROCEDURE_NAME,
p.RDB$PROCEDURE_SOURCE AS SOURCE_CODE
FROM RDB$PROCEDURES p
ORDER BY PROCEDURE_NAME
"""
return self.query(sql)
# -------------------- 7) TRIGGERS --------------------
def get_triggers(self):
sql = """
SELECT
TRIM(t.RDB$TRIGGER_NAME) AS TRIGGER_NAME,
TRIM(t.RDB$RELATION_NAME) AS TABLE_NAME,
t.RDB$TRIGGER_SEQUENCE AS SEQUENCE,
t.RDB$TRIGGER_TYPE AS TYPE,
t.RDB$TRIGGER_SOURCE AS SOURCE_CODE,
CASE t.RDB$TRIGGER_INACTIVE WHEN 1 THEN 'INACTIVE' ELSE 'ACTIVE' END AS STATUS
FROM RDB$TRIGGERS t
WHERE t.RDB$SYSTEM_FLAG = 0
ORDER BY TABLE_NAME, SEQUENCE
"""
return self.query(sql)
# -------------------- ESTRUTURA COMPLETA --------------------
def extract_all(self):
return {
"tables": self.get_tables(),
"primary_keys": self.get_primary_keys(),
"foreign_keys": self.get_foreign_keys(),
"indexes": self.get_indexes(),
"views": self.get_views(),
"procedures": self.get_procedures(),
"triggers": self.get_triggers(),
}

View file

@ -0,0 +1,12 @@
from packages.v1.administrativo.actions.firebird.firebird_schema_action import FirebirdSchemaAction
class FirebirdSchemaService:
"""
Service: responsável por validações simplificadas
e por chamar a Action.
"""
@staticmethod
def execute():
# Sem validações complexas — essencial
return FirebirdSchemaAction.execute()

View file

@ -9,6 +9,7 @@ dnspython==2.8.0
ecdsa==0.19.1 ecdsa==0.19.1
email-validator==2.3.0 email-validator==2.3.0
fastapi==0.118.0 fastapi==0.118.0
fdb==2.0.4
greenlet==3.2.4 greenlet==3.2.4
h11==0.16.0 h11==0.16.0
idna==3.10 idna==3.10