diff --git a/core/base_plugin.py b/core/base_plugin.py new file mode 100644 index 0000000..8934498 --- /dev/null +++ b/core/base_plugin.py @@ -0,0 +1,1193 @@ +""" +EU-Utility - Plugin Base Class + +Defines the interface that all plugins must implement. +Includes PluginAPI integration for cross-plugin communication. +""" + +from abc import ABC, abstractmethod +from typing import Optional, Dict, Any, TYPE_CHECKING, Callable, List, Type + +if TYPE_CHECKING: + from core.overlay_window import OverlayWindow + from core.plugin_api import PluginAPI, APIEndpoint, APIType + from core.event_bus import BaseEvent, EventCategory + + +class BasePlugin(ABC): + """Base class for all EU-Utility plugins. + + To define hotkeys for your plugin, use either: + + 1. Legacy single hotkey (simple toggle): + hotkey = "ctrl+shift+n" + + 2. New multi-hotkey format (recommended): + hotkeys = [ + { + 'action': 'toggle', # Unique action identifier + 'description': 'Toggle My Plugin', # Display name in settings + 'default': 'ctrl+shift+m', # Default hotkey combination + 'config_key': 'myplugin_toggle' # Settings key (optional) + }, + { + 'action': 'quick_action', + 'description': 'Quick Scan', + 'default': 'ctrl+shift+s', + } + ] + """ + + # Plugin metadata - override in subclass + name: str = "Unnamed Plugin" + version: str = "1.0.0" + author: str = "Unknown" + description: str = "No description provided" + icon: Optional[str] = None + + # Plugin settings + hotkey: Optional[str] = None # Legacy single hotkey (e.g., "ctrl+shift+n") + hotkeys: Optional[List[Dict[str, str]]] = None # New multi-hotkey format + enabled: bool = True + + # Dependencies - override in subclass + # Format: { + # 'pip': ['package1', 'package2>=1.0'], + # 'plugins': ['plugin_id1', 'plugin_id2'], # Other plugins this plugin requires + # 'optional': {'package3': 'description'} + # } + dependencies: Dict[str, Any] = {} + + def __init__(self, overlay_window: 'OverlayWindow', config: Dict[str, Any]): + self.overlay = overlay_window + self.config = config + self._ui = None + self._api_registered = False + self._plugin_id = f"{self.__class__.__module__}.{self.__class__.__name__}" + + # Track event subscriptions for cleanup + self._event_subscriptions: List[str] = [] + + # Get API instance + try: + from core.plugin_api import get_api + self.api = get_api() + except ImportError: + self.api = None + + @abstractmethod + def initialize(self) -> None: + """Called when plugin is loaded. Setup API connections, etc.""" + pass + + @abstractmethod + def get_ui(self) -> Any: + """Return the plugin's UI widget (QWidget).""" + return None + + def on_show(self) -> None: + """Called when overlay becomes visible.""" + pass + + def on_hide(self) -> None: + """Called when overlay is hidden.""" + pass + + def on_hotkey(self) -> None: + """Called when plugin's hotkey is pressed.""" + pass + + def shutdown(self) -> None: + """Called when app is closing. Cleanup resources.""" + # Unregister APIs + if self.api and self._api_registered: + self.api.unregister_api(self._plugin_id) + + # Unsubscribe from all typed events + self.unsubscribe_all_typed() + + # ========== Config Methods ========== + + def get_config(self, key: str, default: Any = None) -> Any: + """Get a config value with default.""" + return self.config.get(key, default) + + def set_config(self, key: str, value: Any) -> None: + """Set a config value.""" + self.config[key] = value + + # ========== API Methods ========== + + def register_api(self, name: str, handler: Callable, api_type: 'APIType' = None, description: str = "") -> bool: + """Register an API endpoint for other plugins to use. + + Example: + self.register_api( + "scan_window", + self.scan_window, + APIType.OCR, + "Scan game window and return text" + ) + """ + if not self.api: + print(f"[{self.name}] API not available") + return False + + try: + from core.plugin_api import APIEndpoint, APIType + + if api_type is None: + api_type = APIType.UTILITY + + endpoint = APIEndpoint( + name=name, + api_type=api_type, + description=description, + handler=handler, + plugin_id=self._plugin_id, + version=self.version + ) + + success = self.api.register_api(endpoint) + if success: + self._api_registered = True + return success + + except Exception as e: + print(f"[{self.name}] Failed to register API: {e}") + return False + + def call_api(self, plugin_id: str, api_name: str, *args, **kwargs) -> Any: + """Call another plugin's API. + + Example: + # Call Game Reader's OCR API + result = self.call_api("plugins.game_reader.plugin", "capture_screen") + """ + if not self.api: + raise RuntimeError("API not available") + + return self.api.call_api(plugin_id, api_name, *args, **kwargs) + + def find_apis(self, api_type: 'APIType' = None) -> list: + """Find available APIs from other plugins.""" + if not self.api: + return [] + + return self.api.find_apis(api_type) + + # ========== Shared Services ========== + + def ocr_capture(self, region: tuple = None) -> Dict[str, Any]: + """Capture screen and perform OCR. + + Returns: + {'text': str, 'confidence': float, 'raw_results': list} + """ + if not self.api: + return {"text": "", "confidence": 0, "error": "API not available"} + + return self.api.ocr_capture(region) + + # ========== Screenshot Service Methods ========== + + def capture_screen(self, full_screen: bool = True): + """Capture screenshot. + + Args: + full_screen: If True, capture entire screen + + Returns: + PIL Image object + + Example: + # Capture full screen + screenshot = self.capture_screen() + + # Capture specific region + region = self.capture_region(100, 100, 800, 600) + """ + if not self.api: + raise RuntimeError("API not available") + + return self.api.capture_screen(full_screen) + + def capture_region(self, x: int, y: int, width: int, height: int): + """Capture specific screen region. + + Args: + x: Left coordinate + y: Top coordinate + width: Region width + height: Region height + + Returns: + PIL Image object + + Example: + # Capture a 400x200 region starting at (100, 100) + image = self.capture_region(100, 100, 400, 200) + """ + if not self.api: + raise RuntimeError("API not available") + + return self.api.capture_region(x, y, width, height) + + def get_last_screenshot(self): + """Get the most recent screenshot. + + Returns: + PIL Image or None if no screenshots taken yet + """ + if not self.api: + return None + + return self.api.get_last_screenshot() + + def read_log(self, lines: int = 50, filter_text: str = None) -> list: + """Read recent game log lines.""" + if not self.api: + return [] + + return self.api.read_log(lines, filter_text) + + def get_shared_data(self, key: str, default=None): + """Get shared data from other plugins.""" + if not self.api: + return default + + return self.api.get_data(key, default) + + def set_shared_data(self, key: str, value: Any): + """Set shared data for other plugins.""" + if self.api: + self.api.set_data(key, value) + + # ========== Legacy Event System ========== + + def publish_event(self, event_type: str, data: Dict[str, Any]): + """Publish an event for other plugins to consume (legacy).""" + if self.api: + self.api.publish_event(event_type, data) + + def subscribe(self, event_type: str, callback: Callable): + """Subscribe to events from other plugins (legacy).""" + if self.api: + self.api.subscribe(event_type, callback) + + # ========== Enhanced Typed Event System ========== + + def publish_typed(self, event: 'BaseEvent') -> None: + """ + Publish a typed event to the Event Bus. + + Args: + event: A typed event instance (SkillGainEvent, LootEvent, etc.) + + Example: + from core.event_bus import LootEvent + + self.publish_typed(LootEvent( + mob_name="Daikiba", + items=[{"name": "Animal Oil", "value": 0.05}], + total_tt_value=0.05 + )) + """ + if self.api: + self.api.publish_typed(event) + + def subscribe_typed( + self, + event_class: Type['BaseEvent'], + callback: Callable, + **filter_kwargs + ) -> str: + """ + Subscribe to a specific event type with optional filtering. + + Args: + event_class: The event class to subscribe to + callback: Function to call when matching events occur + **filter_kwargs: Additional filter criteria + - min_damage: Minimum damage threshold + - max_damage: Maximum damage threshold + - mob_types: List of mob names to filter + - skill_names: List of skill names to filter + - sources: List of event sources to filter + - replay_last: Number of recent events to replay + - predicate: Custom filter function + + Returns: + Subscription ID (store this to unsubscribe later) + + Example: + from core.event_bus import DamageEvent + + # Subscribe to all damage events + self.sub_id = self.subscribe_typed(DamageEvent, self.on_damage) + + # Subscribe to high damage events only + self.sub_id = self.subscribe_typed( + DamageEvent, + self.on_big_hit, + min_damage=100 + ) + + # Subscribe with replay + self.sub_id = self.subscribe_typed( + SkillGainEvent, + self.on_skill_gain, + replay_last=10 + ) + """ + if not self.api: + print(f"[{self.name}] API not available for event subscription") + return "" + + sub_id = self.api.subscribe_typed(event_class, callback, **filter_kwargs) + if sub_id: + self._event_subscriptions.append(sub_id) + return sub_id + + def unsubscribe_typed(self, subscription_id: str) -> bool: + """ + Unsubscribe from a specific typed event subscription. + + Args: + subscription_id: The subscription ID returned by subscribe_typed + + Returns: + True if subscription was found and removed + """ + if not self.api: + return False + + result = self.api.unsubscribe_typed(subscription_id) + if result and subscription_id in self._event_subscriptions: + self._event_subscriptions.remove(subscription_id) + return result + + def unsubscribe_all_typed(self) -> None: + """Unsubscribe from all typed event subscriptions.""" + if not self.api: + return + + for sub_id in self._event_subscriptions[:]: # Copy list to avoid modification during iteration + self.api.unsubscribe_typed(sub_id) + self._event_subscriptions.clear() + + def get_recent_events( + self, + event_type: Type['BaseEvent'] = None, + count: int = 100, + category: 'EventCategory' = None + ) -> List['BaseEvent']: + """ + Get recent events from history. + + Args: + event_type: Filter by event class + count: Maximum number of events to return + category: Filter by event category + + Returns: + List of matching events + + Example: + from core.event_bus import LootEvent + + # Get last 20 loot events + recent_loot = self.get_recent_events(LootEvent, 20) + """ + if not self.api: + return [] + + return self.api.get_recent_events(event_type, count, category) + + def get_event_stats(self) -> Dict[str, Any]: + """ + Get Event Bus statistics. + + Returns: + Dict with event bus statistics + """ + if not self.api: + return {} + + return self.api.get_event_stats() + + # ========== Utility Methods ========== + + def format_ped(self, value: float) -> str: + """Format PED value.""" + if self.api: + return self.api.format_ped(value) + return f"{value:.2f} PED" + + def format_pec(self, value: float) -> str: + """Format PEC value.""" + if self.api: + return self.api.format_pec(value) + return f"{value:.0f} PEC" + + def calculate_dpp(self, damage: float, ammo: int, decay: float) -> float: + """Calculate Damage Per PEC.""" + if self.api: + return self.api.calculate_dpp(damage, ammo, decay) + + # Fallback calculation + if damage <= 0: + return 0.0 + ammo_cost = ammo * 0.01 + total_cost = ammo_cost + decay + if total_cost <= 0: + return 0.0 + return damage / (total_cost / 100) + + def calculate_markup(self, price: float, tt: float) -> float: + """Calculate markup percentage.""" + if self.api: + return self.api.calculate_markup(price, tt) + + if tt <= 0: + return 0.0 + return (price / tt) * 100 + + # ========== Audio Service Methods ========== + + def play_sound(self, filename_or_key: str, blocking: bool = False) -> bool: + """Play a sound by key or filename. + + Args: + filename_or_key: Sound key ('global', 'hof', 'skill_gain', 'alert', 'error') + or path to file + blocking: If True, wait for sound to complete (default: False) + + Returns: + True if sound was queued/played, False on error or if muted + + Examples: + # Play predefined sounds + self.play_sound('hof') + self.play_sound('skill_gain') + self.play_sound('alert') + + # Play custom sound file + self.play_sound('/path/to/custom.wav') + """ + if not self.api: + return False + + return self.api.play_sound(filename_or_key, blocking) + + def set_volume(self, volume: float) -> None: + """Set global audio volume. + + Args: + volume: Volume level from 0.0 (mute) to 1.0 (max) + """ + if self.api: + self.api.set_volume(volume) + + def get_volume(self) -> float: + """Get current audio volume. + + Returns: + Current volume level (0.0 to 1.0) + """ + if not self.api: + return 0.0 + + return self.api.get_volume() + + def mute(self) -> None: + """Mute all audio.""" + if self.api: + self.api.mute_audio() + + def unmute(self) -> None: + """Unmute audio.""" + if self.api: + self.api.unmute_audio() + + def toggle_mute(self) -> bool: + """Toggle audio mute state. + + Returns: + New muted state (True if now muted) + """ + if not self.api: + return False + + return self.api.toggle_mute_audio() + + def is_muted(self) -> bool: + """Check if audio is muted. + + Returns: + True if audio is muted + """ + if not self.api: + return False + + return self.api.is_audio_muted() + + def is_audio_available(self) -> bool: + """Check if audio service is available. + + Returns: + True if audio backend is initialized and working + """ + if not self.api: + return False + + return self.api.is_audio_available() + + # ========== Background Task Methods ========== + + def run_in_background(self, func: Callable, *args, + priority: str = 'normal', + on_complete: Callable = None, + on_error: Callable = None, + **kwargs) -> str: + """Run a function in a background thread. + + Use this instead of creating your own QThreads. + + Args: + func: Function to execute in background + *args: Positional arguments for the function + priority: 'high', 'normal', or 'low' (default: 'normal') + on_complete: Called with result when task completes successfully + on_error: Called with exception when task fails + **kwargs: Keyword arguments for the function + + Returns: + Task ID for tracking/cancellation + + Example: + def heavy_calculation(data): + return process(data) + + def on_done(result): + self.update_ui(result) + + def on_fail(error): + self.show_error(str(error)) + + task_id = self.run_in_background( + heavy_calculation, + large_dataset, + priority='high', + on_complete=on_done, + on_error=on_fail + ) + + # Or with decorator style: + @self.run_in_background + def fetch_remote_data(): + return requests.get(url).json() + """ + if not self.api: + raise RuntimeError("API not available") + + return self.api.run_in_background( + func, *args, + priority=priority, + on_complete=on_complete, + on_error=on_error, + **kwargs + ) + + def schedule_task(self, delay_ms: int, func: Callable, *args, + priority: str = 'normal', + on_complete: Callable = None, + on_error: Callable = None, + periodic: bool = False, + interval_ms: int = None, + **kwargs) -> str: + """Schedule a task for delayed or periodic execution. + + Args: + delay_ms: Milliseconds to wait before first execution + func: Function to execute + *args: Positional arguments + priority: 'high', 'normal', or 'low' + on_complete: Called with result after each execution + on_error: Called with exception if execution fails + periodic: If True, repeat execution at interval_ms + interval_ms: Milliseconds between periodic executions + **kwargs: Keyword arguments + + Returns: + Task ID for tracking/cancellation + + Example: + # One-time delayed execution + task_id = self.schedule_task( + 5000, # 5 seconds + lambda: print("Hello after delay!") + ) + + # Periodic data refresh (every 30 seconds) + self.schedule_task( + 0, # Start immediately + self.refresh_data, + periodic=True, + interval_ms=30000, + on_complete=lambda data: self.update_display(data) + ) + """ + if not self.api: + raise RuntimeError("API not available") + + return self.api.schedule_task( + delay_ms, func, *args, + priority=priority, + on_complete=on_complete, + on_error=on_error, + periodic=periodic, + interval_ms=interval_ms, + **kwargs + ) + + def cancel_task(self, task_id: str) -> bool: + """Cancel a pending or running task. + + Args: + task_id: Task ID returned by run_in_background or schedule_task + + Returns: + True if task was cancelled, False if not found or already done + """ + if not self.api: + return False + + return self.api.cancel_task(task_id) + + def connect_task_signals(self, + on_completed: Callable = None, + on_failed: Callable = None, + on_started: Callable = None, + on_cancelled: Callable = None) -> bool: + """Connect to task status signals for UI updates. + + Connects Qt signals so UI updates from background threads are thread-safe. + + Args: + on_completed: Called with (task_id, result) when tasks complete + on_failed: Called with (task_id, error_message) when tasks fail + on_started: Called with (task_id) when tasks start + on_cancelled: Called with (task_id) when tasks are cancelled + + Returns: + True if signals were connected + + Example: + class MyPlugin(BasePlugin): + def initialize(self): + # Connect task signals for UI updates + self.connect_task_signals( + on_completed=self._on_task_done, + on_failed=self._on_task_error + ) + + def _on_task_done(self, task_id, result): + self.status_label.setText(f"Task {task_id}: Done!") + + def _on_task_error(self, task_id, error): + self.status_label.setText(f"Task {task_id} failed: {error}") + """ + if not self.api: + return False + + connected = False + + if on_completed: + connected = self.api.connect_task_signal('completed', on_completed) or connected + if on_failed: + connected = self.api.connect_task_signal('failed', on_failed) or connected + if on_started: + connected = self.api.connect_task_signal('started', on_started) or connected + if on_cancelled: + connected = self.api.connect_task_signal('cancelled', on_cancelled) or connected + + return connected + + # ========== Nexus API Methods ========== + + def nexus_search(self, query: str, entity_type: str = "items", limit: int = 20) -> List[Dict[str, Any]]: + """Search for entities via Entropia Nexus API. + + Args: + query: Search query string + entity_type: Type of entity to search. Valid types: + - items, weapons, armors + - mobs, pets + - blueprints, materials + - locations, teleporters, shops, planets, areas + - skills + - enhancers, medicaltools, finders, excavators, refiners + - vehicles, decorations, furniture + - storagecontainers, strongboxes, vendors + limit: Maximum number of results (default: 20, max: 100) + + Returns: + List of search result dictionaries + + Example: + # Search for weapons + results = self.nexus_search("ArMatrix", entity_type="weapons") + + # Search for mobs + mobs = self.nexus_search("Atrox", entity_type="mobs") + + # Search for locations + locations = self.nexus_search("Fort", entity_type="locations") + + # Process results + for item in results: + print(f"{item['name']} ({item['type']})") + """ + if not self.api: + return [] + + return self.api.nexus_search(query, entity_type, limit) + + def nexus_get_item_details(self, item_id: str) -> Optional[Dict[str, Any]]: + """Get detailed information about a specific item. + + Args: + item_id: The item's unique identifier (e.g., "armatrix_lp-35") + + Returns: + Dictionary with item details, or None if not found + + Example: + details = self.nexus_get_item_details("armatrix_lp-35") + if details: + print(f"Name: {details['name']}") + print(f"TT Value: {details['tt_value']} PED") + print(f"Damage: {details.get('damage', 'N/A')}") + print(f"Range: {details.get('range', 'N/A')}m") + """ + if not self.api: + return None + + return self.api.nexus_get_item_details(item_id) + + def nexus_get_market_data(self, item_id: str) -> Optional[Dict[str, Any]]: + """Get market data for a specific item. + + Args: + item_id: The item's unique identifier + + Returns: + Dictionary with market data, or None if not found + + Example: + market = self.nexus_get_market_data("armatrix_lp-35") + if market: + print(f"Current markup: {market['current_markup']:.1f}%") + print(f"7-day avg: {market['avg_markup_7d']:.1f}%") + print(f"24h Volume: {market['volume_24h']}") + + # Check orders + for buy in market.get('buy_orders', [])[:5]: + print(f"Buy: {buy['price']} PED x {buy['quantity']}") + """ + if not self.api: + return None + + return self.api.nexus_get_market_data(item_id) + + def nexus_is_available(self) -> bool: + """Check if Nexus API is available. + + Returns: + True if Nexus API service is ready + """ + if not self.api: + return False + + return self.api.nexus_is_available() + + # ========== HTTP Client Methods ========== + + def http_get(self, url: str, cache_ttl: int = 300, headers: Dict[str, str] = None, **kwargs) -> Dict[str, Any]: + """Make an HTTP GET request with caching. + + Args: + url: The URL to fetch + cache_ttl: Cache TTL in seconds (default: 300 = 5 minutes) + headers: Additional headers + **kwargs: Additional arguments + + Returns: + Dict with 'status_code', 'headers', 'content', 'text', 'json', 'from_cache' + + Example: + response = self.http_get( + "https://api.example.com/data", + cache_ttl=600, + headers={'Accept': 'application/json'} + ) + if response['status_code'] == 200: + data = response['json'] + """ + if not self.api: + raise RuntimeError("API not available") + + # Get HTTP client from services + http_client = self.api.services.get('http') + if not http_client: + raise RuntimeError("HTTP client not available") + + return http_client.get(url, cache_ttl=cache_ttl, headers=headers, **kwargs) + + # ========== DataStore Methods ========== + + def save_data(self, key: str, data: Any) -> bool: + """Save data to persistent storage. + + Data is automatically scoped to this plugin and survives app restarts. + + Args: + key: Key to store data under + data: Data to store (must be JSON serializable) + + Returns: + True if saved successfully + + Example: + # Save plugin settings + self.save_data("settings", {"theme": "dark", "volume": 0.8}) + + # Save user progress + self.save_data("total_loot", {"ped": 150.50, "items": 42}) + """ + if not self.api: + return False + + data_store = self.api.services.get('data_store') + if not data_store: + print(f"[{self.name}] DataStore not available") + return False + + return data_store.save(self._plugin_id, key, data) + + def load_data(self, key: str, default: Any = None) -> Any: + """Load data from persistent storage. + + Args: + key: Key to load data from + default: Default value if key doesn't exist + + Returns: + Stored data or default value + + Example: + # Load settings with defaults + settings = self.load_data("settings", {"theme": "light", "volume": 1.0}) + + # Load progress + progress = self.load_data("total_loot", {"ped": 0, "items": 0}) + print(f"Total loot: {progress['ped']} PED") + """ + if not self.api: + return default + + data_store = self.api.services.get('data_store') + if not data_store: + return default + + return data_store.load(self._plugin_id, key, default) + + def delete_data(self, key: str) -> bool: + """Delete data from persistent storage. + + Args: + key: Key to delete + + Returns: + True if deleted (or didn't exist), False on error + """ + if not self.api: + return False + + data_store = self.api.services.get('data_store') + if not data_store: + return False + + return data_store.delete(self._plugin_id, key) + + def get_all_data_keys(self) -> List[str]: + """Get all data keys stored by this plugin. + + Returns: + List of key names + """ + if not self.api: + return [] + + data_store = self.api.services.get('data_store') + if not data_store: + return [] + + return data_store.get_all_keys(self._plugin_id) + + # ========== Window Manager Methods ========== + + def get_eu_window(self) -> Optional[Dict[str, Any]]: + """Get information about the Entropia Universe game window. + + Returns: + Dict with window info or None if not found: + - handle: Window handle (int) + - title: Window title (str) + - rect: (left, top, right, bottom) tuple + - width: Window width (int) + - height: Window height (int) + - visible: Whether window is visible (bool) + + Example: + window = self.get_eu_window() + if window: + print(f"EU window: {window['width']}x{window['height']}") + print(f"Position: {window['rect']}") + """ + if not self.api: + return None + + return self.api.get_eu_window() + + def is_eu_focused(self) -> bool: + """Check if Entropia Universe window is currently focused. + + Returns: + True if EU is the active window + + Example: + if self.is_eu_focused(): + # Safe to capture screenshot + screenshot = self.capture_screen() + """ + if not self.api: + return False + + return self.api.is_eu_focused() + + def is_eu_visible(self) -> bool: + """Check if Entropia Universe window is visible. + + Returns: + True if EU window is visible (not minimized) + """ + if not self.api: + return False + + return self.api.is_eu_visible() + + def bring_eu_to_front(self) -> bool: + """Bring Entropia Universe window to front and focus it. + + Returns: + True if successful + """ + if not self.api: + return False + + return self.api.bring_eu_to_front() + + # ========== Clipboard Methods ========== + + def copy_to_clipboard(self, text: str) -> bool: + """Copy text to system clipboard. + + Args: + text: Text to copy + + Returns: + True if successful + + Example: + # Copy coordinates + self.copy_to_clipboard("12345, 67890") + + # Copy calculation result + result = self.calculate_dpp(50, 100, 2.5) + self.copy_to_clipboard(f"DPP: {result:.2f}") + """ + if not self.api: + return False + + return self.api.copy_to_clipboard(text) + + def paste_from_clipboard(self) -> str: + """Paste text from system clipboard. + + Returns: + Clipboard content or empty string + + Example: + # Get pasted coordinates + coords = self.paste_from_clipboard() + if coords: + x, y = map(int, coords.split(",")) + """ + if not self.api: + return "" + + return self.api.paste_from_clipboard() + + def get_clipboard_history(self, limit: int = 10) -> List[Dict[str, str]]: + """Get recent clipboard history. + + Args: + limit: Maximum number of entries to return + + Returns: + List of clipboard entries with 'text', 'timestamp', 'source' + """ + if not self.api: + return [] + + return self.api.get_clipboard_history(limit) + + # ========== Notification Methods ========== + + def notify(self, title: str, message: str, notification_type: str = 'info', sound: bool = False, duration_ms: int = 5000) -> str: + """Show a toast notification. + + Args: + title: Notification title + message: Notification message + notification_type: 'info', 'warning', 'error', or 'success' + sound: Play notification sound + duration_ms: How long to show notification (default: 5000ms) + + Returns: + Notification ID + + Example: + # Info notification + self.notify("Session Started", "Tracking loot...") + + # Success with sound + self.notify("Global!", "You received 150 PED", notification_type='success', sound=True) + + # Warning + self.notify("Low Ammo", "Only 100 shots remaining", notification_type='warning') + + # Error + self.notify("Connection Failed", "Check your internet", notification_type='error', sound=True) + """ + if not self.api: + return "" + + return self.api.notify(title, message, notification_type, sound, duration_ms) + + def notify_info(self, title: str, message: str, sound: bool = False) -> str: + """Show info notification (convenience method).""" + return self.notify(title, message, 'info', sound) + + def notify_success(self, title: str, message: str, sound: bool = False) -> str: + """Show success notification (convenience method).""" + return self.notify(title, message, 'success', sound) + + def notify_warning(self, title: str, message: str, sound: bool = False) -> str: + """Show warning notification (convenience method).""" + return self.notify(title, message, 'warning', sound) + + def notify_error(self, title: str, message: str, sound: bool = True) -> str: + """Show error notification (convenience method).""" + return self.notify(title, message, 'error', sound) + + def close_notification(self, notification_id: str) -> bool: + """Close a specific notification. + + Args: + notification_id: ID returned by notify() + + Returns: + True if closed + """ + if not self.api: + return False + + return self.api.close_notification(notification_id) + + def close_all_notifications(self) -> None: + """Close all visible notifications.""" + if not self.api: + return + + self.api.close_all_notifications() + + # ========== Settings Methods ========== + + def get_setting(self, key: str, default: Any = None) -> Any: + """Get a global EU-Utility setting. + + These are user preferences that apply across all plugins. + + Args: + key: Setting key + default: Default value if not set + + Returns: + Setting value + + Available settings: + - theme: 'dark', 'light', or 'auto' + - overlay_opacity: float 0.0-1.0 + - icon_size: 'small', 'medium', 'large' + - minimize_to_tray: bool + - show_tooltips: bool + - global_hotkeys: Dict of hotkey mappings + """ + if not self.api: + return default + + settings = self.api.services.get('settings') + if not settings: + return default + + return settings.get(key, default) + + def set_setting(self, key: str, value: Any) -> bool: + """Set a global EU-Utility setting. + + Args: + key: Setting key + value: Value to set + + Returns: + True if saved + """ + if not self.api: + return False + + settings = self.api.services.get('settings') + if not settings: + return False + + return settings.set(key, value) + + # ========== Logging Methods ========== + + def log_debug(self, message: str) -> None: + """Log debug message (development only).""" + print(f"[DEBUG][{self.name}] {message}") + + def log_info(self, message: str) -> None: + """Log info message.""" + print(f"[INFO][{self.name}] {message}") + + def log_warning(self, message: str) -> None: + """Log warning message.""" + print(f"[WARNING][{self.name}] {message}") + + def log_error(self, message: str) -> None: + """Log error message.""" + print(f"[ERROR][{self.name}] {message}") diff --git a/core/plugin_manager.py b/core/plugin_manager.py index bf6d1ea..c21dff1 100644 --- a/core/plugin_manager.py +++ b/core/plugin_manager.py @@ -12,7 +12,7 @@ import importlib.util from pathlib import Path from typing import Dict, List, Type, Optional -from plugins.base_plugin import BasePlugin +from core.base_plugin import BasePlugin class PluginManager: diff --git a/core/plugin_manager_optimized.py b/core/plugin_manager_optimized.py index 29e65d3..aee0623 100644 --- a/core/plugin_manager_optimized.py +++ b/core/plugin_manager_optimized.py @@ -20,7 +20,7 @@ from typing import Dict, List, Type, Optional, Any from concurrent.futures import ThreadPoolExecutor, as_completed from functools import lru_cache -from plugins.base_plugin import BasePlugin +from core.base_plugin import BasePlugin class PluginMetadata: diff --git a/plugins/plugin_store_ui/plugin.py b/plugins/plugin_store_ui/plugin.py index 6f85f09..6b6007c 100644 --- a/plugins/plugin_store_ui/plugin.py +++ b/plugins/plugin_store_ui/plugin.py @@ -6,7 +6,7 @@ Provides the Plugin Store interface for browsing and installing plugins. from PyQt6.QtWidgets import QWidget, QVBoxLayout, QLabel -from plugins.base_plugin import BasePlugin +from core.base_plugin import BasePlugin from core.plugin_store import PluginStoreUI diff --git a/plugins/settings/plugin.py b/plugins/settings/plugin.py index bd4aeaa..2b8882b 100644 --- a/plugins/settings/plugin.py +++ b/plugins/settings/plugin.py @@ -13,7 +13,7 @@ from PyQt6.QtWidgets import ( from PyQt6.QtCore import Qt, QTimer from core.settings import get_settings -from plugins.base_plugin import BasePlugin +from core.base_plugin import BasePlugin class SettingsPlugin(BasePlugin):