diff --git a/premium/core/state/store.py b/premium/core/state/store.py new file mode 100644 index 0000000..11b2320 --- /dev/null +++ b/premium/core/state/store.py @@ -0,0 +1,711 @@ +""" +EU-Utility Premium - State Management +====================================== + +Redux-inspired state store with: +- Immutable state updates +- Time-travel debugging +- Selectors for derived state +- Middleware support +- State persistence + +Example: + from premium.core.state import StateStore, Action, Reducer + + # Define actions + class IncrementAction(Action): + type = "INCREMENT" + + # Define reducer + def counter_reducer(state: int, action: Action) -> int: + if action.type == "INCREMENT": + return state + 1 + return state + + # Create store + store = StateStore(counter_reducer, initial_state=0) + store.dispatch(IncrementAction()) +""" + +from __future__ import annotations + +import copy +import json +import logging +import threading +import time +import uuid +from abc import ABC, abstractmethod +from dataclasses import dataclass, field, asdict +from datetime import datetime +from enum import Enum, auto +from pathlib import Path +from typing import ( + Any, Callable, Dict, Generic, List, Optional, Set, TypeVar, Union, + Protocol, runtime_checkable +) + + +# ============================================================================= +# TYPE DEFINITIONS +# ============================================================================= + +T = TypeVar('T') +State = TypeVar('State') + + +@runtime_checkable +class Action(Protocol): + """Protocol for actions.""" + type: str + payload: Optional[Any] = None + meta: Optional[Dict[str, Any]] = None + + +class ActionBase: + """Base class for actions.""" + type: str = "UNKNOWN" + payload: Any = None + meta: Optional[Dict[str, Any]] = None + timestamp: datetime = field(default_factory=datetime.now) + + def __init__(self, payload: Any = None, meta: Optional[Dict[str, Any]] = None): + self.payload = payload + self.meta = meta or {} + self.timestamp = datetime.now() + + def to_dict(self) -> Dict[str, Any]: + """Convert action to dictionary.""" + return { + 'type': self.type, + 'payload': self.payload, + 'meta': self.meta, + 'timestamp': self.timestamp.isoformat(), + } + + +# Reducer type: (state, action) -> new_state +Reducer = Callable[[State, Action], State] + +# Selector type: state -> derived_value +Selector = Callable[[State], T] + +# Subscriber type: (new_state, old_state) -> None +Subscriber = Callable[[State, State], None] + +# Middleware type: store -> next -> action -> result +Middleware = Callable[['StateStore', Callable[[Action], Any], Action], Any] + + +# ============================================================================= +# BUILT-IN ACTIONS +# ============================================================================= + +class StateResetAction(ActionBase): + """Reset state to initial value.""" + type = "@@STATE/RESET" + + +class StateRestoreAction(ActionBase): + """Restore state from snapshot.""" + type = "@@STATE/RESTORE" + + def __init__(self, state: State, meta: Optional[Dict[str, Any]] = None): + super().__init__(payload=state, meta=meta) + + +class StateBatchAction(ActionBase): + """Batch multiple actions.""" + type = "@@STATE/BATCH" + + def __init__(self, actions: List[Action], meta: Optional[Dict[str, Any]] = None): + super().__init__(payload=actions, meta=meta) + + +class StateHydrateAction(ActionBase): + """Hydrate state from persisted data.""" + type = "@@STATE/HYDRATE" + + def __init__(self, state: State, meta: Optional[Dict[str, Any]] = None): + super().__init__(payload=state, meta=meta) + + +# ============================================================================= +# STATE SNAPSHOT +# ============================================================================= + +@dataclass +class StateSnapshot: + """Immutable snapshot of state at a point in time.""" + state: Any + action: Optional[Action] = None + timestamp: datetime = field(default_factory=datetime.now) + id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) + + def to_dict(self) -> Dict[str, Any]: + """Convert snapshot to dictionary.""" + return { + 'id': self.id, + 'state': self.state, + 'action': self.action.to_dict() if self.action else None, + 'timestamp': self.timestamp.isoformat(), + } + + +# ============================================================================= +# STORE SLICE +# ============================================================================= + +@dataclass +class StoreSlice(Generic[T]): + """A slice of the store with its own reducer and state.""" + name: str + reducer: Reducer + initial_state: T + selectors: Dict[str, Selector] = field(default_factory=dict) + + +# ============================================================================= +# COMBINED REDUCER +# ============================================================================= + +def combine_reducers(reducers: Dict[str, Reducer]) -> Reducer: + """Combine multiple reducers into one. + + Each reducer manages a slice of the state. + + Args: + reducers: Dict mapping slice names to reducers + + Returns: + Combined reducer function + + Example: + root_reducer = combine_reducers({ + 'counter': counter_reducer, + 'todos': todos_reducer, + }) + """ + def combined(state: Dict[str, Any], action: Action) -> Dict[str, Any]: + new_state = {} + has_changed = False + + for key, reducer in reducers.items(): + previous_state = state.get(key) if isinstance(state, dict) else None + new_slice = reducer(previous_state, action) + new_state[key] = new_slice + + if new_slice is not previous_state: + has_changed = True + + return new_state if has_changed else state + + return combined + + +# ============================================================================= +# MIDDLEWARE +# ============================================================================= + +def logging_middleware(store: StateStore, next: Callable[[Action], Any], action: Action) -> Any: + """Middleware that logs all actions.""" + print(f"[State] Action: {action.type}") + result = next(action) + print(f"[State] New state: {store.get_state()}") + return result + + +def thunk_middleware(store: StateStore, next: Callable[[Action], Any], action: Action) -> Any: + """Middleware that allows thunk actions (functions).""" + if callable(action) and not isinstance(action, ActionBase): + # It's a thunk - call it with dispatch and get_state + return action(store.dispatch, store.get_state) + return next(action) + + +def persistence_middleware( + storage_path: Path, + debounce_ms: int = 1000 +) -> Middleware: + """Create middleware that persists state to disk. + + Args: + storage_path: Path to store state + debounce_ms: Debounce time in milliseconds + + Returns: + Middleware function + """ + last_save = 0 + pending_save = False + + def middleware(store: StateStore, next: Callable[[Action], Any], action: Action) -> Any: + nonlocal last_save, pending_save + + result = next(action) + + # Debounce saves + current_time = time.time() * 1000 + if current_time - last_save > debounce_ms: + try: + state = store.get_state() + with open(storage_path, 'w', encoding='utf-8') as f: + json.dump(state, f, indent=2, default=str) + last_save = current_time + pending_save = False + except Exception as e: + logging.getLogger("StateStore").error(f"Failed to persist state: {e}") + else: + pending_save = True + + return result + + return middleware + + +# ============================================================================= +# STATE STORE +# ============================================================================= + +class StateStore(Generic[State]): + """Redux-inspired state store with time-travel debugging. + + Features: + - Immutable state updates + - Action history for debugging + - State snapshots for time travel + - Selectors for derived state + - Middleware support + - State persistence + + Example: + store = StateStore( + reducer=root_reducer, + initial_state={'count': 0}, + middleware=[logging_middleware] + ) + + # Subscribe to changes + unsubscribe = store.subscribe(lambda new, old: print(f"Changed: {old} -> {new}")) + + # Dispatch actions + store.dispatch(IncrementAction()) + + # Use selectors + count = store.select(lambda state: state['count']) + + # Time travel + store.undo() # Undo last action + store.jump_to_snapshot(0) # Jump to initial state + """ + + def __init__( + self, + reducer: Reducer[State, Action], + initial_state: Optional[State] = None, + middleware: Optional[List[Middleware]] = None, + max_history: int = 1000, + enable_time_travel: bool = True + ): + """Initialize state store. + + Args: + reducer: Root reducer function + initial_state: Initial state value + middleware: List of middleware functions + max_history: Maximum action history size + enable_time_travel: Enable time-travel debugging + """ + self._reducer = reducer + self._state: State = initial_state + self._initial_state = copy.deepcopy(initial_state) + self._middleware = middleware or [] + self._max_history = max_history + self._enable_time_travel = enable_time_travel + + # Subscribers + self._subscribers: Dict[str, Subscriber] = {} + self._subscriber_counter = 0 + + # History for time travel + self._history: List[StateSnapshot] = [] + self._current_index = -1 + + # Lock for thread safety + self._lock = threading.RLock() + self._logger = logging.getLogger("StateStore") + + # Create initial snapshot + if enable_time_travel: + self._add_snapshot(StateSnapshot( + state=copy.deepcopy(self._state), + action=None + )) + + # ========== Core Methods ========== + + def get_state(self) -> State: + """Get current state (immutable).""" + with self._lock: + return copy.deepcopy(self._state) + + def dispatch(self, action: Action) -> Action: + """Dispatch an action to update state. + + Args: + action: Action to dispatch + + Returns: + The dispatched action + """ + # Apply middleware chain + def dispatch_action(a: Action) -> Action: + return self._apply_reducer(a) + + # Build middleware chain + chain = dispatch_action + for mw in reversed(self._middleware): + chain = lambda a, mw=mw, next=chain: mw(self, next, a) + + return chain(action) + + def _apply_reducer(self, action: Action) -> Action: + """Apply reducer and update state.""" + with self._lock: + old_state = self._state + new_state = self._reducer(copy.deepcopy(old_state), action) + + # Only update if state changed + if new_state is not old_state: + self._state = new_state + + # Add to history + if self._enable_time_travel: + # Remove any future states if we're not at the end + if self._current_index < len(self._history) - 1: + self._history = self._history[:self._current_index + 1] + + self._add_snapshot(StateSnapshot( + state=copy.deepcopy(new_state), + action=action + )) + + # Notify subscribers + self._notify_subscribers(new_state, old_state) + + return action + + def _add_snapshot(self, snapshot: StateSnapshot) -> None: + """Add snapshot to history.""" + self._history.append(snapshot) + self._current_index = len(self._history) - 1 + + # Trim history if needed + if len(self._history) > self._max_history: + self._history = self._history[-self._max_history:] + self._current_index = len(self._history) - 1 + + # ========== Subscription ========== + + def subscribe(self, callback: Subscriber, selector: Optional[Selector] = None) -> Callable[[], None]: + """Subscribe to state changes. + + Args: + callback: Function called when state changes + selector: Optional selector to compare specific parts + + Returns: + Unsubscribe function + """ + with self._lock: + self._subscriber_counter += 1 + sub_id = f"sub_{self._subscriber_counter}" + + # Wrap callback with selector if provided + if selector: + last_value = selector(self._state) + + def wrapped_callback(new_state: State, old_state: State) -> None: + nonlocal last_value + new_value = selector(new_state) + if new_value != last_value: + last_value = new_value + callback(new_state, old_state) + + self._subscribers[sub_id] = wrapped_callback + else: + self._subscribers[sub_id] = callback + + def unsubscribe() -> None: + with self._lock: + self._subscribers.pop(sub_id, None) + + return unsubscribe + + def _notify_subscribers(self, new_state: State, old_state: State) -> None: + """Notify all subscribers of state change.""" + for callback in list(self._subscribers.values()): + try: + callback(new_state, old_state) + except Exception as e: + self._logger.error(f"Error in subscriber: {e}") + + # ========== Selectors ========== + + def select(self, selector: Selector[T]) -> T: + """Select a derived value from state. + + Args: + selector: Function that extracts value from state + + Returns: + Selected value + """ + with self._lock: + return selector(copy.deepcopy(self._state)) + + def create_selector(self, *input_selectors: Selector, combiner: Callable) -> Selector: + """Create a memoized selector. + + Args: + input_selectors: Selectors that provide input values + combiner: Function that combines inputs into output + + Returns: + Memoized selector function + """ + last_inputs = [None] * len(input_selectors) + last_result = None + + def memoized_selector(state: State) -> Any: + nonlocal last_inputs, last_result + + inputs = [s(state) for s in input_selectors] + + # Check if inputs changed + if inputs != last_inputs: + last_inputs = inputs + last_result = combiner(*inputs) + + return last_result + + return memoized_selector + + # ========== Time Travel ========== + + def get_history(self) -> List[StateSnapshot]: + """Get action history.""" + with self._lock: + return self._history.copy() + + def get_current_index(self) -> int: + """Get current position in history.""" + return self._current_index + + def can_undo(self) -> bool: + """Check if undo is possible.""" + return self._current_index > 0 + + def can_redo(self) -> bool: + """Check if redo is possible.""" + return self._current_index < len(self._history) - 1 + + def undo(self) -> bool: + """Undo last action. + + Returns: + True if undo was performed + """ + if not self.can_undo(): + return False + + with self._lock: + self._current_index -= 1 + old_state = self._state + self._state = copy.deepcopy(self._history[self._current_index].state) + self._notify_subscribers(self._state, old_state) + + return True + + def redo(self) -> bool: + """Redo last undone action. + + Returns: + True if redo was performed + """ + if not self.can_redo(): + return False + + with self._lock: + self._current_index += 1 + old_state = self._state + self._state = copy.deepcopy(self._history[self._current_index].state) + self._notify_subscribers(self._state, old_state) + + return True + + def jump_to_snapshot(self, index: int) -> bool: + """Jump to a specific snapshot in history. + + Args: + index: Snapshot index + + Returns: + True if jump was successful + """ + if index < 0 or index >= len(self._history): + return False + + with self._lock: + old_state = self._state + self._current_index = index + self._state = copy.deepcopy(self._history[index].state) + self._notify_subscribers(self._state, old_state) + + return True + + def reset(self) -> None: + """Reset to initial state.""" + self.dispatch(StateResetAction()) + + with self._lock: + old_state = self._state + self._state = copy.deepcopy(self._initial_state) + + if self._enable_time_travel: + self._history.clear() + self._current_index = -1 + self._add_snapshot(StateSnapshot( + state=copy.deepcopy(self._state), + action=None + )) + + self._notify_subscribers(self._state, old_state) + + # ========== Persistence ========== + + def save_to_disk(self, path: Path) -> bool: + """Save current state to disk. + + Args: + path: File path to save to + + Returns: + True if saved successfully + """ + try: + with self._lock: + state_data = { + 'state': self._state, + 'timestamp': datetime.now().isoformat(), + 'history_count': len(self._history), + } + + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, 'w', encoding='utf-8') as f: + json.dump(state_data, f, indent=2, default=str) + + return True + except Exception as e: + self._logger.error(f"Failed to save state: {e}") + return False + + def load_from_disk(self, path: Path) -> bool: + """Load state from disk. + + Args: + path: File path to load from + + Returns: + True if loaded successfully + """ + try: + with open(path, 'r', encoding='utf-8') as f: + data = json.load(f) + + state = data.get('state') + if state is not None: + self.dispatch(StateHydrateAction(state)) + + return True + except Exception as e: + self._logger.error(f"Failed to load state: {e}") + return False + + +# ============================================================================= +# MODULE-LEVEL STORE (Application-wide state) +# ============================================================================= + +_module_stores: Dict[str, StateStore] = {} + + +def create_store( + name: str, + reducer: Reducer, + initial_state: Optional[Any] = None, + **kwargs +) -> StateStore: + """Create or get a named store. + + Args: + name: Unique store name + reducer: Reducer function + initial_state: Initial state + **kwargs: Additional arguments for StateStore + + Returns: + StateStore instance + """ + if name not in _module_stores: + _module_stores[name] = StateStore( + reducer=reducer, + initial_state=initial_state, + **kwargs + ) + return _module_stores[name] + + +def get_store(name: str) -> Optional[StateStore]: + """Get a named store. + + Args: + name: Store name + + Returns: + StateStore or None if not found + """ + return _module_stores.get(name) + + +def remove_store(name: str) -> bool: + """Remove a named store. + + Args: + name: Store name + + Returns: + True if removed + """ + if name in _module_stores: + del _module_stores[name] + return True + return False + + +# ============================================================================= +# EXPORTS +# ============================================================================= + +__all__ = [ + # Types + 'Action', 'ActionBase', 'Reducer', 'Selector', 'Subscriber', 'Middleware', + # Actions + 'StateResetAction', 'StateRestoreAction', 'StateBatchAction', 'StateHydrateAction', + # Classes + 'StateSnapshot', 'StoreSlice', 'StateStore', + # Functions + 'combine_reducers', 'create_store', 'get_store', 'remove_store', + # Middleware + 'logging_middleware', 'thunk_middleware', 'persistence_middleware', +] diff --git a/premium/plugins/manager.py b/premium/plugins/manager.py new file mode 100644 index 0000000..e19e562 --- /dev/null +++ b/premium/plugins/manager.py @@ -0,0 +1,814 @@ +""" +EU-Utility Premium - Plugin Manager +==================================== + +Enterprise-grade plugin manager with: +- Sandboxed execution +- Dynamic loading/unloading +- Dependency resolution +- Lifecycle management +- Security validation + +Example: + manager = PluginManager() + manager.discover_plugins(Path("./plugins")) + manager.load_all() + + # Activate specific plugin + manager.activate_plugin("my_plugin") +""" + +from __future__ import annotations + +import asyncio +import hashlib +import importlib.util +import json +import logging +import os +import sys +import threading +import time +import traceback +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Set, Type, Union +from uuid import uuid4 + +from premium.plugins.api import ( + PluginAPI, PluginContext, PluginError, PluginInstance, PluginLoadError, + PluginInitError, PluginManifest, PluginPermissionError, PluginState, + PermissionLevel, PluginDependencyError, PluginVersionError +) + + +# API version for compatibility checking +API_VERSION = "3.0.0" + + +# ============================================================================= +# PLUGIN SANDBOX +# ============================================================================= + +class PluginSandbox: + """Sandbox for plugin execution. + + Restricts plugin access based on permissions. + """ + + def __init__(self, plugin_id: str, permissions: Set[PermissionLevel]): + self.plugin_id = plugin_id + self.permissions = permissions + self._original_open = open + self._restricted_paths: Set[Path] = set() + + def can_access_file(self, path: Path, mode: str = 'r') -> bool: + """Check if plugin can access a file.""" + if 'w' in mode or 'a' in mode or 'x' in mode: + if PermissionLevel.FILE_WRITE not in self.permissions: + return False + + if PermissionLevel.FILE_READ not in self.permissions: + return False + + # Check restricted paths + resolved = path.resolve() + for restricted in self._restricted_paths: + try: + resolved.relative_to(restricted) + return False + except ValueError: + pass + + return True + + def can_access_network(self) -> bool: + """Check if plugin can access network.""" + return PermissionLevel.NETWORK in self.permissions + + def can_access_ui(self) -> bool: + """Check if plugin can manipulate UI.""" + return PermissionLevel.UI in self.permissions + + def can_access_memory(self) -> bool: + """Check if plugin can access memory (dangerous).""" + return PermissionLevel.MEMORY in self.permissions + + +# ============================================================================= +# PLUGIN LOADER +# ============================================================================= + +class PluginLoader: + """Loads plugin modules with validation and security checks.""" + + def __init__(self, sandbox: Optional[PluginSandbox] = None): + self.sandbox = sandbox + self._loaded_modules: Dict[str, Any] = {} + + def load_plugin_class( + self, + plugin_path: Path, + manifest: PluginManifest + ) -> Type[PluginAPI]: + """Load a plugin class from a directory. + + Args: + plugin_path: Path to plugin directory + manifest: Plugin manifest + + Returns: + PluginAPI subclass + + Raises: + PluginLoadError: If loading fails + """ + entry_file = plugin_path / manifest.entry_point + + if not entry_file.exists(): + raise PluginLoadError( + f"Entry point not found: {manifest.entry_point}" + ) + + # Validate file hash for security + file_hash = self._compute_hash(entry_file) + + # Create unique module name + module_name = f"premium_plugin_{manifest.name.lower().replace(' ', '_')}_{file_hash[:8]}" + + try: + # Load module + spec = importlib.util.spec_from_file_location( + module_name, entry_file + ) + if spec is None or spec.loader is None: + raise PluginLoadError("Failed to create module spec") + + module = importlib.util.module_from_spec(spec) + + # Add to sys.modules temporarily + sys.modules[module_name] = module + + try: + spec.loader.exec_module(module) + except Exception as e: + raise PluginLoadError(f"Failed to execute module: {e}") + + # Find PluginAPI subclass + plugin_class = None + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) and + issubclass(attr, PluginAPI) and + attr is not PluginAPI and + not attr.__name__.startswith('Base') + ): + plugin_class = attr + break + + if plugin_class is None: + raise PluginLoadError( + f"No PluginAPI subclass found in {manifest.entry_point}" + ) + + # Validate manifest matches + if not hasattr(plugin_class, 'manifest'): + raise PluginLoadError("Plugin class missing manifest attribute") + + return plugin_class + + except PluginLoadError: + raise + except Exception as e: + raise PluginLoadError(f"Unexpected error loading plugin: {e}") + + def _compute_hash(self, file_path: Path) -> str: + """Compute SHA256 hash of a file.""" + h = hashlib.sha256() + with open(file_path, 'rb') as f: + for chunk in iter(lambda: f.read(8192), b''): + h.update(chunk) + return h.hexdigest() + + +# ============================================================================= +# DEPENDENCY RESOLVER +# ============================================================================= + +@dataclass +class DependencyNode: + """Node in dependency graph.""" + plugin_id: str + manifest: PluginManifest + dependencies: Set[str] = field(default_factory=set) + dependents: Set[str] = field(default_factory=set) + resolved: bool = False + + +class DependencyResolver: + """Resolves plugin dependencies using topological sort.""" + + def __init__(self): + self._nodes: Dict[str, DependencyNode] = {} + + def add_plugin(self, plugin_id: str, manifest: PluginManifest) -> None: + """Add a plugin to the dependency graph.""" + if plugin_id not in self._nodes: + self._nodes[plugin_id] = DependencyNode( + plugin_id=plugin_id, + manifest=manifest + ) + + node = self._nodes[plugin_id] + + # Add dependencies + for dep_id in manifest.dependencies.keys(): + node.dependencies.add(dep_id) + + if dep_id not in self._nodes: + self._nodes[dep_id] = DependencyNode( + plugin_id=dep_id, + manifest=PluginManifest( + name=dep_id, + version="0.0.0", + author="Unknown" + ) + ) + + self._nodes[dep_id].dependents.add(plugin_id) + + def resolve_load_order(self, plugin_ids: List[str]) -> List[str]: + """Resolve plugin load order using topological sort. + + Returns: + List of plugin IDs in load order + + Raises: + PluginDependencyError: If circular dependency detected + """ + order: List[str] = [] + visited: Set[str] = set() + temp_mark: Set[str] = set() + + def visit(node_id: str, path: List[str]) -> None: + if node_id in temp_mark: + cycle = " -> ".join(path + [node_id]) + raise PluginDependencyError(f"Circular dependency: {cycle}") + + if node_id in visited: + return + + temp_mark.add(node_id) + path.append(node_id) + + node = self._nodes.get(node_id) + if node: + for dep_id in node.dependencies: + visit(dep_id, path.copy()) + + temp_mark.remove(node_id) + visited.add(node_id) + order.append(node_id) + + for plugin_id in plugin_ids: + if plugin_id not in visited: + visit(plugin_id, []) + + return order + + def get_dependents(self, plugin_id: str) -> Set[str]: + """Get all plugins that depend on a given plugin.""" + node = self._nodes.get(plugin_id) + if node: + return node.dependents.copy() + return set() + + def check_conflicts(self) -> List[str]: + """Check for version conflicts in dependencies.""" + conflicts = [] + # TODO: Implement version conflict checking + return conflicts + + +# ============================================================================= +# PLUGIN MANAGER +# ============================================================================= + +class PluginManager: + """Manages plugin lifecycle with sandboxing and dependency resolution. + + This is the main entry point for plugin management. It handles: + - Discovery of plugins in directories + - Loading and unloading + - Dependency resolution + - Lifecycle state management + - Security sandboxing + + Example: + manager = PluginManager( + plugin_dirs=[Path("./plugins"), Path("~/.eu-utility/plugins")], + data_dir=Path("~/.eu-utility/data") + ) + manager.discover_all() + manager.load_all() + + # Get active plugins + for plugin_id, instance in manager.get_active_plugins().items(): + print(f"{plugin_id}: {instance.manifest.name}") + """ + + def __init__( + self, + plugin_dirs: Optional[List[Path]] = None, + data_dir: Optional[Path] = None, + event_bus: Optional[Any] = None, + state_store: Optional[Any] = None, + widget_api: Optional[Any] = None, + nexus_api: Optional[Any] = None, + max_workers: int = 4 + ): + """Initialize plugin manager. + + Args: + plugin_dirs: Directories to search for plugins + data_dir: Directory for plugin data storage + event_bus: Event bus for plugin communication + state_store: State store for plugins + widget_api: Widget API for UI plugins + nexus_api: Nexus API for Entropia data + max_workers: Max worker threads for background tasks + """ + self.plugin_dirs = plugin_dirs or [] + self.data_dir = data_dir or Path.home() / ".eu-utility" / "data" + self.event_bus = event_bus + self.state_store = state_store + self.widget_api = widget_api + self.nexus_api = nexus_api + + # Ensure data directory exists + self.data_dir.mkdir(parents=True, exist_ok=True) + + # Plugin storage + self._instances: Dict[str, PluginInstance] = {} + self._classes: Dict[str, Type[PluginAPI]] = {} + self._paths: Dict[str, Path] = {} + + # Support systems + self._loader = PluginLoader() + self._resolver = DependencyResolver() + self._executor = ThreadPoolExecutor(max_workers=max_workers) + self._logger = logging.getLogger("PluginManager") + + # State + self._discovered = False + self._lock = threading.RLock() + + # ========== Discovery ========== + + def discover_plugins(self, directory: Path) -> List[str]: + """Discover plugins in a directory. + + Args: + directory: Directory to search for plugins + + Returns: + List of discovered plugin IDs + """ + discovered: List[str] = [] + + if not directory.exists(): + self._logger.warning(f"Plugin directory does not exist: {directory}") + return discovered + + for item in directory.iterdir(): + if not item.is_dir(): + continue + + if item.name.startswith('.') or item.name.startswith('__'): + continue + + manifest_path = item / "plugin.json" + if not manifest_path.exists(): + continue + + try: + manifest = PluginManifest.from_json(manifest_path) + plugin_id = self._generate_plugin_id(manifest, item) + + with self._lock: + self._paths[plugin_id] = item + self._resolver.add_plugin(plugin_id, manifest) + + discovered.append(plugin_id) + self._logger.debug(f"Discovered plugin: {manifest.name} ({plugin_id})") + + except Exception as e: + self._logger.error(f"Failed to load manifest from {item}: {e}") + + return discovered + + def discover_all(self) -> int: + """Discover plugins in all configured directories. + + Returns: + Total number of plugins discovered + """ + total = 0 + for directory in self.plugin_dirs: + total += len(self.discover_plugins(directory)) + + self._discovered = True + self._logger.info(f"Discovered {total} plugins") + return total + + # ========== Loading ========== + + def load_plugin(self, plugin_id: str) -> bool: + """Load a plugin by ID. + + Args: + plugin_id: Unique plugin identifier + + Returns: + True if loaded successfully + """ + with self._lock: + if plugin_id in self._instances: + return True + + path = self._paths.get(plugin_id) + if not path: + self._logger.error(f"Plugin not found: {plugin_id}") + return False + + manifest_path = path / "plugin.json" + manifest = PluginManifest.from_json(manifest_path) + + # Create instance record + instance = PluginInstance( + plugin_id=plugin_id, + manifest=manifest, + state=PluginState.LOADING + ) + self._instances[plugin_id] = instance + + try: + # Load plugin class + plugin_class = self._loader.load_plugin_class(path, manifest) + self._classes[plugin_id] = plugin_class + + with self._lock: + instance.state = PluginState.LOADED + instance.load_time = datetime.now() + + self._logger.info(f"Loaded plugin: {manifest.name}") + return True + + except Exception as e: + with self._lock: + instance.state = PluginState.ERROR + instance.error_message = str(e) + instance.error_traceback = traceback.format_exc() + + self._logger.error(f"Failed to load plugin {manifest.name}: {e}") + return False + + def load_all(self, auto_activate: bool = False) -> Dict[str, bool]: + """Load all discovered plugins. + + Args: + auto_activate: Automatically activate loaded plugins + + Returns: + Dict mapping plugin IDs to success status + """ + if not self._discovered: + self.discover_all() + + # Resolve load order + plugin_ids = list(self._paths.keys()) + try: + load_order = self._resolver.resolve_load_order(plugin_ids) + except PluginDependencyError as e: + self._logger.error(f"Dependency resolution failed: {e}") + load_order = plugin_ids # Fall back to default order + + # Load in order + results: Dict[str, bool] = {} + for plugin_id in load_order: + results[plugin_id] = self.load_plugin(plugin_id) + + if auto_activate and results[plugin_id]: + self.activate_plugin(plugin_id) + + return results + + # ========== Initialization ========== + + def init_plugin(self, plugin_id: str, config: Optional[Dict[str, Any]] = None) -> bool: + """Initialize a loaded plugin. + + Args: + plugin_id: Plugin ID + config: Optional configuration override + + Returns: + True if initialized successfully + """ + with self._lock: + instance = self._instances.get(plugin_id) + if not instance: + self._logger.error(f"Plugin not loaded: {plugin_id}") + return False + + if instance.state not in (PluginState.LOADED, PluginState.INACTIVE): + self._logger.warning(f"Cannot initialize plugin in state: {instance.state}") + return False + + plugin_class = self._classes.get(plugin_id) + if not plugin_class: + self._logger.error(f"Plugin class not found: {plugin_id}") + return False + + instance.state = PluginState.INITIALIZING + + try: + # Create plugin directory for data + plugin_data_dir = self.data_dir / plugin_id + plugin_data_dir.mkdir(parents=True, exist_ok=True) + + # Load saved config or use provided + saved_config = self._load_plugin_config(plugin_id) + if config: + saved_config.update(config) + + # Create logger + logger = logging.getLogger(f"Plugin.{instance.manifest.name}") + + # Create sandbox + sandbox = PluginSandbox(plugin_id, instance.manifest.permissions) + + # Create context + ctx = PluginContext( + plugin_id=plugin_id, + manifest=instance.manifest, + data_dir=plugin_data_dir, + config=saved_config, + logger=logger, + event_bus=self.event_bus, + state_store=self.state_store, + widget_api=self.widget_api, + nexus_api=self.nexus_api, + permissions=instance.manifest.permissions + ) + + # Create and initialize plugin instance + plugin = plugin_class() + plugin._set_context(ctx) + plugin.on_init(ctx) + + with self._lock: + instance.instance = plugin + instance.state = PluginState.INACTIVE + + self._logger.info(f"Initialized plugin: {instance.manifest.name}") + return True + + except Exception as e: + with self._lock: + instance.state = PluginState.ERROR + instance.error_message = str(e) + instance.error_traceback = traceback.format_exc() + + self._logger.error(f"Failed to initialize plugin {plugin_id}: {e}") + return False + + # ========== Activation ========== + + def activate_plugin(self, plugin_id: str) -> bool: + """Activate an initialized plugin. + + Args: + plugin_id: Plugin ID + + Returns: + True if activated successfully + """ + with self._lock: + instance = self._instances.get(plugin_id) + if not instance: + return False + + if instance.state == PluginState.ACTIVE: + return True + + if instance.state != PluginState.INACTIVE: + # Try to initialize first + if instance.state == PluginState.LOADED: + self.init_plugin(plugin_id) + + if instance.state != PluginState.INACTIVE: + return False + + instance.state = PluginState.ACTIVATING + plugin = instance.instance + + try: + plugin.on_activate() + + with self._lock: + instance.state = PluginState.ACTIVE + instance.activate_time = datetime.now() + + self._logger.info(f"Activated plugin: {instance.manifest.name}") + return True + + except Exception as e: + with self._lock: + instance.state = PluginState.ERROR + instance.error_message = str(e) + + self._logger.error(f"Failed to activate plugin {plugin_id}: {e}") + return False + + def deactivate_plugin(self, plugin_id: str) -> bool: + """Deactivate an active plugin. + + Args: + plugin_id: Plugin ID + + Returns: + True if deactivated successfully + """ + with self._lock: + instance = self._instances.get(plugin_id) + if not instance or instance.state != PluginState.ACTIVE: + return False + + instance.state = PluginState.DEACTIVATING + plugin = instance.instance + + try: + plugin.on_deactivate() + + with self._lock: + instance.state = PluginState.INACTIVE + + self._logger.info(f"Deactivated plugin: {instance.manifest.name}") + return True + + except Exception as e: + self._logger.error(f"Error deactivating plugin {plugin_id}: {e}") + return False + + # ========== Unloading ========== + + def unload_plugin(self, plugin_id: str, force: bool = False) -> bool: + """Unload a plugin. + + Args: + plugin_id: Plugin ID + force: Force unload even if dependents exist + + Returns: + True if unloaded successfully + """ + with self._lock: + instance = self._instances.get(plugin_id) + if not instance: + return True + + # Check dependents + if not force: + dependents = self._resolver.get_dependents(plugin_id) + active_dependents = [ + d for d in dependents + if d in self._instances and self._instances[d].is_active() + ] + if active_dependents: + self._logger.error( + f"Cannot unload {plugin_id}: active dependents {active_dependents}" + ) + return False + + # Deactivate if active + if instance.state == PluginState.ACTIVE: + self.deactivate_plugin(plugin_id) + + instance.state = PluginState.UNLOADING + plugin = instance.instance + + try: + if plugin: + # Save config before shutdown + self._save_plugin_config(plugin_id, plugin.ctx.config) + plugin.on_shutdown() + + with self._lock: + instance.state = PluginState.UNLOADED + instance.instance = None + del self._instances[plugin_id] + del self._classes[plugin_id] + + self._logger.info(f"Unloaded plugin: {instance.manifest.name}") + return True + + except Exception as e: + self._logger.error(f"Error unloading plugin {plugin_id}: {e}") + return False + + def unload_all(self) -> None: + """Unload all plugins in reverse dependency order.""" + # Get active plugins in reverse load order + plugin_ids = list(self._instances.keys()) + + for plugin_id in reversed(plugin_ids): + self.unload_plugin(plugin_id, force=True) + + # ========== Queries ========== + + def get_instance(self, plugin_id: str) -> Optional[PluginInstance]: + """Get plugin instance info.""" + return self._instances.get(plugin_id) + + def get_plugin(self, plugin_id: str) -> Optional[PluginAPI]: + """Get active plugin instance.""" + instance = self._instances.get(plugin_id) + if instance and instance.state == PluginState.ACTIVE: + return instance.instance + return None + + def get_all_instances(self) -> Dict[str, PluginInstance]: + """Get all plugin instances.""" + return self._instances.copy() + + def get_active_plugins(self) -> Dict[str, PluginInstance]: + """Get all active plugins.""" + return { + k: v for k, v in self._instances.items() + if v.state == PluginState.ACTIVE + } + + def get_plugin_ui(self, plugin_id: str) -> Optional[Any]: + """Get plugin's UI widget.""" + plugin = self.get_plugin(plugin_id) + if plugin: + return plugin.create_widget() + return None + + def get_plugin_states(self) -> Dict[str, PluginState]: + """Get state of all plugins.""" + return { + k: v.state for k, v in self._instances.items() + } + + # ========== Configuration ========== + + def _load_plugin_config(self, plugin_id: str) -> Dict[str, Any]: + """Load plugin configuration from disk.""" + config_path = self.data_dir / plugin_id / "config.json" + if config_path.exists(): + try: + with open(config_path, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + self._logger.error(f"Failed to load config for {plugin_id}: {e}") + return {} + + def _save_plugin_config(self, plugin_id: str, config: Dict[str, Any]) -> None: + """Save plugin configuration to disk.""" + config_path = self.data_dir / plugin_id / "config.json" + try: + with open(config_path, 'w', encoding='utf-8') as f: + json.dump(config, f, indent=2) + except Exception as e: + self._logger.error(f"Failed to save config for {plugin_id}: {e}") + + # ========== Utility ========== + + def _generate_plugin_id(self, manifest: PluginManifest, path: Path) -> str: + """Generate unique plugin ID.""" + # Use path hash for uniqueness + path_hash = hashlib.md5(str(path).encode()).hexdigest()[:8] + name_slug = manifest.name.lower().replace(' ', '_').replace('-', '_') + return f"{name_slug}_{path_hash}" + + def shutdown(self) -> None: + """Shutdown the plugin manager.""" + self.unload_all() + self._executor.shutdown(wait=True) + + +# ============================================================================= +# EXPORTS +# ============================================================================= + +__all__ = [ + 'PluginManager', + 'PluginLoader', + 'DependencyResolver', + 'PluginSandbox', +]