# EU-Utility/core/data_store_vulnerable.py
# (source listing metadata: 356 lines, 11 KiB, Python)

"""
EU-Utility - Data Store Service
Thread-safe persistent data storage for plugins.
Provides file locking, auto-backup, and singleton access.
"""
import json
import os
import platform
import shutil
import threading
from collections import OrderedDict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
# Cross-platform file locking: prefer fcntl (Unix/Linux/Mac), fall back to
# the optional third-party portalocker (Windows); when neither is present
# the DataStore relies on its in-process threading locks alone.
HAS_FCNTL = False
HAS_PORTALOCKER = False

try:
    import fcntl  # Unix/Linux/Mac
    HAS_FCNTL = True
except ImportError:
    pass

try:
    import portalocker
    HAS_PORTALOCKER = True
except ImportError:
    pass
class DataStore:
    """
    Singleton data persistence service for plugins.

    Features:
    - Thread-safe file operations with OS-level file locking where available
    - Auto-backup on write (keeps the last ``max_backups`` versions)
    - Per-plugin JSON storage
    - Auto-create directories
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # *args/**kwargs must be accepted (and ignored) here: type.__call__
        # forwards constructor arguments to __new__, so the original
        # zero-argument signature made DataStore(data_dir=...) a TypeError.
        if cls._instance is None:
            with cls._lock:
                # Double-checked locking: re-test inside the lock so only
                # one thread ever constructs the instance.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self, data_dir: str = "data/plugins") -> None:
        # Singleton caveat: data_dir is honored only on first construction;
        # subsequent DataStore(...) calls are no-ops.
        if self._initialized:
            return
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        # Memory cache for frequently accessed data (plugin_id -> data dict).
        self._cache: Dict[str, Dict[str, Any]] = {}
        self._cache_lock = threading.Lock()
        # Serializes read-modify-write cycles (save/delete/restore) across
        # threads; the OS file lock is taken on the temp file during writes,
        # so it cannot prevent in-process lost updates by itself.
        self._io_lock = threading.RLock()
        # Backup settings
        self.max_backups = 5
        self._initialized = True

    @staticmethod
    def _safe_name(plugin_id: str) -> str:
        """Map a plugin_id to a filesystem-safe file stem."""
        return plugin_id.replace(".", "_").replace("/", "_").replace("\\", "_")

    def _get_plugin_file(self, plugin_id: str) -> Path:
        """Get the storage file path for a plugin."""
        return self.data_dir / f"{self._safe_name(plugin_id)}.json"

    def _get_backup_dir(self, plugin_id: str) -> Path:
        """Get (and create if needed) the backup directory for a plugin."""
        backup_dir = self.data_dir / ".backups" / self._safe_name(plugin_id)
        backup_dir.mkdir(parents=True, exist_ok=True)
        return backup_dir

    def _load_plugin_data(self, plugin_id: str) -> Dict[str, Any]:
        """Load all data for a plugin, preferring the in-memory cache.

        Returns an empty dict when the file is missing or unreadable.
        """
        with self._cache_lock:
            if plugin_id in self._cache:
                # Shallow copy only: callers mutating *nested* values would
                # still reach into the cache — assumes plugins treat stored
                # values as immutable (TODO confirm).
                return self._cache[plugin_id].copy()
        file_path = self._get_plugin_file(plugin_id)
        if not file_path.exists():
            return {}
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                self._lock_file(f, exclusive=False)  # shared read lock
                try:
                    data = json.load(f)
                finally:
                    self._unlock_file(f)
            with self._cache_lock:
                self._cache[plugin_id] = data.copy()
            return data
        except (json.JSONDecodeError, OSError) as e:
            print(f"[DataStore] Error loading data for {plugin_id}: {e}")
            return {}

    def _save_plugin_data(self, plugin_id: str, data: Dict[str, Any]) -> bool:
        """Save all data for a plugin to disk with backup.

        Writes to a temp file, fsyncs, then atomically replaces the real
        file so concurrent readers never observe a partial write.
        """
        file_path = self._get_plugin_file(plugin_id)
        temp_path = file_path.with_suffix('.tmp')
        try:
            # Create backup if file exists
            if file_path.exists():
                self._create_backup(plugin_id, file_path)
            with open(temp_path, 'w', encoding='utf-8') as f:
                self._lock_file(f, exclusive=True)
                try:
                    json.dump(data, f, indent=2, ensure_ascii=False)
                    f.flush()
                    os.fsync(f.fileno())  # ensure bytes are on disk pre-rename
                finally:
                    self._unlock_file(f)
            # Atomic move
            temp_path.replace(file_path)
            with self._cache_lock:
                self._cache[plugin_id] = data.copy()
            return True
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError cover non-JSON-serializable data; the
            # original caught only IOError and leaked the temp file then.
            print(f"[DataStore] Error saving data for {plugin_id}: {e}")
            if temp_path.exists():
                temp_path.unlink()
            return False

    def _lock_file(self, f, exclusive: bool = False) -> None:
        """Acquire a best-effort advisory lock on open file *f*.

        Uses fcntl on POSIX and portalocker (third-party) on Windows;
        degrades to a no-op when neither is available, in which case the
        in-process locks are the only protection.
        """
        try:
            import fcntl  # Unix/Linux/Mac
            fcntl.flock(f.fileno(),
                        fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH)
            return
        except ImportError:
            pass
        try:
            import portalocker  # Windows fallback
            portalocker.lock(f,
                             portalocker.LOCK_EX if exclusive
                             else portalocker.LOCK_SH)
        except ImportError:
            pass  # fallback: no OS-level locking

    def _unlock_file(self, f) -> None:
        """Release a lock taken by :meth:`_lock_file` (no-op if none)."""
        try:
            import fcntl
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)
            return
        except ImportError:
            pass
        try:
            import portalocker
            portalocker.unlock(f)
        except ImportError:
            pass  # fallback: nothing to do

    def _create_backup(self, plugin_id: str, file_path: Path) -> None:
        """Copy the current data file into the plugin's backup directory."""
        backup_dir = self._get_backup_dir(plugin_id)
        # Microsecond precision: second-resolution names silently
        # overwrote a backup taken within the same second (data loss).
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        backup_path = backup_dir / f"{timestamp}.json"
        try:
            shutil.copy2(file_path, backup_path)
            self._cleanup_old_backups(backup_dir)
        except OSError as e:
            print(f"[DataStore] Error creating backup for {plugin_id}: {e}")

    def _cleanup_old_backups(self, backup_dir: Path) -> None:
        """Remove old backups, keeping only the last max_backups versions."""
        try:
            backups = sorted(backup_dir.glob("*.json"),
                             key=lambda p: p.stat().st_mtime)
            # backups is oldest-first; drop the excess from the front.
            excess = len(backups) - self.max_backups
            for old_backup in backups[:max(excess, 0)]:
                old_backup.unlink()
        except OSError as e:
            print(f"[DataStore] Error cleaning up backups: {e}")

    def save(self, plugin_id: str, key: str, data: Any) -> bool:
        """
        Save data for a plugin.

        Args:
            plugin_id: Unique identifier for the plugin
            key: Key under which to store the data
            data: Data to store (must be JSON serializable)
        Returns:
            True if successful, False otherwise
        """
        # Hold the I/O lock across the whole read-modify-write so two
        # threads saving different keys cannot lose each other's update.
        with self._io_lock:
            plugin_data = self._load_plugin_data(plugin_id)
            plugin_data[key] = data
            return self._save_plugin_data(plugin_id, plugin_data)

    def load(self, plugin_id: str, key: str, default: Any = None) -> Any:
        """
        Load data for a plugin.

        Args:
            plugin_id: Unique identifier for the plugin
            key: Key of the data to load
            default: Default value if key not found
        Returns:
            The stored data or default value
        """
        plugin_data = self._load_plugin_data(plugin_id)
        return plugin_data.get(key, default)

    def delete(self, plugin_id: str, key: str) -> bool:
        """
        Delete data for a plugin.

        Args:
            plugin_id: Unique identifier for the plugin
            key: Key of the data to delete
        Returns:
            True if key existed and was deleted, False otherwise
        """
        with self._io_lock:
            plugin_data = self._load_plugin_data(plugin_id)
            if key not in plugin_data:
                return False
            del plugin_data[key]
            return self._save_plugin_data(plugin_id, plugin_data)

    def get_all_keys(self, plugin_id: str) -> list:
        """
        Get all keys stored for a plugin.

        Args:
            plugin_id: Unique identifier for the plugin
        Returns:
            List of keys
        """
        return list(self._load_plugin_data(plugin_id).keys())

    def clear_plugin(self, plugin_id: str) -> bool:
        """
        Clear all data for a plugin (a backup is taken first).

        Args:
            plugin_id: Unique identifier for the plugin
        Returns:
            True if successful, False otherwise
        """
        with self._io_lock:
            file_path = self._get_plugin_file(plugin_id)
            # Create backup before clearing
            if file_path.exists():
                self._create_backup(plugin_id, file_path)
            with self._cache_lock:
                self._cache.pop(plugin_id, None)
            try:
                if file_path.exists():
                    file_path.unlink()
                return True
            except OSError as e:
                print(f"[DataStore] Error clearing data for {plugin_id}: {e}")
                return False

    def get_backups(self, plugin_id: str) -> list:
        """
        Get list of available backups for a plugin, newest first.

        Note: creates the backup directory as a side effect if absent.

        Args:
            plugin_id: Unique identifier for the plugin
        Returns:
            List of backup file paths (strings)
        """
        backup_dir = self._get_backup_dir(plugin_id)
        if not backup_dir.exists():
            return []
        backups = sorted(backup_dir.glob("*.json"),
                         key=lambda p: p.stat().st_mtime, reverse=True)
        return [str(b) for b in backups]

    def restore_backup(self, plugin_id: str, backup_path: str) -> bool:
        """
        Restore data from a backup.

        Only files inside the plugin's own backup directory are accepted;
        the original copied any caller-supplied path over the plugin's
        data file (path traversal).

        Args:
            plugin_id: Unique identifier for the plugin
            backup_path: Path to the backup file (from :meth:`get_backups`)
        Returns:
            True if successful, False otherwise
        """
        backup_file = Path(backup_path).resolve()
        backup_dir = self._get_backup_dir(plugin_id).resolve()
        if backup_dir not in backup_file.parents:
            print(f"[DataStore] Invalid backup path for {plugin_id}: "
                  f"{backup_path}")
            return False
        if not backup_file.exists():
            print(f"[DataStore] Backup not found: {backup_path}")
            return False
        file_path = self._get_plugin_file(plugin_id)
        with self._io_lock:
            try:
                # Create backup of current state before restoring
                if file_path.exists():
                    self._create_backup(plugin_id, file_path)
                shutil.copy2(backup_file, file_path)
                # Invalidate cache so the next load re-reads from disk.
                with self._cache_lock:
                    self._cache.pop(plugin_id, None)
                return True
            except OSError as e:
                print(f"[DataStore] Error restoring backup for "
                      f"{plugin_id}: {e}")
                return False
# Module-level singleton accessor state
_data_store = None
_data_store_lock = threading.Lock()


def get_data_store() -> DataStore:
    """Return the process-wide DataStore, creating it on first call.

    The fast path (instance already built) is lock-free; the lock plus
    re-check protects against two threads racing on the first call.
    """
    global _data_store
    if _data_store is not None:
        return _data_store
    with _data_store_lock:
        if _data_store is None:
            _data_store = DataStore()
        return _data_store