EU-Utility/docs/SECURITY_HARDENING_GUIDE.md

33 KiB

EU-Utility Security Audit & Hardening Guide

Audit Date: 2026-02-14
Auditor: Security Agent
Scope: EU-Utility Core System & Plugin Architecture


Executive Summary

This security audit covers the EU-Utility application, a PyQt6-based overlay utility for Entropia Universe. The codebase shows good architectural patterns (singleton services, plugin system, event bus) but has several security vulnerabilities that need addressing.

Risk Level: MEDIUM-HIGH
Key Issues: 7 Critical, 12 High, 8 Medium priority findings


1. INPUT VALIDATION FINDINGS

🔴 CRITICAL: Path Traversal in DataStore

Location: core/data_store.py:68

def _get_plugin_file(self, plugin_id: str) -> Path:
    """Get the storage file path for a plugin."""
    # Sanitize plugin_id to create a safe filename
    safe_name = plugin_id.replace(".", "_").replace("/", "_").replace("\\", "_")
    return self.data_dir / f"{safe_name}.json"

Vulnerability: The sanitization is insufficient. Attackers can bypass using:

  • .. sequences (not replaced)
  • Null bytes (%00)
  • Unicode normalization attacks
  • Double encoding (..%252f)

Attack Scenario:

# Malicious plugin could write outside data directory
plugin_id = "../../../etc/passwd"
# Results in: data/plugins/.._.._.._etc_passwd.json (still dangerous)

Fix:

def _get_plugin_file(self, plugin_id: str) -> Path:
    """Get the storage file path for a plugin."""
    import re
    # Whitelist approach: only allow alphanumeric, dash, underscore
    safe_name = re.sub(r'[^a-zA-Z0-9_-]', '_', plugin_id)
    # Prevent path traversal with .. sequences
    safe_name = safe_name.replace('..', '_')
    
    file_path = self.data_dir / f"{safe_name}.json"
    
    # Security check: ensure resolved path is within data_dir
    try:
        resolved_path = file_path.resolve()
        resolved_data_dir = self.data_dir.resolve()
        if not str(resolved_path).startswith(str(resolved_data_dir)):
            raise ValueError(f"Path traversal detected: {plugin_id}")
    except (OSError, ValueError) as e:
        raise ValueError(f"Invalid plugin_id: {plugin_id}") from e
    
    return file_path

🔴 CRITICAL: Unvalidated JSON Loading in Settings

Location: core/settings.py:63-69

def _load(self):
    """Load settings from file."""
    self._settings = self.DEFAULTS.copy()
    
    if self.config_file.exists():
        try:
            with open(self.config_file, 'r') as f:
                saved = json.load(f)
                self._settings.update(saved)
        except Exception as e:
            print(f"Error loading settings: {e}")

Vulnerability: No validation of loaded JSON structure. Malformed settings can:

  • Inject arbitrary types (objects, functions via __reduce__)
  • Overflow memory with large nested structures
  • Corrupt application state

Fix:

def _load(self):
    """Load settings from file."""
    self._settings = self.DEFAULTS.copy()
    
    if self.config_file.exists():
        try:
            with open(self.config_file, 'r') as f:
                saved = json.load(f)
            
            # Validate loaded data
            if not isinstance(saved, dict):
                raise ValueError("Settings must be a dictionary")
            
            # Check for suspicious keys
            dangerous_keys = ['__class__', '__reduce__', '__reduce_ex__', '__getstate__']
            for key in saved.keys():
                if key.startswith('__') or key in dangerous_keys:
                    raise ValueError(f"Invalid key in settings: {key}")
            
            # Validate values against expected types
            for key, value in saved.items():
                if key in self.DEFAULTS:
                    expected_type = type(self.DEFAULTS[key])
                    if not isinstance(value, expected_type):
                        print(f"[Settings] Type mismatch for {key}, using default")
                        continue
                self._settings[key] = value
                
        except json.JSONDecodeError as e:
            print(f"[Settings] JSON parse error: {e}")
        except Exception as e:
            print(f"[Settings] Error loading settings: {e}")

🟠 HIGH: Unvalidated Region Parameters in OCR

Location: core/ocr_service.py:115-130

def capture_screen(self, region: Tuple[int, int, int, int] = None) -> 'Image.Image':
    if region:
        x, y, width, height = region
        return screenshot_service.capture_region(x, y, width, height)

Vulnerability: No bounds checking on region parameters. Can cause:

  • Memory exhaustion with huge regions
  • Negative coordinates causing crashes
  • Screen scraping outside monitor bounds

Fix:

def capture_screen(self, region: Tuple[int, int, int, int] = None) -> 'Image.Image':
    if region:
        x, y, width, height = region
        
        # Validate region parameters
        if not all(isinstance(v, int) for v in region):
            raise ValueError("Region values must be integers")
        
        if width <= 0 or height <= 0:
            raise ValueError("Region width and height must be positive")
        
        if width > 7680 or height > 4320:  # 8K resolution limit
            raise ValueError("Region dimensions exceed maximum allowed (8K)")
        
        if x < -10000 or y < -10000:  # Sanity check
            raise ValueError("Region coordinates out of reasonable bounds")
        
        return screenshot_service.capture_region(x, y, width, height)

🟠 HIGH: Unvalidated URL Parameters in NexusAPI

Location: core/nexus_api.py:180-200

def _make_request(self, endpoint: str, params: Dict[str, Any] = None, ...):
    url = f"{self.BASE_URL}/{self.API_VERSION}/{endpoint}"
    # ...
    response = session.get(url, params=params, timeout=30)

Vulnerability: Endpoint parameter not validated, allowing:

  • Path traversal in URL (../admin/config)
  • SSRF attacks via crafted endpoints
  • Injection of query parameters

Fix:

def _make_request(self, endpoint: str, params: Dict[str, Any] = None, ...):
    # Validate endpoint format
    import re
    if not re.match(r'^[a-zA-Z0-9_/-]+$', endpoint):
        raise ValueError(f"Invalid endpoint format: {endpoint}")
    
    # Prevent path traversal
    if '..' in endpoint or endpoint.startswith('/'):
        raise ValueError(f"Path traversal detected in endpoint: {endpoint}")
    
    url = f"{self.BASE_URL}/{self.API_VERSION}/{endpoint}"
    
    # Validate and sanitize params
    if params:
        sanitized_params = {}
        for key, value in params.items():
            # Validate key
            if not re.match(r'^[a-zA-Z0-9_]+$', str(key)):
                continue
            # Sanitize value
            if isinstance(value, str):
                sanitized_params[key] = value[:1000]  # Limit length
            else:
                sanitized_params[key] = value
        params = sanitized_params
    
    response = session.get(url, params=params, timeout=30)

2. PATH TRAVERSAL PREVENTION

🔴 CRITICAL: Screenshot Save Path Vulnerability

Location: core/screenshot.py:186-200

def save_screenshot(self, image: Image.Image, filename: Optional[str] = None) -> Path:
    if filename is None:
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3]
        filename = f"screenshot_{timestamp}.{self._format.lower()}"
    
    filepath = self._save_path / filename
    # ... saves without validation

Vulnerability: Filename parameter allows path traversal.

Fix:

def save_screenshot(self, image: Image.Image, filename: Optional[str] = None) -> Path:
    if filename is None:
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3]
        filename = f"screenshot_{timestamp}.{self._format.lower()}"
    
    # Sanitize filename
    import re
    # Remove path components and dangerous characters
    safe_filename = re.sub(r'[\\/:*?"<>|]', '_', filename)
    safe_filename = safe_filename.replace('..', '_')
    
    # Ensure filename is not empty and has reasonable length
    if not safe_filename or len(safe_filename) > 255:
        safe_filename = f"screenshot_{int(time.time())}.png"
    
    filepath = self._save_path / safe_filename
    
    # Security check: ensure resolved path is within save_path
    try:
        resolved_path = filepath.resolve()
        resolved_save_path = self._save_path.resolve()
        if not str(resolved_path).startswith(str(resolved_save_path)):
            raise ValueError("Path traversal detected")
    except (OSError, ValueError) as e:
        print(f"[Screenshot] Security error: {e}")
        # Fallback to safe default
        filepath = self._save_path / f"screenshot_{int(time.time())}.png"
    
    # Save with appropriate settings
    # ... rest of method

🟠 HIGH: Icon Manager Path Traversal

Location: core/icon_manager.py (if exists - check for similar patterns)

Ensure all file operations use safe path resolution.


3. API KEY HANDLING

🟢 GOOD: No Hardcoded API Keys Found

The codebase correctly avoids hardcoded API keys. However, improvements needed:

🟡 MEDIUM: API Key Storage in Plain Text

Issue: If API keys are added in the future, they should not be stored in plain text JSON.

Recommendation:

# Use keyring for secure storage
import keyring

def store_api_key(service_name: str, api_key: str):
    """Securely store API key in system keyring."""
    keyring.set_password("EU-Utility", service_name, api_key)

def get_api_key(service_name: str) -> Optional[str]:
    """Retrieve API key from system keyring."""
    return keyring.get_password("EU-Utility", service_name)

4. PLUGIN SANDBOXING

🔴 CRITICAL: No Plugin Sandboxing

Location: core/plugin_manager.py:85-130

def discover_plugins(self) -> List[Type[BasePlugin]]:
    # ...
    spec.loader.exec_module(module)  # Executes arbitrary code!
    # ...

Vulnerability: Plugins execute with full Python privileges. Malicious plugins can:

  • Access filesystem anywhere
  • Make network requests
  • Execute system commands
  • Access other processes' memory
  • Steal user data

Risk Assessment:

Capability Current Risk Mitigation Priority
File System CRITICAL HIGH
Network CRITICAL HIGH
Process Execution CRITICAL HIGH
Memory Access HIGH MEDIUM
UI Spoofing MEDIUM MEDIUM

Recommended Fix - Plugin Sandbox Architecture:

# core/plugin_sandbox.py
"""
Plugin sandbox using restricted Python execution.
"""
import ast
import builtins
import sys
from types import ModuleType
from typing import Set, Dict, Any

class PluginSandbox:
    """Restricted execution environment for plugins."""
    
    # Whitelist of allowed builtin functions
    ALLOWED_BUILTINS: Set[str] = {
        'abs', 'all', 'any', 'bool', 'dict', 'dir', 'enumerate',
        'filter', 'float', 'format', 'frozenset', 'hasattr', 'hash',
        'hex', 'id', 'input', 'int', 'isinstance', 'issubclass',
        'iter', 'len', 'list', 'map', 'max', 'min', 'next',
        'object', 'oct', 'ord', 'pow', 'print', 'property',
        'range', 'repr', 'reversed', 'round', 'set', 'slice',
        'sorted', 'staticmethod', 'str', 'sum', 'super', 'tuple',
        'type', 'vars', 'zip', '__import__', 'Exception', 'BaseException'
    }
    
    # Allowed modules (subset only)
    ALLOWED_MODULES: Set[str] = {
        'json', 're', 'math', 'random', 'datetime', 'collections',
        'itertools', 'functools', 'typing', 'dataclasses', 'enum'
    }
    
    def __init__(self, plugin_id: str):
        self.plugin_id = plugin_id
        self.data_dir = Path(f"data/plugins/{plugin_id}")
        self.data_dir.mkdir(parents=True, exist_ok=True)
    
    def create_restricted_globals(self) -> Dict[str, Any]:
        """Create restricted globals for plugin execution."""
        # Filter builtins
        safe_builtins = {
            name: getattr(builtins, name)
            for name in self.ALLOWED_BUILTINS
            if hasattr(builtins, name)
        }
        
        # Add restricted __import__
        safe_builtins['__import__'] = self._restricted_import
        
        return {
            '__builtins__': safe_builtins,
            '__name__': f'__plugin_{self.plugin_id}__',
        }
    
    def _restricted_import(self, name: str, *args, **kwargs):
        """Restricted import function."""
        # Check if module is allowed
        base_module = name.split('.')[0]
        
        if base_module not in self.ALLOWED_MODULES:
            raise ImportError(f"Module '{name}' is not allowed in plugin sandbox")
        
        # Special handling for EU-Utility core modules
        if name.startswith('core.'):
            return self._wrap_core_module(name)
        
        if name.startswith('plugins.'):
            raise ImportError("Plugins cannot import other plugins directly")
        
        return __import__(name, *args, **kwargs)
    
    def _wrap_core_module(self, name: str):
        """Wrap core module to restrict dangerous operations."""
        # Return proxy that validates all calls
        # Implementation depends on specific module
        pass
    
    def validate_source(self, source: str) -> bool:
        """Validate plugin source code for dangerous patterns."""
        try:
            tree = ast.parse(source)
        except SyntaxError:
            return False
        
        dangerous_nodes = (
            ast.Exec,  # Python 2
            ast.Call,  # Need to check for eval, exec
        )
        
        for node in ast.walk(tree):
            # Check for eval/exec calls
            if isinstance(node, ast.Call):
                if isinstance(node.func, ast.Name):
                    if node.func.id in ('eval', 'exec', 'compile'):
                        return False
            
            # Check for __import__
            if isinstance(node, ast.Call):
                if isinstance(node.func, ast.Name):
                    if node.func.id == '__import__':
                        # Will be validated at runtime
                        pass
            
            # Check for dangerous attributes
            if isinstance(node, ast.Attribute):
                if node.attr.startswith('__') and node.attr.endswith('__'):
                    # Block access to dunder methods that could be dangerous
                    dangerous_dunders = {
                        '__class__', '__bases__', '__mro__', '__subclasses__',
                        '__globals__', '__code__', '__func__', '__self__',
                        '__module__', '__dict__', '__weakref__'
                    }
                    if node.attr in dangerous_dunders:
                        return False
        
        return True
    
    def execute(self, source: str, filename: str = '<plugin>'):
        """Execute plugin code in sandbox."""
        if not self.validate_source(source):
            raise SecurityError("Plugin source contains dangerous patterns")
        
        code = compile(source, filename, 'exec')
        globals_dict = self.create_restricted_globals()
        
        # Execute in restricted environment
        exec(code, globals_dict)
        
        return globals_dict


class SecurityError(Exception):
    """Raised when a security violation is detected."""
    pass

🟠 HIGH: Plugin Code Execution Without Verification

Location: core/plugin_manager.py:108

spec.loader.exec_module(module)

Recommendation: Implement code signing verification:

import hashlib
import hmac
from pathlib import Path

class PluginVerifier:
    """Verify plugin integrity and authenticity."""
    
    def __init__(self, trusted_keys_dir: Path):
        self.trusted_keys_dir = trusted_keys_dir
        self.verified_plugins: Dict[str, str] = {}  # plugin_id -> hash
    
    def verify_plugin(self, plugin_path: Path) -> bool:
        """Verify plugin signature."""
        signature_file = plugin_path / ".signature"
        manifest_file = plugin_path / "manifest.json"
        
        if not signature_file.exists():
            # Unsigned plugins require explicit user consent
            return self._prompt_user_for_unsigned(plugin_path)
        
        # Verify signature
        try:
            with open(manifest_file, 'rb') as f:
                manifest_data = f.read()
            
            with open(signature_file, 'r') as f:
                signature = f.read().strip()
            
            # Parse signature (format: key_id:signature)
            key_id, sig = signature.split(':', 1)
            
            # Load public key
            public_key_path = self.trusted_keys_dir / f"{key_id}.pub"
            if not public_key_path.exists():
                print(f"[PluginVerifier] Unknown key: {key_id}")
                return False
            
            with open(public_key_path, 'rb') as f:
                public_key = f.read()
            
            # Verify HMAC-SHA256 signature
            expected_sig = hmac.new(
                public_key,
                manifest_data,
                hashlib.sha256
            ).hexdigest()
            
            if not hmac.compare_digest(sig, expected_sig):
                print("[PluginVerifier] Invalid signature")
                return False
            
            # Store verified hash
            self.verified_plugins[plugin_path.name] = hashlib.sha256(manifest_data).hexdigest()
            return True
            
        except Exception as e:
            print(f"[PluginVerifier] Verification error: {e}")
            return False
    
    def _prompt_user_for_unsigned(self, plugin_path: Path) -> bool:
        """Prompt user to approve unsigned plugin."""
        # This should show a UI dialog
        # For now, reject unsigned plugins in production
        return False

5. DATA ENCRYPTION

🔴 CRITICAL: Sensitive Data Stored in Plain Text

Locations:

  • core/data_store.py - Plugin data stored as plain JSON
  • core/settings.py - User settings in plain text
  • Plugin data files (loot_tracker, skill_scanner, etc.)

Vulnerability: User data including:

  • Game statistics
  • Personal preferences
  • Potentially sensitive game information

All stored unencrypted on disk.

Fix - Encrypted Data Store:

# core/secure_data_store.py
"""
Encrypted data storage using Fernet (AES-128 in CBC mode).
"""
import json
import base64
from pathlib import Path
from typing import Any, Dict, Optional
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import keyring


class SecureDataStore:
    """Encrypted data storage for sensitive plugin data."""
    
    def __init__(self, plugin_id: str):
        self.plugin_id = plugin_id
        self.data_dir = Path("data/plugins") / plugin_id
        self.data_dir.mkdir(parents=True, exist_ok=True)
        
        self._cipher = None
        self._init_encryption()
    
    def _init_encryption(self):
        """Initialize encryption key."""
        # Try to get existing key from keyring
        key = keyring.get_password("EU-Utility", f"data_key_{self.plugin_id}")
        
        if key is None:
            # Generate new key
            key = Fernet.generate_key().decode()
            keyring.set_password("EU-Utility", f"data_key_{self.plugin_id}", key)
        
        self._cipher = Fernet(key.encode())
    
    def save(self, key: str, data: Any) -> bool:
        """Save encrypted data."""
        try:
            # Serialize to JSON
            json_data = json.dumps(data).encode('utf-8')
            
            # Encrypt
            encrypted = self._cipher.encrypt(json_data)
            
            # Save to file
            file_path = self.data_dir / f"{key}.encrypted"
            with open(file_path, 'wb') as f:
                f.write(encrypted)
            
            return True
        except Exception as e:
            print(f"[SecureDataStore] Save error: {e}")
            return False
    
    def load(self, key: str, default: Any = None) -> Any:
        """Load and decrypt data."""
        file_path = self.data_dir / f"{key}.encrypted"
        
        if not file_path.exists():
            return default
        
        try:
            with open(file_path, 'rb') as f:
                encrypted = f.read()
            
            # Decrypt
            decrypted = self._cipher.decrypt(encrypted)
            
            # Deserialize
            return json.loads(decrypted.decode('utf-8'))
            
        except Exception as e:
            print(f"[SecureDataStore] Load error: {e}")
            return default

🟡 MEDIUM: Clipboard History Stored in Plain Text

Location: core/clipboard.py:45-55

Clipboard may contain sensitive data (passwords, private keys).

Fix:

def _save_history(self):
    """Save clipboard history to encrypted file."""
    try:
        self._history_file.parent.mkdir(parents=True, exist_ok=True)
        
        # Encrypt sensitive clipboard data
        data = [asdict(entry) for entry in self._history]
        json_data = json.dumps(data).encode('utf-8')
        
        # Use simple XOR obfuscation at minimum
        # Better: use the SecureDataStore above
        encrypted = self._encrypt(json_data)
        
        with open(self._history_file, 'wb') as f:
            f.write(encrypted)
    except Exception as e:
        print(f"[Clipboard] Error saving history: {e}")

6. FILE OPERATIONS AUDIT

Summary of File Operations

Module Operation Risk Level Notes
data_store.py Read/Write JSON HIGH Path traversal possible
settings.py Read/Write JSON MEDIUM No validation
screenshot.py Save images HIGH Path traversal in filename
clipboard.py Read/Write JSON LOW Plain text storage
http_client.py Cache to disk MEDIUM Cache poisoning possible
log_reader.py Read log file LOW Read-only, path hardcoded

Recommendations:

  1. Implement centralized file access control:
# core/file_access.py
"""
Centralized file access control with path validation.
"""
from pathlib import Path
from typing import Optional
import threading

class FileAccessController:
    """Controls and audits file system access."""
    
    _instance = None
    _lock = threading.Lock()
    
    # Allowed base directories
    ALLOWED_PATHS = {
        'data': Path('data').resolve(),
        'cache': Path('cache').resolve(),
        'screenshots': Path.home() / 'Documents' / 'Entropia Universe' / 'Screenshots',
        'temp': Path('temp').resolve(),
    }
    
    def __new__(cls):
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance
    
    def validate_path(self, path: Path, allowed_zone: str) -> Path:
        """Validate that path is within allowed zone."""
        if allowed_zone not in self.ALLOWED_PATHS:
            raise ValueError(f"Unknown zone: {allowed_zone}")
        
        base_path = self.ALLOWED_PATHS[allowed_zone]
        
        # Resolve the path
        try:
            resolved = path.resolve()
            base_resolved = base_path.resolve()
        except (OSError, ValueError) as e:
            raise ValueError(f"Invalid path: {e}")
        
        # Check path is within allowed zone
        if not str(resolved).startswith(str(base_resolved)):
            raise PermissionError(f"Path traversal detected: {path}")
        
        return resolved
    
    def safe_open(self, path: Path, mode: str, allowed_zone: str, **kwargs):
        """Safely open a file after validation."""
        validated_path = self.validate_path(path, allowed_zone)
        
        # Ensure parent directory exists
        validated_path.parent.mkdir(parents=True, exist_ok=True)
        
        return open(validated_path, mode, **kwargs)

7. NETWORK REQUESTS AUDIT

Summary of Network Operations

Module Purpose Risk Level Notes
nexus_api.py Game data API MEDIUM No cert pinning, basic retry
http_client.py Generic HTTP MEDIUM Disk cache, no integrity check
plugin_api.py Plugin network access HIGH No restrictions

Findings:

🟠 HIGH: No Certificate Pinning

Location: core/nexus_api.py:45-50

self._session = requests.Session()
self._session.headers.update({
    'User-Agent': 'EU-Utility/1.0 (Entropia Universe Utility Tool)',
    'Accept': 'application/json'
})

Fix:

import certifi
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

class NexusAPI:
    # Expected certificate fingerprint (example)
    PINNED_CERT_FINGERPRINT = "sha256/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
    
    def _get_session(self):
        """Get or create HTTP session with security settings."""
        if self._session is None:
            self._session = requests.Session()
            
            # Use certificate verification
            self._session.verify = certifi.where()
            
            # Add certificate pinning (optional but recommended)
            # This requires additional implementation
            
            # Set security headers
            self._session.headers.update({
                'User-Agent': 'EU-Utility/1.0 (Entropia Universe Utility Tool)',
                'Accept': 'application/json',
                'Accept-Encoding': 'gzip, deflate',
            })
            
            # Configure retries with backoff
            retry_strategy = Retry(
                total=3,
                backoff_factor=1,
                status_forcelist=[429, 500, 502, 503, 504],
            )
            adapter = HTTPAdapter(max_retries=retry_strategy)
            self._session.mount("https://", adapter)
            
        return self._session

🟡 MEDIUM: Cache Poisoning Risk

Location: core/http_client.py:180-220

Cache files stored without integrity verification.

Fix:

def _save_cache_entry(self, cache_key: str, entry: CacheEntry):
    """Save a cache entry to disk with integrity check."""
    cache_path = self._get_cache_path(cache_key)
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    
    try:
        with self._cache_lock:
            import base64
            import hashlib
            import hmac
            
            content_b64 = base64.b64encode(entry.content).decode('utf-8')
            
            data = {
                'url': entry.url,
                'status_code': entry.status_code,
                'headers': entry.headers,
                'content': content_b64,
                'cached_at': entry.cached_at,
                'expires_at': entry.expires_at,
            }
            
            # Add integrity signature
            data_json = json.dumps(data, sort_keys=True).encode()
            signature = hmac.new(
                self._cache_key.encode(),
                data_json,
                hashlib.sha256
            ).hexdigest()
            data['_integrity'] = signature
            
            with open(cache_path, 'w', encoding='utf-8') as f:
                json.dump(data, f)
                
    except Exception as e:
        print(f"[HTTP] Cache save error: {e}")

8. CONFIG STORAGE AUDIT

Current Config Files

File Format Encryption Validation
data/settings.json JSON No No
config/plugins.json JSON No No
data/plugins/*.json JSON No No
data/clipboard_history.json JSON No No

Recommendations:

  1. Migrate to encrypted config:
# core/secure_config.py
"""
Encrypted configuration management.
"""
from pathlib import Path
from cryptography.fernet import Fernet
import keyring
import json

class SecureConfig:
    """Encrypted configuration storage."""
    
    def __init__(self, config_name: str):
        self.config_file = Path("data/secure") / f"{config_name}.enc"
        self.config_file.parent.mkdir(parents=True, exist_ok=True)
        self._key = self._get_or_create_key()
        self._cipher = Fernet(self._key)
        self._data = self._load()
    
    def _get_or_create_key(self) -> bytes:
        """Get or create encryption key from keyring."""
        key_name = f"eu_utility_config_key_{self.config_file.stem}"
        key = keyring.get_password("EU-Utility", key_name)
        
        if key is None:
            key = Fernet.generate_key().decode()
            keyring.set_password("EU-Utility", key_name, key)
        
        return key.encode()
    
    def _load(self) -> dict:
        """Load encrypted config."""
        if not self.config_file.exists():
            return {}
        
        try:
            with open(self.config_file, 'rb') as f:
                encrypted = f.read()
            
            decrypted = self._cipher.decrypt(encrypted)
            return json.loads(decrypted.decode('utf-8'))
        except Exception as e:
            print(f"[SecureConfig] Load error: {e}")
            return {}
    
    def save(self) -> bool:
        """Save encrypted config."""
        try:
            json_data = json.dumps(self._data).encode('utf-8')
            encrypted = self._cipher.encrypt(json_data)
            
            with open(self.config_file, 'wb') as f:
                f.write(encrypted)
            return True
        except Exception as e:
            print(f"[SecureConfig] Save error: {e}")
            return False
    
    def get(self, key: str, default=None):
        return self._data.get(key, default)
    
    def set(self, key: str, value):
        self._data[key] = value
        self.save()

9. SECURITY HARDENING CHECKLIST

Immediate Actions (Critical)

  • Fix path traversal in DataStore._get_plugin_file()
  • Fix path traversal in ScreenshotService.save_screenshot()
  • Add JSON validation in Settings._load()
  • Implement plugin sandbox/restricted execution
  • Encrypt sensitive data storage

Short-term Actions (High Priority)

  • Add input validation for all user-provided paths
  • Implement plugin code signing verification
  • Add certificate pinning for API calls
  • Implement cache integrity verification
  • Add rate limiting for plugin API calls

Medium-term Actions (Medium Priority)

  • Migrate all config to encrypted storage
  • Implement centralized file access control
  • Add audit logging for sensitive operations
  • Implement network egress filtering for plugins
  • Add memory limits for plugin execution

Long-term Actions (Low Priority)

  • Implement full plugin isolation (separate process)
  • Add runtime behavior monitoring
  • Implement automatic threat detection
  • Add security-focused unit tests
  • Regular security audits

10. SECURE CODING GUIDELINES

For Plugin Developers

"""
Secure Plugin Development Guide
"""

# ✅ DO: Use the PluginAPI for all operations
class SecurePlugin(BasePlugin):
    def safe_operation(self):
        # Use API methods instead of direct file/network access
        data = self.api.get_data('key')
        self.api.set_data('key', value)

# ❌ DON'T: Access filesystem directly
class InsecurePlugin(BasePlugin):
    def unsafe_operation(self):
        # This will be blocked in sandboxed mode
        with open('/etc/passwd', 'r') as f:
            data = f.read()

# ✅ DO: Validate all inputs
class SecurePlugin(BasePlugin):
    def process_user_input(self, user_input: str):
        # Validate and sanitize
        if not re.match(r'^[a-zA-Z0-9_-]+$', user_input):
            raise ValueError("Invalid input")
        # Process safely

# ❌ DON'T: Trust external data
class InsecurePlugin(BasePlugin):
    def process_external_data(self, data: dict):
        # Dangerous: no validation
        exec(data['code'])  # NEVER DO THIS

Appendix A: Security Testing Commands

# Test path traversal
echo '{"../../../etc/passwd": "test"}' > data/plugins/../../../etc/test.json

# Test for code injection
python -c "import json; json.loads('{\"__class__\": \"test\"}')"

# Verify file permissions
ls -la data/
ls -la data/plugins/

# Check for sensitive data in logs
grep -r "password\|api_key\|secret" logs/

Appendix B: Incident Response

If a security breach is suspected:

  1. Immediate: Stop the application
  2. Assess: Check logs for unauthorized access
  3. Isolate: Remove suspicious plugins
  4. Clean: Delete cache and temporary files
  5. Update: Apply security patches
  6. Monitor: Enable enhanced logging

Document Version: 1.0
Last Updated: 2026-02-14
Next Review: 2026-03-14