# EU-Utility/core/plugin_api.py
#
# 1346 lines
# 44 KiB
# Python

"""
EU-Utility - Plugin API System
Shared API for cross-plugin communication and common functionality.
Allows plugins to expose APIs and use shared services.
Includes Enhanced Event Bus for typed event handling.
"""
from typing import Dict, Any, Callable, Optional, List, Type, TypeVar
from dataclasses import dataclass
from enum import Enum
import json
from datetime import datetime
from pathlib import Path
# Import Enhanced Event Bus
from core.event_bus import (
get_event_bus,
BaseEvent,
SkillGainEvent,
LootEvent,
DamageEvent,
GlobalEvent,
ChatEvent,
EconomyEvent,
SystemEvent,
EventCategory,
EventFilter,
)
# Import Task Manager
from core.tasks import TaskManager, TaskPriority, TaskStatus, Task
class APIType(Enum):
    """Categories used to classify a plugin API endpoint.

    Members (value in parentheses):
        OCR ("ocr"): screen capture and OCR.
        LOG ("log"): chat / game log reading.
        DATA ("data"): shared data storage.
        UTILITY ("utility"): helper functions.
        SERVICE ("service"): background services.
        EVENT ("event"): event bus operations.
    """

    OCR = "ocr"
    LOG = "log"
    DATA = "data"
    UTILITY = "utility"
    SERVICE = "service"
    EVENT = "event"
@dataclass
class APIEndpoint:
    """Record describing one callable API that a plugin exposes.

    The registry stores endpoints under the key ``"<plugin_id>:<name>"``.
    """

    # Endpoint name (second half of the registry key).
    name: str
    # Category of the endpoint.
    api_type: APIType
    # Human-readable summary of what the endpoint does.
    description: str
    # Callable invoked when the endpoint is called.
    handler: Callable
    # Identifier of the owning plugin (first half of the registry key).
    plugin_id: str
    # Endpoint version string.
    version: str = "1.0.0"
class PluginAPI:
"""Central API registry and shared services."""
_instance = None
def __new__(cls):
    """Return the shared instance, creating it on first use.

    A freshly created instance gets ``_initialized = False`` so that
    ``__init__`` knows it still has to set the instance up.
    """
    shared = cls._instance
    if shared is None:
        shared = super().__new__(cls)
        shared._initialized = False
        cls._instance = shared
    return shared
def __init__(self):
    """Set up the registries once; later constructions are no-ops.

    Because the class hands out a single shared instance, ``__init__`` runs on
    every ``PluginAPI()`` call; the ``_initialized`` flag keeps existing state
    from being reset.
    """
    if not self._initialized:
        # Endpoints keyed by "<plugin_id>:<name>".
        self.apis: Dict[str, APIEndpoint] = {}
        # Registered service objects / handlers keyed by service name.
        self.services: Dict[str, Any] = {}
        # Shared key/value cache (also used by the legacy event system).
        self.data_cache: Dict[str, Any] = {}
        # Initialize Event Bus
        self._event_bus = get_event_bus()
        self._initialized = True
# ========== API Registration ==========
def register_api(self, endpoint: APIEndpoint) -> bool:
    """Store ``endpoint`` in the registry under ``"<plugin_id>:<name>"``.

    An existing entry with the same key is replaced.

    Args:
        endpoint: Endpoint definition to register.

    Returns:
        True when the endpoint was stored, False if anything went wrong
        (the failure is printed, not raised).
    """
    registered = False
    try:
        key = f"{endpoint.plugin_id}:{endpoint.name}"
        self.apis[key] = endpoint
        print(f"[API] Registered: {key}")
        registered = True
    except Exception as exc:
        print(f"[API] Failed to register {endpoint.name}: {exc}")
    return registered
def unregister_api(self, plugin_id: str, name: Optional[str] = None):
    """Remove one endpoint, or every endpoint, belonging to a plugin.

    Args:
        plugin_id: Identifier of the plugin whose endpoint(s) to remove.
        name: Endpoint name. When omitted (or empty) all endpoints registered
            by ``plugin_id`` are removed.

    Unknown plugins / endpoint names are ignored silently.
    """
    if name:
        self.apis.pop(f"{plugin_id}:{name}", None)
        return
    # Match on the endpoint's own plugin_id rather than on the key prefix:
    # a prefix test on "<plugin_id>:" would also hit a different plugin whose
    # id merely starts with that text (e.g. "a" vs. "a:b").
    doomed = [
        key for key, endpoint in self.apis.items()
        if endpoint.plugin_id == plugin_id
    ]
    for key in doomed:
        del self.apis[key]
def call_api(self, plugin_id: str, name: str, *args, **kwargs) -> Any:
    """Invoke the handler registered as ``"<plugin_id>:<name>"``.

    Args:
        plugin_id: Identifier of the plugin owning the endpoint.
        name: Endpoint name.
        *args: Positional arguments forwarded to the handler.
        **kwargs: Keyword arguments forwarded to the handler.

    Returns:
        Whatever the handler returns.

    Raises:
        ValueError: No endpoint is registered under that key.
        Exception: Anything the handler raises is printed and re-raised.
    """
    api_key = f"{plugin_id}:{name}"
    target = self.apis.get(api_key)
    if target:
        try:
            return target.handler(*args, **kwargs)
        except Exception as exc:
            print(f"[API] Error calling {api_key}: {exc}")
            raise
    raise ValueError(f"API not found: {api_key}")
def find_apis(self, api_type: APIType = None) -> List[APIEndpoint]:
    """List registered endpoints, optionally restricted to one category.

    Args:
        api_type: Category to filter on; when falsy every endpoint is returned.

    Returns:
        A new list of matching endpoints.
    """
    endpoints = list(self.apis.values())
    if not api_type:
        return endpoints
    return [endpoint for endpoint in endpoints if endpoint.api_type == api_type]
# ========== OCR Service ==========
def register_ocr_service(self, ocr_handler: Callable):
    """Install the callable used for OCR under the ``'ocr'`` service key.

    Args:
        ocr_handler: Callable later invoked by ``ocr_capture`` with the region.
    """
    registry = self.services
    registry['ocr'] = ocr_handler
def ocr_capture(self, region: Optional[tuple] = None) -> Dict[str, Any]:
    """Capture screen and perform OCR.

    Args:
        region: (x, y, width, height) or None for full screen

    Returns:
        Dict with 'text', 'confidence', 'raw_results'. If the OCR handler
        raises, a placeholder result with empty text, zero confidence, an
        empty 'raw_results' list and an extra 'error' message is returned.

    Raises:
        RuntimeError: No OCR service has been registered.
    """
    ocr_service = self.services.get('ocr')
    if not ocr_service:
        raise RuntimeError("OCR service not available")
    try:
        return ocr_service(region)
    except Exception as e:
        print(f"[API] OCR error: {e}")
        # Keep the documented keys present on failure so callers indexing
        # result['raw_results'] do not hit a KeyError.
        return {"text": "", "confidence": 0, "raw_results": [], "error": str(e)}
# ========== Screenshot Service ==========
def register_screenshot_service(self, screenshot_service):
    """Install the screenshot service under the ``'screenshot'`` key.

    Args:
        screenshot_service: ScreenshotService instance
    """
    registry = self.services
    registry['screenshot'] = screenshot_service
    print("[API] Screenshot service registered")
def capture_screen(self, full_screen: bool = True):
    """Take a screenshot through the registered screenshot service.

    Args:
        full_screen: If True, capture entire screen

    Returns:
        PIL Image object

    Raises:
        RuntimeError: No screenshot service has been registered.
        Exception: Errors from the service are printed and re-raised.
    """
    grabber = self.services.get('screenshot')
    if grabber:
        try:
            return grabber.capture(full_screen=full_screen)
        except Exception as exc:
            print(f"[API] Screenshot error: {exc}")
            raise
    raise RuntimeError("Screenshot service not available")
def capture_region(self, x: int, y: int, width: int, height: int):
    """Take a screenshot of a rectangular part of the screen.

    Args:
        x: Left coordinate
        y: Top coordinate
        width: Region width
        height: Region height

    Returns:
        PIL Image object

    Raises:
        RuntimeError: No screenshot service has been registered.
        Exception: Errors from the service are printed and re-raised.
    """
    grabber = self.services.get('screenshot')
    if grabber:
        try:
            return grabber.capture_region(x, y, width, height)
        except Exception as exc:
            print(f"[API] Screenshot region error: {exc}")
            raise
    raise RuntimeError("Screenshot service not available")
def get_last_screenshot(self):
    """Fetch the most recent screenshot from the screenshot service.

    Returns:
        PIL Image or None if no screenshots taken yet; also None when no
        screenshot service is registered.
    """
    grabber = self.services.get('screenshot')
    return grabber.get_last_screenshot() if grabber else None
def save_screenshot(self, image, filename: Optional[str] = None) -> Path:
    """Write a screenshot to disk via the screenshot service.

    Args:
        image: PIL Image to save
        filename: Optional filename (auto-generated if None)

    Returns:
        Path to saved file

    Raises:
        RuntimeError: No screenshot service has been registered.
    """
    grabber = self.services.get('screenshot')
    if grabber:
        return grabber.save_screenshot(image, filename)
    raise RuntimeError("Screenshot service not available")
# ========== Log Service ==========
def register_log_service(self, log_handler: Callable):
    """Install the callable used to read the game log under ``'log'``.

    Args:
        log_handler: Callable later invoked by ``read_log`` with
            ``(lines, filter_text)``.
    """
    registry = self.services
    registry['log'] = log_handler
def read_log(self, lines: int = 50, filter_text: str = None) -> List[str]:
    """Fetch recent game log lines from the registered log handler.

    Args:
        lines: Number of lines to read
        filter_text: Optional text filter

    Returns:
        List of log lines; an empty list if the handler raises.

    Raises:
        RuntimeError: No log service has been registered.
    """
    reader = self.services.get('log')
    if not reader:
        raise RuntimeError("Log service not available")
    try:
        collected = reader(lines, filter_text)
    except Exception as exc:
        print(f"[API] Log error: {exc}")
        collected = []
    return collected
# ========== Shared Data ==========
def get_data(self, key: str, default=None) -> Any:
    """Look up ``key`` in the shared data cache.

    Args:
        key: Cache key.
        default: Value returned when the key is absent.

    Returns:
        The cached value, or ``default``.
    """
    cache = self.data_cache
    return cache[key] if key in cache else default
def set_data(self, key: str, value: Any):
    """Store ``value`` under ``key`` in the shared data cache.

    Args:
        key: Cache key; an existing entry is overwritten.
        value: Value to store.
    """
    cache = self.data_cache
    cache[key] = value
# ========== Legacy Event System (Backward Compatibility) ==========
def publish_event(self, event_type: str, data: Dict[str, Any]):
    """Publish an event for other plugins (legacy - use publish_typed).

    The latest payload for each event type is stored in the data cache under
    ``"event:<event_type>"`` together with an ISO timestamp, then every
    callback registered through ``subscribe`` is called with ``data``.

    Args:
        event_type: Name of the event.
        data: Payload handed to each subscriber.

    A subscriber that raises is reported with a print and does not prevent
    the remaining subscribers from being called.
    """
    # Store in cache
    event_key = f"event:{event_type}"
    self.data_cache[event_key] = {
        'timestamp': datetime.now().isoformat(),
        'data': data
    }
    # Notify subscribers (if any). Iterate over a snapshot: a callback may
    # call subscribe() for this same event, and mutating the list while it is
    # being iterated would deliver to the new subscriber mid-publish (or loop
    # forever if a callback re-subscribes itself).
    subscribers = self.data_cache.get(f"subscribers:{event_type}", [])
    for callback in list(subscribers):
        try:
            callback(data)
        except Exception as e:
            print(f"[API] Subscriber error: {e}")
def subscribe(self, event_type: str, callback: Callable):
    """Subscribe to events (legacy - use subscribe_typed).

    The callback is appended to the list kept in the data cache under
    ``"subscribers:<event_type>"``; the list is created on first use.

    Args:
        event_type: Name of the event to listen for.
        callback: Callable invoked with the event payload.
    """
    self.data_cache.setdefault(f"subscribers:{event_type}", []).append(callback)
# ========== Enhanced Event Bus (Typed Events) ==========
def publish_typed(self, event: BaseEvent) -> None:
    """
    Hand a typed event to the Event Bus via its ``publish`` method.

    Args:
        event: A typed event instance (e.g., SkillGainEvent, LootEvent)

    Example:
        api.publish_typed(SkillGainEvent(
            skill_name="Rifle",
            skill_value=25.5,
            gain_amount=0.01
        ))
    """
    bus = self._event_bus
    bus.publish(event)
def publish_typed_sync(self, event: BaseEvent) -> int:
    """
    Publish a typed event synchronously via the Event Bus ``publish_sync``.

    Blocks until all callbacks complete.

    Args:
        event: A typed event instance

    Returns:
        Number of subscribers that received the event
    """
    bus = self._event_bus
    notified = bus.publish_sync(event)
    return notified
def subscribe_typed(
    self,
    event_class: Type[BaseEvent],
    callback: Callable,
    **filter_kwargs
) -> str:
    """
    Register ``callback`` for one event class, with optional filtering.

    All arguments are forwarded unchanged to the Event Bus.

    Args:
        event_class: The event class to subscribe to
            (SkillGainEvent, LootEvent, DamageEvent, etc.)
        callback: Function to call when matching events occur
        **filter_kwargs: Additional filter criteria
            - min_damage: Minimum damage threshold (for DamageEvent)
            - max_damage: Maximum damage threshold (for DamageEvent)
            - mob_types: List of mob names to filter (for LootEvent)
            - skill_names: List of skill names to filter (for SkillGainEvent)
            - sources: List of event sources to filter
            - replay_last: Number of recent events to replay to new subscriber
            - predicate: Custom filter function (event) -> bool

    Returns:
        Subscription ID (use with unsubscribe_typed)

    Example:
        # Subscribe to all damage events
        sub_id = api.subscribe_typed(DamageEvent, on_damage)

        # Subscribe to high damage events only
        sub_id = api.subscribe_typed(
            DamageEvent,
            on_big_hit,
            min_damage=100
        )

        # Subscribe to loot from specific mobs with replay
        sub_id = api.subscribe_typed(
            LootEvent,
            on_dragon_loot,
            mob_types=["Dragon", "Drake"],
            replay_last=10
        )
    """
    bus = self._event_bus
    subscription_id = bus.subscribe_typed(event_class, callback, **filter_kwargs)
    return subscription_id
def unsubscribe_typed(self, subscription_id: str) -> bool:
    """
    Cancel a typed-event subscription via the Event Bus ``unsubscribe``.

    Args:
        subscription_id: The subscription ID returned by subscribe_typed

    Returns:
        True if subscription was found and removed
    """
    bus = self._event_bus
    removed = bus.unsubscribe(subscription_id)
    return removed
def get_recent_events(
    self,
    event_type: Type[BaseEvent] = None,
    count: int = 100,
    category: EventCategory = None
) -> List[BaseEvent]:
    """
    Query the Event Bus history for recent events.

    The three arguments are passed positionally to the bus in the order
    ``(event_type, count, category)``.

    Args:
        event_type: Filter by event class (e.g., SkillGainEvent)
        count: Maximum number of events to return (default 100)
        category: Filter by event category

    Returns:
        List of matching events

    Example:
        # Get last 50 skill gains
        recent_skills = api.get_recent_events(SkillGainEvent, 50)

        # Get all recent combat events
        combat_events = api.get_recent_events(category=EventCategory.COMBAT)
    """
    bus = self._event_bus
    history = bus.get_recent_events(event_type, count, category)
    return history
def get_event_stats(self) -> Dict[str, Any]:
    """
    Return the statistics reported by the Event Bus ``get_stats``.

    Returns:
        Dict with statistics:
        - total_published: Total events published
        - total_delivered: Total events delivered to subscribers
        - active_subscriptions: Current number of active subscriptions
        - events_per_minute: Average events per minute
        - avg_delivery_ms: Average delivery time in milliseconds
        - errors: Number of delivery errors
        - top_event_types: Most common event types
    """
    bus = self._event_bus
    stats = bus.get_stats()
    return stats
def create_event_filter(
    self,
    event_types: List[Type[BaseEvent]] = None,
    categories: List[EventCategory] = None,
    **kwargs
) -> EventFilter:
    """
    Build an ``EventFilter`` for complex subscriptions.

    Args:
        event_types: List of event classes to match
        categories: List of event categories to match
        **kwargs: Additional filter criteria, passed through to ``EventFilter``

    Returns:
        EventFilter object for use with subscribe()
    """
    criteria = dict(event_types=event_types, categories=categories)
    return EventFilter(**criteria, **kwargs)
# ========== Task Service ==========
def register_task_service(self, task_manager: TaskManager) -> None:
    """Install the Task Manager under the ``'tasks'`` service key.

    Args:
        task_manager: TaskManager instance
    """
    registry = self.services
    registry['tasks'] = task_manager
    print("[API] Task Manager service registered")
def run_in_background(self, func: Callable, *args,
                      priority: str = 'normal',
                      on_complete: Callable = None,
                      on_error: Callable = None,
                      **kwargs) -> str:
    """Submit a function to the task manager's ``run_in_thread``.

    Args:
        func: Function to execute
        *args: Positional arguments
        priority: 'high', 'normal', or 'low'; any other value is treated
            as 'normal'
        on_complete: Callback when task completes (receives result)
        on_error: Callback when task fails (receives exception)
        **kwargs: Keyword arguments

    Returns:
        Task ID for tracking/cancellation

    Raises:
        RuntimeError: No task service has been registered.

    Example:
        task_id = api.run_in_background(
            heavy_calculation,
            data,
            priority='high',
            on_complete=lambda result: print(f"Done: {result}")
        )
    """
    runner = self.services.get('tasks')
    if not runner:
        raise RuntimeError("Task service not available")
    # Translate the string level into the task manager's priority enum.
    levels = {
        'high': TaskPriority.HIGH,
        'normal': TaskPriority.NORMAL,
        'low': TaskPriority.LOW,
    }
    level = levels.get(priority, TaskPriority.NORMAL)
    return runner.run_in_thread(
        func,
        *args,
        priority=level,
        on_complete=on_complete,
        on_error=on_error,
        **kwargs
    )
def schedule_task(self, delay_ms: int, func: Callable, *args,
                  priority: str = 'normal',
                  on_complete: Callable = None,
                  on_error: Callable = None,
                  periodic: bool = False,
                  interval_ms: int = None,
                  **kwargs) -> str:
    """Schedule a one-shot or periodic task on the task manager.

    Args:
        delay_ms: Delay before first execution (milliseconds)
        func: Function to execute
        *args: Positional arguments
        priority: 'high', 'normal', or 'low'; any other value is treated
            as 'normal'
        on_complete: Completion callback
        on_error: Error callback
        periodic: If True, run repeatedly
        interval_ms: Interval between periodic executions; when falsy,
            ``delay_ms`` is used as the interval
        **kwargs: Keyword arguments

    Returns:
        Task ID

    Raises:
        RuntimeError: No task service has been registered.

    Example:
        # One-time delayed task
        task_id = api.schedule_task(
            5000,
            lambda: print("Delayed!"),
            on_complete=lambda _: print("Done")
        )

        # Periodic task (every 10 seconds)
        task_id = api.schedule_task(
            0,
            fetch_data,
            periodic=True,
            interval_ms=10000,
            on_complete=lambda data: update_ui(data)
        )
    """
    scheduler = self.services.get('tasks')
    if not scheduler:
        raise RuntimeError("Task service not available")
    # Translate the string level into the task manager's priority enum.
    levels = {
        'high': TaskPriority.HIGH,
        'normal': TaskPriority.NORMAL,
        'low': TaskPriority.LOW,
    }
    level = levels.get(priority, TaskPriority.NORMAL)

    if not periodic:
        return scheduler.run_later(
            delay_ms,
            func,
            *args,
            priority=level,
            on_complete=on_complete,
            on_error=on_error,
            **kwargs
        )

    # Periodic: a zero delay asks the manager to run the first round immediately.
    return scheduler.run_periodic(
        interval_ms or delay_ms,
        func,
        *args,
        priority=level,
        on_complete=on_complete,
        on_error=on_error,
        run_immediately=(delay_ms == 0),
        **kwargs
    )
def cancel_task(self, task_id: str) -> bool:
    """Ask the task manager to cancel a pending or running task.

    Args:
        task_id: Task ID to cancel

    Returns:
        True if cancelled, False if not found or already completed;
        also False when no task service is registered.
    """
    manager = self.services.get('tasks')
    return manager.cancel_task(task_id) if manager else False
def get_task_status(self, task_id: str) -> Optional[str]:
    """Get the status of a task.

    Args:
        task_id: Task ID

    Returns:
        The lower-cased name of the status reported by the task manager
        ('pending', 'running', 'completed', 'failed', 'cancelled'), or None
        when the manager reports no status or no task service is registered.
    """
    task_manager = self.services.get('tasks')
    if not task_manager:
        return None
    status = task_manager.get_task_status(task_id)
    # Test against None explicitly: an int-backed enum member whose value is 0
    # is falsy, so a plain truthiness check would misreport it as "no status".
    if status is None:
        return None
    return status.name.lower()
def wait_for_task(self, task_id: str, timeout: float = None) -> bool:
    """Block until a task finishes, via the task manager.

    Args:
        task_id: Task ID to wait for
        timeout: Maximum seconds to wait, or None for no timeout

    Returns:
        True if completed, False if timeout; also False when no task
        service is registered.
    """
    manager = self.services.get('tasks')
    return manager.wait_for_task(task_id, timeout) if manager else False
def connect_task_signal(self, signal_name: str, callback: Callable) -> bool:
    """Attach a callback to one of the task manager's signals.

    Args:
        signal_name: One of 'completed', 'failed', 'started', 'cancelled', 'periodic'
        callback: Function to call when signal emits

    Returns:
        True if connected; False when no task service is registered.

    Example:
        api.connect_task_signal('completed', on_task_complete)
        api.connect_task_signal('failed', on_task_error)
    """
    manager = self.services.get('tasks')
    return manager.connect_signal(signal_name, callback) if manager else False
# ========== Utility APIs ==========
def format_ped(self, value: float) -> str:
    """Render ``value`` with two decimals followed by the ``PED`` suffix.

    Args:
        value: Amount to format.

    Returns:
        Text such as ``"12.50 PED"``.
    """
    text = f"{value:.2f} PED"
    return text
def format_pec(self, value: float) -> str:
    """Render ``value`` with no decimals followed by the ``PEC`` suffix.

    Args:
        value: Amount to format.

    Returns:
        Text such as ``"4 PEC"``.
    """
    text = f"{value:.0f} PEC"
    return text
def calculate_dpp(self, damage: float, ammo: int, decay: float) -> float:
    """Calculate damage per unit of cost (DPP).

    The cost is ``ammo * 0.01 + decay``; damage is divided by that cost
    scaled down by 100.

    NOTE(review): the original summary says "Damage Per PEC" while the final
    division by 100 is commented as a PED-based conversion - confirm which
    unit callers expect.

    Args:
        damage: Damage dealt.
        ammo: Ammo used; each unit is costed at 0.01.
        decay: Decay cost, added to the ammo cost as-is.

    Returns:
        The DPP value, or 0.0 when damage or the total cost is not positive.
    """
    if damage > 0:
        cost = ammo * 0.01 + decay  # PEC
        if cost > 0:
            return damage / (cost / 100)  # Convert to PED-based DPP
    return 0.0
def calculate_markup(self, price: float, tt: float) -> float:
    """Express ``price`` as a percentage of ``tt``.

    Args:
        price: Price of the item.
        tt: Base value the price is compared to.

    Returns:
        ``price / tt * 100``, or 0.0 when ``tt`` is not positive.
    """
    return (price / tt) * 100 if tt > 0 else 0.0
# ========== Audio Service ==========
def register_audio_service(self, audio_manager):
    """Install the audio manager under the ``'audio'`` service key.

    Args:
        audio_manager: AudioManager instance
    """
    registry = self.services
    registry['audio'] = audio_manager
    print("[API] Audio service registered")
def play_sound(self, filename_or_key: str, blocking: bool = False) -> bool:
    """Play a sound through the audio manager.

    If no audio service is registered, the manager is fetched from
    ``core.audio.get_audio_manager`` and cached under ``'audio'``.

    Args:
        filename_or_key: Sound key ('global', 'hof', 'skill_gain', 'alert', 'error')
            or path to file
        blocking: If True, wait for sound to complete (default: False)

    Returns:
        True if sound was queued/played, False on error or if muted

    Examples:
        api.play_sound('hof')  # Play HOF sound
        api.play_sound('skill_gain')  # Play skill gain sound
        api.play_sound('/path/to/custom.wav')
    """
    player = self.services.get('audio')
    if not player:
        # Try to get audio manager directly
        try:
            from core.audio import get_audio_manager
            player = get_audio_manager()
            self.services['audio'] = player
        except Exception as exc:
            print(f"[API] Audio service not available: {exc}")
            return False
    try:
        outcome = player.play_sound(filename_or_key, blocking)
    except Exception as exc:
        print(f"[API] Audio play error: {exc}")
        outcome = False
    return outcome
def set_volume(self, volume: float) -> None:
    """Set the global audio volume on the audio manager.

    If no audio service is registered, the manager is fetched from
    ``core.audio.get_audio_manager`` and cached. Failures are printed and
    otherwise ignored.

    Args:
        volume: Volume level from 0.0 (mute) to 1.0 (max)
    """
    player = self.services.get('audio')
    if not player:
        try:
            from core.audio import get_audio_manager
            player = get_audio_manager()
            self.services['audio'] = player
        except Exception as exc:
            print(f"[API] Audio service not available: {exc}")
            return
    try:
        player.set_volume(volume)
    except Exception as exc:
        print(f"[API] Audio volume error: {exc}")
def get_volume(self) -> float:
    """Read the current audio volume from the audio manager.

    If no audio service is registered, the manager is fetched from
    ``core.audio.get_audio_manager`` and cached.

    Returns:
        Current volume level (0.0 to 1.0); 0.0 if the manager cannot be
        obtained or raises.
    """
    player = self.services.get('audio')
    if not player:
        try:
            from core.audio import get_audio_manager
            player = get_audio_manager()
            self.services['audio'] = player
        except Exception:
            return 0.0
    try:
        level = player.get_volume()
    except Exception:
        level = 0.0
    return level
def mute_audio(self) -> None:
    """Mute all audio via the audio manager.

    If no audio service is registered, the manager is fetched from
    ``core.audio.get_audio_manager`` and cached. Failures are printed and
    otherwise ignored.
    """
    player = self.services.get('audio')
    if not player:
        try:
            from core.audio import get_audio_manager
            player = get_audio_manager()
            self.services['audio'] = player
        except Exception as exc:
            print(f"[API] Audio service not available: {exc}")
            return
    try:
        player.mute()
    except Exception as exc:
        print(f"[API] Audio mute error: {exc}")
def unmute_audio(self) -> None:
    """Unmute audio via the audio manager.

    If no audio service is registered, the manager is fetched from
    ``core.audio.get_audio_manager`` and cached. Failures are printed and
    otherwise ignored.
    """
    player = self.services.get('audio')
    if not player:
        try:
            from core.audio import get_audio_manager
            player = get_audio_manager()
            self.services['audio'] = player
        except Exception as exc:
            print(f"[API] Audio service not available: {exc}")
            return
    try:
        player.unmute()
    except Exception as exc:
        print(f"[API] Audio unmute error: {exc}")
def toggle_mute_audio(self) -> bool:
    """Flip the audio mute state via the audio manager.

    If no audio service is registered, the manager is fetched from
    ``core.audio.get_audio_manager`` and cached.

    Returns:
        New muted state (True if now muted); False if the manager cannot be
        obtained or raises.
    """
    player = self.services.get('audio')
    if not player:
        try:
            from core.audio import get_audio_manager
            player = get_audio_manager()
            self.services['audio'] = player
        except Exception as exc:
            print(f"[API] Audio service not available: {exc}")
            return False
    try:
        muted = player.toggle_mute()
    except Exception as exc:
        print(f"[API] Audio toggle mute error: {exc}")
        muted = False
    return muted
def is_audio_muted(self) -> bool:
    """Report whether the audio manager is muted.

    If no audio service is registered, the manager is fetched from
    ``core.audio.get_audio_manager`` and cached.

    Returns:
        True if audio is muted; False if the manager cannot be obtained
        or raises.
    """
    player = self.services.get('audio')
    if not player:
        try:
            from core.audio import get_audio_manager
            player = get_audio_manager()
            self.services['audio'] = player
        except Exception:
            return False
    try:
        muted = player.is_muted()
    except Exception:
        muted = False
    return muted
def is_audio_available(self) -> bool:
    """Report whether the audio manager says it is available.

    If no audio service is registered, the manager is fetched from
    ``core.audio.get_audio_manager`` and cached.

    Returns:
        True if audio backend is initialized and working; False if the
        manager cannot be obtained or raises.
    """
    player = self.services.get('audio')
    if not player:
        try:
            from core.audio import get_audio_manager
            player = get_audio_manager()
            self.services['audio'] = player
        except Exception:
            return False
    try:
        available = player.is_available()
    except Exception:
        available = False
    return available
# ========== Nexus API Service ==========
def register_nexus_service(self, nexus_api) -> None:
    """Install the Nexus API client under the ``'nexus'`` service key.

    Args:
        nexus_api: NexusAPI instance from core.nexus_api
    """
    registry = self.services
    registry['nexus'] = nexus_api
    print("[API] Nexus API service registered")
def nexus_search(self, query: str, entity_type: str = "items", limit: int = 20) -> List[Dict[str, Any]]:
    """Search for entities via Nexus API.

    If no Nexus service is registered, the client is fetched from
    ``core.nexus_api.get_nexus_api`` and cached under ``'nexus'``.

    Args:
        query: Search query string
        entity_type: Type of entity to search (items, mobs, weapons, etc.);
            matched case-insensitively
        limit: Maximum number of results (default: 20, max: 100)

    Returns:
        List of search result dictionaries; an empty list on any error.

    Example:
        # Search for items
        results = api.nexus_search("ArMatrix", entity_type="items")

        # Search for mobs
        mobs = api.nexus_search("Atrox", entity_type="mobs")

        # Search for blueprints
        bps = api.nexus_search("ArMatrix", entity_type="blueprints")
    """
    client = self.services.get('nexus')
    if not client:
        try:
            from core.nexus_api import get_nexus_api
            client = get_nexus_api()
            self.services['nexus'] = client
        except Exception as exc:
            print(f"[API] Nexus API not available: {exc}")
            return []
    try:
        # Map entity type to search method
        kind = entity_type.lower()
        if kind in ('item', 'items'):
            hits = client.search_items(query, limit)
        elif kind in ('mob', 'mobs'):
            hits = client.search_mobs(query, limit)
        elif kind != 'all' and hasattr(client, 'search_by_type'):
            # Other entity types need the enhanced client with per-type search.
            hits = client.search_by_type(query, kind, limit)
        else:
            # 'all', or a client without per-type search: generic search.
            hits = client.search_all(query, limit)
        # Convert SearchResult objects to dicts for plugin compatibility
        return [self._search_result_to_dict(hit) for hit in hits]
    except Exception as exc:
        print(f"[API] Nexus search error: {exc}")
        return []
def _search_result_to_dict(self, result) -> Dict[str, Any]:
    """Convert SearchResult to dictionary.

    Args:
        result: A dict (returned unchanged) or an object whose attributes
            are read by name.

    Returns:
        Dict with keys 'id', 'name', 'type', 'category', 'icon_url', 'data';
        attributes missing on ``result`` get the defaults listed below.
    """
    if isinstance(result, dict):
        return result
    # (key, fallback) pairs; built per call so the mutable fallback is fresh.
    fields = (
        ('id', ''),
        ('name', ''),
        ('type', ''),
        ('category', None),
        ('icon_url', None),
        ('data', {}),
    )
    return {key: getattr(result, key, fallback) for key, fallback in fields}
def nexus_get_item_details(self, item_id: str) -> Optional[Dict[str, Any]]:
    """Get detailed information about a specific item.

    If no Nexus service is registered, the client is fetched from
    ``core.nexus_api.get_nexus_api`` and cached under ``'nexus'``.

    Args:
        item_id: The item's unique identifier

    Returns:
        Dictionary with item details, or None if not found or on error

    Example:
        details = api.nexus_get_item_details("armatrix_lp-35")
        if details:
            print(f"TT Value: {details.get('tt_value')} PED")
            print(f"Damage: {details.get('damage')}")
    """
    client = self.services.get('nexus')
    if not client:
        try:
            from core.nexus_api import get_nexus_api
            client = get_nexus_api()
            self.services['nexus'] = client
        except Exception as exc:
            print(f"[API] Nexus API not available: {exc}")
            return None
    try:
        found = client.get_item_details(item_id)
        return self._item_details_to_dict(found) if found else None
    except Exception as exc:
        print(f"[API] Nexus get_item_details error: {exc}")
        return None
def _item_details_to_dict(self, details) -> Dict[str, Any]:
    """Convert ItemDetails to dictionary.

    Args:
        details: A dict (returned unchanged) or an object whose attributes
            are read by name.

    Returns:
        Dict with one entry per field listed below; attributes missing on
        ``details`` get the paired default.
    """
    if isinstance(details, dict):
        return details
    # (key, fallback) pairs; built per call so mutable fallbacks are fresh.
    fields = (
        ('id', ''),
        ('name', ''),
        ('description', None),
        ('category', None),
        ('weight', None),
        ('tt_value', None),
        ('decay', None),
        ('ammo_consumption', None),
        ('damage', None),
        ('range', None),
        ('accuracy', None),
        ('durability', None),
        ('requirements', {}),
        ('materials', []),
        ('raw_data', {}),
    )
    return {key: getattr(details, key, fallback) for key, fallback in fields}
def nexus_get_market_data(self, item_id: str) -> Optional[Dict[str, Any]]:
    """Get market data for a specific item.

    If no Nexus service is registered, the client is fetched from
    ``core.nexus_api.get_nexus_api`` and cached under ``'nexus'``.

    Args:
        item_id: The item's unique identifier

    Returns:
        Dictionary with market data, or None if not found or on error

    Example:
        market = api.nexus_get_market_data("armatrix_lp-35")
        if market:
            print(f"Current markup: {market.get('current_markup'):.1f}%")
            print(f"24h Volume: {market.get('volume_24h')}")
            # Access order book
            buy_orders = market.get('buy_orders', [])
            sell_orders = market.get('sell_orders', [])
    """
    client = self.services.get('nexus')
    if not client:
        try:
            from core.nexus_api import get_nexus_api
            client = get_nexus_api()
            self.services['nexus'] = client
        except Exception as exc:
            print(f"[API] Nexus API not available: {exc}")
            return None
    try:
        found = client.get_market_data(item_id)
        return self._market_data_to_dict(found) if found else None
    except Exception as exc:
        print(f"[API] Nexus get_market_data error: {exc}")
        return None
def _market_data_to_dict(self, market) -> Dict[str, Any]:
    """Convert MarketData to dictionary.

    Args:
        market: A dict (returned unchanged) or an object whose attributes
            are read by name.

    Returns:
        Dict with one entry per field listed below; attributes missing on
        ``market`` get the paired default.
    """
    if isinstance(market, dict):
        return market
    # (key, fallback) pairs; built per call so mutable fallbacks are fresh.
    fields = (
        ('item_id', ''),
        ('item_name', ''),
        ('current_markup', None),
        ('avg_markup_7d', None),
        ('avg_markup_30d', None),
        ('volume_24h', None),
        ('volume_7d', None),
        ('buy_orders', []),
        ('sell_orders', []),
        ('last_updated', None),
        ('raw_data', {}),
    )
    return {key: getattr(market, key, fallback) for key, fallback in fields}
def nexus_is_available(self) -> bool:
    """Report whether the Nexus API client says it is available.

    If no Nexus service is registered, the client is fetched from
    ``core.nexus_api.get_nexus_api`` and cached under ``'nexus'``.

    Returns:
        True if Nexus API service is ready; False if the client cannot be
        obtained or raises.
    """
    client = self.services.get('nexus')
    if not client:
        try:
            from core.nexus_api import get_nexus_api
            client = get_nexus_api()
            self.services['nexus'] = client
        except Exception:
            return False
    try:
        ready = client.is_available()
    except Exception:
        ready = False
    return ready
# ========== Notification Service ==========
def register_notification_service(self, notification_manager) -> None:
    """Install the notification manager under the ``'notifications'`` key.

    Args:
        notification_manager: NotificationManager instance from core.notifications
    """
    registry = self.services
    registry['notifications'] = notification_manager
    print("[API] Notification service registered")
def notify(self, title: str, message: str, notification_type: str = "info",
           sound: bool = False, duration: int = 5000) -> str:
    """Show a notification toast.

    The type string selects one of the manager's ``notify_info``,
    ``notify_warning``, ``notify_error`` or ``notify_success`` methods;
    unknown types fall back to ``notify_info``.

    Args:
        title: Notification title
        message: Notification message
        notification_type: Type (info, warning, error, success)
        sound: Play sound notification
        duration: Display duration in milliseconds

    Returns:
        Notification ID

    Raises:
        RuntimeError: No notification service has been registered.
    """
    manager = self.services.get('notifications')
    if not manager:
        raise RuntimeError("Notification service not available")
    # Map string type to the manager method that shows it.
    method_names = {
        'info': 'notify_info',
        'warning': 'notify_warning',
        'error': 'notify_error',
        'success': 'notify_success',
    }
    chosen = method_names.get(notification_type, 'notify_info')
    fallback = manager.notify_info
    show = getattr(manager, chosen, fallback)
    return show(title, message, sound=sound, duration=duration)
# ========== Clipboard Service ==========
def register_clipboard_service(self, clipboard_manager) -> None:
    """Install the clipboard manager under the ``'clipboard'`` service key.

    Args:
        clipboard_manager: ClipboardManager instance from core.clipboard
    """
    registry = self.services
    registry['clipboard'] = clipboard_manager
    print("[API] Clipboard service registered")
def copy_to_clipboard(self, text: str, source: str = "plugin") -> bool:
    """Copy text to the clipboard through the clipboard service.

    Args:
        text: Text to copy
        source: Source identifier for history

    Returns:
        True if successful

    Raises:
        RuntimeError: No clipboard service has been registered.
    """
    board = self.services.get('clipboard')
    if board:
        return board.copy(text, source)
    raise RuntimeError("Clipboard service not available")
def paste_from_clipboard(self) -> str:
    """Read text from the clipboard through the clipboard service.

    Returns:
        Clipboard content or empty string

    Raises:
        RuntimeError: No clipboard service has been registered.
    """
    board = self.services.get('clipboard')
    if board:
        return board.paste()
    raise RuntimeError("Clipboard service not available")
# ========== Window Manager Service ==========
def register_window_service(self, window_manager) -> None:
    """Install the window manager under the ``'window'`` service key.

    Args:
        window_manager: WindowManager instance from core.window_manager
    """
    registry = self.services
    registry['window'] = window_manager
    print("[API] Window Manager service registered")
def get_eu_window(self) -> Optional[Dict[str, Any]]:
    """Get information about the Entropia Universe game window.

    Returns:
        Dict with keys 'handle', 'title', 'pid', 'rect', 'width', 'height',
        'is_visible', 'is_focused' copied from the window manager's result,
        or None if the window is not found or no window service is registered.
    """
    manager = self.services.get('window')
    if not manager:
        return None
    found = manager.find_eu_window()
    if found is None:
        return None
    # Convert WindowInfo to dict for consistent API
    attributes = (
        'handle', 'title', 'pid', 'rect',
        'width', 'height', 'is_visible', 'is_focused',
    )
    return {attribute: getattr(found, attribute) for attribute in attributes}
def is_eu_focused(self) -> bool:
    """Check if Entropia Universe window is currently focused.

    Returns:
        True if EU is the active window; False when no window service
        is registered.
    """
    manager = self.services.get('window')
    return manager.is_eu_focused() if manager else False
def is_eu_visible(self) -> bool:
    """Check if Entropia Universe window is visible.

    Returns:
        True if EU window is visible (not minimized); False when no window
        service is registered.
    """
    manager = self.services.get('window')
    return manager.is_eu_visible() if manager else False
def bring_eu_to_front(self) -> bool:
    """Bring Entropia Universe window to front and focus it.

    Returns:
        True if successful; False when no window service is registered.
    """
    manager = self.services.get('window')
    return manager.bring_eu_to_front() if manager else False
# ========== Data Store Service ==========
def register_data_service(self, data_store) -> None:
    """Install the data store under the ``'data'`` service key.

    Args:
        data_store: DataStore instance from core.data_store
    """
    registry = self.services
    registry['data'] = data_store
    print("[API] Data Store service registered")
def save_data(self, plugin_id: str, key: str, data: Any) -> bool:
    """Persist a value for a plugin through the data store service.

    Args:
        plugin_id: Unique identifier for the plugin
        key: Key under which to store the data
        data: Data to store (must be JSON serializable)

    Returns:
        True if successful, False otherwise

    Raises:
        RuntimeError: No data store has been registered.
    """
    store = self.services.get('data')
    if store:
        return store.save(plugin_id, key, data)
    raise RuntimeError("Data store not available")
def load_data(self, plugin_id: str, key: str, default: Any = None) -> Any:
    """Read a stored value for a plugin through the data store service.

    Args:
        plugin_id: Unique identifier for the plugin
        key: Key of the data to load
        default: Default value if key not found

    Returns:
        The stored data or default value

    Raises:
        RuntimeError: No data store has been registered.
    """
    store = self.services.get('data')
    if store:
        return store.load(plugin_id, key, default)
    raise RuntimeError("Data store not available")
# ========== HTTP Client Service ==========
def register_http_service(self, http_client) -> None:
    """Install the HTTP client under the ``'http'`` service key.

    Args:
        http_client: HTTPClient instance from core.http_client
    """
    registry = self.services
    registry['http'] = http_client
    print("[API] HTTP Client service registered")
def http_get(self, url: str, cache_ttl: int = 300, headers: Dict[str, str] = None, **kwargs) -> Dict[str, Any]:
    """Make an HTTP GET request with caching, via the HTTP client service.

    Args:
        url: The URL to fetch
        cache_ttl: Cache TTL in seconds (default: 300 = 5 minutes)
        headers: Additional headers
        **kwargs: Additional arguments for requests

    Returns:
        Dict with 'status_code', 'headers', 'content', 'text', 'json', 'from_cache'

    Raises:
        RuntimeError: No HTTP client has been registered.
    """
    client = self.services.get('http')
    if client:
        return client.get(url, cache_ttl=cache_ttl, headers=headers, **kwargs)
    raise RuntimeError("HTTP client not available")
# Singleton instance
# Module-level cache for the PluginAPI object handed out by get_api().
_plugin_api = None


def get_api() -> PluginAPI:
    """Return the module-wide PluginAPI, constructing it on the first call."""
    global _plugin_api
    if _plugin_api is not None:
        return _plugin_api
    _plugin_api = PluginAPI()
    return _plugin_api
# ========== Decorator for easy API registration ==========
def register_api(name: str, api_type: APIType, description: str = ""):
    """Decorator that tags a plugin method with API metadata.

    The decorated function is returned unchanged apart from a new
    ``_api_info`` attribute holding ``name``, ``api_type`` and
    ``description``; nothing is added to the registry here.

    Usage:
        @register_api("scan_skills", APIType.OCR, "Scan skills window")
        def scan_skills(self):
            ...
    """
    metadata_fields = dict(name=name, api_type=api_type, description=description)

    def decorator(func):
        # Give every decorated function its own metadata dict.
        func._api_info = dict(metadata_fields)
        return func

    return decorator
# ========== Event Type Exports ==========
# Names exported by a star-import of this module.
__all__ = [
# API registry class, its supporting types, and the module-level helpers
'PluginAPI',
'APIType',
'APIEndpoint',
'get_api',
'register_api',
# Event and filter classes imported above from core.event_bus
'BaseEvent',
'SkillGainEvent',
'LootEvent',
'DamageEvent',
'GlobalEvent',
'ChatEvent',
'EconomyEvent',
'SystemEvent',
'EventCategory',
'EventFilter',
]