""" EU-Utility - External API ================ The ExternalAPI provides interfaces for third-party integrations, webhooks, REST endpoints, and external application communication. This API is designed for: - Browser extensions - Discord bots - Home Assistant - Stream Deck - Custom scripts - Other applications Quick Start: ----------- ```python from core.api.external_api import get_external_api # Start the REST API server api = get_external_api() api.start_server(port=8080) # Register a webhook handler api.register_webhook("/loot", handle_loot_event) # Send data to external service api.post_webhook("https://discord.com/webhook/...", {"content": "Hello!"}) ``` Integration Types: ----------------- - REST API - HTTP endpoints for external queries - Webhooks - Receive events from external services - WebSocket - Real-time bidirectional communication - IPC - Inter-process communication - File Watch - Monitor file changes For full documentation, see: docs/EXTERNAL_API.md """ import json import time import asyncio from pathlib import Path from typing import Optional, Dict, List, Callable, Any, Union from dataclasses import dataclass, asdict from datetime import datetime from threading import Thread import hashlib import hmac from core.logger import get_logger logger = get_logger(__name__) class ExternalAPIError(Exception): """Base exception for ExternalAPI errors.""" pass class WebhookError(ExternalAPIError): """Raised when webhook operations fail.""" pass class ServerError(ExternalAPIError): """Raised when server operations fail.""" pass @dataclass class WebhookConfig: """Configuration for a webhook endpoint.""" path: str handler: Callable secret: Optional[str] = None # For HMAC verification methods: List[str] = None rate_limit: int = 60 # Requests per minute def __post_init__(self): if self.methods is None: self.methods = ['POST'] @dataclass class APIEndpoint: """REST API endpoint definition.""" path: str handler: Callable methods: List[str] = None auth_required: bool = False def __post_init__(self): if self.methods is None: self.methods = ['GET'] class ExternalAPI: """ ExternalAPI - API for third-party integrations. Provides REST endpoints, webhooks, WebSocket support, and other external communication interfaces. Security: --------- - Optional HMAC signature verification for webhooks - API key authentication for REST endpoints - Rate limiting on all endpoints - CORS configuration for browser access Example: -------- ```python from core.api.external_api import get_external_api api = get_external_api() # Start REST server api.start_server(port=8080) # Register an endpoint @api.endpoint("/stats", methods=["GET"]) def get_stats(): return {"kills": 100, "loot": "50 PED"} # Register a webhook api.register_webhook("/discord", handle_discord) # Send to external service api.post_webhook("https://discord.com/api/webhooks/...", {"content": "Bot started!"}) ``` """ _instance: Optional['ExternalAPI'] = None _server_running: bool = False _server_port: int = 0 _webhooks: Dict[str, WebhookConfig] = {} _endpoints: Dict[str, APIEndpoint] = {} _webhook_history: List[Dict] = [] _api_keys: Dict[str, Dict] = {} _cors_origins: List[str] = ['*'] _ipc_callbacks: Dict[str, Callable] = {} def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance # ===================================================================== # REST API Server # ===================================================================== def start_server(self, port: int = 8080, host: str = "127.0.0.1", cors_origins: List[str] = None) -> bool: """ Start the REST API server. Args: port: Port to listen on host: Host to bind to (default: localhost only) cors_origins: Allowed CORS origins Returns: True if server started Example: >>> api.start_server(port=8080) [ExternalAPI] REST server started on http://127.0.0.1:8080 """ if self._server_running: logger.warning("[ExternalAPI] Server already running") return False try: from aiohttp import web self._app = web.Application() self._server_port = port self._server_host = host if cors_origins: self._cors_origins = cors_origins # Setup routes self._setup_routes() # Start server in background thread self._server_thread = Thread( target=self._run_server, daemon=True ) self._server_thread.start() self._server_running = True logger.info(f"[ExternalAPI] REST server started on http://{host}:{port}") return True except ImportError: logger.error("[ExternalAPI] aiohttp not installed. Run: pip install aiohttp") return False except Exception as e: logger.error(f"[ExternalAPI] Failed to start server: {e}") return False def stop_server(self) -> bool: """ Stop the REST API server. Returns: True if stopped """ if not self._server_running: return False try: asyncio.run_coroutine_threadsafe( self._runner.cleanup(), self._loop ) self._server_running = False logger.info("[ExternalAPI] REST server stopped") return True except Exception as e: logger.error(f"[ExternalAPI] Error stopping server: {e}") return False def _run_server(self): """Internal: Run the asyncio server.""" self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) from aiohttp import web self._runner = web.AppRunner(self._app) self._loop.run_until_complete(self._runner.setup()) site = web.TCPSite(self._runner, self._server_host, self._server_port) self._loop.run_until_complete(site.start()) # Keep running self._loop.run_forever() def _setup_routes(self): """Internal: Setup HTTP routes.""" from aiohttp import web # Health check self._app.router.add_get('/health', self._handle_health) # API endpoints self._app.router.add_get('/api/v1/{path:.*}', self._handle_api_get) self._app.router.add_post('/api/v1/{path:.*}', self._handle_api_post) # Webhooks self._app.router.add_post('/webhook/{name}', self._handle_webhook) # Events stream (SSE) self._app.router.add_get('/events', self._handle_events) async def _handle_health(self, request): """Handle health check request.""" return web.json_response({ 'status': 'ok', 'timestamp': datetime.now().isoformat(), 'server_running': self._server_running }) async def _handle_api_get(self, request): """Handle API GET requests.""" from aiohttp import web path = request.match_info['path'] # Check authentication if not self._check_auth(request): return web.json_response( {'error': 'Unauthorized'}, status=401 ) # Find endpoint endpoint = self._endpoints.get(path) if not endpoint or 'GET' not in endpoint.methods: return web.json_response( {'error': 'Not found'}, status=404 ) try: # Get query params params = dict(request.query) # Call handler result = endpoint.handler(params) return web.json_response({ 'success': True, 'data': result }) except Exception as e: logger.error(f"[ExternalAPI] GET {path} error: {e}") return web.json_response( {'error': str(e)}, status=500 ) async def _handle_api_post(self, request): """Handle API POST requests.""" from aiohttp import web path = request.match_info['path'] if not self._check_auth(request): return web.json_response( {'error': 'Unauthorized'}, status=401 ) endpoint = self._endpoints.get(path) if not endpoint or 'POST' not in endpoint.methods: return web.json_response( {'error': 'Not found'}, status=404 ) try: # Get JSON body data = await request.json() # Call handler result = endpoint.handler(data) return web.json_response({ 'success': True, 'data': result }) except Exception as e: logger.error(f"[ExternalAPI] POST {path} error: {e}") return web.json_response( {'error': str(e)}, status=500 ) async def _handle_webhook(self, request): """Handle incoming webhooks.""" from aiohttp import web name = request.match_info['name'] webhook = self._webhooks.get(name) if not webhook: return web.json_response( {'error': 'Webhook not found'}, status=404 ) # Check method if request.method not in webhook.methods: return web.json_response( {'error': 'Method not allowed'}, status=405 ) try: # Get payload payload = await request.json() # Verify signature if secret configured if webhook.secret: signature = request.headers.get('X-Signature', '') if not self._verify_signature(payload, signature, webhook.secret): return web.json_response( {'error': 'Invalid signature'}, status=401 ) # Log webhook self._log_webhook(name, payload) # Call handler result = webhook.handler(payload) return web.json_response({ 'success': True, 'data': result }) except Exception as e: logger.error(f"[ExternalAPI] Webhook {name} error: {e}") return web.json_response( {'error': str(e)}, status=500 ) async def _handle_events(self, request): """Handle Server-Sent Events (SSE) connection.""" from aiohttp import web response = web.StreamResponse( status=200, reason='OK', headers={ 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', } ) await response.prepare(request) # Keep connection alive and send events try: while True: # Send heartbeat await response.write(b'event: ping\ndata: {}\n\n') await asyncio.sleep(30) except: pass return response # ===================================================================== # API Endpoints # ===================================================================== def endpoint(self, path: str, methods: List[str] = None, auth_required: bool = False): """ Decorator to register an API endpoint. Args: path: URL path (e.g., "stats", "items/search") methods: HTTP methods allowed (default: ['GET']) auth_required: Whether API key is required Example: >>> @api.endpoint("stats", methods=["GET"]) ... def get_stats(): ... return {"kills": 100} >>> @api.endpoint("loot", methods=["POST"]) ... def record_loot(data): ... save_loot(data) ... return {"status": "saved"} """ def decorator(func: Callable): self.register_endpoint(path, func, methods, auth_required) return func return decorator def register_endpoint(self, path: str, handler: Callable, methods: List[str] = None, auth_required: bool = False) -> None: """ Register an API endpoint programmatically. Args: path: URL path handler: Function to handle requests methods: HTTP methods auth_required: Require authentication """ endpoint = APIEndpoint( path=path, handler=handler, methods=methods or ['GET'], auth_required=auth_required ) self._endpoints[path] = endpoint logger.debug(f"[ExternalAPI] Registered endpoint: {path}") def unregister_endpoint(self, path: str) -> bool: """Unregister an endpoint.""" if path in self._endpoints: del self._endpoints[path] return True return False def get_endpoints(self) -> List[str]: """Get list of registered endpoint paths.""" return list(self._endpoints.keys()) # ===================================================================== # Webhooks (Incoming) # ===================================================================== def register_webhook(self, name: str, handler: Callable, secret: str = None, methods: List[str] = None, rate_limit: int = 60) -> None: """ Register a webhook endpoint. External services can POST to /webhook/{name} Args: name: Webhook name (becomes URL path) handler: Function(payload) to handle webhook secret: Optional secret for HMAC verification methods: Allowed HTTP methods rate_limit: Max requests per minute Example: >>> def handle_discord(payload): ... print(f"Discord event: {payload}") ... >>> api.register_webhook("discord", handle_discord) # Now POST to http://localhost:8080/webhook/discord """ webhook = WebhookConfig( path=name, handler=handler, secret=secret, methods=methods or ['POST'], rate_limit=rate_limit ) self._webhooks[name] = webhook logger.info(f"[ExternalAPI] Registered webhook: /webhook/{name}") def unregister_webhook(self, name: str) -> bool: """Unregister a webhook.""" if name in self._webhooks: del self._webhooks[name] return True return False def get_webhooks(self) -> List[str]: """Get list of registered webhook names.""" return list(self._webhooks.keys()) def _verify_signature(self, payload: Dict, signature: str, secret: str) -> bool: """Verify webhook HMAC signature.""" try: payload_str = json.dumps(payload, sort_keys=True) expected = hmac.new( secret.encode(), payload_str.encode(), hashlib.sha256 ).hexdigest() return hmac.compare_digest(signature, expected) except: return False def _log_webhook(self, name: str, payload: Dict) -> None: """Log webhook call.""" self._webhook_history.append({ 'name': name, 'timestamp': datetime.now().isoformat(), 'payload_size': len(str(payload)) }) # Keep last 100 entries if len(self._webhook_history) > 100: self._webhook_history = self._webhook_history[-100:] def get_webhook_history(self, limit: int = 50) -> List[Dict]: """ Get recent webhook call history. Args: limit: Number of entries to return Returns: List of webhook call records """ return self._webhook_history[-limit:] # ===================================================================== # Webhooks (Outgoing) # ===================================================================== def post_webhook(self, url: str, data: Dict, headers: Dict = None, timeout: int = 10) -> Dict: """ POST data to external webhook URL. Args: url: Webhook URL data: JSON payload headers: Additional headers timeout: Request timeout Returns: Response dict: {'success': bool, 'status': int, 'data': Any} Example: >>> api.post_webhook( ... "https://discord.com/api/webhooks/...", ... {"content": "Loot: 100 PED!"} ... ) """ try: import requests default_headers = {'Content-Type': 'application/json'} if headers: default_headers.update(headers) response = requests.post( url, json=data, headers=default_headers, timeout=timeout ) return { 'success': 200 <= response.status_code < 300, 'status': response.status_code, 'data': response.text } except Exception as e: logger.error(f"[ExternalAPI] Webhook POST failed: {e}") return { 'success': False, 'error': str(e) } # ===================================================================== # Authentication # ===================================================================== def create_api_key(self, name: str, permissions: List[str] = None) -> str: """ Create a new API key. Args: name: Key identifier permissions: List of allowed permissions Returns: Generated API key """ key = hashlib.sha256( f"{name}{time.time()}".encode() ).hexdigest()[:32] self._api_keys[key] = { 'name': name, 'permissions': permissions or ['read'], 'created': datetime.now().isoformat(), 'last_used': None } return key def revoke_api_key(self, key: str) -> bool: """Revoke an API key.""" if key in self._api_keys: del self._api_keys[key] return True return False def _check_auth(self, request) -> bool: """Internal: Check request authentication.""" # Check for API key in header api_key = request.headers.get('X-API-Key', '') if api_key in self._api_keys: self._api_keys[api_key]['last_used'] = datetime.now().isoformat() return True return False # ===================================================================== # IPC (Inter-Process Communication) # ===================================================================== def register_ipc_handler(self, channel: str, handler: Callable) -> None: """ Register an IPC message handler. For communication with other local processes. Args: channel: IPC channel name handler: Function(data) to handle messages Example: >>> def on_browser_message(data): ... print(f"From browser: {data}") ... >>> api.register_ipc_handler("browser", on_browser_message) """ self._ipc_callbacks[channel] = handler logger.debug(f"[ExternalAPI] Registered IPC handler: {channel}") def send_ipc(self, channel: str, data: Dict) -> bool: """ Send IPC message to registered handler. Args: channel: IPC channel name data: Message data Returns: True if sent """ if channel in self._ipc_callbacks: try: self._ipc_callbacks[channel](data) return True except Exception as e: logger.error(f"[ExternalAPI] IPC send failed: {e}") return False # ===================================================================== # File Watcher # ===================================================================== def watch_file(self, filepath: str, callback: Callable, interval: float = 1.0) -> str: """ Watch a file for changes. Args: filepath: Path to file callback: Function() called on change interval: Check interval in seconds Returns: Watch ID Example: >>> def on_config_change(): ... print("Config file changed!") ... >>> watch_id = api.watch_file("config.json", on_config_change) """ import hashlib watch_id = hashlib.md5(filepath.encode()).hexdigest()[:8] def watcher(): last_mtime = 0 while True: try: mtime = Path(filepath).stat().st_mtime if mtime != last_mtime: last_mtime = mtime callback() except: pass time.sleep(interval) thread = Thread(target=watcher, daemon=True) thread.start() return watch_id # ===================================================================== # Status & Info # ===================================================================== def get_status(self) -> Dict: """ Get ExternalAPI status. Returns: Status dictionary """ return { 'server_running': self._server_running, 'server_port': self._server_port, 'endpoints': len(self._endpoints), 'webhooks': len(self._webhooks), 'api_keys': len(self._api_keys), 'webhook_history': len(self._webhook_history) } def get_url(self, path: str = "") -> str: """ Get full URL for a path. Args: path: URL path Returns: Full URL """ if self._server_running: return f"http://127.0.0.1:{self._server_port}/{path.lstrip('/')}" return "" # Global instance _external_api_instance: Optional[ExternalAPI] = None def get_external_api() -> ExternalAPI: """ Get the global ExternalAPI instance. Returns: ExternalAPI singleton Example: >>> from core.api.external_api import get_external_api >>> api = get_external_api() >>> api.start_server(port=8080) """ global _external_api_instance if _external_api_instance is None: _external_api_instance = ExternalAPI() return _external_api_instance # Convenience exports __all__ = [ 'ExternalAPI', 'get_external_api', 'ExternalAPIError', 'WebhookError', 'ServerError', 'WebhookConfig', 'APIEndpoint' ]