Ferramentas/AjustaFundos/actions/file/json_merge.py

67 lines
2.4 KiB
Python

import json
import threading
from pathlib import Path
from typing import Dict, List, Any
class JsonArrayPersistentStorage:
"""
Classe para gravar e atualizar um JSON de forma persistente,
com MERGE profundo e proteção contra escrita paralela.
"""
def __init__(self, caminho_arquivo: Path):
self.caminho = caminho_arquivo
self.lock = threading.Lock() # 🔒 proteção de escrita
self.caminho.parent.mkdir(parents=True, exist_ok=True)
self._cache = self._load_existing()
# ------------------------------------------------------------------
def _load_existing(self) -> Dict[str, Any]:
if not self.caminho.exists():
return {}
try:
with open(self.caminho, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
# ------------------------------------------------------------------
def _deep_merge(self, original: Any, novo: Any) -> Any:
if isinstance(original, dict) and isinstance(novo, dict):
for k, v in novo.items():
if k not in original:
original[k] = v
else:
original[k] = self._deep_merge(original[k], v)
return original
if isinstance(original, list) and isinstance(novo, list):
for item in novo:
if item not in original:
original.append(item)
return original
return novo
# ------------------------------------------------------------------
def _save(self):
with open(self.caminho, "w", encoding="utf-8") as f:
json.dump(self._cache, f, indent=4, ensure_ascii=False)
# ------------------------------------------------------------------
# MÉTODO QUE RESOLVE O ERRO - AGORA COM LOCK
# ------------------------------------------------------------------
def add(self, novos_dados: Dict[str, List[dict]]):
"""
Adiciona novos dados com merge profundo.
Protegido por lock para evitar race condition.
"""
with self.lock: # 🔒 trava escrita e merge até finalizar
self._cache = self._deep_merge(self._cache, novos_dados)
self._save()
# ------------------------------------------------------------------
def get_all(self) -> Dict[str, Any]:
return self._cache