""" EU-Utility - Data Store Service Thread-safe persistent data storage for plugins. Provides file locking, auto-backup, and singleton access. """ import json import shutil import threading import platform from pathlib import Path from typing import Any, Dict, Optional from datetime import datetime from collections import OrderedDict # Cross-platform file locking try: import fcntl # Unix/Linux/Mac HAS_FCNTL = True except ImportError: HAS_FCNTL = False # Windows fallback using portalocker or threading lock try: import portalocker HAS_PORTALOCKER = True except ImportError: HAS_PORTALOCKER = False class DataStore: """ Singleton data persistence service for plugins. Features: - Thread-safe file operations with file locking - Auto-backup on write (keeps last 5 versions) - Per-plugin JSON storage - Auto-create directories """ _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self, data_dir: str = "data/plugins"): if self._initialized: return self.data_dir = Path(data_dir) self.data_dir.mkdir(parents=True, exist_ok=True) # Memory cache for frequently accessed data self._cache: Dict[str, Dict[str, Any]] = {} self._cache_lock = threading.Lock() # Backup settings self.max_backups = 5 self._initialized = True def _get_plugin_file(self, plugin_id: str) -> Path: """Get the storage file path for a plugin.""" # Sanitize plugin_id to create a safe filename safe_name = plugin_id.replace(".", "_").replace("/", "_").replace("\\", "_") return self.data_dir / f"{safe_name}.json" def _get_backup_dir(self, plugin_id: str) -> Path: """Get the backup directory for a plugin.""" safe_name = plugin_id.replace(".", "_").replace("/", "_").replace("\\", "_") backup_dir = self.data_dir / ".backups" / safe_name backup_dir.mkdir(parents=True, exist_ok=True) return backup_dir def _load_plugin_data(self, plugin_id: str) -> Dict[str, Any]: """Load all data for a plugin from disk.""" # Check cache first with self._cache_lock: if plugin_id in self._cache: return self._cache[plugin_id].copy() file_path = self._get_plugin_file(plugin_id) if not file_path.exists(): return {} try: with open(file_path, 'r', encoding='utf-8') as f: # Cross-platform file locking self._lock_file(f, exclusive=False) try: data = json.load(f) finally: self._unlock_file(f) # Update cache with self._cache_lock: self._cache[plugin_id] = data.copy() return data except (json.JSONDecodeError, IOError) as e: print(f"[DataStore] Error loading data for {plugin_id}: {e}") return {} def _save_plugin_data(self, plugin_id: str, data: Dict[str, Any]) -> bool: """Save all data for a plugin to disk with backup.""" file_path = self._get_plugin_file(plugin_id) try: # Create backup if file exists if file_path.exists(): self._create_backup(plugin_id, file_path) # Write to temp file first, then move (atomic operation) temp_path = file_path.with_suffix('.tmp') with open(temp_path, 'w', encoding='utf-8') as f: # Cross-platform file locking self._lock_file(f, exclusive=True) try: json.dump(data, f, indent=2, ensure_ascii=False) f.flush() import os os.fsync(f.fileno()) finally: self._unlock_file(f) # Atomic move temp_path.replace(file_path) # Update cache with self._cache_lock: self._cache[plugin_id] = data.copy() return True except IOError as e: print(f"[DataStore] Error saving data for {plugin_id}: {e}") # Clean up temp file if exists temp_path = file_path.with_suffix('.tmp') if temp_path.exists(): temp_path.unlink() return False def _lock_file(self, f, exclusive: bool = False): """Cross-platform file locking.""" if HAS_FCNTL: # Unix/Linux/Mac lock_type = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH fcntl.flock(f.fileno(), lock_type) elif HAS_PORTALOCKER: # Windows with portalocker import portalocker lock_type = portalocker.LOCK_EX if exclusive else portalocker.LOCK_SH portalocker.lock(f, lock_type) else: # Fallback: rely on threading lock (already held) pass def _unlock_file(self, f): """Cross-platform file unlock.""" if HAS_FCNTL: fcntl.flock(f.fileno(), fcntl.LOCK_UN) elif HAS_PORTALOCKER: import portalocker portalocker.unlock(f) else: # Fallback: nothing to do pass def _create_backup(self, plugin_id: str, file_path: Path): """Create a backup of the current data file.""" backup_dir = self._get_backup_dir(plugin_id) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_path = backup_dir / f"{timestamp}.json" try: shutil.copy2(file_path, backup_path) self._cleanup_old_backups(backup_dir) except IOError as e: print(f"[DataStore] Error creating backup for {plugin_id}: {e}") def _cleanup_old_backups(self, backup_dir: Path): """Remove old backups, keeping only the last N versions.""" try: backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime) while len(backups) > self.max_backups: old_backup = backups.pop(0) old_backup.unlink() except IOError as e: print(f"[DataStore] Error cleaning up backups: {e}") def save(self, plugin_id: str, key: str, data: Any) -> bool: """ Save data for a plugin. Args: plugin_id: Unique identifier for the plugin key: Key under which to store the data data: Data to store (must be JSON serializable) Returns: True if successful, False otherwise """ plugin_data = self._load_plugin_data(plugin_id) plugin_data[key] = data return self._save_plugin_data(plugin_id, plugin_data) def load(self, plugin_id: str, key: str, default: Any = None) -> Any: """ Load data for a plugin. Args: plugin_id: Unique identifier for the plugin key: Key of the data to load default: Default value if key not found Returns: The stored data or default value """ plugin_data = self._load_plugin_data(plugin_id) return plugin_data.get(key, default) def delete(self, plugin_id: str, key: str) -> bool: """ Delete data for a plugin. Args: plugin_id: Unique identifier for the plugin key: Key of the data to delete Returns: True if key existed and was deleted, False otherwise """ plugin_data = self._load_plugin_data(plugin_id) if key in plugin_data: del plugin_data[key] return self._save_plugin_data(plugin_id, plugin_data) return False def get_all_keys(self, plugin_id: str) -> list: """ Get all keys stored for a plugin. Args: plugin_id: Unique identifier for the plugin Returns: List of keys """ plugin_data = self._load_plugin_data(plugin_id) return list(plugin_data.keys()) def clear_plugin(self, plugin_id: str) -> bool: """ Clear all data for a plugin. Args: plugin_id: Unique identifier for the plugin Returns: True if successful, False otherwise """ file_path = self._get_plugin_file(plugin_id) # Create backup before clearing if file_path.exists(): self._create_backup(plugin_id, file_path) # Clear cache with self._cache_lock: if plugin_id in self._cache: del self._cache[plugin_id] # Remove file try: if file_path.exists(): file_path.unlink() return True except IOError as e: print(f"[DataStore] Error clearing data for {plugin_id}: {e}") return False def get_backups(self, plugin_id: str) -> list: """ Get list of available backups for a plugin. Args: plugin_id: Unique identifier for the plugin Returns: List of backup file paths """ backup_dir = self._get_backup_dir(plugin_id) if not backup_dir.exists(): return [] backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True) return [str(b) for b in backups] def restore_backup(self, plugin_id: str, backup_path: str) -> bool: """ Restore data from a backup. Args: plugin_id: Unique identifier for the plugin backup_path: Path to the backup file Returns: True if successful, False otherwise """ backup_file = Path(backup_path) if not backup_file.exists(): print(f"[DataStore] Backup not found: {backup_path}") return False file_path = self._get_plugin_file(plugin_id) try: # Create backup of current state before restoring if file_path.exists(): self._create_backup(plugin_id, file_path) # Copy backup to main file shutil.copy2(backup_file, file_path) # Invalidate cache with self._cache_lock: if plugin_id in self._cache: del self._cache[plugin_id] return True except IOError as e: print(f"[DataStore] Error restoring backup for {plugin_id}: {e}") return False # Singleton instance _data_store = None _data_store_lock = threading.Lock() def get_data_store() -> DataStore: """Get the global DataStore instance.""" global _data_store if _data_store is None: with _data_store_lock: if _data_store is None: _data_store = DataStore() return _data_store