security: Remove vulnerable files, consolidate screenshot and data_store to secure versions

This commit is contained in:
devmatrix 2026-02-16 20:59:19 +00:00
parent 6f3b6f6781
commit af2a1c0b12
4 changed files with 0 additions and 1875 deletions

View File

@ -1,462 +0,0 @@
"""
EU-Utility - Data Store Service (Security Hardened)
Thread-safe persistent data storage for plugins with path validation.
"""
import json
import shutil
import threading
import platform
from pathlib import Path
from typing import Any, Dict, Optional
from datetime import datetime
from collections import OrderedDict
from core.security_utils import (
PathValidator, InputValidator, DataValidator, SecurityError
)
# Cross-platform file locking
try:
import fcntl # Unix/Linux/Mac
HAS_FCNTL = True
except ImportError:
HAS_FCNTL = False
# Windows fallback using portalocker or threading lock
try:
import portalocker
HAS_PORTALOCKER = True
except ImportError:
HAS_PORTALOCKER = False
class DataStore:
"""
Singleton data persistence service for plugins (Security Hardened).
Features:
- Thread-safe file operations with file locking
- Auto-backup on write (keeps last 5 versions)
- Per-plugin JSON storage
- Auto-create directories
- Path traversal protection
- Input validation
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, data_dir: str = "data/plugins"):
if self._initialized:
return
self.data_dir = Path(data_dir)
self.data_dir.mkdir(parents=True, exist_ok=True)
# Resolve and validate base path
self._base_path = self.data_dir.resolve()
# Memory cache for frequently accessed data
self._cache: Dict[str, Dict[str, Any]] = {}
self._cache_lock = threading.Lock()
# Backup settings
self.max_backups = 5
self._initialized = True
def _get_plugin_file(self, plugin_id: str) -> Path:
"""
Get the storage file path for a plugin with path validation.
Args:
plugin_id: Unique identifier for the plugin
Returns:
Safe file path
Raises:
SecurityError: If plugin_id is invalid or path traversal detected
"""
# Validate plugin_id
if not isinstance(plugin_id, str):
raise SecurityError("plugin_id must be a string")
if not plugin_id:
raise SecurityError("plugin_id cannot be empty")
# Sanitize plugin_id to create a safe filename
safe_name = PathValidator.sanitize_filename(plugin_id, '_')
# Additional check: ensure no path traversal remains
if '..' in safe_name or '/' in safe_name or '\\' in safe_name:
raise SecurityError(f"Invalid characters in plugin_id: {plugin_id}")
# Construct path
file_path = self.data_dir / f"{safe_name}.json"
# Security check: ensure resolved path is within data_dir
try:
resolved_path = file_path.resolve()
if not str(resolved_path).startswith(str(self._base_path)):
raise SecurityError(f"Path traversal detected: {plugin_id}")
except (OSError, ValueError) as e:
raise SecurityError(f"Invalid path for plugin_id: {plugin_id}") from e
return file_path
def _get_backup_dir(self, plugin_id: str) -> Path:
"""Get the backup directory for a plugin with validation."""
# Reuse validation from _get_plugin_file
safe_name = PathValidator.sanitize_filename(plugin_id, '_')
backup_dir = self.data_dir / ".backups" / safe_name
# Validate backup path
try:
resolved = backup_dir.resolve()
if not str(resolved).startswith(str(self._base_path)):
raise SecurityError(f"Backup path traversal detected: {plugin_id}")
except (OSError, ValueError) as e:
raise SecurityError(f"Invalid backup path: {plugin_id}") from e
backup_dir.mkdir(parents=True, exist_ok=True)
return backup_dir
def _load_plugin_data(self, plugin_id: str) -> Dict[str, Any]:
"""Load all data for a plugin from disk with validation."""
# Check cache first
with self._cache_lock:
if plugin_id in self._cache:
return self._cache[plugin_id].copy()
file_path = self._get_plugin_file(plugin_id)
if not file_path.exists():
return {}
try:
with open(file_path, 'r', encoding='utf-8') as f:
# Cross-platform file locking
self._lock_file(f, exclusive=False)
try:
data = json.load(f)
finally:
self._unlock_file(f)
# Validate loaded data
if not isinstance(data, dict):
print(f"[DataStore] Invalid data format for {plugin_id}, resetting")
return {}
# Validate data structure
try:
DataValidator.validate_data_structure(data)
except SecurityError as e:
print(f"[DataStore] Security error in {plugin_id} data: {e}")
return {}
# Update cache
with self._cache_lock:
self._cache[plugin_id] = data.copy()
return data
except (json.JSONDecodeError, IOError) as e:
print(f"[DataStore] Error loading data for {plugin_id}: {e}")
return {}
def _save_plugin_data(self, plugin_id: str, data: Dict[str, Any]) -> bool:
"""Save all data for a plugin to disk with backup."""
# Validate data before saving
try:
DataValidator.validate_data_structure(data)
except SecurityError as e:
print(f"[DataStore] Security error saving {plugin_id}: {e}")
return False
file_path = self._get_plugin_file(plugin_id)
try:
# Create backup if file exists
if file_path.exists():
self._create_backup(plugin_id, file_path)
# Write to temp file first, then move (atomic operation)
temp_path = file_path.with_suffix('.tmp')
with open(temp_path, 'w', encoding='utf-8') as f:
# Cross-platform file locking
self._lock_file(f, exclusive=True)
try:
json.dump(data, f, indent=2, ensure_ascii=False)
f.flush()
import os
os.fsync(f.fileno())
finally:
self._unlock_file(f)
# Atomic move
temp_path.replace(file_path)
# Update cache
with self._cache_lock:
self._cache[plugin_id] = data.copy()
return True
except IOError as e:
print(f"[DataStore] Error saving data for {plugin_id}: {e}")
# Clean up temp file if exists
temp_path = file_path.with_suffix('.tmp')
if temp_path.exists():
try:
temp_path.unlink()
except:
pass
return False
def _lock_file(self, f, exclusive: bool = False):
"""Cross-platform file locking."""
if HAS_FCNTL:
# Unix/Linux/Mac
lock_type = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
fcntl.flock(f.fileno(), lock_type)
elif HAS_PORTALOCKER:
# Windows with portalocker
import portalocker
lock_type = portalocker.LOCK_EX if exclusive else portalocker.LOCK_SH
portalocker.lock(f, lock_type)
else:
# Fallback: rely on threading lock (already held)
pass
def _unlock_file(self, f):
"""Cross-platform file unlock."""
if HAS_FCNTL:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
elif HAS_PORTALOCKER:
import portalocker
portalocker.unlock(f)
else:
# Fallback: nothing to do
pass
def _create_backup(self, plugin_id: str, file_path: Path):
"""Create a backup of the current data file."""
backup_dir = self._get_backup_dir(plugin_id)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = backup_dir / f"{timestamp}.json"
try:
shutil.copy2(file_path, backup_path)
self._cleanup_old_backups(backup_dir)
except IOError as e:
print(f"[DataStore] Error creating backup for {plugin_id}: {e}")
def _cleanup_old_backups(self, backup_dir: Path):
"""Remove old backups, keeping only the last N versions."""
try:
backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime)
while len(backups) > self.max_backups:
old_backup = backups.pop(0)
old_backup.unlink()
except IOError as e:
print(f"[DataStore] Error cleaning up backups: {e}")
def save(self, plugin_id: str, key: str, data: Any) -> bool:
"""
Save data for a plugin with validation.
Args:
plugin_id: Unique identifier for the plugin
key: Key under which to store the data
data: Data to store (must be JSON serializable)
Returns:
True if successful, False otherwise
"""
# Validate key
if not isinstance(key, str):
print(f"[DataStore] Invalid key type for {plugin_id}")
return False
if not key:
print(f"[DataStore] Empty key not allowed for {plugin_id}")
return False
if not InputValidator.validate_json_key(key):
print(f"[DataStore] Invalid key format for {plugin_id}: {key}")
return False
plugin_data = self._load_plugin_data(plugin_id)
plugin_data[key] = data
return self._save_plugin_data(plugin_id, plugin_data)
def load(self, plugin_id: str, key: str, default: Any = None) -> Any:
"""
Load data for a plugin.
Args:
plugin_id: Unique identifier for the plugin
key: Key of the data to load
default: Default value if key not found
Returns:
The stored data or default value
"""
# Validate key
if not isinstance(key, str):
return default
plugin_data = self._load_plugin_data(plugin_id)
return plugin_data.get(key, default)
def delete(self, plugin_id: str, key: str) -> bool:
"""
Delete data for a plugin.
Args:
plugin_id: Unique identifier for the plugin
key: Key of the data to delete
Returns:
True if key existed and was deleted, False otherwise
"""
# Validate key
if not isinstance(key, str):
return False
plugin_data = self._load_plugin_data(plugin_id)
if key in plugin_data:
del plugin_data[key]
return self._save_plugin_data(plugin_id, plugin_data)
return False
def get_all_keys(self, plugin_id: str) -> list:
"""
Get all keys stored for a plugin.
Args:
plugin_id: Unique identifier for the plugin
Returns:
List of keys
"""
plugin_data = self._load_plugin_data(plugin_id)
return list(plugin_data.keys())
def clear_plugin(self, plugin_id: str) -> bool:
"""
Clear all data for a plugin.
Args:
plugin_id: Unique identifier for the plugin
Returns:
True if successful, False otherwise
"""
file_path = self._get_plugin_file(plugin_id)
# Create backup before clearing
if file_path.exists():
self._create_backup(plugin_id, file_path)
# Clear cache
with self._cache_lock:
if plugin_id in self._cache:
del self._cache[plugin_id]
# Remove file
try:
if file_path.exists():
file_path.unlink()
return True
except IOError as e:
print(f"[DataStore] Error clearing data for {plugin_id}: {e}")
return False
def get_backups(self, plugin_id: str) -> list:
"""
Get list of available backups for a plugin.
Args:
plugin_id: Unique identifier for the plugin
Returns:
List of backup file paths
"""
backup_dir = self._get_backup_dir(plugin_id)
if not backup_dir.exists():
return []
backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True)
return [str(b) for b in backups]
def restore_backup(self, plugin_id: str, backup_path: str) -> bool:
"""
Restore data from a backup.
Args:
plugin_id: Unique identifier for the plugin
backup_path: Path to the backup file
Returns:
True if successful, False otherwise
"""
backup_file = Path(backup_path)
if not backup_file.exists():
print(f"[DataStore] Backup not found: {backup_path}")
return False
# Validate backup path is within backups directory
try:
backup_dir = self._get_backup_dir(plugin_id)
resolved_backup = backup_file.resolve()
resolved_backup_dir = backup_dir.resolve()
if not str(resolved_backup).startswith(str(resolved_backup_dir)):
print(f"[DataStore] Invalid backup path: {backup_path}")
return False
except (OSError, ValueError) as e:
print(f"[DataStore] Path validation error: {e}")
return False
file_path = self._get_plugin_file(plugin_id)
try:
# Create backup of current state before restoring
if file_path.exists():
self._create_backup(plugin_id, file_path)
# Copy backup to main file
shutil.copy2(backup_file, file_path)
# Invalidate cache
with self._cache_lock:
if plugin_id in self._cache:
del self._cache[plugin_id]
return True
except IOError as e:
print(f"[DataStore] Error restoring backup for {plugin_id}: {e}")
return False
# Singleton instance
_data_store = None
_data_store_lock = threading.Lock()
def get_data_store() -> DataStore:
"""Get the global DataStore instance."""
global _data_store
if _data_store is None:
with _data_store_lock:
if _data_store is None:
_data_store = DataStore()
return _data_store

View File

@ -1,355 +0,0 @@
"""
EU-Utility - Data Store Service
Thread-safe persistent data storage for plugins.
Provides file locking, auto-backup, and singleton access.
"""
import json
import shutil
import threading
import platform
from pathlib import Path
from typing import Any, Dict, Optional
from datetime import datetime
from collections import OrderedDict
# Cross-platform file locking
try:
import fcntl # Unix/Linux/Mac
HAS_FCNTL = True
except ImportError:
HAS_FCNTL = False
# Windows fallback using portalocker or threading lock
try:
import portalocker
HAS_PORTALOCKER = True
except ImportError:
HAS_PORTALOCKER = False
class DataStore:
"""
Singleton data persistence service for plugins.
Features:
- Thread-safe file operations with file locking
- Auto-backup on write (keeps last 5 versions)
- Per-plugin JSON storage
- Auto-create directories
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, data_dir: str = "data/plugins"):
if self._initialized:
return
self.data_dir = Path(data_dir)
self.data_dir.mkdir(parents=True, exist_ok=True)
# Memory cache for frequently accessed data
self._cache: Dict[str, Dict[str, Any]] = {}
self._cache_lock = threading.Lock()
# Backup settings
self.max_backups = 5
self._initialized = True
def _get_plugin_file(self, plugin_id: str) -> Path:
"""Get the storage file path for a plugin."""
# Sanitize plugin_id to create a safe filename
safe_name = plugin_id.replace(".", "_").replace("/", "_").replace("\\", "_")
return self.data_dir / f"{safe_name}.json"
def _get_backup_dir(self, plugin_id: str) -> Path:
"""Get the backup directory for a plugin."""
safe_name = plugin_id.replace(".", "_").replace("/", "_").replace("\\", "_")
backup_dir = self.data_dir / ".backups" / safe_name
backup_dir.mkdir(parents=True, exist_ok=True)
return backup_dir
def _load_plugin_data(self, plugin_id: str) -> Dict[str, Any]:
"""Load all data for a plugin from disk."""
# Check cache first
with self._cache_lock:
if plugin_id in self._cache:
return self._cache[plugin_id].copy()
file_path = self._get_plugin_file(plugin_id)
if not file_path.exists():
return {}
try:
with open(file_path, 'r', encoding='utf-8') as f:
# Cross-platform file locking
self._lock_file(f, exclusive=False)
try:
data = json.load(f)
finally:
self._unlock_file(f)
# Update cache
with self._cache_lock:
self._cache[plugin_id] = data.copy()
return data
except (json.JSONDecodeError, IOError) as e:
print(f"[DataStore] Error loading data for {plugin_id}: {e}")
return {}
def _save_plugin_data(self, plugin_id: str, data: Dict[str, Any]) -> bool:
"""Save all data for a plugin to disk with backup."""
file_path = self._get_plugin_file(plugin_id)
try:
# Create backup if file exists
if file_path.exists():
self._create_backup(plugin_id, file_path)
# Write to temp file first, then move (atomic operation)
temp_path = file_path.with_suffix('.tmp')
with open(temp_path, 'w', encoding='utf-8') as f:
# Cross-platform file locking
self._lock_file(f, exclusive=True)
try:
json.dump(data, f, indent=2, ensure_ascii=False)
f.flush()
import os
os.fsync(f.fileno())
finally:
self._unlock_file(f)
# Atomic move
temp_path.replace(file_path)
# Update cache
with self._cache_lock:
self._cache[plugin_id] = data.copy()
return True
except IOError as e:
print(f"[DataStore] Error saving data for {plugin_id}: {e}")
# Clean up temp file if exists
temp_path = file_path.with_suffix('.tmp')
if temp_path.exists():
temp_path.unlink()
return False
def _lock_file(self, f, exclusive: bool = False):
"""Cross-platform file locking."""
if HAS_FCNTL:
# Unix/Linux/Mac
lock_type = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
fcntl.flock(f.fileno(), lock_type)
elif HAS_PORTALOCKER:
# Windows with portalocker
import portalocker
lock_type = portalocker.LOCK_EX if exclusive else portalocker.LOCK_SH
portalocker.lock(f, lock_type)
else:
# Fallback: rely on threading lock (already held)
pass
def _unlock_file(self, f):
"""Cross-platform file unlock."""
if HAS_FCNTL:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
elif HAS_PORTALOCKER:
import portalocker
portalocker.unlock(f)
else:
# Fallback: nothing to do
pass
def _create_backup(self, plugin_id: str, file_path: Path):
"""Create a backup of the current data file."""
backup_dir = self._get_backup_dir(plugin_id)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = backup_dir / f"{timestamp}.json"
try:
shutil.copy2(file_path, backup_path)
self._cleanup_old_backups(backup_dir)
except IOError as e:
print(f"[DataStore] Error creating backup for {plugin_id}: {e}")
def _cleanup_old_backups(self, backup_dir: Path):
"""Remove old backups, keeping only the last N versions."""
try:
backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime)
while len(backups) > self.max_backups:
old_backup = backups.pop(0)
old_backup.unlink()
except IOError as e:
print(f"[DataStore] Error cleaning up backups: {e}")
def save(self, plugin_id: str, key: str, data: Any) -> bool:
"""
Save data for a plugin.
Args:
plugin_id: Unique identifier for the plugin
key: Key under which to store the data
data: Data to store (must be JSON serializable)
Returns:
True if successful, False otherwise
"""
plugin_data = self._load_plugin_data(plugin_id)
plugin_data[key] = data
return self._save_plugin_data(plugin_id, plugin_data)
def load(self, plugin_id: str, key: str, default: Any = None) -> Any:
"""
Load data for a plugin.
Args:
plugin_id: Unique identifier for the plugin
key: Key of the data to load
default: Default value if key not found
Returns:
The stored data or default value
"""
plugin_data = self._load_plugin_data(plugin_id)
return plugin_data.get(key, default)
def delete(self, plugin_id: str, key: str) -> bool:
"""
Delete data for a plugin.
Args:
plugin_id: Unique identifier for the plugin
key: Key of the data to delete
Returns:
True if key existed and was deleted, False otherwise
"""
plugin_data = self._load_plugin_data(plugin_id)
if key in plugin_data:
del plugin_data[key]
return self._save_plugin_data(plugin_id, plugin_data)
return False
def get_all_keys(self, plugin_id: str) -> list:
"""
Get all keys stored for a plugin.
Args:
plugin_id: Unique identifier for the plugin
Returns:
List of keys
"""
plugin_data = self._load_plugin_data(plugin_id)
return list(plugin_data.keys())
def clear_plugin(self, plugin_id: str) -> bool:
"""
Clear all data for a plugin.
Args:
plugin_id: Unique identifier for the plugin
Returns:
True if successful, False otherwise
"""
file_path = self._get_plugin_file(plugin_id)
# Create backup before clearing
if file_path.exists():
self._create_backup(plugin_id, file_path)
# Clear cache
with self._cache_lock:
if plugin_id in self._cache:
del self._cache[plugin_id]
# Remove file
try:
if file_path.exists():
file_path.unlink()
return True
except IOError as e:
print(f"[DataStore] Error clearing data for {plugin_id}: {e}")
return False
def get_backups(self, plugin_id: str) -> list:
"""
Get list of available backups for a plugin.
Args:
plugin_id: Unique identifier for the plugin
Returns:
List of backup file paths
"""
backup_dir = self._get_backup_dir(plugin_id)
if not backup_dir.exists():
return []
backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True)
return [str(b) for b in backups]
def restore_backup(self, plugin_id: str, backup_path: str) -> bool:
"""
Restore data from a backup.
Args:
plugin_id: Unique identifier for the plugin
backup_path: Path to the backup file
Returns:
True if successful, False otherwise
"""
backup_file = Path(backup_path)
if not backup_file.exists():
print(f"[DataStore] Backup not found: {backup_path}")
return False
file_path = self._get_plugin_file(plugin_id)
try:
# Create backup of current state before restoring
if file_path.exists():
self._create_backup(plugin_id, file_path)
# Copy backup to main file
shutil.copy2(backup_file, file_path)
# Invalidate cache
with self._cache_lock:
if plugin_id in self._cache:
del self._cache[plugin_id]
return True
except IOError as e:
print(f"[DataStore] Error restoring backup for {plugin_id}: {e}")
return False
# Singleton instance
_data_store = None
_data_store_lock = threading.Lock()
def get_data_store() -> DataStore:
"""Get the global DataStore instance."""
global _data_store
if _data_store is None:
with _data_store_lock:
if _data_store is None:
_data_store = DataStore()
return _data_store

View File

@ -1,547 +0,0 @@
"""
EU-Utility - Screenshot Service Core Module (Security Hardened)
Fast, reliable screen capture functionality with path validation.
"""
import io
import os
import time
import platform
import threading
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, Union
from PIL import Image
from core.security_utils import PathValidator, InputValidator, SecurityError
class ScreenshotService:
"""
Core screenshot service with cross-platform support (Security Hardened).
Features:
- Singleton pattern for single instance across app
- Fast screen capture using PIL.ImageGrab (Windows) or pyautogui (cross-platform)
- Configurable auto-save with timestamps
- Screenshot history (last 20 in memory)
- PNG by default, JPG quality settings
- Thread-safe operations
- Path traversal protection
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self._lock = threading.Lock()
# Configuration
self._auto_save: bool = True
self._format: str = "PNG"
self._quality: int = 95 # For JPEG
self._history_size: int = 20
# Screenshot history (thread-safe deque)
self._history: deque = deque(maxlen=self._history_size)
self._last_screenshot: Optional[Image.Image] = None
# Platform detection - MUST be before _get_default_save_path()
self._platform = platform.system().lower()
self._use_pil = self._platform == "windows"
# Set save path AFTER platform detection
self._save_path: Path = self._get_default_save_path()
# Lazy init for capture backends
self._pil_available: Optional[bool] = None
self._pyautogui_available: Optional[bool] = None
# Resolve base path for validation
self._base_save_path = self._save_path.resolve()
# Ensure save directory exists
self._ensure_save_directory()
print(f"[Screenshot] Service initialized (auto_save={self._auto_save}, format={self._format})")
def _get_default_save_path(self) -> Path:
"""Get default save path for screenshots."""
# Use Documents/Entropia Universe/Screenshots/ as default
if self._platform == "windows":
documents = Path.home() / "Documents"
else:
documents = Path.home() / "Documents"
return documents / "Entropia Universe" / "Screenshots"
def _ensure_save_directory(self) -> None:
"""Ensure the save directory exists."""
try:
self._save_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f"[Screenshot] Warning: Could not create save directory: {e}")
def _check_pil_grab(self) -> bool:
"""Check if PIL.ImageGrab is available."""
if self._pil_available is not None:
return self._pil_available
try:
from PIL import ImageGrab
self._pil_available = True
return True
except ImportError:
self._pil_available = False
return False
def _check_pyautogui(self) -> bool:
"""Check if pyautogui is available."""
if self._pyautogui_available is not None:
return self._pyautogui_available
try:
import pyautogui
self._pyautogui_available = True
return True
except ImportError:
self._pyautogui_available = False
return False
def capture(self, full_screen: bool = True) -> Image.Image:
"""
Capture screenshot.
Args:
full_screen: If True, capture entire screen. If False, use default region.
Returns:
PIL Image object
Raises:
RuntimeError: If no capture backend is available
"""
with self._lock:
screenshot = self._do_capture(full_screen=full_screen)
# Store in history
self._last_screenshot = screenshot.copy()
self._history.append({
'image': screenshot.copy(),
'timestamp': datetime.now(),
'region': None if full_screen else 'custom'
})
# Auto-save if enabled
if self._auto_save:
self._auto_save_screenshot(screenshot)
return screenshot
def capture_region(self, x: int, y: int, width: int, height: int) -> Image.Image:
"""
Capture specific screen region.
Args:
x: Left coordinate
y: Top coordinate
width: Region width
height: Region height
Returns:
PIL Image object
Raises:
SecurityError: If region parameters are invalid
"""
# Validate region parameters
from core.security_utils import InputValidator
InputValidator.validate_region_coordinates(x, y, width, height)
with self._lock:
screenshot = self._do_capture(region=(x, y, x + width, y + height))
# Store in history
self._last_screenshot = screenshot.copy()
self._history.append({
'image': screenshot.copy(),
'timestamp': datetime.now(),
'region': (x, y, width, height)
})
# Auto-save if enabled
if self._auto_save:
self._auto_save_screenshot(screenshot)
return screenshot
def capture_window(self, window_handle: int) -> Optional[Image.Image]:
"""
Capture specific window by handle (Windows only).
Args:
window_handle: Window handle (HWND on Windows)
Returns:
PIL Image object or None if capture failed
"""
if self._platform != "windows":
print("[Screenshot] capture_window is Windows-only")
return None
# Validate window handle
if not isinstance(window_handle, int) or window_handle <= 0:
print("[Screenshot] Invalid window handle")
return None
try:
import win32gui
import win32ui
import win32con
from ctypes import windll
# Get window dimensions
left, top, right, bottom = win32gui.GetWindowRect(window_handle)
width = right - left
height = bottom - top
# Sanity check dimensions
if width <= 0 or height <= 0 or width > 7680 or height > 4320:
print("[Screenshot] Invalid window dimensions")
return None
# Create device context
hwndDC = win32gui.GetWindowDC(window_handle)
mfcDC = win32ui.CreateDCFromHandle(hwndDC)
saveDC = mfcDC.CreateCompatibleDC()
# Create bitmap
saveBitMap = win32ui.CreateBitmap()
saveBitMap.CreateCompatibleBitmap(mfcDC, width, height)
saveDC.SelectObject(saveBitMap)
# Copy screen into bitmap
result = windll.user32.PrintWindow(window_handle, saveDC.GetSafeHdc(), 3)
# Convert to PIL Image
bmpinfo = saveBitMap.GetInfo()
bmpstr = saveBitMap.GetBitmapBits(True)
screenshot = Image.frombuffer(
'RGB',
(bmpinfo['bmWidth'], bmpinfo['bmHeight']),
bmpstr, 'raw', 'BGRX', 0, 1
)
# Cleanup
win32gui.DeleteObject(saveBitMap.GetHandle())
saveDC.DeleteDC()
mfcDC.DeleteDC()
win32gui.ReleaseDC(window_handle, hwndDC)
if result != 1:
return None
with self._lock:
self._last_screenshot = screenshot.copy()
self._history.append({
'image': screenshot.copy(),
'timestamp': datetime.now(),
'region': 'window',
'window_handle': window_handle
})
if self._auto_save:
self._auto_save_screenshot(screenshot)
return screenshot
except Exception as e:
print(f"[Screenshot] Window capture failed: {e}")
return None
def _do_capture(self, full_screen: bool = True, region: Optional[Tuple[int, int, int, int]] = None) -> Image.Image:
"""Internal capture method."""
# Try PIL.ImageGrab first (Windows, faster)
if self._use_pil and self._check_pil_grab():
from PIL import ImageGrab
if region:
return ImageGrab.grab(bbox=region)
else:
return ImageGrab.grab()
# Fall back to pyautogui (cross-platform)
if self._check_pyautogui():
import pyautogui
if region:
x1, y1, x2, y2 = region
return pyautogui.screenshot(region=(x1, y1, x2 - x1, y2 - y1))
else:
return pyautogui.screenshot()
raise RuntimeError(
"No screenshot backend available. "
"Install pillow (Windows) or pyautogui (cross-platform)."
)
def _auto_save_screenshot(self, image: Image.Image) -> Optional[Path]:
"""Automatically save screenshot with timestamp."""
try:
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3]
filename = f"screenshot_{timestamp}.{self._format.lower()}"
return self.save_screenshot(image, filename)
except Exception as e:
print(f"[Screenshot] Auto-save failed: {e}")
return None
def save_screenshot(self, image: Image.Image, filename: Optional[str] = None) -> Path:
"""
Save screenshot to file with path validation.
Args:
image: PIL Image to save
filename: Optional filename (auto-generated if None)
Returns:
Path to saved file
Raises:
SecurityError: If filename is invalid
"""
if filename is None:
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3]
filename = f"screenshot_{timestamp}.{self._format.lower()}"
# Sanitize filename
safe_filename = PathValidator.sanitize_filename(filename, '_')
# Ensure correct extension
if not safe_filename.lower().endswith(('.png', '.jpg', '.jpeg')):
safe_filename += f".{self._format.lower()}"
filepath = self._save_path / safe_filename
# Security check: ensure resolved path is within save_path
try:
resolved_path = filepath.resolve()
if not str(resolved_path).startswith(str(self._base_save_path)):
raise SecurityError("Path traversal detected in filename")
except (OSError, ValueError) as e:
print(f"[Screenshot] Security error: {e}")
# Fallback to safe default
safe_filename = f"screenshot_{int(time.time())}.{self._format.lower()}"
filepath = self._save_path / safe_filename
# Save with appropriate settings
if safe_filename.lower().endswith('.jpg') or safe_filename.lower().endswith('.jpeg'):
image = image.convert('RGB') # JPEG doesn't support alpha
image.save(filepath, 'JPEG', quality=self._quality, optimize=True)
else:
image.save(filepath, 'PNG', optimize=True)
return filepath
def get_last_screenshot(self) -> Optional[Image.Image]:
"""
Get the most recent screenshot.
Returns:
PIL Image or None if no screenshots taken yet
"""
with self._lock:
return self._last_screenshot.copy() if self._last_screenshot else None
def get_history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""
Get screenshot history.
Args:
limit: Maximum number of entries (default: all)
Returns:
List of dicts with 'timestamp', 'region', 'image' keys
"""
with self._lock:
history = list(self._history)
if limit:
history = history[-limit:]
return [
{
'timestamp': entry['timestamp'],
'region': entry['region'],
'image': entry['image'].copy()
}
for entry in history
]
def clear_history(self) -> None:
"""Clear screenshot history from memory."""
with self._lock:
self._history.clear()
self._last_screenshot = None
# ========== Configuration ==========
@property
def auto_save(self) -> bool:
"""Get auto-save setting."""
return self._auto_save
@auto_save.setter
def auto_save(self, value: bool) -> None:
"""Set auto-save setting."""
self._auto_save = bool(value)
@property
def save_path(self) -> Path:
"""Get current save path."""
return self._save_path
@save_path.setter
def save_path(self, path: Union[str, Path]) -> None:
"""Set save path."""
self._save_path = Path(path)
self._base_save_path = self._save_path.resolve()
self._ensure_save_directory()
@property
def format(self) -> str:
"""Get image format (PNG or JPEG)."""
return self._format
@format.setter
def format(self, fmt: str) -> None:
"""Set image format."""
fmt = fmt.upper()
if fmt in ('PNG', 'JPG', 'JPEG'):
self._format = 'PNG' if fmt == 'PNG' else 'JPEG'
else:
raise ValueError(f"Unsupported format: {fmt}")
@property
def quality(self) -> int:
"""Get JPEG quality (1-100)."""
return self._quality
@quality.setter
def quality(self, value: int) -> None:
"""Set JPEG quality (1-100)."""
self._quality = max(1, min(100, int(value)))
def configure(self,
auto_save: Optional[bool] = None,
save_path: Optional[Union[str, Path]] = None,
format: Optional[str] = None,
quality: Optional[int] = None) -> Dict[str, Any]:
"""
Configure screenshot service settings.
Args:
auto_save: Enable/disable auto-save
save_path: Directory to save screenshots
format: Image format (PNG or JPEG)
quality: JPEG quality (1-100)
Returns:
Current configuration as dict
"""
if auto_save is not None:
self.auto_save = auto_save
if save_path is not None:
self.save_path = save_path
if format is not None:
self.format = format
if quality is not None:
self.quality = quality
return self.get_config()
def get_config(self) -> Dict[str, Any]:
"""Get current configuration."""
return {
'auto_save': self._auto_save,
'save_path': str(self._save_path),
'format': self._format,
'quality': self._quality,
'history_size': self._history_size,
'platform': self._platform,
'backend': 'PIL' if self._use_pil else 'pyautogui'
}
# ========== Utility Methods ==========
def image_to_bytes(self, image: Image.Image, format: Optional[str] = None) -> bytes:
"""
Convert PIL Image to bytes.
Args:
image: PIL Image
format: Output format (default: current format setting)
Returns:
Image as bytes
"""
fmt = (format or self._format).upper()
buffer = io.BytesIO()
if fmt == 'JPEG':
image = image.convert('RGB')
image.save(buffer, 'JPEG', quality=self._quality)
else:
image.save(buffer, 'PNG')
return buffer.getvalue()
def get_available_backends(self) -> List[str]:
"""Get list of available capture backends."""
backends = []
if self._check_pil_grab():
backends.append('PIL.ImageGrab')
if self._check_pyautogui():
backends.append('pyautogui')
return backends
def is_available(self) -> bool:
"""Check if screenshot service is available (has working backend)."""
return self._check_pil_grab() or self._check_pyautogui()
# Singleton instance
_screenshot_service = None
def get_screenshot_service() -> ScreenshotService:
"""Get the global ScreenshotService instance."""
global _screenshot_service
if _screenshot_service is None:
_screenshot_service = ScreenshotService()
return _screenshot_service
# Convenience functions for quick screenshots
def quick_capture() -> Image.Image:
"""Quick full-screen capture."""
return get_screenshot_service().capture(full_screen=True)
def quick_capture_region(x: int, y: int, width: int, height: int) -> Image.Image:
"""Quick region capture."""
return get_screenshot_service().capture_region(x, y, width, height)
def quick_save(filename: Optional[str] = None) -> Path:
"""Quick capture and save."""
service = get_screenshot_service()
image = service.capture()
return service.save_screenshot(image, filename)

View File

@ -1,511 +0,0 @@
"""
EU-Utility - Screenshot Service Core Module
Fast, reliable screen capture functionality for all plugins.
Part of core - not a plugin. Plugins access via PluginAPI.
"""
import io
import os
import platform
import threading
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, Union
try:
from PIL import Image
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
Image = None
class ScreenshotService:
"""
Core screenshot service with cross-platform support.
Features:
- Singleton pattern for single instance across app
- Fast screen capture using PIL.ImageGrab (Windows) or pyautogui (cross-platform)
- Configurable auto-save with timestamps
- Screenshot history (last 20 in memory)
- PNG by default, JPG quality settings
- Thread-safe operations
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self._lock = threading.Lock()
# Configuration
self._auto_save: bool = True
self._format: str = "PNG"
self._quality: int = 95 # For JPEG
self._history_size: int = 20
# Screenshot history (thread-safe deque)
self._history: deque = deque(maxlen=self._history_size)
self._last_screenshot = None
# Platform detection - MUST be before _get_default_save_path()
self._platform = platform.system().lower()
self._use_pil = self._platform == "windows"
# Set save path AFTER platform detection
self._save_path: Path = self._get_default_save_path()
# Lazy init for capture backends
self._pil_available: Optional[bool] = None
self._pyautogui_available: Optional[bool] = None
# Ensure save directory exists
self._ensure_save_directory()
print(f"[Screenshot] Service initialized (auto_save={self._auto_save}, format={self._format})")
def _get_default_save_path(self) -> Path:
"""Get default save path for screenshots."""
# Use Documents/Entropia Universe/Screenshots/ as default
if self._platform == "windows":
documents = Path.home() / "Documents"
else:
documents = Path.home() / "Documents"
return documents / "Entropia Universe" / "Screenshots"
def _ensure_save_directory(self) -> None:
"""Ensure the save directory exists."""
try:
self._save_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f"[Screenshot] Warning: Could not create save directory: {e}")
def _check_pil_grab(self) -> bool:
"""Check if PIL.ImageGrab is available."""
if self._pil_available is not None:
return self._pil_available
try:
from PIL import ImageGrab
self._pil_available = True
return True
except ImportError:
self._pil_available = False
return False
def _check_pyautogui(self) -> bool:
"""Check if pyautogui is available."""
if self._pyautogui_available is not None:
return self._pyautogui_available
try:
import pyautogui
self._pyautogui_available = True
return True
except ImportError:
self._pyautogui_available = False
return False
def capture(self, full_screen: bool = True):
"""
Capture screenshot.
Args:
full_screen: If True, capture entire screen. If False, use default region.
Returns:
PIL Image object
Raises:
RuntimeError: If no capture backend is available
"""
with self._lock:
screenshot = self._do_capture(full_screen=full_screen)
# Store in history
self._last_screenshot = screenshot.copy()
self._history.append({
'image': screenshot.copy(),
'timestamp': datetime.now(),
'region': None if full_screen else 'custom'
})
# Auto-save if enabled
if self._auto_save:
self._auto_save_screenshot(screenshot)
return screenshot
def capture_region(self, x: int, y: int, width: int, height: int):
"""
Capture specific screen region.
Args:
x: Left coordinate
y: Top coordinate
width: Region width
height: Region height
Returns:
PIL Image object
"""
with self._lock:
screenshot = self._do_capture(region=(x, y, x + width, y + height))
# Store in history
self._last_screenshot = screenshot.copy()
self._history.append({
'image': screenshot.copy(),
'timestamp': datetime.now(),
'region': (x, y, width, height)
})
# Auto-save if enabled
if self._auto_save:
self._auto_save_screenshot(screenshot)
return screenshot
def capture_window(self, window_handle: int) -> Optional[Any]:
"""
Capture specific window by handle (Windows only).
Args:
window_handle: Window handle (HWND on Windows)
Returns:
PIL Image object or None if capture failed
"""
if self._platform != "windows":
print("[Screenshot] capture_window is Windows-only")
return None
try:
import win32gui
import win32ui
import win32con
from ctypes import windll
# Get window dimensions
left, top, right, bottom = win32gui.GetWindowRect(window_handle)
width = right - left
height = bottom - top
# Create device context
hwndDC = win32gui.GetWindowDC(window_handle)
mfcDC = win32ui.CreateDCFromHandle(hwndDC)
saveDC = mfcDC.CreateCompatibleDC()
# Create bitmap
saveBitMap = win32ui.CreateBitmap()
saveBitMap.CreateCompatibleBitmap(mfcDC, width, height)
saveDC.SelectObject(saveBitMap)
# Copy screen into bitmap
result = windll.user32.PrintWindow(window_handle, saveDC.GetSafeHdc(), 3)
# Convert to PIL Image
bmpinfo = saveBitMap.GetInfo()
bmpstr = saveBitMap.GetBitmapBits(True)
screenshot = Image.frombuffer(
'RGB',
(bmpinfo['bmWidth'], bmpinfo['bmHeight']),
bmpstr, 'raw', 'BGRX', 0, 1
)
# Cleanup
win32gui.DeleteObject(saveBitMap.GetHandle())
saveDC.DeleteDC()
mfcDC.DeleteDC()
win32gui.ReleaseDC(window_handle, hwndDC)
if result != 1:
return None
with self._lock:
self._last_screenshot = screenshot.copy()
self._history.append({
'image': screenshot.copy(),
'timestamp': datetime.now(),
'region': 'window',
'window_handle': window_handle
})
if self._auto_save:
self._auto_save_screenshot(screenshot)
return screenshot
except Exception as e:
print(f"[Screenshot] Window capture failed: {e}")
return None
def _do_capture(self, full_screen: bool = True, region: Optional[Tuple[int, int, int, int]] = None) -> Image.Image:
"""Internal capture method."""
# Try PIL.ImageGrab first (Windows, faster)
if self._use_pil and self._check_pil_grab():
from PIL import ImageGrab
if region:
return ImageGrab.grab(bbox=region)
else:
return ImageGrab.grab()
# Fall back to pyautogui (cross-platform)
if self._check_pyautogui():
import pyautogui
if region:
x1, y1, x2, y2 = region
return pyautogui.screenshot(region=(x1, y1, x2 - x1, y2 - y1))
else:
return pyautogui.screenshot()
raise RuntimeError(
"No screenshot backend available. "
"Install pillow (Windows) or pyautogui (cross-platform)."
)
def _auto_save_screenshot(self, image: Image.Image) -> Optional[Path]:
"""Automatically save screenshot with timestamp."""
try:
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3]
filename = f"screenshot_{timestamp}.{self._format.lower()}"
return self.save_screenshot(image, filename)
except Exception as e:
print(f"[Screenshot] Auto-save failed: {e}")
return None
def save_screenshot(self, image: Image.Image, filename: Optional[str] = None) -> Path:
"""
Save screenshot to file.
Args:
image: PIL Image to save
filename: Optional filename (auto-generated if None)
Returns:
Path to saved file
"""
if filename is None:
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3]
filename = f"screenshot_{timestamp}.{self._format.lower()}"
# Ensure correct extension
if not filename.lower().endswith(('.png', '.jpg', '.jpeg')):
filename += f".{self._format.lower()}"
filepath = self._save_path / filename
# Save with appropriate settings
if filename.lower().endswith('.jpg') or filename.lower().endswith('.jpeg'):
image = image.convert('RGB') # JPEG doesn't support alpha
image.save(filepath, 'JPEG', quality=self._quality, optimize=True)
else:
image.save(filepath, 'PNG', optimize=True)
return filepath
def get_last_screenshot(self) -> Optional[Image.Image]:
"""
Get the most recent screenshot.
Returns:
PIL Image or None if no screenshots taken yet
"""
with self._lock:
return self._last_screenshot.copy() if self._last_screenshot else None
def get_history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""
Get screenshot history.
Args:
limit: Maximum number of entries (default: all)
Returns:
List of dicts with 'timestamp', 'region', 'image' keys
"""
with self._lock:
history = list(self._history)
if limit:
history = history[-limit:]
return [
{
'timestamp': entry['timestamp'],
'region': entry['region'],
'image': entry['image'].copy()
}
for entry in history
]
def clear_history(self) -> None:
"""Clear screenshot history from memory."""
with self._lock:
self._history.clear()
self._last_screenshot = None
# ========== Configuration ==========
@property
def auto_save(self) -> bool:
"""Get auto-save setting."""
return self._auto_save
@auto_save.setter
def auto_save(self, value: bool) -> None:
"""Set auto-save setting."""
self._auto_save = bool(value)
@property
def save_path(self) -> Path:
"""Get current save path."""
return self._save_path
@save_path.setter
def save_path(self, path: Union[str, Path]) -> None:
"""Set save path."""
self._save_path = Path(path)
self._ensure_save_directory()
@property
def format(self) -> str:
"""Get image format (PNG or JPEG)."""
return self._format
@format.setter
def format(self, fmt: str) -> None:
"""Set image format."""
fmt = fmt.upper()
if fmt in ('PNG', 'JPG', 'JPEG'):
self._format = 'PNG' if fmt == 'PNG' else 'JPEG'
else:
raise ValueError(f"Unsupported format: {fmt}")
@property
def quality(self) -> int:
"""Get JPEG quality (1-100)."""
return self._quality
@quality.setter
def quality(self, value: int) -> None:
"""Set JPEG quality (1-100)."""
self._quality = max(1, min(100, int(value)))
def configure(self,
auto_save: Optional[bool] = None,
save_path: Optional[Union[str, Path]] = None,
format: Optional[str] = None,
quality: Optional[int] = None) -> Dict[str, Any]:
"""
Configure screenshot service settings.
Args:
auto_save: Enable/disable auto-save
save_path: Directory to save screenshots
format: Image format (PNG or JPEG)
quality: JPEG quality (1-100)
Returns:
Current configuration as dict
"""
if auto_save is not None:
self.auto_save = auto_save
if save_path is not None:
self.save_path = save_path
if format is not None:
self.format = format
if quality is not None:
self.quality = quality
return self.get_config()
def get_config(self) -> Dict[str, Any]:
"""Get current configuration."""
return {
'auto_save': self._auto_save,
'save_path': str(self._save_path),
'format': self._format,
'quality': self._quality,
'history_size': self._history_size,
'platform': self._platform,
'backend': 'PIL' if self._use_pil else 'pyautogui'
}
# ========== Utility Methods ==========
def image_to_bytes(self, image: Image.Image, format: Optional[str] = None) -> bytes:
"""
Convert PIL Image to bytes.
Args:
image: PIL Image
format: Output format (default: current format setting)
Returns:
Image as bytes
"""
fmt = (format or self._format).upper()
buffer = io.BytesIO()
if fmt == 'JPEG':
image = image.convert('RGB')
image.save(buffer, 'JPEG', quality=self._quality)
else:
image.save(buffer, 'PNG')
return buffer.getvalue()
def get_available_backends(self) -> List[str]:
"""Get list of available capture backends."""
backends = []
if self._check_pil_grab():
backends.append('PIL.ImageGrab')
if self._check_pyautogui():
backends.append('pyautogui')
return backends
def is_available(self) -> bool:
"""Check if screenshot service is available (has working backend)."""
return self._check_pil_grab() or self._check_pyautogui()
# Singleton instance
_screenshot_service = None
def get_screenshot_service() -> ScreenshotService:
"""Get the global ScreenshotService instance."""
global _screenshot_service
if _screenshot_service is None:
_screenshot_service = ScreenshotService()
return _screenshot_service
# Convenience functions for quick screenshots
def quick_capture() -> Image.Image:
"""Quick full-screen capture."""
return get_screenshot_service().capture(full_screen=True)
def quick_capture_region(x: int, y: int, width: int, height: int) -> Image.Image:
"""Quick region capture."""
return get_screenshot_service().capture_region(x, y, width, height)
def quick_save(filename: Optional[str] = None) -> Path:
"""Quick capture and save."""
service = get_screenshot_service()
image = service.capture()
return service.save_screenshot(image, filename)