""" EU-Utility - Enhanced Event Bus =============================== Core service for typed event handling with: - Typed events using dataclasses - Event filtering (mob types, damage thresholds, etc.) - Event persistence (last 1000 events) - Event replay (replay last N events to new subscribers) - Async event handling (non-blocking publishers) - Event statistics (events per minute, etc.) Quick Start: ------------ from core.event_bus import get_event_bus, LootEvent, DamageEvent bus = get_event_bus() # Subscribe to events sub_id = bus.subscribe_typed(LootEvent, on_loot) # Publish events bus.publish(LootEvent(mob_name="Atrox", items=[...])) # Get recent events recent_loot = bus.get_recent_events(LootEvent, count=10) Event Types: ------------ - SkillGainEvent: Skill increases - LootEvent: Loot received - DamageEvent: Combat damage - GlobalEvent: Global announcements - ChatEvent: Chat messages - EconomyEvent: Economic transactions - SystemEvent: System notifications """ import asyncio import time import threading from collections import deque, defaultdict from dataclasses import dataclass, field, asdict from datetime import datetime from enum import Enum, auto from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union # ========== Event Types ========== class EventCategory(Enum): """Event categories for organization.""" SKILL = auto() LOOT = auto() COMBAT = auto() ECONOMY = auto() SYSTEM = auto() CHAT = auto() GLOBAL = auto() @dataclass(frozen=True) class BaseEvent: """Base class for all typed events. Attributes: timestamp: When the event occurred source: Source of the event (plugin name, etc.) """ timestamp: datetime = field(default_factory=datetime.now) source: str = "unknown" @property def event_type(self) -> str: """Return the event type name.""" return self.__class__.__name__ @property def category(self) -> EventCategory: """Return the event category.""" return EventCategory.SYSTEM def to_dict(self) -> Dict[str, Any]: """Convert event to dictionary.""" return { 'event_type': self.event_type, 'category': self.category.name, **asdict(self) } @dataclass(frozen=True) class SkillGainEvent(BaseEvent): """Event fired when a skill increases. Attributes: skill_name: Name of the skill skill_value: New skill value gain_amount: Amount gained """ skill_name: str = "" skill_value: float = 0.0 gain_amount: float = 0.0 @property def category(self) -> EventCategory: return EventCategory.SKILL @dataclass(frozen=True) class LootEvent(BaseEvent): """Event fired when loot is received. Attributes: mob_name: Name of the mob killed items: List of looted items total_tt_value: Total TT value of loot position: Optional (x, y, z) position """ mob_name: str = "" items: List[Dict[str, Any]] = field(default_factory=list) total_tt_value: float = 0.0 position: Optional[tuple] = None @property def category(self) -> EventCategory: return EventCategory.LOOT def get_item_names(self) -> List[str]: """Get list of item names from loot.""" return [item.get('name', 'Unknown') for item in self.items] @dataclass(frozen=True) class DamageEvent(BaseEvent): """Event fired when damage is dealt or received. Attributes: damage_amount: Amount of damage damage_type: Type of damage (impact, penetration, etc.) is_critical: Whether it was a critical hit target_name: Name of the target attacker_name: Name of the attacker is_outgoing: True if player dealt damage """ damage_amount: float = 0.0 damage_type: str = "" # e.g., "impact", "penetration", "burn" is_critical: bool = False target_name: str = "" attacker_name: str = "" is_outgoing: bool = True @property def category(self) -> EventCategory: return EventCategory.COMBAT def is_high_damage(self, threshold: float = 100.0) -> bool: """Check if damage exceeds threshold.""" return self.damage_amount >= threshold @dataclass(frozen=True) class GlobalEvent(BaseEvent): """Event for global announcements. Attributes: player_name: Name of the player achievement_type: Type of achievement (hof, ath, discovery) value: Value of the achievement item_name: Optional item name """ player_name: str = "" achievement_type: str = "" # e.g., "hof", "ath", "discovery" value: float = 0.0 item_name: Optional[str] = None @property def category(self) -> EventCategory: return EventCategory.GLOBAL @dataclass(frozen=True) class ChatEvent(BaseEvent): """Event for chat messages. Attributes: channel: Chat channel (main, team, society, etc.) sender: Name of the sender message: Message content """ channel: str = "" # "main", "team", "society", etc. sender: str = "" message: str = "" @property def category(self) -> EventCategory: return EventCategory.CHAT @dataclass(frozen=True) class EconomyEvent(BaseEvent): """Event for economic transactions. Attributes: transaction_type: Type of transaction (sale, purchase, etc.) amount: Transaction amount currency: Currency type (usually PED) description: Transaction description """ transaction_type: str = "" # "sale", "purchase", "deposit", "withdraw" amount: float = 0.0 currency: str = "PED" description: str = "" @property def category(self) -> EventCategory: return EventCategory.ECONOMY @dataclass(frozen=True) class SystemEvent(BaseEvent): """Event for system notifications. Attributes: message: System message severity: Severity level (debug, info, warning, error, critical) """ message: str = "" severity: str = "info" # "debug", "info", "warning", "error", "critical" @property def category(self) -> EventCategory: return EventCategory.SYSTEM # Type variable for event types T = TypeVar('T', bound=BaseEvent) # ========== Event Filter ========== @dataclass class EventFilter: """Filter criteria for event subscription. Attributes: event_types: List of event types to filter categories: List of categories to filter min_damage: Minimum damage threshold max_damage: Maximum damage threshold mob_types: List of mob names to filter skill_names: List of skill names to filter sources: List of event sources to filter custom_predicate: Custom filter function """ event_types: Optional[List[Type[BaseEvent]]] = None categories: Optional[List[EventCategory]] = None min_damage: Optional[float] = None max_damage: Optional[float] = None mob_types: Optional[List[str]] = None skill_names: Optional[List[str]] = None sources: Optional[List[str]] = None custom_predicate: Optional[Callable[[BaseEvent], bool]] = None def matches(self, event: BaseEvent) -> bool: """Check if an event matches this filter.""" # Check event type if self.event_types is not None: if not any(isinstance(event, et) for et in self.event_types): return False # Check category if self.categories is not None: if event.category not in self.categories: return False # Check source if self.sources is not None: if event.source not in self.sources: return False # Type-specific filters if isinstance(event, DamageEvent): if self.min_damage is not None and event.damage_amount < self.min_damage: return False if self.max_damage is not None and event.damage_amount > self.max_damage: return False if isinstance(event, LootEvent): if self.mob_types is not None: if event.mob_name not in self.mob_types: return False if isinstance(event, SkillGainEvent): if self.skill_names is not None: if event.skill_name not in self.skill_names: return False # Custom predicate if self.custom_predicate is not None: return self.custom_predicate(event) return True # ========== Subscription ========== @dataclass class EventSubscription: """Represents an event subscription. Attributes: id: Unique subscription ID callback: Function to call when event occurs event_filter: Filter criteria replay_history: Whether to replay recent events replay_count: Number of events to replay created_at: When subscription was created event_count: Number of events delivered last_received: When last event was received """ id: str callback: Callable[[BaseEvent], Any] event_filter: EventFilter replay_history: bool replay_count: int created_at: datetime = field(default_factory=datetime.now) event_count: int = 0 last_received: Optional[datetime] = None def matches(self, event: BaseEvent) -> bool: """Check if event matches this subscription.""" return self.event_filter.matches(event) # ========== Event Statistics ========== @dataclass class EventStats: """Statistics for event bus performance. Attributes: total_events_published: Total events published total_events_delivered: Total events delivered total_subscriptions: Total subscriptions created active_subscriptions: Currently active subscriptions events_by_type: Event counts by type events_by_category: Event counts by category events_per_minute: Current events per minute rate average_delivery_time_ms: Average delivery time errors: Number of delivery errors """ total_events_published: int = 0 total_events_delivered: int = 0 total_subscriptions: int = 0 active_subscriptions: int = 0 events_by_type: Dict[str, int] = field(default_factory=lambda: defaultdict(int)) events_by_category: Dict[str, int] = field(default_factory=lambda: defaultdict(int)) events_per_minute: float = 0.0 average_delivery_time_ms: float = 0.0 errors: int = 0 _start_time: datetime = field(default_factory=datetime.now) _minute_window: deque = field(default_factory=lambda: deque(maxlen=60)) _delivery_times: deque = field(default_factory=lambda: deque(maxlen=100)) def record_event_published(self, event: BaseEvent) -> None: """Record an event publication.""" self.total_events_published += 1 self.events_by_type[event.event_type] += 1 self.events_by_category[event.category.name] += 1 self._minute_window.append(time.time()) self._update_epm() def record_event_delivered(self, delivery_time_ms: float) -> None: """Record successful event delivery.""" self.total_events_delivered += 1 self._delivery_times.append(delivery_time_ms) if len(self._delivery_times) > 0: self.average_delivery_time_ms = sum(self._delivery_times) / len(self._delivery_times) def record_error(self) -> None: """Record a delivery error.""" self.errors += 1 def _update_epm(self) -> None: """Update events per minute calculation.""" now = time.time() recent = [t for t in self._minute_window if now - t < 60] self.events_per_minute = len(recent) def get_summary(self) -> Dict[str, Any]: """Get statistics summary.""" uptime = datetime.now() - self._start_time return { 'total_published': self.total_events_published, 'total_delivered': self.total_events_delivered, 'active_subscriptions': self.active_subscriptions, 'events_per_minute': round(self.events_per_minute, 2), 'avg_delivery_ms': round(self.average_delivery_time_ms, 2), 'errors': self.errors, 'uptime_seconds': uptime.total_seconds(), 'top_event_types': dict(sorted( self.events_by_type.items(), key=lambda x: x[1], reverse=True )[:5]) } # ========== Event Bus ========== class EventBus: """Enhanced Event Bus for EU-Utility. Features: - Typed events using dataclasses - Event filtering with flexible criteria - Event persistence (configurable history size) - Event replay for new subscribers - Async event handling - Event statistics This is a singleton - use get_event_bus() to get the instance. Example: bus = get_event_bus() # Subscribe to all loot events sub_id = bus.subscribe_typed(LootEvent, handle_loot) # Subscribe to high damage only sub_id = bus.subscribe_typed( DamageEvent, handle_big_hit, min_damage=100 ) # Publish an event bus.publish(LootEvent(mob_name="Atrox", items=[...])) """ _instance: Optional['EventBus'] = None _lock = threading.Lock() def __new__(cls, *args: Any, **kwargs: Any) -> 'EventBus': if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self, max_history: int = 1000): """Initialize the EventBus. Args: max_history: Maximum number of events to keep in history """ if self._initialized: return self.max_history = max_history self._history: deque = deque(maxlen=max_history) self._subscriptions: Dict[str, EventSubscription] = {} self._subscription_counter: int = 0 self._stats = EventStats() self._lock = threading.RLock() self._async_loop: Optional[asyncio.AbstractEventLoop] = None self._async_thread: Optional[threading.Thread] = None self._initialized = True def _ensure_async_loop(self) -> None: """Ensure the async event loop is running.""" if self._async_loop is None or self._async_loop.is_closed(): self._start_async_loop() def _start_async_loop(self) -> None: """Start the async event loop in a separate thread.""" def run_loop() -> None: self._async_loop = asyncio.new_event_loop() asyncio.set_event_loop(self._async_loop) self._async_loop.run_forever() self._async_thread = threading.Thread(target=run_loop, daemon=True) self._async_thread.start() def publish(self, event: BaseEvent) -> None: """Publish an event to all matching subscribers. Non-blocking - returns immediately. Args: event: Event to publish """ with self._lock: self._history.append(event) self._stats.record_event_published(event) matching_subs = [ sub for sub in self._subscriptions.values() if sub.matches(event) ] for sub in matching_subs: self._deliver_async(sub, event) def publish_sync(self, event: BaseEvent) -> int: """Publish an event synchronously. Blocks until all callbacks complete. Args: event: Event to publish Returns: Number of subscribers notified """ with self._lock: self._history.append(event) self._stats.record_event_published(event) matching_subs = [ sub for sub in self._subscriptions.values() if sub.matches(event) ] count = 0 for sub in matching_subs: if self._deliver_sync(sub, event): count += 1 return count def _deliver_async(self, subscription: EventSubscription, event: BaseEvent) -> None: """Deliver an event asynchronously.""" self._ensure_async_loop() def deliver() -> None: start = time.perf_counter() try: subscription.callback(event) subscription.event_count += 1 subscription.last_received = datetime.now() elapsed = (time.perf_counter() - start) * 1000 self._stats.record_event_delivered(elapsed) except Exception as e: self._stats.record_error() print(f"[EventBus] Error delivering to {subscription.id}: {e}") if self._async_loop: self._async_loop.call_soon_threadsafe(deliver) def _deliver_sync(self, subscription: EventSubscription, event: BaseEvent) -> bool: """Deliver an event synchronously.""" start = time.perf_counter() try: subscription.callback(event) subscription.event_count += 1 subscription.last_received = datetime.now() elapsed = (time.perf_counter() - start) * 1000 self._stats.record_event_delivered(elapsed) return True except Exception as e: self._stats.record_error() print(f"[EventBus] Error delivering to {subscription.id}: {e}") return False def subscribe( self, callback: Callable[[BaseEvent], Any], event_filter: Optional[EventFilter] = None, replay_history: bool = False, replay_count: int = 100, event_types: Optional[List[Type[BaseEvent]]] = None ) -> str: """Subscribe to events with optional filtering. Args: callback: Function to call when matching events occur event_filter: Filter criteria for events replay_history: Whether to replay recent events replay_count: Number of recent events to replay event_types: Shorthand for simple type-based filtering Returns: Subscription ID (use for unsubscribe) """ with self._lock: self._subscription_counter += 1 sub_id = f"sub_{self._subscription_counter}" if event_filter is None and event_types is not None: event_filter = EventFilter(event_types=event_types) elif event_filter is None: event_filter = EventFilter() subscription = EventSubscription( id=sub_id, callback=callback, event_filter=event_filter, replay_history=replay_history, replay_count=min(replay_count, self.max_history) ) self._subscriptions[sub_id] = subscription self._stats.active_subscriptions = len(self._subscriptions) self._stats.total_subscriptions += 1 if replay_history: self._replay_history(subscription) return sub_id def subscribe_typed( self, event_class: Type[T], callback: Callable[[T], Any], **filter_kwargs: Any ) -> str: """Subscribe to a specific event type with optional filtering. Args: event_class: The event class to subscribe to callback: Function to call with typed event **filter_kwargs: Additional filter criteria: - min_damage: Minimum damage threshold - max_damage: Maximum damage threshold - mob_types: List of mob names to filter - skill_names: List of skill names to filter - sources: List of event sources to filter - replay_last: Number of events to replay Returns: Subscription ID """ filter_args: Dict[str, Any] = {'event_types': [event_class]} if 'min_damage' in filter_kwargs: filter_args['min_damage'] = filter_kwargs.pop('min_damage') if 'max_damage' in filter_kwargs: filter_args['max_damage'] = filter_kwargs.pop('max_damage') if 'mob_types' in filter_kwargs: filter_args['mob_types'] = filter_kwargs.pop('mob_types') if 'skill_names' in filter_kwargs: filter_args['skill_names'] = filter_kwargs.pop('skill_names') if 'sources' in filter_kwargs: filter_args['sources'] = filter_kwargs.pop('sources') if 'predicate' in filter_kwargs: filter_args['custom_predicate'] = filter_kwargs.pop('predicate') event_filter = EventFilter(**filter_args) replay_count = filter_kwargs.pop('replay_last', 0) def typed_callback(event: BaseEvent) -> None: if isinstance(event, event_class): callback(event) return self.subscribe( callback=typed_callback, event_filter=event_filter, replay_history=replay_count > 0, replay_count=replay_count ) def _replay_history(self, subscription: EventSubscription) -> None: """Replay recent events to a subscriber.""" with self._lock: events_to_replay = [ e for e in list(self._history)[-subscription.replay_count:] if subscription.matches(e) ] for event in events_to_replay: self._deliver_async(subscription, event) def unsubscribe(self, subscription_id: str) -> bool: """Unsubscribe from events. Args: subscription_id: ID returned by subscribe() Returns: True if unsubscribed successfully """ with self._lock: if subscription_id in self._subscriptions: del self._subscriptions[subscription_id] self._stats.active_subscriptions = len(self._subscriptions) return True return False def get_recent_events( self, event_type: Optional[Type[T]] = None, count: int = 100, category: Optional[EventCategory] = None ) -> List[BaseEvent]: """Get recent events from history. Args: event_type: Filter by event class count: Maximum number of events to return category: Filter by event category Returns: List of matching events """ with self._lock: events = list(self._history) if event_type is not None: events = [e for e in events if isinstance(e, event_type)] if category is not None: events = [e for e in events if e.category == category] return events[-count:] def get_events_by_time_range( self, start: datetime, end: Optional[datetime] = None ) -> List[BaseEvent]: """Get events within a time range. Args: start: Start datetime end: End datetime (defaults to now) Returns: List of events in the time range """ if end is None: end = datetime.now() with self._lock: return [ e for e in self._history if start <= e.timestamp <= end ] def get_stats(self) -> Dict[str, Any]: """Get event bus statistics.""" return self._stats.get_summary() def clear_history(self) -> None: """Clear event history.""" with self._lock: self._history.clear() def shutdown(self) -> None: """Shutdown the event bus and cleanup resources.""" with self._lock: self._subscriptions.clear() self._history.clear() self._stats.active_subscriptions = 0 if self._async_loop and self._async_loop.is_running(): self._async_loop.call_soon_threadsafe(self._async_loop.stop) if self._async_thread and self._async_thread.is_alive(): self._async_thread.join(timeout=2.0) # Singleton instance _event_bus: Optional[EventBus] = None def get_event_bus() -> EventBus: """Get the global EventBus instance. Returns: The singleton EventBus instance """ global _event_bus if _event_bus is None: _event_bus = EventBus() return _event_bus def reset_event_bus() -> None: """Reset the global EventBus instance (mainly for testing).""" global _event_bus if _event_bus is not None: _event_bus.shutdown() _event_bus = None # Convenience decorator def on_event(event_class: Type[T], **filter_kwargs: Any): """Decorator for event subscription. Args: event_class: Event class to subscribe to **filter_kwargs: Filter criteria Example: @on_event(DamageEvent, min_damage=100) def handle_big_damage(event: DamageEvent): print(f"Big hit: {event.damage_amount}") """ def decorator(func: Callable[[T], Any]) -> Callable[[T], Any]: bus = get_event_bus() bus.subscribe_typed(event_class, func, **filter_kwargs) func._event_subscription = True # type: ignore return func return decorator # Export all public symbols __all__ = [ # Event types 'EventCategory', 'BaseEvent', 'SkillGainEvent', 'LootEvent', 'DamageEvent', 'GlobalEvent', 'ChatEvent', 'EconomyEvent', 'SystemEvent', # Core classes 'EventBus', 'EventFilter', 'EventSubscription', 'EventStats', # Functions 'get_event_bus', 'reset_event_bus', 'on_event', ]