first commit

This commit is contained in:
Kenio 2025-10-06 09:30:41 -03:00
parent ab8b2cb7ce
commit 9c4d32a65c
82 changed files with 3217 additions and 0 deletions

9
.gitattributes vendored Normal file
View file

@ -0,0 +1,9 @@
# Normaliza finais de linha
* text=auto
# Força Python e arquivos de configuração a usarem LF
*.py text eol=lf
*.sh text eol=lf
*.yml text eol=lf
*.yaml text eol=lf
*.env text eol=lf

46
.gitignore vendored Normal file
View file

@ -0,0 +1,46 @@
# Ambiente virtual
venv/
.env
.env.*
# Bytecode compilado
__pycache__/
*.py[cod]
*$py.class
# Arquivos temporários do sistema
.DS_Store
Thumbs.db
# Logs e databases locais
*.log
*.sqlite3
# VSCode
.vscode/
# PyCharm
.idea/
# Arquivos de testes ou builds
*.coverage
htmlcov/
coverage.xml
dist/
build/
.eggs/
*.egg-info/
# Cache do pip
pip-wheel-metadata/
*.egg
.cache/
.tox/
# Arquivo s de conexão
config/database/firebird.json
storage/temp
storage/temp.json
# Ignorar arquivos storage
storage/

26
Dockerfile Normal file
View file

@ -0,0 +1,26 @@
# Usa a imagem oficial do Python
FROM python:3.12-slim
# Define diretório de trabalho no container
WORKDIR /app
# Copia o arquivo de dependências
COPY requirements.txt .
# Instala dependências no sistema e no Python
RUN apt-get update && apt-get install -y \
gcc libffi-dev libssl-dev python3-dev firebird-dev \
&& pip install --upgrade pip \
&& pip install --no-cache-dir -r requirements.txt \
&& apt-get remove -y gcc \
&& apt-get autoremove -y \
&& rm -rf /var/lib/apt/lists/*
# Copia o restante do projeto para o container
COPY . .
# Expõe a porta padrão do Uvicorn/FastAPI
EXPOSE 8000
# Comando para iniciar o servidor
CMD ["sh", "-c", "uvicorn main:app --host 0.0.0.0 --port 8000"]

17
abstracts/action.py Normal file
View file

@ -0,0 +1,17 @@
from abc import ABC, abstractmethod
class BaseAction:
"""
Classe abstrata base para todos as actions do sistema.
Obriga implementação de um método execute().
"""
@abstractmethod
def execute(self, *args, **kwargs):
"""
Método abstrato obrigatório a ser implementado pelas subclasses.
Deve conter a lógica principal do repositório.
"""
pass

100
abstracts/repository.py Normal file
View file

@ -0,0 +1,100 @@
from typing import Optional, Literal
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError
from database.mysql import MySQL # Importa a classe MySQL que gerencia a engine
class BaseRepository:
"""
Classe base para todos os repositórios.
Contém métodos genéricos para executar SQL no MySQL usando SQLAlchemy.
"""
def query(self, sql: str, params: Optional[dict] = None):
"""
Executa uma query e retorna o ResultProxy bruto.
Útil para operações customizadas que precisam do objeto SQLAlchemy diretamente.
"""
return self._execute(sql, params, fetch="result")
def fetch_all(self, sql: str, params: Optional[dict] = None):
"""
Executa uma query SQL e retorna todos os registros como lista de dicionários.
Retorna lista vazia se não encontrar nada.
"""
return self._execute(sql, params, fetch="all")
def fetch_one(self, sql: str, params: Optional[dict] = None):
"""
Executa uma query SQL e retorna o primeiro registro como dicionário.
Retorna None se não encontrar nenhum registro.
"""
return self._execute(sql, params, fetch="one")
def run(self, sql: str, params: Optional[dict] = None):
"""
Executa um SQL sem retorno (ex: INSERT, UPDATE, DELETE).
Não retorna nenhum dado.
"""
return self._execute(sql, params, fetch="none")
def run_and_return(self, sql: str, params: Optional[dict] = None):
"""
Executa um INSERT e retorna o último ID gerado no MySQL.
Se for um SELECT, retorna o primeiro registro normalmente.
"""
engine = MySQL.get_engine() # Obtém a engine do MySQL
try:
with engine.begin() as conn: # Inicia uma transação automática
result = conn.execute(text(sql), params or {}) # Executa o SQL
# Se for INSERT, retorna o último ID inserido
if sql.strip().upper().startswith("INSERT"):
last_id = conn.execute(text("SELECT LAST_INSERT_ID() AS id")).mappings().first()
return last_id
# Se não for INSERT, retorna o primeiro registro
return result.mappings().first()
except SQLAlchemyError as e:
print(f"[ERRO SQL]: {e}") # Imprime o erro para debug
raise
def _execute(
self,
sql: str,
params: Optional[dict] = None,
fetch: Literal["all", "one", "result", "none"] = "result",
):
"""
Método interno que executa o SQL no MySQL.
Suporta diferentes tipos de retorno:
- all -> todos os registros como lista de dicionários
- one -> primeiro registro como dicionário
- result -> ResultProxy bruto
- none -> não retorna nada
"""
engine = MySQL.get_engine() # Pega a engine do MySQL
try:
with engine.connect() as conn: # Abre a conexão com o banco
result = conn.execute(text(sql), params or {}) # Executa o SQL com parâmetros
# Commit explícito para operações DML
# Se não for um SELECT, faça o commit para persistir a alteração.
sql_upper = sql.strip().upper()
if sql_upper.startswith(("INSERT", "UPDATE", "DELETE")):
conn.commit()
# Retorno baseado no tipo solicitado
if fetch == "all":
return result.mappings().all() # Todos os registros
elif fetch == "one":
return result.mappings().first() # Apenas o primeiro registro
elif fetch == "result":
return result # Retorno bruto do SQLAlchemy
elif fetch == "none":
return result.rowcount # Retorna o número de linhas afetadas
except SQLAlchemyError as e:
print(f"[ERRO SQL]: {e}") # Log de erro para debug
raise

20
actions/config/config.py Normal file
View file

@ -0,0 +1,20 @@
import json
from pathlib import Path
from types import SimpleNamespace
class Config:
@staticmethod
def get(name: str):
# Caminho absoluto do arquivo atual
base_dir = Path(__file__).resolve().parent
# Caminho absoluto para o config.json (subindo dois níveis e entrando em config/)
config_path = base_dir.parent.parent / 'config' / name
# Carrega o JSON como objeto acessível por ponto
with open(config_path, 'r') as f:
config = json.load(f, object_hook=lambda d: SimpleNamespace(**d))
return config

View file

@ -0,0 +1,27 @@
import importlib
from actions.config.config import Config
class DynamicImport:
def __init__(self):
self.base = 'packages.v1'
def set_package(self, name):
self.package = name
def set_table(self, table):
self.table = table
def service(self, name: str, class_name : str):
try:
# Define o nome do Módulo
module_file = f"{name}"
# Define o caminho do arquivo
path = f"{self.base}.{self.package}.services.{self.table}.{module_file}"
# Realiza a importação do arquivo
module = importlib.import_module(path)
clazz = getattr(module, class_name)
return clazz
except (ImportError, AttributeError) as e:
raise ImportError(f"Erro ao importar '{class_name}' de '{path}': {e}")

32
actions/file/file.py Normal file
View file

@ -0,0 +1,32 @@
import json
import os
class File:
def create(self, data, caminho_arquivo='storage/temp.json'):
try:
# Garante que a pasta existe
os.makedirs(os.path.dirname(caminho_arquivo), exist_ok=True)
# Lê dados existentes (ou cria nova lista)
if os.path.exists(caminho_arquivo):
with open(caminho_arquivo, 'r', encoding='utf-8') as arquivo:
try:
dados_existentes = json.load(arquivo)
if not isinstance(dados_existentes, list):
dados_existentes = []
except json.JSONDecodeError:
dados_existentes = []
else:
dados_existentes = []
# Adiciona novo dado
dados_existentes.append(data)
# Salva novamente no arquivo com indentação
with open(caminho_arquivo, 'w', encoding='utf-8') as arquivo:
json.dump(dados_existentes, arquivo, indent=4, ensure_ascii=False)
except Exception as e:
print(f"❌ Erro ao salvar o dado: {e}")

View file

@ -0,0 +1,36 @@
from datetime import datetime, timedelta
from jose import jwt
from pytz import timezone
from abstracts.action import BaseAction
from actions.config.config import Config
class CreateToken(BaseAction):
def __init__(self):
# Busca as configurações da aplicação
self.config = Config.get('app.json')
# Cria o timedelta com base na config
self.access_token_expire = timedelta(
minutes=self.config.jwt.expire.minute,
hours=self.config.jwt.expire.hours,
days=self.config.jwt.expire.days
)
def execute(self, tipo_token: str, data : str) -> str:
sp = timezone('America/Sao_Paulo')
agora = datetime.now(tz=sp)
expira = agora + self.access_token_expire
# Define os dados do token
payload = {
'type' : tipo_token,
'exp' : expira,
'iat' : agora,
'data' : str(data)
}
# Retorna os dados codificados
return jwt.encode(payload, self.config.jwt.token, algorithm=self.config.jwt.algorithm)

View file

@ -0,0 +1,24 @@
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordBearer
from actions.jwt.verify_token import VerifyToken # A classe que criamos anteriormente
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # Apenas requerido pelo FastAPI
def get_current_user(token: str = Depends(oauth2_scheme)):
# Ação que válida o tokne
verify_token = VerifyToken()
# Obtem o resultado da validação
result = verify_token.execute(token)
# Verifica se a resposta é diferente de inválida
if result['status'] != 'valid':
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=result.get('message', 'Token inválido ou expirado'),
headers={"WWW-Authenticate": "Bearer"},
)
# Retorna apenas os dados do token
return result['payload']

View file

@ -0,0 +1,57 @@
from datetime import datetime
from jose import jwt, JWTError, ExpiredSignatureError
from pytz import timezone
from actions.config.config import Config
class VerifyToken:
def __init__(self):
# Carrega configurações
self.config = Config.get('app.json')
def execute(self, token: str, expected_type: str = 'access-token') -> dict:
try:
# Decodifica o token
payload = jwt.decode(
token,
self.config.jwt.token,
algorithms=[self.config.jwt.algorithm]
)
# Valida expiração
exp_timestamp = payload.get("exp")
if exp_timestamp is None:
raise ValueError("O token não possui data de expiração.")
# Verifica o tipo de token
token_type = payload.get("type")
if token_type != expected_type:
raise ValueError("Tipo de token inválido.")
# Verificação opcional: validar campo "data"
if "data" not in payload:
raise ValueError("Token malformado: campo 'data' ausente.")
return {
"status": "valid",
"payload": payload
}
except ExpiredSignatureError:
return {
"status": "expired",
"message": "O token expirou."
}
except JWTError as e:
return {
"status": "invalid",
"message": f"Token inválido: {str(e)}"
}
except Exception as e:
return {
"status": "error",
"message": f"Erro na validação do token: {str(e)}"
}

32
actions/log/log.py Normal file
View file

@ -0,0 +1,32 @@
import json
import os
class Log:
def register(self, data, caminho_arquivo='storage/temp.json'):
try:
# Garante que a pasta existe
os.makedirs(os.path.dirname(caminho_arquivo), exist_ok=True)
# Lê dados existentes (ou cria nova lista)
if os.path.exists(caminho_arquivo):
with open(caminho_arquivo, 'r', encoding='utf-8') as arquivo:
try:
dados_existentes = json.load(arquivo)
if not isinstance(dados_existentes, list):
dados_existentes = []
except json.JSONDecodeError:
dados_existentes = []
else:
dados_existentes = []
# Adiciona novo dado
dados_existentes.append(data)
# Salva novamente no arquivo com indentação
with open(caminho_arquivo, 'w', encoding='utf-8') as arquivo:
json.dump(dados_existentes, arquivo, indent=4, ensure_ascii=False)
except Exception as e:
print(f"❌ Erro ao salvar o dado: {e}")

View file

@ -0,0 +1,43 @@
# core/security.py
# Importa CryptContext da biblioteca passlib para operações de hash de senha
from passlib.context import CryptContext
# Cria uma instância do contexto de criptografia
# O esquema usado é 'bcrypt', que é seguro e amplamente aceito
# O parâmetro 'deprecated="auto"' marca versões antigas como inseguras, se aplicável
CRYPTO = CryptContext(schemes=['bcrypt'], deprecated='auto')
class Security:
# Verifica se a senha tem um hash válido
@staticmethod
def is_hash(senha: str) -> bool:
"""
Verifica se a string fornecida é um hash reconhecido pelo CryptContext.
"""
return CRYPTO.identify(senha)
# Verifica se uma senha fornecida corresponde ao hash armazenado
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Compara a senha fornecida em texto puro com o hash armazenado.
:param plain_password: Senha digitada pelo usuário
:param hashed_password: Hash da senha armazenado no banco de dados
:return: True se corresponder, False se não
"""
return CRYPTO.verify(plain_password, hashed_password)
# Gera o hash de uma senha fornecida
def hash_password(plain_password: str) -> str:
"""
Gera e retorna o hash da senha fornecida.
:param plain_password: Senha em texto puro fornecida pelo usuário
:return: Hash da senha
"""
return CRYPTO.hash(plain_password)

View file

@ -0,0 +1,4 @@
# exceptions.py
class BusinessRuleException(Exception):
def __init__(self, message: str):
self.message = message

View file

@ -0,0 +1,86 @@
# handlers.py
import json
import traceback
from fastapi import Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from starlette.exceptions import HTTPException as StarletteHTTPException
from actions.system.exceptions import BusinessRuleException
from actions.log.log import Log
def register_exception_handlers(app):
def __init__ (self):
log = Log()
@app.exception_handler(BusinessRuleException)
async def business_rule_exception_handler(request: Request, exc: BusinessRuleException):
response = {
"status": "422",
"error": "Regra de negócio",
"detail": exc.message
}
# Salva o log em disco
Log.register(response, 'storage/temp/business_rule_exception_handler.json')
return JSONResponse(
status_code=422,
content=response
)
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
response = {
"status": exc.status_code,
"error": "HTTP Error",
"detail": exc.detail
}
# Salva o log em disco
Log.register(response, 'storage/temp/http_exception_handler.json')
return JSONResponse(
status_code=exc.status_code,
content=response
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
response = {
"status": 400,
"error": "Erro de validação",
"detail": exc.errors()
}
# Salva o log em disco
Log.register(response, 'storage/temp/validation_exception_handler.json')
return JSONResponse(
status_code=400,
content=response
)
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
response = {
"status": 500,
"error": "Erro Interno do Servidor",
"type": type(exc).__name__,
"message": str(exc),
"trace": traceback.format_exc()
}
# Salva o log em disco
Log.register(response, 'storage/temp/validation_exception_handler.json')
return JSONResponse(
status_code=500,
content=response
)

View file

@ -0,0 +1,8 @@
class CEP:
@staticmethod
def validate(data: str) -> bool:
# Valida e retorna a informação
return len(data) == 8

View file

@ -0,0 +1,35 @@
import re
class CNPJ:
@staticmethod
def validate(data: str) -> bool:
# Remove caracteres não numéricos
data = re.sub(r'\D', '', data)
# Verifica se tem 14 dígitos
if len(data) != 14:
return False
# CNPJs com todos os dígitos iguais são inválidos
if data == data[0] * 14:
return False
# Calcula os dois dígitos verificadores
def calcular_digito(data, peso):
soma = sum(int(a) * b for a, b in zip(data, peso))
resto = soma % 11
return '0' if resto < 2 else str(11 - resto)
# Primeiro dígito verificador
peso1 = [5, 4, 3, 2, 9, 8, 7, 6, 5, 4, 3, 2]
digito1 = calcular_digito(data[:12], peso1)
# Segundo dígito verificador
peso2 = [6] + peso1
digito2 = calcular_digito(data[:12] + digito1, peso2)
# Verifica se os dígitos batem
return data[-2:] == digito1 + digito2

View file

@ -0,0 +1,34 @@
import re
class CPF:
@staticmethod
def is_valid_cpf(data: str) -> bool:
# Remove caracteres não numéricos
data = re.sub(r'\D', '', data)
# Verifica se tem 11 dígitos
if len(data) != 11:
return False
# CPFs com todos os dígitos iguais são inválidos
if data == data[0] * 11:
return False
# Calcula o primeiro e segundo dígitos verificadores
def calcular_digito(digitos, peso):
soma = sum(int(a) * b for a, b in zip(digitos, peso))
resto = soma % 11
return '0' if resto < 2 else str(11 - resto)
# Primeiro dígito verificador
peso1 = range(10, 1, -1)
digito1 = calcular_digito(data[:9], peso1)
# Segundo dígito verificador
peso2 = range(11, 1, -1)
digito2 = calcular_digito(data[:10], peso2)
# Verifica se os dígitos batem
return data[-2:] == digito1 + digito2

View file

@ -0,0 +1,9 @@
import re
class Email:
@staticmethod
def is_valid_email(email: str) -> bool:
"""Check if email has a valid structure"""
return bool(re.match(r"^[\w\.-]+@[\w\.-]+\.\w+$", email))

View file

@ -0,0 +1,12 @@
class Phone:
@staticmethod
def validate_cellphone(data: str) -> bool:
# Verifica e retorna se o numero de celular é igual a 11
return len(data) == 11
@staticmethod
def validate_telephone(data: str) -> bool:
# Verifica e retorna se o numero de telefone é igual a 11
return len(data) == 10

View file

@ -0,0 +1,63 @@
import html
import re
class Text:
# Remove as mascaras de números
@staticmethod
def just_numbers(data: str) -> str:
""" Mantêm apenas os numeros """
data = re.sub(r"[^\d]", "", data)
return data
# Verifica se um e-mail é válido
@staticmethod
def is_valid_email(email: str) -> bool:
"""Check if email has a valid structure"""
return bool(re.match(r"^[\w\.-]+@[\w\.-]+\.\w+$", email))
"""
Sanitiza entradas de texto contra XSS e SQL Injection básicos.
- Remove espaços extras
- Escapa entidades HTML
- Remove padrões suspeitos de XSS e SQL Injection
- Normaliza múltiplos espaços em um
"""
@staticmethod
def sanitize_input(data: str) -> str:
if not data:
return data
# 1) Remove espaços no início e no fim
data = data.strip()
# 2) Escapa entidades HTML (< > & ")
data = html.escape(data)
# 3) Remove múltiplos espaços seguidos
data = re.sub(r"\s+", " ", data)
# 4) Remove tags <script> (com atributos)
data = re.sub(r'<\s*script[^>]*>', '', data, flags=re.IGNORECASE)
data = re.sub(r'<\s*/\s*script\s*>', '', data, flags=re.IGNORECASE)
# 5) Remove javascript: de links
data = re.sub(r'javascript\s*:', '', data, flags=re.IGNORECASE)
# 6) Remove palavras-chave SQL Injection comuns
blacklist = [
"--", ";", "/*", "*/", "@@",
"char(", "nchar(", "varchar(",
"alter", "drop", "exec", "insert",
"delete", "update", "union", "select",
"from", "where"
]
for word in blacklist:
# Verificar se 'word' é uma string não vazia e válida para a regex
if word:
data = re.sub(re.escape(word), "", data, flags=re.IGNORECASE)
return data

28
config/app.json Normal file
View file

@ -0,0 +1,28 @@
{
"state" : "go",
"url": "/api/v1",
"log": {
"request": {
"name": "request.json",
"path": "storage/temp"
}
},
"StartupCheck": {
"database": true,
"disk": true
},
"jwt" : {
"token" : "WYe1zwtlDkh39_X3X3qTSICFDxts4VQrMyGLxnEpGUg",
"algorithm" : "HS256",
"type" : "",
"expire" : {
"minute" : 60,
"hours" : 24,
"days" : 7
}
},
"pagination": {
"first" : 20,
"skip" : 0
}
}

View file

@ -0,0 +1,13 @@
{
"host": "localhost",
"name": "D:/Orius/Base/CAIAPONIA.FDB",
"port": 3050,
"user": "SYSDBA",
"password": "master!orius",
"charset": "UTF8",
"pool" : {
"pre_ping" : true,
"size" : 5,
"max_overflow" :10
}
}

View file

@ -0,0 +1,13 @@
{
"user": "root",
"password": "root",
"host": "localhost",
"port": 3306,
"name": "monitoring",
"charset": "utf8mb4",
"pool": {
"pre_ping": true,
"size": 5,
"max_overflow": 10
}
}

73
database/mysql.py Normal file
View file

@ -0,0 +1,73 @@
from typing import Optional
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError
from actions.config.config import Config
class MySQL:
_engine: Optional[Engine] = None # Armazena a engine do SQLAlchemy para reutilização
@classmethod
def get_engine(cls) -> Engine:
"""
Retorna a engine do SQLAlchemy.
Cria a engine se ainda não existir e testa a conexão.
"""
# Carrega configurações do JSON
database = Config.get('database/mysql.json')
if cls._engine is None:
# Monta DSN para o SQLAlchemy usando PyMySQL
dsn = (
f"mysql+pymysql://{database.user}:"
f"{database.password}@"
f"{database.host}:"
f"{database.port}/"
f"{database.name}"
)
# Cria a engine SQLAlchemy
cls._engine = create_engine(
dsn,
connect_args={"charset": database.charset},
pool_pre_ping=bool(database.pool.pre_ping), # Testa conexão antes de usar
pool_size=database.pool.size,
max_overflow=database.pool.max_overflow,
)
# Testa conexão imediatamente após criar a engine
try:
with cls._engine.connect() as conn:
conn.execute(text("SELECT 1")) # Consulta simples para verificar conexão
print("Conexão MySQL OK")
except SQLAlchemyError as e:
cls._engine = None # Se falhar, descarta a engine
print("Falha na conexão MySQL:", e)
raise # Repassa o erro para tratamento externo
return cls._engine
@classmethod
def dispose(cls):
"""
Fecha a engine e limpa o objeto para liberar recursos.
"""
if cls._engine:
cls._engine.dispose()
cls._engine = None
@classmethod
def test_connection(cls) -> bool:
"""
Testa a conexão com o banco de dados sem criar a engine permanente.
Retorna True se a conexão for bem-sucedida, False caso contrário.
"""
try:
engine = cls.get_engine()
with engine.connect() as conn:
conn.execute(text("SELECT 1")) # Consulta simples para testar conexão
return True
except SQLAlchemyError as e:
print("Erro ao testar conexão MySQL:", e)
return False

89
main.py Normal file
View file

@ -0,0 +1,89 @@
# Ajuste para garantir que o diretório base do projeto seja incluído no PYTHONPATH
import os
import sys
# Adiciona o diretório atual (onde está o main.py) ao sys.path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Importa a classe principal do FastAPI
from fastapi import FastAPI, Request
from pathlib import Path
# Importa o middleware de CORS
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response
from starlette.middleware.base import BaseHTTPMiddleware
# Importa o roteador principal da API versão 1
from packages.v1.api import api_router
from packages.v1.system.service.startup_check_service import \
StartupCheckService
# Importa as configurações globais da aplicação
from actions.log.log import Log
from actions.config.config import Config
from actions.system.handlers import register_exception_handlers
config = Config.get('app.json')
# Instancia o app FastAPI com um título personalizado
app = FastAPI(title='Monitoramento Orius')
# Controle de erros personalizados
register_exception_handlers(app)
# Adiciona o middleware de CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"], # Domínio do frontend
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.on_event("startup")
async def on_startup():
# Realiza as verificações do servidor
startupCheckService = StartupCheckService()
# Exibe o amarzenamento do servidor
print(startupCheckService.execute())
@app.middleware("http")
async def log_tempo_requisicao(request: Request, call_next):
# Ação responsavel por registrar o log de requisição
log = Log()
config = Config.get('app.json')
# Obtem os dados da requisição
log_data = {
"method": request.method,
"url": str(request.url),
"headers": dict(request.headers)
}
# Gera o nome do arquivo
file = Path(config.log.request.path) / config.log.request.name
# Registra as requisições
log.register(log_data, file)
# Passa adiante
response = await call_next(request)
return response
# Inclui as rotas da versão 1 da API com prefixo definido em settings (ex: /api/v1)
app.include_router(api_router, prefix=config.url)
# Executa o servidor com Uvicorn se este arquivo for executado diretamente
if __name__ == '__main__':
import uvicorn
uvicorn.run(
"main:app", # Caminho do app para execução
host="0.0.0.0", # Disponibiliza a aplicação externamente
port=8000, # Porta padrão
log_level='info', # Define o nível de log para desenvolvimento
reload=True # Ativa auto-reload durante desenvolvimento
)

0
packages/__init__.py Normal file
View file

View file

@ -0,0 +1,27 @@
from packages.v1.administrativo.schemas.log_schema import LogIdSchema
from packages.v1.administrativo.repositories.log.log_delete_repository import LogDeleteRepository
class LogDeleteAction:
"""
Action para a exclusão de um registro na tabela 'log'.
Utiliza o schema com o ID do log e delega a operação ao repositório.
"""
def execute(self, log_schema: LogIdSchema):
"""
Executa a lógica de exclusão do log.
Note que, embora a tabela 'log' possua vários campos
(log_id, client_id, date_post, file), a exclusão normalmente
requer apenas a chave primária ('log_id'), que deve ser encapsulada
no schema 'LogIdSchema'.
:param log_schema: Schema contendo o ID do log a ser excluído.
:return: Resultado da operação de exclusão do repositório.
"""
# Instancia o repositório específico para a exclusão de logs
delete_repository = LogDeleteRepository()
# Chama o método execute do repositório, passando o schema do log
return delete_repository.execute(log_schema)

View file

@ -0,0 +1,26 @@
from packages.v1.administrativo.schemas.log_schema import LogSchema
from packages.v1.administrativo.repositories.log.log_get_by_log_id_repository import GetByLogIdRepository
class GetByLogIdAction:
"""
Action responsável por buscar um registro específico na tabela 'log'
utilizando a chave primária 'log_id'.
"""
def execute(self, log_schema: LogSchema):
"""
Executa a lógica de busca do log pelo seu ID.
Apesar da tabela 'log' ter outros campos (client_id, date_post, file),
o schema 'LogSchema' é usado aqui para encapsular o 'log_id' que será
utilizado na busca pelo repositório.
:param log_schema: Schema contendo o 'log_id' para a busca.
:return: Resultado da busca do repositório.
"""
# Instanciação do repositório de busca pelo ID do log
get_by_log_id_repository = GetByLogIdRepository()
# Execução do repositório
return get_by_log_id_repository.execute(log_schema)

View file

@ -0,0 +1,28 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.repositories.log.log_index_repository import LogIndexRepository
from typing import Tuple, List, Dict, Any
class LogIndexAction(BaseAction):
"""
Action responsável por orquestrar a listagem (indexação) de todos
os registros da tabela 'log' com suporte a paginação.
"""
# O método execute agora recebe 'first' e 'skip'
def execute(self, first: int, skip: int) -> Tuple[List[Dict[str, Any]], int]:
"""
Executa a lógica de listagem com paginação.
:param first: Número máximo de registros a retornar (LIMIT).
:param skip: Número de registros a pular (OFFSET).
:return: Tupla com a lista de logs e o total de registros.
"""
# Instânciamento do repositório de indexação (listagem)
log_index_repository = LogIndexRepository()
# Execução do repositório para buscar os logs com paginação
response, total_records = log_index_repository.execute(first, skip)
# Retorno da informação
return response, total_records

View file

@ -0,0 +1,26 @@
from packages.v1.administrativo.schemas.log_schema import LogSaveSchema
from packages.v1.administrativo.repositories.log.log_save_repository import LogSaveRepository
class LogSaveAction:
"""
Action responsável por orquestrar a operação de salvar (inserir ou atualizar)
um registro na tabela 'log'.
"""
def execute(self, log_schema: LogSaveSchema):
"""
Executa a lógica de salvamento do log.
O schema 'LogSaveSchema' deve conter todos os campos necessários
para a operação de persistência, baseados na DDL:
client_id, date_post (opcional na entrada, pois tem DEFAULT), e file.
:param log_schema: Schema contendo os dados do log a serem salvos.
:return: Resultado da operação de salvamento do repositório.
"""
# Instancia o repositório específico para a operação de salvar logs
save_repository = LogSaveRepository()
# Chama o método execute do repositório, passando o objeto schema
return save_repository.execute(log_schema)

View file

@ -0,0 +1,30 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.log_schema import LogSchema
from packages.v1.administrativo.repositories.log.log_show_repository import LogShowRepository
class LogShowAction(BaseAction):
"""
Action responsável por orquestrar a visualização (show) de um registro
único na tabela 'log', geralmente utilizando o 'log_id'.
"""
def execute(self, log_schema: LogSchema):
"""
Executa a lógica de busca e exibição do log.
O schema 'LogSchema' é usado para transportar o 'log_id', que
será o critério principal para buscar os dados completos
(client_id, date_post, file) do log.
:param log_schema: Schema contendo o ID do log a ser exibido.
:return: O registro de log encontrado ou None/erro.
"""
# Instânciamento do repositório de visualização (show)
show_repository = LogShowRepository()
# Execução do repositório
response = show_repository.execute(log_schema)
# Retorno da informação
return response

View file

@ -0,0 +1,27 @@
from packages.v1.administrativo.schemas.log_schema import LogUpdateSchema
from packages.v1.administrativo.repositories.log.log_update_repository import LogUpdateRepository
class LogUpdateAction:
"""
Action responsável por orquestrar a operação de atualização (UPDATE)
de um registro na tabela 'log', identificado pelo seu ID.
"""
def execute(self, log_id: int, log_schema: LogUpdateSchema):
"""
Executa a lógica de atualização do log.
O 'log_id' identifica qual registro será modificado, e o
'log_schema' contém os novos valores para os campos
(client_id, date_post, file).
:param log_id: ID do log a ser atualizado.
:param log_schema: Schema contendo os novos dados do log.
:return: Resultado da operação de atualização do repositório.
"""
# Instancia o repositório específico para a operação de atualização de logs
update_repository = LogUpdateRepository()
# Chama o método execute do repositório, passando o ID e o objeto schema
return update_repository.execute(log_id, log_schema)

View file

@ -0,0 +1,11 @@
from packages.v1.administrativo.schemas.user_schema import UserIdSchema
from packages.v1.administrativo.repositories.user.user_delete_repository import DeleteRepository
class DeleteAction:
def execute(self, usuario_schema : UserIdSchema):
delete_repository = DeleteRepository()
return delete_repository.execute(usuario_schema)

View file

@ -0,0 +1,14 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.user_schema import UserAuthenticateSchema
from packages.v1.administrativo.repositories.user.user_get_by_authenticate_repository import GetByAuthenticateRepository
class GetByAuthenticateAction(BaseAction):
def execute(self, user_authenticate_schema : UserAuthenticateSchema):
# Instânciamento do repositório de busca pelo authenticate
get_by_authenticate_repository = GetByAuthenticateRepository()
# Execução do repositório
return get_by_authenticate_repository.execute(user_authenticate_schema)

View file

@ -0,0 +1,13 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.user_schema import UserEmailSchema
from packages.v1.administrativo.repositories.user.user_get_by_email_repository import GetByUsuarioEmailRepository
class GetByUsuarioEmailAction(BaseAction):
def execute(self, user_schema = UserEmailSchema):
# Importação do repositório
get_by_email_repository = GetByUsuarioEmailRepository()
# Execução do repositório
return get_by_email_repository.execute(user_schema)

View file

@ -0,0 +1,12 @@
from packages.v1.administrativo.schemas.user_schema import UserSchema
from packages.v1.administrativo.repositories.user.user_get_by_user_id_repository import GetByuserIdRepository
class GetByuserIdAction:
def execute(self, user_schema = UserSchema):
# Importação do repositório
get_by_user_id_repository = GetByuserIdRepository()
# Execução do repositório
return get_by_user_id_repository.execute(user_schema)

View file

@ -0,0 +1,15 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.repositories.user.user_index_repository import IndexRepository
class IndexAction(BaseAction):
def execute(self):
# Instânciamento do repositório sql
index_repository = IndexRepository()
# Execução do sql
response = index_repository.execute()
# Retorno da informação
return response

View file

@ -0,0 +1,11 @@
from packages.v1.administrativo.schemas.user_schema import UserSaveSchema
from packages.v1.administrativo.repositories.user.user_save_repository import SaveRepository
class SaveAction:
def execute(self, usuario_schema : UserSaveSchema):
save_repository = SaveRepository()
return save_repository.execute(usuario_schema)

View file

@ -0,0 +1,16 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.user_schema import UserSchema
from packages.v1.administrativo.repositories.user.user_show_repository import ShowRepository
class ShowAction(BaseAction):
def execute(self, usuario_schema : UserSchema):
# Instânciamento do repositório sql
show_repository = ShowRepository()
# Execução do sql
response = show_repository.execute(usuario_schema)
# Retorno da informação
return response

View file

@ -0,0 +1,11 @@
from packages.v1.administrativo.schemas.user_schema import UserUpdateSchema
from packages.v1.administrativo.repositories.user.user_update_repository import UpdateRepository
class UpdateAction:
def execute(self, user_id: int, usuario_schema : UserUpdateSchema):
save_repository = UpdateRepository()
return save_repository.execute(user_id, usuario_schema)

View file

@ -0,0 +1,177 @@
from actions.dynamic_import.dynamic_import import DynamicImport
from packages.v1.administrativo.schemas.log_schema import (
LogSchema,
LogAuthenticateSchema,
LogSaveSchema,
LogUpdateSchema,
LogIdSchema,
LogFileSchema
)
import json # Necessário para carregar o arquivo app.json
import math
# Carrega as configurações de paginação do app.json
with open('config/app.json', 'r') as f:
app_config = json.load(f)
PAGINATION_FIRST = app_config.get('pagination', {}).get('first', 20)
PAGINATION_SKIP = app_config.get('pagination', {}).get('skip', 0)
class LogController:
"""
Controller responsável por orquestrar as operações (CRUD e outras buscas)
para a tabela 'log'.
"""
def __init__(self):
# Action responsável por carregar as services de acordo com o estado
self.dynamic_import = DynamicImport()
# Define o pacote que deve ser carregado
self.dynamic_import.set_package("administrativo")
# Define a tabela que o pacote pertence
self.dynamic_import.set_table("log")
pass
# Efetua a busca por algum critério de autenticação/filtro
def authenticate(self, log_authenticate_schema: LogAuthenticateSchema):
# Importação de service de Authenticate
authenticate_service = self.dynamic_import.service("log_authenticate_service", "AuthenticateService")
# Instânciamento da service
self.authenticate_service = authenticate_service()
# Retorna o log ou logs localizados
return {
'message': 'Log(s) localizado(s) com sucesso',
'data': self.authenticate_service.execute(log_authenticate_schema)
}
# Carrega os dados de um log (adaptado de 'me' para logs, se aplicável)
def me(self, current_log):
# Esta função é incomum para um log, mas adaptada do original 'me' (usuário logado)
# Importação de service de me
me_service = self.dynamic_import.service("log_me_service", "MeService")
# Instânciamento da service
self.me_service = me_service()
# Retorna o log completo
return {
'message': 'Log localizado com sucesso',
'data': self.me_service.execute(current_log)
}
# Lista todos os logs com paginação
def index(self, first: int = PAGINATION_FIRST, skip: int = PAGINATION_SKIP):
# Importação da classe desejada
indexService = self.dynamic_import.service("log_index_service", "IndexService")
# Instânciamento da classe service
self.indexService = indexService()
# Lista todos os logs, recebendo a lista de dados e o total de registros
data, total_records = self.indexService.execute(first, skip)
# Cálculo dos metadados de paginação
total_pages = math.ceil(total_records / first)
current_page = (skip // first) + 1
next_page = None
# Verifica se existe uma próxima página
if current_page < total_pages:
next_page = current_page + 1
# Retorna a lista de logs e os metadados de paginação
return {
'message': 'Logs localizados com sucesso',
'data': data,
'pagination': {
'total_records': total_records,
'total_pages': total_pages,
'current_page': current_page,
'next_page': next_page,
'first': first, # Total de registros por página
'skip': skip # Registros pulados
}
}
# Busca um log específico pelo ID (log_id)
def show(self, log_schema: LogSchema):
#Importação da classe desejada
show_service = self.dynamic_import.service('log_show_service', 'ShowService')
# Instânciamento da classe desejada
self.show_service = show_service()
# Busca e retorna o log desejado
return {
'message': 'Log localizado com sucesso',
'data': self.show_service.execute(log_schema)
}
# Busca um log específico pelo campo 'file' (adaptado de getEmail)
def getFile(self, log_schema: LogFileSchema):
# Importação da classe desejada
get_file_service = self.dynamic_import.service('log_get_file_service', 'GetFileService')
# Instânciamento da classe desejada
self.get_file_service = get_file_service()
# Busca e retorna o log desejado
return {
'message': 'Arquivo localizado com sucesso no log',
'data': self.get_file_service.execute(log_schema, True)
}
# Cadastra um novo log
def save(self, log_schema: LogSaveSchema):
#Importação da classe desejada
save_service = self.dynamic_import.service('log_save_service', 'LogSaveService')
# Instânciamento da classe desejada
self.save_service = save_service()
# Busca e retorna o log desejado
return {
'message': 'Log salvo com sucesso',
'data': self.save_service.execute(log_schema)
}
# Atualiza os dados de um log
def update(self, log_id: int, log_schema: LogUpdateSchema):
#Importação da classe desejada
update_service = self.dynamic_import.service('log_update_service', 'LogUpdateService')
# Instânciamento da classe desejada
self.update_service = update_service()
# Busca e retorna o log desejado
return {
'message': 'Log atualizado com sucesso',
'data': self.update_service.execute(log_id, log_schema)
}
# Exclui um log
def delete(self, log_schema: LogIdSchema):
#Importação da classe desejada
delete_service = self.dynamic_import.service('log_delete_service', 'DeleteService')
# Instânciamento da classe desejada
self.delete_service = delete_service()
# Busca e retorna o log desejado
return {
'message': 'Log removido com sucesso',
'data': self.delete_service.execute(log_schema)
}

View file

@ -0,0 +1,145 @@
from actions.dynamic_import.dynamic_import import DynamicImport
from packages.v1.administrativo.schemas.user_schema import (
UserSchema,
UserAuthenticateSchema,
UserSaveSchema,
UserUpdateSchema,
UserEmailSchema,
UserIdSchema
)
class UserController:
def __init__(self):
# Action responsável por carregar as services de acodo com o estado
self.dynamic_import = DynamicImport()
# Define o pacote que deve ser carregado
self.dynamic_import.set_package("administrativo")
# Define a tabela que o pacote pertence
self.dynamic_import.set_table("user")
pass
# Efetua o acesso junto ao sistema por um determinado usuário
def authenticate(self, user_authenticate_schema : UserAuthenticateSchema):
# Importação de service de Authenticate
authenticate_service = self.dynamic_import.service("user_authenticate_service", "AuthenticateService")
# Instânciamento da service
self.authenticate_service = authenticate_service()
# Retorna o usuário logado
return {
'message' : 'Usuário localizado com sucesso',
'data' : {
'token' : self.authenticate_service.execute(user_authenticate_schema)
}
}
# Carrega os dados do usuário logado
def me(self, current_user):
# Importação de service de authenticate
me_service = self.dynamic_import.service("user_me_service", "MeService")
# Instânciamento da service
self.me_service = me_service()
# Retorna o usuário logado
return {
'message' : 'Usuário localizado com sucesso',
'data' : self.me_service.execute(current_user)
}
# Lista todos os usuários
def index(self):
# Importação da classe desejada
indexService = self.dynamic_import.service("user_index_service", "IndexService")
# Instânciamento da classe service
self.indexService = indexService()
# Lista todos os usuários
return {
'message': 'Usuários localizados com sucesso',
'data': self.indexService.execute()
}
# Busca um usuário especifico pelo ID
def show(self, usuario_schema : UserSchema):
#Importação da classe desejada
show_service = self.dynamic_import.service('user_show_service', 'ShowService')
# Instânciamento da classe desejada
self.show_service = show_service()
# Busca e retorna o usuário desejado
return {
'message' : 'Usuário localizado com sucesso',
'data': self.show_service.execute(usuario_schema)
}
# Busca um usuário especifico pelo e-mail
def getEmail(self, usuario_schema : UserEmailSchema):
#Importação da classe desejada
get_email_service = self.dynamic_import.service('user_get_email_service', 'GetEmailService')
# Instânciamento da classe desejada
self.get_email_service = get_email_service()
# Busca e retorna o usuário desejado
return {
'message' : 'E-mail localizado com sucesso',
'data': self.get_email_service.execute(usuario_schema, True) # True para retornar a mensagem de erro caso não localize o usuario
}
# Cadastra um novo usuário
def save(self, usuario_schema : UserSaveSchema):
#Importação da classe desejada
save_service = self.dynamic_import.service('user_save_service', 'UserSaveService')
# Instânciamento da classe desejada
self.save_service = save_service()
# Busca e retorna o usuário desejado
return {
'message' : 'Usuário salvo com sucesso',
'data': self.save_service.execute(usuario_schema)
}
# Atualiza os dados de um usuário
def update(self, user_id: int, usuario_schema : UserUpdateSchema):
#Importação da classe desejada
save_service = self.dynamic_import.service('user_update_service', 'UserUpdateService')
# Instânciamento da classe desejada
self.save_service = save_service()
# Busca e retorna o usuário desejado
return {
'message' : 'Usuário atualizado com sucesso',
'data': self.save_service.execute(user_id, usuario_schema)
}
# Exclui um usuário
def delete(self, usuario_schema : UserIdSchema):
#Importação da classe desejada
delete_service = self.dynamic_import.service('user_delete_service', 'DeleteService')
# Instânciamento da classe desejada
self.delete_service = delete_service()
# Busca e retorna o usuário desejado
return {
'message' : 'Usuário removido com sucesso',
'data': self.delete_service.execute(usuario_schema)
}

View file

@ -0,0 +1,111 @@
# Importação de bibliotecas
from typing import Optional
from fastapi import APIRouter, Body, Depends, status, Query
from actions.jwt.get_current_user import get_current_user
from packages.v1.administrativo.controllers.log_controller import LogController
from packages.v1.administrativo.schemas.log_schema import (
LogSchema,
LogAuthenticateSchema,
LogSaveSchema,
LogUpdateSchema,
LogIdSchema,
LogFileSchema
)
# Inicializa o roteador para as rotas de log
router = APIRouter()
# Instânciamento do controller desejado
log_controller = LogController()
# Lista todos os logs com paginação
@router.get('/',
status_code=status.HTTP_200_OK,
summary='Lista todos os logs cadastrados com paginação',
response_description='Lista todos os logs cadastrados')
async def index(
first: int = Query(20, description="Total de registros por página (LIMIT)"),
skip: int = Query(0, description="Total de registros a pular (OFFSET)"),
current_user: dict = Depends(get_current_user)
):
# Busca todos os logs cadastrados, passando os parâmetros de paginação
response = log_controller.index(first=first, skip=skip)
# Retorna os dados localizados com os metadados de paginação
return response
# Localiza um log pelo campo 'file' (adaptado de /email)
@router.get('/file',
status_code=status.HTTP_200_OK,
summary='Busca um registro em específico pelo nome do arquivo (file)',
response_description='Busca um registro em específico')
async def getFile(file: str, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
log_schema = LogFileSchema(file=file)
# Busca um log específico pelo arquivo
response = log_controller.getFile(log_schema)
# Retorna os dados localizados
return response
# Localiza um log pelo ID
@router.get('/{log_id}',
status_code=status.HTTP_200_OK,
summary='Busca um registro em específico pelo ID do log',
response_description='Busca um registro em específico')
async def show(log_id: int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
log_schema = LogIdSchema(log_id=log_id)
# Busca um log específico pelo ID
response = log_controller.show(log_schema)
# Retorna os dados localizados
return response
# Cadastro de logs
@router.post('/',
status_code=status.HTTP_201_CREATED, # Alterado para 201 Created, mais apropriado para POST
summary='Cadastra um novo log',
response_description='Cadastra um log')
async def save(log_schema: LogSaveSchema, current_user: dict = Depends(get_current_user)):
# Efetua o cadastro do log junto ao banco de dados
response = log_controller.save(log_schema)
# Retorna os dados localizados
return response
# Atualiza os dados de log
@router.put('/{log_id}',
status_code=status.HTTP_200_OK,
summary='Atualiza um log',
response_description='Atualiza um log')
async def update(log_id: int, log_schema: LogUpdateSchema, current_user: dict = Depends(get_current_user)):
# Efetua a atualização dos dados do log
response = log_controller.update(log_id, log_schema)
# Retorna os dados localizados
return response
# Exclui um determinado log
@router.delete('/{log_id}',
status_code=status.HTTP_200_OK,
summary='Remove um log',
response_description='Remove um log')
async def delete(log_id: int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
log_schema = LogIdSchema(log_id=log_id)
# Efetua a exclusão de um determinado log
response = log_controller.delete(log_schema)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,133 @@
# Importação de bibliotecas
from typing import Optional
from fastapi import APIRouter, Body, Depends, status
from actions.jwt.get_current_user import get_current_user
from packages.v1.administrativo.controllers.user_controller import UserController
from packages.v1.administrativo.schemas.user_schema import (
UserSchema,
UserAuthenticateSchema,
UserSaveSchema,
UserUpdateSchema,
UserEmailSchema,
UserIdSchema
)
# Inicializa o roteador para as rotas de usuário
router = APIRouter()
# Instânciamento do controller desejado
user_controller = UserController()
# Autenticação de usuário
@router.post('/authenticate',
status_code=status.HTTP_200_OK,
summary='Cria o token de acesso do usuário',
response_description='Retorna o token de acesso do usuário')
async def index(user_authenticate_schema : UserAuthenticateSchema):
# Efetua a autenticação de um usuário junto ao sistema
response = user_controller.authenticate(user_authenticate_schema)
# Retorna os dados localizados
return response
# Dados do usuário logado
@router.get('/me',
status_code=status.HTTP_200_OK,
summary='Retorna os dados do usuário que efetuou o login',
response_description='Dados do usuário que efetuou o login' )
async def me(current_user: dict = Depends(get_current_user)):
# Busca os dados do usuário logado
response = user_controller.me(current_user)
# Retorna os dados localizados
return response
# Lista todos os usuários
@router.get('/',
status_code=status.HTTP_200_OK,
summary='Lista todos os usuário cadastrados',
response_description='Lista todos os usuário cadastrados')
async def index(current_user: dict = Depends(get_current_user)):
# Busca todos os usuários cadastrados
response = user_controller.index()
# Retorna os dados localizados
return response
# Localiza um usuário pelo email
@router.get('/email',
status_code=status.HTTP_200_OK,
summary='Busca um registro em especifico por e-mail informado',
response_description='Busca um registro em especifico')
async def getEmail(email : str, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
usuario_schema = UserEmailSchema(email=email)
# Busca um usuário especifico pelo e-mail
response = user_controller.getEmail(usuario_schema)
# Retorna os dados localizados
return response
# Localiza um usuário pelo ID
@router.get('/{user_id}',
status_code=status.HTTP_200_OK,
summary='Busca um registro em especifico pelo ID do usuário',
response_description='Busca um registro em especifico')
async def show(user_id : int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
usuario_schema = UserIdSchema(user_id=user_id)
# Busca um usuário especifico pelo ID
response = user_controller.show(usuario_schema)
# Retorna os dados localizados
return response
# Cadastro de usuários
@router.post('/',
status_code=status.HTTP_200_OK,
summary='Cadastra um usuário',
response_description='Cadastra um usuário')
async def save(usuario_schema : UserSaveSchema, current_user: dict = Depends(get_current_user)):
# Efetua o cadastro do usuário junto ao banco de dados
response = user_controller.save(usuario_schema)
# Retorna os dados localizados
return response
# Atualiza os dados de usuário
@router.put('/{user_id}',
status_code=status.HTTP_200_OK,
summary='Atualiza um usuário',
response_description='Atualiza um usuário')
async def update(user_id : int, usuario_schema : UserUpdateSchema, current_user: dict = Depends(get_current_user)):
# Efetua a atualização dos dados de usuário
response = user_controller.update(user_id, usuario_schema)
# Retorna os dados localizados
return response
# Exclui um determinado usuário
@router.delete('/{user_id}',
status_code=status.HTTP_200_OK,
summary='Remove um usuário',
response_description='Remove um usuário')
async def delete(user_id : int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
usuario_schema = UserIdSchema(user_id=user_id)
# Efetua a exclusão de um determinado usuário
response = user_controller.delete(usuario_schema)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,38 @@
from packages.v1.administrativo.schemas.log_schema import \
LogIdSchema
from abstracts.repository import BaseRepository
from fastapi import HTTPException, status
class LogDeleteRepository(BaseRepository):
"""
Repositório responsável pela operação de exclusão (DELETE) de um
registro na tabela 'log', usando o log_id como critério.
"""
def execute(self, log_schema: LogIdSchema):
try:
# Montagem do sql para exclusão
sql = """ DELETE FROM log l WHERE l.log_id = :logId """
# Preenchimento de parâmetros
params = {
"logId" : log_schema.log_id
}
# Execução do sql
response = self.run(sql, params)
# Retorna o resultado (número de linhas afetadas)
return response
except Exception as e:
# Informa que houve uma falha na exclusão do log
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao excluir log: {e}"
)

View file

@ -0,0 +1,30 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.log_schema import LogSchema
class LogGetByLogIdRepository(BaseRepository):
"""
Repositório responsável por buscar um registro único na tabela 'log'
utilizando a chave primária 'log_id'.
"""
def execute(self, log_schema: LogSchema):
"""
Executa a busca de um log pelo seu ID.
:param log_schema: Schema contendo o log_id.
:return: O registro de log encontrado ou None.
"""
# Define a consulta sql. O SELECT * retorna todos os campos da DDL:
# log_id, client_id, date_post, file.
sql = """ SELECT * FROM log l WHERE l.log_id = :logId """
# Preenchimento dos parâmetros SQL
params = {
'logId': log_schema.log_id
}
# Execução da instrução sql para buscar um único registro
return self.fetch_one(sql, params)

View file

@ -0,0 +1,46 @@
from abstracts.repository import BaseRepository
from typing import Tuple, List, Dict, Any
class LogIndexRepository(BaseRepository):
"""
Repositório responsável por buscar e retornar todos os registros
da tabela 'log' (indexação), com suporte a paginação.
"""
# O método execute agora recebe 'first' (limite) e 'skip' (offset)
def execute(self, first: int, skip: int) -> Tuple[List[Dict[str, Any]], int]:
"""
Executa a busca de logs com paginação e retorna o total de registros.
O SELECT * retorna todos os campos da DDL:
log_id, client_id, date_post e file.
:param first: Número máximo de registros a retornar (LIMIT).
:param skip: Número de registros a pular (OFFSET).
:return: Uma tupla contendo a lista de logs e o total de registros.
"""
# 1. SQL para contar o total de registros (ignorando LIMIT/OFFSET)
sql_count = """ SELECT COUNT(*)
FROM log l """
total_records = self.fetch_one(sql_count)['COUNT(*)']
# 2. SQL para listar os logs com LIMIT e OFFSET (Paginação)
sql = f""" select l.log_id,
l.client_id,
l.date_post,
c.client_id,
c.cns,
c.name
from log l
left join client c on l.client_id = c.client_id
LIMIT {first} OFFSET {skip} """
# Execução do sql para buscar múltiplos registros
response = self.fetch_all(sql)
# Retorna os dados localizados e o total de registros
return response, total_records

View file

@ -0,0 +1,64 @@
from fastapi import HTTPException, status
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.log_schema import LogSaveSchema # Importação do schema LogSaveSchema
class LogSaveRepository(BaseRepository):
"""
Repositório responsável pela operação de salvar/atualizar (Upsert)
um registro na tabela 'log', utilizando a lógica INSERT...ON DUPLICATE KEY UPDATE.
"""
def execute(self, log_schema: LogSaveSchema):
try:
# SQL adaptado para MySQL: INSERT ... ON DUPLICATE KEY UPDATE para a tabela 'log'.
# Colunas para o INSERT:
insert_cols = """
log_id, client_id, date_post, file
"""
# Valores para o INSERT (usando os placeholders):
insert_vals = """
:log_id, :client_id, :date_post, :file
"""
# Ações no ON DUPLICATE KEY UPDATE (atualiza os campos e o date_post como data de atualização)
update_actions = """
client_id = VALUES(client_id),
date_post = NOW(), # Atualiza o timestamp para o momento da modificação (comportamento de update)
file = VALUES(file)
"""
sql = f"""
INSERT INTO log ({insert_cols})
VALUES ({insert_vals})
ON DUPLICATE KEY UPDATE
{update_actions};
"""
# Preenchimento de parâmetros. log_id será None (para INSERT) ou o ID (para UPDATE).
params = {
'log_id': log_schema.log_id, # Deve ser None/0 para INSERT
'client_id': log_schema.client_id,
'date_post': log_schema.date_post,
'file': log_schema.file,
}
# Execução do SQL.
result = self.run_and_return(sql, params)
# Se for um INSERT e a execução retornar um ID, retorna o novo ID do log.
if not log_schema.log_id and result:
return result
# Se for um UPDATE ou um resultado de sucesso da operação.
return result
except Exception as e:
# Informa que houve uma falha ao salvar/atualizar o log
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao salvar log: {e}"
)

View file

@ -0,0 +1,29 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.log_schema import LogSchema
class LogShowRepository(BaseRepository):
"""
Repositório responsável por buscar um registro único na tabela 'log'
utilizando a chave primária 'log_id'.
"""
def execute(self, log_schema: LogSchema):
"""
Executa a busca de um log pelo seu ID.
:param log_schema: Schema contendo o log_id.
:return: O registro de log encontrado ou None.
"""
# Montagem do sql. O SELECT * retorna todos os campos da DDL:
# log_id, client_id, date_post, file.
sql = """ SELECT * FROM log l WHERE l.log_id = :logId """
# Preenchimento de parâmetros
params = {
'logId' : log_schema.log_id
}
# Execução do sql para buscar um único registro
return self.fetch_one(sql, params)

View file

@ -0,0 +1,63 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.log_schema import LogUpdateSchema
from fastapi import HTTPException, status
# A importação de 'datetime' foi removida, pois a tabela 'log' não possui o campo 'date_update',
# e o campo 'date_post' é o timestamp de criação e não deve ser modificado.
class LogUpdateRepository(BaseRepository):
"""
Repositório responsável pela atualização (UPDATE) dinâmica de um
registro na tabela 'log', identificado pelo 'log_id'.
"""
def execute(self, log_id: int, log_schema: LogUpdateSchema):
try:
updates = []
params = {}
# --- Mapeamento e inclusão dos campos da DDL para atualização dinâmica ---
# client_id
if log_schema.client_id is not None:
updates.append("client_id = :client_id")
params["client_id"] = log_schema.client_id
# file (tipo JSON)
if log_schema.file is not None:
updates.append("file = :file")
params["file"] = log_schema.file
# Os campos 'log_id' (chave) e 'date_post' (criação) não são atualizados dinamicamente.
if not updates:
# Se não houver campos para atualizar, retorna False
return False
params["log_id"] = log_id
# SQL para UPDATE
sql = f"UPDATE log SET {', '.join(updates)} WHERE log_id = :log_id;"
# Executa a query
result = self.run(sql, params)
if result is None or result == 0:
# Se 0 linhas afetadas, pode ser que o log não exista ou não houve alteração.
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Nenhum log localizado para esta solicitação ou nenhuma alteração realizada.'
)
# Retorna True, indicando o sucesso da operação
return True
except Exception as e:
# Informa que houve uma falha na atualização do log
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao atualizar log: {e}"
)

View file

@ -0,0 +1,34 @@
from packages.v1.administrativo.schemas.user_schema import \
UserIdSchema
from abstracts.repository import BaseRepository
from fastapi import HTTPException, status
class DeleteRepository(BaseRepository):
def execute(self, usuario_schema : UserIdSchema):
try:
# Montagem do sql
sql = """ DELETE FROM user u WHERE u.user_id = :userId """
# Preenchimento de parâmetros
params = {
"userId" : usuario_schema.user_id
}
#Execução do sql
response = self.run(sql, params)
# Retorna o resultado
return response
except Exception as e:
# Informa que houve uma falha na atualização do usuário
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao excluir usuário: {e}"
)

View file

@ -0,0 +1,23 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.user_schema import UserAuthenticateSchema
from fastapi import HTTPException, status
class GetByAuthenticateRepository(BaseRepository):
def execute(self, user_authenticate_schema: UserAuthenticateSchema):
"""
Busca um usuário pelo email e retorna seus dados.
Retorna HTTP 404 se não encontrado.
"""
# Montagem do SQL
sql = """ SELECT * FROM `user` u WHERE u.EMAIL = :email LIMIT 1 """
# Preenchimento dos parâmetros
params = {"email": user_authenticate_schema.email}
# Executa a consulta usando fetch_one
response = self.fetch_one(sql, params)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,18 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.user_schema import UserEmailSchema, UserSchema
class GetByUsuarioEmailRepository(BaseRepository):
def execute(self, user_schema = UserEmailSchema)-> UserSchema:
# Define a consulta sql
sql = """ SELECT * FROM user u WHERE u.email = :email """
# Preenchimento dos parâmetros SQL
params = {
'email': user_schema.email
}
# Execução da instrução sql
return self.fetch_one(sql, params)

View file

@ -0,0 +1,18 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.user_schema import UserSchema
class GetByuserIdRepository(BaseRepository):
def execute(self, user_schema = UserSchema):
# Define a consulta sql
sql = """ SELECT * FROM user u WHERE u.user_id = :userId """
# Preenchimento dos parâmetros SQL
params = {
'userId': user_schema.user_id
}
# Execução da instrução sql
return self.fetch_one(sql, params)

View file

@ -0,0 +1,14 @@
from abstracts.repository import BaseRepository
class IndexRepository(BaseRepository):
def execute(self):
# Montagem do sql
sql = """ SELECT * FROM user """
# Execução do sql
response = self.fetch_all(sql)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,82 @@
from fastapi import HTTPException, status
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.user_schema import UserSaveSchema # Importação do schema UserSaveSchema
class SaveRepository(BaseRepository):
def execute(self, user_schema: UserSaveSchema):
try:
# SQL adaptado para MySQL: INSERT ... ON DUPLICATE KEY UPDATE.
# Este comando tenta inserir. Se o 'user_id' já existir (PRIMARY KEY), ele atualiza.
# Note que 'user_id' é passado como NULL no INSERT se for uma criação.
# Colunas para o INSERT:
insert_cols = """
user_id, name, email, password, password_temp, password_temp_confirm,
position, team, status, user_id_create
"""
# Valores para o INSERT (usando os mesmos placeholders):
insert_vals = """
:user_id, :name, :email, :password, :password_temp, :password_temp_confirm,
:position, :team, :status, :user_id_create
"""
# Ações no ON DUPLICATE KEY UPDATE (atualiza todos os campos mutáveis e o date_update)
update_actions = """
name = VALUES(name),
email = VALUES(email),
password = VALUES(password),
password_temp = VALUES(password_temp),
password_temp_confirm = VALUES(password_temp_confirm),
position = VALUES(position),
team = VALUES(team),
status = VALUES(status),
date_update = NOW()
"""
sql = f"""
INSERT INTO user ({insert_cols})
VALUES ({insert_vals})
ON DUPLICATE KEY UPDATE
{update_actions};
"""
# Preenchimento de parâmetros. user_id será None (para INSERT) ou o ID (para UPDATE).
# O MySQL gerencia o AUTO_INCREMENT para user_id se for None.
params = {
'user_id': user_schema.user_id,
'name': user_schema.name,
'email': user_schema.email,
'password': user_schema.password, # A senha deve vir hasheada
'password_temp': None,
'password_temp_confirm': 'N',
'position': user_schema.position,
'team': user_schema.team,
'status': user_schema.status if user_schema.status else 'A',
'user_id_create': user_schema.user_id_create
}
# Execução do SQL. No MySQL, o `run_and_return` normalmente retornaria
# o ID da nova linha (para INSERT) ou o número de linhas afetadas (para UPDATE).
result = self.run_and_return(sql, params)
# Assumindo que 'run_and_return' é ajustado para retornar o objeto salvo/atualizado
# ou um indicador de sucesso que permita buscar o registro posteriormente.
# Se for um INSERT, o resultado pode ser o lastrowid (o novo user_id).
if not user_schema.user_id and result:
# Se for uma criação, retorna o ID do novo usuário (ou o registro completo se o ORM/Driver buscar).
return result
# Se for um UPDATE (user_id != None), retornamos o resultado da operação.
return result
except Exception as e:
# Informa que houve uma falha ao salvar/atualizar o usuário
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao salvar usuário: {e}"
)

View file

@ -0,0 +1,17 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.user_schema import UserSchema
class ShowRepository(BaseRepository):
def execute(self, usuario_schema : UserSchema):
# Montagem do sql
sql = """ SELECT * FROM user u WHERE u.user_id = :userId """
# Preenchimento de parâmetros
params = {
'userId' : usuario_schema.user_id
}
# Execução do sql
return self.fetch_one(sql, params)

View file

@ -0,0 +1,105 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.user_schema import UserUpdateSchema
from fastapi import HTTPException, status
from datetime import datetime
class UpdateRepository(BaseRepository):
def execute(self, user_id: int, user_schema: UserUpdateSchema):
print(user_id)
try:
updates = []
params = {}
# --- Mapeamento e inclusão dos campos da DDL para atualização dinâmica ---
if user_schema.name is not None:
updates.append("name = :name")
params["name"] = user_schema.name
if user_schema.email is not None:
updates.append("email = :email")
params["email"] = user_schema.email
# Senha (se fornecida, já vem hasheada do Schema)
if user_schema.password is not None:
updates.append("password = :password")
params["password"] = user_schema.password
if user_schema.password_temp is not None:
updates.append("password_temp = :password_temp")
params["password_temp"] = user_schema.password_temp
if user_schema.password_temp_confirm is not None:
updates.append("password_temp_confirm = :password_temp_confirm")
params["password_temp_confirm"] = user_schema.password_temp_confirm
if user_schema.position is not None:
updates.append("position = :position")
params["position"] = user_schema.position
if user_schema.team is not None:
updates.append("team = :team")
params["team"] = user_schema.team
if user_schema.status is not None:
updates.append("status = :status")
params["status"] = user_schema.status
# Campo de auditoria: user_id_update
if user_schema.user_id_update is not None:
updates.append("user_id_update = :user_id_update")
params["user_id_update"] = user_schema.user_id_update
# Campo de auditoria: date_update (deve ser atualizado em qualquer modificação)
# Para MySQL, usamos NOW() ou enviamos o timestamp. Usaremos o parâmetro Python.
updates.append("date_update = :date_update")
params["date_update"] = datetime.now() # O datetime.now() será formatado pelo driver do banco
if not updates:
# Se não houver campos para atualizar, retorna False
return False
params["user_id"] = user_id
# SQL para MySQL: UPDATE sem RETURNING
sql = f"UPDATE user SET {', '.join(updates)} WHERE user_id = :user_id;"
# Executa a query. O run_and_return é substituído por um run simples para UPDATE,
# e a lógica de retorno deve ser adaptada (por exemplo, retornar o número de linhas afetadas)
# ou realizar um SELECT posterior.
# Mantendo a chamada original para compatibilidade com o framework:
result = self.run(sql, params)
# Para MySQL, 'run_and_return' pode retornar o número de linhas afetadas ou False/None.
# Aqui, adaptamos a lógica para refletir o sucesso da operação, pois a função original
# esperava um retorno de objeto.
# Se a query foi executada com sucesso (result não é False/None e é > 0 linhas afetadas),
# o objeto UserSchema atualizado deve ser retornado (pode ser necessário um SELECT extra).
# Para manter o padrão de retorno do código original (retornar o objeto/registro),
# é necessário um SELECT subsequente ou ajuste no método run_and_return.
# Assumindo que o método 'run_and_return' agora retorna o número de linhas afetadas ou um valor de sucesso/falha:
if result is None or result == 0:
# Se 0 linhas afetadas, pode ser que o usuário não exista.
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Nenhum usuário localizado para esta solicitação ou nenhuma alteração realizada.'
)
# O repositório deve retornar o objeto atualizado (pode exigir um SELECT extra aqui).
# No entanto, mantendo o padrão do código original, retornamos o resultado da execução
# do UPDATE, que é o que o framework espera.
return True # Retorna True ou o número de linhas afetadas, indicando sucesso.
except Exception as e:
# Informa que houve uma falha na atualização do usuário
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao atualizar usuário: {e}"
)

View file

@ -0,0 +1,148 @@
from pydantic import BaseModel, constr, field_validator, model_validator
from fastapi import HTTPException, status
from typing import Optional, Any, List
from datetime import datetime
# Funções para sanitização de entradas (evitar XSS, SQLi etc.)
# Mantido por consistência com o arquivo original
from actions.validations.text import Text
# ----------------------------------------------------
# Schema base - Inclui todos os campos da DDL
# ----------------------------------------------------
class LogSchema(BaseModel):
log_id: Optional[int] = None
client_id: Optional[int] = None
date_post: Optional[datetime] = None
file: Optional[Any] = None # Mapeamento do campo JSON para Any ou dict
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para localizar um log especifico pelo ID (GET/DELETE)
# ----------------------------------------------------
class LogIdSchema(BaseModel):
log_id: int
# ----------------------------------------------------
# Schema para localizar um log por critérios de filtro
# (Adaptado de UserAuthenticateSchema, usando client_id e data)
# ----------------------------------------------------
class LogAuthenticateSchema(BaseModel):
# Campos utilizados: client_id e date_post (para filtro de range, por exemplo)
client_id: Optional[int] = None
start_date_post: Optional[datetime] = None
end_date_post: Optional[datetime] = None
# Adicionando um campo genérico para buscar algo dentro do JSON 'file'
file_search_term: Optional[str] = None
@field_validator('file_search_term')
def sanitize_search_term(cls, v):
if v:
return Text.sanitize_input(v)
return v
@model_validator(mode='after')
def check_required_filter(self):
# Garante que pelo menos um critério de busca seja fornecido
if not any([self.client_id, self.start_date_post, self.end_date_post, self.file_search_term]):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Pelo menos um critério de busca (client_id, data inicial, data final ou termo de busca no arquivo) é obrigatório.'
)
return self
# ----------------------------------------------------
# Schema para localizar log pelo campo 'file' (Adaptado de UserEmailSchema)
# ----------------------------------------------------
class LogFileSchema(BaseModel):
# Usado para buscar logs que contenham esta string no campo JSON 'file'
file_content_search: Optional[str] = None
client_id: Optional[int] = None
# Sanitiza o input
@field_validator('file_content_search')
def sanitize_search(cls, v):
if v:
return Text.sanitize_input(v)
return v
# Valida se o campo não está vazio
@model_validator(mode='after')
def validate_search_fields(self):
if not self.file_content_search and not self.client_id:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Informe um termo de busca no arquivo ou o ID do cliente.'
)
return self
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para criação de novo log (POST)
# ----------------------------------------------------
class LogSaveSchema(BaseModel):
log_id: Optional[int] = None # Opcional para operações de upsert
client_id: int # NOT NULL
date_post: Optional[datetime] = None # Possui DEFAULT CURRENT_TIMESTAMP
file: Any # NOT NULL (Mapeamento do campo JSON)
# Sanitiza o input (apenas para campos string, mas mantido por consistência)
@field_validator('client_id', mode='before')
def validate_client_id(cls, v):
if v is not None:
# Converte para string para sanitizar e depois para int (se necessário)
return int(Text.sanitize_input(str(v)))
return v
# O campo 'file' sendo Any/dict não tem sanitização Text.sanitize_input direta.
# Verifica se os campos obrigatórios foram enviados
@model_validator(mode='after')
def validate_required_fields(self):
errors = []
# Validação do client_id
if not self.client_id or self.client_id <= 0:
errors.append({'input': 'client_id', 'message': 'O ID do cliente é obrigatório.'})
# Validação do file (JSON)
if self.file is None:
errors.append({'input': 'file', 'message': 'O campo de arquivo (JSON) é obrigatório.'})
# Se houver errors, lança uma única exceção
if errors:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=errors
)
return self
# ----------------------------------------------------
# Schema para atualizar log (PUT)
# ----------------------------------------------------
class LogUpdateSchema(BaseModel):
client_id: Optional[int] = None
file: Optional[Any] = None # Mapeamento do campo JSON
# Sanitiza o input (apenas para campos string, mas mantido por consistência)
@field_validator('client_id', mode='before')
def validate_client_id(cls, v):
if v is not None:
return int(Text.sanitize_input(str(v)))
return v

View file

@ -0,0 +1,201 @@
from pydantic import BaseModel, EmailStr, constr, field_validator, model_validator
from fastapi import HTTPException, status
from typing import Optional
from datetime import datetime
# Funções utilitárias para segurança (hash e verificação de senha)
# Supondo que você ajustará as importações conforme seu projeto
from actions.security.security import Security
# Funções para sanitização de entradas (evitar XSS, SQLi etc.)
from actions.validations.text import Text
# Funções para validar E-mail
from actions.validations.email import Email
# Funções para validar cpf (Não está na DDL, mas mantido como opcional no Save e Update)
# from actions.validations.cpf import CPF # Não utilizado neste novo escopo
# ----------------------------------------------------
# Schema base
# ----------------------------------------------------
class UserSchema(BaseModel):
user_id: Optional[int] = None
name: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None # Corresponde ao 'password' na DDL
password_temp: Optional[str] = None
password_temp_confirm: Optional[str] = None
position: Optional[str] = None # Corresponde ao 'position' na DDL
team: Optional[str] = None
status: Optional[str] = None # Corresponde ao 'status' na DDL
date_register: Optional[datetime] = None
date_update: Optional[datetime] = None
user_id_create: Optional[int] = None
user_id_update: Optional[int] = None
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para acesso ao sistema (Autenticação)
# ----------------------------------------------------
class UserAuthenticateSchema(BaseModel):
# Campos utilizados
email: EmailStr # Usando email para login, é mais comum em DDLs simples
password: str # Corresponde ao 'password' na DDL
# Validação e sanitização do email
@field_validator('email')
def validar_e_sanitizar_email(cls, v):
if not v:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Informe o e-mail'
)
# Sanitiza e-mail
return Text.sanitize_input(v)
# Validação e sanitização da senha
@field_validator('password')
def validar_e_sanitizar_senha(cls, v):
if not v:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Informe a senha'
)
# Sanitiza a senha
return Text.sanitize_input(v)
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para localizar um usuário especifico pelo ID (GET)
# ----------------------------------------------------
class UserIdSchema(BaseModel):
user_id: int
# ----------------------------------------------------
# Schema para criação de novo usuário (POST)
# ----------------------------------------------------
class UserSaveSchema(BaseModel):
user_id: Optional[int] = None
name: constr(min_length=1)
email: EmailStr
password: constr(min_length=1)
position: constr(min_length=1)
team: Optional[str] = None
status: Optional[str] = 'A'
user_id_create: Optional[int] = None
# Sanitiza os inputs enviados
@field_validator('name', 'email', 'position', 'team', 'status')
def validate_and_sanitize_fields(cls, v):
if v is not None:
return Text.sanitize_input(v)
return v
# Verifica se os campos obrigatórios foram enviados e hash da senha
@model_validator(mode='after')
def validate_all_fields_and_hash_password(self):
errors = []
# Validação do nome
if not self.name or len(self.name.strip()) == 0:
errors.append({'input': 'name', 'message': 'O nome é obrigatório.'})
# Validação do email (EmailStr do Pydantic já faz a validação de formato)
if not self.email or len(self.email.strip()) == 0:
errors.append({'input': 'email', 'message': 'O e-mail é obrigatório.'})
# Validação da senha
if not self.password or len(self.password.strip()) == 0:
errors.append({'input': 'password', 'message': 'A senha é obrigatória.'})
# Criptografa a senha
if self.password and not errors: # Hash somente se a senha estiver presente e sem erros anteriores
self.password = Security.hash_password(self.password)
# Se houver errors, lança uma única exceção
if errors:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=errors
)
return self
# ----------------------------------------------------
# Schema para atualizar usuário (PUT)
# ----------------------------------------------------
class UserUpdateSchema(BaseModel):
name: Optional[str] = None
email: Optional[EmailStr] = None
password: Optional[str] = None
password_temp: Optional[str] = None
password_temp_confirm: Optional[str] = None
position: Optional[str] = None
team: Optional[str] = None
status: Optional[str] = None
user_id_update: Optional[int] = None
# Sanitiza os inputs enviados
@field_validator('name', 'email', 'password_temp', 'password_temp_confirm', 'position', 'team', 'status')
def validate_and_sanitize_fields(cls, v):
if v is not None:
return Text.sanitize_input(v)
return v
# Hash da senha se ela for fornecida
@model_validator(mode='after')
def hash_password_if_provided(self):
# Hash da nova senha, se fornecida.
if self.password:
self.password = Security.hash_password(self.password)
# Não estamos forçando a obrigatoriedade de todos os campos aqui (PUT é parcial),
# mas garantimos a sanitização e o hash da senha se ela existir.
return self
# ----------------------------------------------------
# Schema para localizar usuário pelo e-mail
# ----------------------------------------------------
class UserEmailSchema(BaseModel):
email: Optional[EmailStr] = None
# Sanitiza o input
@field_validator('email')
def sanitize_email(cls, v):
if v:
return Text.sanitize_input(v)
return v
# Valida se o campo não está vazio
@model_validator(mode='after')
def validate_email(self):
if not self.email or len(self.email.strip()) == 0:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Informe um e-mail'
)
return self
class Config:
from_attributes = True

View file

@ -0,0 +1,19 @@
from packages.v1.administrativo.schemas.log_schema import LogIdSchema
from packages.v1.administrativo.actions.log.log_delete_action import LogDeleteAction
class LogDeleteService:
"""
Service responsável por orquestrar a ação de exclusão de um registro
na tabela 'log', utilizando o log_id.
"""
def execute(self, log_schema: LogIdSchema):
# Instânciamento de ação
delete_action = LogDeleteAction()
# Executa a ação em questão, passando o schema com o ID do log a ser removido
data = delete_action.execute(log_schema)
# Retorno da informação (geralmente o status de sucesso/falha da exclusão)
return data

View file

@ -0,0 +1,25 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.actions.log.log_index_action import LogIndexAction
from typing import Tuple, List, Dict, Any
class IndexService:
# O método execute agora recebe 'first' e 'skip'
def execute(self, first: int, skip: int) -> Tuple[List[Dict[str, Any]], int]:
# Instanciamento de ações com prefixo 'log'
log_index_action = LogIndexAction()
# Executa a busca de todos os logs com paginação
data, total_records = log_index_action.execute(first, skip)
# Verifica se foram localizados registros
if not data:
# Retorna uma exceção
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Não foi possível localizar os logs'
)
# Retorna as informações localizadas e o total de registros
return data, total_records

View file

@ -0,0 +1,44 @@
from actions.dynamic_import.dynamic_import import DynamicImport
# NOTE: O novo esquema deve ser criado para o Log.
# Assumimos que a UserSaveSchema foi substituída por LogSaveSchema,
# que conteria os campos client_id e file.
# from packages.v1.administrativo.schemas.log_schema import LogSaveSchema
from packages.v1.administrativo.actions.log.log_save_action import SaveAction
from fastapi import status, HTTPException
class SaveService:
# Mantendo o padrão de nome de classe, mas renomeando a classe principal
# para refletir a ação de 'Salvar' (Save) em vez de 'Usuário Salvar' (UserSave).
# O nome do arquivo já indica 'log_save_service'.
def __init__(self):
# Action responsável por carregar as services de acordo com o estado
self.dynamic_import = DynamicImport()
# Define o pacote que deve ser carregado
self.dynamic_import.set_package("administrativo")
# Define a tabela que o pacote pertence
self.dynamic_import.set_table("log")
pass
# O esquema de entrada deve refletir os campos da tabela 'log'
# (client_id e file, já que date_post é gerado no banco)
# def execute(self, log_schema: LogSaveSchema):
# Usando 'schema' para manter a abstração, já que o schema exato não está definido
def execute(self, schema):
# NOTE: A lógica de validação de email foi REMOVIDA
# porque não é aplicável à tabela 'log' (não há campo 'email').
# A validação de 'client_id' ou 'file' deve ocorrer no schema (Pydantic).
# Instanciamento de ações com prefixo 'log' e nome mantido
save_action = SaveAction()
# Retorna o log salvo
return save_action.execute(schema)
# NOTE: Foi mantido o padrão 'SaveService' para a classe principal.
# Se o nome original 'UserSaveService' for um padrão rigoroso,
# o nome da classe pode ser ajustado para 'LogSaveService'.
# No entanto, o padrão de design geralmente usa o nome da ação (Save) + Service.

View file

@ -0,0 +1,29 @@
from fastapi import HTTPException, status
# NOTE: O esquema de entrada para a ação de exibição de um único log
# precisaria de um esquema (Schema) que carregue o 'log_id'.
# Vamos assumir a necessidade de um LogIdSchema.
# from packages.v1.administrativo.schemas.log_schema import LogIdSchema
from packages.v1.administrativo.actions.log.log_show_action import ShowAction
class ShowService:
# O método execute deve receber o esquema que contém a ID do log a ser buscado
# def execute(self, log_id_schema: LogIdSchema):
def execute(self, schema):
# Instanciamento de ação com prefixo 'log'
log_show_action = ShowAction()
# Executa a ação em questão (buscando pelo log_id)
data = log_show_action.execute(schema)
if not data:
# Retorna uma exceção
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Não foi possível localizar o registro de log'
)
# Retorno da informação (log_id, client_id, date_post, file)
return data

View file

@ -0,0 +1,16 @@
from packages.v1.administrativo.schemas.log_schema import LogUpdateSchema
from packages.v1.administrativo.actions.log.log_update_action import UpdateAction
class UpdateService:
# Mantendo o padrão de nome de classe, refletindo apenas a ação 'Update' (Atualizar).
# O nome do arquivo já indica 'log_update_service'.
# O método deve receber o ID do log a ser atualizado (log_id)
# e o schema com os dados de atualização (LogUpdateSchema)
def execute(self, log_id: int, log_schema: LogUpdateSchema):
# Instanciamento de ações com prefixo 'log' e nome mantido
update_action = UpdateAction()
# Executa a ação de atualização, passando o ID e o schema
return update_action.execute(log_id, log_schema)

View file

@ -0,0 +1,52 @@
from fastapi import HTTPException, status
from actions.jwt.create_token import CreateToken
from packages.v1.administrativo.schemas.user_schema import UserAuthenticateSchema
from packages.v1.administrativo.actions.user.user_get_by_authenticate_action import GetByAuthenticateAction
import json
from actions.security.security import Security
class AuthenticateService:
def execute(self, user_authenticate_schema: UserAuthenticateSchema):
# Instância da action de autenticação
get_by_authenticate_action = GetByAuthenticateAction()
get_by_authenticate_result = get_by_authenticate_action.execute(user_authenticate_schema)
# Verifica se o usuário existe
if not get_by_authenticate_result:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Usuário não encontrado ou credenciais inválidas"
)
# Verifica se a senha armazenada é um hash válido
if not Security.is_hash(get_by_authenticate_result.password):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="A senha armazenada é inválida"
)
# Verifica se a senha informada confere
if not Security.verify_password(user_authenticate_schema.password, get_by_authenticate_result.password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Senha incorreta"
)
# Verifica se o usuário está ativo
if get_by_authenticate_result.status != "A":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="O usuário encontra-se desativado"
)
# Gera o token de acesso
create_token = CreateToken()
jwtUser = {
"user_id": int(get_by_authenticate_result.user_id),
"nome": str(get_by_authenticate_result.name),
"email": str(get_by_authenticate_result.email),
}
return create_token.execute("access-token", json.dumps(jwtUser))

View file

@ -0,0 +1,15 @@
from packages.v1.administrativo.schemas.user_schema import UserIdSchema
from packages.v1.administrativo.actions.user.user_delete_action import DeleteAction
class DeleteService:
def execute(self, usuario_schema: UserIdSchema):
# Instânciamento de ação
delete_action = DeleteAction()
# Executa a ação em questão
data = delete_action.execute(usuario_schema)
# Retorno da informação
return data

View file

@ -0,0 +1,26 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.schemas.user_schema import UserCpfSchema
from packages.v1.administrativo.actions.user.user_get_by_cpf_action import GetByUsuarioCpfAction
class GetCpfService:
def execute(self, usuario_schema: UserCpfSchema, messageValidate: bool):
# Instânciamento de ação
cpf_action = GetByUsuarioCpfAction()
# Executa a ação em questão
data = cpf_action.execute(usuario_schema)
if messageValidate:
if not data:
# Retorna uma exceção
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Não foi possível localizar o CPF do usuário'
)
# Retorno da informação
return data

View file

@ -0,0 +1,26 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.schemas.user_schema import UserEmailSchema
from packages.v1.administrativo.actions.user.user_get_by_email_action import GetByUsuarioEmailAction
class GetEmailService:
def execute(self, usuario_schema: UserEmailSchema, messageValidate: bool):
# Instânciamento de ação
email_action = GetByUsuarioEmailAction()
# Executa a ação em questão
data = email_action.execute(usuario_schema)
if messageValidate:
if not data:
# Retorna uma exceção
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Não foi possível localizar o e-mail do usuário'
)
# Retorno da informação
return data

View file

@ -0,0 +1,26 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.schemas.user_schema import UserLoginSchema
from packages.v1.administrativo.actions.user.user_get_by_login_action import GetByUsuarioLoginAction
class GetLoginService:
def execute(self, usuario_schema: UserLoginSchema, messageValidate: bool):
# Instânciamento de ação
login_action = GetByUsuarioLoginAction()
# Executa a ação em questão
data = login_action.execute(usuario_schema)
if messageValidate:
if not data:
# Retorna uma exceção
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Não foi possível localizar o login do usuário'
)
# Retorno da informação
return data

View file

@ -0,0 +1,24 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.schemas.user_schema import UserSchema
from packages.v1.administrativo.actions.user.user_index_action import IndexAction
class IndexService:
def execute(self):
# Instânciamento de acções
index_action = IndexAction()
# Executa a busca de todas as ações
data = index_action.execute()
# Verifica se foi loalizado registros
if not data:
# Retorna uma exeção
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Não foi possível localizar os usuários'
)
# Retorna as informações localizadas
return data

View file

@ -0,0 +1,18 @@
import ast
from packages.v1.administrativo.schemas.user_schema import UserIdSchema
from packages.v1.administrativo.actions.user.user_get_by_user_id_action import GetByuserIdAction
class MeService:
def execute(self, current_user):
get_by_user_id_action = GetByuserIdAction()
# Converte a string para dict de forma segura
usuario_data = ast.literal_eval(current_user["data"])
# Define os dados do schema
user_schema = UserIdSchema(user_id=int(usuario_data["user_id"]))
# Executa a ação em questão
return get_by_user_id_action.execute(user_schema)

View file

@ -0,0 +1,49 @@
from actions.dynamic_import.dynamic_import import DynamicImport
from packages.v1.administrativo.schemas.user_schema import UserSaveSchema, UserEmailSchema
from packages.v1.administrativo.actions.user.user_save_action import SaveAction
from fastapi import status, HTTPException
class UserSaveService:
def __init__(self):
# Action responsável por carregar as services de acodo com o estado
self.dynamic_import = DynamicImport()
# Define o pacote que deve ser carregado
self.dynamic_import.set_package("administrativo")
# Define a tabela que o pacote pertence
self.dynamic_import.set_table("user")
pass
def execute(self, usuario_schema: UserSaveSchema):
# Armazena possíveis erros
errors = []
# Verifica se o e-mail já esta sendo utilizado
# Importação de service de email
email_service = self.dynamic_import.service("user_get_email_service", "GetEmailService")
# Instânciamento da service
self.email_service = email_service()
# Verifica se o email já esta sendo utilizado
self.response = self.email_service.execute(UserEmailSchema(email=usuario_schema.email), False)
# Se houver retorno significa que o e-mail já esta sendo utiizado
if self.response:
errors.append({'input': 'email', 'message': 'O e-mail informado já esta sendo utilizado.'})
# Se houver erros, informo
if errors:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=errors
)
# Instânciamento de ações
saveAction = SaveAction()
# Retorna todos produtos desejados
return saveAction.execute(usuario_schema)

View file

@ -0,0 +1,24 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.schemas.user_schema import UserSchema
from packages.v1.administrativo.actions.user.user_show_action import ShowAction
class ShowService:
def execute(self, usuario_schema: UserSchema):
# Instânciamento de ação
show_action = ShowAction()
# Executa a ação em questão
data = show_action.execute(usuario_schema)
if not data:
# Retorna uma exceção
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Não foi possível localizar o registro'
)
# Retorno da informação
return data

View file

@ -0,0 +1,12 @@
from packages.v1.administrativo.schemas.user_schema import UserUpdateSchema
from packages.v1.administrativo.actions.user.user_update_action import UpdateAction
class UserUpdateService:
def execute(self, user_id: int, usuario_schema: UserUpdateSchema):
# Instânciamento de ações
updateAction = UpdateAction()
# Retorna todos produtos desejados
return updateAction.execute(user_id, usuario_schema)

20
packages/v1/api.py Normal file
View file

@ -0,0 +1,20 @@
# Importa o gerenciador de rotas do FastAPI
from fastapi import APIRouter
# Importa os módulos de rotas específicos
from packages.v1.administrativo.endpoints import user_endpoint
from packages.v1.administrativo.endpoints import log_endpoint
# Cria uma instância do APIRouter que vai agregar todas as rotas da API
api_router = APIRouter()
# Inclui as rotas de user
api_router.include_router(
user_endpoint.router, prefix="/administrativo/user", tags=["Gerenciamento de usuários"]
)
# Inclui as rotas de log
api_router.include_router(
log_endpoint.router, prefix="/administrativo/log", tags=["Gerenciamento de log's"]
)

View file

@ -0,0 +1,22 @@
from abstracts.repository import BaseRepository
class FirebirdCheckAction(BaseRepository):
def execute(self):
# Montagem do SQL
sql = """ SELECT 1 FROM RDB$DATABASE """
# Execução do sql
response = self.fetch_one(sql)
if response:
# Dados
response = {
"status" : "Banco de dados acessível"
}
# Retorna os dados localizados
return response

View file

@ -0,0 +1,24 @@
import shutil
from abstracts.action import BaseAction
class GetSizeAction:
def execute(self):
# Verificar espaço em disco
total, used, free = shutil.disk_usage("/")
# Converter de bytes para gigabytes
total_gb = total / (1024 ** 3)
used_gb = used / (1024 ** 3)
free_gb = free / (1024 ** 3)
return {
"total" : round(total_gb, 2),
"used" : round(used_gb, 2),
"free" : round(free_gb, 2)
}

View file

@ -0,0 +1,12 @@
from packages.v1.system.actions.disk.get_size_action import GetSizeAction
class StartupCheckService:
def execute(self):
get_size_action = GetSizeAction()
get_size_action_result = get_size_action.execute()
return get_size_action_result

29
requirements.txt Normal file
View file

@ -0,0 +1,29 @@
annotated-types==0.7.0
anyio==4.11.0
bcrypt==3.1.7
cffi==2.0.0
click==8.3.0
colorama==0.4.6
dnspython==2.8.0
ecdsa==0.19.1
email-validator==2.3.0
fastapi==0.118.0
greenlet==3.2.4
h11==0.16.0
idna==3.10
passlib==1.7.4
pyasn1==0.6.1
pycparser==2.23
pydantic==2.11.9
pydantic_core==2.33.2
PyMySQL==1.1.2
python-jose==3.5.0
pytz==2025.2
rsa==4.9.1
six==1.17.0
sniffio==1.3.1
SQLAlchemy==2.0.43
starlette==0.48.0
typing-inspection==0.4.2
typing_extensions==4.15.0
uvicorn==0.37.0

6
server.bat Normal file
View file

@ -0,0 +1,6 @@
@echo off
cd \
cd C:\IIS\Orius
call .venv\Scripts\activate.bat
uvicorn main:app --reload
pause