# EU-Utility/core/data_store_secure.py
"""
EU-Utility - Data Store Service (Security Hardened)
Thread-safe persistent data storage for plugins with path validation.
"""
import json
import os
import platform
import shutil
import threading
from collections import OrderedDict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional

from core.security_utils import (
    PathValidator, InputValidator, DataValidator, SecurityError
)
# Cross-platform file locking
try:
import fcntl # Unix/Linux/Mac
HAS_FCNTL = True
except ImportError:
HAS_FCNTL = False
# Windows fallback using portalocker or threading lock
try:
import portalocker
HAS_PORTALOCKER = True
except ImportError:
HAS_PORTALOCKER = False
class DataStore:
    """
    Singleton data persistence service for plugins (Security Hardened).

    Features:
    - Thread-safe file operations with cross-platform file locking
    - Auto-backup on write (keeps the last ``max_backups`` versions)
    - Per-plugin JSON storage with an in-memory cache
    - Auto-creation of storage directories
    - Path traversal protection and input validation
    """
    _instance = None
    _lock = threading.Lock()  # guards singleton construction

    def __new__(cls, *args, **kwargs):
        # Double-checked locking: repeat callers skip the lock entirely.
        # *args/**kwargs are accepted (and ignored) so that
        # DataStore("custom/dir") reaches __init__ instead of raising
        # TypeError from a no-argument __new__.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self, data_dir: str = "data/plugins"):
        """Create the storage directory tree; a no-op after the first call.

        Args:
            data_dir: Root directory for per-plugin JSON files. Because
                the class is a singleton, only the value passed on the
                very first construction takes effect.
        """
        if self._initialized:
            return
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        # Canonical base path; every derived path is checked to live under it.
        self._base_path = self.data_dir.resolve()
        # Memory cache of the most recently loaded data per plugin.
        self._cache: Dict[str, Dict[str, Any]] = {}
        self._cache_lock = threading.Lock()
        # Number of backup versions retained per plugin.
        self.max_backups = 5
        self._initialized = True

    def _safe_name(self, plugin_id: str) -> str:
        """Validate *plugin_id* and return a filesystem-safe file stem.

        Shared by ``_get_plugin_file`` and ``_get_backup_dir`` so both
        enforce identical validation.

        Raises:
            SecurityError: If plugin_id is not a non-empty string, or the
                sanitized name is empty or still contains separator /
                traversal characters.
        """
        if not isinstance(plugin_id, str):
            raise SecurityError("plugin_id must be a string")
        if not plugin_id:
            raise SecurityError("plugin_id cannot be empty")
        safe_name = PathValidator.sanitize_filename(plugin_id, '_')
        # Belt-and-braces: sanitize_filename should already have removed these.
        if not safe_name or '..' in safe_name or '/' in safe_name or '\\' in safe_name:
            raise SecurityError(f"Invalid characters in plugin_id: {plugin_id}")
        return safe_name

    def _ensure_within_base(self, path: Path, plugin_id: str) -> Path:
        """Resolve *path* and confirm it lies inside the data directory.

        Uses ``Path.relative_to`` rather than ``str.startswith`` so a
        sibling directory such as ``data/plugins_evil`` is not mistaken
        for a child of ``data/plugins``.

        Returns:
            The fully resolved path.

        Raises:
            SecurityError: If the path escapes the base directory or
                cannot be resolved.
        """
        try:
            resolved = path.resolve()
        except (OSError, ValueError) as e:
            raise SecurityError(f"Invalid path for plugin_id: {plugin_id}") from e
        try:
            resolved.relative_to(self._base_path)
        except ValueError:
            raise SecurityError(f"Path traversal detected: {plugin_id}")
        return resolved

    def _get_plugin_file(self, plugin_id: str) -> Path:
        """
        Get the storage file path for a plugin with path validation.

        Args:
            plugin_id: Unique identifier for the plugin

        Returns:
            Safe file path

        Raises:
            SecurityError: If plugin_id is invalid or path traversal detected
        """
        safe_name = self._safe_name(plugin_id)
        file_path = self.data_dir / f"{safe_name}.json"
        self._ensure_within_base(file_path, plugin_id)
        return file_path

    def _get_backup_dir(self, plugin_id: str) -> Path:
        """Get (and create) the backup directory for a plugin, validated
        with exactly the same rules as the main storage path."""
        safe_name = self._safe_name(plugin_id)
        backup_dir = self.data_dir / ".backups" / safe_name
        self._ensure_within_base(backup_dir, plugin_id)
        backup_dir.mkdir(parents=True, exist_ok=True)
        return backup_dir

    def _load_plugin_data(self, plugin_id: str) -> Dict[str, Any]:
        """Load all data for a plugin from disk (cache-first) with validation.

        Returns an empty dict on any error (missing file, bad JSON,
        non-dict payload, or failed structure validation).
        """
        # Serve from cache when possible; return a copy so callers
        # cannot mutate the cached dict in place.
        with self._cache_lock:
            if plugin_id in self._cache:
                return self._cache[plugin_id].copy()
        file_path = self._get_plugin_file(plugin_id)
        if not file_path.exists():
            return {}
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                # Shared (read) lock while parsing.
                self._lock_file(f, exclusive=False)
                try:
                    data = json.load(f)
                finally:
                    self._unlock_file(f)
            if not isinstance(data, dict):
                print(f"[DataStore] Invalid data format for {plugin_id}, resetting")
                return {}
            try:
                DataValidator.validate_data_structure(data)
            except SecurityError as e:
                print(f"[DataStore] Security error in {plugin_id} data: {e}")
                return {}
            with self._cache_lock:
                self._cache[plugin_id] = data.copy()
            return data
        except (json.JSONDecodeError, OSError) as e:
            print(f"[DataStore] Error loading data for {plugin_id}: {e}")
            return {}

    def _save_plugin_data(self, plugin_id: str, data: Dict[str, Any]) -> bool:
        """Save all data for a plugin to disk with backup.

        Writes atomically (temp file + rename) and fsyncs before the
        rename so a crash cannot leave a truncated main file.
        """
        try:
            DataValidator.validate_data_structure(data)
        except SecurityError as e:
            print(f"[DataStore] Security error saving {plugin_id}: {e}")
            return False
        file_path = self._get_plugin_file(plugin_id)
        temp_path = file_path.with_suffix('.tmp')
        try:
            # Preserve the previous version before overwriting.
            if file_path.exists():
                self._create_backup(plugin_id, file_path)
            with open(temp_path, 'w', encoding='utf-8') as f:
                self._lock_file(f, exclusive=True)
                try:
                    json.dump(data, f, indent=2, ensure_ascii=False)
                    f.flush()
                    os.fsync(f.fileno())  # force data to disk before rename
                finally:
                    self._unlock_file(f)
            # Atomic move: readers see either the old or the new file.
            temp_path.replace(file_path)
            with self._cache_lock:
                self._cache[plugin_id] = data.copy()
            return True
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError cover non-serializable payloads that
            # slipped past validation; OSError covers all I/O failures.
            print(f"[DataStore] Error saving data for {plugin_id}: {e}")
            if temp_path.exists():
                try:
                    temp_path.unlink()
                except OSError:
                    pass  # best-effort cleanup of the temp file
            return False

    def _lock_file(self, f, exclusive: bool = False):
        """Cross-platform advisory file locking on an open file object."""
        if HAS_FCNTL:
            # Unix/Linux/Mac
            lock_type = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
            fcntl.flock(f.fileno(), lock_type)
        elif HAS_PORTALOCKER:
            # Windows with portalocker (imported at module level)
            lock_type = portalocker.LOCK_EX if exclusive else portalocker.LOCK_SH
            portalocker.lock(f, lock_type)
        # else: no inter-process locking available; in-process threading
        # locks are the only protection.

    def _unlock_file(self, f):
        """Cross-platform file unlock; no-op when no lock backend exists."""
        if HAS_FCNTL:
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)
        elif HAS_PORTALOCKER:
            portalocker.unlock(f)

    def _create_backup(self, plugin_id: str, file_path: Path):
        """Create a timestamped backup of the current data file.

        The timestamp includes microseconds so two saves within the same
        second do not overwrite each other's backup.
        """
        backup_dir = self._get_backup_dir(plugin_id)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        backup_path = backup_dir / f"{timestamp}.json"
        try:
            shutil.copy2(file_path, backup_path)
            self._cleanup_old_backups(backup_dir)
        except OSError as e:
            print(f"[DataStore] Error creating backup for {plugin_id}: {e}")

    def _cleanup_old_backups(self, backup_dir: Path):
        """Remove old backups, keeping only the newest ``max_backups``."""
        try:
            backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime)
            while len(backups) > self.max_backups:
                old_backup = backups.pop(0)  # oldest first
                old_backup.unlink()
        except OSError as e:
            print(f"[DataStore] Error cleaning up backups: {e}")

    def save(self, plugin_id: str, key: str, data: Any) -> bool:
        """
        Save data for a plugin with validation.

        Args:
            plugin_id: Unique identifier for the plugin
            key: Key under which to store the data
            data: Data to store (must be JSON serializable)

        Returns:
            True if successful, False otherwise
        """
        if not isinstance(key, str):
            print(f"[DataStore] Invalid key type for {plugin_id}")
            return False
        if not key:
            print(f"[DataStore] Empty key not allowed for {plugin_id}")
            return False
        if not InputValidator.validate_json_key(key):
            print(f"[DataStore] Invalid key format for {plugin_id}: {key}")
            return False
        plugin_data = self._load_plugin_data(plugin_id)
        plugin_data[key] = data
        return self._save_plugin_data(plugin_id, plugin_data)

    def load(self, plugin_id: str, key: str, default: Any = None) -> Any:
        """
        Load data for a plugin.

        Args:
            plugin_id: Unique identifier for the plugin
            key: Key of the data to load
            default: Default value if key not found

        Returns:
            The stored data or default value
        """
        if not isinstance(key, str):
            return default
        plugin_data = self._load_plugin_data(plugin_id)
        return plugin_data.get(key, default)

    def delete(self, plugin_id: str, key: str) -> bool:
        """
        Delete data for a plugin.

        Args:
            plugin_id: Unique identifier for the plugin
            key: Key of the data to delete

        Returns:
            True if key existed and was deleted, False otherwise
        """
        if not isinstance(key, str):
            return False
        plugin_data = self._load_plugin_data(plugin_id)
        if key in plugin_data:
            del plugin_data[key]
            return self._save_plugin_data(plugin_id, plugin_data)
        return False

    def get_all_keys(self, plugin_id: str) -> list:
        """
        Get all keys stored for a plugin.

        Args:
            plugin_id: Unique identifier for the plugin

        Returns:
            List of keys
        """
        plugin_data = self._load_plugin_data(plugin_id)
        return list(plugin_data.keys())

    def clear_plugin(self, plugin_id: str) -> bool:
        """
        Clear all data for a plugin (a backup is taken first).

        Args:
            plugin_id: Unique identifier for the plugin

        Returns:
            True if successful, False otherwise
        """
        file_path = self._get_plugin_file(plugin_id)
        if file_path.exists():
            self._create_backup(plugin_id, file_path)
        with self._cache_lock:
            self._cache.pop(plugin_id, None)
        try:
            if file_path.exists():
                file_path.unlink()
            return True
        except OSError as e:
            print(f"[DataStore] Error clearing data for {plugin_id}: {e}")
            return False

    def get_backups(self, plugin_id: str) -> list:
        """
        Get list of available backups for a plugin, newest first.

        Args:
            plugin_id: Unique identifier for the plugin

        Returns:
            List of backup file paths (strings)
        """
        backup_dir = self._get_backup_dir(plugin_id)
        if not backup_dir.exists():
            return []
        backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True)
        return [str(b) for b in backups]

    def restore_backup(self, plugin_id: str, backup_path: str) -> bool:
        """
        Restore data from a backup.

        Args:
            plugin_id: Unique identifier for the plugin
            backup_path: Path to the backup file

        Returns:
            True if successful, False otherwise
        """
        backup_file = Path(backup_path)
        if not backup_file.exists():
            print(f"[DataStore] Backup not found: {backup_path}")
            return False
        # The backup must really live in this plugin's backup directory;
        # relative_to (not startswith) rejects sibling-directory tricks.
        try:
            backup_dir = self._get_backup_dir(plugin_id)
            backup_file.resolve().relative_to(backup_dir.resolve())
        except ValueError:
            print(f"[DataStore] Invalid backup path: {backup_path}")
            return False
        except (OSError, SecurityError) as e:
            print(f"[DataStore] Path validation error: {e}")
            return False
        file_path = self._get_plugin_file(plugin_id)
        try:
            # Snapshot the current state so a restore is itself reversible.
            if file_path.exists():
                self._create_backup(plugin_id, file_path)
            shutil.copy2(backup_file, file_path)
            # Invalidate cache so the next load re-reads from disk.
            with self._cache_lock:
                self._cache.pop(plugin_id, None)
            return True
        except OSError as e:
            print(f"[DataStore] Error restoring backup for {plugin_id}: {e}")
            return False
# Module-level singleton accessor state.
_data_store = None
_data_store_lock = threading.Lock()


def get_data_store() -> DataStore:
    """Return the process-wide DataStore, creating it on first use."""
    global _data_store
    # Fast path: already created, no locking required.
    if _data_store is not None:
        return _data_store
    # Slow path: double-checked locking so exactly one thread constructs it.
    with _data_store_lock:
        if _data_store is None:
            _data_store = DataStore()
        return _data_store