[MIR-4] feat(Initial): Cria o projeto inicial da aplicação

This commit is contained in:
Keven Willian Pereira de Souza 2025-10-21 18:14:51 -03:00
commit a41e383dad
42 changed files with 1378 additions and 0 deletions

9
.gitattributes vendored Normal file
View file

@ -0,0 +1,9 @@
# Normaliza finais de linha
* text=auto
# Força Python e arquivos de configuração a usarem LF
*.py text eol=lf
*.sh text eol=lf
*.yml text eol=lf
*.yaml text eol=lf
*.env text eol=lf

46
.gitignore vendored Normal file
View file

@ -0,0 +1,46 @@
# Ambiente virtual
venv/
.env
.env.*
# Bytecode compilado
__pycache__/
*.py[cod]
*$py.class
# Arquivos temporários do sistema
.DS_Store
Thumbs.db
# Logs e databases locais
*.log
*.sqlite3
# VSCode
.vscode/
# PyCharm
.idea/
# Arquivos de testes ou builds
*.coverage
htmlcov/
coverage.xml
dist/
build/
.eggs/
*.egg-info/
# Cache do pip
pip-wheel-metadata/
*.egg
.cache/
.tox/
# Arquivo s de conexão
config/database/firebird.json
storage/temp
storage/temp.json
# Ignorar arquivos storage
storage/

26
Dockerfile Normal file
View file

@ -0,0 +1,26 @@
# Usa a imagem oficial do Python
FROM python:3.12-slim
# Define diretório de trabalho no container
WORKDIR /app
# Copia o arquivo de dependências
COPY requirements.txt .
# Instala dependências no sistema e no Python
RUN apt-get update && apt-get install -y \
gcc libffi-dev libssl-dev python3-dev firebird-dev \
&& pip install --upgrade pip \
&& pip install --no-cache-dir -r requirements.txt \
&& apt-get remove -y gcc \
&& apt-get autoremove -y \
&& rm -rf /var/lib/apt/lists/*
# Copia o restante do projeto para o container
COPY . .
# Expõe a porta padrão do Uvicorn/FastAPI
EXPOSE 8000
# Comando para iniciar o servidor
CMD ["sh", "-c", "uvicorn main:app --host 0.0.0.0 --port 8000"]

142
README.md Normal file
View file

@ -0,0 +1,142 @@
# Configuração do Projeto Python
Este guia descreve o passo a passo para configurar o ambiente de desenvolvimento de um projeto Python, incluindo a preparação do ambiente virtual, instalação de dependências e configuração do banco de dados.
---
## 1. Clonar o Projeto
Primeiro, clone o repositório do projeto a partir do Git:
```bash
git clone https://git.oriustecnologia.com/OriusTecnologia/Mirror.git
```
---
## 2. Criar o Ambiente Virtual
O uso de um **ambiente virtual** garante que as bibliotecas instaladas para este projeto não afetem o Python global da sua máquina.
```bash
python -m venv venv
```
---
## 3. Ativar o Ambiente Virtual
Ative o ambiente virtual antes de instalar as dependências ou executar a aplicação.
```bash
venv\Scripts\activate
```
> **Observação:**
> Em sistemas Unix (Linux/Mac), o comando pode ser:
>
> ```bash
> source venv/bin/activate
> ```
---
## 4. Instalar Dependências do Sistema
A biblioteca de criptografia utilizada no projeto requer uma extensão da Microsoft para ser instalada.
Baixe e instale o **Microsoft C++ Build Tools** através do link abaixo:
[https://visualstudio.microsoft.com/pt-br/visual-cpp-build-tools/](https://visualstudio.microsoft.com/pt-br/visual-cpp-build-tools/)
Durante a instalação, selecione o pacote:
```
Desktop Development With C++
```
---
## 5. Instalar as Bibliotecas do Projeto
Com o ambiente virtual **ativado**, instale as dependências listadas no arquivo `requirements.txt`:
```bash
pip install -r requirements.txt
```
---
## 6. Configurar o Banco de Dados
O projeto utiliza um banco **Firebird**.
Edite o arquivo de configuração localizado em:
```
api/config/database/firebird.json
```
Exemplo do conteúdo padrão:
```json
{
"host": "localhost",
"name": "D:/Orius/Base/CAIAPONIA.FDB",
"port": 3050,
"user": "SYSDBA",
"password": "",
"charset": "UTF8",
"pool": {
"pre_ping": true,
"size": 5,
"max_overflow": 10
}
}
```
### Ajustes Necessários
* **host**: Endereço do servidor do banco de dados.
* **name**: Caminho completo do arquivo `.FDB`.
* **port**: Porta do Firebird (padrão: `3050`).
* **user**: Usuário do banco de dados.
* **password**: Senha do usuário configurado.
---
## 7. Iniciar a Aplicação
Com o ambiente virtual **ativado**, execute o comando abaixo para iniciar a aplicação:
```bash
uvicorn main:app --reload
```
> **Dica:**
> O parâmetro `--reload` reinicia automaticamente a aplicação sempre que houver alterações no código.
---
## 8. Testando a Aplicação
Após iniciar a aplicação, abra o navegador e acesse o seguinte endereço:
```http
http://localhost:8000/docs
```
Você deverá visualizar a interface do **Swagger**, onde estarão listados todos os endpoints disponíveis da API.
> **Observação:**
> O Swagger permite testar os endpoints diretamente pelo navegador, sem necessidade de ferramentas externas como Postman ou Insomnia.
---
## Resumo dos Comandos
| Etapa | Comando |
| ----------------------- | ------------------------------------------------------------------------------- |
| Clonar o projeto | `git clone https://git.oriustecnologia.com/OriusTecnologia/saas_api.git` |
| Criar ambiente virtual | `python -m venv venv` |
| Ativar ambiente virtual | `venv\Scripts\activate` *(Windows)*<br>`source venv/bin/activate` *(Linux/Mac)* |
| Instalar dependências | `pip install -r requirements.txt` |
| Iniciar a aplicação | `uvicorn main:app --reload` |

17
abstracts/action.py Normal file
View file

@ -0,0 +1,17 @@
from abc import ABC, abstractmethod
class BaseAction:
"""
Classe abstrata base para todos as actions do sistema.
Obriga implementação de um método execute().
"""
@abstractmethod
def execute(self, *args, **kwargs):
"""
Método abstrato obrigatório a ser implementado pelas subclasses.
Deve conter a lógica principal do repositório.
"""
pass

41
abstracts/repository.py Normal file
View file

@ -0,0 +1,41 @@
# abstracts/repository.py
from sqlalchemy.orm import Session
from database.postgres import SessionLocal
class BaseRepository:
"""Classe base para repositórios ORM."""
def __init__(self):
self.session: Session = SessionLocal()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type:
self.session.rollback()
else:
self.session.commit()
self.session.close()
# 🔹 Métodos utilitários ORM
def add(self, instance):
"""Adiciona um registro."""
self.session.add(instance)
self.session.commit()
self.session.refresh(instance)
return instance
def delete(self, instance):
"""Remove um registro."""
self.session.delete(instance)
self.session.commit()
def all(self, model, limit: int = 100):
"""Retorna todos os registros de um modelo."""
return self.session.query(model).limit(limit).all()
def get(self, model, pk: int):
"""Busca um registro por ID."""
return self.session.get(model, pk)

20
actions/config/config.py Normal file
View file

@ -0,0 +1,20 @@
import json
from pathlib import Path
from types import SimpleNamespace
class Config:
@staticmethod
def get(name: str):
# Caminho absoluto do arquivo atual
base_dir = Path(__file__).resolve().parent
# Caminho absoluto para o config.json (subindo dois níveis e entrando em config/)
config_path = base_dir.parent.parent / 'config' / name
# Carrega o JSON como objeto acessível por ponto
with open(config_path, 'r') as f:
config = json.load(f, object_hook=lambda d: SimpleNamespace(**d))
return config

View file

@ -0,0 +1,32 @@
import importlib
from actions.config.config import Config
from typing import Optional, Any, Type
class DynamicImport:
def __init__(self) -> None:
self.config: dict[str, Any] = Config.get("app.json")
self.base: str = "packages.v1"
self.package: Optional[str] = None
self.table: Optional[str] = None
def set_package(self, name: str) -> None:
self.package = name
def set_table(self, table: str):
self.table = table
def service(self, name: str, class_name : str) -> Type[Any]:
try:
# Define o nome do Módulo
module_file = f"{name}"
# Define o caminho do arquivo
path = f"{self.base}.{self.package}.services.{self.table}.{self.config.state}.{module_file}"
# Realiza a importação do arquivo
module = importlib.import_module(path)
clazz = getattr(module, class_name)
return clazz
except (ImportError, AttributeError) as e:
raise ImportError(f"Erro ao importar '{class_name}' de '{path}': {e}")

32
actions/file/file.py Normal file
View file

@ -0,0 +1,32 @@
import json
import os
class File:
def create(self, data, caminho_arquivo='storage/temp.json'):
try:
# Garante que a pasta existe
os.makedirs(os.path.dirname(caminho_arquivo), exist_ok=True)
# Lê dados existentes (ou cria nova lista)
if os.path.exists(caminho_arquivo):
with open(caminho_arquivo, 'r', encoding='utf-8') as arquivo:
try:
dados_existentes = json.load(arquivo)
if not isinstance(dados_existentes, list):
dados_existentes = []
except json.JSONDecodeError:
dados_existentes = []
else:
dados_existentes = []
# Adiciona novo dado
dados_existentes.append(data)
# Salva novamente no arquivo com indentação
with open(caminho_arquivo, 'w', encoding='utf-8') as arquivo:
json.dump(dados_existentes, arquivo, indent=4, ensure_ascii=False)
except Exception as e:
print(f"❌ Erro ao salvar o dado: {e}")

View file

@ -0,0 +1,36 @@
from datetime import datetime, timedelta
from jose import jwt
from pytz import timezone
from abstracts.action import BaseAction
from actions.config.config import Config
class CreateToken(BaseAction):
def __init__(self):
# Busca as configurações da aplicação
self.config = Config.get('app.json')
# Cria o timedelta com base na config
self.access_token_expire = timedelta(
minutes=self.config.jwt.expire.minute,
hours=self.config.jwt.expire.hours,
days=self.config.jwt.expire.days
)
def execute(self, tipo_token: str, data : str) -> str:
sp = timezone('America/Sao_Paulo')
agora = datetime.now(tz=sp)
expira = agora + self.access_token_expire
# Define os dados do token
payload = {
'type' : tipo_token,
'exp' : expira,
'iat' : agora,
'data' : str(data)
}
# Retorna os dados codificados
return jwt.encode(payload, self.config.jwt.token, algorithm=self.config.jwt.algorithm)

View file

@ -0,0 +1,24 @@
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordBearer
from actions.jwt.verify_token import VerifyToken # A classe que criamos anteriormente
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # Apenas requerido pelo FastAPI
def get_current_user(token: str = Depends(oauth2_scheme)):
# Ação que válida o tokne
verify_token = VerifyToken()
# Obtem o resultado da validação
result = verify_token.execute(token)
# Verifica se a resposta é diferente de inválida
if result['status'] != 'valid':
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=result.get('message', 'Token inválido ou expirado'),
headers={"WWW-Authenticate": "Bearer"},
)
# Retorna apenas os dados do token
return result['payload']

View file

@ -0,0 +1,57 @@
from datetime import datetime
from jose import jwt, JWTError, ExpiredSignatureError
from pytz import timezone
from actions.config.config import Config
class VerifyToken:
def __init__(self):
# Carrega configurações
self.config = Config.get('app.json')
def execute(self, token: str, expected_type: str = 'access-token') -> dict:
try:
# Decodifica o token
payload = jwt.decode(
token,
self.config.jwt.token,
algorithms=[self.config.jwt.algorithm]
)
# Valida expiração
exp_timestamp = payload.get("exp")
if exp_timestamp is None:
raise ValueError("O token não possui data de expiração.")
# Verifica o tipo de token
token_type = payload.get("type")
if token_type != expected_type:
raise ValueError("Tipo de token inválido.")
# Verificação opcional: validar campo "data"
if "data" not in payload:
raise ValueError("Token malformado: campo 'data' ausente.")
return {
"status": "valid",
"payload": payload
}
except ExpiredSignatureError:
return {
"status": "expired",
"message": "O token expirou."
}
except JWTError as e:
return {
"status": "invalid",
"message": f"Token inválido: {str(e)}"
}
except Exception as e:
return {
"status": "error",
"message": f"Erro na validação do token: {str(e)}"
}

32
actions/log/log.py Normal file
View file

@ -0,0 +1,32 @@
import json
import os
class Log:
def register(self, data, caminho_arquivo='storage/temp.json'):
try:
# Garante que a pasta existe
os.makedirs(os.path.dirname(caminho_arquivo), exist_ok=True)
# Lê dados existentes (ou cria nova lista)
if os.path.exists(caminho_arquivo):
with open(caminho_arquivo, 'r', encoding='utf-8') as arquivo:
try:
dados_existentes = json.load(arquivo)
if not isinstance(dados_existentes, list):
dados_existentes = []
except json.JSONDecodeError:
dados_existentes = []
else:
dados_existentes = []
# Adiciona novo dado
dados_existentes.append(data)
# Salva novamente no arquivo com indentação
with open(caminho_arquivo, 'w', encoding='utf-8') as arquivo:
json.dump(dados_existentes, arquivo, indent=4, ensure_ascii=False)
except Exception as e:
print(f"❌ Erro ao salvar o dado: {e}")

View file

@ -0,0 +1,43 @@
# core/security.py
# Importa CryptContext da biblioteca passlib para operações de hash de senha
from passlib.context import CryptContext
# Cria uma instância do contexto de criptografia
# O esquema usado é 'bcrypt', que é seguro e amplamente aceito
# O parâmetro 'deprecated="auto"' marca versões antigas como inseguras, se aplicável
CRYPTO = CryptContext(schemes=['bcrypt'], deprecated='auto')
class Security:
# Verifica se a senha tem um hash válido
@staticmethod
def is_hash(senha: str) -> bool:
"""
Verifica se a string fornecida é um hash reconhecido pelo CryptContext.
"""
return CRYPTO.identify(senha)
# Verifica se uma senha fornecida corresponde ao hash armazenado
def verify_senha_api(plain_senha_api: str, hashed_senha_api: str) -> bool:
"""
Compara a senha fornecida em texto puro com o hash armazenado.
:param plain_senha_api: Senha digitada pelo usuário
:param hashed_senha_api: Hash da senha armazenado no banco de dados
:return: True se corresponder, False se não
"""
return CRYPTO.verify(plain_senha_api, hashed_senha_api)
# Gera o hash de uma senha fornecida
def hash_senha_api(plain_senha_api: str) -> str:
"""
Gera e retorna o hash da senha fornecida.
:param plain_senha_api: Senha em texto puro fornecida pelo usuário
:return: Hash da senha
"""
return CRYPTO.hash(plain_senha_api)

View file

@ -0,0 +1,4 @@
# exceptions.py
class BusinessRuleException(Exception):
def __init__(self, message: str):
self.message = message

View file

@ -0,0 +1,86 @@
# handlers.py
import json
import traceback
from fastapi import Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from starlette.exceptions import HTTPException as StarletteHTTPException
from actions.system.exceptions import BusinessRuleException
from actions.log.log import Log
def register_exception_handlers(app):
def __init__ (self):
log = Log()
@app.exception_handler(BusinessRuleException)
async def business_rule_exception_handler(request: Request, exc: BusinessRuleException):
response = {
"status": "422",
"error": "Regra de negócio",
"detail": exc.message
}
# Salva o log em disco
Log.register(response, 'storage/temp/business_rule_exception_handler.json')
return JSONResponse(
status_code=422,
content=response
)
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
response = {
"status": exc.status_code,
"error": "HTTP Error",
"detail": exc.detail
}
# Salva o log em disco
Log.register(response, 'storage/temp/http_exception_handler.json')
return JSONResponse(
status_code=exc.status_code,
content=response
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
response = {
"status": 400,
"error": "Erro de validação",
"detail": exc.errors()
}
# Salva o log em disco
Log.register(response, 'storage/temp/validation_exception_handler.json')
return JSONResponse(
status_code=400,
content=response
)
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
response = {
"status": 500,
"error": "Erro Interno do Servidor",
"type": type(exc).__name__,
"message": str(exc),
"trace": traceback.format_exc()
}
# Salva o log em disco
Log.register(response, 'storage/temp/validation_exception_handler.json')
return JSONResponse(
status_code=500,
content=response
)

View file

@ -0,0 +1,8 @@
class CEP:
@staticmethod
def validate(data: str) -> bool:
# Valida e retorna a informação
return len(data) == 8

View file

@ -0,0 +1,35 @@
import re
class CNPJ:
@staticmethod
def validate(data: str) -> bool:
# Remove caracteres não numéricos
data = re.sub(r'\D', '', data)
# Verifica se tem 14 dígitos
if len(data) != 14:
return False
# CNPJs com todos os dígitos iguais são inválidos
if data == data[0] * 14:
return False
# Calcula os dois dígitos verificadores
def calcular_digito(data, peso):
soma = sum(int(a) * b for a, b in zip(data, peso))
resto = soma % 11
return '0' if resto < 2 else str(11 - resto)
# Primeiro dígito verificador
peso1 = [5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]
digito1 = calcular_digito(data[:12], peso1)
# Segundo dígito verificador
peso2 = [6] + peso1
digito2 = calcular_digito(data[:12] + digito1, peso2)
# Verifica se os dígitos batem
return data[-2:] == digito1 + digito2

View file

@ -0,0 +1,34 @@
import re
class CPF:
@staticmethod
def is_valid_cpf(data: str) -> bool:
# Remove caracteres não numéricos
data = re.sub(r'\D', '', data)
# Verifica se tem 11 dígitos
if len(data) != 11:
return False
# CPFs com todos os dígitos iguais são inválidos
if data == data[0] * 11:
return False
# Calcula o primeiro e segundo dígitos verificadores
def calcular_digito(digitos, peso):
soma = sum(int(a) * b for a, b in zip(digitos, peso))
resto = soma % 11
return '0' if resto < 2 else str(11 - resto)
# Primeiro dígito verificador
peso1 = range(10, 1, -1)
digito1 = calcular_digito(data[:9], peso1)
# Segundo dígito verificador
peso2 = range(11, 1, -1)
digito2 = calcular_digito(data[:10], peso2)
# Verifica se os dígitos batem
return data[-2:] == digito1 + digito2

View file

@ -0,0 +1,9 @@
import re
class Email:
@staticmethod
def is_valid_email(email: str) -> bool:
"""Check if email has a valid structure"""
return bool(re.match(r"^[\w\.-]+@[\w\.-]+\.\w+$", email))

View file

@ -0,0 +1,12 @@
class Phone:
@staticmethod
def validate_cellphone(data: str) -> bool:
# Verifica e retorna se o numero de celular é igual a 11
return len(data) == 11
@staticmethod
def validate_telephone(data: str) -> bool:
# Verifica e retorna se o numero de telefone é igual a 11
return len(data) == 10

View file

@ -0,0 +1,63 @@
import html
import re
class Text:
# Remove as mascaras de números
@staticmethod
def just_numbers(data: str) -> str:
""" Mantêm apenas os numeros """
data = re.sub(r"[^\d]", "", data)
return data
# Verifica se um e-mail é válido
@staticmethod
def is_valid_email(email: str) -> bool:
"""Check if email has a valid structure"""
return bool(re.match(r"^[\w\.-]+@[\w\.-]+\.\w+$", email))
"""
Sanitiza entradas de texto contra XSS e SQL Injection básicos.
- Remove espaços extras
- Escapa entidades HTML
- Remove padrões suspeitos de XSS e SQL Injection
- Normaliza múltiplos espaços em um
"""
@staticmethod
def sanitize_input(data: str) -> str:
if not data:
return data
# 1) Remove espaços no início e no fim
data = data.strip()
# 2) Escapa entidades HTML (< > & ")
data = html.escape(data)
# 3) Remove múltiplos espaços seguidos
data = re.sub(r"\s+", " ", data)
# 4) Remove tags <script> (com atributos)
data = re.sub(r'<\s*script[^>]*>', '', data, flags=re.IGNORECASE)
data = re.sub(r'<\s*/\s*script\s*>', '', data, flags=re.IGNORECASE)
# 5) Remove javascript: de links
data = re.sub(r'javascript\s*:', '', data, flags=re.IGNORECASE)
# 6) Remove palavras-chave SQL Injection comuns
blacklist = [
"--", ";", "/*", "*/", "@@",
"char(", "nchar(", "varchar(",
"alter", "drop", "exec", "insert",
"delete", "update", "union", "select",
"from", "where"
]
for word in blacklist:
# Verificar se 'word' é uma string não vazia e válida para a regex
if word:
data = re.sub(re.escape(word), "", data, flags=re.IGNORECASE)
return data

24
config/app.json Normal file
View file

@ -0,0 +1,24 @@
{
"state" : "go",
"url": "/api/v1",
"log": {
"request": {
"name": "request.json",
"path": "storage/temp"
}
},
"StartupCheck": {
"database": true,
"disk": true
},
"jwt" : {
"token" : "WYe1zwtlDkh39_X3X3qTSICFDxts4VQrMyGLxnEpGUg",
"algorithm" : "HS256",
"type" : "",
"expire" : {
"minute" : 60,
"hours" : 24,
"days" : 7
}
}
}

View file

@ -0,0 +1,13 @@
{
"host": "localhost",
"name": "D:/Orius/Base/CAIAPONIA.FDB",
"port": 3050,
"user": "SYSDBA",
"password": "master!orius",
"charset": "UTF8",
"pool" : {
"pre_ping" : true,
"size" : 5,
"max_overflow" :10
}
}

View file

@ -0,0 +1,14 @@
{
"host": "localhost",
"port": 5432,
"name": "mirror",
"user": "postgres",
"password": "root",
"charset": "UTF8",
"pool": {
"pre_ping": true,
"size": 5,
"max_overflow": 10
},
"debug": false
}

43
database/firebird.py Normal file
View file

@ -0,0 +1,43 @@
from typing import Optional
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from actions.config.config import Config
class Firebird:
_engine: Optional[Engine] = None
@classmethod
def get_engine(cls) -> Engine:
# Obtem as configurações do arquivo JSON
database = Config.get('database/firebird.json')
# Cria a engine apenas uma vez
if cls._engine is None:
# DSN para o SQLAlchemy usando firebird-driver
dsn = (
f"firebird+firebird://{database.user}:"
f"{database.password}@"
f"{database.host}:"
f"{database.port}/"
f"{database.name}"
)
# Criação da engine SQLAlchemy
cls._engine = create_engine(
dsn,
connect_args={"charset": database.charset},
pool_pre_ping=bool(database.pool.pre_ping),
pool_size=database.pool.size,
max_overflow=database.pool.max_overflow,
)
return cls._engine
@classmethod
def dispose(cls):
if cls._engine:
cls._engine.dispose()
cls._engine = None

40
database/postgres.py Normal file
View file

@ -0,0 +1,40 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from actions.config.config import Config
# Base para os modelos ORM
Base = declarative_base()
def get_database_settings():
"""Lê as configurações do arquivo database/postgres.json"""
return Config.get("database/postgres.json")
def get_postgres_engine():
"""Cria e retorna a engine PostgreSQL."""
db = get_database_settings()
dsn = (
f"postgresql+psycopg2://{db.user}:{db.password}@"
f"{db.host}:{db.port}/{db.name}"
)
engine = create_engine(
dsn,
echo=bool(getattr(db, "debug", False)),
pool_pre_ping=bool(db.pool.pre_ping),
pool_size=int(db.pool.size),
max_overflow=int(db.pool.max_overflow),
connect_args={"connect_timeout": 10},
)
return engine
# Criação da engine global
engine = get_postgres_engine()
# Criação da sessão ORM (SessionLocal)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

89
main.py Normal file
View file

@ -0,0 +1,89 @@
# Ajuste para garantir que o diretório base do projeto seja incluído no PYTHONPATH
import os
import sys
# Adiciona o diretório atual (onde está o main.py) ao sys.path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Importa a classe principal do FastAPI
from fastapi import FastAPI, Request
from pathlib import Path
# Importa o middleware de CORS
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
from starlette.middleware.base import BaseHTTPMiddleware
# Importa o roteador principal da API versão 1
from packages.v1.api import api_router
from packages.v1.system.service.startup_check_service import \
StartupCheckService
# Importa as configurações globais da aplicação
from actions.log.log import Log
from actions.config.config import Config
from actions.system.handlers import register_exception_handlers
config = Config.get('app.json')
# Instancia o app FastAPI com um título personalizado
app = FastAPI(title='Mirror | Orius')
# Controle de erros personalizados
register_exception_handlers(app)
# Adiciona o middleware de CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Domínio do frontend
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.on_event("startup")
async def on_startup():
# Realiza as verificações do servidor
startupCheckService = StartupCheckService()
# Exibe o amarzenamento do servidor
print(startupCheckService.execute())
@app.middleware("http")
async def log_tempo_requisicao(request: Request, call_next):
# Ação responsavel por registrar o log de requisição
log = Log()
config = Config.get('app.json')
# Obtem os dados da requisição
log_data = {
"method": request.method,
"url": str(request.url),
"headers": dict(request.headers)
}
# Gera o nome do arquivo
file = Path(config.log.request.path) / config.log.request.name
# Registra as requisições
log.register(log_data, file)
# Passa adiante
response = await call_next(request)
return response
# Inclui as rotas da versão 1 da API com prefixo definido em settings (ex: /api/v1)
app.include_router(api_router, prefix=config.url)
# Executa o servidor com Uvicorn se este arquivo for executado diretamente
if __name__ == '__main__':
import uvicorn
uvicorn.run(
"main:app", # Caminho do app para execução
host="0.0.0.0", # Disponibiliza a aplicação externamente
port=8000, # Porta padrão
log_level='info', # Define o nível de log para desenvolvimento
reload=True # Ativa auto-reload durante desenvolvimento
)

0
packages/__init__.py Normal file
View file

View file

@ -0,0 +1,13 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.repositories.ato_principal.ato_principal_index_repository import AtoPrincipalIndexRepository
class AtoPrincipalIndexAction(BaseAction):
def execute(self):
# Instânciamento de repositório
ato_principal_index_repository = AtoPrincipalIndexRepository()
# Retorna todos produtos
return ato_principal_index_repository.execute()

View file

@ -0,0 +1,20 @@
# Importação de bibliotecas
from actions.dynamic_import.dynamic_import import DynamicImport
from packages.v1.administrativo.services.ato_principal.go.ato_principal_index_service import AtoPrincipalIndexService
class AtoPrincipalController:
def index(self):
# Importação da classe desejad
ato_principal_index_service = AtoPrincipalIndexService()
# Intânciamento da classe service
self.index_service = ato_principal_index_service
# Lista todos os produtos
return {
'message' : 'Registros localizados com sucesso',
'data': self.index_service.execute()
}

View file

@ -0,0 +1,22 @@
# Importação de bibliotecas
from fastapi import APIRouter, Body, Depends, status
from actions.jwt.get_current_user import get_current_user
from packages.v1.administrativo.controllers.ato_principal_controller import AtoPrincipalController
# Inicializar o roteaodr para as rotas de produtos
router = APIRouter()
# Instãnciamento do controller desejado
ato_principal_controller = AtoPrincipalController()
@router.get("/",
status_code=status.HTTP_200_OK,
summary="Busca itens com filtros opcionais",
response_description="Lista de itens encontrados com base nos critérios de busca.")
async def index():
# Busca todos os produtos cadastrados
response = ato_principal_controller.index()
# Retornar os dados localizados
return response

View file

@ -0,0 +1,65 @@
# packages/v1/administrativo/models/ato_principal_model.py
from sqlalchemy import (
Column,
BigInteger,
Integer,
String,
DateTime,
Numeric,
Text,
ForeignKey,
CheckConstraint,
)
from sqlalchemy.dialects.postgresql import INET
from sqlalchemy.sql import func
from database.postgres import Base
class AtoPrincipal(Base):
"""
Representa o modelo da tabela 'ato_principal' no banco de dados PostgreSQL.
"""
__tablename__ = "ato_principal"
ato_principal_id = Column(BigInteger, primary_key=True, autoincrement=True, index=True)
origem_ato_principal_id = Column(
BigInteger,
ForeignKey("ato_principal.ato_principal_id", ondelete="SET NULL", onupdate="CASCADE"),
nullable=True,
)
identificacao_pedido_cgj = Column(BigInteger, nullable=False)
tipo_ato = Column(Integer, nullable=False)
codigo_selo = Column(String(50), nullable=False, unique=True)
codigo_ato = Column(String(50), nullable=False, unique=True)
nome_civil_ato = Column(String(255), nullable=False)
nome_serventuario_praticou_ato = Column(String(255), nullable=False)
data_solicitacao = Column(DateTime(timezone=True), nullable=False)
ip_maquina = Column(INET, nullable=True)
inteiro_teor = Column(Text, nullable=False)
valor_entrada = Column(Numeric(12, 2), nullable=True, default=0)
emolumento = Column(Numeric(12, 2), nullable=False)
taxa_judiciaria = Column(Numeric(12, 2), nullable=False)
fundos_estaduais = Column(Numeric(12, 2), nullable=False)
protocolo_protesto = Column(String(50), nullable=True)
protocolo_imovel = Column(String(50), nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False)
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False)
# Constraints e índices adicionais
__table_args__ = (
CheckConstraint(
"(COALESCE(valor_entrada, 0) >= 0) AND (emolumento >= 0) AND (taxa_judiciaria >= 0) AND (fundos_estaduais >= 0)",
name="chk_valores_positivos",
),
)
def __repr__(self):
return (
f"<AtoPrincipal(id={self.ato_principal_id}, "
f"codigo_ato='{self.codigo_ato}', selo='{self.codigo_selo}', tipo={self.tipo_ato})>"
)

View file

@ -0,0 +1,21 @@
from database.postgres import SessionLocal
from packages.v1.administrativo.models.ato_principal_model import AtoPrincipal
from packages.v1.administrativo.schemas.ato_principal_schema import AtoPrincipalResponseSchema
class AtoPrincipalIndexRepository:
def execute(self):
# Cria a sessão dentro do repositório
db = SessionLocal()
try:
# Executa a query
result = db.query(AtoPrincipal).all()
# Converte os models SQLAlchemy em schemas Pydantic
data = [AtoPrincipalResponseSchema.model_validate(obj) for obj in result]
return data
finally:
# Fecha a sessão após o uso (evita vazamento de conexão)
db.close()

View file

@ -0,0 +1,56 @@
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
from ipaddress import IPv4Address, IPv6Address
from decimal import Decimal
# ----------------------------------------------------
# Schema base - campos principais e comuns
# ----------------------------------------------------
class AtoPrincipalBaseSchema(BaseModel):
origem_ato_principal_id: Optional[int] = None
identificacao_pedido_cgj: int
tipo_ato: int
codigo_selo: str
codigo_ato: str
nome_civil_ato: str
nome_serventuario_praticou_ato: str
data_solicitacao: datetime
ip_maquina: Optional[IPv4Address | IPv6Address] = None
inteiro_teor: str
valor_entrada: Optional[Decimal] = None
emolumento: Decimal
taxa_judiciaria: Decimal
fundos_estaduais: Decimal
protocolo_protesto: Optional[str] = None
protocolo_imovel: Optional[str] = None
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema de resposta (GET) — inclui metadata
# ----------------------------------------------------
class AtoPrincipalResponseSchema(AtoPrincipalBaseSchema):
ato_principal_id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema de criação (POST)
# ----------------------------------------------------
class AtoPrincipalCreateSchema(AtoPrincipalBaseSchema):
pass
# ----------------------------------------------------
# Schema de atualização (PUT)
# ----------------------------------------------------
class AtoPrincipalUpdateSchema(AtoPrincipalBaseSchema):
ato_principal_id: int

View file

@ -0,0 +1,45 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.actions.ato_principal.ato_principal_index_action import AtoPrincipalIndexAction
class AtoPrincipalIndexService:
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de listagem de registros na tabela G_GRAMATICA.
"""
def execute(self):
"""
Executa a operação de busca de todos os registros no banco de dados.
Args:
g_cartorio_index_schema (GCartorioIndexSchema):
Esquema que pode conter filtros ou parâmetros de busca.
Returns:
A lista de registros encontrados.
"""
# ----------------------------------------------------
# Instanciamento da ação
# ----------------------------------------------------
ato_principal_index_action = AtoPrincipalIndexAction()
# ----------------------------------------------------
# Execução da ação
# ----------------------------------------------------
data = ato_principal_index_action.execute()
# ----------------------------------------------------
# Verificação de retorno
# ----------------------------------------------------
if not data:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Não foi possível localizar registros de G_GRAMATICA."
)
# ----------------------------------------------------
# Retorno da informação
# ----------------------------------------------------
return data

13
packages/v1/api.py Normal file
View file

@ -0,0 +1,13 @@
# Importa o gerenciador de rotas do FastAPI
from fastapi import APIRouter
# Importa os módulos de rotas específicos
from packages.v1.administrativo.endpoints import ato_principal_endpoint
# Cria uma instância do APIRouter que vai agregar todas as rotas da API
api_router = APIRouter()
# Inclui as rotas de g_cartorio
api_router.include_router(
ato_principal_endpoint.router, prefix="/ato", tags=["Dados do Ato"]
)

View file

@ -0,0 +1,22 @@
from abstracts.repository import BaseRepository
class FirebirdCheckAction(BaseRepository):
def execute(self):
# Montagem do SQL
sql = """ SELECT 1 FROM RDB$DATABASE """
# Execução do sql
response = self.fetch_one(sql)
if response:
# Dados
response = {
"status" : "Banco de dados acessível"
}
# Retorna os dados localizados
return response

View file

@ -0,0 +1,24 @@
import shutil
from abstracts.action import BaseAction
class GetSizeAction:
def execute(self):
# Verificar espaço em disco
total, used, free = shutil.disk_usage("/")
# Converter de bytes para gigabytes
total_gb = total / (1024 ** 3)
used_gb = used / (1024 ** 3)
free_gb = free / (1024 ** 3)
return {
"total" : round(total_gb, 2),
"used" : round(used_gb, 2),
"free" : round(free_gb, 2)
}

View file

@ -0,0 +1,12 @@
from packages.v1.system.actions.disk.get_size_action import GetSizeAction
class StartupCheckService:
def execute(self):
get_size_action = GetSizeAction()
get_size_action_result = get_size_action.execute()
return get_size_action_result

34
requirements.txt Normal file
View file

@ -0,0 +1,34 @@
annotated-types==0.7.0
anyio==4.10.0
bcrypt==3.1.7
cffi==1.17.1
click==8.2.1
colorama==0.4.6
dnspython==2.7.0
ecdsa==0.19.1
email_validator==2.2.0
fastapi==0.116.1
firebird-base==2.0.2
firebird-driver==2.0.2
greenlet==3.2.4
h11==0.16.0
idna==3.10
packaging==25.0
passlib==1.7.4
protobuf==5.29.5
pyasn1==0.6.1
pycparser==2.22
pydantic==2.11.7
pydantic_core==2.33.2
python-dateutil==2.9.0.post0
python-jose==3.5.0
pytz==2025.2
rsa==4.9.1
six==1.17.0
sniffio==1.3.1
SQLAlchemy==2.0.42
sqlalchemy-firebird==2.1
starlette==0.47.2
typing-inspection==0.4.1
typing_extensions==4.14.1
uvicorn==0.35.0