33 KiB
EU-Utility Security Audit & Hardening Guide
Audit Date: 2026-02-14
Auditor: Security Agent
Scope: EU-Utility Core System & Plugin Architecture
Executive Summary
This security audit covers the EU-Utility application, a PyQt6-based overlay utility for Entropia Universe. The codebase shows good architectural patterns (singleton services, plugin system, event bus) but has several security vulnerabilities that need addressing.
Risk Level: MEDIUM-HIGH
Key Issues: 7 Critical, 12 High, 8 Medium priority findings
1. INPUT VALIDATION FINDINGS
🔴 CRITICAL: Path Traversal in DataStore
Location: core/data_store.py:68
def _get_plugin_file(self, plugin_id: str) -> Path:
"""Get the storage file path for a plugin."""
# Sanitize plugin_id to create a safe filename
safe_name = plugin_id.replace(".", "_").replace("/", "_").replace("\\", "_")
return self.data_dir / f"{safe_name}.json"
Vulnerability: The sanitization is insufficient. Attackers can bypass using:
..sequences (not replaced)- Null bytes (
%00) - Unicode normalization attacks
- Double encoding (
..%252f)
Attack Scenario:
# Malicious plugin could write outside data directory
plugin_id = "../../../etc/passwd"
# Results in: data/plugins/.._.._.._etc_passwd.json (still dangerous)
Fix:
def _get_plugin_file(self, plugin_id: str) -> Path:
"""Get the storage file path for a plugin."""
import re
# Whitelist approach: only allow alphanumeric, dash, underscore
safe_name = re.sub(r'[^a-zA-Z0-9_-]', '_', plugin_id)
# Prevent path traversal with .. sequences
safe_name = safe_name.replace('..', '_')
file_path = self.data_dir / f"{safe_name}.json"
# Security check: ensure resolved path is within data_dir
try:
resolved_path = file_path.resolve()
resolved_data_dir = self.data_dir.resolve()
if not str(resolved_path).startswith(str(resolved_data_dir)):
raise ValueError(f"Path traversal detected: {plugin_id}")
except (OSError, ValueError) as e:
raise ValueError(f"Invalid plugin_id: {plugin_id}") from e
return file_path
🔴 CRITICAL: Unvalidated JSON Loading in Settings
Location: core/settings.py:63-69
def _load(self):
"""Load settings from file."""
self._settings = self.DEFAULTS.copy()
if self.config_file.exists():
try:
with open(self.config_file, 'r') as f:
saved = json.load(f)
self._settings.update(saved)
except Exception as e:
print(f"Error loading settings: {e}")
Vulnerability: No validation of loaded JSON structure. Malformed settings can:
- Inject arbitrary types (objects, functions via
__reduce__) - Overflow memory with large nested structures
- Corrupt application state
Fix:
def _load(self):
"""Load settings from file."""
self._settings = self.DEFAULTS.copy()
if self.config_file.exists():
try:
with open(self.config_file, 'r') as f:
saved = json.load(f)
# Validate loaded data
if not isinstance(saved, dict):
raise ValueError("Settings must be a dictionary")
# Check for suspicious keys
dangerous_keys = ['__class__', '__reduce__', '__reduce_ex__', '__getstate__']
for key in saved.keys():
if key.startswith('__') or key in dangerous_keys:
raise ValueError(f"Invalid key in settings: {key}")
# Validate values against expected types
for key, value in saved.items():
if key in self.DEFAULTS:
expected_type = type(self.DEFAULTS[key])
if not isinstance(value, expected_type):
print(f"[Settings] Type mismatch for {key}, using default")
continue
self._settings[key] = value
except json.JSONDecodeError as e:
print(f"[Settings] JSON parse error: {e}")
except Exception as e:
print(f"[Settings] Error loading settings: {e}")
🟠 HIGH: Unvalidated Region Parameters in OCR
Location: core/ocr_service.py:115-130
def capture_screen(self, region: Tuple[int, int, int, int] = None) -> 'Image.Image':
if region:
x, y, width, height = region
return screenshot_service.capture_region(x, y, width, height)
Vulnerability: No bounds checking on region parameters. Can cause:
- Memory exhaustion with huge regions
- Negative coordinates causing crashes
- Screen scraping outside monitor bounds
Fix:
def capture_screen(self, region: Tuple[int, int, int, int] = None) -> 'Image.Image':
if region:
x, y, width, height = region
# Validate region parameters
if not all(isinstance(v, int) for v in region):
raise ValueError("Region values must be integers")
if width <= 0 or height <= 0:
raise ValueError("Region width and height must be positive")
if width > 7680 or height > 4320: # 8K resolution limit
raise ValueError("Region dimensions exceed maximum allowed (8K)")
if x < -10000 or y < -10000: # Sanity check
raise ValueError("Region coordinates out of reasonable bounds")
return screenshot_service.capture_region(x, y, width, height)
🟠 HIGH: Unvalidated URL Parameters in NexusAPI
Location: core/nexus_api.py:180-200
def _make_request(self, endpoint: str, params: Dict[str, Any] = None, ...):
url = f"{self.BASE_URL}/{self.API_VERSION}/{endpoint}"
# ...
response = session.get(url, params=params, timeout=30)
Vulnerability: Endpoint parameter not validated, allowing:
- Path traversal in URL (
../admin/config) - SSRF attacks via crafted endpoints
- Injection of query parameters
Fix:
def _make_request(self, endpoint: str, params: Dict[str, Any] = None, ...):
# Validate endpoint format
import re
if not re.match(r'^[a-zA-Z0-9_/-]+$', endpoint):
raise ValueError(f"Invalid endpoint format: {endpoint}")
# Prevent path traversal
if '..' in endpoint or endpoint.startswith('/'):
raise ValueError(f"Path traversal detected in endpoint: {endpoint}")
url = f"{self.BASE_URL}/{self.API_VERSION}/{endpoint}"
# Validate and sanitize params
if params:
sanitized_params = {}
for key, value in params.items():
# Validate key
if not re.match(r'^[a-zA-Z0-9_]+$', str(key)):
continue
# Sanitize value
if isinstance(value, str):
sanitized_params[key] = value[:1000] # Limit length
else:
sanitized_params[key] = value
params = sanitized_params
response = session.get(url, params=params, timeout=30)
2. PATH TRAVERSAL PREVENTION
🔴 CRITICAL: Screenshot Save Path Vulnerability
Location: core/screenshot.py:186-200
def save_screenshot(self, image: Image.Image, filename: Optional[str] = None) -> Path:
if filename is None:
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3]
filename = f"screenshot_{timestamp}.{self._format.lower()}"
filepath = self._save_path / filename
# ... saves without validation
Vulnerability: Filename parameter allows path traversal.
Fix:
def save_screenshot(self, image: Image.Image, filename: Optional[str] = None) -> Path:
if filename is None:
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S_%f")[:-3]
filename = f"screenshot_{timestamp}.{self._format.lower()}"
# Sanitize filename
import re
# Remove path components and dangerous characters
safe_filename = re.sub(r'[\\/:*?"<>|]', '_', filename)
safe_filename = safe_filename.replace('..', '_')
# Ensure filename is not empty and has reasonable length
if not safe_filename or len(safe_filename) > 255:
safe_filename = f"screenshot_{int(time.time())}.png"
filepath = self._save_path / safe_filename
# Security check: ensure resolved path is within save_path
try:
resolved_path = filepath.resolve()
resolved_save_path = self._save_path.resolve()
if not str(resolved_path).startswith(str(resolved_save_path)):
raise ValueError("Path traversal detected")
except (OSError, ValueError) as e:
print(f"[Screenshot] Security error: {e}")
# Fallback to safe default
filepath = self._save_path / f"screenshot_{int(time.time())}.png"
# Save with appropriate settings
# ... rest of method
🟠 HIGH: Icon Manager Path Traversal
Location: core/icon_manager.py (if exists - check for similar patterns)
Ensure all file operations use safe path resolution.
3. API KEY HANDLING
🟢 GOOD: No Hardcoded API Keys Found
The codebase correctly avoids hardcoded API keys. However, improvements needed:
🟡 MEDIUM: API Key Storage in Plain Text
Issue: If API keys are added in the future, they should not be stored in plain text JSON.
Recommendation:
# Use keyring for secure storage
import keyring
def store_api_key(service_name: str, api_key: str):
"""Securely store API key in system keyring."""
keyring.set_password("EU-Utility", service_name, api_key)
def get_api_key(service_name: str) -> Optional[str]:
"""Retrieve API key from system keyring."""
return keyring.get_password("EU-Utility", service_name)
4. PLUGIN SANDBOXING
🔴 CRITICAL: No Plugin Sandboxing
Location: core/plugin_manager.py:85-130
def discover_plugins(self) -> List[Type[BasePlugin]]:
# ...
spec.loader.exec_module(module) # Executes arbitrary code!
# ...
Vulnerability: Plugins execute with full Python privileges. Malicious plugins can:
- Access filesystem anywhere
- Make network requests
- Execute system commands
- Access other processes' memory
- Steal user data
Risk Assessment:
| Capability | Current Risk | Mitigation Priority |
|---|---|---|
| File System | CRITICAL | HIGH |
| Network | CRITICAL | HIGH |
| Process Execution | CRITICAL | HIGH |
| Memory Access | HIGH | MEDIUM |
| UI Spoofing | MEDIUM | MEDIUM |
Recommended Fix - Plugin Sandbox Architecture:
# core/plugin_sandbox.py
"""
Plugin sandbox using restricted Python execution.
"""
import ast
import builtins
import sys
from types import ModuleType
from typing import Set, Dict, Any
class PluginSandbox:
"""Restricted execution environment for plugins."""
# Whitelist of allowed builtin functions
ALLOWED_BUILTINS: Set[str] = {
'abs', 'all', 'any', 'bool', 'dict', 'dir', 'enumerate',
'filter', 'float', 'format', 'frozenset', 'hasattr', 'hash',
'hex', 'id', 'input', 'int', 'isinstance', 'issubclass',
'iter', 'len', 'list', 'map', 'max', 'min', 'next',
'object', 'oct', 'ord', 'pow', 'print', 'property',
'range', 'repr', 'reversed', 'round', 'set', 'slice',
'sorted', 'staticmethod', 'str', 'sum', 'super', 'tuple',
'type', 'vars', 'zip', '__import__', 'Exception', 'BaseException'
}
# Allowed modules (subset only)
ALLOWED_MODULES: Set[str] = {
'json', 're', 'math', 'random', 'datetime', 'collections',
'itertools', 'functools', 'typing', 'dataclasses', 'enum'
}
def __init__(self, plugin_id: str):
self.plugin_id = plugin_id
self.data_dir = Path(f"data/plugins/{plugin_id}")
self.data_dir.mkdir(parents=True, exist_ok=True)
def create_restricted_globals(self) -> Dict[str, Any]:
"""Create restricted globals for plugin execution."""
# Filter builtins
safe_builtins = {
name: getattr(builtins, name)
for name in self.ALLOWED_BUILTINS
if hasattr(builtins, name)
}
# Add restricted __import__
safe_builtins['__import__'] = self._restricted_import
return {
'__builtins__': safe_builtins,
'__name__': f'__plugin_{self.plugin_id}__',
}
def _restricted_import(self, name: str, *args, **kwargs):
"""Restricted import function."""
# Check if module is allowed
base_module = name.split('.')[0]
if base_module not in self.ALLOWED_MODULES:
raise ImportError(f"Module '{name}' is not allowed in plugin sandbox")
# Special handling for EU-Utility core modules
if name.startswith('core.'):
return self._wrap_core_module(name)
if name.startswith('plugins.'):
raise ImportError("Plugins cannot import other plugins directly")
return __import__(name, *args, **kwargs)
def _wrap_core_module(self, name: str):
"""Wrap core module to restrict dangerous operations."""
# Return proxy that validates all calls
# Implementation depends on specific module
pass
def validate_source(self, source: str) -> bool:
"""Validate plugin source code for dangerous patterns."""
try:
tree = ast.parse(source)
except SyntaxError:
return False
dangerous_nodes = (
ast.Exec, # Python 2
ast.Call, # Need to check for eval, exec
)
for node in ast.walk(tree):
# Check for eval/exec calls
if isinstance(node, ast.Call):
if isinstance(node.func, ast.Name):
if node.func.id in ('eval', 'exec', 'compile'):
return False
# Check for __import__
if isinstance(node, ast.Call):
if isinstance(node.func, ast.Name):
if node.func.id == '__import__':
# Will be validated at runtime
pass
# Check for dangerous attributes
if isinstance(node, ast.Attribute):
if node.attr.startswith('__') and node.attr.endswith('__'):
# Block access to dunder methods that could be dangerous
dangerous_dunders = {
'__class__', '__bases__', '__mro__', '__subclasses__',
'__globals__', '__code__', '__func__', '__self__',
'__module__', '__dict__', '__weakref__'
}
if node.attr in dangerous_dunders:
return False
return True
def execute(self, source: str, filename: str = '<plugin>'):
"""Execute plugin code in sandbox."""
if not self.validate_source(source):
raise SecurityError("Plugin source contains dangerous patterns")
code = compile(source, filename, 'exec')
globals_dict = self.create_restricted_globals()
# Execute in restricted environment
exec(code, globals_dict)
return globals_dict
class SecurityError(Exception):
"""Raised when a security violation is detected."""
pass
🟠 HIGH: Plugin Code Execution Without Verification
Location: core/plugin_manager.py:108
spec.loader.exec_module(module)
Recommendation: Implement code signing verification:
import hashlib
import hmac
from pathlib import Path
class PluginVerifier:
"""Verify plugin integrity and authenticity."""
def __init__(self, trusted_keys_dir: Path):
self.trusted_keys_dir = trusted_keys_dir
self.verified_plugins: Dict[str, str] = {} # plugin_id -> hash
def verify_plugin(self, plugin_path: Path) -> bool:
"""Verify plugin signature."""
signature_file = plugin_path / ".signature"
manifest_file = plugin_path / "manifest.json"
if not signature_file.exists():
# Unsigned plugins require explicit user consent
return self._prompt_user_for_unsigned(plugin_path)
# Verify signature
try:
with open(manifest_file, 'rb') as f:
manifest_data = f.read()
with open(signature_file, 'r') as f:
signature = f.read().strip()
# Parse signature (format: key_id:signature)
key_id, sig = signature.split(':', 1)
# Load public key
public_key_path = self.trusted_keys_dir / f"{key_id}.pub"
if not public_key_path.exists():
print(f"[PluginVerifier] Unknown key: {key_id}")
return False
with open(public_key_path, 'rb') as f:
public_key = f.read()
# Verify HMAC-SHA256 signature
expected_sig = hmac.new(
public_key,
manifest_data,
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(sig, expected_sig):
print("[PluginVerifier] Invalid signature")
return False
# Store verified hash
self.verified_plugins[plugin_path.name] = hashlib.sha256(manifest_data).hexdigest()
return True
except Exception as e:
print(f"[PluginVerifier] Verification error: {e}")
return False
def _prompt_user_for_unsigned(self, plugin_path: Path) -> bool:
"""Prompt user to approve unsigned plugin."""
# This should show a UI dialog
# For now, reject unsigned plugins in production
return False
5. DATA ENCRYPTION
🔴 CRITICAL: Sensitive Data Stored in Plain Text
Locations:
core/data_store.py- Plugin data stored as plain JSONcore/settings.py- User settings in plain text- Plugin data files (loot_tracker, skill_scanner, etc.)
Vulnerability: User data including:
- Game statistics
- Personal preferences
- Potentially sensitive game information
All stored unencrypted on disk.
Fix - Encrypted Data Store:
# core/secure_data_store.py
"""
Encrypted data storage using Fernet (AES-128 in CBC mode).
"""
import json
import base64
from pathlib import Path
from typing import Any, Dict, Optional
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import keyring
class SecureDataStore:
"""Encrypted data storage for sensitive plugin data."""
def __init__(self, plugin_id: str):
self.plugin_id = plugin_id
self.data_dir = Path("data/plugins") / plugin_id
self.data_dir.mkdir(parents=True, exist_ok=True)
self._cipher = None
self._init_encryption()
def _init_encryption(self):
"""Initialize encryption key."""
# Try to get existing key from keyring
key = keyring.get_password("EU-Utility", f"data_key_{self.plugin_id}")
if key is None:
# Generate new key
key = Fernet.generate_key().decode()
keyring.set_password("EU-Utility", f"data_key_{self.plugin_id}", key)
self._cipher = Fernet(key.encode())
def save(self, key: str, data: Any) -> bool:
"""Save encrypted data."""
try:
# Serialize to JSON
json_data = json.dumps(data).encode('utf-8')
# Encrypt
encrypted = self._cipher.encrypt(json_data)
# Save to file
file_path = self.data_dir / f"{key}.encrypted"
with open(file_path, 'wb') as f:
f.write(encrypted)
return True
except Exception as e:
print(f"[SecureDataStore] Save error: {e}")
return False
def load(self, key: str, default: Any = None) -> Any:
"""Load and decrypt data."""
file_path = self.data_dir / f"{key}.encrypted"
if not file_path.exists():
return default
try:
with open(file_path, 'rb') as f:
encrypted = f.read()
# Decrypt
decrypted = self._cipher.decrypt(encrypted)
# Deserialize
return json.loads(decrypted.decode('utf-8'))
except Exception as e:
print(f"[SecureDataStore] Load error: {e}")
return default
🟡 MEDIUM: Clipboard History Stored in Plain Text
Location: core/clipboard.py:45-55
Clipboard may contain sensitive data (passwords, private keys).
Fix:
def _save_history(self):
"""Save clipboard history to encrypted file."""
try:
self._history_file.parent.mkdir(parents=True, exist_ok=True)
# Encrypt sensitive clipboard data
data = [asdict(entry) for entry in self._history]
json_data = json.dumps(data).encode('utf-8')
# Use simple XOR obfuscation at minimum
# Better: use the SecureDataStore above
encrypted = self._encrypt(json_data)
with open(self._history_file, 'wb') as f:
f.write(encrypted)
except Exception as e:
print(f"[Clipboard] Error saving history: {e}")
6. FILE OPERATIONS AUDIT
Summary of File Operations
| Module | Operation | Risk Level | Notes |
|---|---|---|---|
data_store.py |
Read/Write JSON | HIGH | Path traversal possible |
settings.py |
Read/Write JSON | MEDIUM | No validation |
screenshot.py |
Save images | HIGH | Path traversal in filename |
clipboard.py |
Read/Write JSON | LOW | Plain text storage |
http_client.py |
Cache to disk | MEDIUM | Cache poisoning possible |
log_reader.py |
Read log file | LOW | Read-only, path hardcoded |
Recommendations:
- Implement centralized file access control:
# core/file_access.py
"""
Centralized file access control with path validation.
"""
from pathlib import Path
from typing import Optional
import threading
class FileAccessController:
"""Controls and audits file system access."""
_instance = None
_lock = threading.Lock()
# Allowed base directories
ALLOWED_PATHS = {
'data': Path('data').resolve(),
'cache': Path('cache').resolve(),
'screenshots': Path.home() / 'Documents' / 'Entropia Universe' / 'Screenshots',
'temp': Path('temp').resolve(),
}
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def validate_path(self, path: Path, allowed_zone: str) -> Path:
"""Validate that path is within allowed zone."""
if allowed_zone not in self.ALLOWED_PATHS:
raise ValueError(f"Unknown zone: {allowed_zone}")
base_path = self.ALLOWED_PATHS[allowed_zone]
# Resolve the path
try:
resolved = path.resolve()
base_resolved = base_path.resolve()
except (OSError, ValueError) as e:
raise ValueError(f"Invalid path: {e}")
# Check path is within allowed zone
if not str(resolved).startswith(str(base_resolved)):
raise PermissionError(f"Path traversal detected: {path}")
return resolved
def safe_open(self, path: Path, mode: str, allowed_zone: str, **kwargs):
"""Safely open a file after validation."""
validated_path = self.validate_path(path, allowed_zone)
# Ensure parent directory exists
validated_path.parent.mkdir(parents=True, exist_ok=True)
return open(validated_path, mode, **kwargs)
7. NETWORK REQUESTS AUDIT
Summary of Network Operations
| Module | Purpose | Risk Level | Notes |
|---|---|---|---|
nexus_api.py |
Game data API | MEDIUM | No cert pinning, basic retry |
http_client.py |
Generic HTTP | MEDIUM | Disk cache, no integrity check |
plugin_api.py |
Plugin network access | HIGH | No restrictions |
Findings:
🟠 HIGH: No Certificate Pinning
Location: core/nexus_api.py:45-50
self._session = requests.Session()
self._session.headers.update({
'User-Agent': 'EU-Utility/1.0 (Entropia Universe Utility Tool)',
'Accept': 'application/json'
})
Fix:
import certifi
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
class NexusAPI:
# Expected certificate fingerprint (example)
PINNED_CERT_FINGERPRINT = "sha256/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
def _get_session(self):
"""Get or create HTTP session with security settings."""
if self._session is None:
self._session = requests.Session()
# Use certificate verification
self._session.verify = certifi.where()
# Add certificate pinning (optional but recommended)
# This requires additional implementation
# Set security headers
self._session.headers.update({
'User-Agent': 'EU-Utility/1.0 (Entropia Universe Utility Tool)',
'Accept': 'application/json',
'Accept-Encoding': 'gzip, deflate',
})
# Configure retries with backoff
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self._session.mount("https://", adapter)
return self._session
🟡 MEDIUM: Cache Poisoning Risk
Location: core/http_client.py:180-220
Cache files stored without integrity verification.
Fix:
def _save_cache_entry(self, cache_key: str, entry: CacheEntry):
"""Save a cache entry to disk with integrity check."""
cache_path = self._get_cache_path(cache_key)
cache_path.parent.mkdir(parents=True, exist_ok=True)
try:
with self._cache_lock:
import base64
import hashlib
import hmac
content_b64 = base64.b64encode(entry.content).decode('utf-8')
data = {
'url': entry.url,
'status_code': entry.status_code,
'headers': entry.headers,
'content': content_b64,
'cached_at': entry.cached_at,
'expires_at': entry.expires_at,
}
# Add integrity signature
data_json = json.dumps(data, sort_keys=True).encode()
signature = hmac.new(
self._cache_key.encode(),
data_json,
hashlib.sha256
).hexdigest()
data['_integrity'] = signature
with open(cache_path, 'w', encoding='utf-8') as f:
json.dump(data, f)
except Exception as e:
print(f"[HTTP] Cache save error: {e}")
8. CONFIG STORAGE AUDIT
Current Config Files
| File | Format | Encryption | Validation |
|---|---|---|---|
data/settings.json |
JSON | ❌ No | ❌ No |
config/plugins.json |
JSON | ❌ No | ❌ No |
data/plugins/*.json |
JSON | ❌ No | ❌ No |
data/clipboard_history.json |
JSON | ❌ No | ❌ No |
Recommendations:
- Migrate to encrypted config:
# core/secure_config.py
"""
Encrypted configuration management.
"""
from pathlib import Path
from cryptography.fernet import Fernet
import keyring
import json
class SecureConfig:
"""Encrypted configuration storage."""
def __init__(self, config_name: str):
self.config_file = Path("data/secure") / f"{config_name}.enc"
self.config_file.parent.mkdir(parents=True, exist_ok=True)
self._key = self._get_or_create_key()
self._cipher = Fernet(self._key)
self._data = self._load()
def _get_or_create_key(self) -> bytes:
"""Get or create encryption key from keyring."""
key_name = f"eu_utility_config_key_{self.config_file.stem}"
key = keyring.get_password("EU-Utility", key_name)
if key is None:
key = Fernet.generate_key().decode()
keyring.set_password("EU-Utility", key_name, key)
return key.encode()
def _load(self) -> dict:
"""Load encrypted config."""
if not self.config_file.exists():
return {}
try:
with open(self.config_file, 'rb') as f:
encrypted = f.read()
decrypted = self._cipher.decrypt(encrypted)
return json.loads(decrypted.decode('utf-8'))
except Exception as e:
print(f"[SecureConfig] Load error: {e}")
return {}
def save(self) -> bool:
"""Save encrypted config."""
try:
json_data = json.dumps(self._data).encode('utf-8')
encrypted = self._cipher.encrypt(json_data)
with open(self.config_file, 'wb') as f:
f.write(encrypted)
return True
except Exception as e:
print(f"[SecureConfig] Save error: {e}")
return False
def get(self, key: str, default=None):
return self._data.get(key, default)
def set(self, key: str, value):
self._data[key] = value
self.save()
9. SECURITY HARDENING CHECKLIST
Immediate Actions (Critical)
- Fix path traversal in
DataStore._get_plugin_file() - Fix path traversal in
ScreenshotService.save_screenshot() - Add JSON validation in
Settings._load() - Implement plugin sandbox/restricted execution
- Encrypt sensitive data storage
Short-term Actions (High Priority)
- Add input validation for all user-provided paths
- Implement plugin code signing verification
- Add certificate pinning for API calls
- Implement cache integrity verification
- Add rate limiting for plugin API calls
Medium-term Actions (Medium Priority)
- Migrate all config to encrypted storage
- Implement centralized file access control
- Add audit logging for sensitive operations
- Implement network egress filtering for plugins
- Add memory limits for plugin execution
Long-term Actions (Low Priority)
- Implement full plugin isolation (separate process)
- Add runtime behavior monitoring
- Implement automatic threat detection
- Add security-focused unit tests
- Regular security audits
10. SECURE CODING GUIDELINES
For Plugin Developers
"""
Secure Plugin Development Guide
"""
# ✅ DO: Use the PluginAPI for all operations
class SecurePlugin(BasePlugin):
def safe_operation(self):
# Use API methods instead of direct file/network access
data = self.api.get_data('key')
self.api.set_data('key', value)
# ❌ DON'T: Access filesystem directly
class InsecurePlugin(BasePlugin):
def unsafe_operation(self):
# This will be blocked in sandboxed mode
with open('/etc/passwd', 'r') as f:
data = f.read()
# ✅ DO: Validate all inputs
class SecurePlugin(BasePlugin):
def process_user_input(self, user_input: str):
# Validate and sanitize
if not re.match(r'^[a-zA-Z0-9_-]+$', user_input):
raise ValueError("Invalid input")
# Process safely
# ❌ DON'T: Trust external data
class InsecurePlugin(BasePlugin):
def process_external_data(self, data: dict):
# Dangerous: no validation
exec(data['code']) # NEVER DO THIS
Appendix A: Security Testing Commands
# Test path traversal
echo '{"../../../etc/passwd": "test"}' > data/plugins/../../../etc/test.json
# Test for code injection
python -c "import json; json.loads('{\"__class__\": \"test\"}')"
# Verify file permissions
ls -la data/
ls -la data/plugins/
# Check for sensitive data in logs
grep -r "password\|api_key\|secret" logs/
Appendix B: Incident Response
If a security breach is suspected:
- Immediate: Stop the application
- Assess: Check logs for unauthorized access
- Isolate: Remove suspicious plugins
- Clean: Delete cache and temporary files
- Update: Apply security patches
- Monitor: Enable enhanced logging
Document Version: 1.0
Last Updated: 2026-02-14
Next Review: 2026-03-14