[MVPTN-4] feat(CrudRegimeComunhao): Crud completo regime de comunhão

This commit is contained in:
Kenio 2025-09-15 15:58:27 -03:00
parent a3ec0f0106
commit 9d24f7e315
22 changed files with 870 additions and 0 deletions

View file

@ -0,0 +1,26 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoIdSchema
from packages.v1.administrativo.repositories.g_tb_regimecomunhao.g_tb_regimecomunhao_delete_repository import DeleteRepository
class DeleteAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de exclusão de um registro na tabela g_tb_regimecomunhao.
"""
def execute(self, regimecomunhao_schema: GTbRegimecomunhaoIdSchema):
"""
Executa a operação de exclusão no banco de dados.
Args:
regimecomunhao_schema (GTbRegimecomunhaoIdSchema): O esquema com o ID a ser excluído.
Returns:
O resultado da operação de exclusão.
"""
# Instanciamento do repositório
delete_repository = DeleteRepository()
# Execução do repositório
return delete_repository.execute(regimecomunhao_schema)

View file

@ -0,0 +1,29 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoDescricaoSchema
from packages.v1.administrativo.repositories.g_tb_regimecomunhao.g_tb_regimecomunhao_get_by_descricao_repository import GetByDescricaoRepository
class GetByDescricaoAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de busca de um registro na tabela g_tb_regimecomunhao por descrição.
"""
def execute(self, regimecomunhao_schema: GTbRegimecomunhaoDescricaoSchema):
"""
Executa a operação de busca no banco de dados.
Args:
regimecomunhao_schema (GTbRegimecomunhaoDescricaoSchema): O esquema com a descrição a ser buscada.
Returns:
O registro encontrado ou None.
"""
# Instanciamento do repositório
show_repository = GetByDescricaoRepository()
# Execução do repositório
response = show_repository.execute(regimecomunhao_schema)
# Retorno da informação
return response

View file

@ -0,0 +1,25 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.repositories.g_tb_regimecomunhao.g_tb_regimecomunhao_index_repository import IndexRepository
class IndexAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de listagem de todos os registros na tabela g_tb_regimecomunhao.
"""
def execute(self):
"""
Executa a operação de listagem no banco de dados.
Returns:
A lista de todos os registros.
"""
# Instanciamento do repositório
index_repository = IndexRepository()
# Execução do repositório
response = index_repository.execute()
# Retorno da informação
return response

View file

@ -0,0 +1,29 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoSaveSchema
from packages.v1.administrativo.repositories.g_tb_regimecomunhao.g_tb_regimecomunhao_save_repository import SaveRepository
class SaveAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a operação
de salvar um novo registro na tabela g_tb_regimecomunhao.
"""
def execute(self, regimecomunhao_schema: GTbRegimecomunhaoSaveSchema):
"""
Executa a operação de salvamento.
Args:
regimecomunhao_schema (GTbRegimecomunhaoSaveSchema): O esquema com os dados a serem salvos.
Returns:
O resultado da operação de salvamento.
"""
# Instânciamento do repositório
save_repository = SaveRepository()
# Execução do repositório
response = save_repository.execute(regimecomunhao_schema)
# Retorno da informação
return response

View file

@ -0,0 +1,29 @@
from abstracts.action import BaseAction
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoIdSchema
from packages.v1.administrativo.repositories.g_tb_regimecomunhao.g_tb_regimecomunhao_show_repository import ShowRepository
class ShowAction(BaseAction):
"""
Serviço responsável por encapsular a lógica de negócio para a exibição
de um registro na tabela g_tb_regimecomunhao.
"""
def execute(self, regimecomunhao_schema: GTbRegimecomunhaoIdSchema):
"""
Executa a operação de exibição.
Args:
regimecomunhao_schema (GTbRegimecomunhaoIdSchema): O esquema com o ID do registro a ser exibido.
Returns:
O resultado da operação de exibição.
"""
# Instânciamento do repositório
show_repository = ShowRepository()
# Execução do repositório
response = show_repository.execute(regimecomunhao_schema)
# Retorno da informação
return response

View file

@ -0,0 +1,25 @@
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoUpdateSchema
from packages.v1.administrativo.repositories.g_tb_regimecomunhao.g_tb_regimecomunhao_update_repository import UpdateRepository
class UpdateAction:
"""
Service responsável por encapsular a lógica de negócio para a atualização
de um registro na tabela g_tb_regimecomunhao.
"""
def execute(self, tb_regimecomunhao_id : int, regimecomunhao_schema: GTbRegimecomunhaoUpdateSchema):
"""
Executa a operação de atualização.
Args:
regimecomunhao_schema (GTbRegimecomunhaoUpdateSchema): O esquema com os dados a serem atualizados.
Returns:
O resultado da operação de atualização.
"""
# Instância o repositório de atualização
update_repository = UpdateRepository()
# Chama o método de execução do repositório para realizar a atualização
return update_repository.execute(tb_regimecomunhao_id, regimecomunhao_schema)

View file

@ -0,0 +1,117 @@
from actions.dynamic_import.dynamic_import import DynamicImport
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import (
GTbRegimecomunhaoSchema,
GTbRegimecomunhaoSaveSchema,
GTbRegimecomunhaoUpdateSchema,
GTbRegimecomunhaoIdSchema,
GTbRegimecomunhaoDescricaoSchema
)
class GTbRegimecomunhaoController:
def __init__(self):
# Action responsável por carregar as services de acordo com o estado
self.dynamic_import = DynamicImport()
# Define o pacote que deve ser carregado
self.dynamic_import.set_package("administrativo")
# Define a tabela que o pacote pertence
self.dynamic_import.set_table("g_tb_regimecomunhao")
pass
# Lista todos os regimes de comunhão
def index(self):
# Importação da classe desejada
indexService = self.dynamic_import.service("g_tb_regimecomunhao_index_service", "IndexService")
# Instância da classe service
self.indexService = indexService()
# Lista todos os regimes de comunhão
return {
'message': 'Regimes de comunhão localizados com sucesso',
'data': self.indexService.execute()
}
# Busca um regime de comunhão específico pelo ID
def show(self, regimecomunhao_schema: GTbRegimecomunhaoIdSchema):
# Importação da classe desejada
show_service = self.dynamic_import.service('g_tb_regimecomunhao_show_service', 'ShowService')
# Instância da classe desejada
self.show_service = show_service()
# Busca e retorna o regime de comunhão desejado
return {
'message': 'Regime de comunhão localizado com sucesso',
'data': self.show_service.execute(regimecomunhao_schema)
}
# Busca um regime de comunhão pela descrição
def get_by_descricao(self, regimecomunhao_schema: GTbRegimecomunhaoDescricaoSchema):
# Importação da classe desejada
show_service = self.dynamic_import.service('g_tb_regimecomunhao_get_by_descricao_service', 'GetByDescricaoService')
# Instância da classe desejada
self.show_service = show_service()
# Busca e retorna o regime de comunhão desejado
return {
'message': 'Regime de comunhão localizado com sucesso',
'data': self.show_service.execute(regimecomunhao_schema, True)# True para retornar a mensagem de erro caso não localize o serviço
}
# Cadastra um novo regime de comunhão
def save(self, regimecomunhao_schema: GTbRegimecomunhaoSaveSchema):
# Importação da classe desejada
save_service = self.dynamic_import.service('g_tb_regimecomunhao_save_service', 'GTbRegimecomunhaoSaveService')
# Instância da classe desejada
self.save_service = save_service()
# Busca e retorna o regime de comunhão desejado
return {
'message': 'Regime de comunhão salvo com sucesso',
'data': self.save_service.execute(regimecomunhao_schema)
}
# Atualiza os dados de um regime de comunhão
def update(self, tb_regimecomunhao_id: int, regimecomunhao_schema: GTbRegimecomunhaoUpdateSchema):
# Importação da classe desejada
update_service = self.dynamic_import.service('g_tb_regimecomunhao_update_service', 'GTbRegimecomunhaoUpdateService')
# Instância da classe desejada
self.update_service = update_service()
# Busca e retorna o regime de comunhão desejado
return {
'message': 'Regime de comunhão atualizado com sucesso',
'data': self.update_service.execute(tb_regimecomunhao_id, regimecomunhao_schema)
}
# Exclui um regime de comunhão
def delete(self, regimecomunhao_schema: GTbRegimecomunhaoIdSchema):
# Importação da classe desejada
delete_service = self.dynamic_import.service('g_tb_regimecomunhao_delete_service', 'DeleteService')
# Instância da classe desejada
self.delete_service = delete_service()
# Busca e retorna o regime de comunhão desejado
return {
'message': 'Regime de comunhão removido com sucesso',
'data': self.delete_service.execute(regimecomunhao_schema)
}

View file

@ -0,0 +1,110 @@
# Importação de bibliotecas
from typing import Optional
from fastapi import APIRouter, Body, Depends, status
from actions.jwt.get_current_user import get_current_user
from packages.v1.administrativo.controllers.g_tb_regimecomunhao_controller import GTbRegimecomunhaoController
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import (
GTbRegimecomunhaoSchema,
GTbRegimecomunhaoSaveSchema,
GTbRegimecomunhaoUpdateSchema,
GTbRegimecomunhaoDescricaoSchema,
GTbRegimecomunhaoIdSchema
)
# Inicializa o roteador para as rotas da tabela
router = APIRouter()
# Instânciamento do controller desejado
g_tb_regimecomunhao_controller = GTbRegimecomunhaoController()
# Lista todos os regimes de comunhão
@router.get('/',
status_code=status.HTTP_200_OK,
summary='Lista todos os regimes de comunhão cadastrados',
response_description='Lista todos os regimes de comunhão cadastrados')
async def index(current_user: dict = Depends(get_current_user)):
# Busca todos os registros
response = g_tb_regimecomunhao_controller.index()
# Retorna os dados localizados
return response
# Localiza um registro pela descrição
@router.get('/descricao',
status_code=status.HTTP_200_OK,
summary='Busca um registro em especifico pela descrição',
response_description='Busca um registro em especifico')
async def get_by_descricao(descricao: str, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
regimecomunhao_schema = GTbRegimecomunhaoDescricaoSchema(descricao=descricao)
# Busca o registro especifico pela descrição
response = g_tb_regimecomunhao_controller.get_by_descricao(regimecomunhao_schema)
# Retorna os dados localizados
return response
# Localiza um registro pelo ID
@router.get('/{tb_regimecomunhao_id}',
status_code=status.HTTP_200_OK,
summary='Busca um registro em especifico pelo ID',
response_description='Busca um registro em especifico')
async def show(tb_regimecomunhao_id: int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
regimecomunhao_schema = GTbRegimecomunhaoIdSchema(tb_regimecomunhao_id=tb_regimecomunhao_id)
# Busca o registro especifico pelo ID
response = g_tb_regimecomunhao_controller.show(regimecomunhao_schema)
# Retorna os dados localizados
return response
# Cadastro de novo registro
@router.post('/',
status_code=status.HTTP_200_OK,
summary='Cadastra um novo registro de regime de comunhão',
response_description='Cadastra um novo registro')
async def save(regimecomunhao_schema: GTbRegimecomunhaoSaveSchema, current_user: dict = Depends(get_current_user)):
# Efetua o cadastro no banco de dados
response = g_tb_regimecomunhao_controller.save(regimecomunhao_schema)
# Retorna os dados localizados
return response
# Atualiza os dados de um registro
@router.put('/{tb_regimecomunhao_id}',
status_code=status.HTTP_200_OK,
summary='Atualiza um registro',
response_description='Atualiza um registro')
async def update(tb_regimecomunhao_id: int, regimecomunhao_schema: GTbRegimecomunhaoUpdateSchema, current_user: dict = Depends(get_current_user)):
# Efetua a atualização dos dados
response = g_tb_regimecomunhao_controller.update(tb_regimecomunhao_id, regimecomunhao_schema)
# Retorna os dados localizados
return response
# Exclui um determinado registro
@router.delete('/{tb_regimecomunhao_id}',
status_code=status.HTTP_200_OK,
summary='Remove um registro',
response_description='Remove um registro')
async def delete(tb_regimecomunhao_id: int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
regimecomunhao_schema = GTbRegimecomunhaoIdSchema(tb_regimecomunhao_id=tb_regimecomunhao_id)
# Efetua a exclusão do registro
response = g_tb_regimecomunhao_controller.delete(regimecomunhao_schema)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,33 @@
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import \
GTbRegimecomunhaoIdSchema
from abstracts.repository import BaseRepository
from fastapi import HTTPException, status
class DeleteRepository(BaseRepository):
def execute(self, regimecomunhao_schema: GTbRegimecomunhaoIdSchema):
try:
# Montagem do sql
sql = """ DELETE FROM G_TB_REGIMECOMUNHAO ccs WHERE ccs.tb_regimecomunhao_id = :tb_regimecomunhao_id """
# Preenchimento de parâmetros
params = {
"tb_regimecomunhao_id": regimecomunhao_schema.tb_regimecomunhao_id
}
# Execução do sql
response = self.run(sql, params)
# Retorna o resultado
return response
except Exception as e:
# Informa que houve uma falha na atualização do usuário
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao excluir regime de comunhão: {e}"
)

View file

@ -0,0 +1,17 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoDescricaoSchema
class GetByDescricaoRepository(BaseRepository):
def execute(self, regimecomunhao_schema: GTbRegimecomunhaoDescricaoSchema):
# Montagem do sql
sql = """ SELECT * FROM G_TB_REGIMECOMUNHAO WHERE DESCRICAO = :descricao """
# Preenchimento de parâmetros
params = {
'descricao': regimecomunhao_schema.descricao
}
# Execução do sql
return self.fetch_one(sql, params)

View file

@ -0,0 +1,22 @@
from abstracts.repository import BaseRepository
class IndexRepository(BaseRepository):
def execute(self):
"""
Executa a operação de listagem de todos os registros na tabela
g_tb_regimecomunhao.
"""
# Montagem do sql
sql = """ SELECT TB_REGIMECOMUNHAO_ID,
DESCRICAO,
SITUACAO,
TB_REGIMEBENS_ID
FROM G_TB_REGIMECOMUNHAO """
# Execução do sql
response = self.fetch_all(sql)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,45 @@
from fastapi import HTTPException, status
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoSaveSchema
class SaveRepository(BaseRepository):
def execute(self, regimecomunhao_schema: GTbRegimecomunhaoSaveSchema):
try:
# Montagem do SQL
sql = """ INSERT INTO G_TB_REGIMECOMUNHAO (
TB_REGIMECOMUNHAO_ID,
DESCRICAO,
TEXTO,
SITUACAO,
TB_REGIMEBENS_ID
) VALUES (
:tb_regimecomunhao_id,
:descricao,
:texto,
:situacao,
:tb_regimebens_id
) RETURNING *;"""
# Preenchimento de parâmetros
params = {
'tb_regimecomunhao_id': regimecomunhao_schema.tb_regimecomunhao_id,
'descricao': regimecomunhao_schema.descricao,
'texto': regimecomunhao_schema.texto.encode("utf-8") if regimecomunhao_schema.texto else None, # Convertendo string para bytes
'situacao': regimecomunhao_schema.situacao,
'tb_regimebens_id': regimecomunhao_schema.tb_regimebens_id
}
# Execução do sql
return self.run_and_return(sql, params)
except Exception as e:
# Informa que houve uma falha na atualização do usuário
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao salvar regime de comunhão: {e}"
)

View file

@ -0,0 +1,17 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoIdSchema
class ShowRepository(BaseRepository):
def execute(self, regimecomunhao_schema: GTbRegimecomunhaoIdSchema):
# Montagem do sql
sql = """ SELECT * FROM G_TB_REGIMECOMUNHAO WHERE TB_REGIMECOMUNHAO_ID = :tb_regimecomunhao_id """
# Preenchimento de parâmetros
params = {
'tb_regimecomunhao_id' : regimecomunhao_schema.tb_regimecomunhao_id
}
# Execução do sql
return self.fetch_one(sql, params)

View file

@ -0,0 +1,62 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoUpdateSchema
from fastapi import HTTPException, status
class UpdateRepository(BaseRepository):
def execute(self, tb_regimecomunhao_id : int, regimecomunhao_schema: GTbRegimecomunhaoUpdateSchema):
try:
updates = []
params = {}
if regimecomunhao_schema.descricao is not None:
updates.append("DESCRICAO = :descricao")
params["descricao"] = regimecomunhao_schema.descricao
if regimecomunhao_schema.texto is not None:
updates.append("TEXTO = :texto")
params["texto"] = (
regimecomunhao_schema.texto.encode("utf-8")
if isinstance(regimecomunhao_schema.texto, str)
else regimecomunhao_schema.texto
)
if regimecomunhao_schema.situacao is not None:
updates.append("SITUACAO = :situacao")
params["situacao"] = regimecomunhao_schema.situacao
if regimecomunhao_schema.tb_regimebens_id is not None:
updates.append("TB_REGIMEBENS_ID = :tb_regimebens_id")
params["tb_regimebens_id"] = regimecomunhao_schema.tb_regimebens_id
if not updates:
return False
params["tb_regimecomunhao_id"] = tb_regimecomunhao_id
sql = f"UPDATE G_TB_REGIMECOMUNHAO SET {', '.join(updates)} WHERE tb_regimecomunhao_id = :tb_regimecomunhao_id RETURNING *;"
# Executa a query
result = self.run_and_return(sql, params)
if not result.tb_regimecomunhao_id:
# Informa que não existe o registro a ser modificado
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Nenhum regime de comunhão localizado para esta solicitação'
)
# Verifica o resultado da execução
if result:
# Se houver um resultado, a atualização foi bem-sucedida
return result
except Exception as e:
# Informa que houve uma falha na atualização
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Erro ao atualizar regime de comunhão: {e}"
)

View file

@ -0,0 +1,98 @@
from pydantic import BaseModel, field_validator, model_validator
from fastapi import HTTPException, status
from typing import Optional
from decimal import Decimal
# Funções para sanitização de entradas (evitar XSS, SQLi etc.)
from actions.validations.text import Text
# ----------------------------------------------------
# Schema base
# ----------------------------------------------------
class GTbRegimecomunhaoSchema(BaseModel):
tb_regimecomunhao_id: int
descricao: Optional[str] = None
texto: Optional[str] = None
situacao: Optional[str] = None
tb_regimebens_id: Optional[int] = None
class Config:
from_attributes = True
# ----------------------------------------------------
# Schema para criação de novo registro (POST)
# ----------------------------------------------------
class GTbRegimecomunhaoSaveSchema(BaseModel):
tb_regimecomunhao_id: Optional[int] = None
descricao: str
texto: Optional[str] = None
situacao: Optional[str] = None
tb_regimebens_id: Optional[int] = None
@field_validator('descricao')
def validate_descricao(cls, v):
if not v or len(v.strip()) == 0:
raise ValueError("A descrição é obrigatória.")
return Text.sanitize_input(v)
@model_validator(mode='after')
def validate_all_fields(self):
errors = []
if not self.descricao or len(self.descricao.strip()) == 0:
errors.append({'input': 'descricao', 'message': 'A descrição é obrigatória.'})
if errors:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=errors
)
return self
# ----------------------------------------------------
# Schema para atualização de registro (PUT)
# ----------------------------------------------------
class GTbRegimecomunhaoUpdateSchema(BaseModel):
descricao: Optional[str] = None
texto: Optional[str] = None
situacao: Optional[str] = None
tb_regimebens_id: Optional[int] = None
@field_validator('descricao')
def validate_descricao_field(cls, v):
return Text.sanitize_input(v)
# ----------------------------------------------------
# Schema para localizar um registro específico pelo ID (GET, DELETE)
# ----------------------------------------------------
class GTbRegimecomunhaoIdSchema(BaseModel):
# Campos utilizados
tb_regimecomunhao_id: int
# ----------------------------------------------------
# Schema para localizar um registro pela descrição (GET)
# ----------------------------------------------------
class GTbRegimecomunhaoDescricaoSchema(BaseModel):
# Campos utilizados
descricao: str
@field_validator('descricao')
def validar_e_sanitizar_descricao(cls, v):
if not v:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail='Informe a descrição'
)
return Text.sanitize_input(v)
class Config:
from_attributes = True

View file

@ -0,0 +1,18 @@
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoIdSchema
from packages.v1.administrativo.actions.g_tb_regimecomunhao.g_tb_regimecomunhao_delete_action import DeleteAction
class DeleteService:
def execute(self, regimecomunhao_schema: GTbRegimecomunhaoIdSchema):
"""
Executa a lógica de negócio para a exclusão de um registro na tabela
g_tb_regimecomunhao.
"""
# Instanciamento de ação
delete_action = DeleteAction()
# Executa a ação em questão
data = delete_action.execute(regimecomunhao_schema)
# Retorno da informação
return data

View file

@ -0,0 +1,29 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoDescricaoSchema
from packages.v1.administrativo.actions.g_tb_regimecomunhao.g_tb_regimecomunhao_get_by_descricao_action import GetByDescricaoAction
class GetByDescricaoService:
def execute(self, regimecomunhao_schema: GTbRegimecomunhaoDescricaoSchema, messageValidate: bool):
"""
Executa a lógica de negócio para a exibição de um registro na tabela
g_tb_regimecomunhao pela descrição.
"""
# Instanciamento de ação
show_action = GetByDescricaoAction()
# Executa a ação em questão
data = show_action.execute(regimecomunhao_schema)
if messageValidate:
if not data:
# Retorna uma exceção
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Não foi possível localizar o registro'
)
# Retorno da informação
return data

View file

@ -0,0 +1,27 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoSchema
from packages.v1.administrativo.actions.g_tb_regimecomunhao.g_tb_regimecomunhao_index_action import IndexAction
class IndexService:
def execute(self):
"""
Executa a lógica de negócio para a listagem de todos os registros na tabela
g_tb_regimecomunhao.
"""
# Instanciamento de ações
index_action = IndexAction()
# Executa a busca de todas as ações
data = index_action.execute()
# Verifica se foi localizado registros
if not data:
# Retorna uma exceção
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Não foi possível localizar os regimes de comunhão'
)
# Retorna as informações localizadas
return data

View file

@ -0,0 +1,71 @@
from actions.dynamic_import.dynamic_import import DynamicImport
from packages.v1.sequencia.schemas.g_sequencia import GSequenciaSchema
from packages.v1.sequencia.services.g_sequencia.generate_service import GenerateService
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoSaveSchema, GTbRegimecomunhaoDescricaoSchema
from packages.v1.administrativo.actions.g_tb_regimecomunhao.g_tb_regimecomunhao_save_action import SaveAction
from fastapi import HTTPException, status
class GTbRegimecomunhaoSaveService:
def __init__(self):
# Action responsável por carregar as services de acodo com o estado
self.dynamic_import = DynamicImport()
# Define o pacote que deve ser carregado
self.dynamic_import.set_package("administrativo")
# Define a tabela que o pacote pertence
self.dynamic_import.set_table("g_tb_regimecomunhao")
pass
# Cadastra o novo caixa serviço
def execute(self, regimecomunhao_schema: GTbRegimecomunhaoSaveSchema):
# Armazena possíveis erros
errors = []
# Verifica se o e-mail já esta sendo utilizado
# Importação de service de email
descricao_service = self.dynamic_import.service("g_tb_regimecomunhao_get_by_descricao_service", "GetByDescricaoService")
# Instânciamento da service
self.descricao_service = descricao_service()
# Verifica se a descrição já está sendo utilizada
self.response = self.descricao_service.execute(GTbRegimecomunhaoDescricaoSchema(descricao=regimecomunhao_schema.descricao), False)
# Se houver retorno significa que a descrição já esta sendo utiizada
if self.response:
errors.append({'input': 'descricao', 'message': 'a descrição informada já está sendo utilizada.'})
# Se houver erros, informo
if errors:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=errors
)
# Verifica se precisa gerar o id de sequencia
if not regimecomunhao_schema.tb_regimecomunhao_id:
# Crio um objeto de sequencia
sequencia_schema = GSequenciaSchema()
# Define os dados para atualizar a sequencia
sequencia_schema.tabela = 'G_TB_REGIMECOMUNHAO'
# Busco a sequência atualizada
generate = GenerateService()
# Busco a sequência atualizada
sequencia = generate.execute(sequencia_schema)
# Atualiza os dados da chave primária
regimecomunhao_schema.tb_regimecomunhao_id = sequencia.sequencia
# Instânciamento de ações
saveAction = SaveAction()
# Retorna todos produtos desejados
return saveAction.execute(regimecomunhao_schema)

View file

@ -0,0 +1,23 @@
from fastapi import HTTPException, status
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoIdSchema
from packages.v1.administrativo.actions.g_tb_regimecomunhao.g_tb_regimecomunhao_show_action import ShowAction
class ShowService:
def execute(self, regimecomunhao_schema: GTbRegimecomunhaoIdSchema):
# Instanciamento de ação
show_action = ShowAction()
# Executa a ação em questão
data = show_action.execute(regimecomunhao_schema)
if not data:
# Retorna uma exceção
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='Não foi possível localizar o registro'
)
# Retorno da informação
return data

View file

@ -0,0 +1,12 @@
from packages.v1.administrativo.schemas.g_tb_regimecomunhao_schema import GTbRegimecomunhaoUpdateSchema
from packages.v1.administrativo.actions.g_tb_regimecomunhao.g_tb_regimecomunhao_update_action import UpdateAction
class GTbRegimecomunhaoUpdateService:
def execute(self, tb_regimecomunhao_id : int, regimecomunhao_schema: GTbRegimecomunhaoUpdateSchema):
# Instanciamento de ações
updateAction = UpdateAction()
# Retorna todos produtos desejados
return updateAction.execute(tb_regimecomunhao_id, regimecomunhao_schema)

View file

@ -8,6 +8,7 @@ from packages.v1.administrativo.endpoints import g_usuario_endpoint
from packages.v1.administrativo.endpoints import c_caixa_servico_endpoint
from packages.v1.administrativo.endpoints import t_tb_reconhecimentotipo_endpoint
from packages.v1.administrativo.endpoints import t_tb_andamentoservico_endpoint
from packages.v1.administrativo.endpoints import g_tb_regimecomunhao_endpoint
# Cria uma instância do APIRouter que vai agregar todas as rotas da API
api_router = APIRouter()
@ -41,3 +42,8 @@ api_router.include_router(
api_router.include_router(
g_tb_profissao_endpoint.router, prefix="/administrativo/g_tb_profissao", tags=["Profissões"]
)
# Inclui as rotas de g_tb_regimecomunhao
api_router.include_router(
g_tb_regimecomunhao_endpoint.router, prefix="/administrativo/g_tb_regimecomunhao", tags=["Regime de Comunhão"]
)