diff --git a/core/api/__init__.py b/core/api/__init__.py new file mode 100644 index 0000000..1f283f4 --- /dev/null +++ b/core/api/__init__.py @@ -0,0 +1,94 @@ +""" +EU-Utility - Core APIs +====================== + +A comprehensive three-tier API architecture for EU-Utility. + +Three APIs: +----------- +1. PluginAPI (core.api.plugin_api) - For plugin development +2. WidgetAPI (core.api.widget_api) - For widget management +3. ExternalAPI (core.api.external_api) - For external integrations + +Quick Reference: +--------------- + +PluginAPI - Access core services: + >>> from core.api import get_api + >>> api = get_api() + >>> api.show_notification("Hello", "World!") + >>> window = api.get_eu_window() + +WidgetAPI - Manage overlay widgets: + >>> from core.api import get_widget_api + >>> widget_api = get_widget_api() + >>> widget = widget_api.create_widget("tracker", "Loot Tracker") + >>> widget.show() + +ExternalAPI - Third-party integrations: + >>> from core.api import get_external_api + >>> ext = get_external_api() + >>> ext.start_server(port=8080) + >>> ext.register_webhook("/discord", handler) + +See individual API modules for full documentation. +""" + +# Import all three APIs +from core.api.plugin_api import ( + PluginAPI, + get_api, + PluginAPIError, + ServiceNotAvailableError +) + +from core.api.widget_api import ( + WidgetAPI, + get_widget_api, + Widget, + WidgetConfig, + WidgetType, + WidgetAnchor +) + +from core.api.external_api import ( + ExternalAPI, + get_external_api, + ExternalAPIError, + WebhookError, + ServerError, + WebhookConfig, + APIEndpoint +) + +# Convenience re-exports +__all__ = [ + # PluginAPI + 'PluginAPI', + 'get_api', + 'PluginAPIError', + 'ServiceNotAvailableError', + + # WidgetAPI + 'WidgetAPI', + 'get_widget_api', + 'Widget', + 'WidgetConfig', + 'WidgetType', + 'WidgetAnchor', + + # ExternalAPI + 'ExternalAPI', + 'get_external_api', + 'ExternalAPIError', + 'WebhookError', + 'ServerError', + 'WebhookConfig', + 'APIEndpoint' +] + +# Version +__version__ = "2.2.0" + +# API compatibility version +API_VERSION = "2.2" diff --git a/core/api/external_api.py b/core/api/external_api.py new file mode 100644 index 0000000..eeceb46 --- /dev/null +++ b/core/api/external_api.py @@ -0,0 +1,822 @@ +""" +EU-Utility - External API +================ + +The ExternalAPI provides interfaces for third-party integrations, +webhooks, REST endpoints, and external application communication. + +This API is designed for: +- Browser extensions +- Discord bots +- Home Assistant +- Stream Deck +- Custom scripts +- Other applications + +Quick Start: +----------- +```python +from core.api.external_api import get_external_api + +# Start the REST API server +api = get_external_api() +api.start_server(port=8080) + +# Register a webhook handler +api.register_webhook("/loot", handle_loot_event) + +# Send data to external service +api.post_webhook("https://discord.com/webhook/...", {"content": "Hello!"}) +``` + +Integration Types: +----------------- +- REST API - HTTP endpoints for external queries +- Webhooks - Receive events from external services +- WebSocket - Real-time bidirectional communication +- IPC - Inter-process communication +- File Watch - Monitor file changes + +For full documentation, see: docs/EXTERNAL_API.md +""" + +import json +import time +import asyncio +from pathlib import Path +from typing import Optional, Dict, List, Callable, Any, Union +from dataclasses import dataclass, asdict +from datetime import datetime +from threading import Thread +import hashlib +import hmac + +from core.logger import get_logger + +logger = get_logger(__name__) + + +class ExternalAPIError(Exception): + """Base exception for ExternalAPI errors.""" + pass + + +class WebhookError(ExternalAPIError): + """Raised when webhook operations fail.""" + pass + + +class ServerError(ExternalAPIError): + """Raised when server operations fail.""" + pass + + +@dataclass +class WebhookConfig: + """Configuration for a webhook endpoint.""" + path: str + handler: Callable + secret: Optional[str] = None # For HMAC verification + methods: List[str] = None + rate_limit: int = 60 # Requests per minute + + def __post_init__(self): + if self.methods is None: + self.methods = ['POST'] + + +@dataclass +class APIEndpoint: + """REST API endpoint definition.""" + path: str + handler: Callable + methods: List[str] = None + auth_required: bool = False + + def __post_init__(self): + if self.methods is None: + self.methods = ['GET'] + + +class ExternalAPI: + """ + ExternalAPI - API for third-party integrations. + + Provides REST endpoints, webhooks, WebSocket support, and + other external communication interfaces. + + Security: + --------- + - Optional HMAC signature verification for webhooks + - API key authentication for REST endpoints + - Rate limiting on all endpoints + - CORS configuration for browser access + + Example: + -------- + ```python + from core.api.external_api import get_external_api + + api = get_external_api() + + # Start REST server + api.start_server(port=8080) + + # Register an endpoint + @api.endpoint("/stats", methods=["GET"]) + def get_stats(): + return {"kills": 100, "loot": "50 PED"} + + # Register a webhook + api.register_webhook("/discord", handle_discord) + + # Send to external service + api.post_webhook("https://discord.com/api/webhooks/...", + {"content": "Bot started!"}) + ``` + """ + + _instance: Optional['ExternalAPI'] = None + _server_running: bool = False + _server_port: int = 0 + _webhooks: Dict[str, WebhookConfig] = {} + _endpoints: Dict[str, APIEndpoint] = {} + _webhook_history: List[Dict] = [] + _api_keys: Dict[str, Dict] = {} + _cors_origins: List[str] = ['*'] + _ipc_callbacks: Dict[str, Callable] = {} + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + # ===================================================================== + # REST API Server + # ===================================================================== + + def start_server(self, port: int = 8080, + host: str = "127.0.0.1", + cors_origins: List[str] = None) -> bool: + """ + Start the REST API server. + + Args: + port: Port to listen on + host: Host to bind to (default: localhost only) + cors_origins: Allowed CORS origins + + Returns: + True if server started + + Example: + >>> api.start_server(port=8080) + [ExternalAPI] REST server started on http://127.0.0.1:8080 + """ + if self._server_running: + logger.warning("[ExternalAPI] Server already running") + return False + + try: + from aiohttp import web + + self._app = web.Application() + self._server_port = port + self._server_host = host + + if cors_origins: + self._cors_origins = cors_origins + + # Setup routes + self._setup_routes() + + # Start server in background thread + self._server_thread = Thread( + target=self._run_server, + daemon=True + ) + self._server_thread.start() + + self._server_running = True + logger.info(f"[ExternalAPI] REST server started on http://{host}:{port}") + return True + + except ImportError: + logger.error("[ExternalAPI] aiohttp not installed. Run: pip install aiohttp") + return False + except Exception as e: + logger.error(f"[ExternalAPI] Failed to start server: {e}") + return False + + def stop_server(self) -> bool: + """ + Stop the REST API server. + + Returns: + True if stopped + """ + if not self._server_running: + return False + + try: + asyncio.run_coroutine_threadsafe( + self._runner.cleanup(), + self._loop + ) + self._server_running = False + logger.info("[ExternalAPI] REST server stopped") + return True + except Exception as e: + logger.error(f"[ExternalAPI] Error stopping server: {e}") + return False + + def _run_server(self): + """Internal: Run the asyncio server.""" + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + from aiohttp import web + + self._runner = web.AppRunner(self._app) + self._loop.run_until_complete(self._runner.setup()) + + site = web.TCPSite(self._runner, self._server_host, self._server_port) + self._loop.run_until_complete(site.start()) + + # Keep running + self._loop.run_forever() + + def _setup_routes(self): + """Internal: Setup HTTP routes.""" + from aiohttp import web + + # Health check + self._app.router.add_get('/health', self._handle_health) + + # API endpoints + self._app.router.add_get('/api/v1/{path:.*}', self._handle_api_get) + self._app.router.add_post('/api/v1/{path:.*}', self._handle_api_post) + + # Webhooks + self._app.router.add_post('/webhook/{name}', self._handle_webhook) + + # Events stream (SSE) + self._app.router.add_get('/events', self._handle_events) + + async def _handle_health(self, request): + """Handle health check request.""" + return web.json_response({ + 'status': 'ok', + 'timestamp': datetime.now().isoformat(), + 'server_running': self._server_running + }) + + async def _handle_api_get(self, request): + """Handle API GET requests.""" + from aiohttp import web + + path = request.match_info['path'] + + # Check authentication + if not self._check_auth(request): + return web.json_response( + {'error': 'Unauthorized'}, + status=401 + ) + + # Find endpoint + endpoint = self._endpoints.get(path) + if not endpoint or 'GET' not in endpoint.methods: + return web.json_response( + {'error': 'Not found'}, + status=404 + ) + + try: + # Get query params + params = dict(request.query) + + # Call handler + result = endpoint.handler(params) + + return web.json_response({ + 'success': True, + 'data': result + }) + except Exception as e: + logger.error(f"[ExternalAPI] GET {path} error: {e}") + return web.json_response( + {'error': str(e)}, + status=500 + ) + + async def _handle_api_post(self, request): + """Handle API POST requests.""" + from aiohttp import web + + path = request.match_info['path'] + + if not self._check_auth(request): + return web.json_response( + {'error': 'Unauthorized'}, + status=401 + ) + + endpoint = self._endpoints.get(path) + if not endpoint or 'POST' not in endpoint.methods: + return web.json_response( + {'error': 'Not found'}, + status=404 + ) + + try: + # Get JSON body + data = await request.json() + + # Call handler + result = endpoint.handler(data) + + return web.json_response({ + 'success': True, + 'data': result + }) + except Exception as e: + logger.error(f"[ExternalAPI] POST {path} error: {e}") + return web.json_response( + {'error': str(e)}, + status=500 + ) + + async def _handle_webhook(self, request): + """Handle incoming webhooks."""""" + from aiohttp import web + + name = request.match_info['name'] + webhook = self._webhooks.get(name) + + if not webhook: + return web.json_response( + {'error': 'Webhook not found'}, + status=404 + ) + + # Check method + if request.method not in webhook.methods: + return web.json_response( + {'error': 'Method not allowed'}, + status=405 + ) + + try: + # Get payload + payload = await request.json() + + # Verify signature if secret configured + if webhook.secret: + signature = request.headers.get('X-Signature', '') + if not self._verify_signature(payload, signature, webhook.secret): + return web.json_response( + {'error': 'Invalid signature'}, + status=401 + ) + + # Log webhook + self._log_webhook(name, payload) + + # Call handler + result = webhook.handler(payload) + + return web.json_response({ + 'success': True, + 'data': result + }) + except Exception as e: + logger.error(f"[ExternalAPI] Webhook {name} error: {e}") + return web.json_response( + {'error': str(e)}, + status=500 + ) + + async def _handle_events(self, request): + """Handle Server-Sent Events (SSE) connection.""" + from aiohttp import web + + response = web.StreamResponse( + status=200, + reason='OK', + headers={ + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + } + ) + await response.prepare(request) + + # Keep connection alive and send events + try: + while True: + # Send heartbeat + await response.write(b'event: ping\ndata: {}\n\n') + await asyncio.sleep(30) + except: + pass + + return response + + # ===================================================================== + # API Endpoints + # ===================================================================== + + def endpoint(self, path: str, methods: List[str] = None, + auth_required: bool = False): + """ + Decorator to register an API endpoint. + + Args: + path: URL path (e.g., "stats", "items/search") + methods: HTTP methods allowed (default: ['GET']) + auth_required: Whether API key is required + + Example: + >>> @api.endpoint("stats", methods=["GET"]) + ... def get_stats(): + ... return {"kills": 100} + + >>> @api.endpoint("loot", methods=["POST"]) + ... def record_loot(data): + ... save_loot(data) + ... return {"status": "saved"} + """ + def decorator(func: Callable): + self.register_endpoint(path, func, methods, auth_required) + return func + return decorator + + def register_endpoint(self, path: str, handler: Callable, + methods: List[str] = None, + auth_required: bool = False) -> None: + """ + Register an API endpoint programmatically. + + Args: + path: URL path + handler: Function to handle requests + methods: HTTP methods + auth_required: Require authentication + """ + endpoint = APIEndpoint( + path=path, + handler=handler, + methods=methods or ['GET'], + auth_required=auth_required + ) + self._endpoints[path] = endpoint + logger.debug(f"[ExternalAPI] Registered endpoint: {path}") + + def unregister_endpoint(self, path: str) -> bool: + """Unregister an endpoint.""" + if path in self._endpoints: + del self._endpoints[path] + return True + return False + + def get_endpoints(self) -> List[str]: + """Get list of registered endpoint paths.""" + return list(self._endpoints.keys()) + + # ===================================================================== + # Webhooks (Incoming) + # ===================================================================== + + def register_webhook(self, name: str, handler: Callable, + secret: str = None, + methods: List[str] = None, + rate_limit: int = 60) -> None: + """ + Register a webhook endpoint. + + External services can POST to /webhook/{name} + + Args: + name: Webhook name (becomes URL path) + handler: Function(payload) to handle webhook + secret: Optional secret for HMAC verification + methods: Allowed HTTP methods + rate_limit: Max requests per minute + + Example: + >>> def handle_discord(payload): + ... print(f"Discord event: {payload}") + ... + >>> api.register_webhook("discord", handle_discord) + # Now POST to http://localhost:8080/webhook/discord + """ + webhook = WebhookConfig( + path=name, + handler=handler, + secret=secret, + methods=methods or ['POST'], + rate_limit=rate_limit + ) + self._webhooks[name] = webhook + logger.info(f"[ExternalAPI] Registered webhook: /webhook/{name}") + + def unregister_webhook(self, name: str) -> bool: + """Unregister a webhook.""" + if name in self._webhooks: + del self._webhooks[name] + return True + return False + + def get_webhooks(self) -> List[str]: + """Get list of registered webhook names.""" + return list(self._webhooks.keys()) + + def _verify_signature(self, payload: Dict, signature: str, secret: str) -> bool: + """Verify webhook HMAC signature.""" + try: + payload_str = json.dumps(payload, sort_keys=True) + expected = hmac.new( + secret.encode(), + payload_str.encode(), + hashlib.sha256 + ).hexdigest() + return hmac.compare_digest(signature, expected) + except: + return False + + def _log_webhook(self, name: str, payload: Dict) -> None: + """Log webhook call.""" + self._webhook_history.append({ + 'name': name, + 'timestamp': datetime.now().isoformat(), + 'payload_size': len(str(payload)) + }) + + # Keep last 100 entries + if len(self._webhook_history) > 100: + self._webhook_history = self._webhook_history[-100:] + + def get_webhook_history(self, limit: int = 50) -> List[Dict]: + """ + Get recent webhook call history. + + Args: + limit: Number of entries to return + + Returns: + List of webhook call records + """ + return self._webhook_history[-limit:] + + # ===================================================================== + # Webhooks (Outgoing) + # ===================================================================== + + def post_webhook(self, url: str, data: Dict, + headers: Dict = None, + timeout: int = 10) -> Dict: + """ + POST data to external webhook URL. + + Args: + url: Webhook URL + data: JSON payload + headers: Additional headers + timeout: Request timeout + + Returns: + Response dict: {'success': bool, 'status': int, 'data': Any} + + Example: + >>> api.post_webhook( + ... "https://discord.com/api/webhooks/...", + ... {"content": "Loot: 100 PED!"} + ... ) + """ + try: + import requests + + default_headers = {'Content-Type': 'application/json'} + if headers: + default_headers.update(headers) + + response = requests.post( + url, + json=data, + headers=default_headers, + timeout=timeout + ) + + return { + 'success': 200 <= response.status_code < 300, + 'status': response.status_code, + 'data': response.text + } + except Exception as e: + logger.error(f"[ExternalAPI] Webhook POST failed: {e}") + return { + 'success': False, + 'error': str(e) + } + + # ===================================================================== + # Authentication + # ===================================================================== + + def create_api_key(self, name: str, permissions: List[str] = None) -> str: + """ + Create a new API key. + + Args: + name: Key identifier + permissions: List of allowed permissions + + Returns: + Generated API key + """ + key = hashlib.sha256( + f"{name}{time.time()}".encode() + ).hexdigest()[:32] + + self._api_keys[key] = { + 'name': name, + 'permissions': permissions or ['read'], + 'created': datetime.now().isoformat(), + 'last_used': None + } + + return key + + def revoke_api_key(self, key: str) -> bool: + """Revoke an API key.""" + if key in self._api_keys: + del self._api_keys[key] + return True + return False + + def _check_auth(self, request) -> bool: + """Internal: Check request authentication.""" + # Check for API key in header + api_key = request.headers.get('X-API-Key', '') + + if api_key in self._api_keys: + self._api_keys[api_key]['last_used'] = datetime.now().isoformat() + return True + + return False + + # ===================================================================== + # IPC (Inter-Process Communication) + # ===================================================================== + + def register_ipc_handler(self, channel: str, handler: Callable) -> None: + """ + Register an IPC message handler. + + For communication with other local processes. + + Args: + channel: IPC channel name + handler: Function(data) to handle messages + + Example: + >>> def on_browser_message(data): + ... print(f"From browser: {data}") + ... + >>> api.register_ipc_handler("browser", on_browser_message) + """ + self._ipc_callbacks[channel] = handler + logger.debug(f"[ExternalAPI] Registered IPC handler: {channel}") + + def send_ipc(self, channel: str, data: Dict) -> bool: + """ + Send IPC message to registered handler. + + Args: + channel: IPC channel name + data: Message data + + Returns: + True if sent + """ + if channel in self._ipc_callbacks: + try: + self._ipc_callbacks[channel](data) + return True + except Exception as e: + logger.error(f"[ExternalAPI] IPC send failed: {e}") + return False + + # ===================================================================== + # File Watcher + # ===================================================================== + + def watch_file(self, filepath: str, + callback: Callable, + interval: float = 1.0) -> str: + """ + Watch a file for changes. + + Args: + filepath: Path to file + callback: Function() called on change + interval: Check interval in seconds + + Returns: + Watch ID + + Example: + >>> def on_config_change(): + ... print("Config file changed!") + ... + >>> watch_id = api.watch_file("config.json", on_config_change) + """ + import hashlib + + watch_id = hashlib.md5(filepath.encode()).hexdigest()[:8] + + def watcher(): + last_mtime = 0 + while True: + try: + mtime = Path(filepath).stat().st_mtime + if mtime != last_mtime: + last_mtime = mtime + callback() + except: + pass + time.sleep(interval) + + thread = Thread(target=watcher, daemon=True) + thread.start() + + return watch_id + + # ===================================================================== + # Status & Info + # ===================================================================== + + def get_status(self) -> Dict: + """ + Get ExternalAPI status. + + Returns: + Status dictionary + """ + return { + 'server_running': self._server_running, + 'server_port': self._server_port, + 'endpoints': len(self._endpoints), + 'webhooks': len(self._webhooks), + 'api_keys': len(self._api_keys), + 'webhook_history': len(self._webhook_history) + } + + def get_url(self, path: str = "") -> str: + """ + Get full URL for a path. + + Args: + path: URL path + + Returns: + Full URL + """ + if self._server_running: + return f"http://127.0.0.1:{self._server_port}/{path.lstrip('/')}" + return "" + + +# Global instance +_external_api_instance: Optional[ExternalAPI] = None + + +def get_external_api() -> ExternalAPI: + """ + Get the global ExternalAPI instance. + + Returns: + ExternalAPI singleton + + Example: + >>> from core.api.external_api import get_external_api + >>> api = get_external_api() + >>> api.start_server(port=8080) + """ + global _external_api_instance + if _external_api_instance is None: + _external_api_instance = ExternalAPI() + return _external_api_instance + + +# Convenience exports +__all__ = [ + 'ExternalAPI', + 'get_external_api', + 'ExternalAPIError', + 'WebhookError', + 'ServerError', + 'WebhookConfig', + 'APIEndpoint' +] diff --git a/core/api/plugin_api.py b/core/api/plugin_api.py new file mode 100644 index 0000000..c21e511 --- /dev/null +++ b/core/api/plugin_api.py @@ -0,0 +1,773 @@ +""" +EU-Utility - Plugin API +================ + +The PluginAPI provides plugins with access to core services and functionality. +This is the primary interface for plugin developers. + +Quick Start: +----------- +```python +from core.plugin_api import get_api + +class MyPlugin(BasePlugin): + def initialize(self): + self.api = get_api() + + # Access services + log_lines = self.api.read_log_lines(100) + window_info = self.api.get_eu_window() + + # Show notifications + self.api.show_notification("Hello", "Plugin started!") +``` + +Services Available: +------------------ +- Log Reader - Read game chat.log +- Window Manager - Get EU window info, focus control +- OCR Service - Screen text recognition +- Screenshot - Capture screen regions +- Nexus API - Item database queries +- HTTP Client - Web requests with caching +- Audio - Play sounds +- Notifications - Toast notifications +- Clipboard - Copy/paste operations +- Event Bus - Pub/sub events +- Data Store - Key-value storage +- Tasks - Background task execution + +For full documentation, see: docs/API_COOKBOOK.md +""" + +import json +import time +from pathlib import Path +from typing import Optional, Dict, List, Callable, Any, Tuple, Union +from functools import wraps +from datetime import datetime + +from core.logger import get_logger + +logger = get_logger(__name__) + + +class PluginAPIError(Exception): + """Base exception for PluginAPI errors.""" + pass + + +class ServiceNotAvailableError(PluginAPIError): + """Raised when a requested service is not available.""" + pass + + +class PluginAPI: + """ + PluginAPI - Core API for EU-Utility plugins. + + Provides access to all core services in a unified, documented interface. + Plugins should obtain the API via get_api() and store it for reuse. + + Thread Safety: + -------------- + Most methods are thread-safe. UI-related methods (notifications, etc.) + automatically marshal to the main thread via Qt signals. + + Example: + -------- + ```python + class MyPlugin(BasePlugin): + def initialize(self): + self.api = get_api() + + def on_event(self, event): + # Safe to call from any thread + self.api.show_notification("Event", str(event)) + ``` + """ + + # Service registry + _services: Dict[str, Any] = {} + _instance: Optional['PluginAPI'] = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + # ========================================================================= + # SERVICE REGISTRATION (Internal Use) + # ========================================================================= + + def register_log_service(self, read_lines_func: Callable) -> None: + """Register the log reader service.""" + self._services['log_reader'] = read_lines_func + logger.debug("[PluginAPI] Log service registered") + + def register_window_service(self, window_manager) -> None: + """Register the window manager service.""" + self._services['window_manager'] = window_manager + logger.debug("[PluginAPI] Window service registered") + + def register_ocr_service(self, recognize_func: Callable) -> None: + """Register the OCR service.""" + self._services['ocr'] = recognize_func + logger.debug("[PluginAPI] OCR service registered") + + def register_screenshot_service(self, screenshot_service) -> None: + """Register the screenshot service.""" + self._services['screenshot'] = screenshot_service + logger.debug("[PluginAPI] Screenshot service registered") + + def register_nexus_service(self, nexus_api) -> None: + """Register the Nexus API service.""" + self._services['nexus'] = nexus_api + logger.debug("[PluginAPI] Nexus service registered") + + def register_http_service(self, http_client) -> None: + """Register the HTTP client service.""" + self._services['http'] = http_client + logger.debug("[PluginAPI] HTTP service registered") + + def register_audio_service(self, audio_manager) -> None: + """Register the audio service.""" + self._services['audio'] = audio_manager + logger.debug("[PluginAPI] Audio service registered") + + def register_notification_service(self, notification_manager) -> None: + """Register the notification service.""" + self._services['notification'] = notification_manager + logger.debug("[PluginAPI] Notification service registered") + + def register_clipboard_service(self, clipboard_manager) -> None: + """Register the clipboard service.""" + self._services['clipboard'] = clipboard_manager + logger.debug("[PluginAPI] Clipboard service registered") + + def register_event_bus(self, event_bus) -> None: + """Register the event bus service.""" + self._services['event_bus'] = event_bus + logger.debug("[PluginAPI] Event bus registered") + + def register_data_service(self, data_store) -> None: + """Register the data store service.""" + self._services['data_store'] = data_store + logger.debug("[PluginAPI] Data service registered") + + def register_task_service(self, task_manager) -> None: + """Register the task manager service.""" + self._services['tasks'] = task_manager + logger.debug("[PluginAPI] Task service registered") + + # ========================================================================= + # LOG READER API + # ========================================================================= + + def read_log_lines(self, count: int = 100) -> List[str]: + """ + Read recent lines from the game chat.log. + + Args: + count: Number of lines to read (default: 100) + + Returns: + List of log line strings + + Example: + >>> lines = api.read_log_lines(50) + >>> for line in lines: + ... if 'Loot:' in line: + ... print(line) + """ + service = self._services.get('log_reader') + if service: + return service(count) + return [] + + def read_log_since(self, timestamp: datetime) -> List[str]: + """ + Read log lines since a specific timestamp. + + Args: + timestamp: Datetime to read from + + Returns: + List of log lines after the timestamp + """ + lines = self.read_log_lines(1000) + result = [] + for line in lines: + # Parse timestamp from line (format varies) + try: + # Attempt to extract timestamp + if '[' in line and ']' in line: + result.append(line) + except: + continue + return result + + # ========================================================================= + # WINDOW MANAGER API + # ========================================================================= + + def get_eu_window(self) -> Optional[Dict[str, Any]]: + """ + Get information about the EU game window. + + Returns: + Dict with window info or None if not found: + { + 'title': str, + 'hwnd': int, + 'x': int, + 'y': int, + 'width': int, + 'height': int, + 'is_focused': bool, + 'is_visible': bool + } + + Example: + >>> window = api.get_eu_window() + >>> if window: + ... print(f"EU is at {window['x']}, {window['y']}") + """ + wm = self._services.get('window_manager') + if wm and wm.is_available(): + window = wm.find_eu_window() + if window: + return { + 'title': window.title, + 'hwnd': window.hwnd, + 'x': window.x, + 'y': window.y, + 'width': window.width, + 'height': window.height, + 'is_focused': window.is_focused(), + 'is_visible': window.is_visible() + } + return None + + def is_eu_focused(self) -> bool: + """ + Check if the EU window is currently focused. + + Returns: + True if EU is the active window + + Example: + >>> if api.is_eu_focused(): + ... api.play_sound("alert.wav") + """ + window = self.get_eu_window() + return window['is_focused'] if window else False + + def is_eu_visible(self) -> bool: + """ + Check if the EU window is visible. + + Returns: + True if EU window is visible (not minimized) + """ + window = self.get_eu_window() + return window['is_visible'] if window else False + + def bring_eu_to_front(self) -> bool: + """ + Bring the EU window to the foreground. + + Returns: + True if successful + + Warning: + Use sparingly - can be intrusive to user + """ + wm = self._services.get('window_manager') + if wm and wm.is_available(): + window = wm.find_eu_window() + if window: + return window.focus() + return False + + # ========================================================================= + # OCR API + # ========================================================================= + + def recognize_text(self, region: Optional[Tuple[int, int, int, int]] = None, + image_path: Optional[str] = None) -> str: + """ + Perform OCR on screen region or image. + + Args: + region: (x, y, width, height) tuple for screen region + image_path: Path to image file (alternative to region) + + Returns: + Recognized text string + + Raises: + ServiceNotAvailableError: If OCR service not initialized + + Example: + >>> # Read text from screen region + >>> text = api.recognize_text((100, 100, 200, 50)) + >>> print(f"Found: {text}") + """ + service = self._services.get('ocr') + if service: + return service(region=region, image_path=image_path) + raise ServiceNotAvailableError("OCR service not available") + + def ocr_available(self) -> bool: + """Check if OCR service is available.""" + return 'ocr' in self._services + + # ========================================================================= + # SCREENSHOT API + # ========================================================================= + + def capture_screen(self, region: Optional[Tuple[int, int, int, int]] = None, + save_path: Optional[str] = None) -> Optional[Any]: + """ + Capture a screenshot. + + Args: + region: (x, y, width, height) tuple for specific region + save_path: Optional path to save image + + Returns: + PIL Image object or None if failed + + Example: + >>> img = api.capture_screen((0, 0, 1920, 1080), "screenshot.png") + >>> if img: + ... print(f"Captured {img.size}") + """ + service = self._services.get('screenshot') + if service and service.is_available(): + return service.capture(region=region, save_path=save_path) + return None + + def screenshot_available(self) -> bool: + """Check if screenshot service is available.""" + service = self._services.get('screenshot') + return service.is_available() if service else False + + # ========================================================================= + # NEXUS API (Item Database) + # ========================================================================= + + def search_items(self, query: str, limit: int = 10) -> List[Dict]: + """ + Search for items in the Entropia Nexus database. + + Args: + query: Search query string + limit: Maximum results (default: 10) + + Returns: + List of item dictionaries + + Example: + >>> items = api.search_items("omegaton", limit=5) + >>> for item in items: + ... print(f"{item['Name']}: {item['Value']} PED") + """ + service = self._services.get('nexus') + if service: + try: + return service.search_items(query, limit=limit) + except Exception as e: + logger.error(f"[PluginAPI] Nexus search failed: {e}") + return [] + + def get_item_details(self, item_id: int) -> Optional[Dict]: + """ + Get detailed information about an item. + + Args: + item_id: Nexus item ID + + Returns: + Item details dict or None if not found + """ + service = self._services.get('nexus') + if service: + try: + return service.get_item(item_id) + except Exception as e: + logger.error(f"[PluginAPI] Get item failed: {e}") + return None + + # ========================================================================= + # HTTP CLIENT API + # ========================================================================= + + def http_get(self, url: str, cache: bool = True, + cache_duration: int = 3600) -> Dict[str, Any]: + """ + Perform HTTP GET request with optional caching. + + Args: + url: URL to fetch + cache: Whether to use cache (default: True) + cache_duration: Cache TTL in seconds (default: 1 hour) + + Returns: + Response dict: {'success': bool, 'data': Any, 'error': str} + + Example: + >>> result = api.http_get("https://api.example.com/data") + >>> if result['success']: + ... data = result['data'] + """ + service = self._services.get('http') + if service: + try: + return service.get(url, cache=cache, cache_duration=cache_duration) + except Exception as e: + return {'success': False, 'error': str(e)} + return {'success': False, 'error': 'HTTP service not available'} + + def http_post(self, url: str, data: Dict, + cache: bool = False) -> Dict[str, Any]: + """ + Perform HTTP POST request. + + Args: + url: URL to post to + data: POST data dictionary + cache: Whether to cache response + + Returns: + Response dict: {'success': bool, 'data': Any, 'error': str} + """ + service = self._services.get('http') + if service: + try: + return service.post(url, data=data, cache=cache) + except Exception as e: + return {'success': False, 'error': str(e)} + return {'success': False, 'error': 'HTTP service not available'} + + # ========================================================================= + # AUDIO API + # ========================================================================= + + def play_sound(self, sound_path: str, volume: float = 1.0) -> bool: + """ + Play an audio file. + + Args: + sound_path: Path to audio file (.wav, .mp3, etc.) + volume: Volume level 0.0 to 1.0 + + Returns: + True if playback started + + Example: + >>> api.play_sound("assets/sounds/alert.wav", volume=0.7) + """ + service = self._services.get('audio') + if service: + try: + service.play(sound_path, volume=volume) + return True + except Exception as e: + logger.error(f"[PluginAPI] Play sound failed: {e}") + return False + + def beep(self) -> bool: + """Play a simple beep sound.""" + service = self._services.get('audio') + if service: + try: + service.beep() + return True + except: + pass + return False + + # ========================================================================= + # NOTIFICATION API + # ========================================================================= + + def show_notification(self, title: str, message: str, + duration: int = 5000, + sound: bool = False) -> bool: + """ + Show a toast notification. + + Args: + title: Notification title + message: Notification body + duration: Duration in milliseconds (default: 5000) + sound: Play sound with notification + + Returns: + True if notification shown + + Example: + >>> api.show_notification("Loot Alert", + ... "Found something valuable!", + ... duration=3000, sound=True) + """ + service = self._services.get('notification') + if service: + try: + service.show(title, message, duration=duration, sound=sound) + return True + except Exception as e: + logger.error(f"[PluginAPI] Notification failed: {e}") + return False + + # ========================================================================= + # CLIPBOARD API + # ========================================================================= + + def copy_to_clipboard(self, text: str) -> bool: + """ + Copy text to system clipboard. + + Args: + text: Text to copy + + Returns: + True if successful + + Example: + >>> api.copy_to_clipboard("TT: 100 PED") + """ + service = self._services.get('clipboard') + if service: + try: + service.copy(text) + return True + except Exception as e: + logger.error(f"[PluginAPI] Clipboard copy failed: {e}") + return False + + def paste_from_clipboard(self) -> str: + """ + Get text from system clipboard. + + Returns: + Clipboard text or empty string + """ + service = self._services.get('clipboard') + if service: + try: + return service.paste() + except: + pass + return "" + + # ========================================================================= + # EVENT BUS API (Pub/Sub) + # ========================================================================= + + def subscribe(self, event_type: str, callback: Callable) -> str: + """ + Subscribe to an event type. + + Args: + event_type: Event type string (e.g., "loot", "skill_gain") + callback: Function to call when event occurs + + Returns: + Subscription ID (use to unsubscribe) + + Example: + >>> def on_loot(event): + ... print(f"Got loot: {event.data}") + >>> sub_id = api.subscribe("loot", on_loot) + """ + service = self._services.get('event_bus') + if service: + try: + return service.subscribe(event_type, callback) + except Exception as e: + logger.error(f"[PluginAPI] Subscribe failed: {e}") + return "" + + def unsubscribe(self, subscription_id: str) -> bool: + """ + Unsubscribe from events. + + Args: + subscription_id: ID returned by subscribe() + + Returns: + True if unsubscribed + """ + service = self._services.get('event_bus') + if service: + try: + service.unsubscribe(subscription_id) + return True + except Exception as e: + logger.error(f"[PluginAPI] Unsubscribe failed: {e}") + return False + + def publish(self, event_type: str, data: Any) -> bool: + """ + Publish an event. + + Args: + event_type: Event type string + data: Event data (any type) + + Returns: + True if published + + Example: + >>> api.publish("my_plugin.event", {"key": "value"}) + """ + service = self._services.get('event_bus') + if service: + try: + service.publish(event_type, data) + return True + except Exception as e: + logger.error(f"[PluginAPI] Publish failed: {e}") + return False + + # ========================================================================= + # DATA STORE API (Key-Value Storage) + # ========================================================================= + + def get_data(self, key: str, default: Any = None) -> Any: + """ + Get data from plugin data store. + + Args: + key: Data key + default: Default value if key not found + + Returns: + Stored value or default + + Example: + >>> count = api.get_data("kill_count", 0) + >>> api.set_data("kill_count", count + 1) + """ + service = self._services.get('data_store') + if service: + try: + return service.get(key, default) + except: + pass + return default + + def set_data(self, key: str, value: Any) -> bool: + """ + Store data in plugin data store. + + Args: + key: Data key + value: Value to store (must be JSON serializable) + + Returns: + True if stored + """ + service = self._services.get('data_store') + if service: + try: + service.set(key, value) + return True + except Exception as e: + logger.error(f"[PluginAPI] Set data failed: {e}") + return False + + def delete_data(self, key: str) -> bool: + """Delete data from store.""" + service = self._services.get('data_store') + if service: + try: + service.delete(key) + return True + except: + pass + return False + + # ========================================================================= + # TASK API (Background Execution) + # ========================================================================= + + def run_task(self, task_func: Callable, *args, + callback: Optional[Callable] = None, + error_handler: Optional[Callable] = None) -> str: + """ + Run a function in background thread. + + Args: + task_func: Function to execute + *args: Arguments for function + callback: Called with result on success + error_handler: Called with exception on error + + Returns: + Task ID + + Example: + >>> def heavy_work(data): + ... return process(data) + >>> + >>> def on_done(result): + ... print(f"Done: {result}") + >>> + >>> task_id = api.run_task(heavy_work, my_data, callback=on_done) + """ + service = self._services.get('tasks') + if service: + try: + return service.submit(task_func, *args, + callback=callback, + error_handler=error_handler) + except Exception as e: + logger.error(f"[PluginAPI] Run task failed: {e}") + return "" + + def cancel_task(self, task_id: str) -> bool: + """Cancel a running task.""" + service = self._services.get('tasks') + if service: + try: + return service.cancel(task_id) + except: + pass + return False + + +# Global API instance +_api_instance: Optional[PluginAPI] = None + + +def get_api() -> PluginAPI: + """ + Get the global PluginAPI instance. + + This is the entry point for all plugin API access. + + Returns: + PluginAPI singleton instance + + Example: + >>> from core.plugin_api import get_api + >>> api = get_api() + >>> api.show_notification("Hello", "World!") + """ + global _api_instance + if _api_instance is None: + _api_instance = PluginAPI() + return _api_instance + + +# Convenience exports +__all__ = [ + 'PluginAPI', + 'get_api', + 'PluginAPIError', + 'ServiceNotAvailableError' +] diff --git a/core/api/widget_api.py b/core/api/widget_api.py new file mode 100644 index 0000000..be64911 --- /dev/null +++ b/core/api/widget_api.py @@ -0,0 +1,906 @@ +""" +EU-Utility - Widget API +================ + +The WidgetAPI provides a comprehensive interface for creating, managing, +and interacting with overlay widgets. + +Widgets are floating, draggable UI components that appear over the game. +They can display real-time data, controls, or mini-versions of plugins. + +Quick Start: +----------- +```python +from core.api.widget_api import get_widget_api + +class MyPlugin(BasePlugin): + def initialize(self): + self.widget_api = get_widget_api() + + def create_widget(self): + widget = self.widget_api.create_widget( + name="my_widget", + title="My Widget", + size=(300, 200) + ) + widget.show() +``` + +Widget Types: +------------ +- MiniWidget - Small info display +- ControlWidget - Interactive controls +- ChartWidget - Data visualization +- AlertWidget - Notifications/overlays + +For full documentation, see: docs/WIDGET_API.md +""" + +import json +from pathlib import Path +from typing import Optional, Dict, List, Callable, Any, Tuple, Union +from dataclasses import dataclass, asdict +from enum import Enum +from datetime import datetime + +from core.logger import get_logger + +logger = get_logger(__name__) + + +class WidgetType(Enum): + """Types of overlay widgets.""" + MINI = "mini" # Small info display + CONTROL = "control" # Interactive controls + CHART = "chart" # Data visualization + ALERT = "alert" # Notification overlay + CUSTOM = "custom" # Custom widget + + +class WidgetAnchor(Enum): + """Widget anchor positions.""" + TOP_LEFT = "top_left" + TOP_CENTER = "top_center" + TOP_RIGHT = "top_right" + CENTER_LEFT = "center_left" + CENTER = "center" + CENTER_RIGHT = "center_right" + BOTTOM_LEFT = "bottom_left" + BOTTOM_CENTER = "bottom_center" + BOTTOM_RIGHT = "bottom_right" + + +@dataclass +class WidgetConfig: + """Configuration for a widget.""" + name: str + title: str + widget_type: WidgetType = WidgetType.MINI + size: Tuple[int, int] = (300, 200) + position: Tuple[int, int] = (100, 100) + anchor: WidgetAnchor = WidgetAnchor.TOP_LEFT + opacity: float = 0.95 + always_on_top: bool = True + locked: bool = False # If True, cannot be moved + resizable: bool = True + minimizable: bool = True + closable: bool = True + show_in_taskbar: bool = False + snap_to_grid: bool = False + grid_size: int = 10 + + def to_dict(self) -> Dict: + """Convert to dictionary.""" + return { + 'name': self.name, + 'title': self.title, + 'widget_type': self.widget_type.value, + 'size': self.size, + 'position': self.position, + 'anchor': self.anchor.value, + 'opacity': self.opacity, + 'always_on_top': self.always_on_top, + 'locked': self.locked, + 'resizable': self.resizable, + 'minimizable': self.minimizable, + 'closable': self.closable, + 'show_in_taskbar': self.show_in_taskbar, + 'snap_to_grid': self.snap_to_grid, + 'grid_size': self.grid_size + } + + @classmethod + def from_dict(cls, data: Dict) -> 'WidgetConfig': + """Create from dictionary.""" + return cls( + name=data.get('name', 'unnamed'), + title=data.get('title', 'Widget'), + widget_type=WidgetType(data.get('widget_type', 'mini')), + size=tuple(data.get('size', [300, 200])), + position=tuple(data.get('position', [100, 100])), + anchor=WidgetAnchor(data.get('anchor', 'top_left')), + opacity=data.get('opacity', 0.95), + always_on_top=data.get('always_on_top', True), + locked=data.get('locked', False), + resizable=data.get('resizable', True), + minimizable=data.get('minimizable', True), + closable=data.get('closable', True), + show_in_taskbar=data.get('show_in_taskbar', False), + snap_to_grid=data.get('snap_to_grid', False), + grid_size=data.get('grid_size', 10) + ) + + +class Widget: + """ + Widget instance - a floating overlay window. + + This is a wrapper around the actual QWidget that provides + a clean API for plugin developers. + + Example: + -------- + ```python + widget = widget_api.create_widget( + name="loot_tracker", + title="Loot Tracker", + size=(400, 300) + ) + + # Set content + widget.set_content(my_widget_content) + + # Show widget + widget.show() + + # Update position + widget.move(500, 200) + ``` + """ + + def __init__(self, config: WidgetConfig, qt_widget=None): + self.config = config + self._qt_widget = qt_widget + self._content = None + self._callbacks: Dict[str, List[Callable]] = {} + self._created_at = datetime.now() + self._last_moved = None + + @property + def name(self) -> str: + """Widget name (unique identifier).""" + return self.config.name + + @property + def title(self) -> str: + """Widget title (displayed in header).""" + return self.config.title + + @property + def visible(self) -> bool: + """Whether widget is currently visible.""" + if self._qt_widget: + return self._qt_widget.isVisible() + return False + + @property + def position(self) -> Tuple[int, int]: + """Current widget position (x, y).""" + if self._qt_widget: + pos = self._qt_widget.pos() + return (pos.x(), pos.y()) + return self.config.position + + @property + def size(self) -> Tuple[int, int]: + """Current widget size (width, height).""" + if self._qt_widget: + sz = self._qt_widget.size() + return (sz.width(), sz.height()) + return self.config.size + + # ===================================================================== + # Widget Operations + # ===================================================================== + + def show(self) -> None: + """Show the widget.""" + if self._qt_widget: + self._qt_widget.show() + logger.debug(f"[Widget] Showed: {self.name}") + + def hide(self) -> None: + """Hide the widget.""" + if self._qt_widget: + self._qt_widget.hide() + logger.debug(f"[Widget] Hid: {self.name}") + + def close(self) -> bool: + """ + Close and destroy the widget. + + Returns: + True if closed successfully + """ + if self._qt_widget: + self._trigger_callback('closing') + self._qt_widget.close() + self._qt_widget.deleteLater() + self._qt_widget = None + self._trigger_callback('closed') + logger.debug(f"[Widget] Closed: {self.name}") + return True + return False + + def move(self, x: int, y: int) -> None: + """ + Move widget to position. + + Args: + x: X coordinate + y: Y coordinate + """ + if self._qt_widget: + # Apply grid snapping if enabled + if self.config.snap_to_grid: + x = round(x / self.config.grid_size) * self.config.grid_size + y = round(y / self.config.grid_size) * self.config.grid_size + + self._qt_widget.move(x, y) + self.config.position = (x, y) + self._last_moved = datetime.now() + self._trigger_callback('moved', {'x': x, 'y': y}) + + def resize(self, width: int, height: int) -> None: + """ + Resize widget. + + Args: + width: New width + height: New height + """ + if self._qt_widget: + self._qt_widget.resize(width, height) + self.config.size = (width, height) + self._trigger_callback('resized', {'width': width, 'height': height}) + + def set_opacity(self, opacity: float) -> None: + """ + Set widget opacity. + + Args: + opacity: 0.0 to 1.0 + """ + opacity = max(0.0, min(1.0, opacity)) + self.config.opacity = opacity + + if self._qt_widget: + self._qt_widget.setWindowOpacity(opacity) + + def set_title(self, title: str) -> None: + """Update widget title.""" + self.config.title = title + if self._qt_widget: + self._qt_widget.setWindowTitle(title) + + def set_locked(self, locked: bool) -> None: + """ + Lock/unlock widget position. + + When locked, the widget cannot be moved by dragging. + """ + self.config.locked = locked + if self._qt_widget: + # Update internal flag + setattr(self._qt_widget, '_locked', locked) + + def minimize(self) -> None: + """Minimize widget.""" + if self._qt_widget and self.config.minimizable: + self._qt_widget.showMinimized() + + def maximize(self) -> None: + """Maximize widget.""" + if self._qt_widget: + self._qt_widget.showMaximized() + + def restore(self) -> None: + """Restore from minimized/maximized state.""" + if self._qt_widget: + self._qt_widget.showNormal() + + def raise_widget(self) -> None: + """Bring widget to front.""" + if self._qt_widget: + self._qt_widget.raise_() + self._qt_widget.activateWindow() + + def lower_widget(self) -> None: + """Send widget to back.""" + if self._qt_widget: + self._qt_widget.lower() + + # ===================================================================== + # Content Management + # ===================================================================== + + def set_content(self, widget) -> None: + """ + Set the main content widget. + + Args: + widget: QWidget to display inside this widget + """ + self._content = widget + if self._qt_widget: + # Assuming the qt_widget has a content area + if hasattr(self._qt_widget, 'set_content'): + self._qt_widget.set_content(widget) + + def get_content(self) -> Optional[Any]: + """Get the current content widget.""" + return self._content + + def update_content(self, data: Any) -> None: + """ + Update widget content with new data. + + This triggers the 'update' callback. + + Args: + data: New data to display + """ + self._trigger_callback('update', data) + + def flash(self, duration_ms: int = 1000, color: str = "#ff8c42") -> None: + """ + Flash widget to draw attention. + + Args: + duration_ms: Flash duration + color: Flash color (hex) + """ + if self._qt_widget and hasattr(self._qt_widget, 'flash'): + self._qt_widget.flash(duration_ms, color) + + # ===================================================================== + # Event Handling + # ===================================================================== + + def on(self, event: str, callback: Callable) -> None: + """ + Register event callback. + + Events: + - 'moved': Widget was moved (data: {'x': int, 'y': int}) + - 'resized': Widget was resized (data: {'width': int, 'height': int}) + - 'closing': Widget is about to close + - 'closed': Widget was closed + - 'update': Content should update (data: new data) + - 'focus': Widget gained focus + - 'blur': Widget lost focus + + Args: + event: Event name + callback: Function to call + """ + if event not in self._callbacks: + self._callbacks[event] = [] + self._callbacks[event].append(callback) + + def off(self, event: str, callback: Callable = None) -> None: + """ + Unregister event callback. + + Args: + event: Event name + callback: Specific callback to remove (if None, removes all) + """ + if event in self._callbacks: + if callback: + self._callbacks[event] = [ + cb for cb in self._callbacks[event] if cb != callback + ] + else: + del self._callbacks[event] + + def _trigger_callback(self, event: str, data: Any = None) -> None: + """Internal: Trigger event callbacks.""" + if event in self._callbacks: + for callback in self._callbacks[event]: + try: + if data is not None: + callback(data) + else: + callback() + except Exception as e: + logger.error(f"[Widget] Callback error for {event}: {e}") + + # ===================================================================== + # Persistence + # ===================================================================== + + def save_state(self) -> Dict: + """ + Save widget state to dictionary. + + Returns: + State dictionary for persistence + """ + return { + 'config': self.config.to_dict(), + 'position': self.position, + 'size': self.size, + 'visible': self.visible, + 'created_at': self._created_at.isoformat(), + 'last_moved': self._last_moved.isoformat() if self._last_moved else None + } + + def load_state(self, state: Dict) -> None: + """ + Restore widget state from dictionary. + + Args: + state: State dictionary from save_state() + """ + if 'config' in state: + self.config = WidgetConfig.from_dict(state['config']) + + if 'position' in state: + x, y = state['position'] + self.move(x, y) + + if 'size' in state: + width, height = state['size'] + self.resize(width, height) + + if state.get('visible'): + self.show() + + +class WidgetAPI: + """ + WidgetAPI - API for widget management. + + Provides methods to create, manage, and interact with overlay widgets. + + Example: + -------- + ```python + from core.api.widget_api import get_widget_api + + api = get_widget_api() + + # Create a widget + widget = api.create_widget( + name="loot_counter", + title="Loot Counter", + size=(250, 150) + ) + + # Configure it + widget.set_opacity(0.9) + widget.move(100, 100) + + # Show it + widget.show() + + # Later... + api.hide_all_widgets() # Hide all + api.show_widget("loot_counter") # Show specific one + ``` + """ + + _instance: Optional['WidgetAPI'] = None + _widgets: Dict[str, Widget] = {} + _widget_factory: Optional[Callable] = None + _presets: Dict[str, WidgetConfig] = {} + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + # ===================================================================== + # Widget Creation + # ===================================================================== + + def create_widget(self, name: str, title: str = None, + size: Tuple[int, int] = (300, 200), + position: Tuple[int, int] = (100, 100), + widget_type: WidgetType = WidgetType.MINI, + **kwargs) -> Widget: + """ + Create a new overlay widget. + + Args: + name: Unique widget identifier + title: Display title (default: same as name) + size: (width, height) tuple + position: (x, y) tuple + widget_type: Type of widget + **kwargs: Additional config options + + Returns: + Widget instance + + Raises: + ValueError: If widget name already exists + + Example: + >>> widget = api.create_widget( + ... name="my_tracker", + ... title="My Tracker", + ... size=(400, 300), + ... opacity=0.9 + ... ) + """ + if name in self._widgets: + raise ValueError(f"Widget '{name}' already exists") + + config = WidgetConfig( + name=name, + title=title or name.replace('_', ' ').title(), + widget_type=widget_type, + size=size, + position=position, + **{k: v for k, v in kwargs.items() + if k in WidgetConfig.__dataclass_fields__} + ) + + # Create actual Qt widget via factory + qt_widget = None + if self._widget_factory: + qt_widget = self._widget_factory(config) + + widget = Widget(config, qt_widget) + self._widgets[name] = widget + + logger.info(f"[WidgetAPI] Created widget: {name}") + return widget + + def create_from_preset(self, preset_name: str, + name: str = None) -> Optional[Widget]: + """ + Create widget from a preset configuration. + + Args: + preset_name: Name of preset to use + name: Override widget name (optional) + + Returns: + Widget instance or None if preset not found + + Example: + >>> widget = api.create_from_preset("loot_tracker", "my_loot") + """ + if preset_name not in self._presets: + logger.error(f"[WidgetAPI] Preset not found: {preset_name}") + return None + + preset = self._presets[preset_name] + widget_name = name or f"{preset_name}_{len(self._widgets)}" + + config = WidgetConfig( + name=widget_name, + title=preset.title, + widget_type=preset.widget_type, + size=preset.size, + position=preset.position, + **{k: getattr(preset, k) for k in ['opacity', 'always_on_top', + 'locked', 'resizable', 'minimizable', 'closable']} + ) + + qt_widget = None + if self._widget_factory: + qt_widget = self._widget_factory(config) + + widget = Widget(config, qt_widget) + self._widgets[widget_name] = widget + + logger.info(f"[WidgetAPI] Created widget from preset: {preset_name}") + return widget + + def register_preset(self, name: str, config: WidgetConfig) -> None: + """ + Register a widget preset for reuse. + + Args: + name: Preset name + config: WidgetConfig to use as template + """ + self._presets[name] = config + logger.debug(f"[WidgetAPI] Registered preset: {name}") + + # ===================================================================== + # Widget Access + # ===================================================================== + + def get_widget(self, name: str) -> Optional[Widget]: + """ + Get widget by name. + + Args: + name: Widget name + + Returns: + Widget instance or None if not found + """ + return self._widgets.get(name) + + def get_all_widgets(self) -> List[Widget]: + """Get all widgets.""" + return list(self._widgets.values()) + + def get_visible_widgets(self) -> List[Widget]: + """Get all visible widgets.""" + return [w for w in self._widgets.values() if w.visible] + + def widget_exists(self, name: str) -> bool: + """Check if widget exists.""" + return name in self._widgets + + # ===================================================================== + # Widget Management + # ===================================================================== + + def show_widget(self, name: str) -> bool: + """ + Show a specific widget. + + Args: + name: Widget name + + Returns: + True if successful + """ + widget = self._widgets.get(name) + if widget: + widget.show() + return True + return False + + def hide_widget(self, name: str) -> bool: + """ + Hide a specific widget. + + Args: + name: Widget name + + Returns: + True if successful + """ + widget = self._widgets.get(name) + if widget: + widget.hide() + return True + return False + + def close_widget(self, name: str) -> bool: + """ + Close and destroy a widget. + + Args: + name: Widget name + + Returns: + True if closed + """ + widget = self._widgets.get(name) + if widget: + widget.close() + del self._widgets[name] + return True + return False + + def show_all_widgets(self) -> None: + """Show all widgets.""" + for widget in self._widgets.values(): + widget.show() + + def hide_all_widgets(self) -> None: + """Hide all widgets.""" + for widget in self._widgets.values(): + widget.hide() + + def close_all_widgets(self) -> None: + """Close all widgets.""" + for widget in list(self._widgets.values()): + widget.close() + self._widgets.clear() + + def minimize_all(self) -> None: + """Minimize all widgets.""" + for widget in self._widgets.values(): + widget.minimize() + + def restore_all(self) -> None: + """Restore all minimized widgets.""" + for widget in self._widgets.values(): + widget.restore() + + def set_all_opacity(self, opacity: float) -> None: + """ + Set opacity for all widgets. + + Args: + opacity: 0.0 to 1.0 + """ + for widget in self._widgets.values(): + widget.set_opacity(opacity) + + def lock_all(self) -> None: + """Lock all widgets (prevent moving).""" + for widget in self._widgets.values(): + widget.set_locked(True) + + def unlock_all(self) -> None: + """Unlock all widgets.""" + for widget in self._widgets.values(): + widget.set_locked(False) + + # ===================================================================== + # Layout Helpers + # ===================================================================== + + def arrange_widgets(self, layout: str = "grid", + spacing: int = 10) -> None: + """ + Automatically arrange visible widgets. + + Args: + layout: 'grid', 'horizontal', 'vertical', 'cascade' + spacing: Space between widgets + """ + visible = self.get_visible_widgets() + if not visible: + return + + if layout == "grid": + self._arrange_grid(visible, spacing) + elif layout == "horizontal": + self._arrange_horizontal(visible, spacing) + elif layout == "vertical": + self._arrange_vertical(visible, spacing) + elif layout == "cascade": + self._arrange_cascade(visible, spacing) + + def _arrange_grid(self, widgets: List[Widget], spacing: int) -> None: + """Arrange widgets in a grid.""" + import math + cols = math.ceil(math.sqrt(len(widgets))) + x, y = 100, 100 + + for i, widget in enumerate(widgets): + col = i % cols + row = i // cols + + widget_x = x + col * (300 + spacing) + widget_y = y + row * (200 + spacing) + widget.move(widget_x, widget_y) + + def _arrange_horizontal(self, widgets: List[Widget], spacing: int) -> None: + """Arrange widgets horizontally.""" + x, y = 100, 100 + for widget in widgets: + widget.move(x, y) + x += widget.size[0] + spacing + + def _arrange_vertical(self, widgets: List[Widget], spacing: int) -> None: + """Arrange widgets vertically.""" + x, y = 100, 100 + for widget in widgets: + widget.move(x, y) + y += widget.size[1] + spacing + + def _arrange_cascade(self, widgets: List[Widget], spacing: int) -> None: + """Arrange widgets in cascade.""" + x, y = 100, 100 + for widget in widgets: + widget.move(x, y) + x += spacing + y += spacing + + def snap_to_grid(self, grid_size: int = 10) -> None: + """ + Snap all widgets to grid. + + Args: + grid_size: Grid size in pixels + """ + for widget in self._widgets.values(): + x, y = widget.position + x = round(x / grid_size) * grid_size + y = round(y / grid_size) * grid_size + widget.move(x, y) + + # ===================================================================== + # Persistence + # ===================================================================== + + def save_all_states(self, filepath: str = None) -> Dict: + """ + Save all widget states. + + Args: + filepath: Optional file to save to + + Returns: + State dictionary + """ + states = { + name: widget.save_state() + for name, widget in self._widgets.items() + } + + if filepath: + Path(filepath).parent.mkdir(parents=True, exist_ok=True) + with open(filepath, 'w') as f: + json.dump(states, f, indent=2) + + return states + + def load_all_states(self, data: Union[str, Dict]) -> None: + """ + Load widget states. + + Args: + data: File path or state dictionary + """ + if isinstance(data, str): + with open(data, 'r') as f: + states = json.load(f) + else: + states = data + + for name, state in states.items(): + if name in self._widgets: + self._widgets[name].load_state(state) + + # ===================================================================== + # Internal + # ===================================================================== + + def _set_widget_factory(self, factory: Callable) -> None: + """ + Set the factory function for creating Qt widgets. + + Internal use only - called by core system. + + Args: + factory: Function that takes WidgetConfig and returns QWidget + """ + self._widget_factory = factory + + +# Global instance +_widget_api_instance: Optional[WidgetAPI] = None + + +def get_widget_api() -> WidgetAPI: + """ + Get the global WidgetAPI instance. + + Returns: + WidgetAPI singleton + + Example: + >>> from core.api.widget_api import get_widget_api + >>> api = get_widget_api() + >>> widget = api.create_widget("my_widget", "My Widget") + """ + global _widget_api_instance + if _widget_api_instance is None: + _widget_api_instance = WidgetAPI() + return _widget_api_instance + + +# Convenience exports +__all__ = [ + 'WidgetAPI', + 'get_widget_api', + 'Widget', + 'WidgetConfig', + 'WidgetType', + 'WidgetAnchor' +] diff --git a/core/main.py b/core/main.py index 966066c..4df74f9 100644 --- a/core/main.py +++ b/core/main.py @@ -36,7 +36,7 @@ from core.overlay_window import OverlayWindow from core.floating_icon import FloatingIcon from core.settings import get_settings from core.overlay_widgets import OverlayManager -from core.plugin_api import get_api, APIType +from core.api import get_api, get_widget_api, get_external_api from core.log_reader import get_log_reader from core.ocr_service import get_ocr_service from core.screenshot import get_screenshot_service diff --git a/docs/API_REFERENCE.md b/docs/API_REFERENCE.md index d068f4c..75a5c40 100644 --- a/docs/API_REFERENCE.md +++ b/docs/API_REFERENCE.md @@ -1,1301 +1,632 @@ # EU-Utility API Reference -> Complete API reference for EU-Utility core services and plugin development -> -> **Version:** 2.0 -> **Last Updated:** 2025-02-14 +**Version:** 2.2.0 +**Last Updated:** 2026-02-15 --- -## Table of Contents +## Overview -1. [Core Services Overview](#core-services-overview) -2. [PluginAPI](#pluginapi) -3. [BasePlugin](#baseplugin) -4. [Event Bus](#event-bus) -5. [Task Manager](#task-manager) -6. [Nexus API](#nexus-api) -7. [Settings](#settings) -8. [Data Store](#data-store) -9. [Audio Manager](#audio-manager) -10. [HTTP Client](#http-client) -11. [OCR Service](#ocr-service) -12. [Screenshot Service](#screenshot-service) -13. [Log Reader](#log-reader) -14. [Window Manager](#window-manager) -15. [Notification Manager](#notification-manager) -16. [Clipboard Manager](#clipboard-manager) +EU-Utility provides a comprehensive three-tier API architecture: ---- - -## Core Services Overview - -EU-Utility provides a comprehensive set of core services accessible through the PluginAPI: - -``` -┌─────────────────────────────────────────────────────────────┐ -│ PluginAPI (Singleton) │ -├─────────────────────────────────────────────────────────────┤ -│ OCR Service │ Screenshot │ Log Reader │ Nexus API │ -├─────────────────────────────────────────────────────────────┤ -│ Task Manager │ Event Bus │ Audio │ HTTP Client│ -├─────────────────────────────────────────────────────────────┤ -│ Window Manager │ Data Store │ Clipboard │ Settings │ -└─────────────────────────────────────────────────────────────┘ -``` - -### Accessing Services - -```python -from core.plugin_api import get_api - -api = get_api() - -# Use services -result = api.ocr_capture() -``` +| API | Purpose | Audience | +|-----|---------|----------| +| **PluginAPI** | Access core services | Plugin developers | +| **WidgetAPI** | Create overlay widgets | Widget developers | +| **ExternalAPI** | Third-party integrations | External apps, bots | --- ## PluginAPI -### Class: `PluginAPI` +The PluginAPI provides access to all core EU-Utility services. -Central API registry and shared services. - -#### Singleton Access +### Getting Started ```python -from core.plugin_api import get_api +from core.api import get_api -api = get_api() +class MyPlugin(BasePlugin): + def initialize(self): + self.api = get_api() + + def on_event(self, event): + self.api.show_notification("Event", str(event)) ``` -#### API Registration +### Services Available +#### Log Reader ```python -from core.plugin_api import APIEndpoint, APIType +# Read recent log lines +lines = api.read_log_lines(100) -# Register an API endpoint -endpoint = APIEndpoint( - name="scan_window", - api_type=APIType.OCR, - description="Scan game window and return text", - handler=self.scan_window, - plugin_id=self._plugin_id, - version="1.0.0" -) - -success = api.register_api(endpoint) - -# Unregister -api.unregister_api(plugin_id, name) +# Read since timestamp +recent = api.read_log_since(datetime.now() - timedelta(minutes=5)) ``` -#### Calling APIs - +#### Window Manager ```python -# Call another plugin's API -result = api.call_api("plugin_id", "api_name", arg1, arg2) +# Get EU window info +window = api.get_eu_window() +if window: + print(f"EU at {window['x']}, {window['y']}") + print(f"Size: {window['width']}x{window['height']}") -# Find available APIs -ocr_apis = api.find_apis(APIType.OCR) +# Check focus +if api.is_eu_focused(): + api.play_sound("alert.wav") + +# Bring EU to front +api.bring_eu_to_front() ``` #### OCR Service - ```python -# Capture and OCR -result = api.ocr_capture(region=(x, y, width, height)) -# Returns: {'text': str, 'confidence': float, 'raw_results': list} +# Check availability +if api.ocr_available(): + # Read text from screen region + text = api.recognize_text((100, 100, 200, 50)) + print(f"Found: {text}") ``` -#### Screenshot Service - +#### Screenshot ```python # Capture screen -image = api.capture_screen(full_screen=True) - -# Capture region -image = api.capture_region(x, y, width, height) - -# Get last screenshot -image = api.get_last_screenshot() - -# Save screenshot -path = api.save_screenshot(image, filename="screenshot.png") -``` - -#### Log Service - -```python -# Read log lines -lines = api.read_log(lines=50, filter_text="loot") -# Returns: List[str] -``` - -#### Shared Data - -```python -# Set data -api.set_data("key", value) - -# Get data -value = api.get_data("key", default=None) -``` - -#### Audio Service - -```python -# Play sounds -api.play_sound("hof") # Predefined -api.play_sound("/path/to/custom.wav") # Custom file -api.play_sound("alert", blocking=True) # Wait for completion - -# Volume control -api.set_volume(0.8) # 0.0 to 1.0 -volume = api.get_volume() # Get current volume - -# Mute -api.mute_audio() -api.unmute_audio() -is_muted = api.is_audio_muted() +img = api.capture_screen((0, 0, 1920, 1080), "screenshot.png") # Check availability -if api.is_audio_available(): - api.play_sound("skill_gain") +if api.screenshot_available(): + img = api.capture_screen() ``` -#### Background Tasks +#### Nexus API (Item Database) +```python +# Search items +items = api.search_items("omegaton", limit=5) +for item in items: + print(f"{item['Name']}: {item['Value']} PED") +# Get item details +details = api.get_item_details(12345) +``` + +#### HTTP Client +```python +# GET request with caching +result = api.http_get("https://api.example.com/data", cache=True) +if result['success']: + data = result['data'] + +# POST request +result = api.http_post("https://api.example.com/save", {"key": "value"}) +``` + +#### Audio +```python +# Play sound +api.play_sound("assets/sounds/alert.wav", volume=0.7) + +# Simple beep +api.beep() +``` + +#### Notifications +```python +# Show toast +api.show_notification( + title="Loot Alert", + message="Found something valuable!", + duration=3000, + sound=True +) +``` + +#### Clipboard +```python +# Copy +api.copy_to_clipboard("TT: 100 PED") + +# Paste +text = api.paste_from_clipboard() +``` + +#### Event Bus (Pub/Sub) +```python +# Subscribe to events +def on_loot(event): + print(f"Loot: {event.data}") + +sub_id = api.subscribe("loot", on_loot) + +# Publish event +api.publish("my_plugin.event", {"key": "value"}) + +# Unsubscribe +api.unsubscribe(sub_id) +``` + +#### Data Store +```python +# Store data +api.set_data("kill_count", 100) + +# Retrieve data +count = api.get_data("kill_count", default=0) + +# Delete +api.delete_data("kill_count") +``` + +#### Task Manager ```python # Run in background -task_id = api.run_in_background( - func=my_function, - arg1, arg2, - priority='normal', # 'high', 'normal', 'low' - on_complete=on_done, # Callback with result - on_error=on_error # Callback with exception -) +def heavy_work(data): + return process(data) -# Schedule task -task_id = api.schedule_task( - delay_ms=5000, # 5 seconds - func=my_function, - priority='normal', - on_complete=on_done -) +def on_done(result): + print(f"Done: {result}") -# Periodic task -task_id = api.schedule_task( - delay_ms=0, - func=refresh_data, - periodic=True, - interval_ms=30000, # Every 30 seconds - on_complete=update_ui +task_id = api.run_task( + heavy_work, + my_data, + callback=on_done ) # Cancel task api.cancel_task(task_id) - -# Get status -status = api.get_task_status(task_id) # 'pending', 'running', 'completed', 'failed', 'cancelled' - -# Wait for completion -completed = api.wait_for_task(task_id, timeout=10.0) - -# Connect signals -api.connect_task_signal('completed', on_task_complete) -api.connect_task_signal('failed', on_task_error) -``` - -#### Nexus API - -```python -# Search -results = api.nexus_search("ArMatrix", entity_type="items", limit=20) -results = api.nexus_search("Atrox", entity_type="mobs") - -# Get details -details = api.nexus_get_item_details("armatrix_lp-35") - -# Get market data -market = api.nexus_get_market_data("armatrix_lp-35") - -# Check availability -if api.nexus_is_available(): - results = api.nexus_search("query") -``` - -#### Event Bus (Typed) - -```python -from core.event_bus import SkillGainEvent, LootEvent - -# Publish event -api.publish_typed(SkillGainEvent( - skill_name="Rifle", - skill_value=25.5, - gain_amount=0.01 -)) - -# Publish synchronously -count = api.publish_typed_sync(event) - -# Subscribe -sub_id = api.subscribe_typed( - SkillGainEvent, - callback_function, - min_damage=100, # Filter options - replay_last=10 # Replay recent events -) - -# Unsubscribe -api.unsubscribe_typed(sub_id) - -# Get recent events -recent = api.get_recent_events(SkillGainEvent, count=50) - -# Get stats -stats = api.get_event_stats() -``` - -#### Utility Methods - -```python -# Currency formatting -api.format_ped(123.45) # "123.45 PED" -api.format_pec(50) # "50 PEC" - -# Calculations -api.calculate_dpp(damage=45.0, ammo=100, decay=2.5) -api.calculate_markup(price=150.0, tt=100.0) # 150.0% ``` --- -## BasePlugin +## WidgetAPI -### Class: `BasePlugin` +The WidgetAPI manages overlay widgets - floating UI components. -Abstract base class for all plugins. - -#### Attributes - -| Attribute | Type | Required | Description | -|-----------|------|----------|-------------| -| `name` | str | Yes | Display name | -| `version` | str | Yes | Version string | -| `author` | str | Yes | Creator name | -| `description` | str | Yes | Short description | -| `hotkey` | str | No | Global hotkey | -| `enabled` | bool | No | Start enabled | -| `icon` | str | No | Icon path/emoji | - -#### Constructor +### Getting Started ```python -def __init__(self, overlay_window: OverlayWindow, config: Dict[str, Any]) +from core.api import get_widget_api + +widget_api = get_widget_api() + +# Create widget +widget = widget_api.create_widget( + name="loot_tracker", + title="Loot Tracker", + size=(400, 300), + position=(100, 100) +) + +widget.show() ``` -#### Abstract Methods +### Widget Configuration ```python -def initialize(self) -> None: - """Called when plugin is loaded. Setup API connections.""" - pass +from core.api import WidgetConfig, WidgetType -def get_ui(self) -> Any: - """Return the plugin's UI widget (QWidget).""" - return None +config = WidgetConfig( + name="my_widget", + title="My Widget", + widget_type=WidgetType.MINI, + size=(300, 200), + position=(100, 100), + opacity=0.95, + always_on_top=True, + locked=False, + resizable=True +) ``` -#### Lifecycle Methods +### Widget Operations ```python -def on_show(self) -> None: - """Called when overlay becomes visible.""" - pass +# Show/hide +widget.show() +widget.hide() -def on_hide(self) -> None: - """Called when overlay is hidden.""" - pass +# Position +widget.move(500, 200) +x, y = widget.position -def on_hotkey(self) -> None: - """Called when plugin's hotkey is pressed.""" - pass +# Size +widget.resize(400, 300) +width, height = widget.size -def shutdown(self) -> None: - """Called when app is closing. Cleanup resources.""" - # Unregister APIs - if self.api and self._api_registered: - self.api.unregister_api(self._plugin_id) +# Opacity +widget.set_opacity(0.8) + +# Lock/unlock (prevent dragging) +widget.set_locked(True) + +# Minimize/restore +widget.minimize() +widget.restore() + +# Close +widget.close() +``` + +### Widget Events + +```python +# Handle events +widget.on('moved', lambda data: print(f"Moved to {data['x']}, {data['y']}")) +widget.on('resized', lambda data: print(f"Sized to {data['width']}x{data['height']}")) +widget.on('closing', lambda: print("Widget closing")) +widget.on('closed', lambda: print("Widget closed")) +widget.on('update', lambda data: print(f"Update: {data}")) +``` + +### Widget Management + +```python +# Get widget +widget = widget_api.get_widget("loot_tracker") + +# Show/hide specific widget +widget_api.show_widget("loot_tracker") +widget_api.hide_widget("loot_tracker") + +# Close widget +widget_api.close_widget("loot_tracker") + +# All widgets +widget_api.show_all_widgets() +widget_api.hide_all_widgets() +widget_api.close_all_widgets() + +# Set all opacity +widget_api.set_all_opacity(0.8) + +# Lock/unlock all +widget_api.lock_all() +widget_api.unlock_all() +``` + +### Layout Helpers + +```python +# Arrange widgets +widget_api.arrange_widgets(layout="grid", spacing=10) +widget_api.arrange_widgets(layout="horizontal") +widget_api.arrange_widgets(layout="vertical") +widget_api.arrange_widgets(layout="cascade") + +# Snap to grid +widget_api.snap_to_grid(grid_size=10) +``` + +### Widget Presets + +```python +from core.api import WidgetConfig, WidgetType + +# Register preset +preset = WidgetConfig( + name="preset", + title="Loot Tracker", + widget_type=WidgetType.MINI, + size=(250, 150), + opacity=0.9 +) + +widget_api.register_preset("loot_tracker", preset) + +# Use preset +widget = widget_api.create_from_preset("loot_tracker", name="my_tracker") +``` + +### Persistence + +```python +# Save all widget states +states = widget_api.save_all_states("widgets.json") + +# Load states +widget_api.load_all_states("widgets.json") +``` + +--- + +## ExternalAPI + +The ExternalAPI provides REST endpoints, webhooks, and third-party integrations. + +### Getting Started + +```python +from core.api import get_external_api + +ext = get_external_api() + +# Start server +ext.start_server(port=8080) + +# Check status +print(ext.get_status()) +``` + +### REST API Server + +```python +# Start server with CORS +ext.start_server( + port=8080, + host="127.0.0.1", + cors_origins=["http://localhost:3000"] +) + +# Stop server +ext.stop_server() + +# Get URL +url = ext.get_url("api/v1/stats") +# Returns: http://127.0.0.1:8080/api/v1/stats +``` + +### API Endpoints + +Using decorator: +```python +@ext.endpoint("stats", methods=["GET"]) +def get_stats(): + return {"kills": 100, "loot": "50 PED"} + +@ext.endpoint("loot", methods=["POST"]) +def record_loot(data): + save_loot(data) + return {"status": "saved"} +``` + +Programmatic: +```python +def get_stats(params): + return {"kills": 100} + +ext.register_endpoint("stats", get_stats, methods=["GET"]) + +# Unregister +ext.unregister_endpoint("stats") +``` + +### Incoming Webhooks + +```python +# Register webhook handler +def handle_discord(payload): + print(f"Discord: {payload}") + return {"status": "ok"} + +ext.register_webhook( + name="discord", + handler=handle_discord, + secret="my_secret" # Optional HMAC verification +) + +# Now POST to: http://localhost:8080/webhook/discord +``` + +With HMAC verification: +```python +import hmac +import hashlib + +# Client side (Discord, etc.) +payload = {"event": "message"} +payload_str = json.dumps(payload, sort_keys=True) +signature = hmac.new( + secret.encode(), + payload_str.encode(), + hashlib.sha256 +).hexdigest() + +# Send with X-Signature header +headers = {'X-Signature': signature} +requests.post(url, json=payload, headers=headers) +``` + +### Outgoing Webhooks + +```python +# POST to external webhook +result = ext.post_webhook( + "https://discord.com/api/webhooks/...", + {"content": "Hello from EU-Utility!"} +) + +if result['success']: + print("Sent!") +else: + print(f"Error: {result['error']}") +``` + +### Authentication + +```python +# Create API key +api_key = ext.create_api_key( + name="My Bot", + permissions=["read", "write"] +) + +# Use in requests +headers = {'X-API-Key': api_key} +requests.get(url, headers=headers) + +# Revoke +ext.revoke_api_key(api_key) +``` + +### IPC (Inter-Process Communication) + +```python +# Register handler +def on_browser_message(data): + print(f"From browser: {data}") + +ext.register_ipc_handler("browser", on_browser_message) + +# Send message +ext.send_ipc("browser", {"action": "refresh"}) +``` + +### File Watcher + +```python +# Watch file for changes +def on_config_change(): + print("Config changed!") + reload_config() + +watch_id = ext.watch_file( + "config.json", + on_config_change, + interval=1.0 +) +``` + +### Webhook History + +```python +# Get recent webhook calls +history = ext.get_webhook_history(limit=10) +for entry in history: + print(f"{entry['name']}: {entry['timestamp']}") +``` + +### Server-Sent Events (SSE) + +Clients can connect to `/events` for real-time updates: + +```javascript +const evtSource = new EventSource("http://localhost:8080/events"); +evtSource.addEventListener("message", (e) => { + console.log("Event:", JSON.parse(e.data)); +}); +``` + +--- + +## Error Handling + +All APIs provide specific exceptions: + +```python +from core.api import ( + PluginAPIError, + ServiceNotAvailableError, + ExternalAPIError, + WebhookError +) + +try: + text = api.recognize_text((0, 0, 100, 100)) +except ServiceNotAvailableError: + print("OCR not available") +except PluginAPIError as e: + print(f"API error: {e}") +``` + +--- + +## API Versions + +| Version | Features | +|---------|----------| +| 2.0 | Initial PluginAPI | +| 2.1 | Added Activity Bar | +| 2.2 | Three-tier API (Plugin, Widget, External) | + +--- + +## Examples + +### Discord Integration + +```python +from core.api import get_api, get_external_api + +class DiscordPlugin(BasePlugin): + def initialize(self): + self.api = get_api() + self.ext = get_external_api() + + # Start server + self.ext.start_server(port=8080) + + # Register webhook + self.ext.register_webhook( + "loot", + self.handle_loot_webhook + ) + + # Subscribe to events + self.api.subscribe("loot", self.on_loot) - # Unsubscribe from events - self.unsubscribe_all_typed() -``` - -#### Configuration Methods - -```python -# Get config with default -value = self.get_config('key', default_value) - -# Set config -self.set_config('key', value) -``` - -#### API Methods - -```python -# Register API endpoint -self.register_api( - name="scan_window", - handler=self.scan_window, - api_type=APIType.OCR, - description="Scan game window" -) - -# Call another plugin's API -result = self.call_api("plugin_id", "api_name", *args, **kwargs) - -# Find APIs -apis = self.find_apis(APIType.OCR) -``` - -#### Service Methods - -```python -# OCR -result = self.ocr_capture(region=(x, y, w, h)) - -# Screenshots -image = self.capture_screen(full_screen=True) -image = self.capture_region(x, y, w, h) -last = self.get_last_screenshot() - -# Log -lines = self.read_log(lines=50, filter_text="keyword") - -# Shared data -self.set_shared_data('key', value) -data = self.get_shared_data('key', default) -``` - -#### Event Methods - -```python -# Legacy events -self.publish_event('event_type', {'key': 'value'}) -self.subscribe('event_type', callback) - -# Typed events -self.publish_typed(SkillGainEvent(...)) - -sub_id = self.subscribe_typed( - SkillGainEvent, - callback, - min_damage=100, - replay_last=10 -) - -self.unsubscribe_typed(sub_id) -self.unsubscribe_all_typed() - -# Get recent events -recent = self.get_recent_events(SkillGainEvent, 50) -stats = self.get_event_stats() -``` - -#### Utility Methods - -```python -# Currency -self.format_ped(123.45) # "123.45 PED" -self.format_pec(50) # "50 PEC" - -# Calculations -self.calculate_dpp(damage, ammo, decay) -self.calculate_markup(price, tt) -``` - -#### Audio Methods - -```python -self.play_sound('hof') -self.set_volume(0.8) -self.get_volume() -self.mute() -self.unmute() -self.toggle_mute() -self.is_muted() -self.is_audio_available() -``` - -#### Background Task Methods - -```python -# Run in background -task_id = self.run_in_background( - func, *args, - priority='normal', - on_complete=callback, - on_error=error_callback, - **kwargs -) - -# Schedule -task_id = self.schedule_task( - delay_ms, func, *args, - priority='normal', - on_complete=callback, - periodic=False, - interval_ms=None, - **kwargs -) - -# Control -self.cancel_task(task_id) -self.connect_task_signals( - on_completed=callback, - on_failed=error_callback, - on_started=callback, - on_cancelled=callback -) -``` - -#### Nexus API Methods - -```python -results = self.nexus_search(query, entity_type="items", limit=20) -details = self.nexus_get_item_details(item_id) -market = self.nexus_get_market_data(item_id) -available = self.nexus_is_available() -``` - ---- - -## Event Bus - -### Class: `EventBus` - -Typed event system with filtering and persistence. - -#### Singleton Access - -```python -from core.event_bus import get_event_bus - -event_bus = get_event_bus() -``` - -#### Event Types - -```python -from core.event_bus import ( - BaseEvent, - SkillGainEvent, - LootEvent, - DamageEvent, - GlobalEvent, - ChatEvent, - EconomyEvent, - SystemEvent, - EventCategory -) -``` - -#### Publishing Events - -```python -# Async (non-blocking) -event_bus.publish(event) - -# Sync (blocking, returns count) -count = event_bus.publish_sync(event) -``` - -#### Subscribing - -```python -# Basic subscription -sub_id = event_bus.subscribe( - callback=my_callback, - event_filter=filter_obj, - replay_history=True, - replay_count=100 -) - -# Typed subscription -sub_id = event_bus.subscribe_typed( - SkillGainEvent, - callback, - skill_names=["Rifle", "Pistol"], - replay_last=10 -) - -# Unsubscribe -event_bus.unsubscribe(sub_id) -``` - -#### Event Filters - -```python -from core.event_bus import EventFilter - -# Create filter -filter_obj = EventFilter( - event_types=[SkillGainEvent, LootEvent], - categories=[EventCategory.SKILL, EventCategory.LOOT], - min_damage=100, - max_damage=500, - mob_types=["Dragon", "Drake"], - skill_names=["Rifle"], - sources=["my_plugin"], - custom_predicate=lambda e: e.value > 100 -) - -# Check match -if filter_obj.matches(event): - print("Event matches filter!") -``` - -#### Getting Events - -```python -# Recent events -recent = event_bus.get_recent_events( - event_type=SkillGainEvent, - count=100, - category=EventCategory.SKILL -) - -# By time range -events = event_bus.get_events_by_time_range( - start=datetime_obj, - end=datetime_obj -) -``` - -#### Statistics - -```python -stats = event_bus.get_stats() -# Returns: { -# 'total_published': int, -# 'total_delivered': int, -# 'active_subscriptions': int, -# 'events_per_minute': float, -# 'avg_delivery_ms': float, -# 'errors': int, -# 'uptime_seconds': float, -# 'top_event_types': dict -# } -``` - -#### Event Classes - -```python -@dataclass(frozen=True) -class SkillGainEvent(BaseEvent): - skill_name: str = "" - skill_value: float = 0.0 - gain_amount: float = 0.0 - -@dataclass(frozen=True) -class LootEvent(BaseEvent): - mob_name: str = "" - items: List[Dict[str, Any]] = field(default_factory=list) - total_tt_value: float = 0.0 - position: Optional[tuple] = None - -@dataclass(frozen=True) -class DamageEvent(BaseEvent): - damage_amount: float = 0.0 - damage_type: str = "" - is_critical: bool = False - target_name: str = "" - attacker_name: str = "" - is_outgoing: bool = True - -@dataclass(frozen=True) -class GlobalEvent(BaseEvent): - player_name: str = "" - achievement_type: str = "" # "hof", "ath", "discovery" - value: float = 0.0 - item_name: Optional[str] = None - -@dataclass(frozen=True) -class ChatEvent(BaseEvent): - channel: str = "" # "main", "team", "society" - sender: str = "" - message: str = "" - -@dataclass(frozen=True) -class EconomyEvent(BaseEvent): - transaction_type: str = "" # "sale", "purchase" - amount: float = 0.0 - currency: str = "PED" - description: str = "" - -@dataclass(frozen=True) -class SystemEvent(BaseEvent): - message: str = "" - severity: str = "info" # "debug", "info", "warning", "error", "critical" -``` - ---- - -## Task Manager - -### Class: `TaskManager` - -Thread pool-based background task execution. - -#### Singleton Access - -```python -from core.tasks import get_task_manager - -task_manager = get_task_manager(max_workers=4) -``` - -#### Task Priorities - -```python -from core.tasks import TaskPriority - -TaskPriority.HIGH # 3 -TaskPriority.NORMAL # 2 -TaskPriority.LOW # 1 -``` - -#### Running Tasks - -```python -# Run in thread -task_id = task_manager.run_in_thread( - func=my_function, - arg1, arg2, - priority=TaskPriority.NORMAL, - on_complete=callback, - on_error=error_callback -) - -# Run later -task_id = task_manager.run_later( - delay_ms=5000, - func=my_function, - *args, - priority=TaskPriority.NORMAL, - on_complete=callback -) - -# Run periodic -task_id = task_manager.run_periodic( - interval_ms=30000, - func=my_function, - *args, - priority=TaskPriority.NORMAL, - on_complete=callback, - run_immediately=True -) -``` - -#### Task Control - -```python -# Cancel -task_manager.cancel_task(task_id) - -# Get status -status = task_manager.get_task_status(task_id) -# Returns: TaskStatus.PENDING, RUNNING, COMPLETED, FAILED, CANCELLED - -# Wait for completion -task_manager.wait_for_task(task_id, timeout=10.0) - -# Get all tasks -tasks = task_manager.get_all_tasks() - -# Get task by ID -task = task_manager.get_task(task_id) -``` - -#### Signals - -```python -# Connect to signals -task_manager.connect_signal('completed', on_completed) -task_manager.connect_signal('failed', on_failed) -task_manager.connect_signal('started', on_started) -task_manager.connect_signal('cancelled', on_cancelled) -task_manager.connect_signal('periodic', on_periodic) -``` - -#### Shutdown - -```python -# Graceful shutdown -task_manager.shutdown(wait=True, timeout=30.0) -``` - ---- - -## Nexus API - -### Class: `NexusAPI` - -Client for Entropia Nexus API. - -#### Singleton Access - -```python -from core.nexus_api import get_nexus_api - -nexus = get_nexus_api() -``` - -#### Search Methods - -```python -# Search items -results = nexus.search_items("ArMatrix", limit=20) - -# Search mobs -results = nexus.search_mobs("Atrox", limit=20) - -# Search all types -results = nexus.search_all("Calypso", limit=30) - -# Search by entity type -results = nexus.search_by_type("ArMatrix", entity_type="weapons", limit=20) -``` - -#### Detail Methods - -```python -# Get item details -details = nexus.get_item_details("armatrix_lp-35") - -# Get entity details -details = nexus.get_entity_details("atrox", entity_type="mobs") - -# Get market data -market = nexus.get_market_data("armatrix_lp-35") -``` - -#### Batch Methods - -```python -# Get multiple items -results = nexus.get_items_batch(["id1", "id2", "id3"]) - -# Get multiple market data -results = nexus.get_market_batch(["id1", "id2"]) -``` - -#### Utility Methods - -```python -# Clear cache -nexus.clear_cache() - -# Check availability -if nexus.is_available(): - results = nexus.search_items("query") -``` - -#### Data Classes - -```python -@dataclass -class SearchResult: - id: str - name: str - type: str - category: Optional[str] - icon_url: Optional[str] - data: Dict[str, Any] - -@dataclass -class ItemDetails: - id: str - name: str - description: Optional[str] - category: Optional[str] - weight: Optional[float] - tt_value: Optional[float] - decay: Optional[float] - ammo_consumption: Optional[int] - damage: Optional[float] - range: Optional[float] - accuracy: Optional[float] - durability: Optional[int] - requirements: Dict[str, Any] - materials: List[Dict[str, Any]] - raw_data: Dict[str, Any] - -@dataclass -class MarketData: - item_id: str - item_name: str - current_markup: Optional[float] - avg_markup_7d: Optional[float] - avg_markup_30d: Optional[float] - volume_24h: Optional[int] - volume_7d: Optional[int] - buy_orders: List[Dict[str, Any]] - sell_orders: List[Dict[str, Any]] - last_updated: Optional[datetime] - raw_data: Dict[str, Any] -``` - ---- - -## Settings - -### Class: `Settings` - -User preferences and configuration management. - -#### Singleton Access - -```python -from core.settings import get_settings - -settings = get_settings() -``` - -#### Default Settings - -```python -DEFAULTS = { - 'overlay_enabled': True, - 'overlay_opacity': 0.9, - 'overlay_theme': 'dark', - 'hotkey_toggle': 'ctrl+shift+u', - 'hotkey_search': 'ctrl+shift+f', - 'hotkey_calculator': 'ctrl+shift+c', - 'enabled_plugins': [...], - 'icon_size': 24, - 'accent_color': '#4a9eff', - 'check_updates': True, - 'data_retention_days': 30, -} -``` - -#### Methods - -```python -# Get setting -value = settings.get('key', default_value) - -# Set setting -settings.set('key', value) - -# Reset setting -settings.reset('key') # Single key -settings.reset() # All to defaults - -# Plugin management -if settings.is_plugin_enabled('plugin_id'): - settings.enable_plugin('plugin_id') - settings.disable_plugin('plugin_id') - -# Get all settings -all_settings = settings.all_settings() - -# Save to disk -settings.save() -``` - -#### Signals - -```python -# Connect to changes -settings.setting_changed.connect(on_setting_changed) - -def on_setting_changed(key, value): - print(f"Setting {key} changed to {value}") -``` - ---- - -## Data Store - -### Class: `DataStore` - -Persistent key-value storage for plugins. - -#### Singleton Access - -```python -from core.data_store import get_data_store - -data_store = get_data_store() -``` - -#### Methods - -```python -# Store data -data_store.set('my_plugin.key', value) -data_store.set('my_plugin.nested', {'a': 1, 'b': 2}) - -# Retrieve data -value = data_store.get('my_plugin.key', default=None) - -# Check existence -if data_store.has('my_plugin.key'): - print("Key exists!") - -# Delete -data_store.delete('my_plugin.key') - -# Get all keys for plugin -keys = data_store.get_plugin_keys('my_plugin') - -# Get all data for plugin -data = data_store.get_plugin_data('my_plugin') - -# Clear plugin data -data_store.clear_plugin_data('my_plugin') - -# Save to disk -data_store.save() -``` - ---- - -## Audio Manager - -### Class: `AudioManager` - -Sound playback and volume control. - -#### Singleton Access - -```python -from core.audio import get_audio_manager - -audio = get_audio_manager() -``` - -#### Methods - -```python -# Play sound -audio.play_sound('hof') # Predefined -audio.play_sound('/path/to/custom.wav') # Custom -audio.play_sound('alert', blocking=True) # Wait for completion - -# Predefined sounds: 'global', 'hof', 'skill_gain', 'alert', 'error' - -# Volume control -audio.set_volume(0.8) # 0.0 to 1.0 -volume = audio.get_volume() # Get current - -# Mute -audio.mute() -audio.unmute() -is_muted = audio.toggle_mute() -is_muted = audio.is_muted() - -# Check availability -if audio.is_available(): - audio.play_sound('skill_gain') - -# Get backend info -backend = audio.get_backend() # 'pygame', 'pyaudio', etc. - -# Shutdown -audio.shutdown() -``` - ---- - -## HTTP Client - -### Class: `HTTPClient` - -Cached HTTP requests with rate limiting. - -#### Singleton Access - -```python -from core.http_client import get_http_client - -http = get_http_client( - cache_dir="cache/http", - default_cache_ttl=3600, - rate_limit_delay=0.1, - max_retries=3, - backoff_factor=0.5 -) -``` - -#### Methods - -```python -# GET request -response = http.get( - url, - cache_ttl=300, # Cache for 5 minutes - headers={'Accept': 'application/json'}, - timeout=30 -) - -# Response format -{ - 'status_code': 200, - 'headers': {...}, - 'text': 'response body', - 'json': {...}, # Parsed JSON if applicable - 'cached': False, - 'cache_time': None -} - -# Clear cache -http.clear_cache() - -# Get cache stats -stats = http.get_cache_stats() -``` - ---- - -## OCR Service - -### Class: `OCRService` - -Text recognition from screen captures. - -#### Singleton Access - -```python -from core.ocr_service import get_ocr_service - -ocr = get_ocr_service() -``` - -#### Methods - -```python -# Initialize (lazy, called automatically) -ocr.initialize() - -# Recognize text -result = ocr.recognize(region=(x, y, width, height)) -# Returns: { -# 'text': 'extracted text', -# 'confidence': 0.95, -# 'raw_results': [...], -# 'error': None -# } - -# Check availability -if ocr.is_available(): - result = ocr.recognize() - -# Get available backends -backends = ocr.get_available_backends() # ['easyocr', 'pytesseract'] - -# Shutdown -ocr.shutdown() -``` - ---- - -## Screenshot Service - -### Class: `ScreenshotService` - -Screen capture functionality. - -#### Singleton Access - -```python -from core.screenshot import get_screenshot_service - -screenshot = get_screenshot_service() -``` - -#### Methods - -```python -# Capture full screen -image = screenshot.capture(full_screen=True) - -# Capture region -image = screenshot.capture_region(x, y, width, height) - -# Get last screenshot -image = screenshot.get_last_screenshot() - -# Save screenshot -path = screenshot.save_screenshot(image, filename="screenshot.png") - -# Check availability -if screenshot.is_available(): - image = screenshot.capture() - -# Get available backends -backends = screenshot.get_available_backends() # ['pil', 'pyautogui'] -``` - ---- - -## Log Reader - -### Class: `LogReader` - -Read Entropia Universe chat logs. - -#### Singleton Access - -```python -from core.log_reader import get_log_reader - -log_reader = get_log_reader() -``` - -#### Methods - -```python -# Start reading -log_reader.start() - -# Read lines -lines = log_reader.read_lines(count=50, filter_text="loot") - -# Check availability -if log_reader.is_available(): - lines = log_reader.read_lines() - -# Get log path -path = log_reader.get_log_path() - -# Stop reading -log_reader.stop() -``` - ---- - -## Window Manager - -### Class: `WindowManager` - -Windows-specific window management. - -#### Singleton Access - -```python -from core.window_manager import get_window_manager - -wm = get_window_manager() -``` - -#### Methods - -```python -# Check availability (Windows only) -if wm.is_available(): - # Find EU window - eu_window = wm.find_eu_window() + def on_loot(self, event): + # Send to Discord + self.ext.post_webhook( + self.discord_url, + {"content": f"Loot: {event.data}"} + ) - if eu_window: - print(f"Title: {eu_window.title}") - print(f"Size: {eu_window.width}x{eu_window.height}") - print(f"Position: ({eu_window.x}, {eu_window.y})") + def handle_loot_webhook(self, payload): + # Handle incoming Discord webhook + if payload.get('type') == 'command': + return self.process_command(payload) + return {"status": "ignored"} +``` + +### Custom Widget + +```python +from core.api import get_api, get_widget_api +from PyQt6.QtWidgets import QLabel, QVBoxLayout, QWidget + +class LootWidgetPlugin(BasePlugin): + def initialize(self): + self.api = get_api() + self.widget_api = get_widget_api() + + # Create widget + self.widget = self.widget_api.create_widget( + name="loot_display", + title="Recent Loot", + size=(300, 200) + ) + + # Set content + content = QWidget() + layout = QVBoxLayout(content) + self.loot_label = QLabel("No loot yet") + layout.addWidget(self.loot_label) + + self.widget.set_content(content) + + # Subscribe to loot events + self.api.subscribe("loot", self.on_loot) + + # Show widget + self.widget.show() - # Check if focused - if wm.is_eu_focused(): - print("EU is active!") - - # Get all windows - windows = wm.get_all_windows() - - # Find by title - window = wm.find_window_by_title("Entropia Universe") - - # Get foreground window - active = wm.get_foreground_window() + def on_loot(self, event): + self.loot_label.setText(f"Last: {event.data}") + self.widget.flash() ``` --- -## Notification Manager +## See Also -### Class: `NotificationManager` - -Desktop notifications. - -#### Singleton Access - -```python -from core.notifications import get_notification_manager - -notifications = get_notification_manager() -``` - -#### Methods - -```python -# Initialize -notifications.initialize(app) - -# Show notification -notifications.show( - title="Skill Gain!", - message="Rifle increased to 25.5", - duration_ms=3000 -) - -# Show with actions -notifications.show( - title="Global!", - message="You got a global!", - actions=[ - {"label": "View", "callback": view_callback}, - {"label": "Dismiss", "callback": dismiss_callback} - ] -) - -# Close all -notifications.close_all() -``` +- [API Cookbook](API_COOKBOOK.md) - Detailed recipes and patterns +- [Plugin Development Guide](PLUGIN_DEVELOPMENT.md) - Building plugins +- [Widget Guide](WIDGET_GUIDE.md) - Creating widgets +- [External Integration](EXTERNAL_INTEGRATION.md) - Third-party integrations --- -## Clipboard Manager - -### Class: `ClipboardManager` - -Clipboard operations. - -#### Singleton Access - -```python -from core.clipboard import get_clipboard_manager - -clipboard = get_clipboard_manager() -``` - -#### Methods - -```python -# Copy to clipboard -clipboard.copy("text to copy") - -# Paste from clipboard -text = clipboard.paste() - -# Check availability -if clipboard.is_available(): - clipboard.copy("Hello!") -``` - ---- - -**API Reference Complete** 📚 +**Need Help?** +- Discord: https://discord.gg/clawd +- Issues: https://git.lemonlink.eu/impulsivefps/EU-Utility/issues