Merge branch 'release(MVP/Sprint2)' into homologacao

This commit is contained in:
Kenio 2025-09-12 11:14:23 -03:00
commit 0d3b2ef080
131 changed files with 12519 additions and 28227 deletions

3
.gitignore vendored
View file

@ -3,6 +3,9 @@ venv/
.env
.env.*
# Ignora configuração de acesso ao banco de dados Firebird
config/database/firebird.json
# Bytecode compilado
__pycache__/
*.py[cod]

View file

@ -0,0 +1,43 @@
# core/security.py
# Importa CryptContext da biblioteca passlib para operações de hash de senha
from passlib.context import CryptContext
# Cria uma instância do contexto de criptografia
# O esquema usado é 'bcrypt', que é seguro e amplamente aceito
# O parâmetro 'deprecated="auto"' marca versões antigas como inseguras, se aplicável
CRYPTO = CryptContext(schemes=['bcrypt'], deprecated='auto')
class Security:
# Verifica se a senha tem um hash válido
@staticmethod
def is_hash(senha: str) -> bool:
"""
Verifica se a string fornecida é um hash reconhecido pelo CryptContext.
"""
return CRYPTO.identify(senha)
# Verifica se uma senha fornecida corresponde ao hash armazenado
def verify_senha_api(plain_senha_api: str, hashed_senha_api: str) -> bool:
"""
Compara a senha fornecida em texto puro com o hash armazenado.
:param plain_senha_api: Senha digitada pelo usuário
:param hashed_senha_api: Hash da senha armazenado no banco de dados
:return: True se corresponder, False se não
"""
return CRYPTO.verify(plain_senha_api, hashed_senha_api)
# Gera o hash de uma senha fornecida
def hash_senha_api(plain_senha_api: str) -> str:
"""
Gera e retorna o hash da senha fornecida.
:param plain_senha_api: Senha em texto puro fornecida pelo usuário
:return: Hash da senha
"""
return CRYPTO.hash(plain_senha_api)

View file

@ -0,0 +1,34 @@
import re
class CPF:
@staticmethod
def is_valid_cpf(data: str) -> bool:
# Remove caracteres não numéricos
data = re.sub(r'\D', '', data)
# Verifica se tem 11 dígitos
if len(data) != 11:
return False
# CPFs com todos os dígitos iguais são inválidos
if data == data[0] * 11:
return False
# Calcula o primeiro e segundo dígitos verificadores
def calcular_digito(digitos, peso):
soma = sum(int(a) * b for a, b in zip(digitos, peso))
resto = soma % 11
return '0' if resto < 2 else str(11 - resto)
# Primeiro dígito verificador
peso1 = range(10, 1, -1)
digito1 = calcular_digito(data[:9], peso1)
# Segundo dígito verificador
peso2 = range(11, 1, -1)
digito2 = calcular_digito(data[:10], peso2)
# Verifica se os dígitos batem
return data[-2:] == digito1 + digito2

View file

@ -4,8 +4,6 @@ import re
class Email:
@staticmethod
def validate(data: str) -> str:
# Validação de email
default = r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$'
if not re.match(default, data):
raise f"Email inválido: {data}"
def is_valid_email(email: str) -> bool:
"""Check if email has a valid structure"""
return bool(re.match(r"^[\w\.-]+@[\w\.-]+\.\w+$", email))

View file

@ -4,16 +4,60 @@ import re
class Text:
@staticmethod
def sanitize(data: str) -> str:
"""Trim spaces, escape HTML entities, collapse multiple spaces."""
data = data.strip()
data = html.escape(data)
data = re.sub(r"\s+", " ", data)
return data
# Remove as mascaras de números
@staticmethod
def just_numbers(data: str) -> str:
""" Mantêm apenas os numeros """
data = re.sub(r"[^\d]", "", data)
return data
# Verifica se um e-mail é válido
@staticmethod
def is_valid_email(email: str) -> bool:
"""Check if email has a valid structure"""
return bool(re.match(r"^[\w\.-]+@[\w\.-]+\.\w+$", email))
"""
Sanitiza entradas de texto contra XSS e SQL Injection básicos.
- Remove espaços extras
- Escapa entidades HTML
- Remove padrões suspeitos de XSS e SQL Injection
- Normaliza múltiplos espaços em um
"""
@staticmethod
def sanitize_input(data: str) -> str:
if not data:
return data
# 1) Remove espaços no início e no fim
data = data.strip()
# 2) Escapa entidades HTML (< > & ")
data = html.escape(data)
# 3) Remove múltiplos espaços seguidos
data = re.sub(r"\s+", " ", data)
# 4) Remove tags <script> (com atributos)
data = re.sub(r'<\s*script[^>]*>', '', data, flags=re.IGNORECASE)
data = re.sub(r'<\s*/\s*script\s*>', '', data, flags=re.IGNORECASE)
# 5) Remove javascript: de links
data = re.sub(r'javascript\s*:', '', data, flags=re.IGNORECASE)
# 6) Remove palavras-chave SQL Injection comuns
blacklist = [
"--", ";", "/*", "*/", "@@",
"char(", "nchar(", "varchar(",
"alter", "drop", "exec", "insert",
"delete", "update", "union", "select",
"from", "where"
]
for word in blacklist:
# Verificar se 'word' é uma string não vazia e válida para a regex
if word:
data = re.sub(re.escape(word), "", data, flags=re.IGNORECASE)
return data

View file

@ -1,9 +1,9 @@
{
"host": "localhost",
"name": "D:/Orius/Base/CAIAPONIA.FDB",
"name": "CARTORIO",
"port": 3050,
"user": "SYSDBA",
"password": "master!orius",
"password": "masterkey",
"charset": "UTF8",
"pool" : {
"pre_ping" : true,

View file

@ -0,0 +1,13 @@
{
"host": "localhost",
"name": "D:/Orius/Base/CAIAPONIA.FDB",
"port": 3050,
"user": "SYSDBA",
"password": "master!orius",
"charset": "UTF8",
"pool" : {
"pre_ping" : true,
"size" : 5,
"max_overflow" :10
}
}

View file

@ -1,7 +1,5 @@
from packages.v1.administrativo.repositories.c_caixa_item.delete import \
Delete
from packages.v1.administrativo.schemas.c_caixa_item_schema import \
CaixaItemSchema
from packages.v1.administrativo.repositories.c_caixa_item.c_caixa_item_delete import Delete
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSchema
from abstracts.action import BaseAction

View file

@ -1,4 +1,4 @@
from packages.v1.administrativo.repositories.c_caixa_item.index import \
from packages.v1.administrativo.repositories.c_caixa_item.c_caixa_item_index import \
Index
from packages.v1.administrativo.schemas.c_caixa_item_schema import \
CaixaItemSearchSchema

View file

@ -1,4 +1,4 @@
from packages.v1.administrativo.repositories.c_caixa_item.save import Save
from packages.v1.administrativo.repositories.c_caixa_item.c_caixa_item_save import Save
from packages.v1.administrativo.schemas.c_caixa_item_schema import \
CaixaItemSchema
from abstracts.action import BaseAction

View file

@ -1,6 +1,5 @@
from packages.v1.administrativo.repositories.c_caixa_item.show import Show
from packages.v1.administrativo.schemas.c_caixa_item_schema import \
CaixaItemSchema
from packages.v1.administrativo.repositories.c_caixa_item.c_caixa_item_show import Show
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSchema
from abstracts.action import BaseAction

View file

@ -0,0 +1,14 @@
from packages.v1.administrativo.repositories.c_caixa_item.c_caixa_item_update_repository import UpdateRepository
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSchema
from abstracts.action import BaseAction
class UpdateAction(BaseAction):
def execute(self, caixa_item_id : int, caixa_item_schema : CaixaItemSchema):
# Instância o repositório desejado
update = UpdateRepository()
# Executa o respositório desejado
return update.execute(caixa_item_id, caixa_item_schema)

View file

@ -0,0 +1,11 @@
from packages.v1.administrativo.schemas.c_caixa_servico_schema import CCaixaServicoIdSchema
from packages.v1.administrativo.repositories.c_caixa_servico.c_caixa_servico_delete_repository import DeleteRepository
class DeleteAction:
def execute(self, usuario_schema : CCaixaServicoIdSchema):
delete_repository = DeleteRepository()
return delete_repository.execute(usuario_schema)

View file

@ -0,0 +1,16 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.c_caixa_servico_schema import CCaixaServicoDescricaoSchema
from packages.v1.administrativo.repositories.c_caixa_servico.c_caixa_servico_get_by_descricao_repository import ShowRepository
class ShowAction(BaseAction):
def execute(self, caixa_servico_schema : CCaixaServicoDescricaoSchema):
# Instânciamento do repositório sql
show_repository = ShowRepository()
# Execução do sql
response = show_repository.execute(caixa_servico_schema)
# Retorno da informação
return response

View file

@ -0,0 +1,15 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.repositories.c_caixa_servico.c_caixa_servico_index_repository import IndexRepository
class IndexAction(BaseAction):
def execute(self):
# Instânciamento do repositório sql
index_repository = IndexRepository()
# Execução do sql
response = index_repository.execute()
# Retorno da informação
return response

View file

@ -0,0 +1,11 @@
from packages.v1.administrativo.schemas.c_caixa_servico_schema import CCaixaServicoSaveSchema
from packages.v1.administrativo.repositories.c_caixa_servico.c_caixa_servico_save_repository import SaveRepository
class SaveAction:
def execute(self, usuario_schema : CCaixaServicoSaveSchema):
save_repository = SaveRepository()
return save_repository.execute(usuario_schema)

View file

@ -0,0 +1,16 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.c_caixa_servico_schema import CCaixaServicoSchema
from packages.v1.administrativo.repositories.c_caixa_servico.c_caixa_servico_show_repository import ShowRepository
class ShowAction(BaseAction):
def execute(self, usuario_schema : CCaixaServicoSchema):
# Instânciamento do repositório sql
show_repository = ShowRepository()
# Execução do sql
response = show_repository.execute(usuario_schema)
# Retorno da informação
return response

View file

@ -0,0 +1,11 @@
from packages.v1.administrativo.schemas.c_caixa_servico_schema import CCaixaServicoUpdateSchema
from packages.v1.administrativo.repositories.c_caixa_servico.c_caixa_servico_update_repository import UpdateRepository
class UpdateAction:
def execute(self, caixa_servico_id : int, c_caixa_servico_schema : CCaixaServicoUpdateSchema):
save_repository = UpdateRepository()
return save_repository.execute(caixa_servico_id, c_caixa_servico_schema)

View file

@ -1,10 +1,10 @@
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioSchema
from packages.v1.administrativo.repositories.g_usuario.delete_repository import DeleteRepository
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioIdSchema
from packages.v1.administrativo.repositories.g_usuario.g_usuario_delete_repository import DeleteRepository
class DeleteAction:
def execute(self, usuario_schema : GUsuarioSchema):
def execute(self, usuario_schema : GUsuarioIdSchema):
delete_repository = DeleteRepository()

View file

@ -0,0 +1,14 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioAuthenticateSchema
from packages.v1.administrativo.repositories.g_usuario.g_usuario_get_by_authenticate_repository import GetByAuthenticateRepository
class GetByAuthenticateAction(BaseAction):
def execute(self, g_usuario_authenticate_schema : GUsuarioAuthenticateSchema):
# Instânciamento do repositório de busca pelo authenticate
get_by_authenticate_repository = GetByAuthenticateRepository()
# Execução do repositório
return get_by_authenticate_repository.execute(g_usuario_authenticate_schema)

View file

@ -0,0 +1,13 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioCpfSchema
from packages.v1.administrativo.repositories.g_usuario.g_usuario_get_by_cpf_repository import GetByUsuarioCpfRepository
class GetByUsuarioCpfAction(BaseAction):
def execute(self, g_usuario_schema = GUsuarioCpfSchema):
# Importação do repositório
get_by_cpf_repository = GetByUsuarioCpfRepository()
# Execução do repositório
return get_by_cpf_repository.execute(g_usuario_schema)

View file

@ -0,0 +1,13 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioEmailSchema
from packages.v1.administrativo.repositories.g_usuario.g_usuario_get_by_email_repository import GetByUsuarioEmailRepository
class GetByUsuarioEmailAction(BaseAction):
def execute(self, g_usuario_schema = GUsuarioEmailSchema):
# Importação do repositório
get_by_email_repository = GetByUsuarioEmailRepository()
# Execução do repositório
return get_by_email_repository.execute(g_usuario_schema)

View file

@ -0,0 +1,13 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioLoginSchema
from packages.v1.administrativo.repositories.g_usuario.g_usuario_get_by_login_repository import GetByUsuarioLoginRepository
class GetByUsuarioLoginAction(BaseAction):
def execute(self, g_usuario_schema = GUsuarioLoginSchema):
# Importação do repositório
get_by_login_repository = GetByUsuarioLoginRepository()
# Execução do repositório
return get_by_login_repository.execute(g_usuario_schema)

View file

@ -1,5 +1,5 @@
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioSchema
from packages.v1.administrativo.repositories.g_usuario.get_by_usuario_id_repository import GetByUsuarioIdRepository
from packages.v1.administrativo.repositories.g_usuario.g_usuario_get_by_usuario_id_repository import GetByUsuarioIdRepository
class GetByUsuarioIdAction:

View file

@ -1,5 +1,5 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.repositories.g_usuario.index_repository import IndexRepository
from packages.v1.administrativo.repositories.g_usuario.g_usuario_index_repository import IndexRepository
class IndexAction(BaseAction):

View file

@ -1,10 +1,10 @@
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioSchema
from packages.v1.administrativo.repositories.g_usuario.save_repository import SaveRepository
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioSaveSchema
from packages.v1.administrativo.repositories.g_usuario.g_usuario_save_repository import SaveRepository
class SaveAction:
def execute(self, usuario_schema : GUsuarioSchema):
def execute(self, usuario_schema : GUsuarioSaveSchema):
save_repository = SaveRepository()

View file

@ -1,6 +1,6 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioSchema
from packages.v1.administrativo.repositories.g_usuario.show_repository import ShowRepository
from packages.v1.administrativo.repositories.g_usuario.g_usuario_show_repository import ShowRepository
class ShowAction(BaseAction):

View file

@ -0,0 +1,11 @@
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioUpdateSchema
from packages.v1.administrativo.repositories.g_usuario.g_usuario_update_repository import UpdateRepository
class UpdateAction:
def execute(self, usuario_id: int, usuario_schema : GUsuarioUpdateSchema):
save_repository = UpdateRepository()
return save_repository.execute(usuario_id, usuario_schema)

View file

@ -1,14 +0,0 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioLoginSchema
from packages.v1.administrativo.repositories.g_usuario.get_by_login_repository import GetByLoginRepository
class GetByLoginAction(BaseAction):
def execute(self, g_usuario_login_schema : GUsuarioLoginSchema):
# Instânciamento do repositório de busca pelo login
get_by_login_repository = GetByLoginRepository()
# Execução do repositório
return get_by_login_repository.execute(g_usuario_login_schema)

View file

@ -0,0 +1,26 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.t_tb_andamentoservico_schema import TTbAndamentoservicoIdSchema
from packages.v1.administrativo.repositories.t_tb_andamentoservico.t_tb_andamentoservico_delete_repository import DeleteRepository
class DeleteAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de exclusão de um registro na tabela t_tb_andamentoservico.
"""
def execute(self, andamentoservico_schema: TTbAndamentoservicoIdSchema):
"""
Executa a operação de exclusão no banco de dados.
Args:
andamentoservico_schema (TTbAndamentoservicoIdSchema): O esquema com o ID a ser excluído.
Returns:
O resultado da operação de exclusão.
"""
# Instanciamento do repositório
delete_repository = DeleteRepository()
# Execução do repositório
return delete_repository.execute(andamentoservico_schema)

View file

@ -0,0 +1,28 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.t_tb_andamentoservico_schema import TTbAndamentoservicoDescricaoSchema
from packages.v1.administrativo.repositories.t_tb_andamentoservico.t_tb_andamentoservico_get_by_descricao_repository import GetByDescricaoRepository
class GetByDescricaoAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de busca de um registro na tabela t_tb_andamentoservico por descrição.
"""
def execute(self, andamentoservico_schema: TTbAndamentoservicoDescricaoSchema):
"""
Executa a operação de busca no banco de dados.
Args:
andamentoservico_schema (TTbAndamentoservicoDescricaoSchema): O esquema com a descrição a ser buscada.
Returns:
O registro encontrado ou None.
"""
# Instanciamento do repositório
show_repository = GetByDescricaoRepository()
# Execução do repositório
response = show_repository.execute(andamentoservico_schema)
# Retorno da informação
return response

View file

@ -0,0 +1,24 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.repositories.t_tb_andamentoservico.t_tb_andamentoservico_index_repository import IndexRepository
class IndexAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de listagem de todos os registros na tabela t_tb_andamentoservico.
"""
def execute(self):
"""
Executa a operação de listagem no banco de dados.
Returns:
A lista de todos os registros.
"""
# Instanciamento do repositório
index_repository = IndexRepository()
# Execução do repositório
response = index_repository.execute()
# Retorno da informação
return response

View file

@ -0,0 +1,28 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.t_tb_andamentoservico_schema import TTbAndamentoservicoSaveSchema
from packages.v1.administrativo.repositories.t_tb_andamentoservico.t_tb_andamentoservico_save_repository import SaveRepository
class SaveAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de salvar um novo registro na tabela t_tb_andamentoservico.
"""
def execute(self, andamentoservico_schema: TTbAndamentoservicoSaveSchema):
"""
Executa a operação de salvamento.
Args:
andamentoservico_schema (TTbAndamentoservicoSaveSchema): O esquema com os dados a serem salvos.
Returns:
O resultado da operação de salvamento.
"""
# Instânciamento do repositório
save_repository = SaveRepository()
# Execução do repositório
response = save_repository.execute(andamentoservico_schema)
# Retorno da informação
return response

View file

@ -0,0 +1,28 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.t_tb_andamentoservico_schema import TTbAndamentoservicoIdSchema
from packages.v1.administrativo.repositories.t_tb_andamentoservico.t_tb_andamentoservico_show_repository import ShowRepository
class ShowAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a exibição
de um registro na tabela t_tb_andamentoservico.
"""
def execute(self, andamentoservico_schema: TTbAndamentoservicoIdSchema):
"""
Executa a operação de exibição.
Args:
andamentoservico_schema (TTbAndamentoservicoIdSchema): O esquema com o ID do registro a ser exibido.
Returns:
O resultado da operação de exibição.
"""
# Instânciamento do repositório
show_repository = ShowRepository()
# Execução do repositório
response = show_repository.execute(andamentoservico_schema)
# Retorno da informação
return response

View file

@ -0,0 +1,25 @@
from packages.v1.administrativo.schemas.t_tb_andamentoservico_schema import TTbAndamentoservicoUpdateSchema
from packages.v1.administrativo.repositories.t_tb_andamentoservico.t_tb_andamentoservico_update_repository import UpdateRepository
class UpdateAction:
"""
Service responsável por encapsular a lógica de negócio para a atualização
de um registro na tabela t_tb_andamentoservico.
"""
def execute(self, tb_andamentoservico_id : int, andamentoservico_schema: TTbAndamentoservicoUpdateSchema):
"""
Executa a operação de atualização.
Args:
andamentoservico_schema (TTbAndamentoservicoUpdateSchema): O esquema com os dados a serem atualizados.
Returns:
O resultado da operação de atualização.
"""
# Instância o repositório de atualização
update_repository = UpdateRepository()
# Chama o método de execução do repositório para realizar a atualização
return update_repository.execute(tb_andamentoservico_id, andamentoservico_schema)

View file

@ -0,0 +1,26 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.t_tb_reconhecimentotipo_schema import TTbReconhecimentotipoIdSchema
from packages.v1.administrativo.repositories.t_tb_reconhecimentotipo.t_tb_reconhecimentotipo_delete_repository import DeleteRepository
class DeleteAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de exclusão de um registro na tabela t_tb_reconhecimentotipo.
"""
def execute(self, reconhecimentotipo_schema: TTbReconhecimentotipoIdSchema):
"""
Executa a operação de exclusão no banco de dados.
Args:
reconhecimentotipo_schema (T_TbReconhecimentotipoIdSchema): O esquema com o ID a ser excluído.
Returns:
O resultado da operação de exclusão.
"""
# Instanciamento do repositório
delete_repository = DeleteRepository()
# Execução do repositório
return delete_repository.execute(reconhecimentotipo_schema)

View file

@ -0,0 +1,28 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.t_tb_reconhecimentotipo_schema import TTbReconhecimentotipoDescricaoSchema
from packages.v1.administrativo.repositories.t_tb_reconhecimentotipo.t_tb_reconhecimentotipo_get_by_descricao_repository import GetByDescricaoRepository
class GetByDescricaoAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de busca de um registro na tabela t_tb_reconhecimentotipo por descrição.
"""
def execute(self, reconhecimentotipo_schema: TTbReconhecimentotipoDescricaoSchema):
"""
Executa a operação de busca no banco de dados.
Args:
reconhecimentotipo_schema (T_TbReconhecimentotipoDescricaoSchema): O esquema com a descrição a ser buscada.
Returns:
O registro encontrado ou None.
"""
# Instanciamento do repositório
show_repository = GetByDescricaoRepository()
# Execução do repositório
response = show_repository.execute(reconhecimentotipo_schema)
# Retorno da informação
return response

View file

@ -0,0 +1,24 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.repositories.t_tb_reconhecimentotipo.t_tb_reconhecimentotipo_index_repository import IndexRepository
class IndexAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de listagem de todos os registros na tabela t_tb_reconhecimentotipo.
"""
def execute(self):
"""
Executa a operação de listagem no banco de dados.
Returns:
A lista de todos os registros.
"""
# Instanciamento do repositório
index_repository = IndexRepository()
# Execução do repositório
response = index_repository.execute()
# Retorno da informação
return response

View file

@ -0,0 +1,28 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.t_tb_reconhecimentotipo_schema import TTbReconhecimentotipoSaveSchema
from packages.v1.administrativo.repositories.t_tb_reconhecimentotipo.t_tb_reconhecimentotipo_save_repository import SaveRepository
class SaveAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de salvar um novo registro na tabela t_tb_reconhecimentotipo.
"""
def execute(self, reconhecimentotipo_schema: TTbReconhecimentotipoSaveSchema):
"""
Executa a operação de salvamento.
Args:
reconhecimentotipo_schema (T_TbReconhecimentotipoSaveSchema): O esquema com os dados a serem salvos.
Returns:
O resultado da operação de salvamento.
"""
# Instânciamento do repositório
save_repository = SaveRepository()
# Execução do repositório
response = save_repository.execute(reconhecimentotipo_schema)
# Retorno da informação
return response

View file

@ -0,0 +1,28 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.t_tb_reconhecimentotipo_schema import TTbReconhecimentotipoIdSchema
from packages.v1.administrativo.repositories.t_tb_reconhecimentotipo.t_tb_reconhecimentotipo_show_repository import ShowRepository
class ShowAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a exibição
de um registro na tabela t_tb_reconhecimentotipo.
"""
def execute(self, reconhecimentotipo_schema: TTbReconhecimentotipoIdSchema):
"""
Executa a operação de exibição.
Args:
reconhecimentotipo_schema (T_TbReconhecimentotipoIdSchema): O esquema com o ID do registro a ser exibido.
Returns:
O resultado da operação de exibição.
"""
# Instânciamento do repositório
show_repository = ShowRepository()
# Execução do repositório
response = show_repository.execute(reconhecimentotipo_schema)
# Retorno da informação
return response

View file

@ -0,0 +1,25 @@
from packages.v1.administrativo.schemas.t_tb_reconhecimentotipo_schema import TTbReconhecimentotipoUpdateSchema
from packages.v1.administrativo.repositories.t_tb_reconhecimentotipo.t_tb_reconhecimentotipo_update_repository import UpdateRepository
class UpdateAction:
"""
Service responsável por encapsular a lógica de negócio para a atualização
de um registro na tabela t_tb_reconhecimentotipo.
"""
def execute(self, tb_reconhecimentotipo_id : int, reconhecimentotipo_schema: TTbReconhecimentotipoUpdateSchema):
"""
Executa a operação de atualização.
Args:
reconhecimentotipo_schema (T_TbReconhecimentotipoUpdateSchema): O esquema com os dados a serem atualizados.
Returns:
O resultado da operação de atualização.
"""
# Instância o repositório de atualização
update_repository = UpdateRepository()
# Chama o método de execução do repositório para realizar a atualização
return update_repository.execute(tb_reconhecimentotipo_id, reconhecimentotipo_schema)

View file

@ -1,6 +1,5 @@
# Importação de bibliotecas
from packages.v1.administrativo.schemas.c_caixa_item_schema import (
CaixaItemSchema, CaixaItemSearchSchema)
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSchema
from actions.dynamic_import.dynamic_import import DynamicImport
@ -17,7 +16,7 @@ class CCaixaItemController:
def index(self):
# Importação da classe desejad
indexService = self.dynamic_import.service("index_service", "IndexService")
indexService = self.dynamic_import.service("c_caixa_item_index_service", "IndexService")
# Intânciamento da classe service
self.indexService = indexService()
@ -31,7 +30,7 @@ class CCaixaItemController:
def create(self, caixa_item_schema: CaixaItemSchema):
# Importação da classe desejada
createService = self.dynamic_import.service("save_service", "SaveService")
createService = self.dynamic_import.service("c_caixa_item_save_service", "SaveService")
# Intânciamento da classe service
self.createService = createService()
@ -41,10 +40,26 @@ class CCaixaItemController:
'message' : 'Registros cadastrado com sucesso',
'data': self.createService.execute(caixa_item_schema)
}
def update(self, caixa_item_id : int, caixa_item_schema: CaixaItemSchema):
# Importação da classe desejada
updateService = self.dynamic_import.service("c_caixa_item_update_service", "UpdateService")
# Intânciamento da classe service
self.updateService = updateService()
# Lista todos os produtos
return {
'message' : 'Registros cadastrado com sucesso',
'data': self.updateService.execute(caixa_item_id, caixa_item_schema)
}
def show(self, caixa_item_schema: CaixaItemSchema):
# Importação da classe desejad
showService = self.dynamic_import.service("show_service", "ShowService")
showService = self.dynamic_import.service("c_caixa_item_show_service", "ShowService")
# Intânciamento da classe service
self.showService = showService()
@ -57,7 +72,7 @@ class CCaixaItemController:
def delete(self, caixa_item_schema: CaixaItemSchema):
# Importação da classe desejad
deleteService = self.dynamic_import.service("delete_service", "DeleteService")
deleteService = self.dynamic_import.service("c_caixa_item_delete_service", "DeleteService")
# Intânciamento da classe service
self.deleteService = deleteService()

View file

@ -0,0 +1,113 @@
from actions.dynamic_import.dynamic_import import DynamicImport
from packages.v1.administrativo.schemas.c_caixa_servico_schema import (
CCaixaServicoSchema,
CCaixaServicoSaveSchema,
CCaixaServicoUpdateSchema,
CCaixaServicoIdSchema,
CCaixaServicoDescricaoSchema
)
class CCaixaServicoController:
def __init__(self):
# Action responsável por carregar as services de acodo com o estado
self.dynamic_import = DynamicImport()
# Define o pacote que deve ser carregado
self.dynamic_import.set_package("administrativo")
# Define a tabela que o pacote pertence
self.dynamic_import.set_table("c_caixa_servico")
pass
# Lista todos os caixa serviço
def index(self):
# Importação da classe desejada
indexService = self.dynamic_import.service("c_caixa_servico_index_service", "IndexService")
# Instânciamento da classe service
self.indexService = indexService()
# Lista todos os caixa serviço
return {
'message': 'caixa serviço localizados com sucesso',
'data': self.indexService.execute()
}
# Busca um usuário especifico pelo ID
def show(self, caixa_servico_schema : CCaixaServicoIdSchema):
#Importação da classe desejada
show_service = self.dynamic_import.service('c_caixa_servico_show_service', 'ShowService')
# Instânciamento da classe desejada
self.show_service = show_service()
# Busca e retorna o usuário desejado
return {
'message' : 'Caixa Serviço localizado com sucesso',
'data': self.show_service.execute(caixa_servico_schema)
}
# Busca um caixa serviço pela descrição
def getDescricao(self, caixa_servico_schema : CCaixaServicoDescricaoSchema):
#Importação da classe desejada
show_service = self.dynamic_import.service('c_caixa_servico_get_descricao_service', 'GetDescricaoService')
# Instânciamento da classe desejada
self.show_service = show_service()
# Busca e retorna o usuário desejado
return {
'message' : 'Caixa Serviço localizado com sucesso',
'data': self.show_service.execute(caixa_servico_schema, True)# True para retornar a mensagem de erro caso não localize o serviço
}
# Cadastra um novo usuário
def save(self, caixa_servico_schema : CCaixaServicoSaveSchema):
#Importação da classe desejada
save_service = self.dynamic_import.service('c_caixa_servico_save_service', 'CCaixaServicoSaveService')
# Instânciamento da classe desejada
self.save_service = save_service()
# Busca e retorna o usuário desejado
return {
'message' : 'Caixa Serviço salvo com sucesso',
'data': self.save_service.execute(caixa_servico_schema)
}
# Atualiza os dados de um usuário
def update(self, caixa_servico_id : int, caixa_servico_schema : CCaixaServicoUpdateSchema):
#Importação da classe desejada
save_service = self.dynamic_import.service('c_caixa_servico_update_service', 'CCaixaServicoUpdateService')
# Instânciamento da classe desejada
self.save_service = save_service()
# Busca e retorna o usuário desejado
return {
'message' : 'Caixa Serviço atualizado com sucesso',
'data': self.save_service.execute(caixa_servico_id, caixa_servico_schema)
}
# Exclui um usuário
def delete(self, caixa_servico_schema : CCaixaServicoIdSchema):
#Importação da classe desejada
delete_service = self.dynamic_import.service('c_caixa_servico_delete_service', 'DeleteService')
# Instânciamento da classe desejada
self.delete_service = delete_service()
# Busca e retorna o usuário desejado
return {
'message' : 'Caixa Serviço removido com sucesso',
'data': self.delete_service.execute(caixa_servico_schema)
}

View file

@ -1,7 +1,13 @@
from actions.dynamic_import.dynamic_import import DynamicImport
from packages.v1.administrativo.schemas.g_usuario_schema import (
GUsuarioSchema,
GUsuarioLoginSchema
GUsuarioAuthenticateSchema,
GUsuarioSaveSchema,
GUsuarioUpdateSchema,
GUsuarioEmailSchema,
GUsuarioCpfSchema,
GUsuarioLoginSchema,
GUsuarioIdSchema
)
class GUsuarioController:
@ -9,32 +15,36 @@ class GUsuarioController:
def __init__(self):
# Action responsável por carregar as services de acodo com o estado
self.dynamic_import = DynamicImport()
# Define o pacote que deve ser carregado
self.dynamic_import.set_package("administrativo")
# Define a tabela que o pacote pertence
self.dynamic_import.set_table("g_usuario")
pass
def login(self, g_usuario_login_schema : GUsuarioLoginSchema):
# Efetua o acesso junto ao sistema por um determinado usuário
def authenticate(self, g_usuario_authenticate_schema : GUsuarioAuthenticateSchema):
# Importação de service de login
login_service = self.dynamic_import.service("login_service", "LoginService")
# Importação de service de Authenticate
authenticate_service = self.dynamic_import.service("g_usuario_authenticate_service", "AuthenticateService")
# Instânciamento da service
self.login_service = login_service()
self.authenticate_service = authenticate_service()
# Retorna o usuário logado
return {
'message' : 'Usuário localizado com sucesso',
'data' : {
'token' : self.login_service.execute(g_usuario_login_schema)
'token' : self.authenticate_service.execute(g_usuario_authenticate_schema)
}
}
# Carrega os dados do usuário logado
def me(self, current_user):
# Importação de service de login
me_service = self.dynamic_import.service("me_service", "MeService")
# Importação de service de authenticate
me_service = self.dynamic_import.service("g_usuario_me_service", "MeService")
# Instânciamento da service
self.me_service = me_service()
@ -45,10 +55,11 @@ class GUsuarioController:
'data' : self.me_service.execute(current_user)
}
# Lista todos os usuários
def index(self):
# Importação da classe desejada
indexService = self.dynamic_import.service("index_service", "IndexService")
indexService = self.dynamic_import.service("g_usuario_index_service", "IndexService")
# Instânciamento da classe service
self.indexService = indexService()
@ -59,10 +70,11 @@ class GUsuarioController:
'data': self.indexService.execute()
}
# Busca um usuário especifico pelo ID
def show(self, usuario_schema : GUsuarioSchema):
#Importação da classe desejada
show_service = self.dynamic_import.service('show_service', 'ShowService')
show_service = self.dynamic_import.service('g_usuario_show_service', 'ShowService')
# Instânciamento da classe desejada
self.show_service = show_service()
@ -73,10 +85,59 @@ class GUsuarioController:
'data': self.show_service.execute(usuario_schema)
}
def save(self, usuario_schema : GUsuarioSchema):
# Busca um usuário especifico pelo e-mail
def getEmail(self, usuario_schema : GUsuarioEmailSchema):
#Importação da classe desejada
save_service = self.dynamic_import.service('save_service', 'SaveService')
get_email_service = self.dynamic_import.service('g_usuario_get_email_service', 'GetEmailService')
# Instânciamento da classe desejada
self.get_email_service = get_email_service()
# Busca e retorna o usuário desejado
return {
'message' : 'E-mail localizado com sucesso',
'data': self.get_email_service.execute(usuario_schema, True) # True para retornar a mensagem de erro caso não localize o usuario
}
# Busca um usuário especifico pelo e-mail
def getLogin(self, usuario_schema : GUsuarioLoginSchema):
#Importação da classe desejada
get_login_service = self.dynamic_import.service('g_usuario_get_login_service', 'GetLoginService')
# Instânciamento da classe desejada
self.get_login_service = get_login_service()
# Busca e retorna o usuário desejado
return {
'message' : 'Login localizado com sucesso',
'data': self.get_login_service.execute(usuario_schema, True) # True para retornar a mensagem de erro caso não localize o usuario
}
# Busca um usuário especifico pelo CPF
def getCpf(self, usuario_schema : GUsuarioCpfSchema):
#Importação da classe desejada
get_cpf_service = self.dynamic_import.service('g_usuario_get_cpf_service', 'GetCpfService')
# Instânciamento da classe desejada
self.get_cpf_service = get_cpf_service()
# Busca e retorna o usuário desejado
return {
'message' : 'CPF localizado com sucesso',
'data': self.get_cpf_service.execute(usuario_schema, True) # True para retornar a mensagem de erro caso não localize o usuario
}
# Cadastra um novo usuário
def save(self, usuario_schema : GUsuarioSaveSchema):
#Importação da classe desejada
save_service = self.dynamic_import.service('g_usuario_save_service', 'GUsuarioSaveService')
# Instânciamento da classe desejada
self.save_service = save_service()
@ -87,24 +148,26 @@ class GUsuarioController:
'data': self.save_service.execute(usuario_schema)
}
def update(self, usuario_schema : GUsuarioSchema):
# Atualiza os dados de um usuário
def update(self, usuario_id: int, usuario_schema : GUsuarioUpdateSchema):
#Importação da classe desejada
save_service = self.dynamic_import.service('save_service', 'SaveService')
save_service = self.dynamic_import.service('g_usuario_update_service', 'GUsuarioUpdateService')
# Instânciamento da classe desejada
self.save_service = save_service()
# Busca e retorna o usuário desejado
return {
'message' : 'Usuário salvo com sucesso',
'data': self.save_service.execute(usuario_schema)
'message' : 'Usuário atualizado com sucesso',
'data': self.save_service.execute(usuario_id, usuario_schema)
}
def delete(self, usuario_schema : GUsuarioSchema):
# Exclui um usuário
def delete(self, usuario_schema : GUsuarioIdSchema):
#Importação da classe desejada
delete_service = self.dynamic_import.service('delete_service', 'DeleteService')
delete_service = self.dynamic_import.service('g_usuario_delete_service', 'DeleteService')
# Instânciamento da classe desejada
self.delete_service = delete_service()

View file

@ -0,0 +1,113 @@
from actions.dynamic_import.dynamic_import import DynamicImport
from packages.v1.administrativo.schemas.t_tb_andamentoservico_schema import (
TTbAndamentoservicoSchema,
TTbAndamentoservicoSaveSchema,
TTbAndamentoservicoUpdateSchema,
TTbAndamentoservicoIdSchema,
TTbAndamentoservicoDescricaoSchema
)
class TTbAndamentoservicoController:
def __init__(self):
# Action responsável por carregar as services de acordo com o estado
self.dynamic_import = DynamicImport()
# Define o pacote que deve ser carregado
self.dynamic_import.set_package("administrativo")
# Define a tabela que o pacote pertence
self.dynamic_import.set_table("t_tb_andamentoservico")
pass
# Lista todos os andamentos de serviço
def index(self):
# Importação da classe desejada
indexService = self.dynamic_import.service("t_tb_andamentoservico_index_service", "IndexService")
# Instância da classe service
self.indexService = indexService()
# Lista todos os andamentos de serviço
return {
'message': 'Andamentos de serviço localizados com sucesso',
'data': self.indexService.execute()
}
# Busca um andamento de serviço específico pelo ID
def show(self, andamentoservico_schema : TTbAndamentoservicoIdSchema):
#Importação da classe desejada
show_service = self.dynamic_import.service('t_tb_andamentoservico_show_service', 'ShowService')
# Instância da classe desejada
self.show_service = show_service()
# Busca e retorna o andamento de serviço desejado
return {
'message' : 'Andamento de serviço localizado com sucesso',
'data': self.show_service.execute(andamentoservico_schema)
}
# Busca um andamento de serviço pela descrição
def get_by_descricao(self, andamentoservico_schema : TTbAndamentoservicoDescricaoSchema):
#Importação da classe desejada
show_service = self.dynamic_import.service('t_tb_andamentoservico_get_descricao_service', 'GetByDescricaoService')
# Instância da classe desejada
self.show_service = show_service()
# Busca e retorna o andamento de serviço desejado
return {
'message' : 'Andamento de serviço localizado com sucesso',
'data': self.show_service.execute(andamentoservico_schema, True) #True para retornar a mensagem de erro caso não localize o serviço
}
# Cadastra um novo andamento de serviço
def save(self, andamentoservico_schema : TTbAndamentoservicoSaveSchema):
#Importação da classe desejada
save_service = self.dynamic_import.service('t_tb_andamentoservico_save_service', 'TTbAndamentoservicoSaveService')
# Instância da classe desejada
self.save_service = save_service()
# Busca e retorna o andamento de serviço desejado
return {
'message' : 'Andamento de serviço salvo com sucesso',
'data': self.save_service.execute(andamentoservico_schema)
}
# Atualiza os dados de um andamento de serviço
def update(self, tb_andamentoservico_id : int, andamentoservico_schema : TTbAndamentoservicoUpdateSchema):
#Importação da classe desejada
update_service = self.dynamic_import.service('t_tb_andamentoservico_update_service', 'TTbAndamentoservicoUpdateService')
# Instância da classe desejada
self.update_service = update_service()
# Busca e retorna o andamento de serviço desejado
return {
'message' : 'Andamento de serviço atualizado com sucesso',
'data': self.update_service.execute(tb_andamentoservico_id, andamentoservico_schema)
}
# Exclui um andamento de serviço
def delete(self, andamentoservico_schema : TTbAndamentoservicoIdSchema):
#Importação da classe desejada
delete_service = self.dynamic_import.service('t_tb_andamentoservico_delete_service', 'DeleteService')
# Instância da classe desejada
self.delete_service = delete_service()
# Busca e retorna o andamento de serviço desejado
return {
'message' : 'Andamento de serviço removido com sucesso',
'data': self.delete_service.execute(andamentoservico_schema)
}

View file

@ -0,0 +1,113 @@
from actions.dynamic_import.dynamic_import import DynamicImport
from packages.v1.administrativo.schemas.t_tb_reconhecimentotipo_schema import (
TTbReconhecimentotipoSchema,
TTbReconhecimentotipoSaveSchema,
TTbReconhecimentotipoUpdateSchema,
TTbReconhecimentotipoIdSchema,
TTbReconhecimentotipoDescricaoSchema
)
class TTbReconhecimentotipoController:
def __init__(self):
# Action responsável por carregar as services de acordo com o estado
self.dynamic_import = DynamicImport()
# Define o pacote que deve ser carregado
self.dynamic_import.set_package("administrativo")
# Define a tabela que o pacote pertence
self.dynamic_import.set_table("t_tb_reconhecimentotipo")
pass
# Lista todos os tipos de reconhecimento
def index(self):
# Importação da classe desejada
indexService = self.dynamic_import.service("t_tb_reconhecimentotipo_index_service", "IndexService")
# Instância da classe service
self.indexService = indexService()
# Lista todos os tipos de reconhecimento
return {
'message': 'Tipos de reconhecimento localizados com sucesso',
'data': self.indexService.execute()
}
# Busca um tipo de reconhecimento específico pelo ID
def show(self, reconhecimentotipo_schema : TTbReconhecimentotipoIdSchema):
#Importação da classe desejada
show_service = self.dynamic_import.service('t_tb_reconhecimentotipo_show_service', 'ShowService')
# Instância da classe desejada
self.show_service = show_service()
# Busca e retorna o tipo de reconhecimento desejado
return {
'message' : 'Tipo de reconhecimento localizado com sucesso',
'data': self.show_service.execute(reconhecimentotipo_schema)
}
# Busca um tipo de reconhecimento pela descrição
def get_by_descricao(self, reconhecimentotipo_schema : TTbReconhecimentotipoDescricaoSchema):
#Importação da classe desejada
show_service = self.dynamic_import.service('t_tb_reconhecimentotipo_get_descricao_service', 'GetByDescricaoService')
# Instância da classe desejada
self.show_service = show_service()
# Busca e retorna o tipo de reconhecimento desejado
return {
'message' : 'Tipo de reconhecimento localizado com sucesso',
'data': self.show_service.execute(reconhecimentotipo_schema, True) #True para retornar a mensagem de erro caso não localize o serviço
}
# Cadastra um novo tipo de reconhecimento
def save(self, reconhecimentotipo_schema : TTbReconhecimentotipoSaveSchema):
#Importação da classe desejada
save_service = self.dynamic_import.service('t_tb_reconhecimentotipo_save_service', 'TTbReconhecimentotipoSaveService')
# Instância da classe desejada
self.save_service = save_service()
# Busca e retorna o tipo de reconhecimento desejado
return {
'message' : 'Tipo de reconhecimento salvo com sucesso',
'data': self.save_service.execute(reconhecimentotipo_schema)
}
# Atualiza os dados de um tipo de reconhecimento
def update(self, tb_reconhecimentotipo_id : int, reconhecimentotipo_schema : TTbReconhecimentotipoUpdateSchema):
#Importação da classe desejada
update_service = self.dynamic_import.service('t_tb_reconhecimentotipo_update_service', 'TTbReconhecimentotipoUpdateService')
# Instância da classe desejada
self.update_service = update_service()
# Busca e retorna o tipo de reconhecimento desejado
return {
'message' : 'Tipo de reconhecimento atualizado com sucesso',
'data': self.update_service.execute(tb_reconhecimentotipo_id, reconhecimentotipo_schema)
}
# Exclui um tipo de reconhecimento
def delete(self, reconhecimentotipo_schema : TTbReconhecimentotipoIdSchema):
#Importação da classe desejada
delete_service = self.dynamic_import.service('t_tb_reconhecimentotipo_delete_service', 'DeleteService')
# Instância da classe desejada
self.delete_service = delete_service()
# Busca e retorna o tipo de reconhecimento desejado
return {
'message' : 'Tipo de reconhecimento removido com sucesso',
'data': self.delete_service.execute(reconhecimentotipo_schema)
}

View file

@ -1,14 +1,9 @@
# Importação de bibliotecas
from typing import Optional
from fastapi import APIRouter, Body, Depends, status
from actions.jwt.get_current_user import get_current_user
from packages.v1.administrativo.controllers.c_caixa_item_controller import \
CCaixaItemController
from packages.v1.administrativo.schemas.c_caixa_item_schema import (
CaixaItemSchema, CaixaItemSearchSchema)
from packages.v1.administrativo.controllers.c_caixa_item_controller import CCaixaItemController
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSchema, CaixaItemSearchSchema
# Inicializar o roteaodr para as rotas de produtos
router = APIRouter()
@ -39,6 +34,20 @@ async def save(caixa_item_schema: CaixaItemSchema, current_user : dict = Depends
# Retorna a informação desejada
return response
@router.put('/{caixa_item_id}',
status_code=status.HTTP_201_CREATED,
summary="Atualiza uma nova receita ou despesa no sistema",
response_description="Confirmação de atualização da receita ou despesa, incluindo detalhes do item criado.")
async def save(caixa_item_id : int, caixa_item_schema: CaixaItemSchema, current_user : dict = Depends(get_current_user)):
# Salva o produto desejado
response = cCaixaItemController.update(caixa_item_id, caixa_item_schema)
# Retorna a informação desejada
return response
@router.get('/{caixa_item_id}',
status_code=status.HTTP_200_OK,
summary="Busca um registro em específico",

View file

@ -0,0 +1,108 @@
# Importação de bibliotecas
from typing import Optional
from fastapi import APIRouter, Body, Depends, status
from actions.jwt.get_current_user import get_current_user
from packages.v1.administrativo.controllers.c_caixa_servico_controller import CCaixaServicoController
from packages.v1.administrativo.schemas.c_caixa_servico_schema import (
CCaixaServicoSchema,
CCaixaServicoSaveSchema,
CCaixaServicoUpdateSchema,
CCaixaServicoDescricaoSchema
)
# Inicializa o roteador para as rotas de usuário
router = APIRouter()
# Instânciamento do controller desejado
c_caixa_servico_controller = CCaixaServicoController()
# Lista todos os serviços
@router.get('/',
status_code=status.HTTP_200_OK,
summary='Lista todos os serviços cadastrados',
response_description='Lista todos os serviços cadastrados')
async def index(current_user: dict = Depends(get_current_user)):
# Busca todos os usuários cadastrados
response = c_caixa_servico_controller.index()
# Retorna os dados localizados
return response
# Localiza um caixa serviço pela descrição
@router.get('/descricao',
status_code=status.HTTP_200_OK,
summary='Busca um registro em especifico pela descrição',
response_description='Busca um registro em especifico')
async def getDescricao(descricao : str, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
caixa_servico_schema = CCaixaServicoDescricaoSchema(descricao=descricao)
# Busca um usuário especifico pelo e-mail
response = c_caixa_servico_controller.getDescricao(caixa_servico_schema)
# Retorna os dados localizados
return response
# Localiza um serviço pelo ID
@router.get('/{caixa_servico_id}',
status_code=status.HTTP_200_OK,
summary='Busca um registro em especifico pelo ID do serviço',
response_description='Busca um registro em especifico')
async def show(caixa_servico_id : int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
caixa_servico_schema = CCaixaServicoSchema(caixa_servico_id=caixa_servico_id)
# Busca um usuário especifico pelo ID
response = c_caixa_servico_controller.show(caixa_servico_schema)
# Retorna os dados localizados
return response
# Cadastro de caixa servico
@router.post('/',
status_code=status.HTTP_200_OK,
summary='Cadastra um caixa serviço',
response_description='Cadastra um serviço')
async def save(caixa_servico_schema : CCaixaServicoSaveSchema, current_user: dict = Depends(get_current_user)):
# Efetua o cadastro do usuário junto ao banco de dados
response = c_caixa_servico_controller.save(caixa_servico_schema)
# Retorna os dados localizados
return response
# Atualiza os dados de caixa serviço
@router.put('/{caixa_servico_id}',
status_code=status.HTTP_200_OK,
summary='Atualiza um caixa serviço',
response_description='Atualiza um serviço')
async def update(caixa_servico_id : int, caixa_servico_schema : CCaixaServicoUpdateSchema, current_user: dict = Depends(get_current_user)):
# Efetua a atualização dos dados de usuário
response = c_caixa_servico_controller.update(caixa_servico_id, caixa_servico_schema)
# Retorna os dados localizados
return response
# Exclui um determinado usuário
@router.delete('/{caixa_servico_id}',
status_code=status.HTTP_200_OK,
summary='Remove um caixa serviço',
response_description='Remove um serviço')
async def delete(caixa_servico_id : int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
caixa_servico_schema = CCaixaServicoSchema(caixa_servico_id=caixa_servico_id)
# Efetua a exclusão de um determinado usuário
response = c_caixa_servico_controller.delete(caixa_servico_schema)
# Retorna os dados localizados
return response

View file

@ -1,106 +0,0 @@
# Importação de bibliotecas
from typing import Optional
from fastapi import APIRouter, Body, Depends, status
from actions.jwt.get_current_user import get_current_user
from packages.v1.administrativo.controllers.g_usuario_controller import GUsuarioController
from packages.v1.administrativo.schemas.g_usuario_schema import (
GUsuarioSchema,
GUsuarioLoginSchema
)
# Inicializa o roteador para as rotas de usuário
router = APIRouter()
# Instãnciamento do controller desejado
g_usuario_controller = GUsuarioController()
@router.post('/login',
status_code=status.HTTP_200_OK,
summary='Cria o token de acesso do usuário',
response_description='Retorna o token de acesso do usuário')
async def index(g_usuario_login_schema : GUsuarioLoginSchema):
# Busca todos os usuários cadastrados
response = g_usuario_controller.login(g_usuario_login_schema)
# Retorna os dados localizados
return response
@router.get('/me')
async def me(current_user: dict = Depends(get_current_user)):
# Busca todos os usuários cadastrados
response = g_usuario_controller.me(current_user)
# Retorna os dados localizados
return response
@router.get('/',
status_code=status.HTTP_200_OK,
summary='Lista todos os usuário cadastrados',
response_description='Lista todos os usuário cadastrados')
async def index(current_user: dict = Depends(get_current_user)):
# Busca todos os usuários cadastrados
response = g_usuario_controller.index()
# Retorna os dados localizados
return response
@router.get('/{usuario_id}',
status_code=status.HTTP_200_OK,
summary='Busca um registro em especifico',
response_description='Busca um registro em especifico')
async def show(usuario_id : int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
usuario_schema = GUsuarioSchema(usuario_id=usuario_id)
# Busca todos os usuários cadastrados
response = g_usuario_controller.show(usuario_schema)
# Retorna os dados localizados
return response
@router.post('/',
status_code=status.HTTP_200_OK,
summary='Cadastra um usuário',
response_description='Cadastra um usuário')
async def save(usuario_schema : GUsuarioSchema, current_user: dict = Depends(get_current_user)):
# Busca todos os usuários cadastrados
response = g_usuario_controller.save(usuario_schema)
# Retorna os dados localizados
return response
@router.put('/',
status_code=status.HTTP_200_OK,
summary='Atualiza um usuário',
response_description='Atualiza um usuário')
async def update(usuario_schema : GUsuarioSchema, current_user: dict = Depends(get_current_user)):
# Busca todos os usuários cadastrados
response = g_usuario_controller.update(usuario_schema)
# Retorna os dados localizados
return response
@router.delete('/{usuario_id}',
status_code=status.HTTP_200_OK,
summary='Remove um usuário',
response_description='Remove um usuário')
async def delete(usuario_id : int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
usuario_schema = GUsuarioSchema(usuario_id=usuario_id)
# Busca todos os usuários cadastrados
response = g_usuario_controller.delete(usuario_schema)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,168 @@
# Importação de bibliotecas
from typing import Optional
from fastapi import APIRouter, Body, Depends, status
from actions.jwt.get_current_user import get_current_user
from packages.v1.administrativo.controllers.g_usuario_controller import GUsuarioController
from packages.v1.administrativo.schemas.g_usuario_schema import (
GUsuarioSchema,
GUsuarioAuthenticateSchema,
GUsuarioSaveSchema,
GUsuarioUpdateSchema,
GUsuarioEmailSchema,
GUsuarioCpfSchema,
GUsuarioLoginSchema,
GUsuarioIdSchema
)
# Inicializa o roteador para as rotas de usuário
router = APIRouter()
# Instânciamento do controller desejado
g_usuario_controller = GUsuarioController()
# Autenticação de usuário
@router.post('/authenticate',
status_code=status.HTTP_200_OK,
summary='Cria o token de acesso do usuário',
response_description='Retorna o token de acesso do usuário')
async def index(g_usuario_authenticate_schema : GUsuarioAuthenticateSchema):
# Efetua a autenticação de um usuário junto ao sistema
response = g_usuario_controller.authenticate(g_usuario_authenticate_schema)
# Retorna os dados localizados
return response
# Dados do usuário logado
@router.get('/me',
status_code=status.HTTP_200_OK,
summary='Retorna os dados do usuário que efetuou o login',
response_description='Dados do usuário que efetuou o login' )
async def me(current_user: dict = Depends(get_current_user)):
# Busca os dados do usuário logado
response = g_usuario_controller.me(current_user)
# Retorna os dados localizados
return response
# Lista todos os usuários
@router.get('/',
status_code=status.HTTP_200_OK,
summary='Lista todos os usuário cadastrados',
response_description='Lista todos os usuário cadastrados')
async def index(current_user: dict = Depends(get_current_user)):
# Busca todos os usuários cadastrados
response = g_usuario_controller.index()
# Retorna os dados localizados
return response
# Localiza um usuário pelo email
@router.get('/email',
status_code=status.HTTP_200_OK,
summary='Busca um registro em especifico por e-mail informado',
response_description='Busca um registro em especifico')
async def getEmail(email : str, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
usuario_schema = GUsuarioEmailSchema(email=email)
# Busca um usuário especifico pelo e-mail
response = g_usuario_controller.getEmail(usuario_schema)
# Retorna os dados localizados
return response
# Localiza um usuário pelo login
@router.get('/login',
status_code=status.HTTP_200_OK,
summary='Busca um registro em especifico por login informado',
response_description='Busca um registro em especifico')
async def getLogin(login : str, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
usuario_schema = GUsuarioLoginSchema(login=login)
# Busca um usuário especifico pelo e-mail
response = g_usuario_controller.getLogin(usuario_schema)
# Retorna os dados localizados
return response
# Localiza um usuário pelo cpf
@router.get('/cpf',
status_code=status.HTTP_200_OK,
summary='Busca um registro em especifico por número de CPF',
response_description='Busca um registro em especifico')
async def getCpf(cpf : str, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
usuario_schema = GUsuarioCpfSchema(cpf=cpf)
# Busca um usuário especifico pelo e-mail
response = g_usuario_controller.getCpf(usuario_schema)
# Retorna os dados localizados
return response
# Localiza um usuário pelo ID
@router.get('/{usuario_id}',
status_code=status.HTTP_200_OK,
summary='Busca um registro em especifico pelo ID do usuário',
response_description='Busca um registro em especifico')
async def show(usuario_id : int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
usuario_schema = GUsuarioIdSchema(usuario_id=usuario_id)
# Busca um usuário especifico pelo ID
response = g_usuario_controller.show(usuario_schema)
# Retorna os dados localizados
return response
# Cadastro de usuários
@router.post('/',
status_code=status.HTTP_200_OK,
summary='Cadastra um usuário',
response_description='Cadastra um usuário')
async def save(usuario_schema : GUsuarioSaveSchema, current_user: dict = Depends(get_current_user)):
# Efetua o cadastro do usuário junto ao banco de dados
response = g_usuario_controller.save(usuario_schema)
# Retorna os dados localizados
return response
# Atualiza os dados de usuário
@router.put('/{usuario_id}',
status_code=status.HTTP_200_OK,
summary='Atualiza um usuário',
response_description='Atualiza um usuário')
async def update(usuario_id : int, usuario_schema : GUsuarioUpdateSchema, current_user: dict = Depends(get_current_user)):
# Efetua a atualização dos dados de usuário
response = g_usuario_controller.update(usuario_id, usuario_schema)
# Retorna os dados localizados
return response
# Exclui um determinado usuário
@router.delete('/{usuario_id}',
status_code=status.HTTP_200_OK,
summary='Remove um usuário',
response_description='Remove um usuário')
async def delete(usuario_id : int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
usuario_schema = GUsuarioIdSchema(usuario_id=usuario_id)
# Efetua a exclusão de um determinado usuário
response = g_usuario_controller.delete(usuario_schema)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,108 @@
# Importação de bibliotecas
from typing import Optional
from fastapi import APIRouter, Body, Depends, status
from actions.jwt.get_current_user import get_current_user
from packages.v1.administrativo.controllers.t_tb_andamentoservico_controller import TTbAndamentoservicoController
from packages.v1.administrativo.schemas.t_tb_andamentoservico_schema import (
TTbAndamentoservicoSchema,
TTbAndamentoservicoSaveSchema,
TTbAndamentoservicoUpdateSchema,
TTbAndamentoservicoIdSchema
)
# Inicializa o roteador para as rotas de andamento de serviço
router = APIRouter()
# Instânciamento do controller desejado
t_tb_andamentoservico_controller = TTbAndamentoservicoController()
# Lista todos os andamentos de serviço
@router.get('/',
status_code=status.HTTP_200_OK,
summary='Lista todos os andamentos de serviço cadastrados',
response_description='Lista todos os andamentos de serviço cadastrados')
async def index(current_user: dict = Depends(get_current_user)):
# Busca todos os andamentos de serviço cadastrados
response = t_tb_andamentoservico_controller.index()
# Retorna os dados localizados
return response
# Localiza um andamento de serviço pela descrição
@router.get('/descricao',
status_code=status.HTTP_200_OK,
summary='Busca um registro em específico pela descrição',
response_description='Busca um registro em específico')
async def get_by_descricao(descricao : str, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
andamentoservico_schema = TTbAndamentoservicoSchema(descricao=descricao)
# Busca um andamento de serviço específico pela descrição
response = t_tb_andamentoservico_controller.get_by_descricao(andamentoservico_schema)
# Retorna os dados localizados
return response
# Localiza um andamento de serviço pelo ID
@router.get('/{tb_andamentoservico_id}',
status_code=status.HTTP_200_OK,
summary='Busca um registro em específico pelo ID do andamento de serviço',
response_description='Busca um registro em específico')
async def show(tb_andamentoservico_id : int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
andamentoservico_schema = TTbAndamentoservicoIdSchema(tb_andamentoservico_id=tb_andamentoservico_id)
# Busca um andamento de serviço específico pelo ID
response = t_tb_andamentoservico_controller.show(andamentoservico_schema)
# Retorna os dados localizados
return response
# Cadastro de andamento de serviço
@router.post('/',
status_code=status.HTTP_201_CREATED,
summary='Cadastra um andamento de serviço',
response_description='Cadastra um andamento de serviço')
async def save(andamentoservico_schema : TTbAndamentoservicoSaveSchema, current_user: dict = Depends(get_current_user)):
# Efetua o cadastro no banco de dados
response = t_tb_andamentoservico_controller.save(andamentoservico_schema)
# Retorna os dados localizados
return response
# Atualiza os dados de um andamento de serviço
@router.put('/{tb_andamentoservico_id}',
status_code=status.HTTP_200_OK,
summary='Atualiza um andamento de serviço',
response_description='Atualiza um andamento de serviço')
async def update(tb_andamentoservico_id : int, andamentoservico_schema : TTbAndamentoservicoUpdateSchema, current_user: dict = Depends(get_current_user)):
# Efetua a atualização dos dados
response = t_tb_andamentoservico_controller.update(tb_andamentoservico_id, andamentoservico_schema)
# Retorna os dados localizados
return response
# Exclui um determinado andamento de serviço
@router.delete('/{tb_andamentoservico_id}',
status_code=status.HTTP_200_OK,
summary='Remove um andamento de serviço',
response_description='Remove um andamento de serviço')
async def delete(tb_andamentoservico_id : int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
andamentoservico_schema = TTbAndamentoservicoIdSchema(tb_andamentoservico_id=tb_andamentoservico_id)
# Efetua a exclusão do andamento de serviço
response = t_tb_andamentoservico_controller.delete(andamentoservico_schema)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,108 @@
# Importação de bibliotecas
from typing import Optional
from fastapi import APIRouter, Body, Depends, status
from actions.jwt.get_current_user import get_current_user
from packages.v1.administrativo.controllers.t_tb_reconhecimentotipo_controller import TTbReconhecimentotipoController
from packages.v1.administrativo.schemas.t_tb_reconhecimentotipo_schema import (
TTbReconhecimentotipoSchema,
TTbReconhecimentotipoSaveSchema,
TTbReconhecimentotipoUpdateSchema,
TTbReconhecimentotipoIdSchema
)
# Inicializa o roteador para as rotas do tipo de reconhecimento
router = APIRouter()
# Instânciamento do controller desejado
t_tb_reconhecimentotipo_controller = TTbReconhecimentotipoController()
# Lista todos os tipos de reconhecimento
@router.get('/',
status_code=status.HTTP_200_OK,
summary='Lista todos os tipos de reconhecimento cadastrados',
response_description='Lista todos os tipos de reconhecimento cadastrados')
async def index(current_user: dict = Depends(get_current_user)):
# Busca todos os tipos de reconhecimento cadastrados
response = t_tb_reconhecimentotipo_controller.index()
# Retorna os dados localizados
return response
# Localiza um tipo de reconhecimento pela descrição
@router.get('/descricao',
status_code=status.HTTP_200_OK,
summary='Busca um registro em específico pela descrição',
response_description='Busca um registro em específico')
async def get_by_descricao(descricao : str, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
reconhecimentotipo_schema = TTbReconhecimentotipoSchema(descricao=descricao)
# Busca um tipo de reconhecimento específico pela descrição
response = t_tb_reconhecimentotipo_controller.get_by_descricao(reconhecimentotipo_schema)
# Retorna os dados localizados
return response
# Localiza um tipo de reconhecimento pelo ID
@router.get('/{tb_reconhecimentotipo_id}',
status_code=status.HTTP_200_OK,
summary='Busca um registro em específico pelo ID do tipo de reconhecimento',
response_description='Busca um registro em específico')
async def show(tb_reconhecimentotipo_id : int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
reconhecimentotipo_schema = TTbReconhecimentotipoIdSchema(tb_reconhecimentotipo_id=tb_reconhecimentotipo_id)
# Busca um tipo de reconhecimento específico pelo ID
response = t_tb_reconhecimentotipo_controller.show(reconhecimentotipo_schema)
# Retorna os dados localizados
return response
# Cadastro de tipo de reconhecimento
@router.post('/',
status_code=status.HTTP_201_CREATED,
summary='Cadastra um tipo de reconhecimento',
response_description='Cadastra um tipo de reconhecimento')
async def save(reconhecimentotipo_schema : TTbReconhecimentotipoSaveSchema, current_user: dict = Depends(get_current_user)):
# Efetua o cadastro no banco de dados
response = t_tb_reconhecimentotipo_controller.save(reconhecimentotipo_schema)
# Retorna os dados localizados
return response
# Atualiza os dados de um tipo de reconhecimento
@router.put('/{tb_reconhecimentotipo_id}',
status_code=status.HTTP_200_OK,
summary='Atualiza um tipo de reconhecimento',
response_description='Atualiza um tipo de reconhecimento')
async def update(tb_reconhecimentotipo_id : int, reconhecimentotipo_schema : TTbReconhecimentotipoUpdateSchema, current_user: dict = Depends(get_current_user)):
# Efetua a atualização dos dados
response = t_tb_reconhecimentotipo_controller.update(tb_reconhecimentotipo_id, reconhecimentotipo_schema)
# Retorna os dados localizados
return response
# Exclui um determinado tipo de reconhecimento
@router.delete('/{tb_reconhecimentotipo_id}',
status_code=status.HTTP_200_OK,
summary='Remove um tipo de reconhecimento',
response_description='Remove um tipo de reconhecimento')
async def delete(tb_reconhecimentotipo_id : int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
reconhecimentotipo_schema = TTbReconhecimentotipoIdSchema(tb_reconhecimentotipo_id=tb_reconhecimentotipo_id)
# Efetua a exclusão do tipo de reconhecimento
response = t_tb_reconhecimentotipo_controller.delete(reconhecimentotipo_schema)
# Retorna os dados localizados
return response

View file

@ -1,5 +1,4 @@
from packages.v1.administrativo.schemas.c_caixa_item_schema import \
CaixaItemSchema
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSchema
from abstracts.repository import BaseRepository

View file

@ -1,5 +1,4 @@
from packages.v1.administrativo.schemas.c_caixa_item_schema import \
CaixaItemSearchSchema
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSearchSchema
from abstracts.repository import BaseRepository

View file

@ -1,6 +1,5 @@
# Importação de bibliotecas
from packages.v1.administrativo.schemas.c_caixa_item_schema import \
CaixaItemSchema
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSchema
from abstracts.repository import BaseRepository

View file

@ -1,5 +1,4 @@
from packages.v1.administrativo.schemas.c_caixa_item_schema import \
CaixaItemSchema
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSchema
from abstracts.repository import BaseRepository

View file

@ -0,0 +1,109 @@
# Importação de bibliotecas
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSchema
from abstracts.repository import BaseRepository
from fastapi import HTTPException, status
class UpdateRepository(BaseRepository):
def execute(self, caixa_item_id : int, caixa_item_schema : CaixaItemSchema):
try:
updates = []
params = {}
if caixa_item_schema.especie_pagamento is not None:
updates.append("ESPECIE_PAGAMENTO = :especie_pagamento")
params["especie_pagamento"] = caixa_item_schema.especie_pagamento
if caixa_item_schema.caixa_item_id is not None:
updates.append("CAIXA_ITEM_ID = :caixa_item_id")
params["caixa_item_id"] = caixa_item_schema.caixa_item_id
if caixa_item_schema.caixa_servico_id is not None:
updates.append("CAIXA_SERVICO_ID = :caixa_servico_id")
params["caixa_servico_id"] = caixa_item_schema.caixa_servico_id
if caixa_item_schema.usuario_servico_id is not None:
updates.append("USUARIO_SERVICO_ID = :usuario_servico_id")
params["usuario_servico_id"] = caixa_item_schema.usuario_servico_id
if caixa_item_schema.usuario_caixa_id is not None:
updates.append("USUARIO_CAIXA_ID = :usuario_caixa_id")
params["usuario_caixa_id"] = caixa_item_schema.usuario_caixa_id
if caixa_item_schema.descricao is not None:
updates.append("DESCRICAO = :descricao")
params["descricao"] = caixa_item_schema.descricao
if caixa_item_schema.data_pagamento is not None:
updates.append("DATA_PAGAMENTO = :data_pagamento")
params["data_pagamento"] = caixa_item_schema.data_pagamento
if caixa_item_schema.situacao is not None:
updates.append("SITUACAO = :situacao")
params["situacao"] = caixa_item_schema.situacao
if caixa_item_schema.tipo_documento is not None:
updates.append("TIPO_DOCUMENTO = :tipo_documento")
params["tipo_documento"] = caixa_item_schema.tipo_documento
if caixa_item_schema.tipo_transacao is not None:
updates.append("TIPO_TRANSACAO = :tipo_transacao")
params["tipo_transacao"] = caixa_item_schema.tipo_transacao
if caixa_item_schema.valor_servico is not None:
updates.append("VALOR_SERVICO = :valor_servico")
params["valor_servico"] = caixa_item_schema.valor_servico
if caixa_item_schema.valor_pago is not None:
updates.append("VALOR_PAGO = :valor_pago")
params["valor_pago"] = caixa_item_schema.valor_pago
if caixa_item_schema.observacao is not None:
updates.append("OBSERVACAO = :observacao")
params["observacao"] = caixa_item_schema.observacao
if caixa_item_schema.hora_pagamento is not None:
updates.append("HORA_PAGAMENTO = :hora_pagamento")
params["hora_pagamento"] = caixa_item_schema.hora_pagamento
if caixa_item_schema.tipo_servico is not None:
updates.append("TIPO_SERVICO = :tipo_servico")
params["tipo_servico"] = caixa_item_schema.tipo_servico
if caixa_item_schema.registrado is not None:
updates.append("REGISTRADO = :registrado")
params["registrado"] = caixa_item_schema.registrado
if not updates:
return False
params["caixa_item_id"] = caixa_item_id
sql = f"UPDATE C_CAIXA_ITEM SET {', '.join(updates)} WHERE caixa_item_id = :caixa_item_id RETURNING *;"
# Executa a query
result = self.run_and_return(sql, params)
if not result.caixa_item_id:
# Informa que não existe usuário a ser modificado
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Nenhum caixa serviço localizado para esta solicitação'
)
# Verifica o resultado da execução
if result:
# Se houver um resultado, a atualização foi bem-sucedida
return result
except Exception as e:
# Informa que houve uma falha na atualização do usuário
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao atualizar caixa serviço: {e}"
)

View file

@ -0,0 +1,33 @@
from packages.v1.administrativo.schemas.c_caixa_servico_schema import \
CCaixaServicoIdSchema
from abstracts.repository import BaseRepository
from fastapi import HTTPException, status
class DeleteRepository(BaseRepository):
def execute(self, caixa_servico_schema : CCaixaServicoIdSchema):
try:
# Montagem do sql
sql = """ DELETE FROM c_caixa_servico ccs WHERE ccs.caixa_servico_id = :caixa_servico_id """
# Preenchimento de parâmetros
params = {
"caixa_servico_id" : caixa_servico_schema.caixa_servico_id
}
#Execução do sql
response = self.run(sql, params)
# Retorna o resultado
return response
except Exception as e:
# Informa que houve uma falha na atualização do usuário
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao excluir caixa serviço: {e}"
)

View file

@ -0,0 +1,17 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.c_caixa_servico_schema import CCaixaServicoDescricaoSchema
class ShowRepository(BaseRepository):
def execute(self, caixa_servico_schema : CCaixaServicoDescricaoSchema):
# Montagem do sql
sql = """ SELECT * FROM C_CAIXA_SERVICO ccs WHERE ccs.descricao = :descricao """
# Preenchimento de parâmetros
params = {
'descricao' : caixa_servico_schema.descricao
}
# Execução do sql
return self.fetch_one(sql, params)

View file

@ -0,0 +1,14 @@
from abstracts.repository import BaseRepository
class IndexRepository(BaseRepository):
def execute(self):
# Montagem do sql
sql = """ SELECT * FROM c_caixa_servico """
# Execução do sql
response = self.fetch_all(sql)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,60 @@
from fastapi import HTTPException, status
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.c_caixa_servico_schema import CCaixaServicoSaveSchema
class SaveRepository(BaseRepository):
def execute(self, caixa_servico_schema : CCaixaServicoSaveSchema):
try:
# Montagem do SQL
sql = """ INSERT INTO C_CAIXA_SERVICO(
CAIXA_SERVICO_ID,
TIPO_TRANSACAO,
SISTEMA_ID,
SITUACAO,
INTERNO_SISTEMA,
DESCRICAO,
EMITIR_RELATORIO,
TIPO_CONTA_CARNELEAO,
CENTRO_DE_CUSTA_ID,
REPETIR_DESCRICAO
) VALUES (
:caixa_servico_id,
:tipo_transacao,
:sistema_id,
:situacao,
:interno_sistema,
:descricao,
:emitir_relatorio,
:tipo_conta_carneleao,
:centro_de_custa_id,
:repetir_descricao
) RETURNING *;"""
# Preenchimento de parâmetros
params = {
'caixa_servico_id': caixa_servico_schema.caixa_servico_id,
'tipo_transacao': caixa_servico_schema.tipo_transacao,
'sistema_id': caixa_servico_schema.sistema_id,
'situacao': caixa_servico_schema.situacao,
'interno_sistema': caixa_servico_schema.interno_sistema,
'descricao': caixa_servico_schema.descricao,
'emitir_relatorio': caixa_servico_schema.emitir_relatorio,
'tipo_conta_carneleao': caixa_servico_schema.tipo_conta_carneleao,
'centro_de_custa_id': caixa_servico_schema.centro_de_custa_id,
'repetir_descricao': caixa_servico_schema.repetir_descricao
}
# Excução do sql
return self.run_and_return(sql, params)
except Exception as e:
# Informa que houve uma falha na atualização do usuário
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao salvar caixa serviço: {e}"
)

View file

@ -0,0 +1,17 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.c_caixa_servico_schema import CCaixaServicoIdSchema
class ShowRepository(BaseRepository):
def execute(self, caixa_servico_schema : CCaixaServicoIdSchema):
# Montagem do sql
sql = """ SELECT * FROM C_CAIXA_SERVICO ccs WHERE ccs.caixa_servico_id = :caixa_servico_id """
# Preenchimento de parâmetros
params = {
'caixa_servico_id' : caixa_servico_schema.caixa_servico_id
}
# Execução do sql
return self.fetch_one(sql, params)

View file

@ -0,0 +1,79 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.c_caixa_servico_schema import CCaixaServicoUpdateSchema
from fastapi import HTTPException, status
class UpdateRepository(BaseRepository):
def execute(self, caixa_servico_id : int, c_caixa_servico_schema: CCaixaServicoUpdateSchema):
try:
updates = []
params = {}
if c_caixa_servico_schema.tipo_transacao is not None:
updates.append("TIPO_TRANSACAO = :tipo_transacao")
params["tipo_transacao"] = c_caixa_servico_schema.tipo_transacao
if c_caixa_servico_schema.sistema_id is not None:
updates.append("SISTEMA_ID = :sistema_id")
params["sistema_id"] = c_caixa_servico_schema.sistema_id
if c_caixa_servico_schema.situacao is not None:
updates.append("SITUACAO = :situacao")
params["situacao"] = c_caixa_servico_schema.situacao
if c_caixa_servico_schema.interno_sistema is not None:
updates.append("INTERNO_SISTEMA = :interno_sistema")
params["interno_sistema"] = c_caixa_servico_schema.interno_sistema
if c_caixa_servico_schema.descricao is not None:
updates.append("DESCRICAO = :descricao")
params["descricao"] = c_caixa_servico_schema.descricao
if c_caixa_servico_schema.emitir_relatorio is not None:
updates.append("EMITIR_RELATORIO = :emitir_relatorio")
params["emitir_relatorio"] = c_caixa_servico_schema.emitir_relatorio
if c_caixa_servico_schema.tipo_conta_carneleao is not None:
updates.append("TIPO_CONTA_CARNELEAO = :tipo_conta_carneleao")
params["tipo_conta_carneleao"] = c_caixa_servico_schema.tipo_conta_carneleao
if c_caixa_servico_schema.centro_de_custa_id is not None:
updates.append("CENTRO_DE_CUSTA_ID = :centro_de_custa_id")
params["centro_de_custa_id"] = c_caixa_servico_schema.centro_de_custa_id
if c_caixa_servico_schema.repetir_descricao is not None:
updates.append("REPETIR_DESCRICAO = :repetir_descricao")
params["repetir_descricao"] = c_caixa_servico_schema.repetir_descricao
if not updates:
return False
params["caixa_servico_id"] = caixa_servico_id
sql = f"UPDATE C_CAIXA_SERVICO SET {', '.join(updates)} WHERE caixa_servico_id = :caixa_servico_id RETURNING *;"
# Executa a query
result = self.run_and_return(sql, params)
if not result.caixa_servico_id:
# Informa que não existe usuário a ser modificado
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Nenhum caixa serviço localizado para esta solicitação'
)
# Verifica o resultado da execução
if result:
# Se houver um resultado, a atualização foi bem-sucedida
return result
except Exception as e:
# Informa que houve uma falha na atualização do usuário
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao atualizar caixa serviço: {e}"
)

View file

@ -1,22 +0,0 @@
from packages.v1.administrativo.schemas.g_usuario_schema import \
GUsuarioSchema
from abstracts.repository import BaseRepository
class DeleteRepository(BaseRepository):
def execute(self, usuario_schema : GUsuarioSchema):
# Montagem do sql
sql = """ DELETE FROM g_usuario gu WHERE gu.usuario_id = :usuarioId """
# Preenchimento de parâmetros
params = {
"usuarioId" : usuario_schema.usuario_id
}
#Execução do sql
response = self.run(sql, params)
# Retorna o resultado
return response

View file

@ -0,0 +1,34 @@
from packages.v1.administrativo.schemas.g_usuario_schema import \
GUsuarioIdSchema
from abstracts.repository import BaseRepository
from fastapi import HTTPException, status
class DeleteRepository(BaseRepository):
def execute(self, usuario_schema : GUsuarioIdSchema):
try:
# Montagem do sql
sql = """ DELETE FROM g_usuario gu WHERE gu.usuario_id = :usuarioId """
# Preenchimento de parâmetros
params = {
"usuarioId" : usuario_schema.usuario_id
}
#Execução do sql
response = self.run(sql, params)
# Retorna o resultado
return response
except Exception as e:
# Informa que houve uma falha na atualização do usuário
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao excluir usuário: {e}"
)

View file

@ -1,20 +1,20 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioLoginSchema
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioAuthenticateSchema
class GetByLoginRepository(BaseRepository):
class GetByAuthenticateRepository(BaseRepository):
def execute(self, g_usuario_login_schema : GUsuarioLoginSchema):
def execute(self, g_usuario_authenticate_schema : GUsuarioAuthenticateSchema):
# Montagem do sql
sql = """ SELECT FIRST 1 * FROM g_usuario gu WHERE gu.LOGIN LIKE :login"""
# Preenchimento dos parâmetros
params = {
"login" : g_usuario_login_schema.login
"login" : g_usuario_authenticate_schema.login
}
# Execução do sql
response = self.fetch_one(sql, params)
# Retorna os dados localizados
return response
return response

View file

@ -0,0 +1,47 @@
import re
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioCpfSchema, GUsuarioSchema
class GetByUsuarioCpfRepository(BaseRepository):
@staticmethod
def limpar_cpf(cpf: str) -> str:
"""Remove caracteres não numéricos de uma string de CPF."""
return re.sub(r'\D', '', cpf)
@staticmethod
def aplicar_mascara_cpf(cpf: str) -> str:
"""Aplica a máscara no CPF, assumindo que a string contém apenas dígitos."""
if len(cpf) == 11:
return f"{cpf[:3]}.{cpf[3:6]}.{cpf[6:9]}-{cpf[9:]}"
return cpf # Retorna o original se o comprimento for inválido
def execute(self, g_usuario_schema: GUsuarioCpfSchema) -> GUsuarioSchema:
# 1. Limpa o CPF de entrada (remove pontos e traços)
cpf_limpo = self.limpar_cpf(g_usuario_schema.cpf)
# 2. Cria uma versão do CPF com a máscara (para buscar registros com máscara)
cpf_mascarado = self.aplicar_mascara_cpf(cpf_limpo)
# 3. Define a consulta SQL com a lógica OR para buscar ambos os formatos
sql = """
SELECT * FROM g_usuario gu
WHERE gu.cpf = :cpf_limpo
OR gu.cpf = :cpf_mascarado
"""
# 4. Preenche os parâmetros SQL com os dois valores
params = {
'cpf_limpo': cpf_limpo,
'cpf_mascarado': cpf_mascarado
}
# 5. Executa a instrução SQL
result = self.fetch_one(sql, params)
# 6. Retorna o resultado no formato GUsuarioSchema ou None
if result:
return GUsuarioSchema(**result)
return None

View file

@ -0,0 +1,18 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioEmailSchema, GUsuarioSchema
class GetByUsuarioEmailRepository(BaseRepository):
def execute(self, g_usuario_schema = GUsuarioEmailSchema)-> GUsuarioSchema:
# Define a consulta sql
sql = """ SELECT * FROM g_usuario gu WHERE gu.email = :email """
# Preenchimento dos parâmetros SQL
params = {
'email': g_usuario_schema.email
}
# Execução da instrução sql
return self.fetch_one(sql, params)

View file

@ -0,0 +1,18 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioLoginSchema, GUsuarioSchema
class GetByUsuarioLoginRepository(BaseRepository):
def execute(self, g_usuario_schema = GUsuarioLoginSchema)-> GUsuarioSchema:
# Define a consulta sql
sql = """ SELECT * FROM g_usuario gu WHERE gu.login = :login """
# Preenchimento dos parâmetros SQL
params = {
'login': g_usuario_schema.login
}
# Execução da instrução sql
return self.fetch_one(sql, params)

View file

@ -0,0 +1,57 @@
from fastapi import HTTPException, status
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioSaveSchema
class SaveRepository(BaseRepository):
def execute(self, usuario_schema : GUsuarioSaveSchema):
try:
# Montagem do SQL
sql = """ UPDATE OR INSERT INTO G_USUARIO(
USUARIO_ID,
TROCARSENHA,
LOGIN,
SITUACAO,
NOME_COMPLETO,
FUNCAO,
EMAIL,
CPF,
SENHA_API
) VALUES (
:usuario_id,
:trocarsenha,
:login,
:situacao,
:nome_completo,
:funcao,
:email,
:cpf,
:senha_api
) MATCHING (usuario_id) RETURNING *;"""
# Preenchimento de parâmetros
params = {
'usuario_id': usuario_schema.usuario_id,
'trocarsenha': usuario_schema.trocarsenha,
'login': usuario_schema.login,
'situacao': usuario_schema.situacao,
'nome_completo': usuario_schema.nome_completo,
'funcao': usuario_schema.funcao,
'email': usuario_schema.email,
'cpf': usuario_schema.cpf,
'senha_api': usuario_schema.senha_api
}
# Excução do sql
return self.run_and_return(sql, params)
except Exception as e:
# Informa que houve uma falha na atualização do usuário
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao salvar usuário: {e}"
)

View file

@ -0,0 +1,73 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioUpdateSchema
from fastapi import HTTPException, status
class UpdateRepository(BaseRepository):
def execute(self, usuario_id: int, usuario_schema: GUsuarioUpdateSchema):
try:
updates = []
params = {}
if usuario_schema.trocarsenha is not None:
updates.append("TROCARSENHA = :trocarsenha")
params["trocarsenha"] = usuario_schema.trocarsenha
if usuario_schema.login is not None:
updates.append("LOGIN = :login")
params["login"] = usuario_schema.login
if usuario_schema.situacao is not None:
updates.append("SITUACAO = :situacao")
params["situacao"] = usuario_schema.situacao
if usuario_schema.nome_completo is not None:
updates.append("NOME_COMPLETO = :nome_completo")
params["nome_completo"] = usuario_schema.nome_completo
if usuario_schema.funcao is not None:
updates.append("FUNCAO = :funcao")
params["funcao"] = usuario_schema.funcao
if usuario_schema.email is not None:
updates.append("EMAIL = :email")
params["email"] = usuario_schema.email
if usuario_schema.cpf is not None:
updates.append("CPF = :cpf")
params["cpf"] = usuario_schema.cpf
if usuario_schema.senha_api is not None:
updates.append("SENHA_API = :senha_api")
params["senha_api"] = usuario_schema.senha_api
if not updates:
return False
params["usuario_id"] = usuario_id
sql = f"UPDATE G_USUARIO SET {', '.join(updates)} WHERE USUARIO_ID = :usuario_id RETURNING *;"
# Executa a query
result = self.run_and_return(sql, params)
if not result.usuario_id:
# Informa que não existe usuário a ser modificado
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Nenhum usuário localizado para esta solicitação'
)
# Verifica o resultado da execução
if result:
# Se houver um resultado, a atualização foi bem-sucedida
return result
except Exception as e:
# Informa que houve uma falha na atualização do usuário
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao atualizar usuário: {e}"
)

View file

@ -1,116 +0,0 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.g_usuario_schema import GUsuarioSchema
class SaveRepository(BaseRepository):
def execute(self, usuario_schema : GUsuarioSchema):
# Montagem do SQL
sql = """ UPDATE OR INSERT INTO G_USUARIO(
USUARIO_ID,
TROCARSENHA,
LOGIN,
SENHA,
SITUACAO,
NOME_COMPLETO,
FUNCAO,
ASSINA,
SIGLA,
USUARIO_TAB,
ULTIMO_LOGIN,
ULTIMO_LOGIN_REGS,
DATA_EXPIRACAO,
SENHA_ANTERIOR,
ANDAMENTO_PADRAO,
LEMBRETE_PERGUNTA,
LEMBRETE_RESPOSTA,
ANDAMENTO_PADRAO2,
RECEBER_MENSAGEM_ARROLAMENTO,
EMAIL,
ASSINA_CERTIDAO,
RECEBER_EMAIL_PENHORA,
FOTO,
NAO_RECEBER_CHAT_TODOS,
PODE_ALTERAR_CAIXA,
RECEBER_CHAT_CERTIDAO_ONLINE,
RECEBER_CHAT_CANCELAMENTO,
CPF,
SOMENTE_LEITURA,
RECEBER_CHAT_ENVIO_ONR,
TIPO_USUARIO,
SENHA_API
) VALUES (
:usuario_id,
:trocarsenha,
:login,
:senha,
:situacao,
:nome_completo,
:funcao,
:assina,
:sigla,
:usuario_tab,
:ultimo_login,
:ultimo_login_regs,
:data_expiracao,
:senha_anterior,
:andamento_padrao,
:lembrete_pergunta,
:lembrete_resposta,
:andamento_padrao2,
:receber_mensagem_arrolamento,
:email,
:assina_certidao,
:receber_email_penhora,
:foto,
:nao_receber_chat_todos,
:pode_alterar_caixa,
:receber_chat_certidao_online,
:receber_chat_cancelamento,
:cpf,
:somente_leitura,
:receber_chat_envio_onr,
:tipo_usuario,
:senha_api
) MATCHING (usuario_id) RETURNING *;"""
# Preenchimento de parâmetros
params = {
'usuario_id': usuario_schema.usuario_id,
'trocarsenha': usuario_schema.trocarsenha,
'login': usuario_schema.login,
'senha': usuario_schema.senha,
'situacao': usuario_schema.situacao,
'nome_completo': usuario_schema.nome_completo,
'funcao': usuario_schema.funcao,
'assina': usuario_schema.assina,
'sigla': usuario_schema.sigla,
'usuario_tab': usuario_schema.usuario_tab,
'ultimo_login': usuario_schema.ultimo_login,
'ultimo_login_regs': usuario_schema.ultimo_login_regs,
'data_expiracao': usuario_schema.data_expiracao,
'senha_anterior': usuario_schema.senha_anterior,
'andamento_padrao': usuario_schema.andamento_padrao,
'lembrete_pergunta': usuario_schema.lembrete_pergunta,
'lembrete_resposta': usuario_schema.lembrete_resposta,
'andamento_padrao2': usuario_schema.andamento_padrao2,
'receber_mensagem_arrolamento': usuario_schema.receber_mensagem_arrolamento,
'email': usuario_schema.email,
'assina_certidao': usuario_schema.assina_certidao,
'receber_email_penhora': usuario_schema.receber_email_penhora,
'foto': usuario_schema.foto,
'nao_receber_chat_todos': usuario_schema.nao_receber_chat_todos,
'pode_alterar_caixa': usuario_schema.pode_alterar_caixa,
'receber_chat_certidao_online': usuario_schema.receber_chat_certidao_online,
'receber_chat_cancelamento': usuario_schema.receber_chat_cancelamento,
'cpf': usuario_schema.cpf,
'somente_leitura': usuario_schema.somente_leitura,
'receber_chat_envio_onr': usuario_schema.receber_chat_envio_onr,
'tipo_usuario': usuario_schema.tipo_usuario,
'senha_api': usuario_schema.senha_api
}
# Excução do sql
return self.run_and_return(sql, params)

View file

@ -0,0 +1,41 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.t_tb_andamentoservico_schema import TTbAndamentoservicoIdSchema
from fastapi import HTTPException, status
class DeleteRepository(BaseRepository):
"""
Repositório para a operação de exclusão de um registro na tabela
t_tb_andamentoservico.
"""
def execute(self, andamentoservico_schema: TTbAndamentoservicoIdSchema):
"""
Executa a consulta SQL para remover um registro pelo ID.
Args:
andamentoservico_schema (TTbAndamentoservicoIdSchema): O esquema com o ID a ser removido.
Returns:
O resultado da operação de exclusão.
"""
try:
# Montagem do sql
sql = """ DELETE FROM T_TB_ANDAMENTOSERVICO WHERE TB_ANDAMENTOSERVICO_ID = :tb_andamentoservico_id """
# Preenchimento de parâmetros
params = {
"tb_andamentoservico_id": andamentoservico_schema.tb_andamentoservico_id
}
# Execução do sql
response = self.run(sql, params)
# Retorna o resultado
return response
except Exception as e:
# Informa que houve uma falha na exclusão
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao excluir andamento de serviço: {e}"
)

View file

@ -0,0 +1,29 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.t_tb_andamentoservico_schema import TTbAndamentoservicoDescricaoSchema
class GetByDescricaoRepository(BaseRepository):
"""
Repositório para a operação de busca de um registro na tabela
t_tb_andamentoservico por descrição.
"""
def execute(self, andamentoservico_schema: TTbAndamentoservicoDescricaoSchema):
"""
Executa a consulta SQL para buscar um registro pela descrição.
Args:
andamentoservico_schema (TTbAndamentoservicoDescricaoSchema): O esquema com a descrição a ser buscada.
Returns:
Um dicionário contendo os dados do registro ou None se não for encontrado.
"""
# Montagem do SQL
sql = """ SELECT * FROM T_TB_ANDAMENTOSERVICO WHERE DESCRICAO = :descricao """
# Preenchimento de parâmetros
params = {
'descricao': andamentoservico_schema.descricao
}
# Execução do sql
return self.fetch_one(sql, params)

View file

@ -0,0 +1,23 @@
from abstracts.repository import BaseRepository
class IndexRepository(BaseRepository):
"""
Repositório para a operação de listagem de todos os registros
na tabela t_tb_andamentoservico.
"""
def execute(self):
"""
Executa a consulta SQL para buscar todos os registros.
Returns:
Uma lista de dicionários contendo os dados dos registros.
"""
# Montagem do SQL
sql = """ SELECT * FROM T_TB_ANDAMENTOSERVICO """
# Execução do sql
response = self.fetch_all(sql)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,59 @@
from fastapi import HTTPException, status
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.t_tb_andamentoservico_schema import TTbAndamentoservicoSaveSchema
class SaveRepository(BaseRepository):
"""
Repositório para a operação de salvamento de um novo registro na tabela t_tb_andamentoservico.
"""
def execute(self, andamentoservico_schema: TTbAndamentoservicoSaveSchema):
"""
Executa a operação de salvamento no banco de dados.
Args:
andamentoservico_schema (TTbAndamentoservicoSaveSchema): O esquema com os dados a serem salvos.
Returns:
O registro recém-criado.
Raises:
HTTPException: Caso ocorra um erro na execução da query.
"""
try:
# Montagem do SQL
sql = """ INSERT INTO T_TB_ANDAMENTOSERVICO(
TB_ANDAMENTOSERVICO_ID,
DESCRICAO,
SITUACAO,
TIPO,
USA_EMAIL
) VALUES (
:tb_andamentoservico_id,
:descricao,
:situacao,
:tipo,
:usa_email
) RETURNING *;"""
# Preenchimento de parâmetros
params = {
'tb_andamentoservico_id': andamentoservico_schema.tb_andamentoservico_id,
'descricao': andamentoservico_schema.descricao,
'situacao': andamentoservico_schema.situacao,
'tipo': andamentoservico_schema.tipo,
'usa_email': andamentoservico_schema.usa_email
}
# Execução do sql
return self.run_and_return(sql, params)
except Exception as e:
# Informa que houve uma falha no salvamento do registro
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao salvar andamento de serviço: {e}"
)

View file

@ -0,0 +1,46 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.t_tb_andamentoservico_schema import TTbAndamentoservicoIdSchema
from fastapi import HTTPException, status
class ShowRepository(BaseRepository):
"""
Repositório para a operação de exibição de um registro na tabela t_tb_andamentoservico.
"""
def execute(self, andamentoservico_schema: TTbAndamentoservicoIdSchema):
"""
Busca um andamento de serviço específico pelo ID.
Args:
andamentoservico_schema (TTbAndamentoservicoIdSchema): O esquema que contém o ID do registro.
Returns:
O registro encontrado.
Raises:
HTTPException: Caso ocorra um erro na execução da query ou o registro não seja encontrado.
"""
try:
# Montagem do SQL
sql = "SELECT * FROM T_TB_ANDAMENTOSERVICO WHERE TB_ANDAMENTOSERVICO_ID = :tb_andamentoservico_id"
# Preenchimento de parâmetros
params = {
'tb_andamentoservico_id': andamentoservico_schema.tb_andamentoservico_id
}
# Execução do SQL
result = self.fetch_one(sql, params)
if not result:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Registro não encontrado"
)
return result
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Erro ao buscar registro: {str(e)}"
)

View file

@ -0,0 +1,67 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.t_tb_andamentoservico_schema import TTbAndamentoservicoUpdateSchema
from fastapi import HTTPException, status
class UpdateRepository(BaseRepository):
"""
Repositório para a operação de atualização na tabela T_TB_ANDAMENTOSERVICO.
"""
def execute(self, tb_andamentoservico_id: int, andamentoservico_schema: TTbAndamentoservicoUpdateSchema):
"""
Executa a atualização de um registro na tabela.
Args:
andamentoservico_schema (TTbAndamentoservicoUpdateSchema): O esquema com os dados a serem atualizados.
Returns:
O registro atualizado.
Raises:
HTTPException: Se o registro não for encontrado ou ocorrer um erro na atualização.
"""
try:
updates = []
params = {}
if andamentoservico_schema.descricao is not None:
updates.append("DESCRICAO = :descricao")
params["descricao"] = andamentoservico_schema.descricao
if andamentoservico_schema.situacao is not None:
updates.append("SITUACAO = :situacao")
params["situacao"] = andamentoservico_schema.situacao
if andamentoservico_schema.tipo is not None:
updates.append("TIPO = :tipo")
params["tipo"] = andamentoservico_schema.tipo
if andamentoservico_schema.usa_email is not None:
updates.append("USA_EMAIL = :usa_email")
params["usa_email"] = andamentoservico_schema.usa_email
if not updates:
return False
params["tb_andamentoservico_id"] = tb_andamentoservico_id
sql = f"UPDATE T_TB_ANDAMENTOSERVICO SET {', '.join(updates)} WHERE tb_andamentoservico_id = :tb_andamentoservico_id RETURNING *;"
# Executa a query
result = self.run_and_return(sql, params)
if not result:
# Informa que não existe o registro a ser modificado
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Nenhum andamento de serviço localizado para esta solicitação'
)
# Se houver um resultado, a atualização foi bem-sucedida
return result
except Exception as e:
# Informa que houve uma falha na atualização
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao atualizar o andamento de serviço: {e}"
)

View file

@ -0,0 +1,41 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.t_tb_reconhecimentotipo_schema import TTbReconhecimentotipoIdSchema
from fastapi import HTTPException, status
class DeleteRepository(BaseRepository):
"""
Repositório para a operação de exclusão de um registro na tabela
t_tb_reconhecimentotipo.
"""
def execute(self, reconhecimentotipo_schema: TTbReconhecimentotipoIdSchema):
"""
Executa a consulta SQL para remover um registro pelo ID.
Args:
reconhecimentotipo_schema (T_TbReconhecimentotipoIdSchema): O esquema com o ID a ser removido.
Returns:
O resultado da operação de exclusão.
"""
try:
# Montagem do sql
sql = """ DELETE FROM T_TB_RECONHECIMENTOTIPO WHERE TB_RECONHECIMENTOTIPO_ID = :tb_reconhecimentotipo_id """
# Preenchimento de parâmetros
params = {
"tb_reconhecimentotipo_id": reconhecimentotipo_schema.tb_reconhecimentotipo_id
}
# Execução do sql
response = self.run(sql, params)
# Retorna o resultado
return response
except Exception as e:
# Informa que houve uma falha na exclusão
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao excluir tipo de reconhecimento: {e}"
)

View file

@ -0,0 +1,29 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.t_tb_reconhecimentotipo_schema import TTbReconhecimentotipoDescricaoSchema
class GetByDescricaoRepository(BaseRepository):
"""
Repositório para a operação de busca de um registro na tabela
t_tb_reconhecimentotipo por descrição.
"""
def execute(self, reconhecimentotipo_schema: TTbReconhecimentotipoDescricaoSchema):
"""
Executa a consulta SQL para buscar um registro pela descrição.
Args:
reconhecimentotipo_schema (T_TbReconhecimentotipoDescricaoSchema): O esquema com a descrição a ser buscada.
Returns:
Um dicionário contendo os dados do registro ou None se não for encontrado.
"""
# Montagem do SQL
sql = """ SELECT * FROM T_TB_RECONHECIMENTOTIPO WHERE DESCRICAO = :descricao """
# Preenchimento de parâmetros
params = {
'descricao': reconhecimentotipo_schema.descricao
}
# Execução do sql
return self.fetch_one(sql, params)

View file

@ -0,0 +1,23 @@
from abstracts.repository import BaseRepository
class IndexRepository(BaseRepository):
"""
Repositório para a operação de listagem de todos os registros
na tabela t_tb_reconhecimentotipo.
"""
def execute(self):
"""
Executa a consulta SQL para buscar todos os registros.
Returns:
Uma lista de dicionários contendo os dados dos registros.
"""
# Montagem do SQL
sql = """ SELECT * FROM T_TB_RECONHECIMENTOTIPO """
# Execução do sql
response = self.fetch_all(sql)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,53 @@
from fastapi import HTTPException, status
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.t_tb_reconhecimentotipo_schema import TTbReconhecimentotipoSaveSchema
class SaveRepository(BaseRepository):
"""
Repositório para a operação de salvamento de um novo registro na tabela t_tb_reconhecimentotipo.
"""
def execute(self, reconhecimentotipo_schema: TTbReconhecimentotipoSaveSchema):
"""
Executa a operação de salvamento no banco de dados.
Args:
reconhecimentotipo_schema (T_TbReconhecimentotipoSaveSchema): O esquema com os dados a serem salvos.
Returns:
O registro recém-criado.
Raises:
HTTPException: Caso ocorra um erro na execução da query.
"""
try:
# Montagem do SQL
sql = """ INSERT INTO T_TB_RECONHECIMENTOTIPO(
TB_RECONHECIMENTOTIPO_ID,
DESCRICAO,
SITUACAO
) VALUES (
:tb_reconhecimentotipo_id,
:descricao,
:situacao
) RETURNING *;"""
# Preenchimento de parâmetros
params = {
'tb_reconhecimentotipo_id': reconhecimentotipo_schema.tb_reconhecimentotipo_id,
'descricao': reconhecimentotipo_schema.descricao,
'situacao': reconhecimentotipo_schema.situacao
}
# Execução do sql
return self.run_and_return(sql, params)
except Exception as e:
# Informa que houve uma falha no salvamento do registro
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao salvar registro: {e}"
)

View file

@ -0,0 +1,46 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.t_tb_reconhecimentotipo_schema import TTbReconhecimentotipoIdSchema
from fastapi import HTTPException, status
class ShowRepository(BaseRepository):
"""
Repositório para a operação de exibição de um registro na tabela t_tb_reconhecimentotipo.
"""
def execute(self, reconhecimentotipo_schema: TTbReconhecimentotipoIdSchema):
"""
Busca um tipo de reconhecimento específico pelo ID.
Args:
reconhecimentotipo_schema (T_TbReconhecimentotipoIdSchema): O esquema que contém o ID do registro.
Returns:
O registro encontrado ou None se não existir.
Raises:
HTTPException: Caso ocorra um erro na execução da query.
"""
try:
# Montagem do SQL
sql = "SELECT * FROM T_TB_RECONHECIMENTOTIPO WHERE TB_RECONHECIMENTOTIPO_ID = :tb_reconhecimentotipo_id"
# Preenchimento de parâmetros
params = {
'tb_reconhecimentotipo_id': reconhecimentotipo_schema.tb_reconhecimentotipo_id
}
# Execução do SQL
result = self.fetch_one(sql, params)
if not result:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Registro não encontrado"
)
return result
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Erro ao buscar registro: {str(e)}"
)

View file

@ -0,0 +1,60 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.t_tb_reconhecimentotipo_schema import TTbReconhecimentotipoUpdateSchema
from fastapi import HTTPException, status
class UpdateRepository(BaseRepository):
"""
Repositório para a operação de atualização na tabela T_TB_RECONHECIMENTOTIPO.
"""
def execute(self, tb_reconhecimentotipo_id : int, reconhecimentotipo_schema: TTbReconhecimentotipoUpdateSchema):
"""
Executa a atualização de um registro na tabela.
Args:
reconhecimentotipo_schema (T_TbReconhecimentotipoUpdateSchema): O esquema com os dados a serem atualizados.
Returns:
O registro atualizado.
Raises:
HTTPException: Se o registro não for encontrado ou ocorrer um erro na atualização.
"""
try:
updates = []
params = {}
if reconhecimentotipo_schema.descricao is not None:
updates.append("DESCRICAO = :descricao")
params["descricao"] = reconhecimentotipo_schema.descricao
if reconhecimentotipo_schema.situacao is not None:
updates.append("SITUACAO = :situacao")
params["situacao"] = reconhecimentotipo_schema.situacao
if not updates:
return False
params["tb_reconhecimentotipo_id"] = tb_reconhecimentotipo_id
sql = f"UPDATE T_TB_RECONHECIMENTOTIPO SET {', '.join(updates)} WHERE tb_reconhecimentotipo_id = :tb_reconhecimentotipo_id RETURNING *;"
# Executa a query
result = self.run_and_return(sql, params)
if not result.tb_reconhecimentotipo_id:
# Informa que não existe o registro a ser modificado
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Nenhum tipo de reconhecimento localizado para esta solicitação'
)
# Se houver um resultado, a atualização foi bem-sucedida
if result:
return result
except Exception as e:
# Informa que houve uma falha na atualização
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao atualizar o tipo de reconhecimento: {e}"
)

View file

@ -1,9 +1,7 @@
from datetime import date, datetime
from typing import Optional
from pydantic import BaseModel
class CaixaItemSchema(BaseModel):
especie_pagamento: Optional[str] = None
caixa_item_id: Optional[int] = None

View file

@ -0,0 +1,236 @@
from pydantic import BaseModel, field_validator, model_validator
from fastapi import HTTPException, status
from typing import Optional
# Funções para sanitização de entradas (evitar XSS, SQLi etc.)
from actions.validations.text import Text
# ----------------------------------------------------
# Schema base
# ----------------------------------------------------
class CCaixaServicoSchema(BaseModel):
interno_sistema: Optional[str] = None
caixa_servico_id: int
descricao: Optional[str] = None
situacao: Optional[str] = None
tipo_transacao: Optional[str] = None
sistema_id: Optional[int] = None
selo_grupo_id: Optional[int] = None
emitir_relatorio: Optional[str] = None
repasse: Optional[str] = None
repetir_descricao: Optional[str] = None
codigo_conta: Optional[int] = None
tipo_conta_carneleao: Optional[str] = None
centro_de_custa_id: Optional[int] = None
devolucao_juizo: Optional[str] = None
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para criação de novo servico (POST)
# ----------------------------------------------------
class CCaixaServicoSaveSchema(BaseModel):
caixa_servico_id: Optional[int] = None
tipo_transacao: Optional[str] = None
sistema_id: Optional[int] = None
situacao: Optional[str] = None
interno_sistema: Optional[str] = None
descricao: Optional[str] = None
emitir_relatorio: Optional[str] = None
tipo_conta_carneleao: Optional[str] = None
centro_de_custa_id: Optional[int] = None
repetir_descricao: Optional[str] = None
# Sanitiza os inputs enviados
@field_validator('tipo_transacao',
'situacao',
'interno_sistema',
'descricao',
'emitir_relatorio',
'tipo_conta_carneleao',
'repetir_descricao')
def validate_required_fields(cls, v):
# Se for string e composta só por dígitos
if v is None or (isinstance(v, str) and len(v.strip()) == 0):
# Não lança exceção aqui, apenas retorna para o model_validator
pass
return Text.sanitize_input(v)
# Validador para `sistema_id` e outros campos numéricos, se necessário
@field_validator('sistema_id')
def validate_sistema_id(cls, v):
if v is not None and v <= 0:
raise ValueError("O sistema precisa ser informado.")
return v
# Verifica se os campos foram enviados
@model_validator(mode='after')
def validate_all_fields(self):
# Variavel responsavel em armaezar os erros
errors = []
# Validação do tipo_transacao
if not self.tipo_transacao or len(self.tipo_transacao.strip()) == 0:
errors.append({'input': 'tipo_transacao', 'message': 'O tipo transação é obrigatório.'})
# Validação da sistema_id
if not self.sistema_id or self.sistema_id == 0:
errors.append({'input': 'sistema_id', 'message': 'O sistema precisa ser informado.'})
# Validação do situacao
if not self.situacao or len(self.situacao.strip()) == 0:
errors.append({'input': 'situacao', 'message': 'A situação é obrigatório.'})
# Validação da interno_sistema
if not self.interno_sistema or len(self.interno_sistema.strip()) == 0:
errors.append({'input': 'interno_sistema', 'message': 'O uso interno é obrigatória.'})
# Validação do descricao
if not self.descricao or len(self.descricao.strip()) == 0:
errors.append({'input': 'descricao', 'message': 'A descrição é obrigatório.'})
# Validação do emitir_relatorio
if not self.emitir_relatorio or len(self.emitir_relatorio.strip()) == 0:
errors.append({'input': 'emitir_relatorio', 'message': 'Sair nos relatórios é obrigatório.'})
# Validação do repetir_descricao
if not self.repetir_descricao or len(self.repetir_descricao.strip()) == 0:
errors.append({'input': 'repetir_descricao', 'message': 'Repetir descrição de serviço no caixa é obrigatório.'})
# Se houver errors, lança uma única exceção
if errors:
# Lança uma exceção do FastAPI para um tratamento limpo
# O `detail` da exceção será a lista de errors que criamos
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=errors
)
return self
# ----------------------------------------------------
# Schema para atualizar caixa serviço (PUT)
# ----------------------------------------------------
class CCaixaServicoUpdateSchema(BaseModel):
tipo_transacao: Optional[str] = None
sistema_id: Optional[int] = None
situacao: Optional[str] = None
interno_sistema: Optional[str] = None
descricao: Optional[str] = None
emitir_relatorio: Optional[str] = None
tipo_conta_carneleao: Optional[str] = None
centro_de_custa_id: Optional[int] = None
repetir_descricao: Optional[str] = None
# Sanitiza os inputs enviados
@field_validator('tipo_transacao',
'situacao',
'interno_sistema',
'descricao',
'emitir_relatorio',
'tipo_conta_carneleao',
'repetir_descricao')
def validate_required_fields(cls, v):
if not v or len(v.strip()) == 0:
# Não lança exceção aqui. Apenas retorna para o model_validator.
# O Pydantic já faria a checagem básica.
# Este validador individual é mais para sanitização.
pass
return Text.sanitize_input(v)
# Validador para `sistema_id` e outros campos numéricos, se necessário
@field_validator('sistema_id')
def validate_sistema_id(cls, v):
if v is not None and v <= 0:
raise ValueError("O sistema precisa ser informado.")
return v
# Verifica se os campos foram enviados
@model_validator(mode='after')
def validate_all_fields(self):
# Variavel responsavel em armaezar os erros
errors = []
# Validação do tipo_transacao
if not self.tipo_transacao or len(self.tipo_transacao.strip()) == 0:
errors.append({'input': 'tipo_transacao', 'message': 'O tipo transação é obrigatório.'})
# Validação da sistema_id
if not self.sistema_id or self.sistema_id == 0:
errors.append({'input': 'sistema_id', 'message': 'O sistema precisa ser informado.'})
# Validação do situacao
if not self.situacao or len(self.situacao.strip()) == 0:
errors.append({'input': 'situacao', 'message': 'A situação é obrigatório.'})
# Validação da interno_sistema
if not self.interno_sistema or len(self.interno_sistema.strip()) == 0:
errors.append({'input': 'interno_sistema', 'message': 'O uso interno é obrigatória.'})
# Validação do descricao
if not self.descricao or len(self.descricao.strip()) == 0:
errors.append({'input': 'descricao', 'message': 'A descrição é obrigatório.'})
# Validação do emitir_relatorio
if not self.emitir_relatorio or len(self.emitir_relatorio.strip()) == 0:
errors.append({'input': 'emitir_relatorio', 'message': 'Sair nos relatórios é obrigatório.'})
# Validação do repetir_descricao
if not self.repetir_descricao or len(self.repetir_descricao.strip()) == 0:
errors.append({'input': 'repetir_descricao', 'message': 'Repetir descrição de serviço no caixa é obrigatório.'})
# Se houver errors, lança uma única exceção
if errors:
# Lança uma exceção do FastAPI para um tratamento limpo
# O `detail` da exceção será a lista de errors que criamos
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=errors
)
return self
# ----------------------------------------------------
# Schema para localizar um caixa serviço especifico pelo ID (GET)
# ----------------------------------------------------
class CCaixaServicoIdSchema(BaseModel):
caixa_servico_id: int
# ----------------------------------------------------
# Schema para localizar um caixa serviço pela descrição (GET)
# ----------------------------------------------------
class CCaixaServicoDescricaoSchema(BaseModel):
# Campos utilizados
descricao: str # senha_api obrigatório
# Validação e sanitização do login
@field_validator('descricao')
def validar_e_sanitizar_descricao(cls, v):
# Verifica se a descrição foi informada
if not v:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Informe a descrição'
)
# Sanitiza a decrição para evitar XSS e SQL Injection
return Text.sanitize_input(v)
class Config:
from_attributes = True

View file

@ -1,11 +1,27 @@
from pydantic import BaseModel, EmailStr, constr
from pydantic import BaseModel, EmailStr, constr, field_validator, model_validator
from fastapi import HTTPException, status
from typing import Optional
from datetime import datetime
# Funções utilitárias para segurança (hash e verificação de senha)
from actions.security.security import Security
# Funções para sanitização de entradas (evitar XSS, SQLi etc.)
from actions.validations.text import Text
# Funções para validar E-mail
from actions.validations.email import Email
# Funções para validar cpf
from actions.validations.cpf import CPF
# ----------------------------------------------------
# Schema base
# ----------------------------------------------------
class GUsuarioSchema(BaseModel):
usuario_id: Optional[int] = None
trocarsenha: Optional[str] = None
login: Optional[str] = None
login: str
senha: Optional[str] = None
situacao: Optional[str] = None
nome_completo: Optional[str] = None
@ -34,14 +50,311 @@ class GUsuarioSchema(BaseModel):
somente_leitura: Optional[str] = None
receber_chat_envio_onr: Optional[str] = None
tipo_usuario: Optional[str]= None
senha_api: Optional[str]= None
senha_api: str
class Config:
from_attributes = True
class GUsuarioLoginSchema(BaseModel):
login: Optional[str] = None
senha_api: Optional[str] = None
class GUsuarioMe(BaseModel):
# ----------------------------------------------------
# Schema para acesso ao sistema
# ----------------------------------------------------
class GUsuarioAuthenticateSchema(BaseModel):
# Campos utilizados
login: str # login obrigatório
senha_api: str # senha_api obrigatório
# Validação e sanitização do login
@field_validator('login')
def validar_e_sanitizar_login(cls, v):
# Verifica se o login foi informado
if not v:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Informe o login'
)
# Sanitiza o login para evitar XSS e SQL Injection
return Text.sanitize_input(v)
# Validação e sanitização da senha
@field_validator('senha_api')
def validar_e_sanitizar_senha(cls, v):
# Verifica se a senha foi informada
if not v:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Informe a senha'
)
# Sanitiza a senha para evitar XSS e SQL Injection
return Text.sanitize_input(v)
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para localizar um usuário especifico pelo ID (GET)
# ----------------------------------------------------
class GUsuarioIdSchema(BaseModel):
usuario_id: int
# ----------------------------------------------------
# Schema para criação de novo usuário (POST)
# ----------------------------------------------------
class GUsuarioSaveSchema(BaseModel):
trocarsenha: Optional[str] = None
login: str
situacao: str
nome_completo: str
funcao: str
email: EmailStr
cpf: str
senha_api: str
# Sanitiza os inputs enviados
@field_validator('login', 'email', 'cpf', 'situacao', 'nome_completo', 'funcao')
def validate_required_fields(cls, v):
if not v or len(v.strip()) == 0:
# Não lança exceção aqui. Apenas retorna para o model_validator.
# O Pydantic já faria a checagem básica.
# Este validador individual é mais para sanitização.
pass
return Text.sanitize_input(v)
# Verifica se os campos foram enviados
@model_validator(mode='after')
def validate_all_fields(self):
# Variavel responsavel em armaezar os erros
errors = []
# Validação do login
if not self.login or len(self.login.strip()) == 0:
errors.append({'input': 'login', 'message': 'O login é obrigatório.'})
# Validação da situação
if not self.situacao or len(self.situacao.strip()) == 0:
errors.append({'input': 'situacao', 'message': 'A situação é obrigatória.'})
# Validação do nome_completo
if not self.nome_completo or len(self.nome_completo.strip()) == 0:
errors.append({'input': 'nome_completo', 'message': 'O nome completo é obrigatório.'})
# Validação da função
if not self.funcao or len(self.funcao.strip()) == 0:
errors.append({'input': 'funcao', 'message': 'A função é obrigatória.'})
# Validação do email
if not self.email or len(self.email.strip()) == 0:
errors.append({'input': 'email', 'message': 'O e-mail é obrigatório.'})
# Validação do cpf
if not self.cpf or len(self.cpf.strip()) == 0:
errors.append({'input': 'cpf', 'message': 'O CPF é obrigatório.'})
# Validação da senha
if not self.senha_api or len(self.senha_api.strip()) == 0:
errors.append({'input': 'senha_api', 'message': 'A senha é obrigatória.'})
# Criptografa a senha
self.senha_api = Security.hash_senha_api(self.senha_api)
# Se houver errors, lança uma única exceção
if errors:
# Lança uma exceção do FastAPI para um tratamento limpo
# O `detail` da exceção será a lista de errors que criamos
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=errors
)
return self
# ----------------------------------------------------
# Schema para atualizar usuário (PUT)
# ----------------------------------------------------
class GUsuarioUpdateSchema(BaseModel):
trocarsenha: Optional[str] = None
login: Optional[str] = None
situacao: Optional[str] = None
nome_completo: Optional[str] = None
funcao: Optional[str] = None
email: Optional[str] = None
cpf: Optional[str] = None
senha_api: Optional[str] = None
# Sanitiza os inputs enviados
@field_validator('login', 'email', 'cpf', 'situacao', 'nome_completo', 'funcao')
def validate_required_fields(cls, v):
if not v or len(v.strip()) == 0:
# Não lança exceção aqui. Apenas retorna para o model_validator.
# O Pydantic já faria a checagem básica.
# Este validador individual é mais para sanitização.
pass
return Text.sanitize_input(v)
# Verifica se os campos foram enviados
@model_validator(mode='after')
def validate_all_fields(self):
# Variavel responsavel em armaezar os erros
errors = []
# Validação do login
if not self.login or len(self.login.strip()) == 0:
errors.append({'input': 'login', 'message': 'O login é obrigatório.'})
# Validação da situação
if not self.situacao or len(self.situacao.strip()) == 0:
errors.append({'input': 'situacao', 'message': 'A situação é obrigatória.'})
# Validação do nome_completo
if not self.nome_completo or len(self.nome_completo.strip()) == 0:
errors.append({'input': 'nome_completo', 'message': 'O nome completo é obrigatório.'})
# Validação da função
if not self.funcao or len(self.funcao.strip()) == 0:
errors.append({'input': 'funcao', 'message': 'A função é obrigatória.'})
# Validação do email
if not self.email or len(self.email.strip()) == 0:
errors.append({'input': 'email', 'message': 'O e-mail é obrigatório.'})
# Validação do cpf
if not self.cpf or len(self.cpf.strip()) == 0:
errors.append({'input': 'cpf', 'message': 'O CPF é obrigatório.'})
# Validação da senha
if not self.senha_api or len(self.senha_api.strip()) == 0:
errors.append({'input': 'senha_api', 'message': 'A senha é obrigatória.'})
# Se houver errors, lança uma única exceção
if errors:
# Lança uma exceção do FastAPI para um tratamento limpo
# O `detail` da exceção será a lista de errors que criamos
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=errors
)
return self
# ----------------------------------------------------
# Schema para localizar usuário pelo e-mail
# ----------------------------------------------------
class GUsuarioEmailSchema(BaseModel):
# Use EmailStr para garantir a validação automática do formato
email: Optional[EmailStr] = None
# Sanitiza o input
@field_validator('email')
def sanitize_email(cls, v):
# A sanitização é feita apenas se o valor não for None ou vazio
if v:
return Text.sanitize_input(v)
return v
# Verifica se o e-mail é válido
@field_validator('email')
def check_email(cls, v):
# A verificação é feita apenas se o valor não for None ou vazio
if not Email.is_valid_email(v):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Informe um e-mail válido'
)
return v
# Valida se o campo não está vazio
@model_validator(mode='after')
def validate_email(self):
if not self.email or len(self.email.strip()) == 0:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Informe um e-mail'
)
return self
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para localizar usuário pelo CPF
# ----------------------------------------------------
class GUsuarioCpfSchema(BaseModel):
# Use EmailStr para garantir a validação automática do formato
cpf: Optional[str] = None
# Sanitiza o input
@field_validator('cpf')
def sanitize_cpf(cls, v):
# A sanitização é feita apenas se o valor não for None ou vazio
if v:
return Text.sanitize_input(v)
return v
# Verifica se o e-mail é válido
@field_validator('cpf')
def check_email(cls, v):
# A verificação é feita apenas se o valor não for None ou vazio
if not CPF.is_valid_cpf(v):
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Informe um CPF válido'
)
return v
# Valida se o campo não está vazio
@model_validator(mode='after')
def validate_cpf(self):
if not self.cpf or len(self.cpf.strip()) == 0:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Informe um CPF'
)
return self
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para localizar usuário pelo Login
# ----------------------------------------------------
class GUsuarioLoginSchema(BaseModel):
# Use EmailStr para garantir a validação automática do formato
login: Optional[str] = None
# Sanitiza o input
@field_validator('login')
def sanitize_login(cls, v):
# A sanitização é feita apenas se o valor não for None ou vazio
if v:
return Text.sanitize_input(v)
return v
# Valida se o campo não está vazio
@model_validator(mode='after')
def validate_login(self):
if not self.login or len(self.login.strip()) == 0:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Informe um Login'
)
return self
class Config:
from_attributes = True

View file

@ -0,0 +1,126 @@
from pydantic import BaseModel, field_validator, model_validator
from fastapi import HTTPException, status
from typing import Optional
# Funções para sanitização de entradas (evitar XSS, SQLi etc.)
# Supondo que a classe Text esteja disponível e com o método sanitize_input
try:
from actions.validations.text import Text
except ImportError:
class Text:
@staticmethod
def sanitize_input(value: str) -> str:
return value.strip()
# ----------------------------------------------------
# Schema base
# ----------------------------------------------------
class TTbAndamentoservicoSchema(BaseModel):
tb_andamentoservico_id: Optional[int] = None
descricao: Optional[str] = None
situacao: Optional[str] = None
tipo: Optional[str] = None
usa_email: Optional[str] = None
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para localizar um tipo especifico pelo ID (GET)
# ----------------------------------------------------
class TTbAndamentoservicoIdSchema(BaseModel):
tb_andamentoservico_id: int
# ----------------------------------------------------
# Schema para localizar um tipo especifico pela descrição (GET)
# ----------------------------------------------------
class TTbAndamentoservicoDescricaoSchema(BaseModel):
descricao: str
# ----------------------------------------------------
# Schema para criação de novo tipo (POST)
# ----------------------------------------------------
class TTbAndamentoservicoSaveSchema(BaseModel):
tb_andamentoservico_id: Optional[int] = None
descricao: str
situacao: str
tipo: str
usa_email: str
# Sanitiza os inputs enviados
@field_validator('descricao', 'situacao', 'tipo', 'usa_email')
def sanitize_fields(cls, v):
if v:
return Text.sanitize_input(v)
return v
# Verifica se os campos obrigatórios foram enviados
@model_validator(mode='after')
def validate_all_fields(self):
errors = []
if not self.descricao or len(self.descricao.strip()) == 0:
errors.append({'input': 'descricao', 'message': 'A descrição é obrigatória.'})
if not self.situacao or len(self.situacao.strip()) == 0:
errors.append({'input': 'situacao', 'message': 'A situação é obrigatória.'})
if not self.tipo or len(self.tipo.strip()) == 0:
errors.append({'input': 'tipo', 'message': 'O tipo é obrigatório.'})
if not self.usa_email or len(self.usa_email.strip()) == 0:
errors.append({'input': 'usa_email', 'message': 'O campo usa_email é obrigatório.'})
if errors:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=errors
)
return self
# ----------------------------------------------------
# Schema para atualizar tipo (PUT)
# ----------------------------------------------------
class TTbAndamentoservicoUpdateSchema(BaseModel):
descricao: Optional[str] = None
situacao: Optional[str] = None
tipo: Optional[str] = None
usa_email: Optional[str] = None
# Sanitiza os inputs enviados
@field_validator('descricao', 'situacao', 'tipo', 'usa_email')
def sanitize_fields(cls, v):
if v:
return Text.sanitize_input(v)
return v
# Verifica se os campos obrigatórios foram enviados
@model_validator(mode='after')
def validate_all_fields(self):
errors = []
if not self.descricao or len(self.descricao.strip()) == 0:
errors.append({'input': 'descricao', 'message': 'A descrição é obrigatória.'})
if not self.situacao or len(self.situacao.strip()) == 0:
errors.append({'input': 'situacao', 'message': 'A situação é obrigatória.'})
if not self.tipo or len(self.tipo.strip()) == 0:
errors.append({'input': 'tipo', 'message': 'O tipo é obrigatório.'})
if not self.usa_email or len(self.usa_email.strip()) == 0:
errors.append({'input': 'usa_email', 'message': 'O campo usa_email é obrigatório.'})
if errors:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=errors
)
return self

View file

@ -0,0 +1,109 @@
from pydantic import BaseModel, field_validator, model_validator
from fastapi import HTTPException, status
from typing import Optional
# Funções para sanitização de entradas (evitar XSS, SQLi etc.)
# Supondo que a classe Text esteja disponível e com o método sanitize_input
try:
from actions.validations.text import Text
except ImportError:
class Text:
@staticmethod
def sanitize_input(value: str) -> str:
return value.strip()
# ----------------------------------------------------
# Schema base
# ----------------------------------------------------
class TTbReconhecimentotipoSchema(BaseModel):
tb_reconhecimentotipo_id: Optional[int] = None
descricao: Optional[str] = None
situacao: Optional[str] = None
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para localizar um tipo especifico pelo ID (GET)
# ----------------------------------------------------
class TTbReconhecimentotipoIdSchema(BaseModel):
tb_reconhecimentotipo_id: int
# ----------------------------------------------------
# Schema para localizar um tipo especifico pela descrição (GET)
# ----------------------------------------------------
class TTbReconhecimentotipoDescricaoSchema(BaseModel):
descricao: str
# ----------------------------------------------------
# Schema para criação de novo tipo (POST)
# ----------------------------------------------------
class TTbReconhecimentotipoSaveSchema(BaseModel):
tb_reconhecimentotipo_id: Optional[int] = None
descricao: str
situacao: str
# Sanitiza os inputs enviados
@field_validator('descricao', 'situacao')
def sanitize_fields(cls, v):
if v:
return Text.sanitize_input(v)
return v
# Verifica se os campos obrigatórios foram enviados
@model_validator(mode='after')
def validate_all_fields(self):
errors = []
if not self.descricao or len(self.descricao.strip()) == 0:
errors.append({'input': 'descricao', 'message': 'A descrição é obrigatória.'})
if not self.situacao or len(self.situacao.strip()) == 0:
errors.append({'input': 'situacao', 'message': 'A situação é obrigatória.'})
if errors:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=errors
)
return self
# ----------------------------------------------------
# Schema para atualizar tipo (PUT)
# ----------------------------------------------------
class TTbReconhecimentotipoUpdateSchema(BaseModel):
descricao: Optional[str] = None
situacao: Optional[str] = None
# Sanitiza os inputs enviados
@field_validator('descricao', 'situacao')
def sanitize_fields(cls, v):
if v:
return Text.sanitize_input(v)
return v
# Verifica se os campos obrigatórios foram enviados
@model_validator(mode='after')
def validate_all_fields(self):
errors = []
if not self.descricao or len(self.descricao.strip()) == 0:
errors.append({'input': 'descricao', 'message': 'A descrição é obrigatória.'})
if not self.situacao or len(self.situacao.strip()) == 0:
errors.append({'input': 'situacao', 'message': 'A situação é obrigatória.'})
if errors:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=errors
)
return self

View file

@ -1,11 +1,7 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.actions.c_caixa_item.delete_action import \
DeleteAction
from packages.v1.administrativo.actions.c_caixa_item.show_action import \
ShowAction
from packages.v1.administrativo.schemas.c_caixa_item_schema import \
CaixaItemSchema
from packages.v1.administrativo.actions.c_caixa_item.c_caixa_item_delete_action import DeleteAction
from packages.v1.administrativo.actions.c_caixa_item.c_caixa_item_show_action import ShowAction
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSchema
class DeleteService:

View file

@ -1,7 +1,5 @@
from packages.v1.administrativo.actions.c_caixa_item.index_action import \
IndexAction
from packages.v1.administrativo.schemas.c_caixa_item_schema import \
CaixaItemSearchSchema
from packages.v1.administrativo.actions.c_caixa_item.c_caixa_item_index_action import IndexAction
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSearchSchema
class IndexService:

View file

@ -1,11 +1,7 @@
from packages.v1.administrativo.actions.c_caixa_item.save_action import \
SaveAction
from packages.v1.administrativo.schemas.c_caixa_item_schema import \
CaixaItemSchema
from packages.v1.administrativo.actions.c_caixa_item.c_caixa_item_save_action import SaveAction
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSchema
from packages.v1.sequencia.schemas.g_sequencia import GSequenciaSchema
from packages.v1.sequencia.services.g_sequencia.generate_service import \
GenerateService
from packages.v1.sequencia.services.g_sequencia.generate_service import GenerateService
class SaveService:

View file

@ -1,10 +1,6 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.actions.c_caixa_item.show_action import \
ShowAction
from packages.v1.administrativo.schemas.c_caixa_item_schema import \
CaixaItemSchema
from packages.v1.administrativo.actions.c_caixa_item.c_caixa_item_show_action import ShowAction
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSchema
class ShowService:

View file

@ -0,0 +1,13 @@
from packages.v1.administrativo.actions.c_caixa_item.c_caixa_item_update_action import UpdateAction
from packages.v1.administrativo.schemas.c_caixa_item_schema import CaixaItemSchema
class UpdateService:
def execute(self, caixa_item_id : int, caixa_item_schema: CaixaItemSchema):
# Instânciamento de ações
updateAction = UpdateAction()
# Retorna todos produtos desejados
return updateAction.execute(caixa_item_id, caixa_item_schema)

View file

@ -0,0 +1,15 @@
from packages.v1.administrativo.schemas.c_caixa_servico_schema import CCaixaServicoIdSchema
from packages.v1.administrativo.actions.c_caixa_servico.c_caixa_servico_delete_action import DeleteAction
class DeleteService:
def execute(self, caixa_servico_schema: CCaixaServicoIdSchema):
# Instânciamento de ação
delete_action = DeleteAction()
# Executa a ação em questão
data = delete_action.execute(caixa_servico_schema)
# Retorno da informação
return data

View file

@ -0,0 +1,26 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.schemas.c_caixa_servico_schema import CCaixaServicoDescricaoSchema
from packages.v1.administrativo.actions.c_caixa_servico.c_caixa_servico_get_by_descricao_action import ShowAction
class GetDescricaoService:
def execute(self, caixa_servico_schema: CCaixaServicoDescricaoSchema, messageValidate: bool):
# Instânciamento de ação
show_action = ShowAction()
# Executa a ação em questão
data = show_action.execute(caixa_servico_schema)
if messageValidate:
if not data:
# Retorna uma exceção
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Não foi possível localizar o registro'
)
# Retorno da informação
return data

View file

@ -0,0 +1,24 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.schemas.c_caixa_servico_schema import CCaixaServicoSchema
from packages.v1.administrativo.actions.c_caixa_servico.c_caixa_servico_index_action import IndexAction
class IndexService:
def execute(self):
# Instânciamento de acções
index_action = IndexAction()
# Executa a busca de todas as ações
data = index_action.execute()
# Verifica se foi loalizado registros
if not data:
# Retorna uma exeção
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Não foi possível localizar os usuários'
)
# Retorna as informações localizadas
return data

Some files were not shown because too many files have changed in this diff Show more