feat: Core services swarm implementation (WIP)

Core services created by swarm agents:
- core/nexus_api.py - Entropia Nexus API client
- core/data_store.py - Plugin data persistence
- core/notifications.py - Toast notification system
- core/window_manager.py - Window management utilities
- core/event_bus.py - Enhanced typed event bus
- core/http_client.py - HTTP client with caching
- core/plugin_api.py - Enhanced with new services

Services integrate with PluginAPI for plugin access.
TODO: Final integration and testing.
This commit is contained in:
LemonNexus 2026-02-13 19:13:10 +00:00
parent f1e2076570
commit 2d999a91f6
9 changed files with 3545 additions and 0 deletions

View File

@ -0,0 +1,316 @@
"""
EU-Utility - Data Store Service
Thread-safe persistent data storage for plugins.
Provides file locking, auto-backup, and singleton access.
"""
import fcntl
import json
import os
import shutil
import threading
from collections import OrderedDict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
class DataStore:
    """
    Singleton data persistence service for plugins.
    Features:
    - Thread-safe file operations with file locking
    - Auto-backup on write (keeps last 5 versions)
    - Per-plugin JSON storage
    - Auto-create directories

    NOTE: file locking uses fcntl, so this service is POSIX-only.
    """
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking so only the first caller pays for the lock.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self, data_dir: str = "data/plugins"):
        """
        Args:
            data_dir: Root directory for per-plugin JSON files.
                      Only honored on the first construction (singleton).
        """
        if self._initialized:
            return
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        # Memory cache for frequently accessed data
        self._cache: Dict[str, Dict[str, Any]] = {}
        self._cache_lock = threading.Lock()
        # Serializes load-modify-store cycles (save/delete/clear/restore) so
        # concurrent threads cannot overwrite each other's updates.
        self._write_lock = threading.RLock()
        # Backup settings
        self.max_backups = 5
        self._initialized = True

    @staticmethod
    def _sanitize(plugin_id: str) -> str:
        """Turn a plugin id into a safe filename component."""
        return plugin_id.replace(".", "_").replace("/", "_").replace("\\", "_")

    def _get_plugin_file(self, plugin_id: str) -> Path:
        """Get the storage file path for a plugin."""
        return self.data_dir / f"{self._sanitize(plugin_id)}.json"

    def _get_backup_dir(self, plugin_id: str) -> Path:
        """Get (and create) the backup directory for a plugin."""
        backup_dir = self.data_dir / ".backups" / self._sanitize(plugin_id)
        backup_dir.mkdir(parents=True, exist_ok=True)
        return backup_dir

    def _load_plugin_data(self, plugin_id: str) -> Dict[str, Any]:
        """Load all data for a plugin, preferring the in-memory cache."""
        with self._cache_lock:
            if plugin_id in self._cache:
                # Shallow copy: callers may rebind top-level keys safely, but
                # nested structures are still shared — TODO confirm callers
                # never mutate nested values in place.
                return self._cache[plugin_id].copy()
        file_path = self._get_plugin_file(plugin_id)
        if not file_path.exists():
            return {}
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                # Shared lock: concurrent readers allowed, writers excluded.
                fcntl.flock(f.fileno(), fcntl.LOCK_SH)
                try:
                    data = json.load(f)
                finally:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
            with self._cache_lock:
                self._cache[plugin_id] = data.copy()
            return data
        except (json.JSONDecodeError, OSError) as e:
            print(f"[DataStore] Error loading data for {plugin_id}: {e}")
            return {}

    def _save_plugin_data(self, plugin_id: str, data: Dict[str, Any]) -> bool:
        """Atomically write a plugin's data to disk, backing up first."""
        file_path = self._get_plugin_file(plugin_id)
        temp_path = file_path.with_suffix('.tmp')
        try:
            # Create backup if file exists
            if file_path.exists():
                self._create_backup(plugin_id, file_path)
            # Write to a temp file then rename so readers never observe a
            # partially written file.
            with open(temp_path, 'w', encoding='utf-8') as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                try:
                    json.dump(data, f, indent=2, ensure_ascii=False)
                    f.flush()
                    os.fsync(f.fileno())
                finally:
                    fcntl.flock(f.fileno(), fcntl.LOCK_UN)
            # Atomic move
            temp_path.replace(file_path)
            with self._cache_lock:
                self._cache[plugin_id] = data.copy()
            return True
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError: data was not JSON-serializable. Honor the
            # boolean contract instead of propagating, and clean up the temp
            # file either way.
            print(f"[DataStore] Error saving data for {plugin_id}: {e}")
            if temp_path.exists():
                temp_path.unlink()
            return False

    def _create_backup(self, plugin_id: str, file_path: Path):
        """Copy the current data file into the backup directory."""
        backup_dir = self._get_backup_dir(plugin_id)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = backup_dir / f"{timestamp}.json"
        try:
            shutil.copy2(file_path, backup_path)
            self._cleanup_old_backups(backup_dir)
        except OSError as e:
            print(f"[DataStore] Error creating backup for {plugin_id}: {e}")

    def _cleanup_old_backups(self, backup_dir: Path):
        """Remove old backups, keeping only the newest max_backups files."""
        try:
            backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime)
            # Oldest first; drop from the front until within the limit.
            excess = backups[:-self.max_backups] if self.max_backups > 0 else backups
            for old_backup in excess:
                old_backup.unlink()
        except OSError as e:
            print(f"[DataStore] Error cleaning up backups: {e}")

    def save(self, plugin_id: str, key: str, data: Any) -> bool:
        """
        Save data for a plugin.
        Args:
            plugin_id: Unique identifier for the plugin
            key: Key under which to store the data
            data: Data to store (must be JSON serializable)
        Returns:
            True if successful, False otherwise
        """
        # The write lock makes the load/modify/store cycle atomic across
        # threads; without it two concurrent saves could lose updates.
        with self._write_lock:
            plugin_data = self._load_plugin_data(plugin_id)
            plugin_data[key] = data
            return self._save_plugin_data(plugin_id, plugin_data)

    def load(self, plugin_id: str, key: str, default: Any = None) -> Any:
        """
        Load data for a plugin.
        Args:
            plugin_id: Unique identifier for the plugin
            key: Key of the data to load
            default: Default value if key not found
        Returns:
            The stored data or default value
        """
        return self._load_plugin_data(plugin_id).get(key, default)

    def delete(self, plugin_id: str, key: str) -> bool:
        """
        Delete data for a plugin.
        Args:
            plugin_id: Unique identifier for the plugin
            key: Key of the data to delete
        Returns:
            True if key existed and was deleted, False otherwise
        """
        with self._write_lock:
            plugin_data = self._load_plugin_data(plugin_id)
            if key not in plugin_data:
                return False
            del plugin_data[key]
            return self._save_plugin_data(plugin_id, plugin_data)

    def get_all_keys(self, plugin_id: str) -> list:
        """
        Get all keys stored for a plugin.
        Args:
            plugin_id: Unique identifier for the plugin
        Returns:
            List of keys
        """
        return list(self._load_plugin_data(plugin_id).keys())

    def clear_plugin(self, plugin_id: str) -> bool:
        """
        Clear all data for a plugin.
        Args:
            plugin_id: Unique identifier for the plugin
        Returns:
            True if successful, False otherwise
        """
        with self._write_lock:
            file_path = self._get_plugin_file(plugin_id)
            # Create backup before clearing
            if file_path.exists():
                self._create_backup(plugin_id, file_path)
            # Drop any cached copy
            with self._cache_lock:
                self._cache.pop(plugin_id, None)
            try:
                if file_path.exists():
                    file_path.unlink()
                return True
            except OSError as e:
                print(f"[DataStore] Error clearing data for {plugin_id}: {e}")
                return False

    def get_backups(self, plugin_id: str) -> list:
        """
        Get list of available backups for a plugin.
        Args:
            plugin_id: Unique identifier for the plugin
        Returns:
            List of backup file paths, newest first
        """
        backup_dir = self._get_backup_dir(plugin_id)
        if not backup_dir.exists():
            return []
        backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True)
        return [str(b) for b in backups]

    def restore_backup(self, plugin_id: str, backup_path: str) -> bool:
        """
        Restore data from a backup.
        Args:
            plugin_id: Unique identifier for the plugin
            backup_path: Path to the backup file
        Returns:
            True if successful, False otherwise
        """
        backup_file = Path(backup_path)
        if not backup_file.exists():
            print(f"[DataStore] Backup not found: {backup_path}")
            return False
        with self._write_lock:
            file_path = self._get_plugin_file(plugin_id)
            try:
                # Snapshot the current state before overwriting it.
                if file_path.exists():
                    self._create_backup(plugin_id, file_path)
                shutil.copy2(backup_file, file_path)
                # Invalidate cache so the next load re-reads from disk.
                with self._cache_lock:
                    self._cache.pop(plugin_id, None)
                return True
            except OSError as e:
                print(f"[DataStore] Error restoring backup for {plugin_id}: {e}")
                return False
# Module-level singleton accessor
_data_store = None
_data_store_lock = threading.Lock()


def get_data_store() -> DataStore:
    """Return the process-wide DataStore, creating it on first use."""
    global _data_store
    # Fast path: already built, no locking needed.
    if _data_store is not None:
        return _data_store
    with _data_store_lock:
        # Re-check under the lock so only one thread constructs it.
        if _data_store is None:
            _data_store = DataStore()
    return _data_store

View File

@ -0,0 +1,672 @@
"""
EU-Utility - Enhanced Event Bus
Core service for typed event handling with:
- Typed events using dataclasses
- Event filtering (mob types, damage thresholds, etc.)
- Event persistence (last 1000 events)
- Event replay (replay last N events to new subscribers)
- Async event handling (non-blocking publishers)
- Event statistics (events per minute, etc.)
"""
import asyncio
import time
import threading
from collections import deque, defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union, Set
from functools import wraps
import copy
# ========== Event Types ==========
class EventCategory(Enum):
    """Event categories for organization."""
    # auto() assigns values by declaration order, so append new categories
    # rather than inserting them if persisted values must stay stable.
    SKILL = auto()
    LOOT = auto()
    COMBAT = auto()
    ECONOMY = auto()
    SYSTEM = auto()
    CHAT = auto()
    GLOBAL = auto()
@dataclass(frozen=True)
class BaseEvent:
    """Common base for all typed events on the bus."""
    # NOTE: field order is part of the positional-constructor interface.
    timestamp: datetime = field(default_factory=datetime.now)
    source: str = "unknown"

    @property
    def event_type(self) -> str:
        """Name of the concrete event class."""
        return type(self).__name__

    @property
    def category(self) -> EventCategory:
        """Category of this event; subclasses override."""
        return EventCategory.SYSTEM

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the event, including its type and category names."""
        payload = asdict(self)
        return dict(event_type=self.event_type, category=self.category.name, **payload)
@dataclass(frozen=True)
class SkillGainEvent(BaseEvent):
    """Event fired when a skill increases."""
    skill_name: str = ""
    skill_value: float = 0.0  # absolute skill level after the gain
    gain_amount: float = 0.0  # size of the increase that triggered this event
    @property
    def category(self) -> EventCategory:
        return EventCategory.SKILL
@dataclass(frozen=True)
class LootEvent(BaseEvent):
    """Event fired when loot is received."""
    mob_name: str = ""
    items: List[Dict[str, Any]] = field(default_factory=list)  # item dicts; 'name' key is read below
    total_tt_value: float = 0.0  # summed trade-terminal value — TODO confirm units
    position: Optional[tuple] = None  # (x, y, z)
    @property
    def category(self) -> EventCategory:
        return EventCategory.LOOT
    def get_item_names(self) -> List[str]:
        """Get list of item names from loot ('Unknown' when an item has no name)."""
        return [item.get('name', 'Unknown') for item in self.items]
@dataclass(frozen=True)
class DamageEvent(BaseEvent):
    """Event fired when damage is dealt or received."""
    damage_amount: float = 0.0
    damage_type: str = ""  # e.g., "impact", "penetration", "burn"
    is_critical: bool = False
    target_name: str = ""
    attacker_name: str = ""
    is_outgoing: bool = True  # True if player dealt damage
    @property
    def category(self) -> EventCategory:
        return EventCategory.COMBAT
    def is_high_damage(self, threshold: float = 100.0) -> bool:
        """Check if damage meets or exceeds the given threshold (inclusive)."""
        return self.damage_amount >= threshold
@dataclass(frozen=True)
class GlobalEvent(BaseEvent):
    """Event for global announcements."""
    player_name: str = ""
    achievement_type: str = ""  # e.g., "hof", "ath", "discovery"
    value: float = 0.0  # announced value (presumably PED — TODO confirm)
    item_name: Optional[str] = None  # item involved, when applicable
    @property
    def category(self) -> EventCategory:
        return EventCategory.GLOBAL
@dataclass(frozen=True)
class ChatEvent(BaseEvent):
    """Event for chat messages."""
    channel: str = ""  # "main", "team", "society", etc.
    sender: str = ""  # display name of the message author
    message: str = ""  # raw message text
    @property
    def category(self) -> EventCategory:
        return EventCategory.CHAT
@dataclass(frozen=True)
class EconomyEvent(BaseEvent):
    """Event for economic transactions."""
    transaction_type: str = ""  # "sale", "purchase", "deposit", "withdraw"
    amount: float = 0.0  # transaction amount in `currency` units
    currency: str = "PED"
    description: str = ""  # free-form human-readable details
    @property
    def category(self) -> EventCategory:
        return EventCategory.ECONOMY
@dataclass(frozen=True)
class SystemEvent(BaseEvent):
    """Event for system notifications."""
    message: str = ""
    severity: str = "info"  # "debug", "info", "warning", "error", "critical"
    @property
    def category(self) -> EventCategory:
        return EventCategory.SYSTEM
# Type variable for event classes; used by the typed subscribe/query helpers
# so callbacks receive the concrete event type rather than BaseEvent.
T = TypeVar('T', bound=BaseEvent)
# ========== Event Filter ==========
@dataclass
class EventFilter:
    """Filter criteria for event subscription.

    Any criterion left as None is ignored; an event must satisfy every
    configured criterion to match.
    """
    event_types: Optional[List[Type[BaseEvent]]] = None
    categories: Optional[List[EventCategory]] = None
    min_damage: Optional[float] = None
    max_damage: Optional[float] = None
    mob_types: Optional[List[str]] = None
    skill_names: Optional[List[str]] = None
    sources: Optional[List[str]] = None
    custom_predicate: Optional[Callable[[BaseEvent], bool]] = None

    def matches(self, event: BaseEvent) -> bool:
        """Return True when *event* satisfies every configured criterion.

        When a custom predicate is configured it has the final word after
        all other checks pass.
        """
        if self.event_types is not None and not isinstance(event, tuple(self.event_types)):
            return False
        if self.categories is not None and event.category not in self.categories:
            return False
        if self.sources is not None and event.source not in self.sources:
            return False
        # Type-specific criteria only apply to the matching event class.
        if isinstance(event, DamageEvent):
            dmg = event.damage_amount
            if self.min_damage is not None and dmg < self.min_damage:
                return False
            if self.max_damage is not None and dmg > self.max_damage:
                return False
        if isinstance(event, LootEvent):
            if self.mob_types is not None and event.mob_name not in self.mob_types:
                return False
        if isinstance(event, SkillGainEvent):
            if self.skill_names is not None and event.skill_name not in self.skill_names:
                return False
        if self.custom_predicate is not None:
            return self.custom_predicate(event)
        return True
# ========== Subscription ==========
@dataclass
class EventSubscription:
    """Represents an event subscription."""
    id: str  # unique id ("sub_<n>") handed back to the subscriber
    callback: Callable[[BaseEvent], Any]
    event_filter: EventFilter
    replay_history: bool  # whether recent history was replayed on subscribe
    replay_count: int  # how many historical events to replay
    created_at: datetime = field(default_factory=datetime.now)
    event_count: int = 0  # events delivered to this subscriber so far
    last_received: Optional[datetime] = None  # time of most recent delivery
    def matches(self, event: BaseEvent) -> bool:
        """Check if event matches this subscription."""
        return self.event_filter.matches(event)
# ========== Event Statistics ==========
@dataclass
class EventStats:
    """Statistics for event bus performance.

    NOTE(review): counters are mutated without a lock; under concurrent
    publishers the figures are best-effort approximations.
    """
    total_events_published: int = 0
    total_events_delivered: int = 0
    total_subscriptions: int = 0
    active_subscriptions: int = 0
    events_by_type: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    events_by_category: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    events_per_minute: float = 0.0
    average_delivery_time_ms: float = 0.0
    errors: int = 0
    _start_time: datetime = field(default_factory=datetime.now)
    # Timestamps of recent publications, pruned to the last 60 seconds in
    # _update_epm. (The previous deque(maxlen=60) silently capped the rate
    # metric at 60 events/minute regardless of actual throughput.)
    _minute_window: deque = field(default_factory=deque)
    _delivery_times: deque = field(default_factory=lambda: deque(maxlen=100))

    def record_event_published(self, event: BaseEvent):
        """Record an event publication and refresh the rate metric."""
        self.total_events_published += 1
        self.events_by_type[event.event_type] += 1
        self.events_by_category[event.category.name] += 1
        self._minute_window.append(time.time())
        self._update_epm()

    def record_event_delivered(self, delivery_time_ms: float):
        """Record a successful delivery and its latency in milliseconds."""
        self.total_events_delivered += 1
        self._delivery_times.append(delivery_time_ms)
        if self._delivery_times:
            self.average_delivery_time_ms = sum(self._delivery_times) / len(self._delivery_times)

    def record_error(self):
        """Record a delivery error."""
        self.errors += 1

    def _update_epm(self):
        """Recompute events-per-minute over a sliding 60-second window."""
        now = time.time()
        window = self._minute_window
        # Evict timestamps older than the window so the deque stays bounded
        # by actual throughput rather than an arbitrary maxlen.
        while window and now - window[0] >= 60:
            window.popleft()
        self.events_per_minute = len(window)

    def get_summary(self) -> Dict[str, Any]:
        """Get statistics summary."""
        uptime = datetime.now() - self._start_time
        return {
            'total_published': self.total_events_published,
            'total_delivered': self.total_events_delivered,
            'active_subscriptions': self.active_subscriptions,
            'events_per_minute': round(self.events_per_minute, 2),
            'avg_delivery_ms': round(self.average_delivery_time_ms, 2),
            'errors': self.errors,
            'uptime_seconds': uptime.total_seconds(),
            'top_event_types': dict(sorted(
                self.events_by_type.items(),
                key=lambda x: x[1],
                reverse=True
            )[:5])
        }
# ========== Event Bus ==========
class EventBus:
    """
    Enhanced Event Bus for EU-Utility.
    Features:
    - Typed events using dataclasses
    - Event filtering with flexible criteria
    - Event persistence (configurable history size)
    - Event replay for new subscribers
    - Async event handling
    - Event statistics
    """
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Double-checked locking singleton; __init__ guards itself with
        # _initialized so repeated construction is a no-op.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self, max_history: int = 1000):
        """
        Args:
            max_history: Maximum number of events kept for replay/queries.
                         Only honored on the first construction (singleton).
        """
        if self._initialized:
            return
        self.max_history = max_history
        self._history: deque = deque(maxlen=max_history)
        self._subscriptions: Dict[str, EventSubscription] = {}
        self._subscription_counter = 0
        self._stats = EventStats()
        # Instance lock (RLock: _replay_history re-enters from subscribe).
        # This deliberately shadows the class-level _lock used by __new__.
        self._lock = threading.RLock()
        self._async_loop: Optional[asyncio.AbstractEventLoop] = None
        self._async_thread: Optional[threading.Thread] = None
        self._initialized = True

    def _ensure_async_loop(self):
        """Ensure the async delivery loop exists and is usable."""
        if self._async_loop is None or self._async_loop.is_closed():
            self._start_async_loop()

    def _start_async_loop(self):
        """Start the async event loop in a daemon thread.

        The loop is created synchronously in the calling thread so that
        _async_loop is guaranteed to be set before any caller schedules work
        on it. (Creating it inside the thread, as before, raced with the
        first call_soon_threadsafe and could silently drop events.)
        """
        loop = asyncio.new_event_loop()
        self._async_loop = loop

        def run_loop():
            asyncio.set_event_loop(loop)
            loop.run_forever()

        self._async_thread = threading.Thread(target=run_loop, daemon=True)
        self._async_thread.start()

    def publish(self, event: BaseEvent) -> None:
        """
        Publish an event to all matching subscribers.
        Non-blocking - callbacks run on the background asyncio loop.
        """
        with self._lock:
            self._history.append(event)
            self._stats.record_event_published(event)
            matching_subs = [
                sub for sub in self._subscriptions.values()
                if sub.matches(event)
            ]
        # Deliver outside the lock so slow subscribers can't block publishers.
        for sub in matching_subs:
            self._deliver_async(sub, event)

    def publish_sync(self, event: BaseEvent) -> int:
        """
        Publish an event synchronously.
        Blocks until all callbacks complete.
        Returns number of subscribers successfully notified.
        """
        with self._lock:
            self._history.append(event)
            self._stats.record_event_published(event)
            matching_subs = [
                sub for sub in self._subscriptions.values()
                if sub.matches(event)
            ]
        delivered = 0
        for sub in matching_subs:
            if self._deliver_sync(sub, event):
                delivered += 1
        return delivered

    def _deliver_async(self, subscription: EventSubscription, event: BaseEvent):
        """Schedule delivery of *event* on the background loop."""
        self._ensure_async_loop()

        def deliver():
            started = time.perf_counter()
            try:
                subscription.callback(event)
                subscription.event_count += 1
                subscription.last_received = datetime.now()
                self._stats.record_event_delivered((time.perf_counter() - started) * 1000)
            except Exception as e:
                # A failing subscriber must not break the bus or its peers.
                self._stats.record_error()
                print(f"[EventBus] Error delivering to {subscription.id}: {e}")

        if self._async_loop:
            self._async_loop.call_soon_threadsafe(deliver)

    def _deliver_sync(self, subscription: EventSubscription, event: BaseEvent) -> bool:
        """Deliver *event* on the calling thread; True on success."""
        started = time.perf_counter()
        try:
            subscription.callback(event)
            subscription.event_count += 1
            subscription.last_received = datetime.now()
            self._stats.record_event_delivered((time.perf_counter() - started) * 1000)
            return True
        except Exception as e:
            self._stats.record_error()
            print(f"[EventBus] Error delivering to {subscription.id}: {e}")
            return False

    def subscribe(
        self,
        callback: Callable[[BaseEvent], Any],
        event_filter: Optional[EventFilter] = None,
        replay_history: bool = False,
        replay_count: int = 100,
        event_types: Optional[List[Type[BaseEvent]]] = None
    ) -> str:
        """
        Subscribe to events with optional filtering.
        Args:
            callback: Function to call when matching events occur
            event_filter: Filter criteria for events
            replay_history: Whether to replay recent events to new subscriber
            replay_count: Number of recent events to replay
            event_types: Shorthand for simple type-based filtering
        Returns:
            Subscription ID (use for unsubscribe)
        """
        with self._lock:
            self._subscription_counter += 1
            sub_id = f"sub_{self._subscription_counter}"
            # Build a filter from the shorthand, or accept everything.
            if event_filter is None:
                event_filter = (EventFilter(event_types=event_types)
                                if event_types is not None else EventFilter())
            subscription = EventSubscription(
                id=sub_id,
                callback=callback,
                event_filter=event_filter,
                replay_history=replay_history,
                replay_count=min(replay_count, self.max_history)
            )
            self._subscriptions[sub_id] = subscription
            self._stats.active_subscriptions = len(self._subscriptions)
            self._stats.total_subscriptions += 1
        # Replay history if requested (outside the critical section; the
        # lock is re-entrant anyway).
        if replay_history:
            self._replay_history(subscription)
        return sub_id

    def subscribe_typed(
        self,
        event_class: Type[T],
        callback: Callable[[T], Any],
        **filter_kwargs
    ) -> str:
        """
        Subscribe to a specific event type with optional filtering.
        Args:
            event_class: The event class to subscribe to
            callback: Function to call with typed event
            **filter_kwargs: Additional filter criteria
                - min_damage: Minimum damage threshold
                - max_damage: Maximum damage threshold
                - mob_types: List of mob names to filter
                - skill_names: List of skill names to filter
                - sources: List of event sources to filter
                - predicate: Custom predicate (final word on matching)
                - replay_last: Number of events to replay
        Returns:
            Subscription ID
        """
        filter_args: Dict[str, Any] = {'event_types': [event_class]}
        for key in ('min_damage', 'max_damage', 'mob_types', 'skill_names', 'sources'):
            if key in filter_kwargs:
                filter_args[key] = filter_kwargs.pop(key)
        if 'predicate' in filter_kwargs:
            filter_args['custom_predicate'] = filter_kwargs.pop('predicate')
        event_filter = EventFilter(**filter_args)
        replay_count = filter_kwargs.pop('replay_last', 0)

        def typed_callback(event: BaseEvent):
            # Guard so the user callback only ever sees its declared type.
            if isinstance(event, event_class):
                callback(event)

        return self.subscribe(
            callback=typed_callback,
            event_filter=event_filter,
            replay_history=replay_count > 0,
            replay_count=replay_count
        )

    def _replay_history(self, subscription: EventSubscription):
        """Replay recent matching events to a new subscriber."""
        if subscription.replay_count <= 0:
            # Guard: a slice of [-0:] would replay the ENTIRE history.
            return
        with self._lock:
            events_to_replay = [
                e for e in list(self._history)[-subscription.replay_count:]
                if subscription.matches(e)
            ]
        for event in events_to_replay:
            self._deliver_async(subscription, event)

    def unsubscribe(self, subscription_id: str) -> bool:
        """Remove a subscription; True if it existed."""
        with self._lock:
            if subscription_id in self._subscriptions:
                del self._subscriptions[subscription_id]
                self._stats.active_subscriptions = len(self._subscriptions)
                return True
            return False

    def get_recent_events(
        self,
        event_type: Optional[Type[T]] = None,
        count: int = 100,
        category: Optional[EventCategory] = None
    ) -> List[BaseEvent]:
        """
        Get recent events from history.
        Args:
            event_type: Filter by event class
            count: Maximum number of events to return
            category: Filter by event category
        Returns:
            List of matching events (oldest first, most recent last)
        """
        with self._lock:
            events = list(self._history)
        # Filter the snapshot outside the lock.
        if event_type is not None:
            events = [e for e in events if isinstance(e, event_type)]
        if category is not None:
            events = [e for e in events if e.category == category]
        return events[-count:]

    def get_events_by_time_range(
        self,
        start: datetime,
        end: Optional[datetime] = None
    ) -> List[BaseEvent]:
        """Get events within [start, end]; end defaults to now."""
        if end is None:
            end = datetime.now()
        with self._lock:
            return [
                e for e in self._history
                if start <= e.timestamp <= end
            ]

    def get_stats(self) -> Dict[str, Any]:
        """Get event bus statistics."""
        return self._stats.get_summary()

    def clear_history(self):
        """Clear event history."""
        with self._lock:
            self._history.clear()

    def shutdown(self):
        """Shutdown the event bus and release the background loop."""
        with self._lock:
            self._subscriptions.clear()
            self._history.clear()
            self._stats.active_subscriptions = 0
        if self._async_loop and self._async_loop.is_running():
            self._async_loop.call_soon_threadsafe(self._async_loop.stop)
        if self._async_thread and self._async_thread.is_alive():
            self._async_thread.join(timeout=2.0)
# Singleton instance
_event_bus = None
_event_bus_lock = threading.Lock()


def get_event_bus() -> EventBus:
    """Get the global EventBus instance.

    Double-checked locking for consistency with get_data_store();
    EventBus.__new__ is itself a locked singleton, so this mainly avoids
    redundant construction attempts under contention.
    """
    global _event_bus
    if _event_bus is None:
        with _event_bus_lock:
            if _event_bus is None:
                _event_bus = EventBus()
    return _event_bus


def reset_event_bus():
    """Reset the global EventBus instance (mainly for testing)."""
    global _event_bus
    with _event_bus_lock:
        if _event_bus is not None:
            _event_bus.shutdown()
        # Also clear the class-level singleton; without this the next
        # EventBus() returned the same already-shut-down instance and the
        # "reset" had no effect.
        EventBus._instance = None
        _event_bus = None
# ========== Decorators ==========
def on_event(
    event_class: Type[T],
    **filter_kwargs
):
    """
    Decorator for event subscription.
    Usage:
        @on_event(DamageEvent, min_damage=100)
        def handle_big_damage(event: DamageEvent):
            print(f"Big hit: {event.damage_amount}")

    The subscription is created at decoration (import) time on the global
    bus and lasts until explicitly unsubscribed.
    """
    def decorator(func):
        bus = get_event_bus()
        sub_id = bus.subscribe_typed(event_class, func, **filter_kwargs)
        # Keep the original truthy marker for backward compatibility, and
        # also expose the id so callers can unsubscribe the handler later
        # (previously the id was discarded, making that impossible).
        func._event_subscription = True
        func._event_subscription_id = sub_id
        return func
    return decorator

View File

@ -0,0 +1,532 @@
"""
EU-Utility - HTTP Client with Caching
Thread-safe HTTP client with disk-based caching, automatic retries,
rate limiting, and Cache-Control header support.
"""
import os
import json
import hashlib
import time
import threading
from pathlib import Path
from typing import Dict, Any, Optional, Union, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
try:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
REQUESTS_AVAILABLE = True
except ImportError:
REQUESTS_AVAILABLE = False
print("Warning: 'requests' library not installed. HTTP client will not function.")
@dataclass
class CacheEntry:
    """Represents a cached HTTP response."""
    url: str
    status_code: int
    headers: Dict[str, str]
    content: bytes  # raw response body (base64-encoded when persisted to JSON)
    cached_at: float  # epoch seconds when the entry was stored
    expires_at: float  # epoch seconds after which the entry is stale
    cache_control: Optional[str] = None  # verbatim Cache-Control header value
    etag: Optional[str] = None  # stored for conditional revalidation — TODO confirm it is used
    last_modified: Optional[str] = None  # Last-Modified header, if present
class HTTPClient:
"""
Thread-safe singleton HTTP client with caching support.
Features:
- Disk-based response caching
- TTL support (time-to-live)
- Auto-retry with exponential backoff
- Rate limiting between requests
- Cache-Control header respect
- Thread-safe operations
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(
self,
cache_dir: str = "cache/http",
default_cache_ttl: int = 3600,
rate_limit_delay: float = 0.0,
max_retries: int = 3,
backoff_factor: float = 0.5,
respect_cache_control: bool = True,
default_headers: Optional[Dict[str, str]] = None
):
"""
Initialize the HTTP client.
Args:
cache_dir: Directory for disk cache
default_cache_ttl: Default TTL in seconds
rate_limit_delay: Minimum delay between requests in seconds
max_retries: Maximum number of retries on failure
backoff_factor: Backoff factor for retries
respect_cache_control: Whether to respect Cache-Control headers
default_headers: Default headers for all requests
"""
if self._initialized:
return
if not REQUESTS_AVAILABLE:
raise RuntimeError("requests library is required for HTTP client")
# Settings
self.cache_dir = Path(cache_dir)
self.default_cache_ttl = default_cache_ttl
self.rate_limit_delay = rate_limit_delay
self.max_retries = max_retries
self.backoff_factor = backoff_factor
self.respect_cache_control = respect_cache_control
self.default_headers = default_headers or {
'User-Agent': 'EU-Utility/1.0'
}
# Thread safety
self._cache_lock = threading.RLock()
self._request_lock = threading.Lock()
self._last_request_time = 0
# Initialize cache directory
self.cache_dir.mkdir(parents=True, exist_ok=True)
# Initialize session with retries
self.session = requests.Session()
retry_strategy = Retry(
total=max_retries,
backoff_factor=backoff_factor,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
self._initialized = True
print(f"[HTTP] Client initialized (cache: {self.cache_dir})")
def _generate_cache_key(self, url: str, params: Optional[Dict] = None) -> str:
"""Generate a cache key from URL and parameters."""
key_string = url
if params:
# Sort params for consistent hashing
param_str = json.dumps(params, sort_keys=True)
key_string += "|" + param_str
return hashlib.sha256(key_string.encode()).hexdigest()
def _get_cache_path(self, cache_key: str) -> Path:
"""Get the file path for a cache entry."""
# Split into subdirectories for better file system performance
return self.cache_dir / cache_key[:2] / cache_key[2:4] / f"{cache_key}.json"
def _parse_cache_control(self, header: str) -> Optional[int]:
"""Parse Cache-Control header to extract max-age."""
if not header:
return None
try:
for directive in header.split(','):
directive = directive.strip().lower()
if directive.startswith('max-age='):
return int(directive.split('=')[1])
elif directive == 'no-cache' or directive == 'no-store':
return 0
except (ValueError, IndexError):
pass
return None
def _load_cache_entry(self, cache_key: str) -> Optional[CacheEntry]:
"""Load a cache entry from disk."""
cache_path = self._get_cache_path(cache_key)
if not cache_path.exists():
return None
try:
with self._cache_lock:
with open(cache_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Convert content from base64 if stored that way
content = data.get('content', '')
if isinstance(content, str):
import base64
content = base64.b64decode(content)
return CacheEntry(
url=data['url'],
status_code=data['status_code'],
headers=data['headers'],
content=content,
cached_at=data['cached_at'],
expires_at=data['expires_at'],
cache_control=data.get('cache_control'),
etag=data.get('etag'),
last_modified=data.get('last_modified')
)
except Exception as e:
print(f"[HTTP] Cache load error: {e}")
return None
def _save_cache_entry(self, cache_key: str, entry: CacheEntry):
"""Save a cache entry to disk."""
cache_path = self._get_cache_path(cache_key)
cache_path.parent.mkdir(parents=True, exist_ok=True)
try:
with self._cache_lock:
# Encode content as base64 for JSON serialization
import base64
content_b64 = base64.b64encode(entry.content).decode('utf-8')
data = {
'url': entry.url,
'status_code': entry.status_code,
'headers': entry.headers,
'content': content_b64,
'cached_at': entry.cached_at,
'expires_at': entry.expires_at,
'cache_control': entry.cache_control,
'etag': entry.etag,
'last_modified': entry.last_modified
}
with open(cache_path, 'w', encoding='utf-8') as f:
json.dump(data, f)
except Exception as e:
print(f"[HTTP] Cache save error: {e}")
def _is_cache_valid(self, entry: CacheEntry) -> bool:
"""Check if a cache entry is still valid."""
return time.time() < entry.expires_at
def _apply_rate_limit(self):
"""Apply rate limiting between requests."""
if self.rate_limit_delay <= 0:
return
with self._request_lock:
elapsed = time.time() - self._last_request_time
if elapsed < self.rate_limit_delay:
sleep_time = self.rate_limit_delay - elapsed
time.sleep(sleep_time)
self._last_request_time = time.time()
def _make_request(
self,
method: str,
url: str,
headers: Optional[Dict[str, str]] = None,
**kwargs
) -> requests.Response:
"""Make an HTTP request with rate limiting and retries."""
self._apply_rate_limit()
# Merge headers
request_headers = self.default_headers.copy()
if headers:
request_headers.update(headers)
# Make request
response = self.session.request(
method=method,
url=url,
headers=request_headers,
**kwargs
)
response.raise_for_status()
return response
def get(
    self,
    url: str,
    cache_ttl: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
    params: Optional[Dict[str, Any]] = None,
    use_cache: bool = True,
    **kwargs
) -> Dict[str, Any]:
    """
    Perform a GET request with disk-backed caching.

    Args:
        url: The URL to fetch
        cache_ttl: Cache TTL in seconds (None = use default)
        headers: Additional headers merged over the client defaults
        params: URL query parameters
        use_cache: Whether to read/write the cache
        **kwargs: Additional arguments passed through to requests

    Returns:
        Dict with 'status_code', 'headers', 'content', 'text', 'json',
        'from_cache' (plus 'stale': True when a stale entry is served
        after a network failure).

    Raises:
        RuntimeError: if the requests library is unavailable.
        requests.exceptions.RequestException: on network failure when no
            cached fallback exists.
    """
    if not REQUESTS_AVAILABLE:
        raise RuntimeError("requests library not available")

    cache_key = self._generate_cache_key(url, params)
    ttl = cache_ttl if cache_ttl is not None else self.default_cache_ttl

    def _entry_to_result(entry: CacheEntry, stale: bool = False) -> Dict[str, Any]:
        """Convert a cache entry into the public response dict."""
        # FIX: cached responses previously always reported 'json': None,
        # even for JSON bodies; re-parse so cached and fresh responses
        # have a consistent shape.
        try:
            json_data = json.loads(entry.content.decode('utf-8'))
        except (ValueError, UnicodeDecodeError):
            json_data = None
        result = {
            'status_code': entry.status_code,
            'headers': entry.headers,
            'content': entry.content,
            'text': entry.content.decode('utf-8', errors='replace'),
            'json': json_data,
            'from_cache': True
        }
        if stale:
            result['stale'] = True
        return result

    # Serve from cache while the entry is fresh
    if use_cache and ttl > 0:
        cached = self._load_cache_entry(cache_key)
        if cached and self._is_cache_valid(cached):
            return _entry_to_result(cached)

    # Make request
    try:
        response = self._make_request('GET', url, headers=headers, params=params, **kwargs)
        content = response.content

        # Parse JSON if possible
        json_data = None
        try:
            json_data = response.json()
        except ValueError:
            pass

        # Cache the response
        if use_cache and ttl > 0:
            cache_control = response.headers.get('Cache-Control', '')
            cache_ttl_override = self._parse_cache_control(cache_control)
            if cache_ttl_override is not None and self.respect_cache_control:
                # FIX: an override of 0 ("no-store"/"max-age=0") previously
                # left ttl unchanged, so the response was cached anyway.
                # Assigning unconditionally makes 0 suppress caching below.
                ttl = cache_ttl_override
            if ttl > 0:
                entry = CacheEntry(
                    url=url,
                    status_code=response.status_code,
                    headers=dict(response.headers),
                    content=content,
                    cached_at=time.time(),
                    expires_at=time.time() + ttl,
                    cache_control=cache_control,
                    etag=response.headers.get('ETag'),
                    last_modified=response.headers.get('Last-Modified')
                )
                self._save_cache_entry(cache_key, entry)

        return {
            'status_code': response.status_code,
            'headers': dict(response.headers),
            'content': content,
            'text': content.decode('utf-8', errors='replace'),
            'json': json_data,
            'from_cache': False
        }
    except requests.exceptions.RequestException:
        # Degrade gracefully: serve an expired entry rather than failing
        if use_cache:
            cached = self._load_cache_entry(cache_key)
            if cached:
                print(f"[HTTP] Returning stale cache for {url}")
                return _entry_to_result(cached, stale=True)
        raise
def post(
    self,
    url: str,
    data: Optional[Union[Dict, str, bytes]] = None,
    json: Optional[Dict] = None,
    headers: Optional[Dict[str, str]] = None,
    use_cache: bool = False,
    cache_ttl: Optional[int] = None,
    **kwargs
) -> Dict[str, Any]:
    """
    Perform a POST request.

    Args:
        url: The URL to post to
        data: Form data, bytes, or string
        json: JSON payload (will be serialized by requests)
        headers: Additional headers merged over the client defaults
        use_cache: Whether to cache the response (rare for POST)
        cache_ttl: Cache TTL in seconds
        **kwargs: Additional arguments passed through to requests

    Returns:
        Dict with 'status_code', 'headers', 'content', 'text', 'json', 'from_cache'

    Raises:
        RuntimeError: if the requests library is unavailable.
        requests.exceptions.RequestException: on network/HTTP failure.
    """
    if not REQUESTS_AVAILABLE:
        raise RuntimeError("requests library not available")

    # NOTE: the ``json`` parameter shadows the stdlib module inside this
    # method (kept for requests-API compatibility), so re-import an alias.
    import json as _jsonlib

    cache_key = self._generate_cache_key(url, data or json)
    ttl = cache_ttl if cache_ttl is not None else self.default_cache_ttl

    # Check cache if enabled (rare for POST but possible)
    if use_cache and ttl > 0:
        cached = self._load_cache_entry(cache_key)
        if cached and self._is_cache_valid(cached):
            # FIX: cached responses previously always reported 'json': None;
            # re-parse so cached and fresh responses have a consistent shape.
            try:
                cached_json = _jsonlib.loads(cached.content.decode('utf-8'))
            except (ValueError, UnicodeDecodeError):
                cached_json = None
            return {
                'status_code': cached.status_code,
                'headers': cached.headers,
                'content': cached.content,
                'text': cached.content.decode('utf-8', errors='replace'),
                'json': cached_json,
                'from_cache': True
            }

    # Make request
    response = self._make_request('POST', url, headers=headers, data=data, json=json, **kwargs)
    content = response.content

    # Parse JSON if possible
    json_data = None
    try:
        json_data = response.json()
    except ValueError:
        pass

    # Cache the response if enabled
    if use_cache and ttl > 0:
        entry = CacheEntry(
            url=url,
            status_code=response.status_code,
            headers=dict(response.headers),
            content=content,
            cached_at=time.time(),
            expires_at=time.time() + ttl
        )
        self._save_cache_entry(cache_key, entry)

    return {
        'status_code': response.status_code,
        'headers': dict(response.headers),
        'content': content,
        'text': content.decode('utf-8', errors='replace'),
        'json': json_data,
        'from_cache': False
    }
def clear_cache(self):
    """Delete every cached response and recreate an empty cache directory."""
    import shutil
    with self._cache_lock:
        if self.cache_dir.exists():
            shutil.rmtree(self.cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        print("[HTTP] Cache cleared")
def invalidate_cache(self, url_pattern: str):
    """
    Remove cached entries whose stored URL contains *url_pattern*.

    Args:
        url_pattern: Substring to match in URLs
    """
    with self._cache_lock:
        removed = 0
        for path in self.cache_dir.rglob("*.json"):
            try:
                with open(path, 'r', encoding='utf-8') as fh:
                    meta = json.load(fh)
                if url_pattern in meta.get('url', ''):
                    path.unlink()
                    removed += 1
            except Exception:
                # Unreadable/corrupt entries are skipped silently
                pass
        print(f"[HTTP] Invalidated {removed} cache entries matching '{url_pattern}'")
def get_cache_stats(self) -> Dict[str, Any]:
    """Summarise on-disk cache usage: entry counts and total byte size."""
    with self._cache_lock:
        total = 0
        valid = 0
        size_bytes = 0
        for path in self.cache_dir.rglob("*.json"):
            try:
                total += 1
                size_bytes += path.stat().st_size
                with open(path, 'r', encoding='utf-8') as fh:
                    meta = json.load(fh)
                if time.time() < meta.get('expires_at', 0):
                    valid += 1
            except Exception:
                # Corrupt entries still count toward totals read so far
                pass
        return {
            'total_entries': total,
            'valid_entries': valid,
            'expired_entries': total - valid,
            'total_size_bytes': size_bytes,
            'cache_dir': str(self.cache_dir)
        }
# Process-wide singleton handle
_http_client = None


def get_http_client(
    cache_dir: str = "cache/http",
    default_cache_ttl: int = 3600,
    rate_limit_delay: float = 0.0,
    max_retries: int = 3,
    backoff_factor: float = 0.5,
    respect_cache_control: bool = True,
    default_headers: Optional[Dict[str, str]] = None
) -> HTTPClient:
    """
    Return the global HTTP client, creating it on first call.

    Only the first call's parameters take effect; subsequent calls ignore
    their arguments and hand back the existing instance.
    """
    global _http_client
    if _http_client is None:
        settings = dict(
            cache_dir=cache_dir,
            default_cache_ttl=default_cache_ttl,
            rate_limit_delay=rate_limit_delay,
            max_retries=max_retries,
            backoff_factor=backoff_factor,
            respect_cache_control=respect_cache_control,
            default_headers=default_headers,
        )
        _http_client = HTTPClient(**settings)
    return _http_client

View File

@ -39,6 +39,8 @@ from core.overlay_widgets import OverlayManager
from core.plugin_api import get_api, APIType
from core.log_reader import get_log_reader
from core.ocr_service import get_ocr_service
from core.http_client import get_http_client
from core.window_manager import get_window_manager
class HotkeyHandler(QObject):

View File

@ -0,0 +1,551 @@
"""
EU-Utility - Entropia Nexus API Client
API client for https://api.entropianexus.com
Provides access to game data: items, mobs, market info.
Part of core - plugins access via PluginAPI.
"""
import time
import json
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timedelta
class EntityType(Enum):
    """Types of entities that can be searched.

    Values mirror the API's collection names.
    NOTE(review): currently unused — the search_* methods pass the literal
    strings 'item'/'mob' instead; confirm whether these values should be used.
    """
    ITEM = "items"
    MOB = "mobs"
    ALL = "all"
class NexusAPIError(Exception):
    """Base exception for all Entropia Nexus API failures."""
    pass
class RateLimitError(NexusAPIError):
    """Raised when the API rate limit is exceeded and retries are exhausted."""
    pass
@dataclass
class SearchResult:
    """One row returned by a /search query (item, mob, or mixed)."""
    id: str                          # entity id from the API ('' when missing)
    name: str                        # display name
    type: str                        # 'item', 'mob', or 'unknown'
    category: Optional[str] = None   # API category, when provided
    icon_url: Optional[str] = None   # icon URL, when provided
    data: Dict[str, Any] = None      # full raw API record; normalized below

    def __post_init__(self):
        # None default avoids the shared-mutable-default pitfall;
        # normalize to a fresh dict per instance here.
        if self.data is None:
            self.data = {}
@dataclass
class ItemDetails:
    """Detailed item information from /items/{id}."""
    id: str
    name: str
    description: Optional[str] = None
    category: Optional[str] = None
    weight: Optional[float] = None            # presumably kg — TODO confirm units
    tt_value: Optional[float] = None          # trade-terminal value in PED
    decay: Optional[float] = None
    ammo_consumption: Optional[int] = None
    damage: Optional[float] = None
    range: Optional[float] = None             # NOTE: shadows builtin range() as a field name
    accuracy: Optional[float] = None
    durability: Optional[int] = None
    requirements: Dict[str, Any] = None       # normalized to {} in __post_init__
    materials: List[Dict[str, Any]] = None    # normalized to [] in __post_init__
    raw_data: Dict[str, Any] = None           # full raw API record

    def __post_init__(self):
        # Replace None defaults with fresh mutable containers per instance.
        if self.requirements is None:
            self.requirements = {}
        if self.materials is None:
            self.materials = []
        if self.raw_data is None:
            self.raw_data = {}
@dataclass
class MarketData:
    """Market data for an item from /items/{id}/market."""
    item_id: str
    item_name: str
    current_markup: Optional[float] = None    # presumably percent — TODO confirm
    avg_markup_7d: Optional[float] = None
    avg_markup_30d: Optional[float] = None
    volume_24h: Optional[int] = None
    volume_7d: Optional[int] = None
    buy_orders: List[Dict[str, Any]] = None   # normalized to [] in __post_init__
    sell_orders: List[Dict[str, Any]] = None  # normalized to [] in __post_init__
    last_updated: Optional[datetime] = None   # parsed from the API's ISO timestamp
    raw_data: Dict[str, Any] = None           # full raw API record

    def __post_init__(self):
        # Replace None defaults with fresh mutable containers per instance.
        if self.buy_orders is None:
            self.buy_orders = []
        if self.sell_orders is None:
            self.sell_orders = []
        if self.raw_data is None:
            self.raw_data = {}
class NexusAPI:
    """
    Singleton client for the Entropia Nexus API (https://api.entropianexus.com).

    Features:
    - Auto-retry on transient failures (exponential backoff)
    - Rate limiting (max requests per second)
    - In-memory caching for frequently accessed data
    - Typed wrappers (SearchResult / ItemDetails / MarketData) over raw JSON

    Usage:
        api = get_nexus_api()
        results = api.search_items("ArMatrix")
        details = api.get_item_details("item_id")
    """

    _instance = None

    # API Configuration
    BASE_URL = "https://api.entropianexus.com"
    API_VERSION = "v1"

    # Rate limiting
    MAX_REQUESTS_PER_SECOND = 5
    MIN_REQUEST_INTERVAL = 1.0 / MAX_REQUESTS_PER_SECOND

    # Retry configuration
    MAX_RETRIES = 3
    RETRY_DELAY = 1.0  # seconds (base delay, doubled per attempt)

    def __new__(cls):
        # NOTE(review): unlike the other singletons in this commit, first
        # construction is not guarded by a lock; acceptable under CPython's
        # GIL but not strictly thread-safe — consider adding a class lock.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        if self._initialized:
            return
        self._last_request_time = 0               # rate-limit bookkeeping
        self._request_count = 0                   # total requests issued
        self._cache: Dict[str, Any] = {}          # response cache, key -> JSON dict
        self._cache_ttl = 300                     # 5 minutes default cache
        self._cache_timestamps: Dict[str, float] = {}
        self._session = None                      # lazily created requests.Session
        self._initialized = True
        # NOTE(review): _available is never set False anywhere visible;
        # is_available() currently always returns True.
        self._available = True
        print("[NexusAPI] Initialized")

    def _get_session(self):
        """Get or create the shared HTTP session (lazy requests import)."""
        if self._session is None:
            try:
                import requests
                self._session = requests.Session()
                self._session.headers.update({
                    'User-Agent': 'EU-Utility/1.0 (Entropia Universe Utility Tool)',
                    'Accept': 'application/json'
                })
            except ImportError:
                raise NexusAPIError("requests library not installed. Run: pip install requests")
        return self._session

    def _rate_limit(self):
        """Sleep as needed to stay below MAX_REQUESTS_PER_SECOND."""
        current_time = time.time()
        time_since_last = current_time - self._last_request_time
        if time_since_last < self.MIN_REQUEST_INTERVAL:
            time.sleep(self.MIN_REQUEST_INTERVAL - time_since_last)
        self._last_request_time = time.time()
        self._request_count += 1

    def _make_request(
        self,
        endpoint: str,
        params: Dict[str, Any] = None,
        use_cache: bool = True
    ) -> Dict[str, Any]:
        """
        Make an HTTP GET request with retry logic and rate limiting.

        Args:
            endpoint: API endpoint path (appended to BASE_URL/API_VERSION)
            params: Query parameters
            use_cache: Whether to consult/populate the in-memory cache

        Returns:
            JSON response as dict

        Raises:
            NexusAPIError: when all retries fail
            RateLimitError: when HTTP 429 persists through all retries
        """
        # Cache key combines endpoint and sorted params for determinism
        cache_key = f"{endpoint}:{json.dumps(params, sort_keys=True) if params else ''}"
        if use_cache and self._is_cache_valid(cache_key):
            return self._cache[cache_key]

        url = f"{self.BASE_URL}/{self.API_VERSION}/{endpoint}"
        last_error = None

        for attempt in range(self.MAX_RETRIES):
            try:
                self._rate_limit()
                session = self._get_session()
                response = session.get(url, params=params, timeout=30)

                # HTTP 429: honor Retry-After, then retry or give up
                if response.status_code == 429:
                    retry_after = int(response.headers.get('Retry-After', 60))
                    if attempt < self.MAX_RETRIES - 1:
                        print(f"[NexusAPI] Rate limited. Waiting {retry_after}s...")
                        time.sleep(retry_after)
                        continue
                    else:
                        raise RateLimitError(f"Rate limit exceeded. Retry after {retry_after}s")

                # 5xx: transient server error -> exponential backoff
                if response.status_code >= 500:
                    if attempt < self.MAX_RETRIES - 1:
                        wait_time = self.RETRY_DELAY * (2 ** attempt)
                        print(f"[NexusAPI] Server error {response.status_code}. Retrying in {wait_time}s...")
                        time.sleep(wait_time)
                        continue

                response.raise_for_status()
                data = response.json()

                if use_cache:
                    self._cache[cache_key] = data
                    self._cache_timestamps[cache_key] = time.time()
                return data

            except RateLimitError:
                raise
            except Exception as e:
                last_error = e
                if attempt < self.MAX_RETRIES - 1:
                    wait_time = self.RETRY_DELAY * (2 ** attempt)
                    print(f"[NexusAPI] Request failed: {e}. Retrying in {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    break

        # All retries exhausted
        error_msg = f"Request failed after {self.MAX_RETRIES} attempts: {last_error}"
        print(f"[NexusAPI] {error_msg}")
        raise NexusAPIError(error_msg)

    def _is_cache_valid(self, key: str) -> bool:
        """Return True if a cached response for *key* is younger than the TTL."""
        if key not in self._cache_timestamps:
            return False
        age = time.time() - self._cache_timestamps[key]
        return age < self._cache_ttl

    def clear_cache(self):
        """Clear all cached data."""
        self._cache.clear()
        self._cache_timestamps.clear()
        print("[NexusAPI] Cache cleared")

    def is_available(self) -> bool:
        """Check if API is available."""
        return self._available

    # ========== Search Methods ==========

    def _search_entities(
        self,
        query: str,
        limit: int,
        entity_type: Optional[str],
        default_type: str,
        caller: str
    ) -> List[SearchResult]:
        """Shared implementation behind the public search_* wrappers.

        Deduplicates the previously triplicated search code. *caller* is
        used only for the error log label so messages stay unchanged.
        """
        try:
            params: Dict[str, Any] = {
                'q': query,
                'limit': min(limit, 100)
            }
            if entity_type is not None:
                params['type'] = entity_type
            data = self._make_request('search', params)
            return [
                SearchResult(
                    id=item.get('id', ''),
                    name=item.get('name', ''),
                    type=item.get('type', default_type),
                    category=item.get('category'),
                    icon_url=item.get('icon_url'),
                    data=item
                )
                for item in data.get('results', [])
            ]
        except Exception as e:
            print(f"[NexusAPI] {caller} error: {e}")
            return []

    def search_items(self, query: str, limit: int = 20) -> List[SearchResult]:
        """
        Search for items by name.

        Args:
            query: Search term (e.g., "ArMatrix", "Omegaton")
            limit: Maximum results (default 20, max 100)

        Returns:
            List of SearchResult objects (empty on failure)

        Example:
            results = api.search_items("ArMatrix")
            for r in results:
                print(f"{r.name} ({r.type})")
        """
        return self._search_entities(query, limit, 'item', 'item', 'search_items')

    def search_mobs(self, query: str, limit: int = 20) -> List[SearchResult]:
        """
        Search for creatures/mobs by name.

        Args:
            query: Search term (e.g., "Atrox", "Hispidus")
            limit: Maximum results (default 20, max 100)

        Returns:
            List of SearchResult objects (empty on failure)
        """
        return self._search_entities(query, limit, 'mob', 'mob', 'search_mobs')

    def search_all(self, query: str, limit: int = 20) -> List[SearchResult]:
        """
        Search across all entity types (items, mobs, etc.).

        Args:
            query: Search term
            limit: Maximum results per type (default 20)

        Returns:
            List of SearchResult objects (empty on failure)
        """
        return self._search_entities(query, limit, None, 'unknown', 'search_all')

    # ========== Detail Methods ==========

    def get_item_details(self, item_id: str) -> Optional[ItemDetails]:
        """
        Get detailed information about a specific item.

        Args:
            item_id: The item's unique identifier

        Returns:
            ItemDetails object or None if not found / on error

        Example:
            details = api.get_item_details("armatrix_lp-35")
            print(f"TT Value: {details.tt_value} PED")
        """
        try:
            data = self._make_request(f'items/{item_id}')
            if not data or 'error' in data:
                return None
            return ItemDetails(
                id=data.get('id', item_id),
                name=data.get('name', 'Unknown'),
                description=data.get('description'),
                category=data.get('category'),
                weight=data.get('weight'),
                tt_value=data.get('tt_value'),
                decay=data.get('decay'),
                ammo_consumption=data.get('ammo_consumption'),
                damage=data.get('damage'),
                range=data.get('range'),
                accuracy=data.get('accuracy'),
                durability=data.get('durability'),
                requirements=data.get('requirements', {}),
                materials=data.get('materials', []),
                raw_data=data
            )
        except Exception as e:
            print(f"[NexusAPI] get_item_details error: {e}")
            return None

    def get_market_data(self, item_id: str) -> Optional[MarketData]:
        """
        Get market data for a specific item.

        Args:
            item_id: The item's unique identifier

        Returns:
            MarketData object or None if not found / on error

        Example:
            market = api.get_market_data("armatrix_lp-35")
            print(f"Current markup: {market.current_markup:.1f}%")
            print(f"24h Volume: {market.volume_24h}")
        """
        try:
            data = self._make_request(f'items/{item_id}/market')
            if not data or 'error' in data:
                return None

            # Parse timestamp if present; tolerate missing/invalid values.
            # FIX: was a bare `except:` which also swallowed KeyboardInterrupt.
            last_updated = None
            if 'last_updated' in data:
                try:
                    last_updated = datetime.fromisoformat(data['last_updated'].replace('Z', '+00:00'))
                except (ValueError, TypeError, AttributeError):
                    pass

            return MarketData(
                item_id=item_id,
                item_name=data.get('item_name', 'Unknown'),
                current_markup=data.get('current_markup'),
                avg_markup_7d=data.get('avg_markup_7d'),
                avg_markup_30d=data.get('avg_markup_30d'),
                volume_24h=data.get('volume_24h'),
                volume_7d=data.get('volume_7d'),
                buy_orders=data.get('buy_orders', []),
                sell_orders=data.get('sell_orders', []),
                last_updated=last_updated,
                raw_data=data
            )
        except Exception as e:
            print(f"[NexusAPI] get_market_data error: {e}")
            return None

    # ========== Batch Methods ==========

    def get_items_batch(self, item_ids: List[str]) -> Dict[str, Optional[ItemDetails]]:
        """
        Get details for multiple items (sequential lookups, cached per item).

        Args:
            item_ids: List of item IDs

        Returns:
            Dict mapping item_id to ItemDetails (or None if failed)
        """
        return {item_id: self.get_item_details(item_id) for item_id in item_ids}

    def get_market_batch(self, item_ids: List[str]) -> Dict[str, Optional[MarketData]]:
        """
        Get market data for multiple items (sequential lookups, cached per item).

        Args:
            item_ids: List of item IDs

        Returns:
            Dict mapping item_id to MarketData (or None if failed)
        """
        return {item_id: self.get_market_data(item_id) for item_id in item_ids}
# Module-level singleton handle
_nexus_api = None


def get_nexus_api() -> NexusAPI:
    """Return the shared NexusAPI client, constructing it lazily on first use."""
    global _nexus_api
    if _nexus_api is None:
        _nexus_api = NexusAPI()
    return _nexus_api
# Convenience functions for quick access

def search_items(query: str, limit: int = 20) -> List[SearchResult]:
    """Module-level shortcut for NexusAPI.search_items()."""
    api = get_nexus_api()
    return api.search_items(query, limit)


def search_mobs(query: str, limit: int = 20) -> List[SearchResult]:
    """Module-level shortcut for NexusAPI.search_mobs()."""
    api = get_nexus_api()
    return api.search_mobs(query, limit)


def search_all(query: str, limit: int = 20) -> List[SearchResult]:
    """Module-level shortcut for NexusAPI.search_all()."""
    api = get_nexus_api()
    return api.search_all(query, limit)


def get_item_details(item_id: str) -> Optional[ItemDetails]:
    """Module-level shortcut for NexusAPI.get_item_details()."""
    api = get_nexus_api()
    return api.get_item_details(item_id)


def get_market_data(item_id: str) -> Optional[MarketData]:
    """Module-level shortcut for NexusAPI.get_market_data()."""
    api = get_nexus_api()
    return api.get_market_data(item_id)

View File

@ -0,0 +1,597 @@
"""
EU-Utility - Notification System
Toast notification system for non-blocking user feedback.
Supports info, warning, error, and success notification types.
"""
from pathlib import Path
from typing import List, Optional, Callable, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from collections import deque
import threading
from PyQt6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QGraphicsDropShadowEffect, QApplication
)
from PyQt6.QtCore import Qt, QTimer, pyqtSignal, QObject, QPoint
from PyQt6.QtGui import QColor, QIcon, QPixmap
from core.eu_styles import EU_COLORS, EU_RADIUS
class NotificationType(Enum):
    """Notification types; each maps to a NOTIFICATION_STYLES entry."""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    SUCCESS = "success"
# Per-type visual styling for ToastWidget (icon glyph/color, border, background).
# NOTE(review): every 'icon' value is an empty string — these look like Unicode
# glyphs lost in transit (e.g. info/warning/error/check symbols); confirm the
# intended characters.
NOTIFICATION_STYLES = {
    NotificationType.INFO: {
        'icon': '',
        'icon_color': '#4a9eff',
        'border_color': 'rgba(74, 158, 255, 150)',
        'bg_color': 'rgba(25, 35, 50, 240)',
    },
    NotificationType.WARNING: {
        'icon': '',
        'icon_color': '#ffc107',
        'border_color': 'rgba(255, 193, 7, 150)',
        'bg_color': 'rgba(45, 40, 25, 240)',
    },
    NotificationType.ERROR: {
        'icon': '',
        'icon_color': '#f44336',
        'border_color': 'rgba(244, 67, 54, 150)',
        'bg_color': 'rgba(45, 25, 25, 240)',
    },
    NotificationType.SUCCESS: {
        'icon': '',
        'icon_color': '#4caf50',
        'border_color': 'rgba(76, 175, 80, 150)',
        'bg_color': 'rgba(25, 45, 30, 240)',
    },
}
@dataclass
class Notification:
    """A single notification entry (data only; rendering is ToastWidget's job)."""
    id: str                      # unique id, format "notif_<n>_<HHMMSS>"
    title: str                   # headline (may be empty -> no title label)
    message: str                 # body text
    type: NotificationType       # selects NOTIFICATION_STYLES entry
    timestamp: datetime = field(default_factory=datetime.now)  # creation time
    sound_played: bool = False   # set True once a sound was emitted for it
    duration: int = 5000  # ms the toast stays visible before fading out
class ToastWidget(QWidget):
    """Individual toast notification widget.

    A frameless, always-on-top, non-activating window showing an icon,
    optional title, message, and a close button. Auto-closes after the
    notification's duration, with a timer-driven fade-out; hovering pauses
    the auto-close.
    """

    clicked = pyqtSignal(str)  # Emits notification ID
    closed = pyqtSignal(str)   # Emits notification ID
    expired = pyqtSignal(str)  # Emits notification ID

    def __init__(self, notification: Notification, parent=None):
        super().__init__(parent)
        self.notification = notification
        self._opacity = 1.0       # current fade opacity (1.0 -> 0.0)
        self._fade_timer = None   # created lazily when fading starts

        # Frameless, always on top, never steals focus.
        # FIX: Qt.WindowType.WindowDoesNotFocus does not exist in Qt/PyQt6
        # (AttributeError at construction); the correct flag is
        # WindowDoesNotAcceptFocus.
        self.setWindowFlags(
            Qt.WindowType.FramelessWindowHint |
            Qt.WindowType.WindowStaysOnTopHint |
            Qt.WindowType.Tool |
            Qt.WindowType.WindowDoesNotAcceptFocus
        )
        self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground)
        self.setAttribute(Qt.WidgetAttribute.WA_ShowWithoutActivating)

        self._setup_ui()
        self._setup_auto_close()

    def _setup_ui(self):
        """Setup toast UI with EU styling."""
        style = NOTIFICATION_STYLES[self.notification.type]

        # Main container styling
        self.setStyleSheet(f"""
            QWidget {{
                background-color: {style['bg_color']};
                border: 1px solid {style['border_color']};
                border-radius: {EU_RADIUS['medium']};
            }}
        """)

        # Shadow effect
        shadow = QGraphicsDropShadowEffect()
        shadow.setBlurRadius(20)
        shadow.setColor(QColor(0, 0, 0, 80))
        shadow.setOffset(0, 4)
        self.setGraphicsEffect(shadow)

        # Layout
        layout = QHBoxLayout(self)
        layout.setContentsMargins(12, 10, 12, 10)
        layout.setSpacing(10)

        # Icon
        self.icon_label = QLabel(style['icon'])
        self.icon_label.setStyleSheet(f"""
            color: {style['icon_color']};
            font-size: 16px;
            font-weight: bold;
            background: transparent;
        """)
        self.icon_label.setFixedSize(24, 24)
        self.icon_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        layout.addWidget(self.icon_label)

        # Content
        content_layout = QVBoxLayout()
        content_layout.setSpacing(4)
        content_layout.setContentsMargins(0, 0, 0, 0)

        # Title (only when non-empty)
        if self.notification.title:
            self.title_label = QLabel(self.notification.title)
            self.title_label.setStyleSheet(f"""
                color: {EU_COLORS['text_primary']};
                font-size: 12px;
                font-weight: bold;
                background: transparent;
            """)
            self.title_label.setWordWrap(True)
            content_layout.addWidget(self.title_label)

        # Message
        self.message_label = QLabel(self.notification.message)
        self.message_label.setStyleSheet(f"""
            color: {EU_COLORS['text_secondary']};
            font-size: 11px;
            background: transparent;
        """)
        self.message_label.setWordWrap(True)
        self.message_label.setMaximumWidth(280)
        content_layout.addWidget(self.message_label)

        layout.addLayout(content_layout, 1)

        # Close button (a clickable label; hit-tested in mousePressEvent)
        self.close_btn = QLabel("×")
        self.close_btn.setStyleSheet(f"""
            color: {EU_COLORS['text_muted']};
            font-size: 16px;
            font-weight: bold;
            background: transparent;
        """)
        self.close_btn.setFixedSize(20, 20)
        self.close_btn.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.close_btn.setCursor(Qt.CursorShape.PointingHandCursor)
        layout.addWidget(self.close_btn)

        # Fixed width, auto height
        self.setFixedWidth(320)
        self.adjustSize()

    def _setup_auto_close(self):
        """Arm the single-shot timer that triggers the fade-out."""
        self._close_timer = QTimer(self)
        self._close_timer.setSingleShot(True)
        self._close_timer.timeout.connect(self._start_fade_out)
        self._close_timer.start(self.notification.duration)

    def _start_fade_out(self):
        """Start fade-out animation (emits `expired` first)."""
        self.expired.emit(self.notification.id)
        self._fade_timer = QTimer(self)
        self._fade_timer.timeout.connect(self._fade_step)
        self._fade_timer.start(30)  # 30ms per step

    def _fade_step(self):
        """One fade-out animation step; closes when fully transparent."""
        self._opacity -= 0.1
        if self._opacity <= 0:
            self._fade_timer.stop()
            self.close()
            self.closed.emit(self.notification.id)
        else:
            self.setWindowOpacity(self._opacity)

    def mousePressEvent(self, event):
        """Left-click on the × closes; anywhere else emits `clicked`."""
        if event.button() == Qt.MouseButton.LeftButton:
            # close_btn is a direct child, so its geometry (parent coords)
            # is comparable with the event position in this widget.
            if self.close_btn.geometry().contains(event.pos()):
                self._close_timer.stop()
                self.close()
                self.closed.emit(self.notification.id)
            else:
                self.clicked.emit(self.notification.id)
        super().mousePressEvent(event)

    def enterEvent(self, event):
        """Pause auto-close on hover."""
        self._close_timer.stop()
        super().enterEvent(event)

    def leaveEvent(self, event):
        """Resume auto-close on leave."""
        self._close_timer.start(1000)  # Give 1 second after mouse leaves
        super().leaveEvent(event)

    def close_notification(self):
        """Close this notification immediately, cancelling any pending timers."""
        self._close_timer.stop()
        if self._fade_timer:
            self._fade_timer.stop()
        self.close()
        self.closed.emit(self.notification.id)
class NotificationSignals(QObject):
    """Thread-safe signal bridge for notifications.

    notify() may be called from any thread; emitting `show_notification`
    hands the Notification to the Qt GUI thread where the toast is created.
    """
    show_notification = pyqtSignal(Notification)
class NotificationManager:
"""Singleton manager for toast notifications.
Usage:
manager = NotificationManager.get_instance()
manager.notify("Title", "Message", type=NotificationType.INFO)
manager.notify_info("Info", "Something happened")
manager.notify_warning("Warning", "Be careful")
manager.notify_error("Error", "Something failed")
manager.notify_success("Success", "Operation completed")
"""
_instance = None
_lock = threading.Lock()
# Maximum notifications to show at once
MAX_VISIBLE = 5
# Spacing between toasts
TOAST_SPACING = 8
# Margin from screen edge
SCREEN_MARGIN = 20
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._initialized = True
self._app: Optional[QApplication] = None
self._active_toasts: Dict[str, ToastWidget] = {}
self._history: deque = deque(maxlen=100)
self._counter = 0
self._sound_enabled = True
self._sound_path: Optional[Path] = None
self._sound_cache: Dict[str, Any] = {}
self._signals = NotificationSignals()
self._signals.show_notification.connect(self._show_toast)
# Position: bottom-right corner
self._position = "bottom-right" # or "top-right", "bottom-left", "top-left"
@classmethod
def get_instance(cls) -> 'NotificationManager':
"""Get the singleton instance."""
return cls()
def initialize(self, app: QApplication = None, sound_path: Path = None):
"""Initialize the notification manager.
Args:
app: QApplication instance (optional, will try to get existing)
sound_path: Path to sound files directory
"""
if app:
self._app = app
else:
self._app = QApplication.instance()
if sound_path:
self._sound_path = Path(sound_path)
print(f"[Notifications] Initialized with {self._history.maxlen} history limit")
def _get_next_id(self) -> str:
"""Generate unique notification ID."""
self._counter += 1
return f"notif_{self._counter}_{datetime.now().strftime('%H%M%S')}"
def notify(self, title: str, message: str,
type: NotificationType = NotificationType.INFO,
sound: bool = False, duration: int = 5000) -> str:
"""Show a notification toast.
Args:
title: Notification title
message: Notification message
type: Notification type (info, warning, error, success)
sound: Play sound notification
duration: Display duration in milliseconds
Returns:
Notification ID
"""
notification = Notification(
id=self._get_next_id(),
title=title,
message=message,
type=type,
duration=duration
)
# Add to history
self._history.append(notification)
# Play sound if requested
if sound and self._sound_enabled:
self._play_sound(type)
notification.sound_played = True
# Emit signal to show toast (thread-safe)
self._signals.show_notification.emit(notification)
return notification.id
def notify_info(self, title: str, message: str,
sound: bool = False, duration: int = 5000) -> str:
"""Show an info notification."""
return self.notify(title, message, NotificationType.INFO, sound, duration)
def notify_warning(self, title: str, message: str,
sound: bool = False, duration: int = 5000) -> str:
"""Show a warning notification."""
return self.notify(title, message, NotificationType.WARNING, sound, duration)
def notify_error(self, title: str, message: str,
sound: bool = True, duration: int = 7000) -> str:
"""Show an error notification (sound on by default)."""
return self.notify(title, message, NotificationType.ERROR, sound, duration)
def notify_success(self, title: str, message: str,
sound: bool = False, duration: int = 5000) -> str:
"""Show a success notification."""
return self.notify(title, message, NotificationType.SUCCESS, sound, duration)
def _show_toast(self, notification: Notification):
"""Show toast widget (called via signal for thread safety)."""
if not self._app:
self._app = QApplication.instance()
if not self._app:
print("[Notifications] Error: No QApplication available")
return
# Create toast widget
toast = ToastWidget(notification)
toast.closed.connect(lambda: self._on_toast_closed(notification.id))
toast.expired.connect(lambda: self._on_toast_expired(notification.id))
# Store reference
self._active_toasts[notification.id] = toast
# Position the toast
self._position_toast(toast)
# Show
toast.show()
# Remove oldest if too many visible
if len(self._active_toasts) > self.MAX_VISIBLE:
oldest_id = next(iter(self._active_toasts))
self._active_toasts[oldest_id].close_notification()
def _position_toast(self, toast: ToastWidget):
"""Position toast on screen."""
if not self._app:
return
screen = self._app.primaryScreen().geometry()
# Calculate position
visible_count = len(self._active_toasts)
if self._position == "bottom-right":
x = screen.width() - toast.width() - self.SCREEN_MARGIN
y = screen.height() - (toast.height() + self.TOAST_SPACING) * (visible_count + 1) - self.SCREEN_MARGIN
elif self._position == "top-right":
x = screen.width() - toast.width() - self.SCREEN_MARGIN
y = self.SCREEN_MARGIN + (toast.height() + self.TOAST_SPACING) * visible_count
elif self._position == "bottom-left":
x = self.SCREEN_MARGIN
y = screen.height() - (toast.height() + self.TOAST_SPACING) * (visible_count + 1) - self.SCREEN_MARGIN
else: # top-left
x = self.SCREEN_MARGIN
y = self.SCREEN_MARGIN + (toast.height() + self.TOAST_SPACING) * visible_count
toast.move(x, max(y, self.SCREEN_MARGIN))
# Reposition all visible toasts to maintain stack
self._reposition_toasts()
def _reposition_toasts(self):
"""Reposition all visible toasts."""
if not self._app:
return
screen = self._app.primaryScreen().geometry()
toasts = list(self._active_toasts.values())
for i, toast in enumerate(toasts):
if self._position == "bottom-right":
x = screen.width() - toast.width() - self.SCREEN_MARGIN
y = screen.height() - (toast.height() + self.TOAST_SPACING) * (i + 1) - self.SCREEN_MARGIN
elif self._position == "top-right":
x = screen.width() - toast.width() - self.SCREEN_MARGIN
y = self.SCREEN_MARGIN + (toast.height() + self.TOAST_SPACING) * i
elif self._position == "bottom-left":
x = self.SCREEN_MARGIN
y = screen.height() - (toast.height() + self.TOAST_SPACING) * (i + 1) - self.SCREEN_MARGIN
else: # top-left
x = self.SCREEN_MARGIN
y = self.SCREEN_MARGIN + (toast.height() + self.TOAST_SPACING) * i
toast.move(x, max(y, self.SCREEN_MARGIN))
def _on_toast_closed(self, notification_id: str):
"""Handle toast closed."""
if notification_id in self._active_toasts:
del self._active_toasts[notification_id]
self._reposition_toasts()
def _on_toast_expired(self, notification_id: str):
"""Handle toast expired (started fading)."""
pass # Just for tracking if needed
def _play_sound(self, type: NotificationType):
    """Play the notification sound for *type* (best effort, never raises).

    Uses QSoundEffect for asynchronous playback and caches one effect per
    file; falls back to platform tools when QtMultimedia is missing.
    """
    try:
        # QUrl is needed because QSoundEffect.setSource takes a URL.
        from PyQt6.QtCore import QUrl
        from PyQt6.QtMultimedia import QSoundEffect
        sound_file = self._get_sound_file(type)
        if not sound_file or not sound_file.exists():
            return
        # Use cached sound effect or create new
        sound_key = str(sound_file)
        if sound_key not in self._sound_cache:
            effect = QSoundEffect()
            # BUG FIX: setSource() expects a QUrl, not a plain string —
            # passing the raw path silently failed to load the file.
            effect.setSource(QUrl.fromLocalFile(str(sound_file)))
            effect.setVolume(0.7)
            self._sound_cache[sound_key] = effect
        self._sound_cache[sound_key].play()
    except ImportError:
        # QtMultimedia not installed: use system audio tools instead.
        self._play_sound_fallback(type)
    except Exception as e:
        print(f"[Notifications] Sound error: {e}")
def _get_sound_file(self, type: NotificationType) -> Optional[Path]:
    """Resolve the sound file for *type*; None when nothing playable exists."""
    base = self._sound_path
    if not base:
        return None
    # Map types to sound files
    names = {
        NotificationType.INFO: "info.wav",
        NotificationType.WARNING: "warning.wav",
        NotificationType.ERROR: "error.wav",
        NotificationType.SUCCESS: "success.wav",
    }
    candidate = base / names.get(type, "info.wav")
    if candidate.exists():
        return candidate
    # Preferred file missing: probe alternative audio formats.
    for ext in ('.mp3', '.wav', '.ogg'):
        alt = candidate.with_suffix(ext)
        if alt.exists():
            return alt
    return None
def _play_sound_fallback(self, type: NotificationType):
    """Fallback sound playback using system tools.

    Used when QtMultimedia is unavailable.  Never raises: sound is
    cosmetic, so every playback error is swallowed.
    """
    import platform
    import subprocess
    sound_file = self._get_sound_file(type)
    if not sound_file:
        return
    try:
        system = platform.system()
        if system == "Windows":
            # NOTE(review): plays the generic system beep and ignores
            # sound_file — confirm whether winsound.PlaySound(str(sound_file),
            # winsound.SND_ASYNC) was intended instead.
            import winsound
            winsound.MessageBeep()
        elif system == "Darwin":  # macOS
            subprocess.run(["afplay", str(sound_file)], check=False)
        else:  # Linux
            subprocess.run(["aplay", "-q", str(sound_file)], check=False)
    except Exception:
        pass  # Silently fail on sound errors
# ========== Public API ==========
def close_all(self):
    """Close every visible toast and forget all of them."""
    # Snapshot the values: closing a toast may mutate the dict via signals.
    for toast in tuple(self._active_toasts.values()):
        toast.close_notification()
    self._active_toasts.clear()
def get_history(self, limit: Optional[int] = None, type: Optional[NotificationType] = None) -> List[Notification]:
    """Get notification history.

    Args:
        limit: Maximum number of notifications to return (None or 0 = all)
        type: Filter by notification type (None = all types);
            name shadows the builtin but is kept for API compatibility

    Returns:
        List of notifications (newest first)
    """
    history = list(self._history)
    if type:
        history = [n for n in history if n.type == type]
    if limit:
        # Keep only the newest `limit` entries before reversing.
        # Assumes _history appends newest last — TODO confirm at the append site.
        history = history[-limit:]
    return list(reversed(history))
def clear_history(self):
    """Clear notification history (visible toasts are not affected)."""
    self._history.clear()
def set_sound_enabled(self, enabled: bool):
    """Enable/disable notification sounds."""
    self._sound_enabled = enabled
def set_sound_path(self, path: Path):
    """Set the directory containing notification sound files.

    Accepts a str or Path (coerced to Path).  Existence is not validated
    here — missing files simply produce no sound at playback time.
    """
    self._sound_path = Path(path)
def set_position(self, position: str):
    """Set the screen corner used for stacking toasts.

    Args:
        position: One of "bottom-right", "top-right", "bottom-left",
            "top-left".  Any other value is silently ignored.
    """
    if position in ("bottom-right", "top-right", "bottom-left", "top-left"):
        self._position = position
def dismiss(self, notification_id: str):
    """Close one visible notification; unknown ids are ignored."""
    try:
        toast = self._active_toasts[notification_id]
    except KeyError:
        return
    toast.close_notification()
# Convenience function to get manager
def get_notification_manager() -> NotificationManager:
    """Get the global NotificationManager instance (singleton accessor)."""
    return NotificationManager.get_instance()

View File

@ -12,6 +12,8 @@ import json
from datetime import datetime
from pathlib import Path
from core.data_store import get_data_store
class APIType(Enum):
"""Types of plugin APIs."""
@ -20,6 +22,7 @@ class APIType(Enum):
DATA = "data" # Shared data storage
UTILITY = "utility" # Helper functions
SERVICE = "service" # Background services
NEXUS = "nexus" # Entropia Nexus API
@dataclass
@ -51,6 +54,8 @@ class PluginAPI:
self.apis: Dict[str, APIEndpoint] = {}
self.services: Dict[str, Any] = {}
self.data_cache: Dict[str, Any] = {}
self.http_client = None
self._notification_service = None
self._initialized = True
# ========== API Registration ==========
@ -97,6 +102,81 @@ class PluginAPI:
return [ep for ep in self.apis.values() if ep.api_type == api_type]
return list(self.apis.values())
# ========== Window Service ==========
def register_window_service(self, window_manager):
    """Attach the Window Manager as the shared 'window' service."""
    self.services['window'] = window_manager
    print("[API] Window Manager service registered")
def get_eu_window(self) -> Optional[Dict[str, Any]]:
    """Return a snapshot of the Entropia Universe window, or None.

    Returns:
        Dict with keys 'handle', 'title', 'rect' (left, top, right,
        bottom), 'width', 'height', 'is_visible', 'is_focused' — or None
        when the window service is missing, the window is not found, or
        the lookup raises.
    """
    service = self.services.get('window')
    if not service:
        return None
    try:
        info = service.find_eu_window()
        if not info:
            return None
        # Flatten into a plain dict so plugins don't depend on WindowInfo.
        return {
            'handle': info.handle,
            'title': info.title,
            'rect': info.rect,
            'width': info.width,
            'height': info.height,
            'is_visible': info.is_visible,
            'is_focused': info.is_focused
        }
    except Exception as e:
        print(f"[API] Window error: {e}")
        return None
def is_eu_focused(self) -> bool:
    """True when the Entropia Universe window is the active window.

    Returns False when the window service is missing or errors out.
    """
    service = self.services.get('window')
    if not service:
        return False
    try:
        return service.is_window_focused()
    except Exception as e:
        print(f"[API] Window focus error: {e}")
        return False
def bring_eu_to_front(self) -> bool:
    """Bring the EU window to the front and focus it.

    Returns False when the window service is missing or errors out.
    """
    service = self.services.get('window')
    if not service:
        return False
    try:
        return service.bring_to_front()
    except Exception as e:
        print(f"[API] Bring to front error: {e}")
        return False
# ========== OCR Service ==========
def register_ocr_service(self, ocr_handler: Callable):
@ -148,6 +228,205 @@ class PluginAPI:
print(f"[API] Log error: {e}")
return []
# ========== Nexus API Service ==========
def register_nexus_service(self, nexus_api_instance):
    """Attach the Entropia Nexus API client as the shared 'nexus' service."""
    self.services['nexus'] = nexus_api_instance
    print("[API] Nexus service registered")
def nexus_search(self, query: str, entity_type: str = "all", limit: int = 20) -> list:
    """Search Entropia Nexus for items, mobs, or all entities.

    Args:
        query: Search term (e.g., "ArMatrix", "Atrox")
        entity_type: "items", "mobs", or anything else for "all"
            (case-insensitive)
        limit: Maximum results (default 20, max 100)

    Returns:
        List of plain dicts with keys id, name, type, category, icon_url;
        empty list when the Nexus service is missing or the search fails.

    Example:
        results = api.nexus_search("ArMatrix", "items")
        mobs = api.nexus_search("Atrox", "mobs")
        all_results = api.nexus_search("Omegaton")
    """
    nexus = self.services.get('nexus')
    if not nexus:
        print("[API] Nexus service not available")
        return []
    try:
        kind = entity_type.lower()
        if kind == "items":
            hits = nexus.search_items(query, limit)
        elif kind == "mobs":
            hits = nexus.search_mobs(query, limit)
        else:
            hits = nexus.search_all(query, limit)
        # Convert SearchResult objects to dicts for easier consumption.
        fields = ('id', 'name', 'type', 'category', 'icon_url')
        return [{field: getattr(hit, field) for field in fields} for hit in hits]
    except Exception as e:
        print(f"[API] Nexus search error: {e}")
        return []
def nexus_get_item_details(self, item_id: str) -> Optional[Dict[str, Any]]:
    """Get detailed information about a specific item from Nexus.

    Args:
        item_id: The item's unique identifier

    Returns:
        Dict with item details, or None when the item is unknown, the
        Nexus service is unavailable, or the lookup fails.
        (Annotation fixed: was `-> dict` although None is a documented
        return value.)
    """
    nexus = self.services.get('nexus')
    if not nexus:
        print("[API] Nexus service not available")
        return None
    try:
        details = nexus.get_item_details(item_id)
        if not details:
            return None
        # Flatten into a plain dict so plugins do not depend on the
        # Nexus client's internal types.
        return {
            'id': details.id,
            'name': details.name,
            'description': details.description,
            'category': details.category,
            'weight': details.weight,
            'tt_value': details.tt_value,
            'decay': details.decay,
            'ammo_consumption': details.ammo_consumption,
            'damage': details.damage,
            'range': details.range,
            'accuracy': details.accuracy,
            'durability': details.durability,
            'requirements': details.requirements,
            'materials': details.materials
        }
    except Exception as e:
        print(f"[API] Nexus get_item_details error: {e}")
        return None
def nexus_get_market_data(self, item_id: str) -> Optional[Dict[str, Any]]:
    """Get market data for a specific item from Nexus.

    Args:
        item_id: The item's unique identifier

    Returns:
        Dict with market data, or None when the item is unknown, the
        Nexus service is unavailable, or the lookup fails.
        (Annotation fixed: was `-> dict` although None is a documented
        return value.)
    """
    nexus = self.services.get('nexus')
    if not nexus:
        print("[API] Nexus service not available")
        return None
    try:
        market = nexus.get_market_data(item_id)
        if not market:
            return None
        # Flatten into a plain dict; the timestamp becomes an ISO string
        # so the result stays JSON-serializable for plugins.
        return {
            'item_id': market.item_id,
            'item_name': market.item_name,
            'current_markup': market.current_markup,
            'avg_markup_7d': market.avg_markup_7d,
            'avg_markup_30d': market.avg_markup_30d,
            'volume_24h': market.volume_24h,
            'volume_7d': market.volume_7d,
            'buy_orders': market.buy_orders,
            'sell_orders': market.sell_orders,
            'last_updated': market.last_updated.isoformat() if market.last_updated else None
        }
    except Exception as e:
        print(f"[API] Nexus get_market_data error: {e}")
        return None
# ========== HTTP Service ==========
def register_http_service(self, http_client):
    """Attach the shared caching HTTP client."""
    self.http_client = http_client
    print("[API] HTTP service registered")
def http_get(self, url: str, cache_ttl: int = 3600, headers: Dict[str, str] = None, **kwargs) -> Dict[str, Any]:
    """Perform an HTTP GET through the shared client, with caching.

    Args:
        url: URL to fetch
        cache_ttl: Cache time-to-live in seconds (0 to disable)
        headers: Additional headers
        **kwargs: Extra arguments forwarded to the client (params, use_cache, ...)

    Returns:
        Dict with 'status_code', 'headers', 'content', 'text', 'json', 'from_cache'

    Raises:
        RuntimeError: when no HTTP client has been registered.
    """
    if not self.http_client:
        raise RuntimeError("HTTP service not available")
    try:
        response = self.http_client.get(url, cache_ttl=cache_ttl, headers=headers, **kwargs)
    except Exception as e:
        print(f"[API] HTTP GET error: {e}")
        raise
    return response
def http_post(self, url: str, data=None, json=None, headers: Dict[str, str] = None, **kwargs) -> Dict[str, Any]:
    """Perform an HTTP POST through the shared client.

    Args:
        url: URL to post to
        data: Form data, bytes, or string
        json: JSON payload (name shadows the module, kept for API compat)
        headers: Additional headers
        **kwargs: Extra arguments forwarded to the client

    Returns:
        Dict with 'status_code', 'headers', 'content', 'text', 'json', 'from_cache'

    Raises:
        RuntimeError: when no HTTP client has been registered.
    """
    if not self.http_client:
        raise RuntimeError("HTTP service not available")
    try:
        response = self.http_client.post(url, data=data, json=json, headers=headers, **kwargs)
    except Exception as e:
        print(f"[API] HTTP POST error: {e}")
        raise
    return response
def http_clear_cache(self):
    """Drop every cached HTTP response (no-op without a client)."""
    client = self.http_client
    if client:
        client.clear_cache()
def http_invalidate_cache(self, url_pattern: str):
    """Invalidate cached entries matching *url_pattern* (no-op without a client)."""
    client = self.http_client
    if client:
        client.invalidate_cache(url_pattern)
def http_get_cache_stats(self) -> Dict[str, Any]:
    """Return HTTP cache statistics; empty dict when no client is registered."""
    client = self.http_client
    return client.get_cache_stats() if client else {}
# ========== Shared Data ==========
def get_data(self, key: str, default=None) -> Any:
@ -211,6 +490,91 @@ class PluginAPI:
return 0.0
return (price / tt) * 100
# ========== Data Service ==========
def register_data_service(self) -> bool:
    """Register the shared DataStore as the 'data' service.

    Returns:
        True on success, False when the data store cannot be created.
    """
    try:
        store = get_data_store()
    except Exception as e:
        print(f"[API] Failed to register data service: {e}")
        return False
    self.services['data'] = store
    return True
def save_data(self, plugin_id: str, key: str, data: Any) -> bool:
    """Persist data for a plugin, lazily registering the data service.

    Args:
        plugin_id: Unique identifier for the plugin
        key: Key under which to store the data
        data: Data to store (must be JSON serializable)

    Returns:
        True if successful, False on a storage error

    Raises:
        RuntimeError: when the data service cannot be registered at all.
    """
    store = self.services.get('data')
    if not store and self.register_data_service():
        store = self.services.get('data')
    if not store:
        raise RuntimeError("Data service not available")
    try:
        return store.save(plugin_id, key, data)
    except Exception as e:
        print(f"[API] Error saving data for {plugin_id}/{key}: {e}")
        return False
def load_data(self, plugin_id: str, key: str, default: Any = None) -> Any:
    """Load persisted data for a plugin, lazily registering the data service.

    Unlike save_data/delete_data this never raises: a missing service
    simply yields *default*.

    Args:
        plugin_id: Unique identifier for the plugin
        key: Key of the data to load
        default: Value returned when the key or service is missing

    Returns:
        The stored data or *default*
    """
    store = self.services.get('data')
    if not store and self.register_data_service():
        store = self.services.get('data')
    if not store:
        return default
    try:
        return store.load(plugin_id, key, default)
    except Exception as e:
        print(f"[API] Error loading data for {plugin_id}/{key}: {e}")
        return default
def delete_data(self, plugin_id: str, key: str) -> bool:
    """Delete persisted data for a plugin, lazily registering the data service.

    Args:
        plugin_id: Unique identifier for the plugin
        key: Key of the data to delete

    Returns:
        True if the key existed and was deleted, False otherwise

    Raises:
        RuntimeError: when the data service cannot be registered at all.
    """
    store = self.services.get('data')
    if not store and self.register_data_service():
        store = self.services.get('data')
    if not store:
        raise RuntimeError("Data service not available")
    try:
        return store.delete(plugin_id, key)
    except Exception as e:
        print(f"[API] Error deleting data for {plugin_id}/{key}: {e}")
        return False
# Singleton instance
_plugin_api = None

View File

@ -0,0 +1,467 @@
"""
EU-Utility - Window Manager Core Service
Manages interaction with the Entropia Universe game window.
Windows-specific implementation using ctypes (no external dependencies).
"""
import sys
import time
from typing import Optional, Tuple, Dict, Any
from dataclasses import dataclass
from pathlib import Path
# Platform detection
IS_WINDOWS = sys.platform == 'win32'

# Windows-specific imports with graceful fallback
if IS_WINDOWS:
    try:
        import ctypes
        from ctypes import wintypes
        import subprocess
        WINDOWS_AVAILABLE = True
    except ImportError:
        # ctypes/subprocess are stdlib, so this is mostly defensive — it
        # keeps the module importable even on stripped-down Python builds.
        WINDOWS_AVAILABLE = False
else:
    # Non-Windows: WindowManager degrades to a no-op "limited mode".
    WINDOWS_AVAILABLE = False
@dataclass
class WindowInfo:
    """Snapshot of a native window at lookup time (not live-updated)."""
    handle: int      # native HWND
    title: str       # window title text
    pid: int         # owning process id
    rect: Tuple[int, int, int, int]  # left, top, right, bottom
    width: int       # rect width in pixels
    height: int      # rect height in pixels
    is_visible: bool  # result of IsWindowVisible at snapshot time
    is_focused: bool  # True when this was the foreground window
@dataclass
class ProcessInfo:
    """Information about a process (as much as `tasklist` provides)."""
    pid: int
    name: str  # executable name, or "Unknown" when it cannot be resolved
    executable_path: Optional[str]  # currently always None (not in tasklist CSV)
    memory_usage: Optional[int]  # in bytes; currently always None
    cpu_percent: Optional[float]  # currently always None
class WindowManager:
    """
    Singleton Window Manager for EU-Utility.

    Manages finding and interacting with the Entropia Universe game window.
    Windows-specific implementation (user32 via ctypes, no external
    dependencies) with graceful fallback elsewhere: in "limited mode"
    every query returns None/False.
    """
    # Singleton storage.  NOTE(review): creation is not lock-protected
    # (DataStore uses a lock for the same pattern) — confirm the manager
    # is only ever constructed from one thread.
    _instance = None

    # Window search criteria
    EU_WINDOW_TITLE = "Entropia Universe"  # partial, case-insensitive title match
    EU_PROCESS_NAMES = ["entropia.exe", "entropiauniverse.exe", "clientloader.exe"]

    def __new__(cls):
        # Reuse the single instance; flag it so __init__ only runs once.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every WindowManager() call; bail after the first.
        if self._initialized:
            return
        self._window_handle: Optional[int] = None        # cached HWND of the EU window
        self._window_info: Optional[WindowInfo] = None   # last window snapshot
        self._process_info: Optional[ProcessInfo] = None  # last process snapshot
        self._last_update: float = 0        # time.time() of the last cache refresh
        self._update_interval: float = 1.0  # seconds between cached-info refreshes
        # Windows API constants
        if IS_WINDOWS and WINDOWS_AVAILABLE:
            self._setup_windows_api()
        self._initialized = True
        self._available = IS_WINDOWS and WINDOWS_AVAILABLE
        if not self._available:
            print("[WindowManager] Windows API not available - running in limited mode")

    def _setup_windows_api(self):
        """Setup Windows API constants and functions."""
        # Window styles (not referenced elsewhere in this class yet; kept
        # for future style queries via GetWindowLong).
        self.GWL_STYLE = -16
        self.GWL_EXSTYLE = -20
        self.WS_VISIBLE = 0x10000000
        self.WS_MINIMIZE = 0x20000000
        # Load user32.dll functions
        self.user32 = ctypes.windll.user32
        self.kernel32 = ctypes.windll.kernel32
        # EnumWindows callback type: BOOL CALLBACK (HWND, LPARAM)
        self.EnumWindowsProc = ctypes.WINFUNCTYPE(
            wintypes.BOOL,
            wintypes.HWND,
            wintypes.LPARAM
        )

    # ========== Public API ==========
    def is_available(self) -> bool:
        """Check if window manager is fully functional (Windows + ctypes)."""
        return self._available

    def find_eu_window(self) -> Optional[WindowInfo]:
        """
        Find the Entropia Universe game window.

        Tries a title match first, then falls back to matching by known
        process names; caches the handle and info on success.

        Returns:
            WindowInfo if found, None otherwise
        """
        if not self._available:
            return None
        # Try by window title first
        hwnd = self._find_window_by_title(self.EU_WINDOW_TITLE)
        if hwnd:
            self._window_handle = hwnd
            self._window_info = self._get_window_info(hwnd)
            return self._window_info
        # Try by process name
        for proc_name in self.EU_PROCESS_NAMES:
            hwnd = self._find_window_by_process(proc_name)
            if hwnd:
                self._window_handle = hwnd
                self._window_info = self._get_window_info(hwnd)
                return self._window_info
        return None

    def get_window_rect(self) -> Optional[Tuple[int, int, int, int]]:
        """
        Get the window rectangle (left, top, right, bottom).

        Returns:
            Tuple of (left, top, right, bottom) or None
        """
        if not self._available:
            return None
        # Refresh window info (rate-limited by _update_interval)
        self._update_window_info()
        if self._window_info:
            return self._window_info.rect
        return None

    def is_window_focused(self) -> bool:
        """
        Check if the EU window is currently focused/active.

        Returns:
            True if EU window is active, False otherwise
        """
        if not self._available:
            return False
        if not self._window_handle:
            self.find_eu_window()
        if not self._window_handle:
            return False
        # Get foreground window and compare handles
        foreground_hwnd = self.user32.GetForegroundWindow()
        return foreground_hwnd == self._window_handle

    def is_window_visible(self) -> bool:
        """
        Check if the EU window is visible.

        Returns:
            True if visible, False otherwise
        """
        if not self._available:
            return False
        # Refresh window info
        self._update_window_info()
        if self._window_info:
            return self._window_info.is_visible
        return False

    def bring_to_front(self) -> bool:
        """
        Bring the EU window to the front and focus it.

        Returns:
            True if successful (per SetForegroundWindow), False otherwise
        """
        if not self._available:
            return False
        if not self._window_handle:
            self.find_eu_window()
        if not self._window_handle:
            return False
        try:
            # Show window if minimized
            self.user32.ShowWindow(self._window_handle, 9)  # SW_RESTORE = 9
            # Bring to front
            result = self.user32.SetForegroundWindow(self._window_handle)
            # Briefly mark the window TOPMOST, then clear the flag below:
            # forces it above other windows without leaving it always-on-top.
            self.user32.SetWindowPos(
                self._window_handle,
                -1,  # HWND_TOPMOST
                0, 0, 0, 0,
                0x0001 | 0x0002  # SWP_NOSIZE | SWP_NOMOVE
            )
            # Remove topmost flag but keep on top
            self.user32.SetWindowPos(
                self._window_handle,
                -2,  # HWND_NOTOPMOST
                0, 0, 0, 0,
                0x0001 | 0x0002  # SWP_NOSIZE | SWP_NOMOVE
            )
            return bool(result)
        except Exception as e:
            print(f"[WindowManager] Failed to bring window to front: {e}")
            return False

    def get_eu_process_info(self) -> Optional[ProcessInfo]:
        """
        Get process information for Entropia Universe.

        Returns:
            ProcessInfo if found, None otherwise
        """
        if not self._available:
            return None
        if not self._window_handle:
            self.find_eu_window()
        if not self._window_handle:
            return None
        # Get PID from window
        pid = wintypes.DWORD()
        self.user32.GetWindowThreadProcessId(self._window_handle, ctypes.byref(pid))
        if pid.value == 0:
            return None
        self._process_info = self._get_process_info(pid.value)
        return self._process_info

    def get_window_handle(self) -> Optional[int]:
        """Get the current (cached) window handle; may be stale."""
        return self._window_handle

    def refresh(self) -> Optional[WindowInfo]:
        """Force refresh of window information (bypasses the rate limit)."""
        self._last_update = 0
        return self.find_eu_window()

    # ========== Private Methods ==========
    def _update_window_info(self):
        """Update cached window info when older than _update_interval."""
        current_time = time.time()
        if current_time - self._last_update > self._update_interval:
            if self._window_handle:
                self._window_info = self._get_window_info(self._window_handle)
            else:
                self.find_eu_window()
            self._last_update = current_time

    def _find_window_by_title(self, title: str) -> Optional[int]:
        """Find a visible window by title (partial, case-insensitive match)."""
        # One-element list lets the nested callback store its result
        # without needing `nonlocal`.
        found_hwnd = [None]

        def callback(hwnd, extra):
            if not self.user32.IsWindowVisible(hwnd):
                return True
            # Get window text
            text = ctypes.create_unicode_buffer(256)
            self.user32.GetWindowTextW(hwnd, text, 256)
            window_title = text.value
            if title.lower() in window_title.lower():
                found_hwnd[0] = hwnd
                return False  # Stop enumeration
            return True

        proc = self.EnumWindowsProc(callback)
        self.user32.EnumWindows(proc, 0)
        return found_hwnd[0]

    def _find_window_by_process(self, process_name: str) -> Optional[int]:
        """Find a visible window owned by a process whose name matches."""
        found_hwnd = [None]

        def callback(hwnd, extra):
            if not self.user32.IsWindowVisible(hwnd):
                return True
            # Get process ID
            pid = wintypes.DWORD()
            self.user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
            # Check process name (spawns `tasklist` per candidate window —
            # slow but dependency-free)
            proc_info = self._get_process_info(pid.value)
            if proc_info and process_name.lower() in proc_info.name.lower():
                found_hwnd[0] = hwnd
                return False  # Stop enumeration
            return True

        proc = self.EnumWindowsProc(callback)
        self.user32.EnumWindows(proc, 0)
        return found_hwnd[0]

    def _get_window_info(self, hwnd: int) -> Optional[WindowInfo]:
        """Get detailed information about a window (None on API failure)."""
        try:
            # Get window rect
            rect = wintypes.RECT()
            if not self.user32.GetWindowRect(hwnd, ctypes.byref(rect)):
                return None
            # Get window text
            text = ctypes.create_unicode_buffer(256)
            self.user32.GetWindowTextW(hwnd, text, 256)
            # Get PID
            pid = wintypes.DWORD()
            self.user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
            # Check visibility
            is_visible = bool(self.user32.IsWindowVisible(hwnd))
            # Check if focused
            foreground = self.user32.GetForegroundWindow()
            is_focused = (foreground == hwnd)
            return WindowInfo(
                handle=hwnd,
                title=text.value,
                pid=pid.value,
                rect=(rect.left, rect.top, rect.right, rect.bottom),
                width=rect.right - rect.left,
                height=rect.bottom - rect.top,
                is_visible=is_visible,
                is_focused=is_focused
            )
        except Exception as e:
            print(f"[WindowManager] Error getting window info: {e}")
            return None

    def _get_process_info(self, pid: int) -> Optional[ProcessInfo]:
        """Get process information via `tasklist` (name only; no psutil)."""
        try:
            # Use Windows WMI or tasklist for process info
            import subprocess
            # Try tasklist first
            result = subprocess.run(
                ['tasklist', '/FI', f'PID eq {pid}', '/FO', 'CSV', '/NH'],
                capture_output=True,
                text=True,
                timeout=5
            )
            if result.returncode == 0 and result.stdout.strip():
                # Parse CSV output.  NOTE(review): naive '","' split — works
                # for tasklist's fixed quoting, but csv.reader would be safer.
                lines = result.stdout.strip().split('\n')
                for line in lines:
                    if str(pid) in line:
                        parts = line.split('","')
                        if len(parts) >= 2:
                            name = parts[0].replace('"', '')
                            return ProcessInfo(
                                pid=pid,
                                name=name,
                                executable_path=None,  # not in tasklist CSV
                                memory_usage=None,
                                cpu_percent=None
                            )
            # Fallback: could not identify the process from tasklist output.
            return ProcessInfo(
                pid=pid,
                name="Unknown",
                executable_path=None,
                memory_usage=None,
                cpu_percent=None
            )
        except Exception as e:
            print(f"[WindowManager] Error getting process info: {e}")
            return None

    def _check_window_exists(self, hwnd: int) -> bool:
        """Check if a window handle is still valid."""
        if not self._available:
            return False
        return bool(self.user32.IsWindow(hwnd))
# Singleton instance
_window_manager = None


def get_window_manager() -> WindowManager:
    """Get the global WindowManager instance.

    Lazily creates the singleton; WindowManager.__new__ also enforces
    single-instance semantics, so this is just a convenient accessor.
    """
    global _window_manager
    if _window_manager is None:
        _window_manager = WindowManager()
    return _window_manager
def is_eu_running() -> bool:
    """Quick check whether Entropia Universe is running (its window exists)."""
    return get_window_manager().find_eu_window() is not None
def get_eu_window_rect() -> Optional[Tuple[int, int, int, int]]:
    """Quick access to the EU window rectangle (left, top, right, bottom)."""
    return get_window_manager().get_window_rect()
def wait_for_eu(timeout: float = 30.0) -> bool:
    """
    Wait for Entropia Universe to start, polling every 0.5 s.

    Args:
        timeout: Maximum seconds to wait

    Returns:
        True if EU found, False if timeout
    """
    wm = get_window_manager()
    # time.monotonic() is immune to wall-clock adjustments (NTP/DST), so
    # the timeout cannot be stretched or cut short by clock jumps.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if wm.find_eu_window():
            return True
        time.sleep(0.5)
    return False

View File

@ -139,6 +139,50 @@ class BasePlugin(ABC):
return self.api.find_apis(api_type)
# ========== Window Service Methods ==========
def get_eu_window(self) -> Optional[Dict[str, Any]]:
    """Proxy to PluginAPI.get_eu_window().

    Returns:
        Dict with keys 'handle', 'title', 'rect' (left, top, right,
        bottom), 'width', 'height', 'is_visible', 'is_focused' — or None
        when no PluginAPI is attached or the window is unavailable.
    """
    api = self.api
    if not api:
        return None
    return api.get_eu_window()
def is_eu_focused(self) -> bool:
    """Proxy to PluginAPI.is_eu_focused().

    Returns:
        True when EU is the active window; False otherwise, including
        when no PluginAPI is attached.
    """
    api = self.api
    return api.is_eu_focused() if api else False
def bring_eu_to_front(self) -> bool:
    """Proxy to PluginAPI.bring_eu_to_front().

    Returns:
        True when the window was raised; False otherwise, including
        when no PluginAPI is attached.
    """
    api = self.api
    return api.bring_eu_to_front() if api else False
# ========== Shared Services ==========
def ocr_capture(self, region: tuple = None) -> Dict[str, Any]: