""" Unit tests for Event Bus service. Tests cover: - Singleton pattern - Event publishing and subscribing - Event filtering - Event history and replay - Statistics tracking - Thread safety """ import pytest import time import threading from datetime import datetime from unittest.mock import MagicMock from core.event_bus import ( EventBus, get_event_bus, reset_event_bus, BaseEvent, SkillGainEvent, LootEvent, DamageEvent, GlobalEvent, ChatEvent, EconomyEvent, SystemEvent, EventCategory, EventFilter ) @pytest.mark.unit class TestEventBusSingleton: """Test EventBus singleton behavior.""" def test_singleton_instance(self): """Test that EventBus is a proper singleton.""" reset_event_bus() bus1 = get_event_bus() bus2 = get_event_bus() assert bus1 is bus2 assert isinstance(bus1, EventBus) def test_reset_creates_new_instance(self): """Test that reset creates a new instance.""" bus1 = get_event_bus() reset_event_bus() bus2 = get_event_bus() assert bus1 is not bus2 @pytest.mark.unit class TestEventPublishing: """Test event publishing functionality.""" def test_publish_async(self, fresh_event_bus): """Test async event publishing.""" bus = fresh_event_bus received = [] def handler(event): received.append(event) sub_id = bus.subscribe(handler) event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01) bus.publish(event) time.sleep(0.1) # Wait for async delivery assert len(received) == 1 assert received[0].skill_name == "Rifle" def test_publish_sync(self, fresh_event_bus): """Test synchronous event publishing.""" bus = fresh_event_bus received = [] def handler(event): received.append(event) sub_id = bus.subscribe(handler) event = DamageEvent(damage_amount=100, target_name="Atrox") count = bus.publish_sync(event) assert count == 1 assert len(received) == 1 assert received[0].damage_amount == 100 def test_publish_no_subscribers(self, fresh_event_bus): """Test publishing with no subscribers.""" bus = fresh_event_bus event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01) # Should not raise bus.publish(event) count = bus.publish_sync(event) assert count == 0 @pytest.mark.unit class TestEventSubscribing: """Test event subscription functionality.""" def test_subscribe_and_receive(self, fresh_event_bus): """Test basic subscription and event receipt.""" bus = fresh_event_bus received = [] def handler(event): received.append(event) sub_id = bus.subscribe(handler) event = LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0) bus.publish_sync(event) assert len(received) == 1 assert isinstance(sub_id, str) def test_unsubscribe(self, fresh_event_bus): """Test unsubscribing from events.""" bus = fresh_event_bus received = [] def handler(event): received.append(event) sub_id = bus.subscribe(handler) event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01) bus.publish_sync(event) assert len(received) == 1 # Unsubscribe result = bus.unsubscribe(sub_id) assert result is True bus.publish_sync(event) assert len(received) == 1 # No new events def test_unsubscribe_invalid_id(self, fresh_event_bus): """Test unsubscribing with invalid ID.""" bus = fresh_event_bus result = bus.unsubscribe("invalid_id") assert result is False def test_multiple_subscribers(self, fresh_event_bus): """Test multiple subscribers receiving same event.""" bus = fresh_event_bus received1 = [] received2 = [] def handler1(event): received1.append(event) def handler2(event): received2.append(event) bus.subscribe(handler1) bus.subscribe(handler2) event = GlobalEvent(player_name="Player", achievement_type="hof", value=1000) bus.publish_sync(event) assert len(received1) == 1 assert len(received2) == 1 @pytest.mark.unit class TestTypedSubscriptions: """Test typed event subscriptions.""" def test_subscribe_typed(self, fresh_event_bus): """Test subscribing to specific event type.""" bus = fresh_event_bus received = [] def handler(event: SkillGainEvent): received.append(event) sub_id = bus.subscribe_typed(SkillGainEvent, handler) # Publish matching event skill_event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01) bus.publish_sync(skill_event) # Publish non-matching event loot_event = LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0) bus.publish_sync(loot_event) assert len(received) == 1 assert received[0].skill_name == "Rifle" def test_subscribe_typed_with_filter(self, fresh_event_bus): """Test typed subscription with filters.""" bus = fresh_event_bus received = [] def handler(event: DamageEvent): received.append(event) # Subscribe to high damage only sub_id = bus.subscribe_typed(DamageEvent, handler, min_damage=100) # Low damage - should not trigger bus.publish_sync(DamageEvent(damage_amount=50, target_name="Mob1")) # High damage - should trigger bus.publish_sync(DamageEvent(damage_amount=150, target_name="Mob2")) assert len(received) == 1 assert received[0].damage_amount == 150 @pytest.mark.unit class TestEventHistory: """Test event history and replay functionality.""" def test_event_history(self, fresh_event_bus): """Test that events are stored in history.""" bus = fresh_event_bus events = [ SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01), SkillGainEvent(skill_name="Pistol", skill_value=30.0, gain_amount=0.02), LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0) ] for event in events: bus.publish_sync(event) recent = bus.get_recent_events(count=10) assert len(recent) == 3 def test_get_recent_events_by_type(self, fresh_event_bus): """Test filtering recent events by type.""" bus = fresh_event_bus bus.publish_sync(SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)) bus.publish_sync(LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0)) bus.publish_sync(SkillGainEvent(skill_name="Pistol", skill_value=30.0, gain_amount=0.02)) skill_events = bus.get_recent_events(event_type=SkillGainEvent) assert len(skill_events) == 2 loot_events = bus.get_recent_events(event_type=LootEvent) assert len(loot_events) == 1 def test_get_recent_events_by_category(self, fresh_event_bus): """Test filtering recent events by category.""" bus = fresh_event_bus bus.publish_sync(SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)) bus.publish_sync(DamageEvent(damage_amount=100, target_name="Atrox")) bus.publish_sync(LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0)) combat_events = bus.get_recent_events(category=EventCategory.COMBAT) assert len(combat_events) == 1 skill_events = bus.get_recent_events(category=EventCategory.SKILL) assert len(skill_events) == 1 def test_event_replay(self, fresh_event_bus): """Test replaying events to new subscribers.""" bus = fresh_event_bus # Publish some events for i in range(5): bus.publish_sync(SkillGainEvent(skill_name=f"Skill{i}", skill_value=float(i), gain_amount=0.01)) # Subscribe with replay received = [] def handler(event): received.append(event) sub_id = bus.subscribe(handler, replay_history=True, replay_count=3) time.sleep(0.1) # Wait for replay assert len(received) == 3 @pytest.mark.unit class TestEventStatistics: """Test event statistics tracking.""" def test_stats_tracking(self, fresh_event_bus): """Test that event statistics are tracked.""" bus = fresh_event_bus # Initial stats stats = bus.get_stats() assert stats['total_published'] == 0 # Publish events bus.publish_sync(SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)) bus.publish_sync(LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0)) stats = bus.get_stats() assert stats['total_published'] == 2 assert stats['total_delivered'] == 2 def test_stats_by_type(self, fresh_event_bus): """Test event type statistics.""" bus = fresh_event_bus bus.publish_sync(SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)) bus.publish_sync(SkillGainEvent(skill_name="Pistol", skill_value=30.0, gain_amount=0.02)) bus.publish_sync(LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0)) stats = bus.get_stats() assert 'SkillGainEvent' in stats['top_event_types'] assert 'LootEvent' in stats['top_event_types'] @pytest.mark.unit class TestEventFilters: """Test event filtering functionality.""" def test_filter_by_event_type(self): """Test filtering by event type.""" filter_obj = EventFilter(event_types=[SkillGainEvent, LootEvent]) skill_event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01) damage_event = DamageEvent(damage_amount=100, target_name="Atrox") assert filter_obj.matches(skill_event) is True assert filter_obj.matches(damage_event) is False def test_filter_by_category(self): """Test filtering by category.""" filter_obj = EventFilter(categories=[EventCategory.SKILL]) skill_event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01) loot_event = LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0) assert filter_obj.matches(skill_event) is True assert filter_obj.matches(loot_event) is False def test_filter_by_damage_threshold(self): """Test filtering by damage threshold.""" filter_obj = EventFilter(min_damage=100, max_damage=200) low_damage = DamageEvent(damage_amount=50, target_name="Mob1") mid_damage = DamageEvent(damage_amount=150, target_name="Mob2") high_damage = DamageEvent(damage_amount=250, target_name="Mob3") assert filter_obj.matches(low_damage) is False assert filter_obj.matches(mid_damage) is True assert filter_obj.matches(high_damage) is False def test_filter_by_mob_types(self): """Test filtering by mob types.""" filter_obj = EventFilter(mob_types=["Atrox", "Daikiba"]) atrox_loot = LootEvent(mob_name="Atrox", items=[], total_tt_value=0.0) daikiba_loot = LootEvent(mob_name="Daikiba", items=[], total_tt_value=0.0) other_loot = LootEvent(mob_name="Other", items=[], total_tt_value=0.0) assert filter_obj.matches(atrox_loot) is True assert filter_obj.matches(daikiba_loot) is True assert filter_obj.matches(other_loot) is False def test_custom_predicate_filter(self): """Test custom predicate filtering.""" def is_critical_damage(event): return isinstance(event, DamageEvent) and event.is_critical filter_obj = EventFilter(custom_predicate=is_critical_damage) crit_damage = DamageEvent(damage_amount=100, target_name="Mob", is_critical=True) normal_damage = DamageEvent(damage_amount=100, target_name="Mob", is_critical=False) assert filter_obj.matches(crit_damage) is True assert filter_obj.matches(normal_damage) is False @pytest.mark.unit class TestEventDataClasses: """Test event dataclass functionality.""" def test_event_to_dict(self): """Test converting event to dictionary.""" event = SkillGainEvent(skill_name="Rifle", skill_value=25.5, gain_amount=0.01) data = event.to_dict() assert data['event_type'] == 'SkillGainEvent' assert data['category'] == 'SKILL' assert data['skill_name'] == 'Rifle' assert data['skill_value'] == 25.5 def test_event_categories(self): """Test event category assignment.""" assert SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01).category == EventCategory.SKILL assert LootEvent(mob_name="Mob", items=[], total_tt_value=0.0).category == EventCategory.LOOT assert DamageEvent(damage_amount=100, target_name="Mob").category == EventCategory.COMBAT assert GlobalEvent(player_name="Player", achievement_type="hof", value=1000).category == EventCategory.GLOBAL assert ChatEvent(channel="main", sender="Player", message="Hi").category == EventCategory.CHAT assert EconomyEvent(transaction_type="sale", amount=100).category == EventCategory.ECONOMY assert SystemEvent(message="Test").category == EventCategory.SYSTEM def test_damage_event_high_damage_check(self): """Test DamageEvent high damage helper.""" low_damage = DamageEvent(damage_amount=50, target_name="Mob") high_damage = DamageEvent(damage_amount=150, target_name="Mob") assert low_damage.is_high_damage(threshold=100) is False assert high_damage.is_high_damage(threshold=100) is True def test_loot_event_get_item_names(self): """Test LootEvent get_item_names helper.""" loot = LootEvent( mob_name="Daikiba", items=[{"name": "Animal Oil"}, {"name": "Wool"}], total_tt_value=0.1 ) names = loot.get_item_names() assert names == ["Animal Oil", "Wool"]