diff --git a/core/data_store_secure.py b/core/data_store_secure.py deleted file mode 100644 index c2e25bd..0000000 --- a/core/data_store_secure.py +++ /dev/null @@ -1,462 +0,0 @@ -""" -EU-Utility - Data Store Service (Security Hardened) - -Thread-safe persistent data storage for plugins with path validation. -""" - -import json -import shutil -import threading -import platform -from pathlib import Path -from typing import Any, Dict, Optional -from datetime import datetime -from collections import OrderedDict - -from core.security_utils import ( - PathValidator, InputValidator, DataValidator, SecurityError -) - -# Cross-platform file locking -try: - import fcntl # Unix/Linux/Mac - HAS_FCNTL = True -except ImportError: - HAS_FCNTL = False - # Windows fallback using portalocker or threading lock - try: - import portalocker - HAS_PORTALOCKER = True - except ImportError: - HAS_PORTALOCKER = False - - -class DataStore: - """ - Singleton data persistence service for plugins (Security Hardened). - - Features: - - Thread-safe file operations with file locking - - Auto-backup on write (keeps last 5 versions) - - Per-plugin JSON storage - - Auto-create directories - - Path traversal protection - - Input validation - """ - - _instance = None - _lock = threading.Lock() - - def __new__(cls): - if cls._instance is None: - with cls._lock: - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._initialized = False - return cls._instance - - def __init__(self, data_dir: str = "data/plugins"): - if self._initialized: - return - - self.data_dir = Path(data_dir) - self.data_dir.mkdir(parents=True, exist_ok=True) - - # Resolve and validate base path - self._base_path = self.data_dir.resolve() - - # Memory cache for frequently accessed data - self._cache: Dict[str, Dict[str, Any]] = {} - self._cache_lock = threading.Lock() - - # Backup settings - self.max_backups = 5 - - self._initialized = True - - def _get_plugin_file(self, plugin_id: str) -> Path: - """ - Get the storage file path for a plugin with path validation. - - Args: - plugin_id: Unique identifier for the plugin - - Returns: - Safe file path - - Raises: - SecurityError: If plugin_id is invalid or path traversal detected - """ - # Validate plugin_id - if not isinstance(plugin_id, str): - raise SecurityError("plugin_id must be a string") - - if not plugin_id: - raise SecurityError("plugin_id cannot be empty") - - # Sanitize plugin_id to create a safe filename - safe_name = PathValidator.sanitize_filename(plugin_id, '_') - - # Additional check: ensure no path traversal remains - if '..' in safe_name or '/' in safe_name or '\\' in safe_name: - raise SecurityError(f"Invalid characters in plugin_id: {plugin_id}") - - # Construct path - file_path = self.data_dir / f"{safe_name}.json" - - # Security check: ensure resolved path is within data_dir - try: - resolved_path = file_path.resolve() - if not str(resolved_path).startswith(str(self._base_path)): - raise SecurityError(f"Path traversal detected: {plugin_id}") - except (OSError, ValueError) as e: - raise SecurityError(f"Invalid path for plugin_id: {plugin_id}") from e - - return file_path - - def _get_backup_dir(self, plugin_id: str) -> Path: - """Get the backup directory for a plugin with validation.""" - # Reuse validation from _get_plugin_file - safe_name = PathValidator.sanitize_filename(plugin_id, '_') - backup_dir = self.data_dir / ".backups" / safe_name - - # Validate backup path - try: - resolved = backup_dir.resolve() - if not str(resolved).startswith(str(self._base_path)): - raise SecurityError(f"Backup path traversal detected: {plugin_id}") - except (OSError, ValueError) as e: - raise SecurityError(f"Invalid backup path: {plugin_id}") from e - - backup_dir.mkdir(parents=True, exist_ok=True) - return backup_dir - - def _load_plugin_data(self, plugin_id: str) -> Dict[str, Any]: - """Load all data for a plugin from disk with validation.""" - # Check cache first - with self._cache_lock: - if plugin_id in self._cache: - return self._cache[plugin_id].copy() - - file_path = self._get_plugin_file(plugin_id) - - if not file_path.exists(): - return {} - - try: - with open(file_path, 'r', encoding='utf-8') as f: - # Cross-platform file locking - self._lock_file(f, exclusive=False) - try: - data = json.load(f) - finally: - self._unlock_file(f) - - # Validate loaded data - if not isinstance(data, dict): - print(f"[DataStore] Invalid data format for {plugin_id}, resetting") - return {} - - # Validate data structure - try: - DataValidator.validate_data_structure(data) - except SecurityError as e: - print(f"[DataStore] Security error in {plugin_id} data: {e}") - return {} - - # Update cache - with self._cache_lock: - self._cache[plugin_id] = data.copy() - return data - except (json.JSONDecodeError, IOError) as e: - print(f"[DataStore] Error loading data for {plugin_id}: {e}") - return {} - - def _save_plugin_data(self, plugin_id: str, data: Dict[str, Any]) -> bool: - """Save all data for a plugin to disk with backup.""" - # Validate data before saving - try: - DataValidator.validate_data_structure(data) - except SecurityError as e: - print(f"[DataStore] Security error saving {plugin_id}: {e}") - return False - - file_path = self._get_plugin_file(plugin_id) - - try: - # Create backup if file exists - if file_path.exists(): - self._create_backup(plugin_id, file_path) - - # Write to temp file first, then move (atomic operation) - temp_path = file_path.with_suffix('.tmp') - - with open(temp_path, 'w', encoding='utf-8') as f: - # Cross-platform file locking - self._lock_file(f, exclusive=True) - try: - json.dump(data, f, indent=2, ensure_ascii=False) - f.flush() - import os - os.fsync(f.fileno()) - finally: - self._unlock_file(f) - - # Atomic move - temp_path.replace(file_path) - - # Update cache - with self._cache_lock: - self._cache[plugin_id] = data.copy() - - return True - except IOError as e: - print(f"[DataStore] Error saving data for {plugin_id}: {e}") - # Clean up temp file if exists - temp_path = file_path.with_suffix('.tmp') - if temp_path.exists(): - try: - temp_path.unlink() - except: - pass - return False - - def _lock_file(self, f, exclusive: bool = False): - """Cross-platform file locking.""" - if HAS_FCNTL: - # Unix/Linux/Mac - lock_type = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH - fcntl.flock(f.fileno(), lock_type) - elif HAS_PORTALOCKER: - # Windows with portalocker - import portalocker - lock_type = portalocker.LOCK_EX if exclusive else portalocker.LOCK_SH - portalocker.lock(f, lock_type) - else: - # Fallback: rely on threading lock (already held) - pass - - def _unlock_file(self, f): - """Cross-platform file unlock.""" - if HAS_FCNTL: - fcntl.flock(f.fileno(), fcntl.LOCK_UN) - elif HAS_PORTALOCKER: - import portalocker - portalocker.unlock(f) - else: - # Fallback: nothing to do - pass - - def _create_backup(self, plugin_id: str, file_path: Path): - """Create a backup of the current data file.""" - backup_dir = self._get_backup_dir(plugin_id) - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - backup_path = backup_dir / f"{timestamp}.json" - - try: - shutil.copy2(file_path, backup_path) - self._cleanup_old_backups(backup_dir) - except IOError as e: - print(f"[DataStore] Error creating backup for {plugin_id}: {e}") - - def _cleanup_old_backups(self, backup_dir: Path): - """Remove old backups, keeping only the last N versions.""" - try: - backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime) - while len(backups) > self.max_backups: - old_backup = backups.pop(0) - old_backup.unlink() - except IOError as e: - print(f"[DataStore] Error cleaning up backups: {e}") - - def save(self, plugin_id: str, key: str, data: Any) -> bool: - """ - Save data for a plugin with validation. - - Args: - plugin_id: Unique identifier for the plugin - key: Key under which to store the data - data: Data to store (must be JSON serializable) - - Returns: - True if successful, False otherwise - """ - # Validate key - if not isinstance(key, str): - print(f"[DataStore] Invalid key type for {plugin_id}") - return False - - if not key: - print(f"[DataStore] Empty key not allowed for {plugin_id}") - return False - - if not InputValidator.validate_json_key(key): - print(f"[DataStore] Invalid key format for {plugin_id}: {key}") - return False - - plugin_data = self._load_plugin_data(plugin_id) - plugin_data[key] = data - return self._save_plugin_data(plugin_id, plugin_data) - - def load(self, plugin_id: str, key: str, default: Any = None) -> Any: - """ - Load data for a plugin. - - Args: - plugin_id: Unique identifier for the plugin - key: Key of the data to load - default: Default value if key not found - - Returns: - The stored data or default value - """ - # Validate key - if not isinstance(key, str): - return default - - plugin_data = self._load_plugin_data(plugin_id) - return plugin_data.get(key, default) - - def delete(self, plugin_id: str, key: str) -> bool: - """ - Delete data for a plugin. - - Args: - plugin_id: Unique identifier for the plugin - key: Key of the data to delete - - Returns: - True if key existed and was deleted, False otherwise - """ - # Validate key - if not isinstance(key, str): - return False - - plugin_data = self._load_plugin_data(plugin_id) - if key in plugin_data: - del plugin_data[key] - return self._save_plugin_data(plugin_id, plugin_data) - return False - - def get_all_keys(self, plugin_id: str) -> list: - """ - Get all keys stored for a plugin. - - Args: - plugin_id: Unique identifier for the plugin - - Returns: - List of keys - """ - plugin_data = self._load_plugin_data(plugin_id) - return list(plugin_data.keys()) - - def clear_plugin(self, plugin_id: str) -> bool: - """ - Clear all data for a plugin. - - Args: - plugin_id: Unique identifier for the plugin - - Returns: - True if successful, False otherwise - """ - file_path = self._get_plugin_file(plugin_id) - - # Create backup before clearing - if file_path.exists(): - self._create_backup(plugin_id, file_path) - - # Clear cache - with self._cache_lock: - if plugin_id in self._cache: - del self._cache[plugin_id] - - # Remove file - try: - if file_path.exists(): - file_path.unlink() - return True - except IOError as e: - print(f"[DataStore] Error clearing data for {plugin_id}: {e}") - return False - - def get_backups(self, plugin_id: str) -> list: - """ - Get list of available backups for a plugin. - - Args: - plugin_id: Unique identifier for the plugin - - Returns: - List of backup file paths - """ - backup_dir = self._get_backup_dir(plugin_id) - if not backup_dir.exists(): - return [] - - backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True) - return [str(b) for b in backups] - - def restore_backup(self, plugin_id: str, backup_path: str) -> bool: - """ - Restore data from a backup. - - Args: - plugin_id: Unique identifier for the plugin - backup_path: Path to the backup file - - Returns: - True if successful, False otherwise - """ - backup_file = Path(backup_path) - if not backup_file.exists(): - print(f"[DataStore] Backup not found: {backup_path}") - return False - - # Validate backup path is within backups directory - try: - backup_dir = self._get_backup_dir(plugin_id) - resolved_backup = backup_file.resolve() - resolved_backup_dir = backup_dir.resolve() - if not str(resolved_backup).startswith(str(resolved_backup_dir)): - print(f"[DataStore] Invalid backup path: {backup_path}") - return False - except (OSError, ValueError) as e: - print(f"[DataStore] Path validation error: {e}") - return False - - file_path = self._get_plugin_file(plugin_id) - - try: - # Create backup of current state before restoring - if file_path.exists(): - self._create_backup(plugin_id, file_path) - - # Copy backup to main file - shutil.copy2(backup_file, file_path) - - # Invalidate cache - with self._cache_lock: - if plugin_id in self._cache: - del self._cache[plugin_id] - - return True - except IOError as e: - print(f"[DataStore] Error restoring backup for {plugin_id}: {e}") - return False - - -# Singleton instance -_data_store = None -_data_store_lock = threading.Lock() - - -def get_data_store() -> DataStore: - """Get the global DataStore instance.""" - global _data_store - if _data_store is None: - with _data_store_lock: - if _data_store is None: - _data_store = DataStore() - return _data_store diff --git a/core/data_store_vulnerable.py b/core/data_store_vulnerable.py deleted file mode 100644 index 1c8486e..0000000 --- a/core/data_store_vulnerable.py +++ /dev/null @@ -1,355 +0,0 @@ -""" -EU-Utility - Data Store Service - -Thread-safe persistent data storage for plugins. -Provides file locking, auto-backup, and singleton access. -""" - -import json -import shutil -import threading -import platform -from pathlib import Path -from typing import Any, Dict, Optional -from datetime import datetime -from collections import OrderedDict - -# Cross-platform file locking -try: - import fcntl # Unix/Linux/Mac - HAS_FCNTL = True -except ImportError: - HAS_FCNTL = False - # Windows fallback using portalocker or threading lock - try: - import portalocker - HAS_PORTALOCKER = True - except ImportError: - HAS_PORTALOCKER = False - - -class DataStore: - """ - Singleton data persistence service for plugins. - - Features: - - Thread-safe file operations with file locking - - Auto-backup on write (keeps last 5 versions) - - Per-plugin JSON storage - - Auto-create directories - """ - - _instance = None - _lock = threading.Lock() - - def __new__(cls): - if cls._instance is None: - with cls._lock: - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._initialized = False - return cls._instance - - def __init__(self, data_dir: str = "data/plugins"): - if self._initialized: - return - - self.data_dir = Path(data_dir) - self.data_dir.mkdir(parents=True, exist_ok=True) - - # Memory cache for frequently accessed data - self._cache: Dict[str, Dict[str, Any]] = {} - self._cache_lock = threading.Lock() - - # Backup settings - self.max_backups = 5 - - self._initialized = True - - def _get_plugin_file(self, plugin_id: str) -> Path: - """Get the storage file path for a plugin.""" - # Sanitize plugin_id to create a safe filename - safe_name = plugin_id.replace(".", "_").replace("/", "_").replace("\\", "_") - return self.data_dir / f"{safe_name}.json" - - def _get_backup_dir(self, plugin_id: str) -> Path: - """Get the backup directory for a plugin.""" - safe_name = plugin_id.replace(".", "_").replace("/", "_").replace("\\", "_") - backup_dir = self.data_dir / ".backups" / safe_name - backup_dir.mkdir(parents=True, exist_ok=True) - return backup_dir - - def _load_plugin_data(self, plugin_id: str) -> Dict[str, Any]: - """Load all data for a plugin from disk.""" - # Check cache first - with self._cache_lock: - if plugin_id in self._cache: - return self._cache[plugin_id].copy() - - file_path = self._get_plugin_file(plugin_id) - - if not file_path.exists(): - return {} - - try: - with open(file_path, 'r', encoding='utf-8') as f: - # Cross-platform file locking - self._lock_file(f, exclusive=False) - try: - data = json.load(f) - finally: - self._unlock_file(f) - - # Update cache - with self._cache_lock: - self._cache[plugin_id] = data.copy() - return data - except (json.JSONDecodeError, IOError) as e: - print(f"[DataStore] Error loading data for {plugin_id}: {e}") - return {} - - def _save_plugin_data(self, plugin_id: str, data: Dict[str, Any]) -> bool: - """Save all data for a plugin to disk with backup.""" - file_path = self._get_plugin_file(plugin_id) - - try: - # Create backup if file exists - if file_path.exists(): - self._create_backup(plugin_id, file_path) - - # Write to temp file first, then move (atomic operation) - temp_path = file_path.with_suffix('.tmp') - - with open(temp_path, 'w', encoding='utf-8') as f: - # Cross-platform file locking - self._lock_file(f, exclusive=True) - try: - json.dump(data, f, indent=2, ensure_ascii=False) - f.flush() - import os - os.fsync(f.fileno()) - finally: - self._unlock_file(f) - - # Atomic move - temp_path.replace(file_path) - - # Update cache - with self._cache_lock: - self._cache[plugin_id] = data.copy() - - return True - except IOError as e: - print(f"[DataStore] Error saving data for {plugin_id}: {e}") - # Clean up temp file if exists - temp_path = file_path.with_suffix('.tmp') - if temp_path.exists(): - temp_path.unlink() - return False - - def _lock_file(self, f, exclusive: bool = False): - """Cross-platform file locking.""" - if HAS_FCNTL: - # Unix/Linux/Mac - lock_type = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH - fcntl.flock(f.fileno(), lock_type) - elif HAS_PORTALOCKER: - # Windows with portalocker - import portalocker - lock_type = portalocker.LOCK_EX if exclusive else portalocker.LOCK_SH - portalocker.lock(f, lock_type) - else: - # Fallback: rely on threading lock (already held) - pass - - def _unlock_file(self, f): - """Cross-platform file unlock.""" - if HAS_FCNTL: - fcntl.flock(f.fileno(), fcntl.LOCK_UN) - elif HAS_PORTALOCKER: - import portalocker - portalocker.unlock(f) - else: - # Fallback: nothing to do - pass - - def _create_backup(self, plugin_id: str, file_path: Path): - """Create a backup of the current data file.""" - backup_dir = self._get_backup_dir(plugin_id) - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - backup_path = backup_dir / f"{timestamp}.json" - - try: - shutil.copy2(file_path, backup_path) - self._cleanup_old_backups(backup_dir) - except IOError as e: - print(f"[DataStore] Error creating backup for {plugin_id}: {e}") - - def _cleanup_old_backups(self, backup_dir: Path): - """Remove old backups, keeping only the last N versions.""" - try: - backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime) - while len(backups) > self.max_backups: - old_backup = backups.pop(0) - old_backup.unlink() - except IOError as e: - print(f"[DataStore] Error cleaning up backups: {e}") - - def save(self, plugin_id: str, key: str, data: Any) -> bool: - """ - Save data for a plugin. - - Args: - plugin_id: Unique identifier for the plugin - key: Key under which to store the data - data: Data to store (must be JSON serializable) - - Returns: - True if successful, False otherwise - """ - plugin_data = self._load_plugin_data(plugin_id) - plugin_data[key] = data - return self._save_plugin_data(plugin_id, plugin_data) - - def load(self, plugin_id: str, key: str, default: Any = None) -> Any: - """ - Load data for a plugin. - - Args: - plugin_id: Unique identifier for the plugin - key: Key of the data to load - default: Default value if key not found - - Returns: - The stored data or default value - """ - plugin_data = self._load_plugin_data(plugin_id) - return plugin_data.get(key, default) - - def delete(self, plugin_id: str, key: str) -> bool: - """ - Delete data for a plugin. - - Args: - plugin_id: Unique identifier for the plugin - key: Key of the data to delete - - Returns: - True if key existed and was deleted, False otherwise - """ - plugin_data = self._load_plugin_data(plugin_id) - if key in plugin_data: - del plugin_data[key] - return self._save_plugin_data(plugin_id, plugin_data) - return False - - def get_all_keys(self, plugin_id: str) -> list: - """ - Get all keys stored for a plugin. - - Args: - plugin_id: Unique identifier for the plugin - - Returns: - List of keys - """ - plugin_data = self._load_plugin_data(plugin_id) - return list(plugin_data.keys()) - - def clear_plugin(self, plugin_id: str) -> bool: - """ - Clear all data for a plugin. - - Args: - plugin_id: Unique identifier for the plugin - - Returns: - True if successful, False otherwise - """ - file_path = self._get_plugin_file(plugin_id) - - # Create backup before clearing - if file_path.exists(): - self._create_backup(plugin_id, file_path) - - # Clear cache - with self._cache_lock: - if plugin_id in self._cache: - del self._cache[plugin_id] - - # Remove file - try: - if file_path.exists(): - file_path.unlink() - return True - except IOError as e: - print(f"[DataStore] Error clearing data for {plugin_id}: {e}") - return False - - def get_backups(self, plugin_id: str) -> list: - """ - Get list of available backups for a plugin. - - Args: - plugin_id: Unique identifier for the plugin - - Returns: - List of backup file paths - """ - backup_dir = self._get_backup_dir(plugin_id) - if not backup_dir.exists(): - return [] - - backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True) - return [str(b) for b in backups] - - def restore_backup(self, plugin_id: str, backup_path: str) -> bool: - """ - Restore data from a backup. - - Args: - plugin_id: Unique identifier for the plugin - backup_path: Path to the backup file - - Returns: - True if successful, False otherwise - """ - backup_file = Path(backup_path) - if not backup_file.exists(): - print(f"[DataStore] Backup not found: {backup_path}") - return False - - file_path = self._get_plugin_file(plugin_id) - - try: - # Create backup of current state before restoring - if file_path.exists(): - self._create_backup(plugin_id, file_path) - - # Copy backup to main file - shutil.copy2(backup_file, file_path) - - # Invalidate cache - with self._cache_lock: - if plugin_id in self._cache: - del self._cache[plugin_id] - - return True - except IOError as e: - print(f"[DataStore] Error restoring backup for {plugin_id}: {e}") - return False - - -# Singleton instance -_data_store = None -_data_store_lock = threading.Lock() - - -def get_data_store() -> DataStore: - """Get the global DataStore instance.""" - global _data_store - if _data_store is None: - with _data_store_lock: - if _data_store is None: - _data_store = DataStore() - return _data_store diff --git a/core/screenshot_secure.py b/core/screenshot_secure.py deleted file mode 100644 index 6dec639..0000000 --- a/core/screenshot_secure.py +++ /dev/null @@ -1,547 +0,0 @@ -""" -EU-Utility - Screenshot Service Core Module (Security Hardened) - -Fast, reliable screen capture functionality with path validation. -""" - -import io -import os -import time -import platform -import threading -from collections import deque -from datetime import datetime -from pathlib import Path -from typing import Dict, List, Optional, Tuple, Any, Union - -from PIL import Image - -from core.security_utils import PathValidator, InputValidator, SecurityError - - -class ScreenshotService: - """ - Core screenshot service with cross-platform support (Security Hardened). - - Features: - - Singleton pattern for single instance across app - - Fast screen capture using PIL.ImageGrab (Windows) or pyautogui (cross-platform) - - Configurable auto-save with timestamps - - Screenshot history (last 20 in memory) - - PNG by default, JPG quality settings - - Thread-safe operations - - Path traversal protection - """ - - _instance = None - _lock = threading.Lock() - - def __new__(cls): - if cls._instance is None: - with cls._lock: - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._initialized = False - return cls._instance - - def __init__(self): - if self._initialized: - return - - self._initialized = True - self._lock = threading.Lock() - - # Configuration - self._auto_save: bool = True - self._format: str = "PNG" - self._quality: int = 95 # For JPEG - self._history_size: int = 20 - - # Screenshot history (thread-safe deque) - self._history: deque = deque(maxlen=self._history_size) - self._last_screenshot: Optional[Image.Image] = None - - # Platform detection - MUST be before _get_default_save_path() - self._platform = platform.system().lower() - self._use_pil = self._platform == "windows" - - # Set save path AFTER platform detection - self._save_path: Path = self._get_default_save_path() - - # Lazy init for capture backends - self._pil_available: Optional[bool] = None - self._pyautogui_available: Optional[bool] = None - - # Resolve base path for validation - self._base_save_path = self._save_path.resolve() - - # Ensure save directory exists - self._ensure_save_directory() - - print(f"[Screenshot] Service initialized (auto_save={self._auto_save}, format={self._format})") - - def _get_default_save_path(self) -> Path: - """Get default save path for screenshots.""" - # Use Documents/Entropia Universe/Screenshots/ as default - if self._platform == "windows": - documents = Path.home() / "Documents" - else: - documents = Path.home() / "Documents" - - return documents / "Entropia Universe" / "Screenshots" - - def _ensure_save_directory(self) -> None: - """Ensure the save directory exists.""" - try: - self._save_path.mkdir(parents=True, exist_ok=True) - except Exception as e: - print(f"[Screenshot] Warning: Could not create save directory: {e}") - - def _check_pil_grab(self) -> bool: - """Check if PIL.ImageGrab is available.""" - if self._pil_available is not None: - return self._pil_available - - try: - from PIL import ImageGrab - self._pil_available = True - return True - except ImportError: - self._pil_available = False - return False - - def _check_pyautogui(self) -> bool: - """Check if pyautogui is available.""" - if self._pyautogui_available is not None: - return self._pyautogui_available - - try: - import pyautogui - self._pyautogui_available = True - return True - except ImportError: - self._pyautogui_available = False - return False - - def capture(self, full_screen: bool = True) -> Image.Image: - """ - Capture screenshot. - - Args: - full_screen: If True, capture entire screen. If False, use default region. - - Returns: - PIL Image object - - Raises: - RuntimeError: If no capture backend is available - """ - with self._lock: - screenshot = self._do_capture(full_screen=full_screen) - - # Store in history - self._last_screenshot = screenshot.copy() - self._history.append({ - 'image': screenshot.copy(), - 'timestamp': datetime.now(), - 'region': None if full_screen else 'custom' - }) - - # Auto-save if enabled - if self._auto_save: - self._auto_save_screenshot(screenshot) - - return screenshot - - def capture_region(self, x: int, y: int, width: int, height: int) -> Image.Image: - """ - Capture specific screen region. - - Args: - x: Left coordinate - y: Top coordinate - width: Region width - height: Region height - - Returns: - PIL Image object - - Raises: - SecurityError: If region parameters are invalid - """ - # Validate region parameters - from core.security_utils import InputValidator - InputValidator.validate_region_coordinates(x, y, width, height) - - with self._lock: - screenshot = self._do_capture(region=(x, y, x + width, y + height)) - - # Store in history - self._last_screenshot = screenshot.copy() - self._history.append({ - 'image': screenshot.copy(), - 'timestamp': datetime.now(), - 'region': (x, y, width, height) - }) - - # Auto-save if enabled - if self._auto_save: - self._auto_save_screenshot(screenshot) - - return screenshot - - def capture_window(self, window_handle: int) -> Optional[Image.Image]: - """ - Capture specific window by handle (Windows only). - - Args: - window_handle: Window handle (HWND on Windows) - - Returns: - PIL Image object or None if capture failed - """ - if self._platform != "windows": - print("[Screenshot] capture_window is Windows-only") - return None - - # Validate window handle - if not isinstance(window_handle, int) or window_handle <= 0: - print("[Screenshot] Invalid window handle") - return None - - try: - import win32gui - import win32ui - import win32con - from ctypes import windll - - # Get window dimensions - left, top, right, bottom = win32gui.GetWindowRect(window_handle) - width = right - left - height = bottom - top - - # Sanity check dimensions - if width <= 0 or height <= 0 or width > 7680 or height > 4320: - print("[Screenshot] Invalid window dimensions") - return None - - # Create device context - hwndDC = win32gui.GetWindowDC(window_handle) - mfcDC = win32ui.CreateDCFromHandle(hwndDC) - saveDC = mfcDC.CreateCompatibleDC() - - # Create bitmap - saveBitMap = win32ui.CreateBitmap() - saveBitMap.CreateCompatibleBitmap(mfcDC, width, height) - saveDC.SelectObject(saveBitMap) - - # Copy screen into bitmap - result = windll.user32.PrintWindow(window_handle, saveDC.GetSafeHdc(), 3) - - # Convert to PIL Image - bmpinfo = saveBitMap.GetInfo() - bmpstr = saveBitMap.GetBitmapBits(True) - screenshot = Image.frombuffer( - 'RGB', - (bmpinfo['bmWidth'], bmpinfo['bmHeight']), - bmpstr, 'raw', 'BGRX', 0, 1 - ) - - # Cleanup - win32gui.DeleteObject(saveBitMap.GetHandle()) - saveDC.DeleteDC() - mfcDC.DeleteDC() - win32gui.ReleaseDC(window_handle, hwndDC) - - if result != 1: - return None - - with self._lock: - self._last_screenshot = screenshot.copy() - self._history.append({ - 'image': screenshot.copy(), - 'timestamp': datetime.now(), - 'region': 'window', - 'window_handle': window_handle - }) - - if self._auto_save: - self._auto_save_screenshot(screenshot) - - return screenshot - - except Exception as e: - print(f"[Screenshot] Window capture failed: {e}") - return None - - def _do_capture(self, full_screen: bool = True, region: Optional[Tuple[int, int, int, int]] = None) -> Image.Image: - """Internal capture method.""" - # Try PIL.ImageGrab first (Windows, faster) - if self._use_pil and self._check_pil_grab(): - from PIL import ImageGrab - if region: - return ImageGrab.grab(bbox=region) - else: - return ImageGrab.grab() - - # Fall back to pyautogui (cross-platform) - if self._check_pyautogui(): - import pyautogui - if region: - x1, y1, x2, y2 = region - return pyautogui.screenshot(region=(x1, y1, x2 - x1, y2 - y1)) - else: - return pyautogui.screenshot() - - raise RuntimeError( - "No screenshot backend available. " - "Install pillow (Windows) or pyautogui (cross-platform)." - ) - - def _auto_save_screenshot(self, image: Image.Image) -> Optional[Path]: - """Automatically save screenshot with timestamp.""" - try: - timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3] - filename = f"screenshot_{timestamp}.{self._format.lower()}" - return self.save_screenshot(image, filename) - except Exception as e: - print(f"[Screenshot] Auto-save failed: {e}") - return None - - def save_screenshot(self, image: Image.Image, filename: Optional[str] = None) -> Path: - """ - Save screenshot to file with path validation. - - Args: - image: PIL Image to save - filename: Optional filename (auto-generated if None) - - Returns: - Path to saved file - - Raises: - SecurityError: If filename is invalid - """ - if filename is None: - timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3] - filename = f"screenshot_{timestamp}.{self._format.lower()}" - - # Sanitize filename - safe_filename = PathValidator.sanitize_filename(filename, '_') - - # Ensure correct extension - if not safe_filename.lower().endswith(('.png', '.jpg', '.jpeg')): - safe_filename += f".{self._format.lower()}" - - filepath = self._save_path / safe_filename - - # Security check: ensure resolved path is within save_path - try: - resolved_path = filepath.resolve() - if not str(resolved_path).startswith(str(self._base_save_path)): - raise SecurityError("Path traversal detected in filename") - except (OSError, ValueError) as e: - print(f"[Screenshot] Security error: {e}") - # Fallback to safe default - safe_filename = f"screenshot_{int(time.time())}.{self._format.lower()}" - filepath = self._save_path / safe_filename - - # Save with appropriate settings - if safe_filename.lower().endswith('.jpg') or safe_filename.lower().endswith('.jpeg'): - image = image.convert('RGB') # JPEG doesn't support alpha - image.save(filepath, 'JPEG', quality=self._quality, optimize=True) - else: - image.save(filepath, 'PNG', optimize=True) - - return filepath - - def get_last_screenshot(self) -> Optional[Image.Image]: - """ - Get the most recent screenshot. - - Returns: - PIL Image or None if no screenshots taken yet - """ - with self._lock: - return self._last_screenshot.copy() if self._last_screenshot else None - - def get_history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]: - """ - Get screenshot history. - - Args: - limit: Maximum number of entries (default: all) - - Returns: - List of dicts with 'timestamp', 'region', 'image' keys - """ - with self._lock: - history = list(self._history) - if limit: - history = history[-limit:] - return [ - { - 'timestamp': entry['timestamp'], - 'region': entry['region'], - 'image': entry['image'].copy() - } - for entry in history - ] - - def clear_history(self) -> None: - """Clear screenshot history from memory.""" - with self._lock: - self._history.clear() - self._last_screenshot = None - - # ========== Configuration ========== - - @property - def auto_save(self) -> bool: - """Get auto-save setting.""" - return self._auto_save - - @auto_save.setter - def auto_save(self, value: bool) -> None: - """Set auto-save setting.""" - self._auto_save = bool(value) - - @property - def save_path(self) -> Path: - """Get current save path.""" - return self._save_path - - @save_path.setter - def save_path(self, path: Union[str, Path]) -> None: - """Set save path.""" - self._save_path = Path(path) - self._base_save_path = self._save_path.resolve() - self._ensure_save_directory() - - @property - def format(self) -> str: - """Get image format (PNG or JPEG).""" - return self._format - - @format.setter - def format(self, fmt: str) -> None: - """Set image format.""" - fmt = fmt.upper() - if fmt in ('PNG', 'JPG', 'JPEG'): - self._format = 'PNG' if fmt == 'PNG' else 'JPEG' - else: - raise ValueError(f"Unsupported format: {fmt}") - - @property - def quality(self) -> int: - """Get JPEG quality (1-100).""" - return self._quality - - @quality.setter - def quality(self, value: int) -> None: - """Set JPEG quality (1-100).""" - self._quality = max(1, min(100, int(value))) - - def configure(self, - auto_save: Optional[bool] = None, - save_path: Optional[Union[str, Path]] = None, - format: Optional[str] = None, - quality: Optional[int] = None) -> Dict[str, Any]: - """ - Configure screenshot service settings. - - Args: - auto_save: Enable/disable auto-save - save_path: Directory to save screenshots - format: Image format (PNG or JPEG) - quality: JPEG quality (1-100) - - Returns: - Current configuration as dict - """ - if auto_save is not None: - self.auto_save = auto_save - if save_path is not None: - self.save_path = save_path - if format is not None: - self.format = format - if quality is not None: - self.quality = quality - - return self.get_config() - - def get_config(self) -> Dict[str, Any]: - """Get current configuration.""" - return { - 'auto_save': self._auto_save, - 'save_path': str(self._save_path), - 'format': self._format, - 'quality': self._quality, - 'history_size': self._history_size, - 'platform': self._platform, - 'backend': 'PIL' if self._use_pil else 'pyautogui' - } - - # ========== Utility Methods ========== - - def image_to_bytes(self, image: Image.Image, format: Optional[str] = None) -> bytes: - """ - Convert PIL Image to bytes. - - Args: - image: PIL Image - format: Output format (default: current format setting) - - Returns: - Image as bytes - """ - fmt = (format or self._format).upper() - buffer = io.BytesIO() - - if fmt == 'JPEG': - image = image.convert('RGB') - image.save(buffer, 'JPEG', quality=self._quality) - else: - image.save(buffer, 'PNG') - - return buffer.getvalue() - - def get_available_backends(self) -> List[str]: - """Get list of available capture backends.""" - backends = [] - if self._check_pil_grab(): - backends.append('PIL.ImageGrab') - if self._check_pyautogui(): - backends.append('pyautogui') - return backends - - def is_available(self) -> bool: - """Check if screenshot service is available (has working backend).""" - return self._check_pil_grab() or self._check_pyautogui() - - -# Singleton instance -_screenshot_service = None - -def get_screenshot_service() -> ScreenshotService: - """Get the global ScreenshotService instance.""" - global _screenshot_service - if _screenshot_service is None: - _screenshot_service = ScreenshotService() - return _screenshot_service - - -# Convenience functions for quick screenshots -def quick_capture() -> Image.Image: - """Quick full-screen capture.""" - return get_screenshot_service().capture(full_screen=True) - -def quick_capture_region(x: int, y: int, width: int, height: int) -> Image.Image: - """Quick region capture.""" - return get_screenshot_service().capture_region(x, y, width, height) - -def quick_save(filename: Optional[str] = None) -> Path: - """Quick capture and save.""" - service = get_screenshot_service() - image = service.capture() - return service.save_screenshot(image, filename) diff --git a/core/screenshot_vulnerable.py b/core/screenshot_vulnerable.py deleted file mode 100644 index 1eff642..0000000 --- a/core/screenshot_vulnerable.py +++ /dev/null @@ -1,511 +0,0 @@ -""" -EU-Utility - Screenshot Service Core Module - -Fast, reliable screen capture functionality for all plugins. -Part of core - not a plugin. Plugins access via PluginAPI. -""" - -import io -import os -import platform -import threading -from collections import deque -from datetime import datetime -from pathlib import Path -from typing import Dict, List, Optional, Tuple, Any, Union - -try: - from PIL import Image - PIL_AVAILABLE = True -except ImportError: - PIL_AVAILABLE = False - Image = None - - -class ScreenshotService: - """ - Core screenshot service with cross-platform support. - - Features: - - Singleton pattern for single instance across app - - Fast screen capture using PIL.ImageGrab (Windows) or pyautogui (cross-platform) - - Configurable auto-save with timestamps - - Screenshot history (last 20 in memory) - - PNG by default, JPG quality settings - - Thread-safe operations - """ - - _instance = None - _lock = threading.Lock() - - def __new__(cls): - if cls._instance is None: - with cls._lock: - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._initialized = False - return cls._instance - - def __init__(self): - if self._initialized: - return - - self._initialized = True - self._lock = threading.Lock() - - # Configuration - self._auto_save: bool = True - self._format: str = "PNG" - self._quality: int = 95 # For JPEG - self._history_size: int = 20 - - # Screenshot history (thread-safe deque) - self._history: deque = deque(maxlen=self._history_size) - self._last_screenshot = None - - # Platform detection - MUST be before _get_default_save_path() - self._platform = platform.system().lower() - self._use_pil = self._platform == "windows" - - # Set save path AFTER platform detection - self._save_path: Path = self._get_default_save_path() - - # Lazy init for capture backends - self._pil_available: Optional[bool] = None - self._pyautogui_available: Optional[bool] = None - - # Ensure save directory exists - self._ensure_save_directory() - - print(f"[Screenshot] Service initialized (auto_save={self._auto_save}, format={self._format})") - - def _get_default_save_path(self) -> Path: - """Get default save path for screenshots.""" - # Use Documents/Entropia Universe/Screenshots/ as default - if self._platform == "windows": - documents = Path.home() / "Documents" - else: - documents = Path.home() / "Documents" - - return documents / "Entropia Universe" / "Screenshots" - - def _ensure_save_directory(self) -> None: - """Ensure the save directory exists.""" - try: - self._save_path.mkdir(parents=True, exist_ok=True) - except Exception as e: - print(f"[Screenshot] Warning: Could not create save directory: {e}") - - def _check_pil_grab(self) -> bool: - """Check if PIL.ImageGrab is available.""" - if self._pil_available is not None: - return self._pil_available - - try: - from PIL import ImageGrab - self._pil_available = True - return True - except ImportError: - self._pil_available = False - return False - - def _check_pyautogui(self) -> bool: - """Check if pyautogui is available.""" - if self._pyautogui_available is not None: - return self._pyautogui_available - - try: - import pyautogui - self._pyautogui_available = True - return True - except ImportError: - self._pyautogui_available = False - return False - - def capture(self, full_screen: bool = True): - """ - Capture screenshot. - - Args: - full_screen: If True, capture entire screen. If False, use default region. - - Returns: - PIL Image object - - Raises: - RuntimeError: If no capture backend is available - """ - with self._lock: - screenshot = self._do_capture(full_screen=full_screen) - - # Store in history - self._last_screenshot = screenshot.copy() - self._history.append({ - 'image': screenshot.copy(), - 'timestamp': datetime.now(), - 'region': None if full_screen else 'custom' - }) - - # Auto-save if enabled - if self._auto_save: - self._auto_save_screenshot(screenshot) - - return screenshot - - def capture_region(self, x: int, y: int, width: int, height: int): - """ - Capture specific screen region. - - Args: - x: Left coordinate - y: Top coordinate - width: Region width - height: Region height - - Returns: - PIL Image object - """ - with self._lock: - screenshot = self._do_capture(region=(x, y, x + width, y + height)) - - # Store in history - self._last_screenshot = screenshot.copy() - self._history.append({ - 'image': screenshot.copy(), - 'timestamp': datetime.now(), - 'region': (x, y, width, height) - }) - - # Auto-save if enabled - if self._auto_save: - self._auto_save_screenshot(screenshot) - - return screenshot - - def capture_window(self, window_handle: int) -> Optional[Any]: - """ - Capture specific window by handle (Windows only). - - Args: - window_handle: Window handle (HWND on Windows) - - Returns: - PIL Image object or None if capture failed - """ - if self._platform != "windows": - print("[Screenshot] capture_window is Windows-only") - return None - - try: - import win32gui - import win32ui - import win32con - from ctypes import windll - - # Get window dimensions - left, top, right, bottom = win32gui.GetWindowRect(window_handle) - width = right - left - height = bottom - top - - # Create device context - hwndDC = win32gui.GetWindowDC(window_handle) - mfcDC = win32ui.CreateDCFromHandle(hwndDC) - saveDC = mfcDC.CreateCompatibleDC() - - # Create bitmap - saveBitMap = win32ui.CreateBitmap() - saveBitMap.CreateCompatibleBitmap(mfcDC, width, height) - saveDC.SelectObject(saveBitMap) - - # Copy screen into bitmap - result = windll.user32.PrintWindow(window_handle, saveDC.GetSafeHdc(), 3) - - # Convert to PIL Image - bmpinfo = saveBitMap.GetInfo() - bmpstr = saveBitMap.GetBitmapBits(True) - screenshot = Image.frombuffer( - 'RGB', - (bmpinfo['bmWidth'], bmpinfo['bmHeight']), - bmpstr, 'raw', 'BGRX', 0, 1 - ) - - # Cleanup - win32gui.DeleteObject(saveBitMap.GetHandle()) - saveDC.DeleteDC() - mfcDC.DeleteDC() - win32gui.ReleaseDC(window_handle, hwndDC) - - if result != 1: - return None - - with self._lock: - self._last_screenshot = screenshot.copy() - self._history.append({ - 'image': screenshot.copy(), - 'timestamp': datetime.now(), - 'region': 'window', - 'window_handle': window_handle - }) - - if self._auto_save: - self._auto_save_screenshot(screenshot) - - return screenshot - - except Exception as e: - print(f"[Screenshot] Window capture failed: {e}") - return None - - def _do_capture(self, full_screen: bool = True, region: Optional[Tuple[int, int, int, int]] = None) -> Image.Image: - """Internal capture method.""" - # Try PIL.ImageGrab first (Windows, faster) - if self._use_pil and self._check_pil_grab(): - from PIL import ImageGrab - if region: - return ImageGrab.grab(bbox=region) - else: - return ImageGrab.grab() - - # Fall back to pyautogui (cross-platform) - if self._check_pyautogui(): - import pyautogui - if region: - x1, y1, x2, y2 = region - return pyautogui.screenshot(region=(x1, y1, x2 - x1, y2 - y1)) - else: - return pyautogui.screenshot() - - raise RuntimeError( - "No screenshot backend available. " - "Install pillow (Windows) or pyautogui (cross-platform)." - ) - - def _auto_save_screenshot(self, image: Image.Image) -> Optional[Path]: - """Automatically save screenshot with timestamp.""" - try: - timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3] - filename = f"screenshot_{timestamp}.{self._format.lower()}" - return self.save_screenshot(image, filename) - except Exception as e: - print(f"[Screenshot] Auto-save failed: {e}") - return None - - def save_screenshot(self, image: Image.Image, filename: Optional[str] = None) -> Path: - """ - Save screenshot to file. - - Args: - image: PIL Image to save - filename: Optional filename (auto-generated if None) - - Returns: - Path to saved file - """ - if filename is None: - timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3] - filename = f"screenshot_{timestamp}.{self._format.lower()}" - - # Ensure correct extension - if not filename.lower().endswith(('.png', '.jpg', '.jpeg')): - filename += f".{self._format.lower()}" - - filepath = self._save_path / filename - - # Save with appropriate settings - if filename.lower().endswith('.jpg') or filename.lower().endswith('.jpeg'): - image = image.convert('RGB') # JPEG doesn't support alpha - image.save(filepath, 'JPEG', quality=self._quality, optimize=True) - else: - image.save(filepath, 'PNG', optimize=True) - - return filepath - - def get_last_screenshot(self) -> Optional[Image.Image]: - """ - Get the most recent screenshot. - - Returns: - PIL Image or None if no screenshots taken yet - """ - with self._lock: - return self._last_screenshot.copy() if self._last_screenshot else None - - def get_history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]: - """ - Get screenshot history. - - Args: - limit: Maximum number of entries (default: all) - - Returns: - List of dicts with 'timestamp', 'region', 'image' keys - """ - with self._lock: - history = list(self._history) - if limit: - history = history[-limit:] - return [ - { - 'timestamp': entry['timestamp'], - 'region': entry['region'], - 'image': entry['image'].copy() - } - for entry in history - ] - - def clear_history(self) -> None: - """Clear screenshot history from memory.""" - with self._lock: - self._history.clear() - self._last_screenshot = None - - # ========== Configuration ========== - - @property - def auto_save(self) -> bool: - """Get auto-save setting.""" - return self._auto_save - - @auto_save.setter - def auto_save(self, value: bool) -> None: - """Set auto-save setting.""" - self._auto_save = bool(value) - - @property - def save_path(self) -> Path: - """Get current save path.""" - return self._save_path - - @save_path.setter - def save_path(self, path: Union[str, Path]) -> None: - """Set save path.""" - self._save_path = Path(path) - self._ensure_save_directory() - - @property - def format(self) -> str: - """Get image format (PNG or JPEG).""" - return self._format - - @format.setter - def format(self, fmt: str) -> None: - """Set image format.""" - fmt = fmt.upper() - if fmt in ('PNG', 'JPG', 'JPEG'): - self._format = 'PNG' if fmt == 'PNG' else 'JPEG' - else: - raise ValueError(f"Unsupported format: {fmt}") - - @property - def quality(self) -> int: - """Get JPEG quality (1-100).""" - return self._quality - - @quality.setter - def quality(self, value: int) -> None: - """Set JPEG quality (1-100).""" - self._quality = max(1, min(100, int(value))) - - def configure(self, - auto_save: Optional[bool] = None, - save_path: Optional[Union[str, Path]] = None, - format: Optional[str] = None, - quality: Optional[int] = None) -> Dict[str, Any]: - """ - Configure screenshot service settings. - - Args: - auto_save: Enable/disable auto-save - save_path: Directory to save screenshots - format: Image format (PNG or JPEG) - quality: JPEG quality (1-100) - - Returns: - Current configuration as dict - """ - if auto_save is not None: - self.auto_save = auto_save - if save_path is not None: - self.save_path = save_path - if format is not None: - self.format = format - if quality is not None: - self.quality = quality - - return self.get_config() - - def get_config(self) -> Dict[str, Any]: - """Get current configuration.""" - return { - 'auto_save': self._auto_save, - 'save_path': str(self._save_path), - 'format': self._format, - 'quality': self._quality, - 'history_size': self._history_size, - 'platform': self._platform, - 'backend': 'PIL' if self._use_pil else 'pyautogui' - } - - # ========== Utility Methods ========== - - def image_to_bytes(self, image: Image.Image, format: Optional[str] = None) -> bytes: - """ - Convert PIL Image to bytes. - - Args: - image: PIL Image - format: Output format (default: current format setting) - - Returns: - Image as bytes - """ - fmt = (format or self._format).upper() - buffer = io.BytesIO() - - if fmt == 'JPEG': - image = image.convert('RGB') - image.save(buffer, 'JPEG', quality=self._quality) - else: - image.save(buffer, 'PNG') - - return buffer.getvalue() - - def get_available_backends(self) -> List[str]: - """Get list of available capture backends.""" - backends = [] - if self._check_pil_grab(): - backends.append('PIL.ImageGrab') - if self._check_pyautogui(): - backends.append('pyautogui') - return backends - - def is_available(self) -> bool: - """Check if screenshot service is available (has working backend).""" - return self._check_pil_grab() or self._check_pyautogui() - - -# Singleton instance -_screenshot_service = None - -def get_screenshot_service() -> ScreenshotService: - """Get the global ScreenshotService instance.""" - global _screenshot_service - if _screenshot_service is None: - _screenshot_service = ScreenshotService() - return _screenshot_service - - -# Convenience functions for quick screenshots -def quick_capture() -> Image.Image: - """Quick full-screen capture.""" - return get_screenshot_service().capture(full_screen=True) - -def quick_capture_region(x: int, y: int, width: int, height: int) -> Image.Image: - """Quick region capture.""" - return get_screenshot_service().capture_region(x, y, width, height) - -def quick_save(filename: Optional[str] = None) -> Path: - """Quick capture and save.""" - service = get_screenshot_service() - image = service.capture() - return service.save_screenshot(image, filename)