Second commit

This commit is contained in:
Kenio 2025-10-07 13:39:43 -03:00
parent 938e628efc
commit 436640ae31
24 changed files with 954 additions and 9 deletions

View file

@ -0,0 +1,13 @@
{
"user": "backup",
"password": "sun147oi",
"host": "api_mysql",
"port": 3306,
"name": "monitoring",
"charset": "utf8mb4",
"pool": {
"pre_ping": true,
"size": 5,
"max_overflow": 10
}
}

View file

@ -1,4 +1,4 @@
from packages.v1.administrativo.schemas.log_schema import LogSchema
from packages.v1.administrativo.schemas.log_schema import LogIdSchema
from packages.v1.administrativo.repositories.log.log_get_by_log_id_repository import GetByLogIdRepository
@ -8,7 +8,7 @@ class GetByLogIdAction:
utilizando a chave primária 'log_id'.
"""
def execute(self, log_schema: LogSchema):
def execute(self, log_schema: LogIdSchema):
"""
Executa a lógica de busca do log pelo seu ID.

View file

@ -0,0 +1,29 @@
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
# Presumindo que o repositório original de busca por ID ainda é o que deve ser usado,
# mas renomeando a importação para refletir a nova ação 'backup'.
from packages.v1.administrativo.repositories.log.log_show_backup_repository import ShowBackupRepository
class ShowBackupAction:
"""
Action responsável por buscar um registro específico na tabela 'log'
utilizando a chave primária 'log_id' (aqui referenciada como parte da ação 'Backup').
"""
def execute(self, log_schema: LogClientIdSchema):
"""
Executa a lógica de busca do log.
Apesar da tabela 'log' ter outros campos (client_id, date_post, file),
o schema 'LogClientIdSchema' é usado aqui para encapsular o identificador (provavelmente o 'client_id')
que será utilizado na busca pelo repositório.
:param log_schema: Schema contendo o identificador necessário para a busca.
:return: Resultado da busca do repositório.
"""
# Instanciação do repositório de busca pelo ID do log
# Mantendo o repositório original, pois o foco da mudança foi a Action.
log_show_backup_repository = ShowBackupRepository()
# Execução do repositório
return log_show_backup_repository.execute(log_schema)

View file

@ -0,0 +1,29 @@
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
# Presumindo que o repositório original de busca por ID ainda é o que deve ser usado,
# mas renomeando a importação para refletir a nova ação 'database'.
from packages.v1.administrativo.repositories.log.log_show_database_repository import ShowDatabaseRepository
class ShowDatabaseAction:
"""
Action responsável por buscar um registro específico na tabela 'log'
utilizando a chave primária 'log_id' (aqui referenciada como parte da ação 'Database').
"""
def execute(self, log_schema: LogClientIdSchema):
"""
Executa a lógica de busca do log.
Apesar da tabela 'log' ter outros campos (client_id, date_post, file),
o schema 'LogClientIdSchema' é usado aqui para encapsular o identificador (provavelmente o 'client_id')
que será utilizado na busca pelo repositório.
:param log_schema: Schema contendo o identificador necessário para a busca.
:return: Resultado da busca do repositório.
"""
# Instanciação do repositório de busca pelo ID do log
# Mantendo o repositório original, pois o foco da mudança foi a Action.
log_show_database_repository = ShowDatabaseRepository()
# Execução do repositório
return log_show_database_repository.execute(log_schema)

View file

@ -0,0 +1,29 @@
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
# Presumindo que o repositório original de busca por ID ainda é o que deve ser usado,
# mas renomeando a importação para refletir a nova ação 'disk'.
from packages.v1.administrativo.repositories.log.log_show_disk_repository import ShowDiskRepository
class ShowDiskAction:
"""
Action responsável por buscar um registro específico na tabela 'log'
utilizando a chave primária 'log_id' (aqui referenciada como parte da ação 'Disk').
"""
def execute(self, log_schema: LogClientIdSchema):
"""
Executa a lógica de busca do log.
Apesar da tabela 'log' ter outros campos (client_id, date_post, file),
o schema 'LogClientIdSchema' é usado aqui para encapsular o identificador (provavelmente o 'client_id')
que será utilizado na busca pelo repositório.
:param log_schema: Schema contendo o identificador necessário para a busca.
:return: Resultado da busca do repositório.
"""
# Instanciação do repositório de busca pelo ID do log
# Mantendo o repositório original, pois o foco da mudança foi a Action.
log_show_disk_repository = ShowDiskRepository()
# Execução do repositório
return log_show_disk_repository.execute(log_schema)

View file

@ -0,0 +1,29 @@
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
# Presumindo que o repositório original de busca por ID ainda é o que deve ser usado,
# mas renomeando a importação para refletir a nova ação 'Ged'.
from packages.v1.administrativo.repositories.log.log_show_ged_repository import ShowGedRepository
class ShowGedAction:
"""
Action responsável por buscar um registro específico na tabela 'log'
utilizando a chave primária 'log_id' (aqui referenciada como parte da ação 'GED').
"""
def execute(self, log_schema: LogClientIdSchema):
"""
Executa a lógica de busca do log.
Apesar da tabela 'log' ter outros campos (client_id, date_post, file),
o schema 'LogClientIdSchema' é usado aqui para encapsular o identificador (provavelmente o 'client_id')
que será utilizado na busca pelo repositório.
:param log_schema: Schema contendo o identificador necessário para a busca.
:return: Resultado da busca do repositório.
"""
# Instanciação do repositório de busca pelo ID do log
# Mantendo o repositório original, pois o foco da mudança foi a Action.
log_show_ged_repository = ShowGedRepository()
# Execução do repositório
return log_show_ged_repository.execute(log_schema)

View file

@ -0,0 +1,29 @@
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
# Presumindo que o repositório original de busca por ID ainda é o que deve ser usado,
# mas renomeando a importação para refletir a nova ação 'Server'.
from packages.v1.administrativo.repositories.log.log_show_server_repository import ShowServerRepository
class ShowServerAction:
"""
Action responsável por buscar um registro específico na tabela 'log'
utilizando a chave primária 'log_id' (aqui referenciada como parte da ação 'Server').
"""
def execute(self, log_schema: LogClientIdSchema):
"""
Executa a lógica de busca do log.
Apesar da tabela 'log' ter outros campos (client_id, date_post, file),
o schema 'LogClientIdSchema' é usado aqui para encapsular o identificador (provavelmente o 'client_id')
que será utilizado na busca pelo repositório.
:param log_schema: Schema contendo o identificador necessário para a busca.
:return: Resultado da busca do repositório.
"""
# Instanciação do repositório de busca pelo ID do log
# Mantendo o repositório original, pois o foco da mudança foi a Action.
log_show_server_repository = ShowServerRepository()
# Execução do repositório
return log_show_server_repository.execute(log_schema)

View file

@ -0,0 +1,29 @@
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
# Presumindo que o repositório original de busca por ID ainda é o que deve ser usado,
# mas renomeando a importação para refletir a nova ação 'warning'.
from packages.v1.administrativo.repositories.log.log_show_warning_repository import ShowWarningRepository
class ShowWarningAction:
"""
Action responsável por buscar um registro específico na tabela 'log'
utilizando a chave primária 'log_id' (aqui referenciada como parte da ação 'Warning').
"""
def execute(self, log_schema: LogClientIdSchema):
"""
Executa a lógica de busca do log.
Apesar da tabela 'log' ter outros campos (client_id, date_post, file),
o schema 'LogClientIdSchema' é usado aqui para encapsular o identificador (provavelmente o 'client_id')
que será utilizado na busca pelo repositório.
:param log_schema: Schema contendo o identificador necessário para a busca.
:return: Resultado da busca do repositório.
"""
# Instanciação do repositório de busca pelo ID do log
# Mantendo o repositório original, pois o foco da mudança foi a Action.
log_show_warning_repository = ShowWarningRepository()
# Execução do repositório
return log_show_warning_repository.execute(log_schema)

View file

@ -5,7 +5,8 @@ from packages.v1.administrativo.schemas.log_schema import (
LogSaveSchema,
LogUpdateSchema,
LogIdSchema,
LogFileSchema
LogFileSchema,
LogClientIdSchema
)
import json # Necessário para carregar o arquivo app.json
@ -175,3 +176,99 @@ class LogController:
'message': 'Log removido com sucesso',
'data': self.delete_service.execute(log_schema)
}
# Localiza os dados do GED pelo ID do cliente
def getGed(self, log_schema: LogClientIdSchema):
#Importação da classe desejada
log_show_ged_service = self.dynamic_import.service('log_show_ged_service', 'ShowGedService')
# Instânciamento da classe desejada
self.log_show_ged_service = log_show_ged_service()
# Busca e retorna o log desejado
return {
'message': 'Visualização de LOG',
'data': self.log_show_ged_service.execute(log_schema)
}
# Localiza os dados do GED pelo ID do cliente
def getServer(self, log_schema: LogClientIdSchema):
#Importação da classe desejada
log_show_server_service = self.dynamic_import.service('log_show_server_service', 'ShowServerService')
# Instânciamento da classe desejada
self.log_show_server_service = log_show_server_service()
# Busca e retorna o log desejado
return {
'message': 'Visualização de LOG',
'data': self.log_show_server_service.execute(log_schema)
}
# Localiza os dados do database pelo ID do cliente
def getDatabase(self, log_schema: LogClientIdSchema):
#Importação da classe desejada
log_show_database_service = self.dynamic_import.service('log_show_database_service', 'ShowDatabaseService')
# Instânciamento da classe desejada
self.log_show_database_service = log_show_database_service()
# Busca e retorna o log desejado
return {
'message': 'Visualização de LOG',
'data': self.log_show_database_service.execute(log_schema)
}
# Localiza os dados do database pelo ID do cliente
def getBackup(self, log_schema: LogClientIdSchema):
#Importação da classe desejada
log_show_backup_service = self.dynamic_import.service('log_show_backup_service', 'ShowBackupService')
# Instânciamento da classe desejada
self.log_show_backup_service = log_show_backup_service()
# Busca e retorna o log desejado
return {
'message': 'Visualização de LOG',
'data': self.log_show_backup_service.execute(log_schema)
}
# Localiza os dados do database pelo ID do cliente
def getDisk(self, log_schema: LogClientIdSchema):
#Importação da classe desejada
log_show_disk_service = self.dynamic_import.service('log_show_disk_service', 'ShowDiskService')
# Instânciamento da classe desejada
self.log_show_disk_service = log_show_disk_service()
# Busca e retorna o log desejado
return {
'message': 'Visualização de LOG',
'data': self.log_show_disk_service.execute(log_schema)
}
# Localiza os dados do database pelo ID do cliente
def getWarning(self, log_schema: LogClientIdSchema):
#Importação da classe desejada
log_show_warning_service = self.dynamic_import.service('log_show_warning_service', 'ShowWarningService')
# Instânciamento da classe desejada
self.log_show_warning_service = log_show_warning_service()
# Busca e retorna o log desejado
return {
'message': 'Visualização de LOG',
'data': self.log_show_warning_service.execute(log_schema)
}

View file

@ -9,7 +9,8 @@ from packages.v1.administrativo.schemas.log_schema import (
LogSaveSchema,
LogUpdateSchema,
LogIdSchema,
LogFileSchema
LogFileSchema,
LogClientIdSchema
)
# Inicializa o roteador para as rotas de log
@ -109,3 +110,105 @@ async def delete(log_id: int, current_user: dict = Depends(get_current_user)):
# Retorna os dados localizados
return response
# Localiza um log pelo CLIENT_ID
@router.get('/ged/{client_id}',
status_code=status.HTTP_200_OK,
summary='Busca o último registro pelo CLIENT_ID',
response_description='Busca registros específicos de um cliente no GED')
async def show(client_id: int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
log_schema = LogClientIdSchema(client_id=client_id)
# Busca um log específico pelo ID
response = log_controller.getGed(log_schema)
# Retorna os dados localizados
return response
# Localiza um log pelo CLIENT_ID
@router.get('/server/{client_id}',
status_code=status.HTTP_200_OK,
summary='Busca o último registro pelo CLIENT_ID',
response_description='Busca registros específicos de um cliente no GED')
async def show(client_id: int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
log_schema = LogClientIdSchema(client_id=client_id)
# Busca um log específico pelo ID
response = log_controller.getServer(log_schema)
# Retorna os dados localizados
return response
# Localiza um log pelo CLIENT_ID
@router.get('/database/{client_id}',
status_code=status.HTTP_200_OK,
summary='Busca o último registro pelo CLIENT_ID',
response_description='Busca registros específicos de um cliente no Database')
async def show(client_id: int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
log_schema = LogClientIdSchema(client_id=client_id)
# Busca um log específico pelo ID
response = log_controller.getDatabase(log_schema)
# Retorna os dados localizados
return response
# Localiza um log pelo CLIENT_ID
@router.get('/backup/{client_id}',
status_code=status.HTTP_200_OK,
summary='Busca o último registro pelo CLIENT_ID',
response_description='Busca registros específicos de um cliente no Backup')
async def show(client_id: int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
log_schema = LogClientIdSchema(client_id=client_id)
# Busca um log específico pelo ID
response = log_controller.getBackup(log_schema)
# Retorna os dados localizados
return response
# Localiza um log pelo CLIENT_ID
@router.get('/disk/{client_id}',
status_code=status.HTTP_200_OK,
summary='Busca o último registro pelo CLIENT_ID',
response_description='Busca registros específicos de um cliente no Backup')
async def show(client_id: int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
log_schema = LogClientIdSchema(client_id=client_id)
# Busca um log específico pelo ID
response = log_controller.getDisk(log_schema)
# Retorna os dados localizados
return response
# Localiza um log pelo CLIENT_ID
@router.get('/warning/{client_id}',
status_code=status.HTTP_200_OK,
summary='Busca o último registro pelo CLIENT_ID',
response_description='Busca registros específicos de um cliente no Backup')
async def show(client_id: int, current_user: dict = Depends(get_current_user)):
# Cria o schema com os dados recebidos
log_schema = LogClientIdSchema(client_id=client_id)
# Busca um log específico pelo ID
response = log_controller.getWarning(log_schema)
# Retorna os dados localizados
return response

View file

@ -0,0 +1,30 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
class ShowBackupRepository(BaseRepository):
"""
Repositório responsável por buscar um registro único na tabela 'log'
utilizando a chave primária 'log_id'.
"""
def execute(self, log_schema: LogClientIdSchema):
"""
Executa a busca de um log pelo seu ID.
:param log_schema: Schema contendo o log_id.
:return: O registro de log encontrado ou None.
"""
# Define a consulta sql. O SELECT * retorna todos os campos da DDL:
# log_id, client_id, date_post, file.
sql = """ SELECT * FROM log l WHERE l.client_id = :clientId ORDER BY l.log_id DESC LIMIT 1 """
# Preenchimento dos parâmetros SQL
params = {
'clientId': log_schema.client_id
}
# Execução da instrução sql para buscar um único registro
return self.fetch_one(sql, params)

View file

@ -0,0 +1,30 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
class ShowDatabaseRepository(BaseRepository):
"""
Repositório responsável por buscar um registro único na tabela 'log'
utilizando a chave primária 'log_id'.
"""
def execute(self, log_schema: LogClientIdSchema):
"""
Executa a busca de um log pelo seu ID.
:param log_schema: Schema contendo o log_id.
:return: O registro de log encontrado ou None.
"""
# Define a consulta sql. O SELECT * retorna todos os campos da DDL:
# log_id, client_id, date_post, file.
sql = """ SELECT * FROM log l WHERE l.client_id = :clientId ORDER BY l.log_id DESC LIMIT 1 """
# Preenchimento dos parâmetros SQL
params = {
'clientId': log_schema.client_id
}
# Execução da instrução sql para buscar um único registro
return self.fetch_one(sql, params)

View file

@ -0,0 +1,30 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
class ShowDiskRepository(BaseRepository):
"""
Repositório responsável por buscar um registro único na tabela 'log'
utilizando a chave primária 'log_id'.
"""
def execute(self, log_schema: LogClientIdSchema):
"""
Executa a busca de um log pelo seu ID.
:param log_schema: Schema contendo o log_id.
:return: O registro de log encontrado ou None.
"""
# Define a consulta sql. O SELECT * retorna todos os campos da DDL:
# log_id, client_id, date_post, file.
sql = """ SELECT * FROM log l WHERE l.client_id = :clientId ORDER BY l.log_id DESC LIMIT 1 """
# Preenchimento dos parâmetros SQL
params = {
'clientId': log_schema.client_id
}
# Execução da instrução sql para buscar um único registro
return self.fetch_one(sql, params)

View file

@ -0,0 +1,30 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
class ShowGedRepository(BaseRepository):
"""
Repositório responsável por buscar um registro único na tabela 'log'
utilizando a chave primária 'log_id'.
"""
def execute(self, log_schema: LogClientIdSchema):
"""
Executa a busca de um log pelo seu ID.
:param log_schema: Schema contendo o log_id.
:return: O registro de log encontrado ou None.
"""
# Define a consulta sql. O SELECT * retorna todos os campos da DDL:
# log_id, client_id, date_post, file.
sql = """ SELECT * FROM log l WHERE l.client_id = :clientId ORDER BY l.log_id DESC LIMIT 1 """
# Preenchimento dos parâmetros SQL
params = {
'clientId': log_schema.client_id
}
# Execução da instrução sql para buscar um único registro
return self.fetch_one(sql, params)

View file

@ -0,0 +1,30 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
class ShowServerRepository(BaseRepository):
"""
Repositório responsável por buscar um registro único na tabela 'log'
utilizando a chave primária 'log_id'.
"""
def execute(self, log_schema: LogClientIdSchema):
"""
Executa a busca de um log pelo seu ID.
:param log_schema: Schema contendo o log_id.
:return: O registro de log encontrado ou None.
"""
# Define a consulta sql. O SELECT * retorna todos os campos da DDL:
# log_id, client_id, date_post, file.
sql = """ SELECT * FROM log l WHERE l.client_id = :clientId ORDER BY l.log_id DESC LIMIT 1 """
# Preenchimento dos parâmetros SQL
params = {
'clientId': log_schema.client_id
}
# Execução da instrução sql para buscar um único registro
return self.fetch_one(sql, params)

View file

@ -0,0 +1,30 @@
from abstracts.repository import BaseRepository
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
class ShowWarningRepository(BaseRepository):
"""
Repositório responsável por buscar um registro único na tabela 'log'
utilizando a chave primária 'log_id'.
"""
def execute(self, log_schema: LogClientIdSchema):
"""
Executa a busca de um log pelo seu ID.
:param log_schema: Schema contendo o log_id.
:return: O registro de log encontrado ou None.
"""
# Define a consulta sql. O SELECT * retorna todos os campos da DDL:
# log_id, client_id, date_post, file.
sql = """ SELECT * FROM log l WHERE l.client_id = :clientId ORDER BY l.log_id DESC LIMIT 1 """
# Preenchimento dos parâmetros SQL
params = {
'clientId': log_schema.client_id
}
# Execução da instrução sql para buscar um único registro
return self.fetch_one(sql, params)

View file

@ -21,6 +21,13 @@ class LogSchema(BaseModel):
from_attributes = True
# ----------------------------------------------------
# Schema para localizar um log especifico pelo CLIENT_ID (GET)
# ----------------------------------------------------
class LogClientIdSchema(BaseModel):
client_id: int
# ----------------------------------------------------
# Schema para localizar um log especifico pelo ID (GET/DELETE)
# ----------------------------------------------------

View file

@ -0,0 +1,53 @@
from fastapi import HTTPException, status
import json
# NOTE: O esquema de entrada para a ação de exibição de um único log
# precisaria de um esquema (Schema) que carregue o 'log_id'.
# Vamos assumir a necessidade de um LogIdSchema.
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
from packages.v1.administrativo.actions.log.log_show_backup_action import ShowBackupAction
class ShowBackupService:
# O método execute deve receber o esquema que contém a ID do log a ser buscado
def execute(self, client_id_schema: LogClientIdSchema):
# Instanciamento de ação com prefixo 'log'
log_show_backup_action = ShowBackupAction()
# Executa a ação em questão (buscando pelo log_id)
dados = log_show_backup_action.execute(client_id_schema)
# verifica se 'dados' não é None e contém 'file'
if dados and dados.get("file"):
# Converte a string JSON armazenada no campo 'file' de volta para um dicionário
dados_json = json.loads(dados["file"])
"""
Separa as informações de backup em:
- 'partition': informações de disco do backup
- 'arquivos': lista com as demais pastas e suas quantidades
"""
# 1 Extrai o bloco principal de backup do JSON
backup_data = dados_json.get("backup", {})
# 5 Monta o JSON final
data = {
"cns": dados_json.get("cns"),
"cartorio": dados_json.get("cartorio"),
"data": dados_json.get("data"),
"hora": dados_json.get("hora"),
"backup": backup_data
}
# Retorno da informação (log_id, client_id, date_post, file)
return data
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Nenhum dado de backup encontrado para o cliente fornecido."
)

View file

@ -0,0 +1,70 @@
from fastapi import HTTPException, status
import json
# NOTE: O esquema de entrada para a ação de exibição de um único log
# precisaria de um esquema (Schema) que carregue o 'log_id'.
# Vamos assumir a necessidade de um LogIdSchema.
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
from packages.v1.administrativo.actions.log.log_show_database_action import ShowDatabaseAction
class ShowDatabaseService:
# O método execute deve receber o esquema que contém a ID do log a ser buscado
def execute(self, client_id_schema: LogClientIdSchema):
# Instanciamento de ação com prefixo 'log'
log_show_database_action = ShowDatabaseAction()
# Executa a ação em questão (buscando pelo log_id)
dados = log_show_database_action.execute(client_id_schema)
# verifica se 'dados' não é None e contém 'file'
if dados and dados.get("file"):
# Converte a string JSON armazenada no campo 'file' de volta para um dicionário
dados_json = json.loads(dados["file"])
"""
Separa as informações de database em:
- 'partition': informações de disco do database
- 'arquivos': lista com as demais pastas e suas quantidades
"""
# 1 Extrai o bloco principal de database do JSON
database_data = dados_json.get("database", {})
# 2 Separa o campo 'partition' das demais chaves
partition = database_data.get("partition", {})
# 3 Separa o campo 'file_size_mb' das demais chaves
file_size_mb = database_data.get("file_size_mb", {})
# 4 Separa o campo 'db_accessible' das demais chaves
db_accessible = database_data.get("db_accessible", {})
# 5 Separa o campo 'last_modified' das demais chaves
last_modified = database_data.get("last_modified", {})
# 6 Monta o JSON final
data = {
"cns": dados_json.get("cns"),
"cartorio": dados_json.get("cartorio"),
"data": dados_json.get("data"),
"hora": dados_json.get("hora"),
"database": {
"partition": partition,
"file_size_mb": file_size_mb,
"db_accessible": db_accessible,
"last_modified": last_modified
}
}
# Retorno da informação (log_id, client_id, date_post, file)
return data
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Nenhum dado de Database encontrado para o cliente fornecido."
)

View file

@ -0,0 +1,53 @@
from fastapi import HTTPException, status
import json
# NOTE: O esquema de entrada para a ação de exibição de um único log
# precisaria de um esquema (Schema) que carregue o 'log_id'.
# Vamos assumir a necessidade de um LogIdSchema.
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
from packages.v1.administrativo.actions.log.log_show_disk_action import ShowDiskAction
class ShowDiskService:
# O método execute deve receber o esquema que contém a ID do log a ser buscado
def execute(self, client_id_schema: LogClientIdSchema):
# Instanciamento de ação com prefixo 'log'
log_show_disk_action = ShowDiskAction()
# Executa a ação em questão (buscando pelo log_id)
dados = log_show_disk_action.execute(client_id_schema)
# verifica se 'dados' não é None e contém 'file'
if dados and dados.get("file"):
# Converte a string JSON armazenada no campo 'file' de volta para um dicionário
dados_json = json.loads(dados["file"])
"""
Separa as informações de disk em:
- 'partition': informações de disco do disk
- 'arquivos': lista com as demais pastas e suas quantidades
"""
# 1 Extrai o bloco principal de disk do JSON
disk_data = dados_json.get("disk", {})
# 2 Monta o JSON final
data = {
"cns": dados_json.get("cns"),
"cartorio": dados_json.get("cartorio"),
"data": dados_json.get("data"),
"hora": dados_json.get("hora"),
"disk": disk_data
}
# Retorno da informação (log_id, client_id, date_post, file)
return data
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Nenhum dado de unidade de discos encontrado para o cliente fornecido."
)

View file

@ -0,0 +1,77 @@
from fastapi import HTTPException, status
import json
# NOTE: O esquema de entrada para a ação de exibição de um único log
# precisaria de um esquema (Schema) que carregue o 'log_id'.
# Vamos assumir a necessidade de um LogIdSchema.
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
from packages.v1.administrativo.actions.log.log_show_ged_action import ShowGedAction
class ShowGedService:
# O método execute deve receber o esquema que contém a ID do log a ser buscado
def execute(self, client_id_schema: LogClientIdSchema):
# Instanciamento de ação com prefixo 'log'
log_show_ged_action = ShowGedAction()
# Executa a ação em questão (buscando pelo log_id)
dados = log_show_ged_action.execute(client_id_schema)
# verifica se 'dados' não é None e contém 'file'
if dados and dados.get("file"):
# Converte a string JSON armazenada no campo 'file' de volta para um dicionário
dados_json = json.loads(dados["file"])
"""
Separa as informações de GED em:
- 'partition': informações de disco do GED
- 'arquivos': lista com as demais pastas e suas quantidades
"""
# 1 Extrai o bloco principal de GED do JSON
ged_data = dados_json.get("ged", {})
# 2 Separa o campo 'partition' das demais chaves
partition_info = ged_data.get("partition", {})
# 3 Cria uma lista para armazenar os diretórios do GED (exceto partition)
arquivos_list = []
# 4 Percorre todas as chaves dentro de 'ged'
for path, info in ged_data.items():
# Ignora o campo 'partition'
if path == "partition":
continue
# Cada entrada é um dicionário contendo os dados da pasta
arquivos_list.append({
"path": path,
"data": info.get("data"),
"hora": info.get("hora"),
"quantidade": info.get("quantidade")
})
# 5 Monta o JSON final
data = {
"cns": dados_json.get("cns"),
"cartorio": dados_json.get("cartorio"),
"data": dados_json.get("data"),
"hora": dados_json.get("hora"),
"ged": {
"partition": partition_info,
"arquivos": arquivos_list
}
}
# Retorno da informação (log_id, client_id, date_post, file)
return data
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Nenhum dado de GED encontrado para o cliente fornecido."
)

View file

@ -0,0 +1,66 @@
from fastapi import HTTPException, status
import json
# NOTE: O esquema de entrada para a ação de exibição de um único log
# precisaria de um esquema (Schema) que carregue o 'log_id'.
# Vamos assumir a necessidade de um LogIdSchema.
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
from packages.v1.administrativo.actions.log.log_show_server_action import ShowServerAction
class ShowServerService:
# O método execute deve receber o esquema que contém a ID do log a ser buscado
def execute(self, client_id_schema: LogClientIdSchema):
# Instanciamento de ação com prefixo 'log'
log_show_server_action = ShowServerAction()
# Executa a ação em questão (buscando pelo log_id)
dados = log_show_server_action.execute(client_id_schema)
# verifica se 'dados' não é None e contém 'file'
if dados and dados.get("file"):
# Converte a string JSON armazenada no campo 'file' de volta para um dicionário
dados_json = json.loads(dados["file"])
"""
Separa as informações de Server em:
- 'partition': informações de disco do Server
- 'arquivos': lista com as demais pastas e suas quantidades
"""
# 1 Extrai o bloco principal de Server do JSON
server_data = dados_json.get("server", {})
# 2 Separa o campo 'cpu' das demais chaves
cpu = server_data.get("cpu", {})
# 3 Separa o campo 'memory' das demais chaves
memory = server_data.get("memory", {})
# 4 Separa o campo 'operacional_system' das demais chaves
operacional_system = server_data.get("operacional_system", {})
# 5 Monta o JSON final
data = {
"cns": dados_json.get("cns"),
"cartorio": dados_json.get("cartorio"),
"data": dados_json.get("data"),
"hora": dados_json.get("hora"),
"server": {
"cpu": cpu,
"memory": memory,
"operacional_system": operacional_system
}
}
# Retorno da informação (log_id, client_id, date_post, file)
return data
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Nenhum dado de Server encontrado para o cliente fornecido."
)

View file

@ -3,20 +3,19 @@ from fastapi import HTTPException, status
# NOTE: O esquema de entrada para a ação de exibição de um único log
# precisaria de um esquema (Schema) que carregue o 'log_id'.
# Vamos assumir a necessidade de um LogIdSchema.
# from packages.v1.administrativo.schemas.log_schema import LogIdSchema
from packages.v1.administrativo.schemas.log_schema import LogIdSchema
from packages.v1.administrativo.actions.log.log_show_action import ShowAction
class ShowService:
# O método execute deve receber o esquema que contém a ID do log a ser buscado
# def execute(self, log_id_schema: LogIdSchema):
def execute(self, schema):
def execute(self, log_id_schema: LogIdSchema):
# Instanciamento de ação com prefixo 'log'
log_show_action = ShowAction()
# Executa a ação em questão (buscando pelo log_id)
data = log_show_action.execute(schema)
data = log_show_action.execute(log_id_schema)
if not data:
# Retorna uma exceção

View file

@ -0,0 +1,53 @@
from fastapi import HTTPException, status
import json
# NOTE: O esquema de entrada para a ação de exibição de um único log
# precisaria de um esquema (Schema) que carregue o 'log_id'.
# Vamos assumir a necessidade de um LogIdSchema.
from packages.v1.administrativo.schemas.log_schema import LogClientIdSchema
from packages.v1.administrativo.actions.log.log_show_warning_action import ShowWarningAction
class ShowWarningService:
# O método execute deve receber o esquema que contém a ID do log a ser buscado
def execute(self, client_id_schema: LogClientIdSchema):
# Instanciamento de ação com prefixo 'log'
log_show_warning_action = ShowWarningAction()
# Executa a ação em questão (buscando pelo log_id)
dados = log_show_warning_action.execute(client_id_schema)
# verifica se 'dados' não é None e contém 'file'
if dados and dados.get("file"):
# Converte a string JSON armazenada no campo 'file' de volta para um dicionário
dados_json = json.loads(dados["file"])
"""
Separa as informações de warning em:
- 'partition': informações de disco do warning
- 'arquivos': lista com as demais pastas e suas quantidades
"""
# 1 Extrai o bloco principal de warning do JSON
warning_data = dados_json.get("warning", {})
# 5 Monta o JSON final
data = {
"cns": dados_json.get("cns"),
"cartorio": dados_json.get("cartorio"),
"data": dados_json.get("data"),
"hora": dados_json.get("hora"),
"warning": warning_data
}
# Retorno da informação (log_id, client_id, date_post, file)
return data
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Nenhum dado de warning encontrado para o cliente fornecido."
)