EU-Utility/core/event_bus.py

673 lines
22 KiB
Python

"""
EU-Utility - Enhanced Event Bus
Core service for typed event handling with:
- Typed events using dataclasses
- Event filtering (mob types, damage thresholds, etc.)
- Event persistence (last 1000 events)
- Event replay (replay last N events to new subscribers)
- Async event handling (non-blocking publishers)
- Event statistics (events per minute, etc.)
"""
import asyncio
import time
import threading
from collections import deque, defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union, Set
from functools import wraps
import copy
# ========== Event Types ==========
class EventCategory(Enum):
    """Coarse grouping reported by every event through its ``category`` property.

    Member values are assigned by ``auto()`` in declaration order.
    """

    SKILL = auto()    # reported by SkillGainEvent
    LOOT = auto()     # reported by LootEvent
    COMBAT = auto()   # reported by DamageEvent
    ECONOMY = auto()  # reported by EconomyEvent
    SYSTEM = auto()   # reported by SystemEvent and by BaseEvent as the default
    CHAT = auto()     # reported by ChatEvent
    GLOBAL = auto()   # reported by GlobalEvent
@dataclass(frozen=True)
class BaseEvent:
    """Immutable root of the typed event hierarchy.

    Attributes:
        timestamp: Set to ``datetime.now()`` at construction unless supplied.
        source: Free-form label for where the event came from
            (defaults to ``"unknown"``).
    """

    timestamp: datetime = field(default_factory=datetime.now)
    source: str = "unknown"

    @property
    def event_type(self) -> str:
        """Name of the concrete event class, e.g. ``"LootEvent"``."""
        return type(self).__name__

    @property
    def category(self) -> EventCategory:
        """Category of this event; the base class reports ``SYSTEM``."""
        return EventCategory.SYSTEM

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict with ``event_type``, ``category`` (as a name) and all fields.

        Field values come from ``dataclasses.asdict`` and are merged last, so a
        field named ``event_type`` or ``category`` would override the metadata.
        """
        payload: Dict[str, Any] = {
            'event_type': self.event_type,
            'category': self.category.name,
        }
        payload.update(asdict(self))
        return payload
@dataclass(frozen=True)
class SkillGainEvent(BaseEvent):
    """Raised when a skill increases.

    Attributes:
        skill_name: Name of the skill; also what ``EventFilter.skill_names``
            is compared against.
        skill_value: Skill value carried with the event
            (presumably the value after the gain -- confirm with the producer).
        gain_amount: Size of the increase.
    """

    skill_name: str = ""
    skill_value: float = 0.0
    gain_amount: float = 0.0

    @property
    def category(self) -> EventCategory:
        """Skill gains always belong to the SKILL category."""
        return EventCategory.SKILL
@dataclass(frozen=True)
class LootEvent(BaseEvent):
    """Raised when loot is received.

    Attributes:
        mob_name: Name of the looted mob; compared against
            ``EventFilter.mob_types``.
        items: One dict per looted item. Only the ``'name'`` key is read by
            this class; other keys are passed through untouched.
        total_tt_value: Total TT value of the loot.
        position: Optional ``(x, y, z)`` tuple.
    """

    mob_name: str = ""
    items: List[Dict[str, Any]] = field(default_factory=list)
    total_tt_value: float = 0.0
    position: Optional[tuple] = None

    @property
    def category(self) -> EventCategory:
        """Loot events always belong to the LOOT category."""
        return EventCategory.LOOT

    def get_item_names(self) -> List[str]:
        """Return each item's ``'name'``, substituting ``'Unknown'`` when absent."""
        names: List[str] = []
        for entry in self.items:
            names.append(entry.get('name', 'Unknown'))
        return names
@dataclass(frozen=True)
class DamageEvent(BaseEvent):
    """Raised when damage is dealt or received.

    Attributes:
        damage_amount: Damage figure; compared against
            ``EventFilter.min_damage`` / ``max_damage``.
        damage_type: Kind of damage, e.g. "impact", "penetration", "burn".
        is_critical: Whether the hit was critical.
        target_name: Name of the entity that was hit.
        attacker_name: Name of the entity that dealt the damage.
        is_outgoing: True if the player dealt the damage.
    """

    damage_amount: float = 0.0
    damage_type: str = ""
    is_critical: bool = False
    target_name: str = ""
    attacker_name: str = ""
    is_outgoing: bool = True

    @property
    def category(self) -> EventCategory:
        """Damage events always belong to the COMBAT category."""
        return EventCategory.COMBAT

    def is_high_damage(self, threshold: float = 100.0) -> bool:
        """Return True when ``damage_amount`` is at least ``threshold``."""
        reached = self.damage_amount >= threshold
        return reached
@dataclass(frozen=True)
class GlobalEvent(BaseEvent):
    """Global announcement event.

    Attributes:
        player_name: Player the announcement is about.
        achievement_type: Kind of achievement, e.g. "hof", "ath", "discovery".
        value: Numeric value attached to the announcement.
        item_name: Related item name, or None when there is none.
    """

    player_name: str = ""
    achievement_type: str = ""
    value: float = 0.0
    item_name: Optional[str] = None

    @property
    def category(self) -> EventCategory:
        """Global announcements always belong to the GLOBAL category."""
        return EventCategory.GLOBAL
@dataclass(frozen=True)
class ChatEvent(BaseEvent):
    """Chat message event.

    Attributes:
        channel: Chat channel, e.g. "main", "team", "society".
        sender: Name of the message author.
        message: Message text.
    """

    channel: str = ""
    sender: str = ""
    message: str = ""

    @property
    def category(self) -> EventCategory:
        """Chat messages always belong to the CHAT category."""
        return EventCategory.CHAT
@dataclass(frozen=True)
class EconomyEvent(BaseEvent):
    """Economic transaction event.

    Attributes:
        transaction_type: One of "sale", "purchase", "deposit", "withdraw"
            (not validated here).
        amount: Transaction amount.
        currency: Currency code; defaults to "PED".
        description: Free-form description of the transaction.
    """

    transaction_type: str = ""
    amount: float = 0.0
    currency: str = "PED"
    description: str = ""

    @property
    def category(self) -> EventCategory:
        """Transactions always belong to the ECONOMY category."""
        return EventCategory.ECONOMY
@dataclass(frozen=True)
class SystemEvent(BaseEvent):
    """System notification event.

    Attributes:
        message: Notification text.
        severity: One of "debug", "info", "warning", "error", "critical"
            (not validated here); defaults to "info".
    """

    message: str = ""
    severity: str = "info"

    @property
    def category(self) -> EventCategory:
        """System notifications always belong to the SYSTEM category."""
        return EventCategory.SYSTEM
# Type variable bound to BaseEvent. Used in the signatures of
# EventBus.subscribe_typed(), EventBus.get_recent_events() and on_event()
# to annotate parameters that take a concrete event class.
T = TypeVar('T', bound=BaseEvent)
# ========== Event Filter ==========
@dataclass
class EventFilter:
    """Criteria deciding whether a subscriber receives an event.

    Every criterion left as ``None`` is ignored. The damage, mob and skill
    criteria are only consulted for ``DamageEvent``, ``LootEvent`` and
    ``SkillGainEvent`` respectively; events of other types pass them.

    Attributes:
        event_types: Accept only instances of one of these classes.
        categories: Accept only events whose ``category`` is listed.
        min_damage: Lower bound (inclusive) on ``DamageEvent.damage_amount``.
        max_damage: Upper bound (inclusive) on ``DamageEvent.damage_amount``.
        mob_types: Accepted ``LootEvent.mob_name`` values.
        skill_names: Accepted ``SkillGainEvent.skill_name`` values.
        sources: Accepted ``source`` values.
        custom_predicate: Final check; its return value is the filter result.
    """

    event_types: Optional[List[Type[BaseEvent]]] = None
    categories: Optional[List[EventCategory]] = None
    min_damage: Optional[float] = None
    max_damage: Optional[float] = None
    mob_types: Optional[List[str]] = None
    skill_names: Optional[List[str]] = None
    sources: Optional[List[str]] = None
    custom_predicate: Optional[Callable[[BaseEvent], bool]] = None

    def matches(self, event: BaseEvent) -> bool:
        """Return whether ``event`` satisfies every configured criterion.

        Checks run in a fixed order (type, category, source, type-specific
        bounds, custom predicate) and stop at the first failure, so the
        predicate is only called for events that passed everything else.
        """
        # --- generic criteria ---
        wanted_types = self.event_types
        if wanted_types is not None and not isinstance(event, tuple(wanted_types)):
            return False
        if self.categories is not None and event.category not in self.categories:
            return False
        if self.sources is not None and event.source not in self.sources:
            return False

        # --- criteria that apply to one event type only ---
        if isinstance(event, DamageEvent):
            if self.min_damage is not None and event.damage_amount < self.min_damage:
                return False
            if self.max_damage is not None and event.damage_amount > self.max_damage:
                return False
        if isinstance(event, LootEvent):
            if self.mob_types is not None and event.mob_name not in self.mob_types:
                return False
        if isinstance(event, SkillGainEvent):
            if self.skill_names is not None and event.skill_name not in self.skill_names:
                return False

        # --- caller-supplied predicate has the last word ---
        predicate = self.custom_predicate
        return True if predicate is None else predicate(event)
# ========== Subscription ==========
@dataclass
class EventSubscription:
    """Book-keeping record for one registered subscriber.

    Attributes:
        id: Identifier handed back to the subscriber (used to unsubscribe).
        callback: Callable invoked with each matching event.
        event_filter: Filter that decides which events are delivered.
        replay_history: Whether history replay was requested at subscribe time.
        replay_count: How many recent history entries to consider for replay.
        created_at: Creation time of the subscription (``datetime.now()``).
        event_count: Number of events delivered so far.
        last_received: Time of the most recent delivery, or None if none yet.
    """

    id: str
    callback: Callable[[BaseEvent], Any]
    event_filter: EventFilter
    replay_history: bool
    replay_count: int
    created_at: datetime = field(default_factory=datetime.now)
    event_count: int = 0
    last_received: Optional[datetime] = None

    def matches(self, event: BaseEvent) -> bool:
        """Delegate to ``event_filter.matches`` for ``event``."""
        active_filter = self.event_filter
        return active_filter.matches(event)
# ========== Event Statistics ==========
@dataclass
class EventStats:
    """Running counters and rolling metrics for the event bus.

    Attributes:
        total_events_published: Number of events recorded as published.
        total_events_delivered: Number of successful callback deliveries.
        total_subscriptions: Subscriptions ever created (maintained by the bus).
        active_subscriptions: Currently registered subscriptions
            (maintained by the bus).
        events_by_type: Publish count keyed by event class name.
        events_by_category: Publish count keyed by category name.
        events_per_minute: Number of events published in the trailing
            60 seconds, as of the last refresh.
        average_delivery_time_ms: Mean of the last 100 delivery durations.
        errors: Number of failed deliveries.
    """

    total_events_published: int = 0
    total_events_delivered: int = 0
    total_subscriptions: int = 0
    active_subscriptions: int = 0
    events_by_type: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    events_by_category: Dict[str, int] = field(default_factory=lambda: defaultdict(int))
    events_per_minute: float = 0.0
    average_delivery_time_ms: float = 0.0
    errors: int = 0
    _start_time: datetime = field(default_factory=datetime.now)
    # Publish timestamps (time.time()) inside the trailing 60 s window.
    # Deliberately unbounded: a maxlen would cap the reported rate at that
    # length.  Old entries are pruned on every publish instead.
    _minute_window: deque = field(default_factory=deque)
    # Only the most recent 100 delivery durations feed the average.
    _delivery_times: deque = field(default_factory=lambda: deque(maxlen=100))

    def record_event_published(self, event: "BaseEvent") -> None:
        """Count one published event and refresh the events-per-minute figure.

        Args:
            event: The published event; its ``event_type`` and
                ``category.name`` are used as counter keys.
        """
        self.total_events_published += 1
        self.events_by_type[event.event_type] += 1
        self.events_by_category[event.category.name] += 1
        self._minute_window.append(time.time())
        self._update_epm()

    def record_event_delivered(self, delivery_time_ms: float) -> None:
        """Count one successful delivery and update the rolling average.

        Args:
            delivery_time_ms: Time the callback took, in milliseconds.
        """
        self.total_events_delivered += 1
        self._delivery_times.append(delivery_time_ms)
        # The deque is non-empty here because of the append above.
        self.average_delivery_time_ms = (
            sum(self._delivery_times) / len(self._delivery_times)
        )

    def record_error(self) -> None:
        """Count one failed delivery."""
        self.errors += 1

    def _update_epm(self) -> None:
        """Drop timestamps older than 60 s and store the remaining count."""
        now = time.time()
        window = self._minute_window
        while window and now - window[0] >= 60:
            window.popleft()
        self.events_per_minute = len(window)

    def get_summary(self) -> Dict[str, Any]:
        """Return a snapshot of the statistics as a plain dict.

        The events-per-minute figure is recomputed first so it decays when
        publishing stops.  The recount works on a copy of the window and does
        not mutate it, so it cannot interfere with a concurrent publish.

        Returns:
            Dict with keys ``total_published``, ``total_delivered``,
            ``active_subscriptions``, ``events_per_minute``,
            ``avg_delivery_ms``, ``errors``, ``uptime_seconds`` and
            ``top_event_types`` (the five most published event types).
        """
        now = time.time()
        self.events_per_minute = sum(
            1 for stamp in list(self._minute_window) if now - stamp < 60
        )
        uptime = datetime.now() - self._start_time
        return {
            'total_published': self.total_events_published,
            'total_delivered': self.total_events_delivered,
            'active_subscriptions': self.active_subscriptions,
            'events_per_minute': round(self.events_per_minute, 2),
            'avg_delivery_ms': round(self.average_delivery_time_ms, 2),
            'errors': self.errors,
            'uptime_seconds': uptime.total_seconds(),
            'top_event_types': dict(sorted(
                self.events_by_type.items(),
                key=lambda x: x[1],
                reverse=True
            )[:5])
        }
# ========== Event Bus ==========
class EventBus:
    """
    Enhanced Event Bus for EU-Utility.

    Features:
    - Typed events using dataclasses
    - Event filtering with flexible criteria
    - Event persistence (configurable history size)
    - Event replay for new subscribers
    - Async event handling
    - Event statistics

    The class is a singleton: every ``EventBus(...)`` call returns the same
    object, and constructor arguments are only honoured the first time.

    Asynchronous deliveries run one after another on a single background
    thread that hosts an asyncio event loop, so a slow callback delays the
    callbacks queued behind it.
    """

    _instance = None
    # Class-level lock guarding singleton creation and first initialisation.
    # Each instance shadows this name with its own RLock in __init__.
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # Set the flag before publishing the instance so no other
                    # thread can observe it without ``_initialized``.
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self, max_history: int = 1000):
        """Initialise the bus state once.

        Args:
            max_history: Maximum number of events kept for history/replay.
                Ignored on every construction after the first.
        """
        # Serialise first-time initialisation so two racing constructors
        # cannot both reset the state.
        with type(self)._lock:
            if self._initialized:
                return
            self.max_history = max_history
            self._history: deque = deque(maxlen=max_history)
            self._subscriptions: Dict[str, EventSubscription] = {}
            self._subscription_counter = 0
            self._stats = EventStats()
            self._lock = threading.RLock()
            self._async_loop: Optional[asyncio.AbstractEventLoop] = None
            self._async_thread: Optional[threading.Thread] = None
            self._initialized = True

    # ----- background loop management -----

    def _ensure_async_loop(self) -> asyncio.AbstractEventLoop:
        """Return a usable delivery loop, starting one if necessary.

        A new loop is started when there is none, when the current one is
        closed, or when its thread is no longer alive (e.g. after shutdown).
        """
        with self._lock:
            loop = self._async_loop
            thread = self._async_thread
            if (
                loop is None
                or loop.is_closed()
                or thread is None
                or not thread.is_alive()
            ):
                self._start_async_loop()
            return self._async_loop

    def _start_async_loop(self):
        """Create the delivery loop and run it on a daemon thread.

        The loop object is created and stored *before* the thread starts, so
        callers can schedule work on it immediately; callbacks queued with
        ``call_soon_threadsafe`` before ``run_forever`` begins are executed
        once it does.
        """
        loop = asyncio.new_event_loop()

        def run_loop():
            asyncio.set_event_loop(loop)
            try:
                loop.run_forever()
            finally:
                # Closing marks the loop unusable so the next publish
                # starts a fresh one instead of queueing onto a dead loop.
                loop.close()

        worker = threading.Thread(target=run_loop, daemon=True)
        with self._lock:
            self._async_loop = loop
            self._async_thread = worker
        worker.start()

    # ----- publishing -----

    def _record_and_match(self, event: BaseEvent) -> List[EventSubscription]:
        """Store ``event``, update stats and return matching subscriptions.

        Must be called with ``self._lock`` held.
        """
        self._history.append(event)
        self._stats.record_event_published(event)
        return [
            sub for sub in self._subscriptions.values()
            if sub.matches(event)
        ]

    def publish(self, event: BaseEvent) -> None:
        """
        Publish an event to all matching subscribers.

        Non-blocking - callbacks are queued on the background loop and this
        method returns without waiting for them.

        Args:
            event: The event to record and deliver.
        """
        with self._lock:
            matching_subs = self._record_and_match(event)
        # Schedule outside the lock so publishers do not block each other.
        for sub in matching_subs:
            self._deliver_async(sub, event)

    def publish_sync(self, event: BaseEvent) -> int:
        """
        Publish an event synchronously.

        Blocks until all callbacks complete; callbacks run on the calling
        thread.

        Args:
            event: The event to record and deliver.

        Returns:
            Number of subscribers whose callback completed without raising.
        """
        with self._lock:
            matching_subs = self._record_and_match(event)
        return sum(1 for sub in matching_subs if self._deliver_sync(sub, event))

    def _deliver_async(self, subscription: EventSubscription, event: BaseEvent):
        """Queue delivery of ``event`` to ``subscription`` on the loop thread."""
        loop = self._ensure_async_loop()
        try:
            loop.call_soon_threadsafe(self._deliver_sync, subscription, event)
        except RuntimeError as e:
            # The loop was closed (shutdown) between lookup and scheduling.
            self._stats.record_error()
            print(f"[EventBus] Could not schedule delivery to {subscription.id}: {e}")

    def _deliver_sync(self, subscription: EventSubscription, event: BaseEvent) -> bool:
        """Invoke the subscriber callback and record the outcome.

        Exceptions raised by the callback are caught, counted as errors and
        printed; they never propagate to the publisher.

        Returns:
            True if the callback completed, False if it raised.
        """
        start = time.perf_counter()
        try:
            subscription.callback(event)
        except Exception as e:
            self._stats.record_error()
            print(f"[EventBus] Error delivering to {subscription.id}: {e}")
            return False
        subscription.event_count += 1
        subscription.last_received = datetime.now()
        elapsed = (time.perf_counter() - start) * 1000
        self._stats.record_event_delivered(elapsed)
        return True

    # ----- subscription management -----

    def subscribe(
        self,
        callback: Callable[[BaseEvent], Any],
        event_filter: Optional[EventFilter] = None,
        replay_history: bool = False,
        replay_count: int = 100,
        event_types: Optional[List[Type[BaseEvent]]] = None
    ) -> str:
        """
        Subscribe to events with optional filtering.

        Args:
            callback: Function to call when matching events occur
            event_filter: Filter criteria for events
            replay_history: Whether to replay recent events to new subscriber
            replay_count: Number of recent events to replay; clamped to the
                range ``0..max_history``
            event_types: Shorthand for simple type-based filtering; ignored
                when ``event_filter`` is given

        Returns:
            Subscription ID (use for unsubscribe)
        """
        with self._lock:
            self._subscription_counter += 1
            sub_id = f"sub_{self._subscription_counter}"
            # Build filter from shorthand if provided
            if event_filter is None:
                if event_types is not None:
                    event_filter = EventFilter(event_types=event_types)
                else:
                    event_filter = EventFilter()
            subscription = EventSubscription(
                id=sub_id,
                callback=callback,
                event_filter=event_filter,
                replay_history=replay_history,
                replay_count=max(0, min(replay_count, self.max_history))
            )
            self._subscriptions[sub_id] = subscription
            self._stats.active_subscriptions = len(self._subscriptions)
            self._stats.total_subscriptions += 1
        # Replay history if requested (outside lock)
        if replay_history:
            self._replay_history(subscription)
        return sub_id

    def subscribe_typed(
        self,
        event_class: Type[T],
        callback: Callable[[T], Any],
        **filter_kwargs
    ) -> str:
        """
        Subscribe to a specific event type with optional filtering.

        Args:
            event_class: The event class to subscribe to
            callback: Function to call with typed event
            **filter_kwargs: Additional filter criteria
                - min_damage: Minimum damage threshold
                - max_damage: Maximum damage threshold
                - mob_types: List of mob names to filter
                - skill_names: List of skill names to filter
                - sources: List of event sources to filter
                - predicate: Custom callable deciding whether to deliver
                - replay_last: Number of events to replay
              Any other keyword is ignored.

        Returns:
            Subscription ID
        """
        # Build filter from kwargs
        filter_args: Dict[str, Any] = {'event_types': [event_class]}
        for key in ('min_damage', 'max_damage', 'mob_types', 'skill_names', 'sources'):
            if key in filter_kwargs:
                filter_args[key] = filter_kwargs.pop(key)
        # Handle custom predicate
        if 'predicate' in filter_kwargs:
            filter_args['custom_predicate'] = filter_kwargs.pop('predicate')
        event_filter = EventFilter(**filter_args)
        replay_count = filter_kwargs.pop('replay_last', 0)

        # Create wrapper to ensure type safety
        def typed_callback(event: BaseEvent):
            if isinstance(event, event_class):
                callback(event)

        return self.subscribe(
            callback=typed_callback,
            event_filter=event_filter,
            replay_history=replay_count > 0,
            replay_count=replay_count
        )

    def _replay_history(self, subscription: EventSubscription):
        """Queue the subscriber's share of the most recent history entries.

        Looks at the last ``subscription.replay_count`` events in history and
        delivers those that match the subscription's filter.
        """
        count = subscription.replay_count
        if count <= 0:
            # Guard: a slice of [-0:] would select the entire history.
            return
        with self._lock:
            # Get recent events that match the filter
            events_to_replay = [
                e for e in list(self._history)[-count:]
                if subscription.matches(e)
            ]
        # Deliver each event
        for event in events_to_replay:
            self._deliver_async(subscription, event)

    def unsubscribe(self, subscription_id: str) -> bool:
        """Remove a subscription.

        Returns:
            True if the id was registered, False otherwise.
        """
        with self._lock:
            if self._subscriptions.pop(subscription_id, None) is None:
                return False
            self._stats.active_subscriptions = len(self._subscriptions)
            return True

    # ----- history queries -----

    def get_recent_events(
        self,
        event_type: Optional[Type[T]] = None,
        count: int = 100,
        category: Optional[EventCategory] = None
    ) -> List[BaseEvent]:
        """
        Get recent events from history.

        Args:
            event_type: Filter by event class
            count: Maximum number of events to return; zero or a negative
                value yields an empty list
            category: Filter by event category

        Returns:
            List of matching events, oldest first
        """
        if count <= 0:
            # Guard: a slice of [-0:] would return everything.
            return []
        with self._lock:
            events = list(self._history)
        # Apply filters
        if event_type is not None:
            events = [e for e in events if isinstance(e, event_type)]
        if category is not None:
            events = [e for e in events if e.category == category]
        # Return most recent
        return events[-count:]

    def get_events_by_time_range(
        self,
        start: datetime,
        end: Optional[datetime] = None
    ) -> List[BaseEvent]:
        """Get events whose timestamp lies in ``[start, end]``.

        Args:
            start: Inclusive lower bound.
            end: Inclusive upper bound; defaults to ``datetime.now()``.
        """
        if end is None:
            end = datetime.now()
        with self._lock:
            return [
                e for e in self._history
                if start <= e.timestamp <= end
            ]

    def get_stats(self) -> Dict[str, Any]:
        """Get event bus statistics as returned by ``EventStats.get_summary``."""
        with self._lock:
            return self._stats.get_summary()

    def clear_history(self):
        """Clear event history."""
        with self._lock:
            self._history.clear()

    def shutdown(self):
        """Shutdown the event bus and cleanup resources.

        Drops all subscriptions and history, stops the background loop and
        waits up to two seconds for its thread to finish.  The bus stays
        usable: the next asynchronous delivery starts a fresh loop.
        """
        with self._lock:
            self._subscriptions.clear()
            self._history.clear()
            self._stats.active_subscriptions = 0
            loop, worker = self._async_loop, self._async_thread
            # Forget the loop so later publishes create a new one.
            self._async_loop = None
            self._async_thread = None
        if loop is not None and not loop.is_closed():
            try:
                # Works even if run_forever has not started yet: the stop
                # request is queued and honoured as soon as it does.
                loop.call_soon_threadsafe(loop.stop)
            except RuntimeError:
                pass  # loop was closed between the check and the call
        # Join outside the lock so callbacks that use the bus cannot deadlock
        # against us; never join the current thread (shutdown called from a
        # callback running on the loop thread).
        if (
            worker is not None
            and worker.is_alive()
            and worker is not threading.current_thread()
        ):
            worker.join(timeout=2.0)
# Module-level handle to the shared bus; filled in lazily by get_event_bus().
_event_bus = None


def get_event_bus() -> EventBus:
    """Return the shared EventBus, constructing it on first use."""
    global _event_bus
    bus = _event_bus
    if bus is None:
        bus = _event_bus = EventBus()
    return bus
def reset_event_bus():
    """Reset the global EventBus instance (mainly for testing).

    Shuts the current bus down and forgets it, both in this module and on the
    ``EventBus`` class.  ``EventBus`` caches its only instance in
    ``EventBus._instance``; without clearing that too, the next
    ``get_event_bus()`` would hand back the same shut-down, already
    initialised object instead of a fresh one.
    """
    global _event_bus
    # A bus constructed directly via EventBus() is the same singleton, so
    # fall back to the class-level reference when the module one is unset.
    bus = _event_bus if _event_bus is not None else EventBus._instance
    if bus is not None:
        bus.shutdown()
    _event_bus = None
    EventBus._instance = None
# ========== Decorators ==========
def on_event(
    event_class: Type[T],
    **filter_kwargs
):
    """
    Decorator for event subscription.

    The subscription is registered on the global bus at decoration time
    (i.e. when the decorated ``def`` statement executes), not when the
    function is first called.  The function itself is returned unchanged
    apart from two marker attributes:

    - ``_event_subscription``: always ``True``
    - ``_event_subscription_id``: the id returned by the bus, which can be
      passed to ``EventBus.unsubscribe`` to remove the handler again

    Args:
        event_class: Event class the handler should receive.
        **filter_kwargs: Forwarded to ``EventBus.subscribe_typed``.

    Usage:
        @on_event(DamageEvent, min_damage=100)
        def handle_big_damage(event: DamageEvent):
            print(f"Big hit: {event.damage_amount}")
    """
    def decorator(func):
        bus = get_event_bus()
        subscription_id = bus.subscribe_typed(event_class, func, **filter_kwargs)
        func._event_subscription = True
        # Keep the id so the handler can be unsubscribed later.
        func._event_subscription_id = subscription_id
        return func
    return decorator