""" EU-Utility Premium - Plugin Manager ==================================== Enterprise-grade plugin manager with: - Sandboxed execution - Dynamic loading/unloading - Dependency resolution - Lifecycle management - Security validation Example: manager = PluginManager() manager.discover_plugins(Path("./plugins")) manager.load_all() # Activate specific plugin manager.activate_plugin("my_plugin") """ from __future__ import annotations import asyncio import hashlib import importlib.util import json import logging import os import sys import threading import time import traceback from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Set, Type, Union from uuid import uuid4 from premium.plugins.api import ( PluginAPI, PluginContext, PluginError, PluginInstance, PluginLoadError, PluginInitError, PluginManifest, PluginPermissionError, PluginState, PermissionLevel, PluginDependencyError, PluginVersionError ) # API version for compatibility checking API_VERSION = "3.0.0" # ============================================================================= # PLUGIN SANDBOX # ============================================================================= class PluginSandbox: """Sandbox for plugin execution. Restricts plugin access based on permissions. """ def __init__(self, plugin_id: str, permissions: Set[PermissionLevel]): self.plugin_id = plugin_id self.permissions = permissions self._original_open = open self._restricted_paths: Set[Path] = set() def can_access_file(self, path: Path, mode: str = 'r') -> bool: """Check if plugin can access a file.""" if 'w' in mode or 'a' in mode or 'x' in mode: if PermissionLevel.FILE_WRITE not in self.permissions: return False if PermissionLevel.FILE_READ not in self.permissions: return False # Check restricted paths resolved = path.resolve() for restricted in self._restricted_paths: try: resolved.relative_to(restricted) return False except ValueError: pass return True def can_access_network(self) -> bool: """Check if plugin can access network.""" return PermissionLevel.NETWORK in self.permissions def can_access_ui(self) -> bool: """Check if plugin can manipulate UI.""" return PermissionLevel.UI in self.permissions def can_access_memory(self) -> bool: """Check if plugin can access memory (dangerous).""" return PermissionLevel.MEMORY in self.permissions # ============================================================================= # PLUGIN LOADER # ============================================================================= class PluginLoader: """Loads plugin modules with validation and security checks.""" def __init__(self, sandbox: Optional[PluginSandbox] = None): self.sandbox = sandbox self._loaded_modules: Dict[str, Any] = {} def load_plugin_class( self, plugin_path: Path, manifest: PluginManifest ) -> Type[PluginAPI]: """Load a plugin class from a directory. Args: plugin_path: Path to plugin directory manifest: Plugin manifest Returns: PluginAPI subclass Raises: PluginLoadError: If loading fails """ entry_file = plugin_path / manifest.entry_point if not entry_file.exists(): raise PluginLoadError( f"Entry point not found: {manifest.entry_point}" ) # Validate file hash for security file_hash = self._compute_hash(entry_file) # Create unique module name module_name = f"premium_plugin_{manifest.name.lower().replace(' ', '_')}_{file_hash[:8]}" try: # Load module spec = importlib.util.spec_from_file_location( module_name, entry_file ) if spec is None or spec.loader is None: raise PluginLoadError("Failed to create module spec") module = importlib.util.module_from_spec(spec) # Add to sys.modules temporarily sys.modules[module_name] = module try: spec.loader.exec_module(module) except Exception as e: raise PluginLoadError(f"Failed to execute module: {e}") # Find PluginAPI subclass plugin_class = None for attr_name in dir(module): attr = getattr(module, attr_name) if ( isinstance(attr, type) and issubclass(attr, PluginAPI) and attr is not PluginAPI and not attr.__name__.startswith('Base') ): plugin_class = attr break if plugin_class is None: raise PluginLoadError( f"No PluginAPI subclass found in {manifest.entry_point}" ) # Validate manifest matches if not hasattr(plugin_class, 'manifest'): raise PluginLoadError("Plugin class missing manifest attribute") return plugin_class except PluginLoadError: raise except Exception as e: raise PluginLoadError(f"Unexpected error loading plugin: {e}") def _compute_hash(self, file_path: Path) -> str: """Compute SHA256 hash of a file.""" h = hashlib.sha256() with open(file_path, 'rb') as f: for chunk in iter(lambda: f.read(8192), b''): h.update(chunk) return h.hexdigest() # ============================================================================= # DEPENDENCY RESOLVER # ============================================================================= @dataclass class DependencyNode: """Node in dependency graph.""" plugin_id: str manifest: PluginManifest dependencies: Set[str] = field(default_factory=set) dependents: Set[str] = field(default_factory=set) resolved: bool = False class DependencyResolver: """Resolves plugin dependencies using topological sort.""" def __init__(self): self._nodes: Dict[str, DependencyNode] = {} def add_plugin(self, plugin_id: str, manifest: PluginManifest) -> None: """Add a plugin to the dependency graph.""" if plugin_id not in self._nodes: self._nodes[plugin_id] = DependencyNode( plugin_id=plugin_id, manifest=manifest ) node = self._nodes[plugin_id] # Add dependencies for dep_id in manifest.dependencies.keys(): node.dependencies.add(dep_id) if dep_id not in self._nodes: self._nodes[dep_id] = DependencyNode( plugin_id=dep_id, manifest=PluginManifest( name=dep_id, version="0.0.0", author="Unknown" ) ) self._nodes[dep_id].dependents.add(plugin_id) def resolve_load_order(self, plugin_ids: List[str]) -> List[str]: """Resolve plugin load order using topological sort. Returns: List of plugin IDs in load order Raises: PluginDependencyError: If circular dependency detected """ order: List[str] = [] visited: Set[str] = set() temp_mark: Set[str] = set() def visit(node_id: str, path: List[str]) -> None: if node_id in temp_mark: cycle = " -> ".join(path + [node_id]) raise PluginDependencyError(f"Circular dependency: {cycle}") if node_id in visited: return temp_mark.add(node_id) path.append(node_id) node = self._nodes.get(node_id) if node: for dep_id in node.dependencies: visit(dep_id, path.copy()) temp_mark.remove(node_id) visited.add(node_id) order.append(node_id) for plugin_id in plugin_ids: if plugin_id not in visited: visit(plugin_id, []) return order def get_dependents(self, plugin_id: str) -> Set[str]: """Get all plugins that depend on a given plugin.""" node = self._nodes.get(plugin_id) if node: return node.dependents.copy() return set() def check_conflicts(self) -> List[str]: """Check for version conflicts in dependencies.""" conflicts = [] # TODO: Implement version conflict checking return conflicts # ============================================================================= # PLUGIN MANAGER # ============================================================================= class PluginManager: """Manages plugin lifecycle with sandboxing and dependency resolution. This is the main entry point for plugin management. It handles: - Discovery of plugins in directories - Loading and unloading - Dependency resolution - Lifecycle state management - Security sandboxing Example: manager = PluginManager( plugin_dirs=[Path("./plugins"), Path("~/.eu-utility/plugins")], data_dir=Path("~/.eu-utility/data") ) manager.discover_all() manager.load_all() # Get active plugins for plugin_id, instance in manager.get_active_plugins().items(): print(f"{plugin_id}: {instance.manifest.name}") """ def __init__( self, plugin_dirs: Optional[List[Path]] = None, data_dir: Optional[Path] = None, event_bus: Optional[Any] = None, state_store: Optional[Any] = None, widget_api: Optional[Any] = None, nexus_api: Optional[Any] = None, max_workers: int = 4 ): """Initialize plugin manager. Args: plugin_dirs: Directories to search for plugins data_dir: Directory for plugin data storage event_bus: Event bus for plugin communication state_store: State store for plugins widget_api: Widget API for UI plugins nexus_api: Nexus API for Entropia data max_workers: Max worker threads for background tasks """ self.plugin_dirs = plugin_dirs or [] self.data_dir = data_dir or Path.home() / ".eu-utility" / "data" self.event_bus = event_bus self.state_store = state_store self.widget_api = widget_api self.nexus_api = nexus_api # Ensure data directory exists self.data_dir.mkdir(parents=True, exist_ok=True) # Plugin storage self._instances: Dict[str, PluginInstance] = {} self._classes: Dict[str, Type[PluginAPI]] = {} self._paths: Dict[str, Path] = {} # Support systems self._loader = PluginLoader() self._resolver = DependencyResolver() self._executor = ThreadPoolExecutor(max_workers=max_workers) self._logger = logging.getLogger("PluginManager") # State self._discovered = False self._lock = threading.RLock() # ========== Discovery ========== def discover_plugins(self, directory: Path) -> List[str]: """Discover plugins in a directory. Args: directory: Directory to search for plugins Returns: List of discovered plugin IDs """ discovered: List[str] = [] if not directory.exists(): self._logger.warning(f"Plugin directory does not exist: {directory}") return discovered for item in directory.iterdir(): if not item.is_dir(): continue if item.name.startswith('.') or item.name.startswith('__'): continue manifest_path = item / "plugin.json" if not manifest_path.exists(): continue try: manifest = PluginManifest.from_json(manifest_path) plugin_id = self._generate_plugin_id(manifest, item) with self._lock: self._paths[plugin_id] = item self._resolver.add_plugin(plugin_id, manifest) discovered.append(plugin_id) self._logger.debug(f"Discovered plugin: {manifest.name} ({plugin_id})") except Exception as e: self._logger.error(f"Failed to load manifest from {item}: {e}") return discovered def discover_all(self) -> int: """Discover plugins in all configured directories. Returns: Total number of plugins discovered """ total = 0 for directory in self.plugin_dirs: total += len(self.discover_plugins(directory)) self._discovered = True self._logger.info(f"Discovered {total} plugins") return total # ========== Loading ========== def load_plugin(self, plugin_id: str) -> bool: """Load a plugin by ID. Args: plugin_id: Unique plugin identifier Returns: True if loaded successfully """ with self._lock: if plugin_id in self._instances: return True path = self._paths.get(plugin_id) if not path: self._logger.error(f"Plugin not found: {plugin_id}") return False manifest_path = path / "plugin.json" manifest = PluginManifest.from_json(manifest_path) # Create instance record instance = PluginInstance( plugin_id=plugin_id, manifest=manifest, state=PluginState.LOADING ) self._instances[plugin_id] = instance try: # Load plugin class plugin_class = self._loader.load_plugin_class(path, manifest) self._classes[plugin_id] = plugin_class with self._lock: instance.state = PluginState.LOADED instance.load_time = datetime.now() self._logger.info(f"Loaded plugin: {manifest.name}") return True except Exception as e: with self._lock: instance.state = PluginState.ERROR instance.error_message = str(e) instance.error_traceback = traceback.format_exc() self._logger.error(f"Failed to load plugin {manifest.name}: {e}") return False def load_all(self, auto_activate: bool = False) -> Dict[str, bool]: """Load all discovered plugins. Args: auto_activate: Automatically activate loaded plugins Returns: Dict mapping plugin IDs to success status """ if not self._discovered: self.discover_all() # Resolve load order plugin_ids = list(self._paths.keys()) try: load_order = self._resolver.resolve_load_order(plugin_ids) except PluginDependencyError as e: self._logger.error(f"Dependency resolution failed: {e}") load_order = plugin_ids # Fall back to default order # Load in order results: Dict[str, bool] = {} for plugin_id in load_order: results[plugin_id] = self.load_plugin(plugin_id) if auto_activate and results[plugin_id]: self.activate_plugin(plugin_id) return results # ========== Initialization ========== def init_plugin(self, plugin_id: str, config: Optional[Dict[str, Any]] = None) -> bool: """Initialize a loaded plugin. Args: plugin_id: Plugin ID config: Optional configuration override Returns: True if initialized successfully """ with self._lock: instance = self._instances.get(plugin_id) if not instance: self._logger.error(f"Plugin not loaded: {plugin_id}") return False if instance.state not in (PluginState.LOADED, PluginState.INACTIVE): self._logger.warning(f"Cannot initialize plugin in state: {instance.state}") return False plugin_class = self._classes.get(plugin_id) if not plugin_class: self._logger.error(f"Plugin class not found: {plugin_id}") return False instance.state = PluginState.INITIALIZING try: # Create plugin directory for data plugin_data_dir = self.data_dir / plugin_id plugin_data_dir.mkdir(parents=True, exist_ok=True) # Load saved config or use provided saved_config = self._load_plugin_config(plugin_id) if config: saved_config.update(config) # Create logger logger = logging.getLogger(f"Plugin.{instance.manifest.name}") # Create sandbox sandbox = PluginSandbox(plugin_id, instance.manifest.permissions) # Create context ctx = PluginContext( plugin_id=plugin_id, manifest=instance.manifest, data_dir=plugin_data_dir, config=saved_config, logger=logger, event_bus=self.event_bus, state_store=self.state_store, widget_api=self.widget_api, nexus_api=self.nexus_api, permissions=instance.manifest.permissions ) # Create and initialize plugin instance plugin = plugin_class() plugin._set_context(ctx) plugin.on_init(ctx) with self._lock: instance.instance = plugin instance.state = PluginState.INACTIVE self._logger.info(f"Initialized plugin: {instance.manifest.name}") return True except Exception as e: with self._lock: instance.state = PluginState.ERROR instance.error_message = str(e) instance.error_traceback = traceback.format_exc() self._logger.error(f"Failed to initialize plugin {plugin_id}: {e}") return False # ========== Activation ========== def activate_plugin(self, plugin_id: str) -> bool: """Activate an initialized plugin. Args: plugin_id: Plugin ID Returns: True if activated successfully """ with self._lock: instance = self._instances.get(plugin_id) if not instance: return False if instance.state == PluginState.ACTIVE: return True if instance.state != PluginState.INACTIVE: # Try to initialize first if instance.state == PluginState.LOADED: self.init_plugin(plugin_id) if instance.state != PluginState.INACTIVE: return False instance.state = PluginState.ACTIVATING plugin = instance.instance try: plugin.on_activate() with self._lock: instance.state = PluginState.ACTIVE instance.activate_time = datetime.now() self._logger.info(f"Activated plugin: {instance.manifest.name}") return True except Exception as e: with self._lock: instance.state = PluginState.ERROR instance.error_message = str(e) self._logger.error(f"Failed to activate plugin {plugin_id}: {e}") return False def deactivate_plugin(self, plugin_id: str) -> bool: """Deactivate an active plugin. Args: plugin_id: Plugin ID Returns: True if deactivated successfully """ with self._lock: instance = self._instances.get(plugin_id) if not instance or instance.state != PluginState.ACTIVE: return False instance.state = PluginState.DEACTIVATING plugin = instance.instance try: plugin.on_deactivate() with self._lock: instance.state = PluginState.INACTIVE self._logger.info(f"Deactivated plugin: {instance.manifest.name}") return True except Exception as e: self._logger.error(f"Error deactivating plugin {plugin_id}: {e}") return False # ========== Unloading ========== def unload_plugin(self, plugin_id: str, force: bool = False) -> bool: """Unload a plugin. Args: plugin_id: Plugin ID force: Force unload even if dependents exist Returns: True if unloaded successfully """ with self._lock: instance = self._instances.get(plugin_id) if not instance: return True # Check dependents if not force: dependents = self._resolver.get_dependents(plugin_id) active_dependents = [ d for d in dependents if d in self._instances and self._instances[d].is_active() ] if active_dependents: self._logger.error( f"Cannot unload {plugin_id}: active dependents {active_dependents}" ) return False # Deactivate if active if instance.state == PluginState.ACTIVE: self.deactivate_plugin(plugin_id) instance.state = PluginState.UNLOADING plugin = instance.instance try: if plugin: # Save config before shutdown self._save_plugin_config(plugin_id, plugin.ctx.config) plugin.on_shutdown() with self._lock: instance.state = PluginState.UNLOADED instance.instance = None del self._instances[plugin_id] del self._classes[plugin_id] self._logger.info(f"Unloaded plugin: {instance.manifest.name}") return True except Exception as e: self._logger.error(f"Error unloading plugin {plugin_id}: {e}") return False def unload_all(self) -> None: """Unload all plugins in reverse dependency order.""" # Get active plugins in reverse load order plugin_ids = list(self._instances.keys()) for plugin_id in reversed(plugin_ids): self.unload_plugin(plugin_id, force=True) # ========== Queries ========== def get_instance(self, plugin_id: str) -> Optional[PluginInstance]: """Get plugin instance info.""" return self._instances.get(plugin_id) def get_plugin(self, plugin_id: str) -> Optional[PluginAPI]: """Get active plugin instance.""" instance = self._instances.get(plugin_id) if instance and instance.state == PluginState.ACTIVE: return instance.instance return None def get_all_instances(self) -> Dict[str, PluginInstance]: """Get all plugin instances.""" return self._instances.copy() def get_active_plugins(self) -> Dict[str, PluginInstance]: """Get all active plugins.""" return { k: v for k, v in self._instances.items() if v.state == PluginState.ACTIVE } def get_plugin_ui(self, plugin_id: str) -> Optional[Any]: """Get plugin's UI widget.""" plugin = self.get_plugin(plugin_id) if plugin: return plugin.create_widget() return None def get_plugin_states(self) -> Dict[str, PluginState]: """Get state of all plugins.""" return { k: v.state for k, v in self._instances.items() } # ========== Configuration ========== def _load_plugin_config(self, plugin_id: str) -> Dict[str, Any]: """Load plugin configuration from disk.""" config_path = self.data_dir / plugin_id / "config.json" if config_path.exists(): try: with open(config_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: self._logger.error(f"Failed to load config for {plugin_id}: {e}") return {} def _save_plugin_config(self, plugin_id: str, config: Dict[str, Any]) -> None: """Save plugin configuration to disk.""" config_path = self.data_dir / plugin_id / "config.json" try: with open(config_path, 'w', encoding='utf-8') as f: json.dump(config, f, indent=2) except Exception as e: self._logger.error(f"Failed to save config for {plugin_id}: {e}") # ========== Utility ========== def _generate_plugin_id(self, manifest: PluginManifest, path: Path) -> str: """Generate unique plugin ID.""" # Use path hash for uniqueness path_hash = hashlib.md5(str(path).encode()).hexdigest()[:8] name_slug = manifest.name.lower().replace(' ', '_').replace('-', '_') return f"{name_slug}_{path_hash}" def shutdown(self) -> None: """Shutdown the plugin manager.""" self.unload_all() self._executor.shutdown(wait=True) # ============================================================================= # EXPORTS # ============================================================================= __all__ = [ 'PluginManager', 'PluginLoader', 'DependencyResolver', 'PluginSandbox', ]