commit 28c7462d2fcfdf8aea2acc91ccf6d407b1ade06d Author: Kenio Date: Sat Jun 28 20:09:55 2025 -0300 Primeiro commit diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..0e0f35c --- /dev/null +++ b/.gitattributes @@ -0,0 +1,9 @@ +# Normaliza finais de linha +* text=auto + +# Força Python e arquivos de configuração a usarem LF +*.py text eol=lf +*.sh text eol=lf +*.yml text eol=lf +*.yaml text eol=lf +*.env text eol=lf diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..80ee3d1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +# Ambiente virtual +venv/ +.env +.env.* + +# Bytecode compilado +__pycache__/ +*.py[cod] +*$py.class + +# Arquivos temporários do sistema +.DS_Store +Thumbs.db + +# Logs e databases locais +*.log +*.sqlite3 + +# VSCode +.vscode/ + +# PyCharm +.idea/ + +# Arquivos de testes ou builds +*.coverage +htmlcov/ +coverage.xml +dist/ +build/ +.eggs/ +*.egg-info/ + +# Cache do pip +pip-wheel-metadata/ +*.egg +.cache/ +.tox/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..8f41d41 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,26 @@ +# Usa a imagem oficial do Python +FROM python:3.12-slim + +# Define diretório de trabalho no container +WORKDIR /app + +# Copia o arquivo de dependências +COPY requirements.txt . + +# Instala dependências no sistema e no Python +RUN apt-get update && apt-get install -y \ + gcc libffi-dev libssl-dev python3-dev firebird-dev \ + && pip install --upgrade pip \ + && pip install --no-cache-dir -r requirements.txt \ + && apt-get remove -y gcc \ + && apt-get autoremove -y \ + && rm -rf /var/lib/apt/lists/* + +# Copia o restante do projeto para o container +COPY . . + +# Expõe a porta padrão do Uvicorn/FastAPI +EXPOSE 8000 + +# Comando para iniciar o servidor +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..9a58ae7 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# apiorius diff --git a/api/v1/api.py b/api/v1/api.py new file mode 100644 index 0000000..a46ac70 --- /dev/null +++ b/api/v1/api.py @@ -0,0 +1,18 @@ +from fastapi import APIRouter # Importa o gerenciador de rotas do FastAPI + +# Importa os módulos de rotas específicos +from api.v1.endpoints import g_usuario_endpoint +from api.v1.endpoints import c_caixa_item_endpoint + +# Cria uma instância do APIRouter que vai agregar todas as rotas da API +api_router = APIRouter() + +# Inclui as rotas de "g_usuario" no roteador principal, com prefixo /usuarios e tag 'Usuarios' +api_router.include_router( + g_usuario_endpoint.router, prefix='/usuarios', tags=['Usuários'] +) + +# Inclui as rotas de "c_caixa_item no roteador principal, com prefixo /c_caixa_items e tag 'Caixa Itens' +api_router.include_router( + c_caixa_item_endpoint.router, prefix='/caixa_itens', tags=['Caixa Itens'] +) \ No newline at end of file diff --git a/api/v1/controllers/caixa/atos_praticados.py b/api/v1/controllers/caixa/atos_praticados.py new file mode 100644 index 0000000..e69de29 diff --git a/api/v1/controllers/caixa/c_caixa_item_controller.py b/api/v1/controllers/caixa/c_caixa_item_controller.py new file mode 100644 index 0000000..9ae8e5a --- /dev/null +++ b/api/v1/controllers/caixa/c_caixa_item_controller.py @@ -0,0 +1,69 @@ +from typing import Optional, List +from fastapi import HTTPException, status # Importe HTTPException e status + +# Schemas usados para entrada e saída de dados dos items +from api.v1.schemas.caixa.c_caixa_item_schema import ( + CCaixaItemSchemaBase, + CCaixaItemSchemaList, + CCaixaItemPaginationSchema +) + +# Model responsável pelo acesso ao banco de dados Firebird +from api.v1.models.caixa.c_caixa_item_model import CCaixaItemModel + +# Funções para sanitização de entradas (evitar XSS, SQLi etc.) +from core.validation import InputSanitizer + +# Retorna a lista de todos os itens cadastrados +def get_all_caixa_itens(skip: int = 0, limit: int = 10) -> CCaixaItemPaginationSchema: + try: + itens = CCaixaItemModel.get_all_caixa_itens(skip=skip, limit=limit) + total = CCaixaItemModel.count_items() + + return { + "total": total, + "skip": skip, + "limit": limit, + "data": [CCaixaItemSchemaList(**u) for u in itens] + } + except RuntimeError as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Erro interno do servidor ao listar os itens: {e}" + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Ocorreu um erro inesperado ao listar os itens: {e}" + ) + +# Retorna a quantidade de registros no banco de dados +def count_items() -> int: + try: + return CCaixaItemModel.count_items() + except RuntimeError as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Erro ao contar items: {e}" + ) + +# Retorna um item específico pelo ID +def get_item_by_id(caixa_item_id: int) -> CCaixaItemSchemaBase: # Retorno alterado para UserSchemaBase + try: + item = CCaixaItemModel.get_by_id(caixa_item_id) + if not item: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Item com ID {caixa_item_id} não encontrado." + ) + return CCaixaItemSchemaBase(**item) + except RuntimeError as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Erro interno do servidor ao buscar item por ID: {e}" + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Ocorreu um erro inesperado ao buscar item por ID: {e}" + ) \ No newline at end of file diff --git a/api/v1/controllers/g_usuario_controller.py b/api/v1/controllers/g_usuario_controller.py new file mode 100644 index 0000000..0651f4c --- /dev/null +++ b/api/v1/controllers/g_usuario_controller.py @@ -0,0 +1,239 @@ +# controllers/user_controller.py + +from typing import Optional, List +from fastapi import HTTPException, status # Importe HTTPException e status + +# Schemas usados para entrada e saída de dados dos usuários +from api.v1.schemas.g_usuario_schema import ( + UserSchemaBase, + UserSchemaCreate, + UserSchemaUpdate, + UserSchemaList, + UserPaginationSchema +) + +# Model responsável pelo acesso ao banco de dados Firebird +from api.v1.models.g_usuario_model import UserModel + +# Funções utilitárias para segurança (hash e verificação de senha) +from core.security import verify_senha_api, hash_senha_api + +# Funções para sanitização de entradas (evitar XSS, SQLi etc.) +from core.validation import InputSanitizer + + +# Autentica um usuário com base no e-mail e senha fornecidos +def authenticate_user(email: str, senha_api: str) -> Optional[dict]: + # Nenhuma mudança significativa aqui, pois o retorno já é None ou dict + email = InputSanitizer.clean_text(email) + try: + user = UserModel.get_by_email(email) + if not user: + return None # Não lança exceção para 'usuário não encontrado' em autenticação + + if not verify_senha_api(senha_api, user["senha_api"]): + return None + + return user + except RuntimeError as e: + # Erros no banco de dados durante a busca, que podem ser tratados como 500 + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Erro interno do servidor ao autenticar: {e}" + ) + + +# Cria um novo usuário após validação e sanitização dos campos +def create_user(user_data: UserSchemaCreate) -> UserSchemaBase: # Retorno alterado para UserSchemaBase, não Optional + try: + # Sanitiza os campos recebidos + nome_completo = InputSanitizer.clean_text(user_data.nome_completo) + email = InputSanitizer.clean_text(user_data.email) + senha_api = InputSanitizer.clean_text(user_data.senha_api) + + # Validações iniciais (antes de ir para o banco) + if not InputSanitizer.is_valid_email(email): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Formato de e-mail inválido." + ) + + if not InputSanitizer.is_safe(nome_completo + email + senha_api): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Conteúdo malicioso detectado nos dados do usuário." + ) + + hashed_senha_api = hash_senha_api(senha_api) + + # Chama o método do modelo, que agora pode levantar exceções + result = UserModel.create(nome_completo, email, hashed_senha_api) + + # Se o modelo retornar um dicionário, converte para UserSchemaBase + if result: + return UserSchemaBase(**result) + # Se o modelo retornasse None (o que não deve mais acontecer com as exceções), + # poderia ser um erro interno ou algo que não previu. + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Não foi possível criar o usuário por uma razão desconhecida." + ) + + except ValueError as e: + # Captura erros de validação de dados do modelo (ex: e-mail duplicado, ou ID problemático) + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, # 409 Conflict para dados duplicados + detail=str(e) + ) + except RuntimeError as e: + # Captura erros gerais de banco de dados ou problemas inesperados do modelo + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Erro interno do servidor ao criar usuário: {e}" + ) + except Exception as e: + # Captura qualquer outra exceção não tratada especificamente + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Ocorreu um erro inesperado: {e}" + ) + + +# Retorna a lista de todos os usuários cadastrados +def get_all(skip: int = 0, limit: int = 10) -> UserPaginationSchema: + try: + users = UserModel.get_all(skip=skip, limit=limit) + total = UserModel.count_users() + + return { + "total": total, + "skip": skip, + "limit": limit, + "data": [UserSchemaList(**u) for u in users] + } + except RuntimeError as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Erro interno do servidor ao listar usuários: {e}" + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Ocorreu um erro inesperado ao listar usuários: {e}" + ) + +# Retorna a quantidade de registros no banco de dados +def count_users() -> int: + try: + return UserModel.count_users() + except RuntimeError as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Erro ao contar usuários: {e}" + ) + +# Retorna um usuário específico pelo ID +def get_user_by_id(user_id: int) -> UserSchemaBase: # Retorno alterado para UserSchemaBase + try: + user = UserModel.get_by_id(user_id) + if not user: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Usuário com ID {user_id} não encontrado." + ) + return UserSchemaBase(**user) + except RuntimeError as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Erro interno do servidor ao buscar usuário por ID: {e}" + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Ocorreu um erro inesperado ao buscar usuário por ID: {e}" + ) + + +# Atualiza os dados de um usuário existente +def update_user(user_id: int, user_data: UserSchemaUpdate) -> UserSchemaBase: # Retorno alterado + try: + nome_completo = InputSanitizer.clean_text(user_data.nome_completo) if user_data.nome_completo else None + email = InputSanitizer.clean_text(user_data.email) if user_data.email else None + senha_api = InputSanitizer.clean_text(user_data.senha_api) if user_data.senha_api else None + telefone = InputSanitizer.clean_text(user_data.telefone) if user_data.telefone else None + + if email and not InputSanitizer.is_valid_email(email): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Formato de e-mail inválido para atualização." + ) + + if any([nome_completo, email, senha_api]) and not InputSanitizer.is_safe( + (nome_completo or '') + (email or '') + (senha_api or '') + ): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Conteúdo malicioso detectado nos dados de atualização." + ) + + hashed_senha_api = hash_senha_api(senha_api) if senha_api else None + + success = UserModel.update(user_id, nome_completo, email, hashed_senha_api, telefone) + if not success: + # UserModel.update agora lança KeyError/ValueError, então este 'if not success' + # só seria acionado se houvesse um caso em que nada foi atualizado e não houve erro. + # No entanto, com a lógica atual de raise, ele não deve ser alcançado. + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, # Ou 422 Unprocessable Entity + detail="Nenhum dado válido fornecido para atualização." + ) + + return get_user_by_id(user_id) # Se atualizou com sucesso, retorna o usuário atualizado + + except KeyError as e: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=str(e) + ) + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, # Conflito de dados (ex: email duplicado) + detail=str(e) + ) + except RuntimeError as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Erro interno do servidor ao atualizar usuário: {e}" + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Ocorreu um erro inesperado ao atualizar usuário: {e}" + ) + + +# Deleta um usuário do banco de dados pelo ID +def delete_user(user_id: int) -> bool: + try: + success = UserModel.delete(user_id) + if not success: # Embora com KeyError do modelo, esta linha talvez não seja mais alcançada + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Usuário com ID {user_id} não encontrado para exclusão." + ) + return success + except KeyError as e: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=str(e) + ) + except RuntimeError as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Erro interno do servidor ao excluir usuário: {e}" + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Ocorreu um erro inesperado ao excluir usuário: {e}" + ) \ No newline at end of file diff --git a/api/v1/endpoints/c_caixa_item_endpoint.py b/api/v1/endpoints/c_caixa_item_endpoint.py new file mode 100644 index 0000000..e6b8e4b --- /dev/null +++ b/api/v1/endpoints/c_caixa_item_endpoint.py @@ -0,0 +1,55 @@ +# endpoints/c_caixa_item_endpoint.py + +from typing import List +from fastapi import APIRouter, status, Depends, HTTPException, Response, Query +from fastapi.security import OAuth2PasswordRequestForm +from fastapi.responses import JSONResponse + +# Schemas para entrada e saída de dados (nomes padronizados em inglês) +from api.v1.schemas.caixa.c_caixa_item_schema import ( + CCaixaItemSchemaBase, + CCaixaItemSchemaList, + CCaixaItemPaginationSchema +) + +# Controller responsável pelas regras de negócio e sanitização +from api.v1.controllers.caixa.c_caixa_item_controller import ( + get_all_caixa_itens, + get_item_by_id, + count_items +) + +# Dependência para obter o usuário autenticado a partir do token JWT +from core.deps import get_current_user + +# Inicializa o roteador responsável pelas rotas de usuários +router = APIRouter() + + + +# ---------------------- ROTAS DINÂMICAS ---------------------- + +@router.get('/', response_model=CCaixaItemPaginationSchema) +def get_items(skip: int = Query(0, ge=0), limit: int = Query(10, ge=1), current_user: dict = Depends(get_current_user)): + """ + Retorna todos os usuários cadastrados no sistema. + """ + items = get_all_caixa_itens(skip=skip, limit=limit) + total = count_items() + + return get_all_caixa_itens(skip=skip, limit=limit) + + +@router.get('/{caixa_item_id}', response_model=CCaixaItemSchemaBase, status_code=status.HTTP_200_OK) +def get_user(caixa_item_id: int, current_user: dict = Depends(get_current_user)): + """ + Retorna os dados de um caixa item específico pelo ID. + """ + item = get_item_by_id(caixa_item_id) + if item: + return item + + raise HTTPException( + detail='User not found.', + status_code=status.HTTP_404_NOT_FOUND + ) diff --git a/api/v1/endpoints/g_usuario_endpoint.py b/api/v1/endpoints/g_usuario_endpoint.py new file mode 100644 index 0000000..ae53054 --- /dev/null +++ b/api/v1/endpoints/g_usuario_endpoint.py @@ -0,0 +1,138 @@ +# endpoints/g_usuario_endpoint.py + +from typing import List +from fastapi import APIRouter, status, Depends, HTTPException, Response, Query +from fastapi.security import OAuth2PasswordRequestForm +from fastapi.responses import JSONResponse + +# Schemas para entrada e saída de dados (nomes padronizados em inglês) +from api.v1.schemas.g_usuario_schema import ( + UserSchemaBase, + UserSchemaCreate, + UserSchemaUpdate, + UserPaginationSchema +) + +# Controller responsável pelas regras de negócio e sanitização +from api.v1.controllers.g_usuario_controller import ( + authenticate_user, + create_user, + get_all, + get_user_by_id, + update_user, + delete_user, + count_users +) + +# Dependência para obter o usuário autenticado a partir do token JWT +from core.deps import get_current_user + +# Função para gerar JWT +from core.auth import create_access_token + +# Inicializa o roteador responsável pelas rotas de usuários +router = APIRouter() + + +# ---------------------- ROTAS FIXAS ---------------------- + +@router.get('/logado', response_model=UserSchemaBase) +def get_logged_user(current_user: dict = Depends(get_current_user)): + """ + Retorna os dados do usuário autenticado com o token atual. + """ + return current_user + + +@router.post('/signup', status_code=status.HTTP_201_CREATED, response_model=UserSchemaBase) +def post_user(user: UserSchemaCreate): + """ + Cria um novo usuário após validações e sanitizações. + """ + new_user = create_user(user) + if not new_user: + raise HTTPException( + status_code=status.HTTP_406_NOT_ACCEPTABLE, + detail='E-mail is already registered.' + ) + return new_user + + +@router.post('/login') +def login(form_data: OAuth2PasswordRequestForm = Depends()): + """ + Realiza login com e-mail e senha, retornando um token JWT válido. + """ + user = authenticate_user( + email=form_data.username, + senha_api=form_data.password + ) + + if not user: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail='Invalid login credentials.' + ) + + return JSONResponse(content={ + "access_token": create_access_token(sub=user["user_id"]), + "token_type": "bearer", + }) + + +# ---------------------- ROTAS DINÂMICAS ---------------------- + +@router.get('/', response_model=UserPaginationSchema) +def get_users(skip: int = Query(0, ge=0), limit: int = Query(10, ge=1), current_user: dict = Depends(get_current_user)): + """ + Retorna todos os usuários cadastrados no sistema. + """ + usuarios = get_all(skip=skip, limit=limit) + total = count_users() + + return get_all(skip=skip, limit=limit) + + +@router.get('/{user_id}', response_model=UserSchemaBase, status_code=status.HTTP_200_OK) +def get_user(user_id: int, current_user: dict = Depends(get_current_user)): + """ + Retorna os dados de um usuário específico pelo ID. + """ + user = get_user_by_id(user_id) + if user: + return user + + raise HTTPException( + detail='User not found.', + status_code=status.HTTP_404_NOT_FOUND + ) + + +@router.put('/{user_id}', response_model=UserSchemaBase, status_code=status.HTTP_202_ACCEPTED) +def put_user(user_id: int, user: UserSchemaUpdate, current_user: dict = Depends(get_current_user)): + """ + Atualiza os dados de um usuário específico com os campos fornecidos. + """ + updated_user = update_user(user_id, user) + if updated_user: + return updated_user + + raise HTTPException( + detail='User not found.', + status_code=status.HTTP_404_NOT_FOUND + ) + + +@router.delete('/{user_id}', status_code=status.HTTP_204_NO_CONTENT) +def delete_user_by_id(user_id: int): + """ + Exclui um usuário com base no ID fornecido. + """ + success = delete_user(user_id) + if success: + return Response(status_code=status.HTTP_204_NO_CONTENT) + + raise HTTPException( + detail='User not found.', + status_code=status.HTTP_404_NOT_FOUND + ) diff --git a/api/v1/models/caixa/c_caixa_item_model.py b/api/v1/models/caixa/c_caixa_item_model.py new file mode 100644 index 0000000..90870df --- /dev/null +++ b/api/v1/models/caixa/c_caixa_item_model.py @@ -0,0 +1,128 @@ +# models/c_caixa_item_model.py + +from firebird.driver.types import DatabaseError # Importe esta exceção específica +from core.database import get_connection +# Se você tiver core.configs, pode ser útil para logs ou configurações +# from core.configs import settings + +class CCaixaItemModel: + """ + Classe responsável por interagir diretamente com o banco de dados Firebird. + Nenhuma validação ou sanitização deve ser feita aqui. + """ + + @staticmethod + def get_by_id(caixa_item_id: int) -> dict | None: + """ + Retorna um usuário com base no ID, ou None se não encontrado. + Lança exceções em caso de falha no banco de dados. + """ + conn = None + cur = None + try: + conn = get_connection() + cur = conn.cursor() + + cur.execute(""" + SELECT CAIXA_ITEM_ID, + DESCRICAO, + DATA_PAGAMENTO, + VALOR_SERVICO, + VALOR_PAGO, + APRESENTANTE + FROM C_CAIXA_ITEM + WHERE CAIXA_ITEM_ID = ? + """, (caixa_item_id,)) + row = cur.fetchone() + + if row: + return { + "caixa_item_id": row[0], + "descricao": row[1], + "data_pagamento": row[2], + "valor_servico": row[3], + "valor_pago": row[4], + "apresentante": row[5], + } + return None + except DatabaseError as e: + print(f"Database error in get_by_id: {e}") + raise RuntimeError(f"Erro ao buscar item por ID no banco de dados: {e}") + except Exception as e: + print(f"Unexpected error in get_by_id: {e}") + raise RuntimeError(f"Erro inesperado ao buscar item por ID: {e}") + finally: + if cur: + cur.close() + if conn: + conn.close() + + @staticmethod + def count_items() -> int: + """ + Retorna a quantidade de usuários. + """ + try: + conn = get_connection() + cur = conn.cursor() + cur.execute("SELECT COUNT(*) FROM C_CAIXA_ITEM") + total = cur.fetchone()[0] + return total + except Exception as e: + raise RuntimeError(f"Erro ao contar itens: {e}") + finally: + if cur: + cur.close() + if conn: + conn.close() + + + @staticmethod + def get_all_caixa_itens(skip: int = 0, limit: int = 10) -> list[dict]: + """ + Retorna todos os itens cadastrados no banco de dados. + Lança exceções em caso de falha no banco de dados. + """ + conn = None + cur = None + try: + conn = get_connection() + cur = conn.cursor() + + query = f""" + SELECT FIRST {limit} SKIP {skip} + CAIXA_ITEM_ID, + DESCRICAO, + DATA_PAGAMENTO, + VALOR_SERVICO, + VALOR_PAGO, + APRESENTANTE + FROM C_CAIXA_ITEM + ORDER BY CAIXA_ITEM_ID + """ + + cur.execute(query) + rows = cur.fetchall() + + return [ + { + "caixa_item_id": r[0], + "descricao": r[1], + "data_pagamento": r[2], + "valor_servico": r[3], + "valor_pago": r[4], + "apresentante": r[5], + } + for r in rows + ] + except DatabaseError as e: + print(f"Database error in get_all: {e}") + raise RuntimeError(f"Erro ao buscar todos os itens no banco de dados: {e}") + except Exception as e: + print(f"Unexpected error in get_all: {e}") + raise RuntimeError(f"Erro inesperado ao buscar todos os itens: {e}") + finally: + if cur: + cur.close() + if conn: + conn.close() \ No newline at end of file diff --git a/api/v1/models/g_usuario_model.py b/api/v1/models/g_usuario_model.py new file mode 100644 index 0000000..59a8985 --- /dev/null +++ b/api/v1/models/g_usuario_model.py @@ -0,0 +1,394 @@ +# models/g_usuario_model.py + +from firebird.driver.types import DatabaseError # Importe esta exceção específica +from core.database import get_connection +# Se você tiver core.configs, pode ser útil para logs ou configurações +# from core.configs import settings + +class UserModel: + """ + Classe responsável por interagir diretamente com o banco de dados Firebird. + Nenhuma validação ou sanitização deve ser feita aqui. + """ + + @staticmethod + def get_by_email(email: str) -> dict | None: + """ + Retorna um usuário com base no e-mail, ou None se não encontrado. + Lança exceções em caso de falha no banco de dados. + """ + conn = None + cur = None + try: + conn = get_connection() + cur = conn.cursor() + + cur.execute(""" + SELECT USUARIO_ID, EMAIL, SENHA_API, NOME_COMPLETO + FROM G_USUARIO + WHERE EMAIL = ? + """, (email,)) + row = cur.fetchone() + + if row: + return { + "user_id": row[0], + "email": row[1], + "senha_api": row[2], + "nome_completo": row[3], + } + return None + except DatabaseError as e: + # Erros específicos do Firebird (ex: problema na conexão, query inválida) + print(f"Database error in get_by_email: {e}") + raise RuntimeError(f"Erro ao buscar usuário por e-mail no banco de dados: {e}") + except Exception as e: + # Qualquer outro erro inesperado + print(f"Unexpected error in get_by_email: {e}") + raise RuntimeError(f"Erro inesperado ao buscar usuário por e-mail: {e}") + finally: + if cur: + cur.close() + if conn: + conn.close() + + @staticmethod + def get_by_id(user_id: int) -> dict | None: + """ + Retorna um usuário com base no ID, ou None se não encontrado. + Lança exceções em caso de falha no banco de dados. + """ + conn = None + cur = None + try: + conn = get_connection() + cur = conn.cursor() + + cur.execute(""" + SELECT USUARIO_ID, + NOME_COMPLETO, + EMAIL, + TELEFONE + FROM G_USUARIO + WHERE USUARIO_ID = ? + """, (user_id,)) + row = cur.fetchone() + + if row: + return { + "user_id": row[0], + "nome_completo": row[1], + "email": row[2], + "telefone": row[3], + } + return None + except DatabaseError as e: + print(f"Database error in get_by_id: {e}") + raise RuntimeError(f"Erro ao buscar usuário por ID no banco de dados: {e}") + except Exception as e: + print(f"Unexpected error in get_by_id: {e}") + raise RuntimeError(f"Erro inesperado ao buscar usuário por ID: {e}") + finally: + if cur: + cur.close() + if conn: + conn.close() + + @staticmethod + def count_users() -> int: + """ + Retorna a quantidade de usuários. + """ + try: + conn = get_connection() + cur = conn.cursor() + cur.execute("SELECT COUNT(*) FROM G_USUARIO") + total = cur.fetchone()[0] + return total + except Exception as e: + raise RuntimeError(f"Erro ao contar usuários: {e}") + finally: + if cur: + cur.close() + if conn: + conn.close() + + + @staticmethod + def get_all(skip: int = 0, limit: int = 10) -> list[dict]: + """ + Retorna todos os usuários cadastrados no banco de dados. + Lança exceções em caso de falha no banco de dados. + """ + conn = None + cur = None + try: + conn = get_connection() + cur = conn.cursor() + + query = f""" + SELECT FIRST {limit} SKIP {skip} + USUARIO_ID, + TROCARSENHA, + LOGIN, + SITUACAO, + NOME_COMPLETO, + FUNCAO, + ASSINA, + SIGLA, + USUARIO_TAB, + ULTIMO_LOGIN, + ULTIMO_LOGIN_REGS, + DATA_EXPIRACAO, + ANDAMENTO_PADRAO, + LEMBRETE_PERGUNTA, + LEMBRETE_RESPOSTA, + ANDAMENTO_PADRAO2, + RECEBER_MENSAGEM_ARROLAMENTO, + EMAIL, + ASSINA_CERTIDAO, + RECEBER_EMAIL_PENHORA, + FOTO, + NAO_RECEBER_CHAT_TODOS, + PODE_ALTERAR_CAIXA, + RECEBER_CHAT_CERTIDAO_ONLINE, + RECEBER_CHAT_CANCELAMENTO, + CPF, + SOMENTE_LEITURA, + RECEBER_CHAT_ENVIO_ONR, + TIPO_USUARIO, + DATA_CADASTRO, + TELEFONE + FROM G_USUARIO + ORDER BY USUARIO_ID + """ + + cur.execute(query) + rows = cur.fetchall() + + return [ + { + "usuario_id": r[0], + "trocarsenha": r[1], + "login": r[2], + "situacao": r[3], + "nome_completo": r[4], + "funcao": r[5], + "assina": r[6], + "sigla": r[7], + "usuario_tab": r[8], + "ultimo_login": r[9], + "ultimo_login_regs": r[10], + "data_expiracao": r[11], + "andamento_padrao": r[12], + "lembrete_pergunta": r[13], + "lembrete_resposta": r[14], + "andamento_padrao2": r[15], + "receber_mensagem_arrolamento": r[16], + "email": r[17], + "assina_certidao": r[18], + "receber_email_penhora": r[19], + "foto": r[20], + "nao_receber_chat_todos": r[21], + "pode_alterar_caixa": r[22], + "receber_chat_certidao_online": r[23], + "receber_chat_cancelamento": r[24], + "cpf": r[25], + "somente_leitura": r[26], + "receber_chat_envio_onr": r[27], + "tipo_usuario": r[28], + "data_cadastro": r[29], + "telefone": r[30], + } + for r in rows + ] + except DatabaseError as e: + print(f"Database error in get_all: {e}") + raise RuntimeError(f"Erro ao buscar todos os usuários no banco de dados: {e}") + except Exception as e: + print(f"Unexpected error in get_all: {e}") + raise RuntimeError(f"Erro inesperado ao buscar todos os usuários: {e}") + finally: + if cur: + cur.close() + if conn: + conn.close() + + @staticmethod + def create(nome_completo: str, email: str, senha_api: str) -> dict | None: + """ + Cria um novo usuário no banco. Lança exceções para e-mail duplicado + ou outros erros de banco de dados. + """ + conn = None + cur = None + try: + conn = get_connection() + cur = conn.cursor() + + # A validação de e-mail duplicado feita aqui no modelo é um pouco redundante + # se você já tiver uma UNIQUE constraint no EMAIL. + # O tratamento de exceção abaixo (DatabaseError) já capturaria a violação da UNIQUE constraint. + # No entanto, se você não tiver uma constraint UNIQUE, esta verificação é válida. + # Se tiver uma constraint, pode remover este SELECT e deixar o DB lançar o erro. + cur.execute("SELECT 1 FROM G_USUARIO WHERE EMAIL = ?", (email,)) + if cur.fetchone(): + # Captura de um e-mail já existente antes de tentar a inserção + # Isso evita um DatabaseError mais tarde e permite uma mensagem mais específica. + raise ValueError("E-mail já cadastrado. Por favor, use outro e-mail.") + + cur.execute(""" + INSERT INTO G_USUARIO (NOME_COMPLETO, EMAIL, SENHA_API) + VALUES (?, ?, ?) + RETURNING USUARIO_ID + """, (nome_completo, email, senha_api)) + + user_id = cur.fetchone()[0] + conn.commit() + + return { + "user_id": user_id, + "nome_completo": nome_completo, + "email": email, + } + except DatabaseError as e: + if conn: + conn.rollback() # Desfaz a transação em caso de erro no DB + error_message = str(e).lower() + # Tratamento específico para violação de UNIQUE KEY ou PRIMARY KEY + if "violation of primary or unique key constraint" in error_message or "duplicate value" in error_message: + # Se você tem certeza de que USUARIO_ID e EMAIL são as únicas chaves únicas/primárias + # e o EMAIL já foi verificado acima, então este erro aponta para USUARIO_ID. + # No entanto, é mais robusto verificar o nome da constraint se possível, ou ser mais genérico. + if "g_usuario_pk" in error_message: # Supondo que G_USUARIO_PK é para USUARIO_ID + raise ValueError(f"O ID gerado para o usuário já existe. Tente novamente ou verifique os dados: {error_message}") + # Se você tiver uma constraint UNIQUE no EMAIL, isso também seria capturado aqui + elif "seu_email_unique_constraint_name" in error_message: # Substitua pelo nome real da sua constraint de email + raise ValueError(f"Já existe um usuário com este e-mail: {email}. Erro: {error_message}") + else: # Outras violações de chaves únicas + raise ValueError(f"Violação de restrição de dados. Verifique os campos fornecidos. Detalhe: {error_message}") + else: + # Outros erros de DatabaseError + raise RuntimeError(f"Erro no banco de dados ao criar usuário: {error_message}") + except ValueError as e: + # Re-lança o ValueError de e-mail duplicado, se a verificação acima estiver ativa + raise e + except Exception as e: + if conn: + conn.rollback() + print(f"Unexpected error in create: {e}") + raise RuntimeError(f"Erro inesperado ao criar usuário: {e}") + finally: + if cur: + cur.close() + if conn: + conn.close() + + @staticmethod + def update(user_id: int, nome_completo: str | None, email: str | None, senha_api: str | None, telefone: str | None) -> bool: + """ + Atualiza os dados de um usuário existente. + Lança exceções se o usuário não for encontrado ou em caso de erro no DB. + """ + conn = None + cur = None + try: + conn = get_connection() + cur = conn.cursor() + + # Verifica se o usuário existe antes de tentar atualizar + cur.execute("SELECT 1 FROM G_USUARIO WHERE USUARIO_ID = ?", (user_id,)) + if not cur.fetchone(): + raise KeyError(f"Usuário com ID {user_id} não encontrado para atualização.") + + updates = [] + params = [] + + if nome_completo is not None: + updates.append("NOME_COMPLETO = ?") + params.append(nome_completo) + if email is not None: + # Verifique se o novo e-mail já existe para outro usuário (se email for uma chave única) + cur.execute("SELECT USUARIO_ID FROM G_USUARIO WHERE EMAIL = ? AND USUARIO_ID <> ?", (email, user_id)) + if cur.fetchone(): + raise ValueError(f"O e-mail '{email}' já está sendo usado por outro usuário.") + updates.append("EMAIL = ?") + params.append(email) + if senha_api is not None: + updates.append("SENHA_API = ?") + params.append(senha_api) + if telefone is not None: + updates.append("TELEFONE = ?") + params.append(telefone) + + if not updates: # Se nenhum campo foi fornecido para atualização + return False + + query = f"UPDATE G_USUARIO SET {', '.join(updates)} WHERE USUARIO_ID = ?" + params.append(user_id) + + cur.execute(query, tuple(params)) + conn.commit() + + return True + except DatabaseError as e: + if conn: + conn.rollback() + error_message = str(e).lower() + if "violation of unique constraint" in error_message or "duplicate value" in error_message: + # Captura violação de unique constraint no email, se houver + raise ValueError(f"Erro de dados duplicados ao atualizar: {error_message}") + else: + raise RuntimeError(f"Erro no banco de dados ao atualizar usuário: {e}") + except (KeyError, ValueError) as e: + # Re-lança as exceções de não encontrado ou e-mail duplicado + raise e + except Exception as e: + if conn: + conn.rollback() + print(f"Unexpected error in update: {e}") + raise RuntimeError(f"Erro inesperado ao atualizar usuário: {e}") + finally: + if cur: + cur.close() + if conn: + conn.close() + + @staticmethod + def delete(user_id: int) -> bool: + """ + Exclui um usuário com base no ID. Lança exceção se o usuário não for encontrado. + """ + conn = None + cur = None + try: + conn = get_connection() + cur = conn.cursor() + + # Verifica se o usuário existe antes de tentar deletar + cur.execute("SELECT 1 FROM G_USUARIO WHERE USUARIO_ID = ?", (user_id,)) + if not cur.fetchone(): + raise KeyError(f"Usuário com ID {user_id} não encontrado para exclusão.") + + cur.execute("DELETE FROM G_USUARIO WHERE USUARIO_ID = ?", (user_id,)) + conn.commit() + + return True + except DatabaseError as e: + if conn: + conn.rollback() + print(f"Database error in delete: {e}") + raise RuntimeError(f"Erro no banco de dados ao excluir usuário: {e}") + except KeyError as e: + # Re-lança a exceção de não encontrado + raise e + except Exception as e: + if conn: + conn.rollback() + print(f"Unexpected error in delete: {e}") + raise RuntimeError(f"Erro inesperado ao excluir usuário: {e}") + finally: + if cur: + cur.close() + if conn: + conn.close() \ No newline at end of file diff --git a/api/v1/schemas/caixa/c_caixa_item_schema.py b/api/v1/schemas/caixa/c_caixa_item_schema.py new file mode 100644 index 0000000..0a976e4 --- /dev/null +++ b/api/v1/schemas/caixa/c_caixa_item_schema.py @@ -0,0 +1,34 @@ +# schemas/c_caixa_item_schema.py + +from typing import Optional, List +from pydantic import BaseModel, EmailStr +from datetime import datetime +from decimal import Decimal + +# Schema base usado para representar um caixa_item retornado pela API +class CCaixaItemSchemaBase(BaseModel): + caixa_item_id: Optional[int] = None + descricao: Optional[str] = None + data_pagamento: Optional[datetime] = None + valor_servico: Optional[Decimal] = None + valor_pago: Optional[Decimal] = None + apresentante: Optional[str] = None + + class Config: + from_attributes = True # Permite construir a partir de dicts ou ORMs (mesmo sem ORM aqui) + +# Schema para listagem de registros com paginação +class CCaixaItemSchemaList(BaseModel): + caixa_item_id: Optional[int] + descricao: Optional[str] + data_pagamento: Optional[datetime] + valor_servico: Optional[Decimal] + valor_pago: Optional[Decimal] + apresentante: Optional[str] + + +class CCaixaItemPaginationSchema(BaseModel): + total: int + skip: int + limit: int + data: List[CCaixaItemSchemaList] \ No newline at end of file diff --git a/api/v1/schemas/caixa/t_ato_schema.py b/api/v1/schemas/caixa/t_ato_schema.py new file mode 100644 index 0000000..094ac69 --- /dev/null +++ b/api/v1/schemas/caixa/t_ato_schema.py @@ -0,0 +1,73 @@ +# schemas/t_ato_schema.py + +from typing import Optional, List +from pydantic import BaseModel, EmailStr +from datetime import datetime + +# Schema base usado para representar um usuário retornado pela API +class TAtoSchemaBase(BaseModel): + user_id: Optional[int] = None # ID do usuário (opcional) + nome_completo: Optional[str] = None # Nome completo do usuário + email: Optional[EmailStr] = None # E-mail validado do usuário + + class Config: + from_attributes = True # Permite construir a partir de dicts ou ORMs (mesmo sem ORM aqui) + + +# Schema usado para criação de um novo usuário +class TAtoSchemaCreate(BaseModel): + nome_completo: str # Nome completo obrigatório + email: EmailStr # E-mail obrigatório e validado + senha_api: str # Senha enviada pelo cliente (será criptografada no backend) + + +# Schema usado para atualização de dados do usuário +# Todos os campos são opcionais para permitir atualizações parciais +class TAtoSchemaUpdate(BaseModel): + nome_completo: Optional[str] = None # Atualização do nome, se fornecido + email: Optional[EmailStr] = None # Atualização do e-mail, se fornecido + senha_api: Optional[str] = None # Atualização da senha, se fornecida + + class Config: + from_attributes = True + + +# Schema para listagem de registros com paginação +class TAtoSchemaList(BaseModel): + usuario_id: float # USUARIO_ID NUMERIC(10,2) + trocarsenha: Optional[str] + login: Optional[str] + situacao: Optional[str] + nome_completo: Optional[str] + funcao: Optional[str] + assina: Optional[str] + sigla: Optional[str] + usuario_tab: Optional[float] # NUMERIC(10,2) + ultimo_login: Optional[datetime] + ultimo_login_regs: Optional[datetime] + data_expiracao: Optional[datetime] + andamento_padrao: Optional[float] # NUMERIC(10,2) + lembrete_pergunta: Optional[str] + lembrete_resposta: Optional[str] + andamento_padrao2: Optional[float] # NUMERIC(10,2) + receber_mensagem_arrolamento: Optional[str] + email: Optional[EmailStr] # ou Optional[str] se preferir mais flexível + assina_certidao: Optional[str] + receber_email_penhora: Optional[str] + foto: Optional[bytes] # BLOB SUB_TYPE BINARY + nao_receber_chat_todos: Optional[str] + pode_alterar_caixa: Optional[str] + receber_chat_certidao_online: Optional[str] + receber_chat_cancelamento: Optional[str] + cpf: Optional[str] + somente_leitura: Optional[str] + receber_chat_envio_onr: Optional[str] + tipo_usuario: Optional[str] + data_cadastro: Optional[datetime] + + +class TAtoPaginationSchema(BaseModel): + total: int + skip: int + limit: int + data: List[TAtoSchemaList] diff --git a/api/v1/schemas/g_usuario_schema.py b/api/v1/schemas/g_usuario_schema.py new file mode 100644 index 0000000..8695dbc --- /dev/null +++ b/api/v1/schemas/g_usuario_schema.py @@ -0,0 +1,76 @@ +# schemas/g_usuario_schema.py + +from typing import Optional, List +from pydantic import BaseModel, EmailStr +from datetime import datetime + +# Schema base usado para representar um usuário retornado pela API +class UserSchemaBase(BaseModel): + user_id: Optional[int] = None # ID do usuário (opcional) + nome_completo: Optional[str] = None # Nome completo do usuário + email: Optional[EmailStr] = None # E-mail validado do usuário + telefone: Optional[str] = None # Telefone validado do usuário + + class Config: + from_attributes = True # Permite construir a partir de dicts ou ORMs (mesmo sem ORM aqui) + + +# Schema usado para criação de um novo usuário +class UserSchemaCreate(BaseModel): + nome_completo: str # Nome completo obrigatório + email: EmailStr # E-mail obrigatório e validado + senha_api: str # Senha enviada pelo cliente (será criptografada no backend) + + +# Schema usado para atualização de dados do usuário +# Todos os campos são opcionais para permitir atualizações parciais +class UserSchemaUpdate(BaseModel): + nome_completo: Optional[str] = None # Atualização do nome, se fornecido + email: Optional[EmailStr] = None # Atualização do e-mail, se fornecido + senha_api: Optional[str] = None # Atualização da senha, se fornecida + telefone: Optional[str] = None # Atualização do telefone, se fornecida + + class Config: + from_attributes = True + + +# Schema para listagem de registros com paginação +class UserSchemaList(BaseModel): + usuario_id: float # USUARIO_ID NUMERIC(10,2) + trocarsenha: Optional[str] + login: Optional[str] + situacao: Optional[str] + nome_completo: Optional[str] + funcao: Optional[str] + assina: Optional[str] + sigla: Optional[str] + usuario_tab: Optional[float] # NUMERIC(10,2) + ultimo_login: Optional[datetime] + ultimo_login_regs: Optional[datetime] + data_expiracao: Optional[datetime] + andamento_padrao: Optional[float] # NUMERIC(10,2) + lembrete_pergunta: Optional[str] + lembrete_resposta: Optional[str] + andamento_padrao2: Optional[float] # NUMERIC(10,2) + receber_mensagem_arrolamento: Optional[str] + email: Optional[EmailStr] # ou Optional[str] se preferir mais flexível + assina_certidao: Optional[str] + receber_email_penhora: Optional[str] + foto: Optional[bytes] # BLOB SUB_TYPE BINARY + nao_receber_chat_todos: Optional[str] + pode_alterar_caixa: Optional[str] + receber_chat_certidao_online: Optional[str] + receber_chat_cancelamento: Optional[str] + cpf: Optional[str] + somente_leitura: Optional[str] + receber_chat_envio_onr: Optional[str] + tipo_usuario: Optional[str] + data_cadastro: Optional[datetime] + telefone: Optional[str] + + +class UserPaginationSchema(BaseModel): + total: int + skip: int + limit: int + data: List[UserSchemaList] diff --git a/core/auth.py b/core/auth.py new file mode 100644 index 0000000..2153740 --- /dev/null +++ b/core/auth.py @@ -0,0 +1,35 @@ +from pytz import timezone +from datetime import datetime, timedelta + +from fastapi.security import OAuth2PasswordBearer +from jose import jwt + +from core.configs import settings +from api.v1.controllers.g_usuario_controller import authenticate_user + +# Define o esquema OAuth2 para login +oauth2_schema = OAuth2PasswordBearer( + tokenUrl=f"{settings.API_V1_STR}/usuarios/login" +) + +# Função para criar um token JWT +def create_token(tipo_token: str, tempo_vida: timedelta, sub: str) -> str: + payload = {} + sp = timezone('America/Sao_Paulo') + expira = datetime.now(tz=sp) + tempo_vida + + payload["type"] = tipo_token + payload["exp"] = expira + payload["iat"] = datetime.now(tz=sp) + payload["sub"] = str(sub) + + return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.ALGORITHM) + + +# Criação do token de acesso (access_token) +def create_access_token(sub: str) -> str: + return create_token( + tipo_token='access_token', + tempo_vida=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES), + sub=sub + ) diff --git a/core/configs.py b/core/configs.py new file mode 100644 index 0000000..5b27f7c --- /dev/null +++ b/core/configs.py @@ -0,0 +1,34 @@ +from pydantic_settings import BaseSettings + +# Classe de configurações gerais da aplicação +class Settings(BaseSettings): + # Prefixo padrão para as rotas da API + API_V1_STR: str = '/api/v1' + + # URL de conexão com o banco de dados Firebird 4 (driver oficial) + # Obs: encode a senha corretamente se houver caracteres especiais + # DB_URL: str = "firebird://SYSDBA:Sun147oi.@185.139.1.35:3050/CARTORIO" + DB_URL: str = "firebird://SYSDBA:Sun147oi.@185.139.1.35:3050/CARTORIO" + + + # Chave secreta usada para geração de tokens JWT + JWT_SECRET: str = 'WYe1zwtlDkh39_X3X3qTSICFDxts4VQrMyGLxnEpGUg' + + """ + Para gerar uma nova chave JWT segura, use: + import secrets + secrets.token_urlsafe(32) + """ + + # Algoritmo usado para assinar os tokens JWT + ALGORITHM: str = 'HS256' + + # Tempo de expiração do token JWT (em minutos): 1 semana + ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 7 + + # Configuração do Pydantic + class Config: + case_sensitive = True # Variáveis de ambiente sensíveis a maiúsculas/minúsculas + +# Instância global das configurações +settings: Settings = Settings() \ No newline at end of file diff --git a/core/database.py b/core/database.py new file mode 100644 index 0000000..c9861a6 --- /dev/null +++ b/core/database.py @@ -0,0 +1,28 @@ +import os +from urllib.parse import urlparse, unquote +from firebird.driver import connect +from core.configs import settings + +def get_connection(): + parsed = urlparse(settings.DB_URL) + + user = parsed.username + password = unquote(parsed.password or '') + host = parsed.hostname + port = parsed.port or 3050 + database = parsed.path.lstrip('/') + + # Ajusta o caminho no Windows: transforma '/' em '\' + if os.name == 'nt': # 'nt' = Windows + database = database.replace('/', '\\') + + # Constrói o DSN no formato 'hostname/port:database_path' + # E essa string é passada como o PRIMEIRO ARGUMENTO POSICIONAL + connection_dsn = f"{host}/{port}:{database}" + + return connect( + connection_dsn, # Este é o DSN completo que o driver espera + user=user, + password=password, + charset="UTF8" + ) \ No newline at end of file diff --git a/core/deps.py b/core/deps.py new file mode 100644 index 0000000..54a8454 --- /dev/null +++ b/core/deps.py @@ -0,0 +1,64 @@ +# core/deps.py + +from fastapi import Depends, HTTPException, status +from fastapi.security import OAuth2PasswordBearer +from jose import jwt, JWTError +from core.configs import settings +from api.v1.models.g_usuario_model import UserModel # <--- Importe o UserModel + +# Define o esquema de segurança OAuth2 (token tipo Bearer) +oauth2_schema = OAuth2PasswordBearer( + tokenUrl=f"{settings.API_V1_STR}/usuarios/login" +) + +# Função que retorna o usuário autenticado com base no token JWT +def get_current_user(token: str = Depends(oauth2_schema)) -> dict: + credential_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail='Could not validate credentials', + headers={"WWW-Authenticate": "Bearer"}, + ) + + try: + payload = jwt.decode( + token, + settings.JWT_SECRET, + algorithms=[settings.ALGORITHM], + options={"verify_aud": False} + ) + + user_id: str = payload.get("sub") + + if user_id is None: + raise credential_exception + + except JWTError: + raise credential_exception + + # --- NOVO: Buscar os dados completos do usuário do banco de dados --- + # Convert user_id para int, se ele for um string no JWT e int no banco + try: + user_id_int = int(user_id) + except ValueError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Invalid user ID format in token." + ) + + # Use o UserModel para buscar os dados completos + # Adicione um try-except para a chamada ao modelo para capturar erros de DB + try: + user = UserModel.get_by_id(user_id_int) + except Exception as e: # Captura qualquer erro ao buscar no DB + print(f"Error fetching user in get_current_user: {e}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to retrieve user data. {str(e)}" + ) + + if not user: + # Se o usuário não for encontrado no DB (mas o token era válido para um ID), + # pode indicar um usuário deletado ou um ID inválido no token. + raise credential_exception # Ou HTTPException(404, "User associated with token not found") + + return user # Retorna o dicionário completo do usuário \ No newline at end of file diff --git a/core/security.py b/core/security.py new file mode 100644 index 0000000..936691f --- /dev/null +++ b/core/security.py @@ -0,0 +1,32 @@ +# core/security.py + +# Importa CryptContext da biblioteca passlib para operações de hash de senha +from passlib.context import CryptContext + +# Cria uma instância do contexto de criptografia +# O esquema usado é 'bcrypt', que é seguro e amplamente aceito +# O parâmetro 'deprecated="auto"' marca versões antigas como inseguras, se aplicável +CRYPTO = CryptContext(schemes=['bcrypt'], deprecated='auto') + + +# Verifica se uma senha fornecida corresponde ao hash armazenado +def verify_senha_api(plain_senha_api: str, hashed_senha_api: str) -> bool: + """ + Compara a senha fornecida em texto puro com o hash armazenado. + + :param plain_senha_api: Senha digitada pelo usuário + :param hashed_senha_api: Hash da senha armazenado no banco de dados + :return: True se corresponder, False se não + """ + return CRYPTO.verify(plain_senha_api, hashed_senha_api) + + +# Gera o hash de uma senha fornecida +def hash_senha_api(plain_senha_api: str) -> str: + """ + Gera e retorna o hash da senha fornecida. + + :param plain_senha_api: Senha em texto puro fornecida pelo usuário + :return: Hash da senha + """ + return CRYPTO.hash(plain_senha_api) diff --git a/core/validation.py b/core/validation.py new file mode 100644 index 0000000..3e34a15 --- /dev/null +++ b/core/validation.py @@ -0,0 +1,32 @@ +# utils/validation.py + +import re +import html + + +class InputSanitizer: + + @staticmethod + def clean_text(text: str) -> str: + """Trim spaces, escape HTML entities, collapse multiple spaces.""" + text = text.strip() + text = html.escape(text) + text = re.sub(r"\s+", " ", text) + return text + + @staticmethod + def is_valid_email(email: str) -> bool: + """Check if email has a valid structure""" + return bool(re.match(r"^[\w\.-]+@[\w\.-]+\.\w+$", email)) + + @staticmethod + def has_script(text: str) -> bool: + """Detect basic XSS attempts""" + return " bool: + """Detect common XSS/SQL injection characters or patterns""" + blacklist = ["