diff --git a/projects/EU-Utility/core/data_store.py b/projects/EU-Utility/core/data_store.py new file mode 100644 index 0000000..89aa3d7 --- /dev/null +++ b/projects/EU-Utility/core/data_store.py @@ -0,0 +1,316 @@ +""" +EU-Utility - Data Store Service + +Thread-safe persistent data storage for plugins. +Provides file locking, auto-backup, and singleton access. +""" + +import json +import fcntl +import shutil +import threading +from pathlib import Path +from typing import Any, Dict, Optional +from datetime import datetime +from collections import OrderedDict + + +class DataStore: + """ + Singleton data persistence service for plugins. + + Features: + - Thread-safe file operations with file locking + - Auto-backup on write (keeps last 5 versions) + - Per-plugin JSON storage + - Auto-create directories + """ + + _instance = None + _lock = threading.Lock() + + def __new__(cls): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self, data_dir: str = "data/plugins"): + if self._initialized: + return + + self.data_dir = Path(data_dir) + self.data_dir.mkdir(parents=True, exist_ok=True) + + # Memory cache for frequently accessed data + self._cache: Dict[str, Dict[str, Any]] = {} + self._cache_lock = threading.Lock() + + # Backup settings + self.max_backups = 5 + + self._initialized = True + + def _get_plugin_file(self, plugin_id: str) -> Path: + """Get the storage file path for a plugin.""" + # Sanitize plugin_id to create a safe filename + safe_name = plugin_id.replace(".", "_").replace("/", "_").replace("\\", "_") + return self.data_dir / f"{safe_name}.json" + + def _get_backup_dir(self, plugin_id: str) -> Path: + """Get the backup directory for a plugin.""" + safe_name = plugin_id.replace(".", "_").replace("/", "_").replace("\\", "_") + backup_dir = self.data_dir / ".backups" / safe_name + backup_dir.mkdir(parents=True, exist_ok=True) + return backup_dir + + def _load_plugin_data(self, plugin_id: str) -> Dict[str, Any]: + """Load all data for a plugin from disk.""" + # Check cache first + with self._cache_lock: + if plugin_id in self._cache: + return self._cache[plugin_id].copy() + + file_path = self._get_plugin_file(plugin_id) + + if not file_path.exists(): + return {} + + try: + with open(file_path, 'r', encoding='utf-8') as f: + # Acquire shared lock for reading + fcntl.flock(f.fileno(), fcntl.LOCK_SH) + try: + data = json.load(f) + finally: + fcntl.flock(f.fileno(), fcntl.LOCK_UN) + + # Update cache + with self._cache_lock: + self._cache[plugin_id] = data.copy() + return data + except (json.JSONDecodeError, IOError) as e: + print(f"[DataStore] Error loading data for {plugin_id}: {e}") + return {} + + def _save_plugin_data(self, plugin_id: str, data: Dict[str, Any]) -> bool: + """Save all data for a plugin to disk with backup.""" + file_path = self._get_plugin_file(plugin_id) + + try: + # Create backup if file exists + if file_path.exists(): + self._create_backup(plugin_id, file_path) + + # Write to temp file first, then move (atomic operation) + temp_path = file_path.with_suffix('.tmp') + + with open(temp_path, 'w', encoding='utf-8') as f: + # Acquire exclusive lock for writing + fcntl.flock(f.fileno(), fcntl.LOCK_EX) + try: + json.dump(data, f, indent=2, ensure_ascii=False) + f.flush() + import os + os.fsync(f.fileno()) + finally: + fcntl.flock(f.fileno(), fcntl.LOCK_UN) + + # Atomic move + temp_path.replace(file_path) + + # Update cache + with self._cache_lock: + self._cache[plugin_id] = data.copy() + + return True + except IOError as e: + print(f"[DataStore] Error saving data for {plugin_id}: {e}") + # Clean up temp file if exists + temp_path = file_path.with_suffix('.tmp') + if temp_path.exists(): + temp_path.unlink() + return False + + def _create_backup(self, plugin_id: str, file_path: Path): + """Create a backup of the current data file.""" + backup_dir = self._get_backup_dir(plugin_id) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_path = backup_dir / f"{timestamp}.json" + + try: + shutil.copy2(file_path, backup_path) + self._cleanup_old_backups(backup_dir) + except IOError as e: + print(f"[DataStore] Error creating backup for {plugin_id}: {e}") + + def _cleanup_old_backups(self, backup_dir: Path): + """Remove old backups, keeping only the last N versions.""" + try: + backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime) + while len(backups) > self.max_backups: + old_backup = backups.pop(0) + old_backup.unlink() + except IOError as e: + print(f"[DataStore] Error cleaning up backups: {e}") + + def save(self, plugin_id: str, key: str, data: Any) -> bool: + """ + Save data for a plugin. + + Args: + plugin_id: Unique identifier for the plugin + key: Key under which to store the data + data: Data to store (must be JSON serializable) + + Returns: + True if successful, False otherwise + """ + plugin_data = self._load_plugin_data(plugin_id) + plugin_data[key] = data + return self._save_plugin_data(plugin_id, plugin_data) + + def load(self, plugin_id: str, key: str, default: Any = None) -> Any: + """ + Load data for a plugin. + + Args: + plugin_id: Unique identifier for the plugin + key: Key of the data to load + default: Default value if key not found + + Returns: + The stored data or default value + """ + plugin_data = self._load_plugin_data(plugin_id) + return plugin_data.get(key, default) + + def delete(self, plugin_id: str, key: str) -> bool: + """ + Delete data for a plugin. + + Args: + plugin_id: Unique identifier for the plugin + key: Key of the data to delete + + Returns: + True if key existed and was deleted, False otherwise + """ + plugin_data = self._load_plugin_data(plugin_id) + if key in plugin_data: + del plugin_data[key] + return self._save_plugin_data(plugin_id, plugin_data) + return False + + def get_all_keys(self, plugin_id: str) -> list: + """ + Get all keys stored for a plugin. + + Args: + plugin_id: Unique identifier for the plugin + + Returns: + List of keys + """ + plugin_data = self._load_plugin_data(plugin_id) + return list(plugin_data.keys()) + + def clear_plugin(self, plugin_id: str) -> bool: + """ + Clear all data for a plugin. + + Args: + plugin_id: Unique identifier for the plugin + + Returns: + True if successful, False otherwise + """ + file_path = self._get_plugin_file(plugin_id) + + # Create backup before clearing + if file_path.exists(): + self._create_backup(plugin_id, file_path) + + # Clear cache + with self._cache_lock: + if plugin_id in self._cache: + del self._cache[plugin_id] + + # Remove file + try: + if file_path.exists(): + file_path.unlink() + return True + except IOError as e: + print(f"[DataStore] Error clearing data for {plugin_id}: {e}") + return False + + def get_backups(self, plugin_id: str) -> list: + """ + Get list of available backups for a plugin. + + Args: + plugin_id: Unique identifier for the plugin + + Returns: + List of backup file paths + """ + backup_dir = self._get_backup_dir(plugin_id) + if not backup_dir.exists(): + return [] + + backups = sorted(backup_dir.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True) + return [str(b) for b in backups] + + def restore_backup(self, plugin_id: str, backup_path: str) -> bool: + """ + Restore data from a backup. + + Args: + plugin_id: Unique identifier for the plugin + backup_path: Path to the backup file + + Returns: + True if successful, False otherwise + """ + backup_file = Path(backup_path) + if not backup_file.exists(): + print(f"[DataStore] Backup not found: {backup_path}") + return False + + file_path = self._get_plugin_file(plugin_id) + + try: + # Create backup of current state before restoring + if file_path.exists(): + self._create_backup(plugin_id, file_path) + + # Copy backup to main file + shutil.copy2(backup_file, file_path) + + # Invalidate cache + with self._cache_lock: + if plugin_id in self._cache: + del self._cache[plugin_id] + + return True + except IOError as e: + print(f"[DataStore] Error restoring backup for {plugin_id}: {e}") + return False + + +# Singleton instance +_data_store = None +_data_store_lock = threading.Lock() + + +def get_data_store() -> DataStore: + """Get the global DataStore instance.""" + global _data_store + if _data_store is None: + with _data_store_lock: + if _data_store is None: + _data_store = DataStore() + return _data_store diff --git a/projects/EU-Utility/core/event_bus.py b/projects/EU-Utility/core/event_bus.py new file mode 100644 index 0000000..21a6c59 --- /dev/null +++ b/projects/EU-Utility/core/event_bus.py @@ -0,0 +1,672 @@ +""" +EU-Utility - Enhanced Event Bus + +Core service for typed event handling with: +- Typed events using dataclasses +- Event filtering (mob types, damage thresholds, etc.) +- Event persistence (last 1000 events) +- Event replay (replay last N events to new subscribers) +- Async event handling (non-blocking publishers) +- Event statistics (events per minute, etc.) +""" + +import asyncio +import time +import threading +from collections import deque, defaultdict +from dataclasses import dataclass, field, asdict +from datetime import datetime, timedelta +from enum import Enum, auto +from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union, Set +from functools import wraps +import copy + + +# ========== Event Types ========== + +class EventCategory(Enum): + """Event categories for organization.""" + SKILL = auto() + LOOT = auto() + COMBAT = auto() + ECONOMY = auto() + SYSTEM = auto() + CHAT = auto() + GLOBAL = auto() + + +@dataclass(frozen=True) +class BaseEvent: + """Base class for all typed events.""" + timestamp: datetime = field(default_factory=datetime.now) + source: str = "unknown" + + @property + def event_type(self) -> str: + """Return the event type name.""" + return self.__class__.__name__ + + @property + def category(self) -> EventCategory: + """Return the event category.""" + return EventCategory.SYSTEM + + def to_dict(self) -> Dict[str, Any]: + """Convert event to dictionary.""" + return { + 'event_type': self.event_type, + 'category': self.category.name, + **asdict(self) + } + + +@dataclass(frozen=True) +class SkillGainEvent(BaseEvent): + """Event fired when a skill increases.""" + skill_name: str = "" + skill_value: float = 0.0 + gain_amount: float = 0.0 + + @property + def category(self) -> EventCategory: + return EventCategory.SKILL + + +@dataclass(frozen=True) +class LootEvent(BaseEvent): + """Event fired when loot is received.""" + mob_name: str = "" + items: List[Dict[str, Any]] = field(default_factory=list) + total_tt_value: float = 0.0 + position: Optional[tuple] = None # (x, y, z) + + @property + def category(self) -> EventCategory: + return EventCategory.LOOT + + def get_item_names(self) -> List[str]: + """Get list of item names from loot.""" + return [item.get('name', 'Unknown') for item in self.items] + + +@dataclass(frozen=True) +class DamageEvent(BaseEvent): + """Event fired when damage is dealt or received.""" + damage_amount: float = 0.0 + damage_type: str = "" # e.g., "impact", "penetration", "burn" + is_critical: bool = False + target_name: str = "" + attacker_name: str = "" + is_outgoing: bool = True # True if player dealt damage + + @property + def category(self) -> EventCategory: + return EventCategory.COMBAT + + def is_high_damage(self, threshold: float = 100.0) -> bool: + """Check if damage exceeds threshold.""" + return self.damage_amount >= threshold + + +@dataclass(frozen=True) +class GlobalEvent(BaseEvent): + """Event for global announcements.""" + player_name: str = "" + achievement_type: str = "" # e.g., "hof", "ath", "discovery" + value: float = 0.0 + item_name: Optional[str] = None + + @property + def category(self) -> EventCategory: + return EventCategory.GLOBAL + + +@dataclass(frozen=True) +class ChatEvent(BaseEvent): + """Event for chat messages.""" + channel: str = "" # "main", "team", "society", etc. + sender: str = "" + message: str = "" + + @property + def category(self) -> EventCategory: + return EventCategory.CHAT + + +@dataclass(frozen=True) +class EconomyEvent(BaseEvent): + """Event for economic transactions.""" + transaction_type: str = "" # "sale", "purchase", "deposit", "withdraw" + amount: float = 0.0 + currency: str = "PED" + description: str = "" + + @property + def category(self) -> EventCategory: + return EventCategory.ECONOMY + + +@dataclass(frozen=True) +class SystemEvent(BaseEvent): + """Event for system notifications.""" + message: str = "" + severity: str = "info" # "debug", "info", "warning", "error", "critical" + + @property + def category(self) -> EventCategory: + return EventCategory.SYSTEM + + +# Type variable for event types +T = TypeVar('T', bound=BaseEvent) + + +# ========== Event Filter ========== + +@dataclass +class EventFilter: + """Filter criteria for event subscription.""" + event_types: Optional[List[Type[BaseEvent]]] = None + categories: Optional[List[EventCategory]] = None + min_damage: Optional[float] = None + max_damage: Optional[float] = None + mob_types: Optional[List[str]] = None + skill_names: Optional[List[str]] = None + sources: Optional[List[str]] = None + custom_predicate: Optional[Callable[[BaseEvent], bool]] = None + + def matches(self, event: BaseEvent) -> bool: + """Check if an event matches this filter.""" + # Check event type + if self.event_types is not None: + if not any(isinstance(event, et) for et in self.event_types): + return False + + # Check category + if self.categories is not None: + if event.category not in self.categories: + return False + + # Check source + if self.sources is not None: + if event.source not in self.sources: + return False + + # Type-specific filters + if isinstance(event, DamageEvent): + if self.min_damage is not None and event.damage_amount < self.min_damage: + return False + if self.max_damage is not None and event.damage_amount > self.max_damage: + return False + + if isinstance(event, LootEvent): + if self.mob_types is not None: + if event.mob_name not in self.mob_types: + return False + + if isinstance(event, SkillGainEvent): + if self.skill_names is not None: + if event.skill_name not in self.skill_names: + return False + + # Custom predicate + if self.custom_predicate is not None: + return self.custom_predicate(event) + + return True + + +# ========== Subscription ========== + +@dataclass +class EventSubscription: + """Represents an event subscription.""" + id: str + callback: Callable[[BaseEvent], Any] + event_filter: EventFilter + replay_history: bool + replay_count: int + created_at: datetime = field(default_factory=datetime.now) + event_count: int = 0 + last_received: Optional[datetime] = None + + def matches(self, event: BaseEvent) -> bool: + """Check if event matches this subscription.""" + return self.event_filter.matches(event) + + +# ========== Event Statistics ========== + +@dataclass +class EventStats: + """Statistics for event bus performance.""" + total_events_published: int = 0 + total_events_delivered: int = 0 + total_subscriptions: int = 0 + active_subscriptions: int = 0 + events_by_type: Dict[str, int] = field(default_factory=lambda: defaultdict(int)) + events_by_category: Dict[str, int] = field(default_factory=lambda: defaultdict(int)) + events_per_minute: float = 0.0 + average_delivery_time_ms: float = 0.0 + errors: int = 0 + _start_time: datetime = field(default_factory=datetime.now) + _minute_window: deque = field(default_factory=lambda: deque(maxlen=60)) + _delivery_times: deque = field(default_factory=lambda: deque(maxlen=100)) + + def record_event_published(self, event: BaseEvent): + """Record an event publication.""" + self.total_events_published += 1 + self.events_by_type[event.event_type] += 1 + self.events_by_category[event.category.name] += 1 + self._minute_window.append(time.time()) + self._update_epm() + + def record_event_delivered(self, delivery_time_ms: float): + """Record successful event delivery.""" + self.total_events_delivered += 1 + self._delivery_times.append(delivery_time_ms) + if len(self._delivery_times) > 0: + self.average_delivery_time_ms = sum(self._delivery_times) / len(self._delivery_times) + + def record_error(self): + """Record a delivery error.""" + self.errors += 1 + + def _update_epm(self): + """Update events per minute calculation.""" + now = time.time() + # Count events in the last 60 seconds + recent = [t for t in self._minute_window if now - t < 60] + self.events_per_minute = len(recent) + + def get_summary(self) -> Dict[str, Any]: + """Get statistics summary.""" + uptime = datetime.now() - self._start_time + return { + 'total_published': self.total_events_published, + 'total_delivered': self.total_events_delivered, + 'active_subscriptions': self.active_subscriptions, + 'events_per_minute': round(self.events_per_minute, 2), + 'avg_delivery_ms': round(self.average_delivery_time_ms, 2), + 'errors': self.errors, + 'uptime_seconds': uptime.total_seconds(), + 'top_event_types': dict(sorted( + self.events_by_type.items(), + key=lambda x: x[1], + reverse=True + )[:5]) + } + + +# ========== Event Bus ========== + +class EventBus: + """ + Enhanced Event Bus for EU-Utility. + + Features: + - Typed events using dataclasses + - Event filtering with flexible criteria + - Event persistence (configurable history size) + - Event replay for new subscribers + - Async event handling + - Event statistics + """ + + _instance = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self, max_history: int = 1000): + if self._initialized: + return + + self.max_history = max_history + self._history: deque = deque(maxlen=max_history) + self._subscriptions: Dict[str, EventSubscription] = {} + self._subscription_counter = 0 + self._stats = EventStats() + self._lock = threading.RLock() + self._async_loop: Optional[asyncio.AbstractEventLoop] = None + self._async_thread: Optional[threading.Thread] = None + + self._initialized = True + + def _ensure_async_loop(self): + """Ensure the async event loop is running.""" + if self._async_loop is None or self._async_loop.is_closed(): + self._start_async_loop() + + def _start_async_loop(self): + """Start the async event loop in a separate thread.""" + def run_loop(): + self._async_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._async_loop) + self._async_loop.run_forever() + + self._async_thread = threading.Thread(target=run_loop, daemon=True) + self._async_thread.start() + + def publish(self, event: BaseEvent) -> None: + """ + Publish an event to all matching subscribers. + Non-blocking - returns immediately. + """ + with self._lock: + # Add to history + self._history.append(event) + + # Update stats + self._stats.record_event_published(event) + + # Get matching subscriptions + matching_subs = [ + sub for sub in self._subscriptions.values() + if sub.matches(event) + ] + + # Deliver outside the lock to prevent blocking + for sub in matching_subs: + self._deliver_async(sub, event) + + def publish_sync(self, event: BaseEvent) -> int: + """ + Publish an event synchronously. + Blocks until all callbacks complete. + Returns number of subscribers notified. + """ + with self._lock: + # Add to history + self._history.append(event) + + # Update stats + self._stats.record_event_published(event) + + # Get matching subscriptions + matching_subs = [ + sub for sub in self._subscriptions.values() + if sub.matches(event) + ] + + # Deliver synchronously + count = 0 + for sub in matching_subs: + if self._deliver_sync(sub, event): + count += 1 + + return count + + def _deliver_async(self, subscription: EventSubscription, event: BaseEvent): + """Deliver an event asynchronously.""" + self._ensure_async_loop() + + def deliver(): + start = time.perf_counter() + try: + subscription.callback(event) + subscription.event_count += 1 + subscription.last_received = datetime.now() + elapsed = (time.perf_counter() - start) * 1000 + self._stats.record_event_delivered(elapsed) + except Exception as e: + self._stats.record_error() + print(f"[EventBus] Error delivering to {subscription.id}: {e}") + + if self._async_loop: + self._async_loop.call_soon_threadsafe(deliver) + + def _deliver_sync(self, subscription: EventSubscription, event: BaseEvent) -> bool: + """Deliver an event synchronously.""" + start = time.perf_counter() + try: + subscription.callback(event) + subscription.event_count += 1 + subscription.last_received = datetime.now() + elapsed = (time.perf_counter() - start) * 1000 + self._stats.record_event_delivered(elapsed) + return True + except Exception as e: + self._stats.record_error() + print(f"[EventBus] Error delivering to {subscription.id}: {e}") + return False + + def subscribe( + self, + callback: Callable[[BaseEvent], Any], + event_filter: Optional[EventFilter] = None, + replay_history: bool = False, + replay_count: int = 100, + event_types: Optional[List[Type[BaseEvent]]] = None + ) -> str: + """ + Subscribe to events with optional filtering. + + Args: + callback: Function to call when matching events occur + event_filter: Filter criteria for events + replay_history: Whether to replay recent events to new subscriber + replay_count: Number of recent events to replay + event_types: Shorthand for simple type-based filtering + + Returns: + Subscription ID (use for unsubscribe) + """ + with self._lock: + self._subscription_counter += 1 + sub_id = f"sub_{self._subscription_counter}" + + # Build filter from shorthand if provided + if event_filter is None and event_types is not None: + event_filter = EventFilter(event_types=event_types) + elif event_filter is None: + event_filter = EventFilter() + + subscription = EventSubscription( + id=sub_id, + callback=callback, + event_filter=event_filter, + replay_history=replay_history, + replay_count=min(replay_count, self.max_history) + ) + + self._subscriptions[sub_id] = subscription + self._stats.active_subscriptions = len(self._subscriptions) + self._stats.total_subscriptions += 1 + + # Replay history if requested (outside lock) + if replay_history: + self._replay_history(subscription) + + return sub_id + + def subscribe_typed( + self, + event_class: Type[T], + callback: Callable[[T], Any], + **filter_kwargs + ) -> str: + """ + Subscribe to a specific event type with optional filtering. + + Args: + event_class: The event class to subscribe to + callback: Function to call with typed event + **filter_kwargs: Additional filter criteria + - min_damage: Minimum damage threshold + - max_damage: Maximum damage threshold + - mob_types: List of mob names to filter + - skill_names: List of skill names to filter + - sources: List of event sources to filter + - replay_last: Number of events to replay + + Returns: + Subscription ID + """ + # Build filter from kwargs + filter_args = {'event_types': [event_class]} + + if 'min_damage' in filter_kwargs: + filter_args['min_damage'] = filter_kwargs.pop('min_damage') + if 'max_damage' in filter_kwargs: + filter_args['max_damage'] = filter_kwargs.pop('max_damage') + if 'mob_types' in filter_kwargs: + filter_args['mob_types'] = filter_kwargs.pop('mob_types') + if 'skill_names' in filter_kwargs: + filter_args['skill_names'] = filter_kwargs.pop('skill_names') + if 'sources' in filter_kwargs: + filter_args['sources'] = filter_kwargs.pop('sources') + + # Handle custom predicate + if 'predicate' in filter_kwargs: + filter_args['custom_predicate'] = filter_kwargs.pop('predicate') + + event_filter = EventFilter(**filter_args) + replay_count = filter_kwargs.pop('replay_last', 0) + + # Create wrapper to ensure type safety + def typed_callback(event: BaseEvent): + if isinstance(event, event_class): + callback(event) + + return self.subscribe( + callback=typed_callback, + event_filter=event_filter, + replay_history=replay_count > 0, + replay_count=replay_count + ) + + def _replay_history(self, subscription: EventSubscription): + """Replay recent events to a subscriber.""" + with self._lock: + # Get recent events that match the filter + events_to_replay = [ + e for e in list(self._history)[-subscription.replay_count:] + if subscription.matches(e) + ] + + # Deliver each event + for event in events_to_replay: + self._deliver_async(subscription, event) + + def unsubscribe(self, subscription_id: str) -> bool: + """Unsubscribe from events.""" + with self._lock: + if subscription_id in self._subscriptions: + del self._subscriptions[subscription_id] + self._stats.active_subscriptions = len(self._subscriptions) + return True + return False + + def get_recent_events( + self, + event_type: Optional[Type[T]] = None, + count: int = 100, + category: Optional[EventCategory] = None + ) -> List[BaseEvent]: + """ + Get recent events from history. + + Args: + event_type: Filter by event class + count: Maximum number of events to return + category: Filter by event category + + Returns: + List of matching events + """ + with self._lock: + events = list(self._history) + + # Apply filters + if event_type is not None: + events = [e for e in events if isinstance(e, event_type)] + if category is not None: + events = [e for e in events if e.category == category] + + # Return most recent + return events[-count:] + + def get_events_by_time_range( + self, + start: datetime, + end: Optional[datetime] = None + ) -> List[BaseEvent]: + """Get events within a time range.""" + if end is None: + end = datetime.now() + + with self._lock: + return [ + e for e in self._history + if start <= e.timestamp <= end + ] + + def get_stats(self) -> Dict[str, Any]: + """Get event bus statistics.""" + return self._stats.get_summary() + + def clear_history(self): + """Clear event history.""" + with self._lock: + self._history.clear() + + def shutdown(self): + """Shutdown the event bus and cleanup resources.""" + with self._lock: + self._subscriptions.clear() + self._history.clear() + self._stats.active_subscriptions = 0 + + if self._async_loop and self._async_loop.is_running(): + self._async_loop.call_soon_threadsafe(self._async_loop.stop) + + if self._async_thread and self._async_thread.is_alive(): + self._async_thread.join(timeout=2.0) + + +# Singleton instance +_event_bus = None + +def get_event_bus() -> EventBus: + """Get the global EventBus instance.""" + global _event_bus + if _event_bus is None: + _event_bus = EventBus() + return _event_bus + + +def reset_event_bus(): + """Reset the global EventBus instance (mainly for testing).""" + global _event_bus + if _event_bus is not None: + _event_bus.shutdown() + _event_bus = None + + +# ========== Decorators ========== + +def on_event( + event_class: Type[T], + **filter_kwargs +): + """ + Decorator for event subscription. + + Usage: + @on_event(DamageEvent, min_damage=100) + def handle_big_damage(event: DamageEvent): + print(f"Big hit: {event.damage_amount}") + """ + def decorator(func): + bus = get_event_bus() + bus.subscribe_typed(event_class, func, **filter_kwargs) + func._event_subscription = True + return func + return decorator diff --git a/projects/EU-Utility/core/http_client.py b/projects/EU-Utility/core/http_client.py new file mode 100644 index 0000000..be5166b --- /dev/null +++ b/projects/EU-Utility/core/http_client.py @@ -0,0 +1,532 @@ +""" +EU-Utility - HTTP Client with Caching + +Thread-safe HTTP client with disk-based caching, automatic retries, +rate limiting, and Cache-Control header support. +""" + +import os +import json +import hashlib +import time +import threading +from pathlib import Path +from typing import Dict, Any, Optional, Union, Tuple +from dataclasses import dataclass, asdict +from datetime import datetime, timedelta + +try: + import requests + from requests.adapters import HTTPAdapter + from urllib3.util.retry import Retry + REQUESTS_AVAILABLE = True +except ImportError: + REQUESTS_AVAILABLE = False + print("Warning: 'requests' library not installed. HTTP client will not function.") + + +@dataclass +class CacheEntry: + """Represents a cached HTTP response.""" + url: str + status_code: int + headers: Dict[str, str] + content: bytes + cached_at: float + expires_at: float + cache_control: Optional[str] = None + etag: Optional[str] = None + last_modified: Optional[str] = None + + +class HTTPClient: + """ + Thread-safe singleton HTTP client with caching support. + + Features: + - Disk-based response caching + - TTL support (time-to-live) + - Auto-retry with exponential backoff + - Rate limiting between requests + - Cache-Control header respect + - Thread-safe operations + """ + + _instance = None + _lock = threading.Lock() + + def __new__(cls): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__( + self, + cache_dir: str = "cache/http", + default_cache_ttl: int = 3600, + rate_limit_delay: float = 0.0, + max_retries: int = 3, + backoff_factor: float = 0.5, + respect_cache_control: bool = True, + default_headers: Optional[Dict[str, str]] = None + ): + """ + Initialize the HTTP client. + + Args: + cache_dir: Directory for disk cache + default_cache_ttl: Default TTL in seconds + rate_limit_delay: Minimum delay between requests in seconds + max_retries: Maximum number of retries on failure + backoff_factor: Backoff factor for retries + respect_cache_control: Whether to respect Cache-Control headers + default_headers: Default headers for all requests + """ + if self._initialized: + return + + if not REQUESTS_AVAILABLE: + raise RuntimeError("requests library is required for HTTP client") + + # Settings + self.cache_dir = Path(cache_dir) + self.default_cache_ttl = default_cache_ttl + self.rate_limit_delay = rate_limit_delay + self.max_retries = max_retries + self.backoff_factor = backoff_factor + self.respect_cache_control = respect_cache_control + self.default_headers = default_headers or { + 'User-Agent': 'EU-Utility/1.0' + } + + # Thread safety + self._cache_lock = threading.RLock() + self._request_lock = threading.Lock() + self._last_request_time = 0 + + # Initialize cache directory + self.cache_dir.mkdir(parents=True, exist_ok=True) + + # Initialize session with retries + self.session = requests.Session() + retry_strategy = Retry( + total=max_retries, + backoff_factor=backoff_factor, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods=["HEAD", "GET", "OPTIONS", "POST"] + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + self.session.mount("http://", adapter) + self.session.mount("https://", adapter) + + self._initialized = True + print(f"[HTTP] Client initialized (cache: {self.cache_dir})") + + def _generate_cache_key(self, url: str, params: Optional[Dict] = None) -> str: + """Generate a cache key from URL and parameters.""" + key_string = url + if params: + # Sort params for consistent hashing + param_str = json.dumps(params, sort_keys=True) + key_string += "|" + param_str + return hashlib.sha256(key_string.encode()).hexdigest() + + def _get_cache_path(self, cache_key: str) -> Path: + """Get the file path for a cache entry.""" + # Split into subdirectories for better file system performance + return self.cache_dir / cache_key[:2] / cache_key[2:4] / f"{cache_key}.json" + + def _parse_cache_control(self, header: str) -> Optional[int]: + """Parse Cache-Control header to extract max-age.""" + if not header: + return None + + try: + for directive in header.split(','): + directive = directive.strip().lower() + if directive.startswith('max-age='): + return int(directive.split('=')[1]) + elif directive == 'no-cache' or directive == 'no-store': + return 0 + except (ValueError, IndexError): + pass + return None + + def _load_cache_entry(self, cache_key: str) -> Optional[CacheEntry]: + """Load a cache entry from disk.""" + cache_path = self._get_cache_path(cache_key) + + if not cache_path.exists(): + return None + + try: + with self._cache_lock: + with open(cache_path, 'r', encoding='utf-8') as f: + data = json.load(f) + + # Convert content from base64 if stored that way + content = data.get('content', '') + if isinstance(content, str): + import base64 + content = base64.b64decode(content) + + return CacheEntry( + url=data['url'], + status_code=data['status_code'], + headers=data['headers'], + content=content, + cached_at=data['cached_at'], + expires_at=data['expires_at'], + cache_control=data.get('cache_control'), + etag=data.get('etag'), + last_modified=data.get('last_modified') + ) + except Exception as e: + print(f"[HTTP] Cache load error: {e}") + return None + + def _save_cache_entry(self, cache_key: str, entry: CacheEntry): + """Save a cache entry to disk.""" + cache_path = self._get_cache_path(cache_key) + cache_path.parent.mkdir(parents=True, exist_ok=True) + + try: + with self._cache_lock: + # Encode content as base64 for JSON serialization + import base64 + content_b64 = base64.b64encode(entry.content).decode('utf-8') + + data = { + 'url': entry.url, + 'status_code': entry.status_code, + 'headers': entry.headers, + 'content': content_b64, + 'cached_at': entry.cached_at, + 'expires_at': entry.expires_at, + 'cache_control': entry.cache_control, + 'etag': entry.etag, + 'last_modified': entry.last_modified + } + + with open(cache_path, 'w', encoding='utf-8') as f: + json.dump(data, f) + except Exception as e: + print(f"[HTTP] Cache save error: {e}") + + def _is_cache_valid(self, entry: CacheEntry) -> bool: + """Check if a cache entry is still valid.""" + return time.time() < entry.expires_at + + def _apply_rate_limit(self): + """Apply rate limiting between requests.""" + if self.rate_limit_delay <= 0: + return + + with self._request_lock: + elapsed = time.time() - self._last_request_time + if elapsed < self.rate_limit_delay: + sleep_time = self.rate_limit_delay - elapsed + time.sleep(sleep_time) + self._last_request_time = time.time() + + def _make_request( + self, + method: str, + url: str, + headers: Optional[Dict[str, str]] = None, + **kwargs + ) -> requests.Response: + """Make an HTTP request with rate limiting and retries.""" + self._apply_rate_limit() + + # Merge headers + request_headers = self.default_headers.copy() + if headers: + request_headers.update(headers) + + # Make request + response = self.session.request( + method=method, + url=url, + headers=request_headers, + **kwargs + ) + response.raise_for_status() + return response + + def get( + self, + url: str, + cache_ttl: Optional[int] = None, + headers: Optional[Dict[str, str]] = None, + params: Optional[Dict[str, Any]] = None, + use_cache: bool = True, + **kwargs + ) -> Dict[str, Any]: + """ + Perform a GET request with caching. + + Args: + url: The URL to fetch + cache_ttl: Cache TTL in seconds (None = use default) + headers: Additional headers + params: URL parameters + use_cache: Whether to use cache + **kwargs: Additional arguments for requests + + Returns: + Dict with 'status_code', 'headers', 'content', 'text', 'json', 'from_cache' + """ + if not REQUESTS_AVAILABLE: + raise RuntimeError("requests library not available") + + cache_key = self._generate_cache_key(url, params) + ttl = cache_ttl if cache_ttl is not None else self.default_cache_ttl + + # Check cache if enabled + if use_cache and ttl > 0: + cached = self._load_cache_entry(cache_key) + if cached and self._is_cache_valid(cached): + return { + 'status_code': cached.status_code, + 'headers': cached.headers, + 'content': cached.content, + 'text': cached.content.decode('utf-8', errors='replace'), + 'json': None, + 'from_cache': True + } + + # Make request + try: + response = self._make_request('GET', url, headers=headers, params=params, **kwargs) + + content = response.content + + # Parse JSON if possible + json_data = None + try: + json_data = response.json() + except ValueError: + pass + + # Cache the response + if use_cache and ttl > 0: + # Check Cache-Control header + cache_control = response.headers.get('Cache-Control', '') + cache_ttl_override = self._parse_cache_control(cache_control) + + if cache_ttl_override is not None: + if self.respect_cache_control: + if cache_ttl_override == 0: + # Don't cache + pass + else: + ttl = cache_ttl_override + + if ttl > 0: + entry = CacheEntry( + url=url, + status_code=response.status_code, + headers=dict(response.headers), + content=content, + cached_at=time.time(), + expires_at=time.time() + ttl, + cache_control=cache_control, + etag=response.headers.get('ETag'), + last_modified=response.headers.get('Last-Modified') + ) + self._save_cache_entry(cache_key, entry) + + return { + 'status_code': response.status_code, + 'headers': dict(response.headers), + 'content': content, + 'text': content.decode('utf-8', errors='replace'), + 'json': json_data, + 'from_cache': False + } + + except requests.exceptions.RequestException as e: + # Try to return stale cache on failure + if use_cache: + cached = self._load_cache_entry(cache_key) + if cached: + print(f"[HTTP] Returning stale cache for {url}") + return { + 'status_code': cached.status_code, + 'headers': cached.headers, + 'content': cached.content, + 'text': cached.content.decode('utf-8', errors='replace'), + 'json': None, + 'from_cache': True, + 'stale': True + } + raise + + def post( + self, + url: str, + data: Optional[Union[Dict, str, bytes]] = None, + json: Optional[Dict] = None, + headers: Optional[Dict[str, str]] = None, + use_cache: bool = False, + cache_ttl: Optional[int] = None, + **kwargs + ) -> Dict[str, Any]: + """ + Perform a POST request. + + Args: + url: The URL to post to + data: Form data, bytes, or string + json: JSON data (will be serialized) + headers: Additional headers + use_cache: Whether to cache the response + cache_ttl: Cache TTL in seconds + **kwargs: Additional arguments for requests + + Returns: + Dict with 'status_code', 'headers', 'content', 'text', 'json', 'from_cache' + """ + if not REQUESTS_AVAILABLE: + raise RuntimeError("requests library not available") + + cache_key = self._generate_cache_key(url, data or json) + ttl = cache_ttl if cache_ttl is not None else self.default_cache_ttl + + # Check cache if enabled (rare for POST but possible) + if use_cache and ttl > 0: + cached = self._load_cache_entry(cache_key) + if cached and self._is_cache_valid(cached): + return { + 'status_code': cached.status_code, + 'headers': cached.headers, + 'content': cached.content, + 'text': cached.content.decode('utf-8', errors='replace'), + 'json': None, + 'from_cache': True + } + + # Make request + response = self._make_request('POST', url, headers=headers, data=data, json=json, **kwargs) + + content = response.content + + # Parse JSON if possible + json_data = None + try: + json_data = response.json() + except ValueError: + pass + + # Cache the response if enabled + if use_cache and ttl > 0: + entry = CacheEntry( + url=url, + status_code=response.status_code, + headers=dict(response.headers), + content=content, + cached_at=time.time(), + expires_at=time.time() + ttl + ) + self._save_cache_entry(cache_key, entry) + + return { + 'status_code': response.status_code, + 'headers': dict(response.headers), + 'content': content, + 'text': content.decode('utf-8', errors='replace'), + 'json': json_data, + 'from_cache': False + } + + def clear_cache(self): + """Clear all cached responses.""" + with self._cache_lock: + if self.cache_dir.exists(): + import shutil + shutil.rmtree(self.cache_dir) + self.cache_dir.mkdir(parents=True, exist_ok=True) + print("[HTTP] Cache cleared") + + def invalidate_cache(self, url_pattern: str): + """ + Invalidate cache entries matching a URL pattern. + + Args: + url_pattern: Substring to match in URLs + """ + with self._cache_lock: + count = 0 + for cache_file in self.cache_dir.rglob("*.json"): + try: + with open(cache_file, 'r', encoding='utf-8') as f: + data = json.load(f) + if url_pattern in data.get('url', ''): + cache_file.unlink() + count += 1 + except Exception: + pass + print(f"[HTTP] Invalidated {count} cache entries matching '{url_pattern}'") + + def get_cache_stats(self) -> Dict[str, Any]: + """Get cache statistics.""" + with self._cache_lock: + total_entries = 0 + total_size = 0 + valid_entries = 0 + + for cache_file in self.cache_dir.rglob("*.json"): + try: + total_entries += 1 + total_size += cache_file.stat().st_size + + with open(cache_file, 'r', encoding='utf-8') as f: + data = json.load(f) + if time.time() < data.get('expires_at', 0): + valid_entries += 1 + except Exception: + pass + + return { + 'total_entries': total_entries, + 'valid_entries': valid_entries, + 'expired_entries': total_entries - valid_entries, + 'total_size_bytes': total_size, + 'cache_dir': str(self.cache_dir) + } + + +# Singleton instance +_http_client = None + +def get_http_client( + cache_dir: str = "cache/http", + default_cache_ttl: int = 3600, + rate_limit_delay: float = 0.0, + max_retries: int = 3, + backoff_factor: float = 0.5, + respect_cache_control: bool = True, + default_headers: Optional[Dict[str, str]] = None +) -> HTTPClient: + """ + Get or create the global HTTP client instance. + + First call initializes the client with the provided parameters. + Subsequent calls return the existing instance (parameters ignored). + """ + global _http_client + if _http_client is None: + _http_client = HTTPClient( + cache_dir=cache_dir, + default_cache_ttl=default_cache_ttl, + rate_limit_delay=rate_limit_delay, + max_retries=max_retries, + backoff_factor=backoff_factor, + respect_cache_control=respect_cache_control, + default_headers=default_headers + ) + return _http_client diff --git a/projects/EU-Utility/core/main.py b/projects/EU-Utility/core/main.py index f3ef040..762806c 100644 --- a/projects/EU-Utility/core/main.py +++ b/projects/EU-Utility/core/main.py @@ -39,6 +39,8 @@ from core.overlay_widgets import OverlayManager from core.plugin_api import get_api, APIType from core.log_reader import get_log_reader from core.ocr_service import get_ocr_service +from core.http_client import get_http_client +from core.window_manager import get_window_manager class HotkeyHandler(QObject): diff --git a/projects/EU-Utility/core/nexus_api.py b/projects/EU-Utility/core/nexus_api.py new file mode 100644 index 0000000..31870e7 --- /dev/null +++ b/projects/EU-Utility/core/nexus_api.py @@ -0,0 +1,551 @@ +""" +EU-Utility - Entropia Nexus API Client + +API client for https://api.entropianexus.com +Provides access to game data: items, mobs, market info. + +Part of core - plugins access via PluginAPI. +""" + +import time +import json +from typing import Dict, Any, List, Optional, Union +from dataclasses import dataclass +from enum import Enum +from datetime import datetime, timedelta + + +class EntityType(Enum): + """Types of entities that can be searched.""" + ITEM = "items" + MOB = "mobs" + ALL = "all" + + +class NexusAPIError(Exception): + """Custom exception for Nexus API errors.""" + pass + + +class RateLimitError(NexusAPIError): + """Raised when rate limit is exceeded.""" + pass + + +@dataclass +class SearchResult: + """Result from a search operation.""" + id: str + name: str + type: str + category: Optional[str] = None + icon_url: Optional[str] = None + data: Dict[str, Any] = None + + def __post_init__(self): + if self.data is None: + self.data = {} + + +@dataclass +class ItemDetails: + """Detailed item information.""" + id: str + name: str + description: Optional[str] = None + category: Optional[str] = None + weight: Optional[float] = None + tt_value: Optional[float] = None + decay: Optional[float] = None + ammo_consumption: Optional[int] = None + damage: Optional[float] = None + range: Optional[float] = None + accuracy: Optional[float] = None + durability: Optional[int] = None + requirements: Dict[str, Any] = None + materials: List[Dict[str, Any]] = None + raw_data: Dict[str, Any] = None + + def __post_init__(self): + if self.requirements is None: + self.requirements = {} + if self.materials is None: + self.materials = [] + if self.raw_data is None: + self.raw_data = {} + + +@dataclass +class MarketData: + """Market data for an item.""" + item_id: str + item_name: str + current_markup: Optional[float] = None + avg_markup_7d: Optional[float] = None + avg_markup_30d: Optional[float] = None + volume_24h: Optional[int] = None + volume_7d: Optional[int] = None + buy_orders: List[Dict[str, Any]] = None + sell_orders: List[Dict[str, Any]] = None + last_updated: Optional[datetime] = None + raw_data: Dict[str, Any] = None + + def __post_init__(self): + if self.buy_orders is None: + self.buy_orders = [] + if self.sell_orders is None: + self.sell_orders = [] + if self.raw_data is None: + self.raw_data = {} + + +class NexusAPI: + """ + Singleton client for Entropia Nexus API. + + Features: + - Auto-retry on transient failures + - Rate limiting (max requests per second) + - Caching for frequently accessed data + - Proper error handling + + Usage: + api = get_nexus_api() + results = api.search_items("ArMatrix") + details = api.get_item_details("item_id") + """ + + _instance = None + + # API Configuration + BASE_URL = "https://api.entropianexus.com" + API_VERSION = "v1" + + # Rate limiting + MAX_REQUESTS_PER_SECOND = 5 + MIN_REQUEST_INTERVAL = 1.0 / MAX_REQUESTS_PER_SECOND + + # Retry configuration + MAX_RETRIES = 3 + RETRY_DELAY = 1.0 # seconds + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + self._last_request_time = 0 + self._request_count = 0 + self._cache: Dict[str, Any] = {} + self._cache_ttl = 300 # 5 minutes default cache + self._cache_timestamps: Dict[str, float] = {} + self._session = None + self._initialized = True + self._available = True + + print("[NexusAPI] Initialized") + + def _get_session(self): + """Get or create HTTP session.""" + if self._session is None: + try: + import requests + self._session = requests.Session() + self._session.headers.update({ + 'User-Agent': 'EU-Utility/1.0 (Entropia Universe Utility Tool)', + 'Accept': 'application/json' + }) + except ImportError: + raise NexusAPIError("requests library not installed. Run: pip install requests") + return self._session + + def _rate_limit(self): + """Enforce rate limiting between requests.""" + current_time = time.time() + time_since_last = current_time - self._last_request_time + + if time_since_last < self.MIN_REQUEST_INTERVAL: + sleep_time = self.MIN_REQUEST_INTERVAL - time_since_last + time.sleep(sleep_time) + + self._last_request_time = time.time() + self._request_count += 1 + + def _make_request( + self, + endpoint: str, + params: Dict[str, Any] = None, + use_cache: bool = True + ) -> Dict[str, Any]: + """ + Make HTTP request with retry logic and rate limiting. + + Args: + endpoint: API endpoint path + params: Query parameters + use_cache: Whether to use caching + + Returns: + JSON response as dict + + Raises: + NexusAPIError: On API errors + RateLimitError: On rate limit exceeded + """ + # Check cache + cache_key = f"{endpoint}:{json.dumps(params, sort_keys=True) if params else ''}" + if use_cache and self._is_cache_valid(cache_key): + return self._cache[cache_key] + + url = f"{self.BASE_URL}/{self.API_VERSION}/{endpoint}" + last_error = None + + for attempt in range(self.MAX_RETRIES): + try: + # Rate limit + self._rate_limit() + + # Make request + session = self._get_session() + response = session.get(url, params=params, timeout=30) + + # Handle rate limiting + if response.status_code == 429: + retry_after = int(response.headers.get('Retry-After', 60)) + if attempt < self.MAX_RETRIES - 1: + print(f"[NexusAPI] Rate limited. Waiting {retry_after}s...") + time.sleep(retry_after) + continue + else: + raise RateLimitError(f"Rate limit exceeded. Retry after {retry_after}s") + + # Handle other HTTP errors + if response.status_code >= 500: + if attempt < self.MAX_RETRIES - 1: + wait_time = self.RETRY_DELAY * (2 ** attempt) # Exponential backoff + print(f"[NexusAPI] Server error {response.status_code}. Retrying in {wait_time}s...") + time.sleep(wait_time) + continue + + response.raise_for_status() + + # Parse response + data = response.json() + + # Cache successful response + if use_cache: + self._cache[cache_key] = data + self._cache_timestamps[cache_key] = time.time() + + return data + + except RateLimitError: + raise + except Exception as e: + last_error = e + if attempt < self.MAX_RETRIES - 1: + wait_time = self.RETRY_DELAY * (2 ** attempt) + print(f"[NexusAPI] Request failed: {e}. Retrying in {wait_time}s...") + time.sleep(wait_time) + else: + break + + # All retries exhausted + error_msg = f"Request failed after {self.MAX_RETRIES} attempts: {last_error}" + print(f"[NexusAPI] {error_msg}") + raise NexusAPIError(error_msg) + + def _is_cache_valid(self, key: str) -> bool: + """Check if cached data is still valid.""" + if key not in self._cache_timestamps: + return False + age = time.time() - self._cache_timestamps[key] + return age < self._cache_ttl + + def clear_cache(self): + """Clear all cached data.""" + self._cache.clear() + self._cache_timestamps.clear() + print("[NexusAPI] Cache cleared") + + def is_available(self) -> bool: + """Check if API is available.""" + return self._available + + # ========== Search Methods ========== + + def search_items(self, query: str, limit: int = 20) -> List[SearchResult]: + """ + Search for items by name. + + Args: + query: Search term (e.g., "ArMatrix", "Omegaton") + limit: Maximum results (default 20, max 100) + + Returns: + List of SearchResult objects + + Example: + results = api.search_items("ArMatrix") + for r in results: + print(f"{r.name} ({r.type})") + """ + try: + params = { + 'q': query, + 'limit': min(limit, 100), + 'type': 'item' + } + + data = self._make_request('search', params) + + results = [] + for item in data.get('results', []): + results.append(SearchResult( + id=item.get('id', ''), + name=item.get('name', ''), + type=item.get('type', 'item'), + category=item.get('category'), + icon_url=item.get('icon_url'), + data=item + )) + + return results + + except Exception as e: + print(f"[NexusAPI] search_items error: {e}") + return [] + + def search_mobs(self, query: str, limit: int = 20) -> List[SearchResult]: + """ + Search for creatures/mobs by name. + + Args: + query: Search term (e.g., "Atrox", "Hispidus") + limit: Maximum results (default 20, max 100) + + Returns: + List of SearchResult objects + """ + try: + params = { + 'q': query, + 'limit': min(limit, 100), + 'type': 'mob' + } + + data = self._make_request('search', params) + + results = [] + for item in data.get('results', []): + results.append(SearchResult( + id=item.get('id', ''), + name=item.get('name', ''), + type=item.get('type', 'mob'), + category=item.get('category'), + icon_url=item.get('icon_url'), + data=item + )) + + return results + + except Exception as e: + print(f"[NexusAPI] search_mobs error: {e}") + return [] + + def search_all(self, query: str, limit: int = 20) -> List[SearchResult]: + """ + Search across all entity types (items, mobs, etc.). + + Args: + query: Search term + limit: Maximum results per type (default 20) + + Returns: + List of SearchResult objects + """ + try: + params = { + 'q': query, + 'limit': min(limit, 100) + } + + data = self._make_request('search', params) + + results = [] + for item in data.get('results', []): + results.append(SearchResult( + id=item.get('id', ''), + name=item.get('name', ''), + type=item.get('type', 'unknown'), + category=item.get('category'), + icon_url=item.get('icon_url'), + data=item + )) + + return results + + except Exception as e: + print(f"[NexusAPI] search_all error: {e}") + return [] + + # ========== Detail Methods ========== + + def get_item_details(self, item_id: str) -> Optional[ItemDetails]: + """ + Get detailed information about a specific item. + + Args: + item_id: The item's unique identifier + + Returns: + ItemDetails object or None if not found + + Example: + details = api.get_item_details("armatrix_lp-35") + print(f"TT Value: {details.tt_value} PED") + """ + try: + data = self._make_request(f'items/{item_id}') + + if not data or 'error' in data: + return None + + return ItemDetails( + id=data.get('id', item_id), + name=data.get('name', 'Unknown'), + description=data.get('description'), + category=data.get('category'), + weight=data.get('weight'), + tt_value=data.get('tt_value'), + decay=data.get('decay'), + ammo_consumption=data.get('ammo_consumption'), + damage=data.get('damage'), + range=data.get('range'), + accuracy=data.get('accuracy'), + durability=data.get('durability'), + requirements=data.get('requirements', {}), + materials=data.get('materials', []), + raw_data=data + ) + + except Exception as e: + print(f"[NexusAPI] get_item_details error: {e}") + return None + + def get_market_data(self, item_id: str) -> Optional[MarketData]: + """ + Get market data for a specific item. + + Args: + item_id: The item's unique identifier + + Returns: + MarketData object or None if not found + + Example: + market = api.get_market_data("armatrix_lp-35") + print(f"Current markup: {market.current_markup:.1f}%") + print(f"24h Volume: {market.volume_24h}") + """ + try: + data = self._make_request(f'items/{item_id}/market') + + if not data or 'error' in data: + return None + + # Parse timestamp if present + last_updated = None + if 'last_updated' in data: + try: + last_updated = datetime.fromisoformat(data['last_updated'].replace('Z', '+00:00')) + except: + pass + + return MarketData( + item_id=item_id, + item_name=data.get('item_name', 'Unknown'), + current_markup=data.get('current_markup'), + avg_markup_7d=data.get('avg_markup_7d'), + avg_markup_30d=data.get('avg_markup_30d'), + volume_24h=data.get('volume_24h'), + volume_7d=data.get('volume_7d'), + buy_orders=data.get('buy_orders', []), + sell_orders=data.get('sell_orders', []), + last_updated=last_updated, + raw_data=data + ) + + except Exception as e: + print(f"[NexusAPI] get_market_data error: {e}") + return None + + # ========== Batch Methods ========== + + def get_items_batch(self, item_ids: List[str]) -> Dict[str, Optional[ItemDetails]]: + """ + Get details for multiple items efficiently. + + Args: + item_ids: List of item IDs + + Returns: + Dict mapping item_id to ItemDetails (or None if failed) + """ + results = {} + for item_id in item_ids: + results[item_id] = self.get_item_details(item_id) + return results + + def get_market_batch(self, item_ids: List[str]) -> Dict[str, Optional[MarketData]]: + """ + Get market data for multiple items efficiently. + + Args: + item_ids: List of item IDs + + Returns: + Dict mapping item_id to MarketData (or None if failed) + """ + results = {} + for item_id in item_ids: + results[item_id] = self.get_market_data(item_id) + return results + + +# Singleton instance +_nexus_api = None + +def get_nexus_api() -> NexusAPI: + """Get the global NexusAPI instance.""" + global _nexus_api + if _nexus_api is None: + _nexus_api = NexusAPI() + return _nexus_api + + +# Convenience functions for quick access +def search_items(query: str, limit: int = 20) -> List[SearchResult]: + """Quick search for items.""" + return get_nexus_api().search_items(query, limit) + +def search_mobs(query: str, limit: int = 20) -> List[SearchResult]: + """Quick search for mobs.""" + return get_nexus_api().search_mobs(query, limit) + +def search_all(query: str, limit: int = 20) -> List[SearchResult]: + """Quick search across all types.""" + return get_nexus_api().search_all(query, limit) + +def get_item_details(item_id: str) -> Optional[ItemDetails]: + """Quick get item details.""" + return get_nexus_api().get_item_details(item_id) + +def get_market_data(item_id: str) -> Optional[MarketData]: + """Quick get market data.""" + return get_nexus_api().get_market_data(item_id) diff --git a/projects/EU-Utility/core/notifications.py b/projects/EU-Utility/core/notifications.py new file mode 100644 index 0000000..4be4aae --- /dev/null +++ b/projects/EU-Utility/core/notifications.py @@ -0,0 +1,597 @@ +""" +EU-Utility - Notification System + +Toast notification system for non-blocking user feedback. +Supports info, warning, error, and success notification types. +""" + +from pathlib import Path +from typing import List, Optional, Callable, Dict, Any +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from collections import deque +import threading + +from PyQt6.QtWidgets import ( + QWidget, QVBoxLayout, QHBoxLayout, QLabel, + QGraphicsDropShadowEffect, QApplication +) +from PyQt6.QtCore import Qt, QTimer, pyqtSignal, QObject, QPoint +from PyQt6.QtGui import QColor, QIcon, QPixmap + +from core.eu_styles import EU_COLORS, EU_RADIUS + + +class NotificationType(Enum): + """Notification types with associated styling.""" + INFO = "info" + WARNING = "warning" + ERROR = "error" + SUCCESS = "success" + + +NOTIFICATION_STYLES = { + NotificationType.INFO: { + 'icon': 'ℹ', + 'icon_color': '#4a9eff', + 'border_color': 'rgba(74, 158, 255, 150)', + 'bg_color': 'rgba(25, 35, 50, 240)', + }, + NotificationType.WARNING: { + 'icon': '⚠', + 'icon_color': '#ffc107', + 'border_color': 'rgba(255, 193, 7, 150)', + 'bg_color': 'rgba(45, 40, 25, 240)', + }, + NotificationType.ERROR: { + 'icon': '✕', + 'icon_color': '#f44336', + 'border_color': 'rgba(244, 67, 54, 150)', + 'bg_color': 'rgba(45, 25, 25, 240)', + }, + NotificationType.SUCCESS: { + 'icon': '✓', + 'icon_color': '#4caf50', + 'border_color': 'rgba(76, 175, 80, 150)', + 'bg_color': 'rgba(25, 45, 30, 240)', + }, +} + + +@dataclass +class Notification: + """A single notification entry.""" + id: str + title: str + message: str + type: NotificationType + timestamp: datetime = field(default_factory=datetime.now) + sound_played: bool = False + duration: int = 5000 # ms + + +class ToastWidget(QWidget): + """Individual toast notification widget.""" + + clicked = pyqtSignal(str) # Emits notification ID + closed = pyqtSignal(str) # Emits notification ID + expired = pyqtSignal(str) # Emits notification ID + + def __init__(self, notification: Notification, parent=None): + super().__init__(parent) + self.notification = notification + self._opacity = 1.0 + self._fade_timer = None + + # Frameless, always on top + self.setWindowFlags( + Qt.WindowType.FramelessWindowHint | + Qt.WindowType.WindowStaysOnTopHint | + Qt.WindowType.Tool | + Qt.WindowType.WindowDoesNotFocus + ) + self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground) + self.setAttribute(Qt.WidgetAttribute.WA_ShowWithoutActivating) + + self._setup_ui() + self._setup_auto_close() + + def _setup_ui(self): + """Setup toast UI with EU styling.""" + style = NOTIFICATION_STYLES[self.notification.type] + + # Main container styling + self.setStyleSheet(f""" + QWidget {{ + background-color: {style['bg_color']}; + border: 1px solid {style['border_color']}; + border-radius: {EU_RADIUS['medium']}; + }} + """) + + # Shadow effect + shadow = QGraphicsDropShadowEffect() + shadow.setBlurRadius(20) + shadow.setColor(QColor(0, 0, 0, 80)) + shadow.setOffset(0, 4) + self.setGraphicsEffect(shadow) + + # Layout + layout = QHBoxLayout(self) + layout.setContentsMargins(12, 10, 12, 10) + layout.setSpacing(10) + + # Icon + self.icon_label = QLabel(style['icon']) + self.icon_label.setStyleSheet(f""" + color: {style['icon_color']}; + font-size: 16px; + font-weight: bold; + background: transparent; + """) + self.icon_label.setFixedSize(24, 24) + self.icon_label.setAlignment(Qt.AlignmentFlag.AlignCenter) + layout.addWidget(self.icon_label) + + # Content + content_layout = QVBoxLayout() + content_layout.setSpacing(4) + content_layout.setContentsMargins(0, 0, 0, 0) + + # Title + if self.notification.title: + self.title_label = QLabel(self.notification.title) + self.title_label.setStyleSheet(f""" + color: {EU_COLORS['text_primary']}; + font-size: 12px; + font-weight: bold; + background: transparent; + """) + self.title_label.setWordWrap(True) + content_layout.addWidget(self.title_label) + + # Message + self.message_label = QLabel(self.notification.message) + self.message_label.setStyleSheet(f""" + color: {EU_COLORS['text_secondary']}; + font-size: 11px; + background: transparent; + """) + self.message_label.setWordWrap(True) + self.message_label.setMaximumWidth(280) + content_layout.addWidget(self.message_label) + + layout.addLayout(content_layout, 1) + + # Close button + self.close_btn = QLabel("×") + self.close_btn.setStyleSheet(f""" + color: {EU_COLORS['text_muted']}; + font-size: 16px; + font-weight: bold; + background: transparent; + """) + self.close_btn.setFixedSize(20, 20) + self.close_btn.setAlignment(Qt.AlignmentFlag.AlignCenter) + self.close_btn.setCursor(Qt.CursorShape.PointingHandCursor) + layout.addWidget(self.close_btn) + + # Fixed width, auto height + self.setFixedWidth(320) + self.adjustSize() + + def _setup_auto_close(self): + """Setup auto-close timer.""" + self._close_timer = QTimer(self) + self._close_timer.setSingleShot(True) + self._close_timer.timeout.connect(self._start_fade_out) + self._close_timer.start(self.notification.duration) + + def _start_fade_out(self): + """Start fade-out animation.""" + self.expired.emit(self.notification.id) + self._fade_timer = QTimer(self) + self._fade_timer.timeout.connect(self._fade_step) + self._fade_timer.start(30) # 30ms per step + + def _fade_step(self): + """Fade out animation step.""" + self._opacity -= 0.1 + if self._opacity <= 0: + self._fade_timer.stop() + self.close() + self.closed.emit(self.notification.id) + else: + self.setWindowOpacity(self._opacity) + + def mousePressEvent(self, event): + """Handle click to dismiss.""" + if event.button() == Qt.MouseButton.LeftButton: + # Check if close button was clicked + if self.close_btn.geometry().contains(event.pos()): + self._close_timer.stop() + self.close() + self.closed.emit(self.notification.id) + else: + self.clicked.emit(self.notification.id) + super().mousePressEvent(event) + + def enterEvent(self, event): + """Pause auto-close on hover.""" + self._close_timer.stop() + super().enterEvent(event) + + def leaveEvent(self, event): + """Resume auto-close on leave.""" + self._close_timer.start(1000) # Give 1 second after mouse leaves + super().leaveEvent(event) + + def close_notification(self): + """Close this notification immediately.""" + self._close_timer.stop() + if self._fade_timer: + self._fade_timer.stop() + self.close() + self.closed.emit(self.notification.id) + + +class NotificationSignals(QObject): + """Thread-safe signals for notifications.""" + show_notification = pyqtSignal(Notification) + + +class NotificationManager: + """Singleton manager for toast notifications. + + Usage: + manager = NotificationManager.get_instance() + manager.notify("Title", "Message", type=NotificationType.INFO) + manager.notify_info("Info", "Something happened") + manager.notify_warning("Warning", "Be careful") + manager.notify_error("Error", "Something failed") + manager.notify_success("Success", "Operation completed") + """ + + _instance = None + _lock = threading.Lock() + + # Maximum notifications to show at once + MAX_VISIBLE = 5 + + # Spacing between toasts + TOAST_SPACING = 8 + + # Margin from screen edge + SCREEN_MARGIN = 20 + + def __new__(cls): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + self._initialized = True + self._app: Optional[QApplication] = None + self._active_toasts: Dict[str, ToastWidget] = {} + self._history: deque = deque(maxlen=100) + self._counter = 0 + self._sound_enabled = True + self._sound_path: Optional[Path] = None + self._sound_cache: Dict[str, Any] = {} + self._signals = NotificationSignals() + self._signals.show_notification.connect(self._show_toast) + + # Position: bottom-right corner + self._position = "bottom-right" # or "top-right", "bottom-left", "top-left" + + @classmethod + def get_instance(cls) -> 'NotificationManager': + """Get the singleton instance.""" + return cls() + + def initialize(self, app: QApplication = None, sound_path: Path = None): + """Initialize the notification manager. + + Args: + app: QApplication instance (optional, will try to get existing) + sound_path: Path to sound files directory + """ + if app: + self._app = app + else: + self._app = QApplication.instance() + + if sound_path: + self._sound_path = Path(sound_path) + + print(f"[Notifications] Initialized with {self._history.maxlen} history limit") + + def _get_next_id(self) -> str: + """Generate unique notification ID.""" + self._counter += 1 + return f"notif_{self._counter}_{datetime.now().strftime('%H%M%S')}" + + def notify(self, title: str, message: str, + type: NotificationType = NotificationType.INFO, + sound: bool = False, duration: int = 5000) -> str: + """Show a notification toast. + + Args: + title: Notification title + message: Notification message + type: Notification type (info, warning, error, success) + sound: Play sound notification + duration: Display duration in milliseconds + + Returns: + Notification ID + """ + notification = Notification( + id=self._get_next_id(), + title=title, + message=message, + type=type, + duration=duration + ) + + # Add to history + self._history.append(notification) + + # Play sound if requested + if sound and self._sound_enabled: + self._play_sound(type) + notification.sound_played = True + + # Emit signal to show toast (thread-safe) + self._signals.show_notification.emit(notification) + + return notification.id + + def notify_info(self, title: str, message: str, + sound: bool = False, duration: int = 5000) -> str: + """Show an info notification.""" + return self.notify(title, message, NotificationType.INFO, sound, duration) + + def notify_warning(self, title: str, message: str, + sound: bool = False, duration: int = 5000) -> str: + """Show a warning notification.""" + return self.notify(title, message, NotificationType.WARNING, sound, duration) + + def notify_error(self, title: str, message: str, + sound: bool = True, duration: int = 7000) -> str: + """Show an error notification (sound on by default).""" + return self.notify(title, message, NotificationType.ERROR, sound, duration) + + def notify_success(self, title: str, message: str, + sound: bool = False, duration: int = 5000) -> str: + """Show a success notification.""" + return self.notify(title, message, NotificationType.SUCCESS, sound, duration) + + def _show_toast(self, notification: Notification): + """Show toast widget (called via signal for thread safety).""" + if not self._app: + self._app = QApplication.instance() + if not self._app: + print("[Notifications] Error: No QApplication available") + return + + # Create toast widget + toast = ToastWidget(notification) + toast.closed.connect(lambda: self._on_toast_closed(notification.id)) + toast.expired.connect(lambda: self._on_toast_expired(notification.id)) + + # Store reference + self._active_toasts[notification.id] = toast + + # Position the toast + self._position_toast(toast) + + # Show + toast.show() + + # Remove oldest if too many visible + if len(self._active_toasts) > self.MAX_VISIBLE: + oldest_id = next(iter(self._active_toasts)) + self._active_toasts[oldest_id].close_notification() + + def _position_toast(self, toast: ToastWidget): + """Position toast on screen.""" + if not self._app: + return + + screen = self._app.primaryScreen().geometry() + + # Calculate position + visible_count = len(self._active_toasts) + + if self._position == "bottom-right": + x = screen.width() - toast.width() - self.SCREEN_MARGIN + y = screen.height() - (toast.height() + self.TOAST_SPACING) * (visible_count + 1) - self.SCREEN_MARGIN + elif self._position == "top-right": + x = screen.width() - toast.width() - self.SCREEN_MARGIN + y = self.SCREEN_MARGIN + (toast.height() + self.TOAST_SPACING) * visible_count + elif self._position == "bottom-left": + x = self.SCREEN_MARGIN + y = screen.height() - (toast.height() + self.TOAST_SPACING) * (visible_count + 1) - self.SCREEN_MARGIN + else: # top-left + x = self.SCREEN_MARGIN + y = self.SCREEN_MARGIN + (toast.height() + self.TOAST_SPACING) * visible_count + + toast.move(x, max(y, self.SCREEN_MARGIN)) + + # Reposition all visible toasts to maintain stack + self._reposition_toasts() + + def _reposition_toasts(self): + """Reposition all visible toasts.""" + if not self._app: + return + + screen = self._app.primaryScreen().geometry() + toasts = list(self._active_toasts.values()) + + for i, toast in enumerate(toasts): + if self._position == "bottom-right": + x = screen.width() - toast.width() - self.SCREEN_MARGIN + y = screen.height() - (toast.height() + self.TOAST_SPACING) * (i + 1) - self.SCREEN_MARGIN + elif self._position == "top-right": + x = screen.width() - toast.width() - self.SCREEN_MARGIN + y = self.SCREEN_MARGIN + (toast.height() + self.TOAST_SPACING) * i + elif self._position == "bottom-left": + x = self.SCREEN_MARGIN + y = screen.height() - (toast.height() + self.TOAST_SPACING) * (i + 1) - self.SCREEN_MARGIN + else: # top-left + x = self.SCREEN_MARGIN + y = self.SCREEN_MARGIN + (toast.height() + self.TOAST_SPACING) * i + + toast.move(x, max(y, self.SCREEN_MARGIN)) + + def _on_toast_closed(self, notification_id: str): + """Handle toast closed.""" + if notification_id in self._active_toasts: + del self._active_toasts[notification_id] + self._reposition_toasts() + + def _on_toast_expired(self, notification_id: str): + """Handle toast expired (started fading).""" + pass # Just for tracking if needed + + def _play_sound(self, type: NotificationType): + """Play notification sound.""" + try: + # Try to use QSoundEffect for non-blocking playback + from PyQt6.QtMultimedia import QSoundEffect, QAudioDevice + + sound_file = self._get_sound_file(type) + if not sound_file or not sound_file.exists(): + return + + # Use cached sound effect or create new + sound_key = str(sound_file) + if sound_key not in self._sound_cache: + effect = QSoundEffect() + effect.setSource(str(sound_file)) + effect.setVolume(0.7) + self._sound_cache[sound_key] = effect + + effect = self._sound_cache[sound_key] + effect.play() + + except ImportError: + # Fallback: try simple audio playback + self._play_sound_fallback(type) + except Exception as e: + print(f"[Notifications] Sound error: {e}") + + def _get_sound_file(self, type: NotificationType) -> Optional[Path]: + """Get sound file path for notification type.""" + if not self._sound_path: + return None + + # Map types to sound files + sound_map = { + NotificationType.INFO: "info.wav", + NotificationType.WARNING: "warning.wav", + NotificationType.ERROR: "error.wav", + NotificationType.SUCCESS: "success.wav", + } + + sound_file = self._sound_path / sound_map.get(type, "info.wav") + + # Try alternative extensions + if not sound_file.exists(): + for ext in ['.mp3', '.wav', '.ogg']: + alt_file = sound_file.with_suffix(ext) + if alt_file.exists(): + return alt_file + + return sound_file if sound_file.exists() else None + + def _play_sound_fallback(self, type: NotificationType): + """Fallback sound playback using system tools.""" + import platform + import subprocess + + sound_file = self._get_sound_file(type) + if not sound_file: + return + + try: + system = platform.system() + if system == "Windows": + import winsound + winsound.MessageBeep() + elif system == "Darwin": # macOS + subprocess.run(["afplay", str(sound_file)], check=False) + else: # Linux + subprocess.run(["aplay", "-q", str(sound_file)], check=False) + except Exception: + pass # Silently fail on sound errors + + # ========== Public API ========== + + def close_all(self): + """Close all visible notifications.""" + for toast in list(self._active_toasts.values()): + toast.close_notification() + self._active_toasts.clear() + + def get_history(self, limit: int = None, type: NotificationType = None) -> List[Notification]: + """Get notification history. + + Args: + limit: Maximum number of notifications to return + type: Filter by notification type + + Returns: + List of notifications (newest first) + """ + history = list(self._history) + + if type: + history = [n for n in history if n.type == type] + + if limit: + history = history[-limit:] + + return list(reversed(history)) + + def clear_history(self): + """Clear notification history.""" + self._history.clear() + + def set_sound_enabled(self, enabled: bool): + """Enable/disable notification sounds.""" + self._sound_enabled = enabled + + def set_sound_path(self, path: Path): + """Set path to sound files.""" + self._sound_path = Path(path) + + def set_position(self, position: str): + """Set notification position. + + Args: + position: One of "bottom-right", "top-right", "bottom-left", "top-left" + """ + valid_positions = ["bottom-right", "top-right", "bottom-left", "top-left"] + if position in valid_positions: + self._position = position + + def dismiss(self, notification_id: str): + """Dismiss a specific notification by ID.""" + if notification_id in self._active_toasts: + self._active_toasts[notification_id].close_notification() + + +# Convenience function to get manager +def get_notification_manager() -> NotificationManager: + """Get the global NotificationManager instance.""" + return NotificationManager.get_instance() diff --git a/projects/EU-Utility/core/plugin_api.py b/projects/EU-Utility/core/plugin_api.py index 510b7a4..38a1d6d 100644 --- a/projects/EU-Utility/core/plugin_api.py +++ b/projects/EU-Utility/core/plugin_api.py @@ -12,6 +12,8 @@ import json from datetime import datetime from pathlib import Path +from core.data_store import get_data_store + class APIType(Enum): """Types of plugin APIs.""" @@ -20,6 +22,7 @@ class APIType(Enum): DATA = "data" # Shared data storage UTILITY = "utility" # Helper functions SERVICE = "service" # Background services + NEXUS = "nexus" # Entropia Nexus API @dataclass @@ -51,6 +54,8 @@ class PluginAPI: self.apis: Dict[str, APIEndpoint] = {} self.services: Dict[str, Any] = {} self.data_cache: Dict[str, Any] = {} + self.http_client = None + self._notification_service = None self._initialized = True # ========== API Registration ========== @@ -97,6 +102,81 @@ class PluginAPI: return [ep for ep in self.apis.values() if ep.api_type == api_type] return list(self.apis.values()) + # ========== Window Service ========== + + def register_window_service(self, window_manager): + """Register the Window Manager service.""" + self.services['window'] = window_manager + print("[API] Window Manager service registered") + + def get_eu_window(self) -> Optional[Dict[str, Any]]: + """Get information about the Entropia Universe window. + + Returns: + Dict with window info or None if not found + { + 'handle': int, + 'title': str, + 'rect': (left, top, right, bottom), + 'width': int, + 'height': int, + 'is_visible': bool, + 'is_focused': bool + } + """ + window_service = self.services.get('window') + if not window_service: + return None + + try: + window_info = window_service.find_eu_window() + if window_info: + return { + 'handle': window_info.handle, + 'title': window_info.title, + 'rect': window_info.rect, + 'width': window_info.width, + 'height': window_info.height, + 'is_visible': window_info.is_visible, + 'is_focused': window_info.is_focused + } + return None + except Exception as e: + print(f"[API] Window error: {e}") + return None + + def is_eu_focused(self) -> bool: + """Check if EU window is currently focused. + + Returns: + True if EU is active window, False otherwise + """ + window_service = self.services.get('window') + if not window_service: + return False + + try: + return window_service.is_window_focused() + except Exception as e: + print(f"[API] Window focus error: {e}") + return False + + def bring_eu_to_front(self) -> bool: + """Bring EU window to front and focus it. + + Returns: + True if successful, False otherwise + """ + window_service = self.services.get('window') + if not window_service: + return False + + try: + return window_service.bring_to_front() + except Exception as e: + print(f"[API] Bring to front error: {e}") + return False + # ========== OCR Service ========== def register_ocr_service(self, ocr_handler: Callable): @@ -148,6 +228,205 @@ class PluginAPI: print(f"[API] Log error: {e}") return [] + # ========== Nexus API Service ========== + + def register_nexus_service(self, nexus_api_instance): + """Register the Nexus API service instance.""" + self.services['nexus'] = nexus_api_instance + print("[API] Nexus service registered") + + def nexus_search(self, query: str, entity_type: str = "all", limit: int = 20) -> list: + """Search Entropia Nexus for items, mobs, or all entities. + + Args: + query: Search term (e.g., "ArMatrix", "Atrox") + entity_type: Type of entity to search - "items", "mobs", or "all" + limit: Maximum results (default 20, max 100) + + Returns: + List of search results with id, name, type, category, icon_url + + Example: + # Search for items + results = api.nexus_search("ArMatrix", "items") + + # Search for mobs + mobs = api.nexus_search("Atrox", "mobs") + + # Search everything + all_results = api.nexus_search("Omegaton") + """ + nexus = self.services.get('nexus') + if not nexus: + print("[API] Nexus service not available") + return [] + + try: + entity_type = entity_type.lower() + if entity_type == "items": + results = nexus.search_items(query, limit) + elif entity_type == "mobs": + results = nexus.search_mobs(query, limit) + else: + results = nexus.search_all(query, limit) + + # Convert SearchResult objects to dicts for easier consumption + return [ + { + 'id': r.id, + 'name': r.name, + 'type': r.type, + 'category': r.category, + 'icon_url': r.icon_url + } + for r in results + ] + except Exception as e: + print(f"[API] Nexus search error: {e}") + return [] + + def nexus_get_item_details(self, item_id: str) -> dict: + """Get detailed information about a specific item from Nexus. + + Args: + item_id: The item's unique identifier + + Returns: + Dict with item details or None if not found + """ + nexus = self.services.get('nexus') + if not nexus: + print("[API] Nexus service not available") + return None + + try: + details = nexus.get_item_details(item_id) + if not details: + return None + + return { + 'id': details.id, + 'name': details.name, + 'description': details.description, + 'category': details.category, + 'weight': details.weight, + 'tt_value': details.tt_value, + 'decay': details.decay, + 'ammo_consumption': details.ammo_consumption, + 'damage': details.damage, + 'range': details.range, + 'accuracy': details.accuracy, + 'durability': details.durability, + 'requirements': details.requirements, + 'materials': details.materials + } + except Exception as e: + print(f"[API] Nexus get_item_details error: {e}") + return None + + def nexus_get_market_data(self, item_id: str) -> dict: + """Get market data for a specific item from Nexus. + + Args: + item_id: The item's unique identifier + + Returns: + Dict with market data or None if not found + """ + nexus = self.services.get('nexus') + if not nexus: + print("[API] Nexus service not available") + return None + + try: + market = nexus.get_market_data(item_id) + if not market: + return None + + return { + 'item_id': market.item_id, + 'item_name': market.item_name, + 'current_markup': market.current_markup, + 'avg_markup_7d': market.avg_markup_7d, + 'avg_markup_30d': market.avg_markup_30d, + 'volume_24h': market.volume_24h, + 'volume_7d': market.volume_7d, + 'buy_orders': market.buy_orders, + 'sell_orders': market.sell_orders, + 'last_updated': market.last_updated.isoformat() if market.last_updated else None + } + except Exception as e: + print(f"[API] Nexus get_market_data error: {e}") + return None + + # ========== HTTP Service ========== + + def register_http_service(self, http_client): + """Register the HTTP client service.""" + self.http_client = http_client + print("[API] HTTP service registered") + + def http_get(self, url: str, cache_ttl: int = 3600, headers: Dict[str, str] = None, **kwargs) -> Dict[str, Any]: + """ + Perform HTTP GET request with caching. + + Args: + url: URL to fetch + cache_ttl: Cache time-to-live in seconds (0 to disable) + headers: Additional headers + **kwargs: Additional arguments (params, use_cache, etc.) + + Returns: + Dict with 'status_code', 'headers', 'content', 'text', 'json', 'from_cache' + """ + if not self.http_client: + raise RuntimeError("HTTP service not available") + + try: + return self.http_client.get(url, cache_ttl=cache_ttl, headers=headers, **kwargs) + except Exception as e: + print(f"[API] HTTP GET error: {e}") + raise + + def http_post(self, url: str, data=None, json=None, headers: Dict[str, str] = None, **kwargs) -> Dict[str, Any]: + """ + Perform HTTP POST request. + + Args: + url: URL to post to + data: Form data, bytes, or string + json: JSON data + headers: Additional headers + **kwargs: Additional arguments + + Returns: + Dict with 'status_code', 'headers', 'content', 'text', 'json', 'from_cache' + """ + if not self.http_client: + raise RuntimeError("HTTP service not available") + + try: + return self.http_client.post(url, data=data, json=json, headers=headers, **kwargs) + except Exception as e: + print(f"[API] HTTP POST error: {e}") + raise + + def http_clear_cache(self): + """Clear all HTTP cache entries.""" + if self.http_client: + self.http_client.clear_cache() + + def http_invalidate_cache(self, url_pattern: str): + """Invalidate cache entries matching URL pattern.""" + if self.http_client: + self.http_client.invalidate_cache(url_pattern) + + def http_get_cache_stats(self) -> Dict[str, Any]: + """Get HTTP cache statistics.""" + if self.http_client: + return self.http_client.get_cache_stats() + return {} + # ========== Shared Data ========== def get_data(self, key: str, default=None) -> Any: @@ -210,6 +489,91 @@ class PluginAPI: if tt <= 0: return 0.0 return (price / tt) * 100 + + # ========== Data Service ========== + + def register_data_service(self) -> bool: + """Register the data persistence service.""" + try: + self.services['data'] = get_data_store() + return True + except Exception as e: + print(f"[API] Failed to register data service: {e}") + return False + + def save_data(self, plugin_id: str, key: str, data: Any) -> bool: + """Save data for a plugin. + + Args: + plugin_id: Unique identifier for the plugin + key: Key under which to store the data + data: Data to store (must be JSON serializable) + + Returns: + True if successful, False otherwise + """ + data_store = self.services.get('data') + if not data_store: + # Try to register and get data store + if self.register_data_service(): + data_store = self.services.get('data') + if not data_store: + raise RuntimeError("Data service not available") + + try: + return data_store.save(plugin_id, key, data) + except Exception as e: + print(f"[API] Error saving data for {plugin_id}/{key}: {e}") + return False + + def load_data(self, plugin_id: str, key: str, default: Any = None) -> Any: + """Load data for a plugin. + + Args: + plugin_id: Unique identifier for the plugin + key: Key of the data to load + default: Default value if key not found + + Returns: + The stored data or default value + """ + data_store = self.services.get('data') + if not data_store: + # Try to register and get data store + if self.register_data_service(): + data_store = self.services.get('data') + if not data_store: + return default + + try: + return data_store.load(plugin_id, key, default) + except Exception as e: + print(f"[API] Error loading data for {plugin_id}/{key}: {e}") + return default + + def delete_data(self, plugin_id: str, key: str) -> bool: + """Delete data for a plugin. + + Args: + plugin_id: Unique identifier for the plugin + key: Key of the data to delete + + Returns: + True if key existed and was deleted, False otherwise + """ + data_store = self.services.get('data') + if not data_store: + # Try to register and get data store + if self.register_data_service(): + data_store = self.services.get('data') + if not data_store: + raise RuntimeError("Data service not available") + + try: + return data_store.delete(plugin_id, key) + except Exception as e: + print(f"[API] Error deleting data for {plugin_id}/{key}: {e}") + return False # Singleton instance diff --git a/projects/EU-Utility/core/window_manager.py b/projects/EU-Utility/core/window_manager.py new file mode 100644 index 0000000..1fbd79a --- /dev/null +++ b/projects/EU-Utility/core/window_manager.py @@ -0,0 +1,467 @@ +""" +EU-Utility - Window Manager Core Service + +Manages interaction with the Entropia Universe game window. +Windows-specific implementation using ctypes (no external dependencies). +""" + +import sys +import time +from typing import Optional, Tuple, Dict, Any +from dataclasses import dataclass +from pathlib import Path + +# Platform detection +IS_WINDOWS = sys.platform == 'win32' + +# Windows-specific imports with graceful fallback +if IS_WINDOWS: + try: + import ctypes + from ctypes import wintypes + import subprocess + WINDOWS_AVAILABLE = True + except ImportError: + WINDOWS_AVAILABLE = False +else: + WINDOWS_AVAILABLE = False + + +@dataclass +class WindowInfo: + """Information about a window.""" + handle: int + title: str + pid: int + rect: Tuple[int, int, int, int] # left, top, right, bottom + width: int + height: int + is_visible: bool + is_focused: bool + + +@dataclass +class ProcessInfo: + """Information about a process.""" + pid: int + name: str + executable_path: Optional[str] + memory_usage: Optional[int] # in bytes + cpu_percent: Optional[float] + + +class WindowManager: + """ + Singleton Window Manager for EU-Utility. + + Manages finding and interacting with the Entropia Universe game window. + Windows-specific implementation with graceful fallback on Linux. + """ + + _instance = None + + # Window search criteria + EU_WINDOW_TITLE = "Entropia Universe" + EU_PROCESS_NAMES = ["entropia.exe", "entropiauniverse.exe", "clientloader.exe"] + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + self._window_handle: Optional[int] = None + self._window_info: Optional[WindowInfo] = None + self._process_info: Optional[ProcessInfo] = None + self._last_update: float = 0 + self._update_interval: float = 1.0 # seconds + + # Windows API constants + if IS_WINDOWS and WINDOWS_AVAILABLE: + self._setup_windows_api() + + self._initialized = True + self._available = IS_WINDOWS and WINDOWS_AVAILABLE + + if not self._available: + print("[WindowManager] Windows API not available - running in limited mode") + + def _setup_windows_api(self): + """Setup Windows API constants and functions.""" + # Window styles + self.GWL_STYLE = -16 + self.GWL_EXSTYLE = -20 + self.WS_VISIBLE = 0x10000000 + self.WS_MINIMIZE = 0x20000000 + + # Load user32.dll functions + self.user32 = ctypes.windll.user32 + self.kernel32 = ctypes.windll.kernel32 + + # EnumWindows callback type + self.EnumWindowsProc = ctypes.WINFUNCTYPE( + wintypes.BOOL, + wintypes.HWND, + wintypes.LPARAM + ) + + # ========== Public API ========== + + def is_available(self) -> bool: + """Check if window manager is fully functional.""" + return self._available + + def find_eu_window(self) -> Optional[WindowInfo]: + """ + Find the Entropia Universe game window. + + Returns: + WindowInfo if found, None otherwise + """ + if not self._available: + return None + + # Try by window title first + hwnd = self._find_window_by_title(self.EU_WINDOW_TITLE) + + if hwnd: + self._window_handle = hwnd + self._window_info = self._get_window_info(hwnd) + return self._window_info + + # Try by process name + for proc_name in self.EU_PROCESS_NAMES: + hwnd = self._find_window_by_process(proc_name) + if hwnd: + self._window_handle = hwnd + self._window_info = self._get_window_info(hwnd) + return self._window_info + + return None + + def get_window_rect(self) -> Optional[Tuple[int, int, int, int]]: + """ + Get the window rectangle (left, top, right, bottom). + + Returns: + Tuple of (left, top, right, bottom) or None + """ + if not self._available: + return None + + # Refresh window info + self._update_window_info() + + if self._window_info: + return self._window_info.rect + return None + + def is_window_focused(self) -> bool: + """ + Check if the EU window is currently focused/active. + + Returns: + True if EU window is active, False otherwise + """ + if not self._available: + return False + + if not self._window_handle: + self.find_eu_window() + + if not self._window_handle: + return False + + # Get foreground window + foreground_hwnd = self.user32.GetForegroundWindow() + return foreground_hwnd == self._window_handle + + def is_window_visible(self) -> bool: + """ + Check if the EU window is visible. + + Returns: + True if visible, False otherwise + """ + if not self._available: + return False + + # Refresh window info + self._update_window_info() + + if self._window_info: + return self._window_info.is_visible + return False + + def bring_to_front(self) -> bool: + """ + Bring the EU window to the front and focus it. + + Returns: + True if successful, False otherwise + """ + if not self._available: + return False + + if not self._window_handle: + self.find_eu_window() + + if not self._window_handle: + return False + + try: + # Show window if minimized + self.user32.ShowWindow(self._window_handle, 9) # SW_RESTORE = 9 + + # Bring to front + result = self.user32.SetForegroundWindow(self._window_handle) + + # Force window to top + self.user32.SetWindowPos( + self._window_handle, + -1, # HWND_TOPMOST + 0, 0, 0, 0, + 0x0001 | 0x0002 # SWP_NOSIZE | SWP_NOMOVE + ) + + # Remove topmost flag but keep on top + self.user32.SetWindowPos( + self._window_handle, + -2, # HWND_NOTOPMOST + 0, 0, 0, 0, + 0x0001 | 0x0002 # SWP_NOSIZE | SWP_NOMOVE + ) + + return bool(result) + except Exception as e: + print(f"[WindowManager] Failed to bring window to front: {e}") + return False + + def get_eu_process_info(self) -> Optional[ProcessInfo]: + """ + Get process information for Entropia Universe. + + Returns: + ProcessInfo if found, None otherwise + """ + if not self._available: + return None + + if not self._window_handle: + self.find_eu_window() + + if not self._window_handle: + return None + + # Get PID from window + pid = wintypes.DWORD() + self.user32.GetWindowThreadProcessId(self._window_handle, ctypes.byref(pid)) + + if pid.value == 0: + return None + + self._process_info = self._get_process_info(pid.value) + return self._process_info + + def get_window_handle(self) -> Optional[int]: + """Get the current window handle.""" + return self._window_handle + + def refresh(self) -> Optional[WindowInfo]: + """Force refresh of window information.""" + self._last_update = 0 + return self.find_eu_window() + + # ========== Private Methods ========== + + def _update_window_info(self): + """Update cached window info if needed.""" + current_time = time.time() + if current_time - self._last_update > self._update_interval: + if self._window_handle: + self._window_info = self._get_window_info(self._window_handle) + else: + self.find_eu_window() + self._last_update = current_time + + def _find_window_by_title(self, title: str) -> Optional[int]: + """Find window by title (partial match).""" + found_hwnd = [None] + + def callback(hwnd, extra): + if not self.user32.IsWindowVisible(hwnd): + return True + + # Get window text + text = ctypes.create_unicode_buffer(256) + self.user32.GetWindowTextW(hwnd, text, 256) + window_title = text.value + + if title.lower() in window_title.lower(): + found_hwnd[0] = hwnd + return False # Stop enumeration + + return True + + proc = self.EnumWindowsProc(callback) + self.user32.EnumWindows(proc, 0) + + return found_hwnd[0] + + def _find_window_by_process(self, process_name: str) -> Optional[int]: + """Find window by process name.""" + found_hwnd = [None] + + def callback(hwnd, extra): + if not self.user32.IsWindowVisible(hwnd): + return True + + # Get process ID + pid = wintypes.DWORD() + self.user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) + + # Check process name + proc_info = self._get_process_info(pid.value) + if proc_info and process_name.lower() in proc_info.name.lower(): + found_hwnd[0] = hwnd + return False # Stop enumeration + + return True + + proc = self.EnumWindowsProc(callback) + self.user32.EnumWindows(proc, 0) + + return found_hwnd[0] + + def _get_window_info(self, hwnd: int) -> Optional[WindowInfo]: + """Get detailed information about a window.""" + try: + # Get window rect + rect = wintypes.RECT() + if not self.user32.GetWindowRect(hwnd, ctypes.byref(rect)): + return None + + # Get window text + text = ctypes.create_unicode_buffer(256) + self.user32.GetWindowTextW(hwnd, text, 256) + + # Get PID + pid = wintypes.DWORD() + self.user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) + + # Check visibility + is_visible = bool(self.user32.IsWindowVisible(hwnd)) + + # Check if focused + foreground = self.user32.GetForegroundWindow() + is_focused = (foreground == hwnd) + + return WindowInfo( + handle=hwnd, + title=text.value, + pid=pid.value, + rect=(rect.left, rect.top, rect.right, rect.bottom), + width=rect.right - rect.left, + height=rect.bottom - rect.top, + is_visible=is_visible, + is_focused=is_focused + ) + except Exception as e: + print(f"[WindowManager] Error getting window info: {e}") + return None + + def _get_process_info(self, pid: int) -> Optional[ProcessInfo]: + """Get process information.""" + try: + # Use Windows WMI or tasklist for process info + import subprocess + + # Try tasklist first + result = subprocess.run( + ['tasklist', '/FI', f'PID eq {pid}', '/FO', 'CSV', '/NH'], + capture_output=True, + text=True, + timeout=5 + ) + + if result.returncode == 0 and result.stdout.strip(): + # Parse CSV output + lines = result.stdout.strip().split('\n') + for line in lines: + if str(pid) in line: + parts = line.split('","') + if len(parts) >= 2: + name = parts[0].replace('"', '') + return ProcessInfo( + pid=pid, + name=name, + executable_path=None, + memory_usage=None, + cpu_percent=None + ) + + return ProcessInfo( + pid=pid, + name="Unknown", + executable_path=None, + memory_usage=None, + cpu_percent=None + ) + except Exception as e: + print(f"[WindowManager] Error getting process info: {e}") + return None + + def _check_window_exists(self, hwnd: int) -> bool: + """Check if a window handle is still valid.""" + if not self._available: + return False + return bool(self.user32.IsWindow(hwnd)) + + +# Singleton instance +_window_manager = None + + +def get_window_manager() -> WindowManager: + """Get the global WindowManager instance.""" + global _window_manager + if _window_manager is None: + _window_manager = WindowManager() + return _window_manager + + +def is_eu_running() -> bool: + """Quick check if Entropia Universe is running.""" + wm = get_window_manager() + return wm.find_eu_window() is not None + + +def get_eu_window_rect() -> Optional[Tuple[int, int, int, int]]: + """Quick access to EU window rectangle.""" + wm = get_window_manager() + return wm.get_window_rect() + + +def wait_for_eu(timeout: float = 30.0) -> bool: + """ + Wait for Entropia Universe to start. + + Args: + timeout: Maximum seconds to wait + + Returns: + True if EU found, False if timeout + """ + wm = get_window_manager() + start_time = time.time() + + while time.time() - start_time < timeout: + if wm.find_eu_window(): + return True + time.sleep(0.5) + + return False diff --git a/projects/EU-Utility/plugins/base_plugin.py b/projects/EU-Utility/plugins/base_plugin.py index 52de81d..967d462 100644 --- a/projects/EU-Utility/plugins/base_plugin.py +++ b/projects/EU-Utility/plugins/base_plugin.py @@ -139,6 +139,50 @@ class BasePlugin(ABC): return self.api.find_apis(api_type) + # ========== Window Service Methods ========== + + def get_eu_window(self) -> Optional[Dict[str, Any]]: + """Get information about the Entropia Universe window. + + Returns: + Dict with window info or None if not available: + { + 'handle': int, + 'title': str, + 'rect': (left, top, right, bottom), + 'width': int, + 'height': int, + 'is_visible': bool, + 'is_focused': bool + } + """ + if not self.api: + return None + + return self.api.get_eu_window() + + def is_eu_focused(self) -> bool: + """Check if EU window is currently focused/active. + + Returns: + True if EU is the active window, False otherwise + """ + if not self.api: + return False + + return self.api.is_eu_focused() + + def bring_eu_to_front(self) -> bool: + """Bring EU window to front and focus it. + + Returns: + True if successful, False otherwise + """ + if not self.api: + return False + + return self.api.bring_eu_to_front() + # ========== Shared Services ========== def ocr_capture(self, region: tuple = None) -> Dict[str, Any]: