421 lines
15 KiB
Python
421 lines
15 KiB
Python
"""
Unit tests for Event Bus service.

Tests cover:
- Singleton pattern
- Event publishing and subscribing
- Event filtering
- Event history and replay
- Statistics tracking
- Thread safety
"""
|
|
|
|
import pytest
|
|
import time
|
|
import threading
|
|
from datetime import datetime
|
|
from unittest.mock import MagicMock
|
|
|
|
from core.event_bus import (
|
|
EventBus, get_event_bus, reset_event_bus,
|
|
BaseEvent, SkillGainEvent, LootEvent, DamageEvent,
|
|
GlobalEvent, ChatEvent, EconomyEvent, SystemEvent,
|
|
EventCategory, EventFilter
|
|
)
|
|
|
|
|
|
@pytest.mark.unit
class TestEventBusSingleton:
    """Checks on the module-level EventBus accessor and its reset hook."""

    def test_singleton_instance(self):
        """Two consecutive lookups must hand back the very same EventBus."""
        # Reset first so the lookups below start from a known state.
        reset_event_bus()

        first_lookup, second_lookup = get_event_bus(), get_event_bus()

        assert first_lookup is second_lookup
        assert isinstance(first_lookup, EventBus)

    def test_reset_creates_new_instance(self):
        """A lookup made after reset_event_bus() must yield a different object."""
        before_reset = get_event_bus()
        reset_event_bus()
        after_reset = get_event_bus()

        assert before_reset is not after_reset
|
|
|
|
|
|
@pytest.mark.unit
class TestEventPublishing:
    """Test event publishing functionality (async, sync, and no-subscriber paths)."""

    def test_publish_async(self, fresh_event_bus):
        """An event sent with publish() reaches a subscribed handler."""
        bus = fresh_event_bus
        received = []
        delivered = threading.Event()

        def handler(event):
            received.append(event)
            # Signal the test thread that delivery has happened.
            delivered.set()

        bus.subscribe(handler)
        event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)

        bus.publish(event)

        # Wait on an explicit signal instead of sleeping a fixed interval:
        # this returns as soon as the handler runs and does not fail spuriously
        # when delivery takes longer than a hard-coded sleep.
        assert delivered.wait(timeout=2.0), "handler was not invoked within 2s"

        assert len(received) == 1
        assert received[0].skill_name == "Rifle"

    def test_publish_sync(self, fresh_event_bus):
        """publish_sync() delivers before returning and reports the delivery count."""
        bus = fresh_event_bus
        received = []

        def handler(event):
            received.append(event)

        bus.subscribe(handler)
        event = DamageEvent(damage_amount=100, target_name="Atrox")

        count = bus.publish_sync(event)

        assert count == 1
        assert len(received) == 1
        assert received[0].damage_amount == 100

    def test_publish_no_subscribers(self, fresh_event_bus):
        """Publishing with nobody subscribed must not raise and reports 0 deliveries."""
        bus = fresh_event_bus
        event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)

        # Should not raise
        bus.publish(event)
        count = bus.publish_sync(event)

        assert count == 0
|
|
|
|
|
|
@pytest.mark.unit
class TestEventSubscribing:
    """Exercise subscribe/unsubscribe behaviour of the event bus."""

    def test_subscribe_and_receive(self, fresh_event_bus):
        """A subscribed callback gets the published event; the returned id is a str."""
        inbox = []

        def on_event(evt):
            inbox.append(evt)

        subscription = fresh_event_bus.subscribe(on_event)
        loot = LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0)

        fresh_event_bus.publish_sync(loot)

        assert len(inbox) == 1
        assert isinstance(subscription, str)

    def test_unsubscribe(self, fresh_event_bus):
        """After unsubscribe() succeeds, further publishes no longer reach the callback."""
        inbox = []

        def on_event(evt):
            inbox.append(evt)

        subscription = fresh_event_bus.subscribe(on_event)
        skill_gain = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)

        # First publish: the callback is still registered.
        fresh_event_bus.publish_sync(skill_gain)
        assert len(inbox) == 1

        removed = fresh_event_bus.unsubscribe(subscription)
        assert removed is True

        # Second publish of the same event: the inbox must stay unchanged.
        fresh_event_bus.publish_sync(skill_gain)
        assert len(inbox) == 1  # No new events

    def test_unsubscribe_invalid_id(self, fresh_event_bus):
        """unsubscribe() with an unknown id returns False."""
        outcome = fresh_event_bus.unsubscribe("invalid_id")

        assert outcome is False

    def test_multiple_subscribers(self, fresh_event_bus):
        """One published event is handed to each of two independent subscribers."""
        first_inbox = []
        second_inbox = []

        def first_listener(evt):
            first_inbox.append(evt)

        def second_listener(evt):
            second_inbox.append(evt)

        fresh_event_bus.subscribe(first_listener)
        fresh_event_bus.subscribe(second_listener)

        fresh_event_bus.publish_sync(
            GlobalEvent(player_name="Player", achievement_type="hof", value=1000)
        )

        assert len(first_inbox) == 1
        assert len(second_inbox) == 1
|
|
|
|
|
|
@pytest.mark.unit
class TestTypedSubscriptions:
    """Exercise subscribe_typed(): delivery restricted by event class and filter kwargs."""

    def test_subscribe_typed(self, fresh_event_bus):
        """Only events of the subscribed class reach the callback."""
        captured = []

        def on_skill_gain(event: SkillGainEvent):
            captured.append(event)

        fresh_event_bus.subscribe_typed(SkillGainEvent, on_skill_gain)

        # A matching event followed by a non-matching one.
        matching = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)
        fresh_event_bus.publish_sync(matching)

        unrelated = LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0)
        fresh_event_bus.publish_sync(unrelated)

        assert len(captured) == 1
        assert captured[0].skill_name == "Rifle"

    def test_subscribe_typed_with_filter(self, fresh_event_bus):
        """With min_damage=100, a 50-damage event is skipped and a 150-damage one is delivered."""
        captured = []

        def on_damage(event: DamageEvent):
            captured.append(event)

        # Subscribe to high damage only.
        fresh_event_bus.subscribe_typed(DamageEvent, on_damage, min_damage=100)

        below_threshold = DamageEvent(damage_amount=50, target_name="Mob1")
        fresh_event_bus.publish_sync(below_threshold)

        above_threshold = DamageEvent(damage_amount=150, target_name="Mob2")
        fresh_event_bus.publish_sync(above_threshold)

        assert len(captured) == 1
        assert captured[0].damage_amount == 150
|
|
|
|
|
|
@pytest.mark.unit
class TestEventHistory:
    """Test event history queries and replay to late subscribers."""

    def test_event_history(self, fresh_event_bus):
        """Every published event is retrievable through get_recent_events()."""
        bus = fresh_event_bus

        events = [
            SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01),
            SkillGainEvent(skill_name="Pistol", skill_value=30.0, gain_amount=0.02),
            LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0),
        ]

        for event in events:
            bus.publish_sync(event)

        recent = bus.get_recent_events(count=10)
        assert len(recent) == 3

    def test_get_recent_events_by_type(self, fresh_event_bus):
        """get_recent_events(event_type=...) returns only events of that class."""
        bus = fresh_event_bus

        bus.publish_sync(SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01))
        bus.publish_sync(LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0))
        bus.publish_sync(SkillGainEvent(skill_name="Pistol", skill_value=30.0, gain_amount=0.02))

        skill_events = bus.get_recent_events(event_type=SkillGainEvent)
        assert len(skill_events) == 2

        loot_events = bus.get_recent_events(event_type=LootEvent)
        assert len(loot_events) == 1

    def test_get_recent_events_by_category(self, fresh_event_bus):
        """get_recent_events(category=...) returns only events in that category."""
        bus = fresh_event_bus

        bus.publish_sync(SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01))
        bus.publish_sync(DamageEvent(damage_amount=100, target_name="Atrox"))
        bus.publish_sync(LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0))

        combat_events = bus.get_recent_events(category=EventCategory.COMBAT)
        assert len(combat_events) == 1

        skill_events = bus.get_recent_events(category=EventCategory.SKILL)
        assert len(skill_events) == 1

    def test_event_replay(self, fresh_event_bus):
        """A subscriber asking for replay_count=3 receives three past events."""
        bus = fresh_event_bus
        replay_count = 3

        # Publish some events
        for i in range(5):
            bus.publish_sync(
                SkillGainEvent(skill_name=f"Skill{i}", skill_value=float(i), gain_amount=0.01)
            )

        # Subscribe with replay
        received = []
        replay_done = threading.Event()

        def handler(event):
            received.append(event)
            # Signal once the expected number of replayed events has arrived.
            if len(received) >= replay_count:
                replay_done.set()

        bus.subscribe(handler, replay_history=True, replay_count=replay_count)

        # Wait on an explicit signal rather than a fixed sleep, so the test is
        # neither slower than necessary nor dependent on replay finishing
        # within a hard-coded interval.
        # NOTE(review): an extra event arriving after the third would no longer
        # be caught by the length check below.
        assert replay_done.wait(timeout=2.0), "replay did not complete within 2s"

        assert len(received) == replay_count
|
|
|
|
|
|
@pytest.mark.unit
class TestEventStatistics:
    """Test the counters exposed by EventBus.get_stats()."""

    def test_stats_tracking(self, fresh_event_bus):
        """'total_published' and 'total_delivered' grow as events are published."""
        bus = fresh_event_bus

        # Initial stats
        stats = bus.get_stats()
        assert stats['total_published'] == 0

        # Register a subscriber so that deliveries actually occur.
        # test_publish_no_subscribers in this file expects publish_sync() to
        # report 0 deliveries when nobody is subscribed, so asserting
        # total_delivered == 2 without a handler is not well-defined.
        # With exactly one handler and two events the expected value is 2
        # whether deliveries are counted per event or per handler call.
        received = []

        def handler(event):
            received.append(event)

        bus.subscribe(handler)

        # Publish events
        bus.publish_sync(SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01))
        bus.publish_sync(LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0))

        stats = bus.get_stats()
        assert len(received) == 2
        assert stats['total_published'] == 2
        assert stats['total_delivered'] == 2

    def test_stats_by_type(self, fresh_event_bus):
        """Published event class names appear under 'top_event_types'."""
        bus = fresh_event_bus

        bus.publish_sync(SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01))
        bus.publish_sync(SkillGainEvent(skill_name="Pistol", skill_value=30.0, gain_amount=0.02))
        bus.publish_sync(LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0))

        stats = bus.get_stats()
        assert 'SkillGainEvent' in stats['top_event_types']
        assert 'LootEvent' in stats['top_event_types']
|
|
|
|
|
|
@pytest.mark.unit
class TestEventFilters:
    """Exercise EventFilter.matches() for each supported criterion."""

    def test_filter_by_event_type(self):
        """A filter built from event classes accepts those classes and rejects others."""
        event_filter = EventFilter(event_types=[SkillGainEvent, LootEvent])

        listed = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)
        unlisted = DamageEvent(damage_amount=100, target_name="Atrox")

        assert event_filter.matches(listed) is True
        assert event_filter.matches(unlisted) is False

    def test_filter_by_category(self):
        """A SKILL-category filter accepts a skill event and rejects a loot event."""
        event_filter = EventFilter(categories=[EventCategory.SKILL])

        in_category = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)
        out_of_category = LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0)

        assert event_filter.matches(in_category) is True
        assert event_filter.matches(out_of_category) is False

    def test_filter_by_damage_threshold(self):
        """With min_damage=100 and max_damage=200, only the 150-damage event matches."""
        event_filter = EventFilter(min_damage=100, max_damage=200)

        # (event, expected verdict), checked in the order: below, inside, above.
        cases = [
            (DamageEvent(damage_amount=50, target_name="Mob1"), False),
            (DamageEvent(damage_amount=150, target_name="Mob2"), True),
            (DamageEvent(damage_amount=250, target_name="Mob3"), False),
        ]

        for damage_event, expected in cases:
            assert event_filter.matches(damage_event) is expected

    def test_filter_by_mob_types(self):
        """Loot events match only when their mob_name is in the filter's mob_types."""
        event_filter = EventFilter(mob_types=["Atrox", "Daikiba"])

        cases = [
            (LootEvent(mob_name="Atrox", items=[], total_tt_value=0.0), True),
            (LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0), True),
            (LootEvent(mob_name="Other", items=[], total_tt_value=0.0), False),
        ]

        for loot_event, expected in cases:
            assert event_filter.matches(loot_event) is expected

    def test_custom_predicate_filter(self):
        """A custom predicate decides the match: critical hits pass, normal hits do not."""

        def only_critical_hits(event):
            return isinstance(event, DamageEvent) and event.is_critical

        event_filter = EventFilter(custom_predicate=only_critical_hits)

        critical_hit = DamageEvent(damage_amount=100, target_name="Mob", is_critical=True)
        regular_hit = DamageEvent(damage_amount=100, target_name="Mob", is_critical=False)

        assert event_filter.matches(critical_hit) is True
        assert event_filter.matches(regular_hit) is False
|
|
|
|
|
|
@pytest.mark.unit
class TestEventDataClasses:
    """Checks on the event classes themselves: serialization, category, helpers."""

    def test_event_to_dict(self):
        """to_dict() exposes the event type name, category name and payload fields."""
        serialized = SkillGainEvent(
            skill_name="Rifle", skill_value=25.5, gain_amount=0.01
        ).to_dict()

        expected_fields = {
            'event_type': 'SkillGainEvent',
            'category': 'SKILL',
            'skill_name': 'Rifle',
            'skill_value': 25.5,
        }

        for key, expected_value in expected_fields.items():
            assert serialized[key] == expected_value

    def test_event_categories(self):
        """Each concrete event class reports its own EventCategory."""
        cases = [
            (SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01),
             EventCategory.SKILL),
            (LootEvent(mob_name="Mob", items=[], total_tt_value=0.0),
             EventCategory.LOOT),
            (DamageEvent(damage_amount=100, target_name="Mob"),
             EventCategory.COMBAT),
            (GlobalEvent(player_name="Player", achievement_type="hof", value=1000),
             EventCategory.GLOBAL),
            (ChatEvent(channel="main", sender="Player", message="Hi"),
             EventCategory.CHAT),
            (EconomyEvent(transaction_type="sale", amount=100),
             EventCategory.ECONOMY),
            (SystemEvent(message="Test"),
             EventCategory.SYSTEM),
        ]

        for event, expected_category in cases:
            assert event.category == expected_category

    def test_damage_event_high_damage_check(self):
        """is_high_damage(threshold=100) is False for 50 damage and True for 150."""
        weak_hit = DamageEvent(damage_amount=50, target_name="Mob")
        strong_hit = DamageEvent(damage_amount=150, target_name="Mob")

        assert weak_hit.is_high_damage(threshold=100) is False
        assert strong_hit.is_high_damage(threshold=100) is True

    def test_loot_event_get_item_names(self):
        """get_item_names() returns the 'name' of each looted item, in order."""
        looted_items = [{"name": "Animal Oil"}, {"name": "Wool"}]
        loot = LootEvent(mob_name="Daikiba", items=looted_items, total_tt_value=0.1)

        item_names = loot.get_item_names()

        assert item_names == ["Animal Oil", "Wool"]
|