EU-Utility/core/api/external_api.py

823 lines
24 KiB
Python

"""
EU-Utility - External API
================
The ExternalAPI provides interfaces for third-party integrations,
webhooks, REST endpoints, and external application communication.
This API is designed for:
- Browser extensions
- Discord bots
- Home Assistant
- Stream Deck
- Custom scripts
- Other applications
Quick Start:
-----------
```python
from core.api.external_api import get_external_api
# Start the REST API server
api = get_external_api()
api.start_server(port=8080)
# Register a webhook handler (served at /webhook/loot; the name has no leading slash)
api.register_webhook("loot", handle_loot_event)
# Send data to external service
api.post_webhook("https://discord.com/webhook/...", {"content": "Hello!"})
```
Integration Types:
-----------------
- REST API - HTTP endpoints for external queries
- Webhooks - Receive events from external services
- WebSocket - Real-time bidirectional communication
- IPC - Inter-process communication
- File Watch - Monitor file changes
For full documentation, see: docs/EXTERNAL_API.md
"""
import json
import time
import asyncio
from pathlib import Path
from typing import Optional, Dict, List, Callable, Any, Union
from dataclasses import dataclass, asdict
from datetime import datetime
from threading import Thread
import hashlib
import hmac
from core.logger import get_logger
logger = get_logger(__name__)
class ExternalAPIError(Exception):
    """Root of the ExternalAPI exception hierarchy."""


class WebhookError(ExternalAPIError):
    """Signals that a webhook operation failed."""


class ServerError(ExternalAPIError):
    """Signals that a server operation failed."""
@dataclass
class WebhookConfig:
    """Configuration for a single incoming webhook endpoint.

    Attributes:
        path: Webhook name/path as supplied at registration.
        handler: Callable invoked with the decoded payload.
        secret: Optional shared secret; when set, requests are expected to
            carry an HMAC signature.
        methods: Allowed HTTP methods. ``None`` selects ``['POST']``; a bare
            string is treated as a single method; names are upper-cased so
            they compare equal to the upper-case request method.
        rate_limit: Requests-per-minute budget (stored only; this class does
            not enforce it).
    """

    path: str
    handler: Callable
    secret: Optional[str] = None  # For HMAC verification
    methods: Optional[List[str]] = None
    rate_limit: int = 60  # Requests per minute

    def __post_init__(self):
        if self.methods is None:
            self.methods = ['POST']
        elif isinstance(self.methods, str):
            # A bare "POST" would otherwise be matched by substring.
            self.methods = [self.methods.upper()]
        else:
            self.methods = [str(method).upper() for method in self.methods]
@dataclass
class APIEndpoint:
    """Definition of a single REST API endpoint.

    Attributes:
        path: Endpoint path as supplied at registration.
        handler: Callable invoked to serve the request.
        methods: Allowed HTTP methods. ``None`` selects ``['GET']``; a bare
            string is treated as a single method; names are upper-cased so
            they compare equal to the upper-case request method.
        auth_required: Whether callers must authenticate to use the endpoint.
    """

    path: str
    handler: Callable
    methods: Optional[List[str]] = None
    auth_required: bool = False

    def __post_init__(self):
        if self.methods is None:
            self.methods = ['GET']
        elif isinstance(self.methods, str):
            # A bare "GET" would otherwise be matched by substring.
            self.methods = [self.methods.upper()]
        else:
            self.methods = [str(method).upper() for method in self.methods]
class ExternalAPI:
    """
    ExternalAPI - API for third-party integrations.

    Singleton entry point for REST endpoints, incoming/outgoing webhooks,
    in-process IPC callbacks and file watching. Registries (endpoints,
    webhooks, API keys, webhook history, IPC callbacks) are class-level
    containers, so they are shared by every reference to the instance.

    Security:
    ---------
    - Optional HMAC signature verification for webhooks
    - API key authentication for REST endpoints (``X-API-Key`` header)
    - ``rate_limit`` values and CORS origins are stored as configuration.
      NOTE(review): no enforcement of either is visible here - confirm
      before relying on them.

    Example:
    --------
    ```python
    from core.api.external_api import get_external_api
    api = get_external_api()
    # Start REST server
    api.start_server(port=8080)
    # Register an endpoint (served at /api/v1/stats; no leading slash).
    # Handlers receive one argument: the query params (GET) or JSON body (POST).
    @api.endpoint("stats", methods=["GET"])
    def get_stats(params):
        return {"kills": 100, "loot": "50 PED"}
    # Register a webhook (served at /webhook/discord; no leading slash)
    api.register_webhook("discord", handle_discord)
    # Send to external service
    api.post_webhook("https://discord.com/api/webhooks/...",
                     {"content": "Bot started!"})
    ```
    """

    _instance: Optional['ExternalAPI'] = None
    _server_running: bool = False
    _server_port: int = 0
    _webhooks: Dict[str, 'WebhookConfig'] = {}
    _endpoints: Dict[str, 'APIEndpoint'] = {}
    _webhook_history: List[Dict] = []
    _api_keys: Dict[str, Dict] = {}
    _cors_origins: List[str] = ['*']
    _ipc_callbacks: Dict[str, Callable] = {}
    # Server plumbing. Declared with defaults so that reading them before
    # the server has been started yields None instead of AttributeError.
    _server_host: str = "127.0.0.1"
    _app: Any = None
    _runner: Any = None
    _loop: Optional[asyncio.AbstractEventLoop] = None
    _server_thread: Optional[Thread] = None

    def __new__(cls):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
# =====================================================================
# REST API Server
# =====================================================================
def start_server(self, port: int = 8080,
                 host: str = "127.0.0.1",
                 cors_origins: List[str] = None) -> bool:
    """
    Start the REST API server on a background daemon thread.

    Waits for the worker thread to report that the listening socket is
    bound, so a bind failure (e.g. port already in use) is reported as
    ``False`` instead of a false success.

    Args:
        port: Port to listen on
        host: Host to bind to (default: localhost only)
        cors_origins: Allowed CORS origins (stored on the instance)
    Returns:
        True if server started, False if it was already running, aiohttp
        is missing, or startup failed
    Example:
        >>> api.start_server(port=8080)
        [ExternalAPI] REST server started on http://127.0.0.1:8080
    """
    from threading import Event

    if self._server_running:
        logger.warning("[ExternalAPI] Server already running")
        return False
    startup_timeout = 10.0  # seconds to wait for the worker's bind report
    try:
        from aiohttp import web
        self._app = web.Application()
        self._server_port = port
        self._server_host = host
        if cors_origins:
            self._cors_origins = cors_origins
        # Setup routes
        self._setup_routes()
        # Handshake: the worker records a startup error (if any) and then
        # sets the event once the bind attempt has finished.
        self._startup_error = None
        self._server_ready = Event()
        # Start server in background thread
        self._server_thread = Thread(
            target=self._run_server,
            daemon=True
        )
        self._server_thread.start()
        if not self._server_ready.wait(timeout=startup_timeout):
            # No report yet: assume it is still coming up rather than
            # declaring failure.
            logger.warning("[ExternalAPI] Server startup not confirmed yet")
        elif self._startup_error is not None:
            # The worker has already logged the cause.
            return False
        self._server_running = True
        logger.info(f"[ExternalAPI] REST server started on http://{host}:{port}")
        return True
    except ImportError:
        logger.error("[ExternalAPI] aiohttp not installed. Run: pip install aiohttp")
        return False
    except Exception as e:
        logger.error(f"[ExternalAPI] Failed to start server: {e}")
        return False


def stop_server(self) -> bool:
    """
    Stop the REST API server.

    Runs the aiohttp runner cleanup on the server loop, then stops the
    loop so the worker thread can exit. When called from the server thread
    itself (e.g. inside a request handler) shutdown is scheduled without
    blocking; from any other thread the call waits for completion.

    Returns:
        True if stopped (or shutdown was scheduled), False if the server
        was not running or stopping failed
    """
    from threading import current_thread

    if not self._server_running:
        return False
    loop = getattr(self, '_loop', None)
    runner = getattr(self, '_runner', None)
    thread = getattr(self, '_server_thread', None)
    try:
        if loop is None or runner is None:
            raise RuntimeError("server loop is not initialised")
        if current_thread() is thread:
            # Blocking here would deadlock the loop we are running on.
            cleanup_task = loop.create_task(runner.cleanup())
            cleanup_task.add_done_callback(lambda _done: loop.stop())
        else:
            pending = asyncio.run_coroutine_threadsafe(runner.cleanup(), loop)
            try:
                pending.result(timeout=10)
            finally:
                # Stop the loop even if cleanup failed or timed out, so
                # the worker thread does not linger.
                loop.call_soon_threadsafe(loop.stop)
            if thread is not None:
                thread.join(timeout=5)
        self._server_running = False
        logger.info("[ExternalAPI] REST server stopped")
        return True
    except Exception as e:
        logger.error(f"[ExternalAPI] Error stopping server: {e}")
        return False


def _run_server(self):
    """Internal: Run the asyncio server (thread target).

    Creates a private event loop, binds the aiohttp site, reports the
    outcome through ``_server_ready`` / ``_startup_error`` when those exist,
    then serves until the loop is stopped.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    self._loop = loop
    ready = getattr(self, '_server_ready', None)
    runner = None
    try:
        from aiohttp import web
        runner = web.AppRunner(self._app)
        self._runner = runner
        loop.run_until_complete(runner.setup())
        site = web.TCPSite(runner, self._server_host, self._server_port)
        loop.run_until_complete(site.start())
    except Exception as e:
        self._startup_error = e
        self._server_running = False
        logger.error(f"[ExternalAPI] Failed to start server: {e}")
        if runner is not None:
            try:
                loop.run_until_complete(runner.cleanup())
            except Exception as cleanup_error:
                logger.debug(f"[ExternalAPI] Cleanup after failed start: {cleanup_error}")
        loop.close()
        return
    finally:
        # Always release a waiting start_server(), success or failure.
        if ready is not None:
            ready.set()
    try:
        # Keep running
        loop.run_forever()
    finally:
        loop.close()
        self._server_running = False
def _setup_routes(self):
    """Internal: Setup HTTP routes on ``self._app``.

    The webhook route accepts every HTTP method, because the per-webhook
    ``methods`` list is checked by ``_handle_webhook``. A POST-only route
    would reject other verbs before that check could allow them.
    """
    router = self._app.router
    # Health check
    router.add_get('/health', self._handle_health)
    # API endpoints
    router.add_get('/api/v1/{path:.*}', self._handle_api_get)
    router.add_post('/api/v1/{path:.*}', self._handle_api_post)
    # Webhooks
    router.add_route('*', '/webhook/{name}', self._handle_webhook)
    # Events stream (SSE)
    router.add_get('/events', self._handle_events)
async def _handle_health(self, request):
    """Handle health check request.

    Returns a JSON response with a fixed ``status`` of ``'ok'``, the current
    local timestamp in ISO format, and the ``_server_running`` flag.
    """
    # aiohttp is imported lazily in every handler; without this import the
    # name `web` is undefined here and each health check raises NameError.
    from aiohttp import web
    return web.json_response({
        'status': 'ok',
        'timestamp': datetime.now().isoformat(),
        'server_running': self._server_running
    })
async def _handle_api_get(self, request):
    """Handle API GET requests.

    Looks up the endpoint registered under the ``{path}`` part of the URL
    and calls its handler with the query parameters as a dict.

    Responses:
        404 - no endpoint registered for the path, or GET not allowed
        401 - endpoint has ``auth_required`` set and the request fails
              ``_check_auth``
        500 - the handler raised; the error text is returned
        200 - ``{'success': True, 'data': <handler result>}``
    """
    from aiohttp import web
    import inspect
    path = request.match_info['path']
    # Find endpoint first: its auth_required flag decides whether a key is
    # needed at all.
    endpoint = self._endpoints.get(path)
    if not endpoint or 'GET' not in endpoint.methods:
        return web.json_response(
            {'error': 'Not found'},
            status=404
        )
    # Check authentication only for endpoints that ask for it
    if endpoint.auth_required and not self._check_auth(request):
        return web.json_response(
            {'error': 'Unauthorized'},
            status=401
        )
    try:
        # Get query params
        params = dict(request.query)
        # Call handler (coroutine handlers are awaited)
        result = endpoint.handler(params)
        if inspect.isawaitable(result):
            result = await result
        return web.json_response({
            'success': True,
            'data': result
        })
    except Exception as e:
        logger.error(f"[ExternalAPI] GET {path} error: {e}")
        return web.json_response(
            {'error': str(e)},
            status=500
        )
async def _handle_api_post(self, request):
    """Handle API POST requests.

    Looks up the endpoint registered under the ``{path}`` part of the URL
    and calls its handler with the decoded JSON body.

    Responses:
        404 - no endpoint registered for the path, or POST not allowed
        401 - endpoint has ``auth_required`` set and the request fails
              ``_check_auth``
        400 - the request body is not valid JSON
        500 - the handler raised; the error text is returned
        200 - ``{'success': True, 'data': <handler result>}``
    """
    from aiohttp import web
    import inspect
    path = request.match_info['path']
    # Find endpoint first: its auth_required flag decides whether a key is
    # needed at all.
    endpoint = self._endpoints.get(path)
    if not endpoint or 'POST' not in endpoint.methods:
        return web.json_response(
            {'error': 'Not found'},
            status=404
        )
    if endpoint.auth_required and not self._check_auth(request):
        return web.json_response(
            {'error': 'Unauthorized'},
            status=401
        )
    try:
        # Get JSON body; a malformed body is the client's fault, not a
        # server error.
        try:
            data = await request.json()
        except ValueError:
            return web.json_response(
                {'error': 'Invalid JSON body'},
                status=400
            )
        # Call handler (coroutine handlers are awaited)
        result = endpoint.handler(data)
        if inspect.isawaitable(result):
            result = await result
        return web.json_response({
            'success': True,
            'data': result
        })
    except Exception as e:
        logger.error(f"[ExternalAPI] POST {path} error: {e}")
        return web.json_response(
            {'error': str(e)},
            status=500
        )
async def _handle_webhook(self, request):
    """Handle incoming webhooks.

    Resolves the webhook registered under the ``{name}`` part of the URL,
    optionally verifies the ``X-Signature`` header, records the call and
    passes the decoded JSON payload to the webhook's handler.

    Responses:
        404 - no webhook registered under the name
        405 - request method not in the webhook's ``methods``
        400 - the request body is not valid JSON
        401 - a secret is configured and the signature does not verify
        500 - the handler raised; the error text is returned
        200 - ``{'success': True, 'data': <handler result>}``
    """
    from aiohttp import web
    import inspect
    name = request.match_info['name']
    webhook = self._webhooks.get(name)
    if not webhook:
        return web.json_response(
            {'error': 'Webhook not found'},
            status=404
        )
    # Check method
    if request.method not in webhook.methods:
        return web.json_response(
            {'error': 'Method not allowed'},
            status=405
        )
    try:
        # Get payload; a malformed body is the sender's fault, not a
        # server error.
        try:
            payload = await request.json()
        except ValueError:
            return web.json_response(
                {'error': 'Invalid JSON payload'},
                status=400
            )
        # Verify signature if secret configured
        if webhook.secret:
            signature = request.headers.get('X-Signature', '')
            if not self._verify_signature(payload, signature, webhook.secret):
                return web.json_response(
                    {'error': 'Invalid signature'},
                    status=401
                )
        # Log webhook
        self._log_webhook(name, payload)
        # Call handler (coroutine handlers are awaited)
        result = webhook.handler(payload)
        if inspect.isawaitable(result):
            result = await result
        return web.json_response({
            'success': True,
            'data': result
        })
    except Exception as e:
        logger.error(f"[ExternalAPI] Webhook {name} error: {e}")
        return web.json_response(
            {'error': str(e)},
            status=500
        )
async def _handle_events(self, request):
    """Handle Server-Sent Events (SSE) connection.

    Opens a ``text/event-stream`` response and writes a ``ping`` event
    every 30 seconds until writing fails or the handler is cancelled.
    """
    from aiohttp import web
    response = web.StreamResponse(
        status=200,
        reason='OK',
        headers={
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
        }
    )
    await response.prepare(request)
    # Keep connection alive and send events
    try:
        while True:
            # Send heartbeat
            await response.write(b'event: ping\ndata: {}\n\n')
            await asyncio.sleep(30)
    except asyncio.CancelledError:
        # Cancellation must propagate so the server can finish tearing the
        # request down; swallowing it hides shutdown from the event loop.
        raise
    except Exception as e:
        # Usually the client disconnected; end the stream quietly but leave
        # a trace for debugging.
        logger.debug(f"[ExternalAPI] SSE stream closed: {e}")
    return response
# =====================================================================
# API Endpoints
# =====================================================================
def endpoint(self, path: str, methods: List[str] = None,
             auth_required: bool = False):
    """
    Build a decorator that registers the decorated function as an endpoint.

    The function is handed to ``register_endpoint`` and returned unchanged,
    so it stays directly callable.

    Args:
        path: URL path (e.g., "stats", "items/search")
        methods: HTTP methods allowed (default: ['GET'])
        auth_required: Whether API key is required
    Example:
        >>> @api.endpoint("stats", methods=["GET"])
        ... def get_stats(params):
        ...     return {"kills": 100}
        >>> @api.endpoint("loot", methods=["POST"])
        ... def record_loot(data):
        ...     save_loot(data)
        ...     return {"status": "saved"}
    """
    def register_and_return(target: Callable):
        self.register_endpoint(path, target, methods, auth_required)
        return target

    return register_and_return
def register_endpoint(self, path: str, handler: Callable,
                      methods: List[str] = None,
                      auth_required: bool = False) -> None:
    """
    Register an API endpoint programmatically.

    Stores an ``APIEndpoint`` under ``path``, replacing any previous
    registration for the same path.

    Args:
        path: URL path
        handler: Function to handle requests
        methods: HTTP methods (an empty or missing value means ['GET'])
        auth_required: Require authentication
    """
    allowed = methods if methods else ['GET']
    self._endpoints[path] = APIEndpoint(
        path=path,
        handler=handler,
        methods=allowed,
        auth_required=auth_required,
    )
    logger.debug(f"[ExternalAPI] Registered endpoint: {path}")
def unregister_endpoint(self, path: str) -> bool:
    """Remove the endpoint registered under ``path``.

    Returns:
        True if an endpoint was removed, False if none was registered
    """
    try:
        del self._endpoints[path]
    except KeyError:
        return False
    return True
def get_endpoints(self) -> List[str]:
    """Return the registered endpoint paths as a new list."""
    return [*self._endpoints]
# =====================================================================
# Webhooks (Incoming)
# =====================================================================
def register_webhook(self, name: str, handler: Callable,
                     secret: str = None,
                     methods: List[str] = None,
                     rate_limit: int = 60) -> None:
    """
    Register a webhook endpoint.

    External services can POST to /webhook/{name}. A previous registration
    under the same name is replaced.

    Args:
        name: Webhook name (becomes URL path)
        handler: Function(payload) to handle webhook
        secret: Optional secret for HMAC verification
        methods: Allowed HTTP methods (an empty or missing value means ['POST'])
        rate_limit: Max requests per minute
    Example:
        >>> def handle_discord(payload):
        ...     print(f"Discord event: {payload}")
        ...
        >>> api.register_webhook("discord", handle_discord)
        # Now POST to http://localhost:8080/webhook/discord
    """
    allowed = methods if methods else ['POST']
    self._webhooks[name] = WebhookConfig(
        path=name,
        handler=handler,
        secret=secret,
        methods=allowed,
        rate_limit=rate_limit,
    )
    logger.info(f"[ExternalAPI] Registered webhook: /webhook/{name}")
def unregister_webhook(self, name: str) -> bool:
    """Remove the webhook registered under ``name``.

    Returns:
        True if a webhook was removed, False if none was registered
    """
    try:
        del self._webhooks[name]
    except KeyError:
        return False
    return True
def get_webhooks(self) -> List[str]:
    """Return the registered webhook names as a new list."""
    return [*self._webhooks]
def _verify_signature(self, payload: Dict, signature: str, secret: str) -> bool:
    """Verify webhook HMAC signature.

    The expected value is the hex HMAC-SHA256 of the payload re-serialised
    with ``json.dumps(payload, sort_keys=True)`` (default separators), keyed
    with ``secret``. Senders must sign that exact canonical form, not their
    raw request body.

    Args:
        payload: Decoded JSON payload
        signature: Hex digest supplied by the sender
        secret: Shared secret
    Returns:
        True if the signature matches; False on mismatch or when the
        inputs cannot be serialised/encoded (e.g. a missing signature)
    """
    try:
        payload_str = json.dumps(payload, sort_keys=True)
        expected = hmac.new(
            secret.encode(),
            payload_str.encode(),
            hashlib.sha256
        ).hexdigest()
        # Compare as bytes: compare_digest rejects non-ASCII str input.
        return hmac.compare_digest(signature.encode(), expected.encode())
    except (AttributeError, TypeError, ValueError):
        # Malformed input counts as an invalid signature; anything else
        # (e.g. KeyboardInterrupt) is allowed to propagate.
        return False
def _log_webhook(self, name: str, payload: Dict) -> None:
    """Record one webhook call in the history.

    Each record holds the webhook name, the current local time in ISO
    format and the length of ``str(payload)``. Only the newest 100 records
    are kept.
    """
    history = self._webhook_history
    history.append(dict(
        name=name,
        timestamp=datetime.now().isoformat(),
        payload_size=len(str(payload)),
    ))
    # Keep last 100 entries
    if len(history) > 100:
        self._webhook_history = history[-100:]
def get_webhook_history(self, limit: int = 50) -> List[Dict]:
    """
    Get recent webhook call history.

    Args:
        limit: Maximum number of entries to return (newest last). Zero or
            a negative value yields an empty list.
    Returns:
        New list with at most ``limit`` of the most recent call records
    """
    # A plain [-limit:] slice returns the whole history for limit == 0 and
    # an arbitrary tail for negative values, so handle those explicitly.
    if limit <= 0:
        return []
    return self._webhook_history[-limit:]
# =====================================================================
# Webhooks (Outgoing)
# =====================================================================
def post_webhook(self, url: str, data: Dict,
                 headers: Dict = None,
                 timeout: int = 10) -> Dict:
    """
    POST data to external webhook URL.

    Sends ``data`` as a JSON body using the ``requests`` library. Failures
    (including ``requests`` not being installed) are logged and reported in
    the returned dict; nothing is raised.

    Args:
        url: Webhook URL
        data: JSON payload
        headers: Additional headers (merged over the default
            ``Content-Type: application/json``)
        timeout: Request timeout in seconds
    Returns:
        Response dict. On a completed request:
        ``{'success': bool, 'status': int, 'data': str}`` where ``success``
        is True for 2xx status codes and ``data`` is the response text.
        On failure: ``{'success': False, 'status': None, 'data': None,
        'error': str}``.
    Example:
        >>> api.post_webhook(
        ...     "https://discord.com/api/webhooks/...",
        ...     {"content": "Loot: 100 PED!"}
        ... )
    """
    try:
        import requests
        default_headers = {'Content-Type': 'application/json'}
        if headers:
            default_headers.update(headers)
        response = requests.post(
            url,
            json=data,
            headers=default_headers,
            timeout=timeout
        )
        return {
            'success': 200 <= response.status_code < 300,
            'status': response.status_code,
            'data': response.text
        }
    except Exception as e:
        logger.error(f"[ExternalAPI] Webhook POST failed: {e}")
        # Keep the documented keys present so callers can read
        # result['status'] / result['data'] without a KeyError.
        return {
            'success': False,
            'status': None,
            'data': None,
            'error': str(e)
        }
# =====================================================================
# Authentication
# =====================================================================
def create_api_key(self, name: str, permissions: List[str] = None) -> str:
    """
    Create a new API key.

    The key is 32 hexadecimal characters drawn from the ``secrets`` module.
    Deriving it from the name and the clock, as before, made it guessable
    and allowed two calls in the same clock tick to collide.

    Args:
        name: Key identifier
        permissions: List of allowed permissions (default: ['read'])
    Returns:
        Generated API key
    """
    import secrets

    key = secrets.token_hex(16)
    self._api_keys[key] = {
        'name': name,
        'permissions': permissions or ['read'],
        'created': datetime.now().isoformat(),
        'last_used': None
    }
    return key
def revoke_api_key(self, key: str) -> bool:
    """Delete an API key.

    Returns:
        True if the key existed and was removed, False otherwise
    """
    try:
        del self._api_keys[key]
    except KeyError:
        return False
    return True
def _check_auth(self, request) -> bool:
    """Internal: Check request authentication.

    Accepts the request when its ``X-API-Key`` header names a known key,
    and stamps that key's ``last_used`` with the current local time.
    """
    supplied = request.headers.get('X-API-Key', '')
    try:
        record = self._api_keys[supplied]
    except KeyError:
        return False
    record['last_used'] = datetime.now().isoformat()
    return True
# =====================================================================
# IPC (Inter-Process Communication)
# =====================================================================
def register_ipc_handler(self, channel: str, handler: Callable) -> None:
    """
    Register an IPC message handler.

    The handler is stored under ``channel`` (replacing any previous one)
    and is invoked by ``send_ipc`` for that channel.

    Args:
        channel: IPC channel name
        handler: Function(data) to handle messages
    Example:
        >>> def on_browser_message(data):
        ...     print(f"From browser: {data}")
        ...
        >>> api.register_ipc_handler("browser", on_browser_message)
    """
    self._ipc_callbacks.update({channel: handler})
    logger.debug(f"[ExternalAPI] Registered IPC handler: {channel}")
def send_ipc(self, channel: str, data: Dict) -> bool:
    """
    Send IPC message to registered handler.

    Calls the handler registered for ``channel`` synchronously with
    ``data``. A handler exception is logged, not raised.

    Args:
        channel: IPC channel name
        data: Message data
    Returns:
        True if sent; False if no handler is registered for the channel
        or the handler raised
    """
    if channel not in self._ipc_callbacks:
        return False
    try:
        self._ipc_callbacks[channel](data)
    except Exception as e:
        logger.error(f"[ExternalAPI] IPC send failed: {e}")
        return False
    return True
# =====================================================================
# File Watcher
# =====================================================================
def watch_file(self, filepath: str,
               callback: Callable,
               interval: float = 1.0) -> str:
    """
    Watch a file for changes.

    Starts a daemon thread that polls the file's modification time every
    ``interval`` seconds and calls ``callback`` whenever it differs from
    the last value seen. The first successful check always fires the
    callback, because the initial reference value is 0. The thread runs
    until the process exits.

    Args:
        filepath: Path to file
        callback: Function() called on change
        interval: Check interval in seconds
    Returns:
        Watch ID (first 8 hex chars of the MD5 of ``filepath``; used only
        as an identifier, not for security)
    Example:
        >>> def on_config_change():
        ...     print("Config file changed!")
        ...
        >>> watch_id = api.watch_file("config.json", on_config_change)
    """
    watch_id = hashlib.md5(filepath.encode()).hexdigest()[:8]
    target = Path(filepath)

    def watcher():
        last_mtime = 0
        while True:
            try:
                mtime = target.stat().st_mtime
            except (OSError, ValueError):
                # File missing or unreadable right now: try again next tick.
                mtime = None
            if mtime is not None and mtime != last_mtime:
                last_mtime = mtime
                try:
                    callback()
                except Exception as e:
                    # A failing callback must not kill the watcher, but it
                    # should not vanish silently either.
                    logger.error(f"[ExternalAPI] File watch callback failed for {filepath}: {e}")
            time.sleep(interval)

    thread = Thread(target=watcher, daemon=True)
    thread.start()
    return watch_id
# =====================================================================
# Status & Info
# =====================================================================
def get_status(self) -> Dict:
    """
    Get ExternalAPI status.

    Returns:
        Status dictionary with the running flag, the server port, and the
        number of registered endpoints, webhooks, API keys and stored
        webhook-history records
    """
    status = dict(
        server_running=self._server_running,
        server_port=self._server_port,
    )
    status['endpoints'] = len(self._endpoints)
    status['webhooks'] = len(self._webhooks)
    status['api_keys'] = len(self._api_keys)
    status['webhook_history'] = len(self._webhook_history)
    return status
def get_url(self, path: str = "") -> str:
    """
    Get full URL for a path.

    Uses the host the server was started with instead of always assuming
    127.0.0.1. Wildcard bind addresses are mapped to the matching loopback
    address, and IPv6 literals are wrapped in brackets.

    Args:
        path: URL path (a leading slash is optional)
    Returns:
        Full URL, or an empty string when the server is not running
    """
    if not self._server_running:
        return ""
    host = getattr(self, '_server_host', None) or "127.0.0.1"
    # A wildcard address is something to bind to, not to connect to.
    if host == "0.0.0.0":
        host = "127.0.0.1"
    elif host == "::":
        host = "::1"
    if ':' in host and not host.startswith('['):
        host = f"[{host}]"
    return f"http://{host}:{self._server_port}/{path.lstrip('/')}"
# Global instance
_external_api_instance: Optional[ExternalAPI] = None


def get_external_api() -> ExternalAPI:
    """
    Return the module-wide ExternalAPI object, creating it on first call.

    Returns:
        ExternalAPI singleton
    Example:
        >>> from core.api.external_api import get_external_api
        >>> api = get_external_api()
        >>> api.start_server(port=8080)
    """
    global _external_api_instance
    instance = _external_api_instance
    if instance is None:
        instance = _external_api_instance = ExternalAPI()
    return instance
# Convenience exports: the names made available by
# `from core.api.external_api import *`.
__all__ = [
    'ExternalAPI',
    'get_external_api',
    'ExternalAPIError',
    'WebhookError',
    'ServerError',
    'WebhookConfig',
    'APIEndpoint'
]