EU-Utility/premium/plugins/manager.py

815 lines
27 KiB
Python

"""
EU-Utility Premium - Plugin Manager
====================================
Enterprise-grade plugin manager with:
- Sandboxed execution
- Dynamic loading/unloading
- Dependency resolution
- Lifecycle management
- Security validation
Example:
    manager = PluginManager()
    manager.discover_plugins(Path("./plugins"))
    manager.load_all()
    # Activate specific plugin
    manager.activate_plugin("my_plugin")
"""
from __future__ import annotations
import asyncio
import hashlib
import importlib.util
import json
import logging
import os
import sys
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Type, Union
from uuid import uuid4
from premium.plugins.api import (
PluginAPI, PluginContext, PluginError, PluginInstance, PluginLoadError,
PluginInitError, PluginManifest, PluginPermissionError, PluginState,
PermissionLevel, PluginDependencyError, PluginVersionError
)
# API version for compatibility checking.
# NOTE(review): nothing in the visible part of this module reads this
# constant — presumably it is compared against plugin manifests elsewhere;
# confirm before changing it.
API_VERSION = "3.0.0"
# =============================================================================
# PLUGIN SANDBOX
# =============================================================================
class PluginSandbox:
    """Permission gate for a single plugin.

    The sandbox does not intercept any operation by itself; callers ask it
    whether an operation is allowed given the permissions granted to the
    plugin.

    Attributes:
        plugin_id: Identifier of the plugin this sandbox belongs to.
        permissions: Permission levels granted to the plugin.
    """

    # Characters in an ``open()`` mode string that make the file writable.
    # '+' is included because e.g. 'r+' opens a file for reading AND writing.
    _WRITE_MODE_CHARS = frozenset("wax+")

    def __init__(self, plugin_id: str, permissions: Set[PermissionLevel]):
        self.plugin_id = plugin_id
        self.permissions = permissions
        # Reference to the builtin ``open`` captured at construction time.
        # No method of this class uses it.
        self._original_open = open
        # Directory roots (or files) the plugin may never touch.
        self._restricted_paths: Set[Path] = set()

    def can_access_file(self, path: Path, mode: str = 'r') -> bool:
        """Check if plugin can access a file.

        Args:
            path: File the plugin wants to open.
            mode: ``open()``-style mode string.

        Returns:
            True when the granted permissions cover the requested mode and
            the path is not located under a restricted path.
        """
        wants_write = not self._WRITE_MODE_CHARS.isdisjoint(mode)
        if wants_write and PermissionLevel.FILE_WRITE not in self.permissions:
            return False
        # Read permission is required for every kind of file access.
        if PermissionLevel.FILE_READ not in self.permissions:
            return False

        # Check restricted paths. Both sides are resolved so that relative
        # paths and symlinks cannot be used to sidestep the comparison.
        resolved = Path(path).resolve()
        for restricted in self._restricted_paths:
            try:
                resolved.relative_to(Path(restricted).resolve())
            except ValueError:
                continue  # not inside this restricted root
            return False
        return True

    def can_access_network(self) -> bool:
        """Check if plugin can access network."""
        return PermissionLevel.NETWORK in self.permissions

    def can_access_ui(self) -> bool:
        """Check if plugin can manipulate UI."""
        return PermissionLevel.UI in self.permissions

    def can_access_memory(self) -> bool:
        """Check if plugin can access memory (dangerous)."""
        return PermissionLevel.MEMORY in self.permissions
# =============================================================================
# PLUGIN LOADER
# =============================================================================
class PluginLoader:
    """Loads plugin modules from disk and locates their PluginAPI subclass.

    Attributes:
        sandbox: Optional sandbox handed in by the caller (stored only).
    """

    def __init__(self, sandbox: Optional[PluginSandbox] = None):
        self.sandbox = sandbox
        # Successfully loaded plugin modules, keyed by generated module name.
        self._loaded_modules: Dict[str, Any] = {}

    def load_plugin_class(
        self,
        plugin_path: Path,
        manifest: PluginManifest
    ) -> Type[PluginAPI]:
        """Load a plugin class from a directory.

        Args:
            plugin_path: Path to plugin directory
            manifest: Plugin manifest

        Returns:
            PluginAPI subclass

        Raises:
            PluginLoadError: If loading fails
        """
        entry_file = plugin_path / manifest.entry_point
        # is_file() (rather than exists()) also rejects a directory that
        # happens to carry the entry point's name.
        if not entry_file.is_file():
            raise PluginLoadError(
                f"Entry point not found: {manifest.entry_point}"
            )

        # The content hash makes the module name unique per file revision.
        try:
            file_hash = self._compute_hash(entry_file)
        except OSError as e:
            raise PluginLoadError(f"Unexpected error loading plugin: {e}") from e

        module_name = f"premium_plugin_{manifest.name.lower().replace(' ', '_')}_{file_hash[:8]}"

        # Remember what (if anything) was registered under this name so a
        # failed load can put sys.modules back exactly as it was.
        previous_module = sys.modules.get(module_name)
        loaded = False
        try:
            spec = importlib.util.spec_from_file_location(
                module_name, entry_file
            )
            if spec is None or spec.loader is None:
                raise PluginLoadError("Failed to create module spec")
            module = importlib.util.module_from_spec(spec)

            # Register before executing so the module can be found by name
            # while its own top-level code runs.
            sys.modules[module_name] = module
            try:
                spec.loader.exec_module(module)
            except Exception as e:
                raise PluginLoadError(f"Failed to execute module: {e}") from e

            plugin_class = self._find_plugin_class(module)
            if plugin_class is None:
                raise PluginLoadError(
                    f"No PluginAPI subclass found in {manifest.entry_point}"
                )

            # Validate manifest matches
            if not hasattr(plugin_class, 'manifest'):
                raise PluginLoadError("Plugin class missing manifest attribute")

            self._loaded_modules[module_name] = module
            loaded = True
            return plugin_class
        except PluginLoadError:
            raise
        except Exception as e:
            raise PluginLoadError(f"Unexpected error loading plugin: {e}") from e
        finally:
            if not loaded:
                # Do not leave a half-initialised module behind.
                if previous_module is not None:
                    sys.modules[module_name] = previous_module
                else:
                    sys.modules.pop(module_name, None)

    @staticmethod
    def _find_plugin_class(module: Any) -> Optional[Type[PluginAPI]]:
        """Return the first PluginAPI subclass in *module*, or None.

        Names are visited in ``dir()`` order; PluginAPI itself and classes
        whose name starts with 'Base' are skipped.
        """
        for attr_name in dir(module):
            attr = getattr(module, attr_name)
            if (
                isinstance(attr, type)
                and issubclass(attr, PluginAPI)
                and attr is not PluginAPI
                and not attr.__name__.startswith('Base')
            ):
                return attr
        return None

    def _compute_hash(self, file_path: Path) -> str:
        """Compute SHA256 hash of a file, reading it in 8 KiB chunks."""
        h = hashlib.sha256()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                h.update(chunk)
        return h.hexdigest()
# =============================================================================
# DEPENDENCY RESOLVER
# =============================================================================
@dataclass
class DependencyNode:
    """One plugin's entry in the dependency graph.

    Edges are stored in both directions: ``dependencies`` points at the
    plugins this one needs, ``dependents`` at the plugins that need it.
    """
    # Identifier of the plugin this node stands for.
    plugin_id: str
    # Manifest attached to the plugin.
    manifest: PluginManifest
    # IDs of plugins this plugin depends on.
    dependencies: Set[str] = field(default_factory=set)
    # IDs of plugins that depend on this plugin.
    dependents: Set[str] = field(default_factory=set)
    # Resolution flag; starts out False.
    resolved: bool = field(default=False)
class DependencyResolver:
    """Resolves plugin dependencies using topological sort."""

    def __init__(self):
        # Dependency graph: plugin ID -> node.
        self._nodes: Dict[str, DependencyNode] = {}

    def add_plugin(self, plugin_id: str, manifest: PluginManifest) -> None:
        """Add a plugin to the dependency graph.

        A dependency that has not been added yet gets a placeholder node
        (version "0.0.0", author "Unknown"). When that plugin is added later,
        or when a plugin is added again, its node is updated in place: the
        manifest is replaced and the outgoing edges are rebuilt from the new
        manifest so no stale edges remain.

        Args:
            plugin_id: Unique plugin identifier.
            manifest: Manifest whose ``dependencies`` keys name the plugins
                this plugin depends on.
        """
        node = self._nodes.get(plugin_id)
        if node is None:
            node = DependencyNode(plugin_id=plugin_id, manifest=manifest)
            self._nodes[plugin_id] = node
        else:
            # Existing node: either a placeholder or a re-registration.
            node.manifest = manifest
            for stale_id in node.dependencies:
                stale = self._nodes.get(stale_id)
                if stale is not None:
                    stale.dependents.discard(plugin_id)
            node.dependencies.clear()

        # Add dependencies
        for dep_id in manifest.dependencies.keys():
            node.dependencies.add(dep_id)
            if dep_id not in self._nodes:
                self._nodes[dep_id] = DependencyNode(
                    plugin_id=dep_id,
                    manifest=PluginManifest(
                        name=dep_id,
                        version="0.0.0",
                        author="Unknown"
                    )
                )
            self._nodes[dep_id].dependents.add(plugin_id)

    def resolve_load_order(self, plugin_ids: List[str]) -> List[str]:
        """Resolve plugin load order using topological sort.

        Every plugin appears after all of its dependencies. Dependencies that
        are not listed in *plugin_ids* are included in the result as well.
        Dependencies of one plugin are visited in sorted order, so the result
        is deterministic for a given graph and input list.

        Args:
            plugin_ids: IDs to order; unknown IDs are treated as having no
                dependencies.

        Returns:
            List of plugin IDs in load order

        Raises:
            PluginDependencyError: If circular dependency detected
        """
        order: List[str] = []
        visited: Set[str] = set()
        in_progress: Set[str] = set()
        # IDs on the current depth-first path, used for the cycle message.
        path: List[str] = []

        def visit(node_id: str) -> None:
            if node_id in in_progress:
                cycle = " -> ".join(path + [node_id])
                raise PluginDependencyError(f"Circular dependency: {cycle}")
            if node_id in visited:
                return
            in_progress.add(node_id)
            path.append(node_id)
            node = self._nodes.get(node_id)
            if node:
                # sorted(): set iteration order for strings changes between
                # interpreter runs, which would make the load order unstable.
                for dep_id in sorted(node.dependencies):
                    visit(dep_id)
            path.pop()
            in_progress.remove(node_id)
            visited.add(node_id)
            order.append(node_id)

        for plugin_id in plugin_ids:
            if plugin_id not in visited:
                visit(plugin_id)
        return order

    def get_dependents(self, plugin_id: str) -> Set[str]:
        """Get all plugins that directly depend on a given plugin.

        Returns:
            A copy of the dependents set; empty if the plugin is unknown.
        """
        node = self._nodes.get(plugin_id)
        if node:
            return node.dependents.copy()
        return set()

    def check_conflicts(self) -> List[str]:
        """Check for version conflicts in dependencies.

        Not implemented yet: always returns an empty list.
        """
        conflicts: List[str] = []
        # TODO: Implement version conflict checking
        return conflicts
# =============================================================================
# PLUGIN MANAGER
# =============================================================================
class PluginManager:
    """Manages plugin lifecycle with sandboxing and dependency resolution.

    This is the main entry point for plugin management. It handles:
    - Discovery of plugins in directories
    - Loading and unloading
    - Dependency resolution
    - Lifecycle state management
    - Security sandboxing

    A leading ``~`` in ``plugin_dirs`` / ``data_dir`` is expanded to the
    user's home directory.

    Example:
        manager = PluginManager(
            plugin_dirs=[Path("./plugins"), Path("~/.eu-utility/plugins")],
            data_dir=Path("~/.eu-utility/data")
        )
        manager.discover_all()
        manager.load_all()
        # Get active plugins
        for plugin_id, instance in manager.get_active_plugins().items():
            print(f"{plugin_id}: {instance.manifest.name}")
    """

    def __init__(
        self,
        plugin_dirs: Optional[List[Path]] = None,
        data_dir: Optional[Path] = None,
        event_bus: Optional[Any] = None,
        state_store: Optional[Any] = None,
        widget_api: Optional[Any] = None,
        nexus_api: Optional[Any] = None,
        max_workers: int = 4
    ):
        """Initialize plugin manager.

        Creates the data directory if it does not exist yet.

        Args:
            plugin_dirs: Directories to search for plugins
            data_dir: Directory for plugin data storage
            event_bus: Event bus for plugin communication
            state_store: State store for plugins
            widget_api: Widget API for UI plugins
            nexus_api: Nexus API for Entropia data
            max_workers: Max worker threads for background tasks
        """
        # expanduser(): without it Path("~/x") names a literal "~" directory
        # relative to the current working directory.
        self.plugin_dirs = [Path(d).expanduser() for d in (plugin_dirs or [])]
        base_data_dir = data_dir or Path.home() / ".eu-utility" / "data"
        self.data_dir = Path(base_data_dir).expanduser()
        self.event_bus = event_bus
        self.state_store = state_store
        self.widget_api = widget_api
        self.nexus_api = nexus_api

        # Ensure data directory exists
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # Plugin storage, all keyed by plugin ID
        self._instances: Dict[str, PluginInstance] = {}
        self._classes: Dict[str, Type[PluginAPI]] = {}
        self._paths: Dict[str, Path] = {}

        # Support systems
        self._loader = PluginLoader()
        self._resolver = DependencyResolver()
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._logger = logging.getLogger("PluginManager")

        # State
        self._discovered = False
        self._lock = threading.RLock()

    # ========== Discovery ==========

    def discover_plugins(self, directory: Path) -> List[str]:
        """Discover plugins in a directory.

        A plugin is a sub-directory containing a ``plugin.json`` manifest.
        Sub-directories whose name starts with '.' or '__' are ignored.
        Manifests that fail to parse are logged and skipped.

        Args:
            directory: Directory to search for plugins

        Returns:
            List of discovered plugin IDs
        """
        discovered: List[str] = []
        directory = Path(directory).expanduser()
        if not directory.exists():
            self._logger.warning(f"Plugin directory does not exist: {directory}")
            return discovered
        if not directory.is_dir():
            # iterdir() would raise on a regular file.
            self._logger.warning(f"Plugin path is not a directory: {directory}")
            return discovered

        # sorted(): iterdir() yields entries in arbitrary order.
        for item in sorted(directory.iterdir()):
            if not item.is_dir():
                continue
            if item.name.startswith(('.', '__')):
                continue
            manifest_path = item / "plugin.json"
            if not manifest_path.exists():
                continue
            try:
                manifest = PluginManifest.from_json(manifest_path)
                plugin_id = self._generate_plugin_id(manifest, item)
                with self._lock:
                    self._paths[plugin_id] = item
                    self._resolver.add_plugin(plugin_id, manifest)
                discovered.append(plugin_id)
                self._logger.debug(f"Discovered plugin: {manifest.name} ({plugin_id})")
            except Exception as e:
                self._logger.error(f"Failed to load manifest from {item}: {e}")
        return discovered

    def discover_all(self) -> int:
        """Discover plugins in all configured directories.

        Returns:
            Total number of plugins discovered
        """
        total = sum(
            len(self.discover_plugins(directory))
            for directory in self.plugin_dirs
        )
        self._discovered = True
        self._logger.info(f"Discovered {total} plugins")
        return total

    # ========== Loading ==========

    def load_plugin(self, plugin_id: str) -> bool:
        """Load a plugin by ID.

        Loading an already loaded plugin is a no-op that returns True. A
        plugin whose previous load attempt failed (ERROR state and no class
        registered) is loaded again instead of being reported as loaded.

        Args:
            plugin_id: Unique plugin identifier

        Returns:
            True if loaded successfully
        """
        with self._lock:
            existing = self._instances.get(plugin_id)
            if existing is not None:
                failed_load = (
                    existing.state == PluginState.ERROR
                    and plugin_id not in self._classes
                )
                if not failed_load:
                    return True

            path = self._paths.get(plugin_id)
            if not path:
                self._logger.error(f"Plugin not found: {plugin_id}")
                return False

            # The manifest is re-read from disk; a broken or vanished file
            # must yield False rather than an exception.
            try:
                manifest = PluginManifest.from_json(path / "plugin.json")
            except Exception as e:
                self._logger.error(f"Failed to load manifest from {path}: {e}")
                return False

            # Create instance record
            instance = PluginInstance(
                plugin_id=plugin_id,
                manifest=manifest,
                state=PluginState.LOADING
            )
            self._instances[plugin_id] = instance

        try:
            # Load plugin class
            plugin_class = self._loader.load_plugin_class(path, manifest)
            with self._lock:
                self._classes[plugin_id] = plugin_class
                instance.state = PluginState.LOADED
                instance.load_time = datetime.now()
            self._logger.info(f"Loaded plugin: {manifest.name}")
            return True
        except Exception as e:
            with self._lock:
                instance.state = PluginState.ERROR
                instance.error_message = str(e)
                instance.error_traceback = traceback.format_exc()
            self._logger.error(f"Failed to load plugin {manifest.name}: {e}")
            return False

    def load_all(self, auto_activate: bool = False) -> Dict[str, bool]:
        """Load all discovered plugins.

        Runs discovery first if it has not happened yet. Plugins are loaded
        in dependency order; if resolution fails (circular dependency) the
        discovery order is used instead.

        Args:
            auto_activate: Automatically activate loaded plugins

        Returns:
            Dict mapping plugin IDs to success status
        """
        if not self._discovered:
            self.discover_all()

        # Resolve load order
        with self._lock:
            plugin_ids = list(self._paths)
        try:
            load_order = self._resolver.resolve_load_order(plugin_ids)
        except PluginDependencyError as e:
            self._logger.error(f"Dependency resolution failed: {e}")
            load_order = plugin_ids  # Fall back to default order

        # Load in order
        results: Dict[str, bool] = {}
        for plugin_id in load_order:
            loaded = self.load_plugin(plugin_id)
            results[plugin_id] = loaded
            if auto_activate and loaded:
                self.activate_plugin(plugin_id)
        return results

    # ========== Initialization ==========

    def init_plugin(self, plugin_id: str, config: Optional[Dict[str, Any]] = None) -> bool:
        """Initialize a loaded plugin.

        Builds the plugin's data directory, configuration, logger and
        context, instantiates the plugin class and calls ``on_init``. On
        success the plugin ends up INACTIVE, on failure in ERROR.

        Args:
            plugin_id: Plugin ID
            config: Optional configuration override

        Returns:
            True if initialized successfully
        """
        with self._lock:
            instance = self._instances.get(plugin_id)
            if not instance:
                self._logger.error(f"Plugin not loaded: {plugin_id}")
                return False
            if instance.state not in (PluginState.LOADED, PluginState.INACTIVE):
                self._logger.warning(f"Cannot initialize plugin in state: {instance.state}")
                return False
            plugin_class = self._classes.get(plugin_id)
            if not plugin_class:
                self._logger.error(f"Plugin class not found: {plugin_id}")
                return False
            instance.state = PluginState.INITIALIZING

        try:
            # Create plugin directory for data
            plugin_data_dir = self.data_dir / plugin_id
            plugin_data_dir.mkdir(parents=True, exist_ok=True)

            # Saved config first; explicitly provided values win.
            saved_config = self._load_plugin_config(plugin_id)
            if config:
                saved_config.update(config)

            # Create logger
            logger = logging.getLogger(f"Plugin.{instance.manifest.name}")

            # Create sandbox (constructed here but not passed on to the
            # context below)
            sandbox = PluginSandbox(plugin_id, instance.manifest.permissions)

            # Create context
            ctx = PluginContext(
                plugin_id=plugin_id,
                manifest=instance.manifest,
                data_dir=plugin_data_dir,
                config=saved_config,
                logger=logger,
                event_bus=self.event_bus,
                state_store=self.state_store,
                widget_api=self.widget_api,
                nexus_api=self.nexus_api,
                permissions=instance.manifest.permissions
            )

            # Create and initialize plugin instance
            plugin = plugin_class()
            plugin._set_context(ctx)
            plugin.on_init(ctx)

            with self._lock:
                instance.instance = plugin
                instance.state = PluginState.INACTIVE
            self._logger.info(f"Initialized plugin: {instance.manifest.name}")
            return True
        except Exception as e:
            with self._lock:
                instance.state = PluginState.ERROR
                instance.error_message = str(e)
                instance.error_traceback = traceback.format_exc()
            self._logger.error(f"Failed to initialize plugin {plugin_id}: {e}")
            return False

    # ========== Activation ==========

    def activate_plugin(self, plugin_id: str) -> bool:
        """Activate an initialized plugin.

        A plugin that is merely LOADED is initialized first. Activating an
        already active plugin returns True.

        Args:
            plugin_id: Plugin ID

        Returns:
            True if activated successfully
        """
        with self._lock:
            instance = self._instances.get(plugin_id)
            if not instance:
                return False
            if instance.state == PluginState.ACTIVE:
                return True
            if instance.state != PluginState.INACTIVE:
                # Try to initialize first
                if instance.state == PluginState.LOADED:
                    self.init_plugin(plugin_id)
                if instance.state != PluginState.INACTIVE:
                    return False
            instance.state = PluginState.ACTIVATING
            plugin = instance.instance

        try:
            plugin.on_activate()
            with self._lock:
                instance.state = PluginState.ACTIVE
                instance.activate_time = datetime.now()
            self._logger.info(f"Activated plugin: {instance.manifest.name}")
            return True
        except Exception as e:
            with self._lock:
                instance.state = PluginState.ERROR
                instance.error_message = str(e)
                instance.error_traceback = traceback.format_exc()
            self._logger.error(f"Failed to activate plugin {plugin_id}: {e}")
            return False

    def deactivate_plugin(self, plugin_id: str) -> bool:
        """Deactivate an active plugin.

        If ``on_deactivate`` raises, the plugin is put into the ERROR state
        (with message and traceback) instead of staying in DEACTIVATING.

        Args:
            plugin_id: Plugin ID

        Returns:
            True if deactivated successfully
        """
        with self._lock:
            instance = self._instances.get(plugin_id)
            if not instance or instance.state != PluginState.ACTIVE:
                return False
            instance.state = PluginState.DEACTIVATING
            plugin = instance.instance

        try:
            plugin.on_deactivate()
            with self._lock:
                instance.state = PluginState.INACTIVE
            self._logger.info(f"Deactivated plugin: {instance.manifest.name}")
            return True
        except Exception as e:
            with self._lock:
                instance.state = PluginState.ERROR
                instance.error_message = str(e)
                instance.error_traceback = traceback.format_exc()
            self._logger.error(f"Error deactivating plugin {plugin_id}: {e}")
            return False

    # ========== Unloading ==========

    def unload_plugin(self, plugin_id: str, force: bool = False) -> bool:
        """Unload a plugin.

        An active plugin is deactivated first. The plugin's config is saved
        and ``on_shutdown`` is called before its records are removed. If
        saving or shutdown raises, the plugin stays registered in the ERROR
        state and False is returned.

        Args:
            plugin_id: Plugin ID
            force: Force unload even if dependents exist

        Returns:
            True if unloaded successfully (or if the plugin was not loaded)
        """
        with self._lock:
            instance = self._instances.get(plugin_id)
            if not instance:
                return True

            # Check dependents
            if not force:
                dependents = self._resolver.get_dependents(plugin_id)
                active_dependents = [
                    d for d in dependents
                    if d in self._instances and self._instances[d].is_active()
                ]
                if active_dependents:
                    self._logger.error(
                        f"Cannot unload {plugin_id}: active dependents {active_dependents}"
                    )
                    return False

            # Deactivate if active
            if instance.state == PluginState.ACTIVE:
                self.deactivate_plugin(plugin_id)
            instance.state = PluginState.UNLOADING
            plugin = instance.instance

        try:
            if plugin:
                # Save config before shutdown
                self._save_plugin_config(plugin_id, plugin.ctx.config)
                plugin.on_shutdown()
        except Exception as e:
            with self._lock:
                instance.state = PluginState.ERROR
                instance.error_message = str(e)
                instance.error_traceback = traceback.format_exc()
            self._logger.error(f"Error unloading plugin {plugin_id}: {e}")
            return False

        with self._lock:
            instance.state = PluginState.UNLOADED
            instance.instance = None
            # pop(): a plugin whose load failed has an instance record but
            # no entry in _classes.
            self._instances.pop(plugin_id, None)
            self._classes.pop(plugin_id, None)
        self._logger.info(f"Unloaded plugin: {instance.manifest.name}")
        return True

    def unload_all(self) -> None:
        """Unload all plugins, most recently loaded first.

        Dependent checks are bypassed (``force=True``).
        """
        with self._lock:
            plugin_ids = list(self._instances)
        for plugin_id in reversed(plugin_ids):
            self.unload_plugin(plugin_id, force=True)

    # ========== Queries ==========

    def get_instance(self, plugin_id: str) -> Optional[PluginInstance]:
        """Get plugin instance info."""
        return self._instances.get(plugin_id)

    def get_plugin(self, plugin_id: str) -> Optional[PluginAPI]:
        """Get the plugin object, but only while the plugin is ACTIVE."""
        instance = self._instances.get(plugin_id)
        if instance and instance.state == PluginState.ACTIVE:
            return instance.instance
        return None

    def get_all_instances(self) -> Dict[str, PluginInstance]:
        """Get all plugin instances (shallow copy of the registry)."""
        return self._instances.copy()

    def get_active_plugins(self) -> Dict[str, PluginInstance]:
        """Get all active plugins."""
        return {
            k: v for k, v in self._instances.items()
            if v.state == PluginState.ACTIVE
        }

    def get_plugin_ui(self, plugin_id: str) -> Optional[Any]:
        """Get plugin's UI widget (None unless the plugin is active)."""
        plugin = self.get_plugin(plugin_id)
        if plugin:
            return plugin.create_widget()
        return None

    def get_plugin_states(self) -> Dict[str, PluginState]:
        """Get state of all plugins."""
        return {
            k: v.state for k, v in self._instances.items()
        }

    # ========== Configuration ==========

    def _load_plugin_config(self, plugin_id: str) -> Dict[str, Any]:
        """Load plugin configuration from disk.

        Returns:
            The stored JSON object, or an empty dict when the file is
            missing, unreadable, or does not contain a JSON object.
        """
        config_path = self.data_dir / plugin_id / "config.json"
        if not config_path.exists():
            return {}
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except Exception as e:
            self._logger.error(f"Failed to load config for {plugin_id}: {e}")
            return {}
        if not isinstance(loaded, dict):
            # Callers merge overrides with dict.update(); anything else
            # would break them.
            self._logger.error(
                f"Failed to load config for {plugin_id}: expected a JSON object"
            )
            return {}
        return loaded

    def _save_plugin_config(self, plugin_id: str, config: Dict[str, Any]) -> None:
        """Save plugin configuration to disk.

        Best effort: failures are logged, never raised.
        """
        config_path = self.data_dir / plugin_id / "config.json"
        try:
            # The plugin's data directory may not exist yet.
            config_path.parent.mkdir(parents=True, exist_ok=True)
            with open(config_path, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=2)
        except Exception as e:
            self._logger.error(f"Failed to save config for {plugin_id}: {e}")

    # ========== Utility ==========

    def _generate_plugin_id(self, manifest: PluginManifest, path: Path) -> str:
        """Generate unique plugin ID.

        The ID is the lower-cased manifest name with spaces and dashes
        replaced by underscores, followed by the first 8 hex digits of the
        MD5 of the plugin path (used for uniqueness only, not security).
        """
        path_hash = hashlib.md5(str(path).encode()).hexdigest()[:8]
        name_slug = manifest.name.lower().replace(' ', '_').replace('-', '_')
        return f"{name_slug}_{path_hash}"

    def shutdown(self) -> None:
        """Shutdown the plugin manager.

        Unloads every plugin, then waits for the worker pool to finish.
        """
        self.unload_all()
        self._executor.shutdown(wait=True)
# =============================================================================
# EXPORTS
# =============================================================================
# Public API of this module: the names exported by ``from ... import *``.
# DependencyNode and API_VERSION are intentionally (or at least currently)
# not part of the export list.
__all__ = [
'PluginManager',
'PluginLoader',
'DependencyResolver',
'PluginSandbox',
]