""" End-to-End tests for EU-Utility. Tests cover: - Full application initialization - Plugin loading and execution - Event flow through the system - Data persistence across sessions """ import sys import unittest import tempfile import shutil from pathlib import Path from unittest.mock import MagicMock, patch # Add project root to path project_root = Path(__file__).parent.parent.parent if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) class TestApplicationInitialization(unittest.TestCase): """Test full application initialization.""" def setUp(self): """Set up test environment.""" self.temp_dir = tempfile.mkdtemp() # Reset singletons from core.event_bus import reset_event_bus from core.plugin_api import PluginAPI from core.data_store import DataStore from core.settings import get_settings reset_event_bus() PluginAPI._instance = None DataStore._instance = None def tearDown(self): """Clean up test environment.""" shutil.rmtree(self.temp_dir) # Reset singletons from core.event_bus import reset_event_bus from core.plugin_api import PluginAPI from core.data_store import DataStore reset_event_bus() PluginAPI._instance = None DataStore._instance = None def test_core_services_initialization(self): """Test that all core services can be initialized.""" from core.event_bus import get_event_bus from core.plugin_api import get_api from core.data_store import get_data_store from core.settings import Settings from core.tasks import get_task_manager # Initialize all services event_bus = get_event_bus() api = get_api() data_store = get_data_store(data_dir=self.temp_dir) settings = Settings(config_file=Path(self.temp_dir) / "settings.json") task_manager = get_task_manager() # Verify all are initialized self.assertIsNotNone(event_bus) self.assertIsNotNone(api) self.assertIsNotNone(data_store) self.assertIsNotNone(settings) self.assertIsNotNone(task_manager) def test_singleton_consistency(self): """Test that singletons maintain consistency.""" from core.event_bus import get_event_bus from core.plugin_api import get_api from core.data_store import get_data_store # Get instances multiple times bus1 = get_event_bus() bus2 = get_event_bus() api1 = get_api() api2 = get_api() # Verify they are the same self.assertIs(bus1, bus2) self.assertIs(api1, api2) class TestEventFlow(unittest.TestCase): """Test event flow through the system.""" def setUp(self): """Set up test environment.""" from core.event_bus import reset_event_bus, get_event_bus reset_event_bus() self.event_bus = get_event_bus() def tearDown(self): """Clean up.""" from core.event_bus import reset_event_bus reset_event_bus() def test_event_publish_and_subscribe(self): """Test publishing and subscribing to events.""" from core.event_bus import SkillGainEvent received_events = [] def handler(event): received_events.append(event) # Subscribe self.event_bus.subscribe_typed(SkillGainEvent, handler) # Publish event = SkillGainEvent( skill_name="Rifle", skill_value=25.0, gain_amount=0.01 ) self.event_bus.publish_sync(event) # Verify self.assertEqual(len(received_events), 1) self.assertEqual(received_events[0].skill_name, "Rifle") def test_multiple_subscribers_receive_event(self): """Test that multiple subscribers receive the same event.""" from core.event_bus import LootEvent received1 = [] received2 = [] def handler1(event): received1.append(event) def handler2(event): received2.append(event) self.event_bus.subscribe_typed(LootEvent, handler1) self.event_bus.subscribe_typed(LootEvent, handler2) event = LootEvent( mob_name="Daikiba", items=[{"name": "Animal Oil"}], total_tt_value=0.05 ) self.event_bus.publish_sync(event) self.assertEqual(len(received1), 1) self.assertEqual(len(received2), 1) def test_event_history(self): """Test that events are stored in history.""" from core.event_bus import DamageEvent # Publish several events for i in range(5): event = DamageEvent( damage_amount=100 + i * 10, target_name=f"Target{i}" ) self.event_bus.publish_sync(event) # Check history recent = self.event_bus.get_recent_events(count=10) self.assertEqual(len(recent), 5) def test_event_filtering_by_type(self): """Test filtering events by type.""" from core.event_bus import SkillGainEvent, LootEvent, DamageEvent # Publish different event types self.event_bus.publish_sync(SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)) self.event_bus.publish_sync(LootEvent(mob_name="Mob1", items=[], total_tt_value=0.0)) self.event_bus.publish_sync(DamageEvent(damage_amount=100, target_name="Target1")) self.event_bus.publish_sync(SkillGainEvent(skill_name="Pistol", skill_value=30.0, gain_amount=0.02)) # Filter by type skill_events = self.event_bus.get_recent_events(event_type=SkillGainEvent) self.assertEqual(len(skill_events), 2) class TestDataPersistence(unittest.TestCase): """Test data persistence across operations.""" def setUp(self): """Set up test environment.""" self.temp_dir = tempfile.mkdtemp() from core.data_store import DataStore DataStore._instance = None def tearDown(self): """Clean up test environment.""" shutil.rmtree(self.temp_dir) from core.data_store import DataStore DataStore._instance = None def test_data_persistence(self): """Test that data persists to disk.""" from core.data_store import DataStore # Create data store and save data ds1 = DataStore(data_dir=self.temp_dir) ds1.save("plugin1", "key1", {"data": "value1"}) # Create new data store instance (simulating restart) from core.data_store import DataStore DataStore._instance = None ds2 = DataStore(data_dir=self.temp_dir) # Load data loaded = ds2.load("plugin1", "key1") self.assertEqual(loaded, {"data": "value1"}) def test_multiple_plugins_persistence(self): """Test persistence for multiple plugins.""" from core.data_store import DataStore ds = DataStore(data_dir=self.temp_dir) # Save data for multiple plugins ds.save("plugin1", "config", {"enabled": True}) ds.save("plugin2", "config", {"enabled": False}) ds.save("plugin1", "data", [1, 2, 3]) # Reset and reload from core.data_store import DataStore DataStore._instance = None ds2 = DataStore(data_dir=self.temp_dir) # Verify all data self.assertEqual(ds2.load("plugin1", "config"), {"enabled": True}) self.assertEqual(ds2.load("plugin2", "config"), {"enabled": False}) self.assertEqual(ds2.load("plugin1", "data"), [1, 2, 3]) class TestAPIIntegration(unittest.TestCase): """Test API integration with core services.""" def setUp(self): """Set up test environment.""" from core.event_bus import reset_event_bus from core.plugin_api import PluginAPI from core.data_store import DataStore reset_event_bus() PluginAPI._instance = None DataStore._instance = None def tearDown(self): """Clean up.""" from core.event_bus import reset_event_bus from core.plugin_api import PluginAPI reset_event_bus() PluginAPI._instance = None def test_api_event_integration(self): """Test API integration with event bus.""" from core.plugin_api import get_api from core.event_bus import SkillGainEvent api = get_api() received = [] def handler(event): received.append(event) api.subscribe_typed(SkillGainEvent, handler) event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01) api.publish_typed(event) # Use sync publish to ensure immediate delivery from core.event_bus import get_event_bus get_event_bus().publish_sync(event) self.assertEqual(len(received), 1) def test_api_utility_functions(self): """Test API utility functions.""" from core.plugin_api import get_api api = get_api() # Test PED formatting self.assertEqual(api.format_ped(10.5), "10.50 PED") self.assertEqual(api.format_ped(0), "0.00 PED") # Test PEC formatting self.assertEqual(api.format_pec(50), "50 PEC") self.assertEqual(api.format_pec(0), "0 PEC") # Test DPP calculation dpp = api.calculate_dpp(damage=100, ammo=50, decay=0.5) self.assertIsInstance(dpp, float) # Test markup calculation markup = api.calculate_markup(price=110, tt=100) self.assertEqual(markup, 110.0) class TestSettingsWorkflow(unittest.TestCase): """Test settings workflows.""" def setUp(self): """Set up test environment.""" self.temp_dir = tempfile.mkdtemp() self.config_file = Path(self.temp_dir) / "settings.json" def tearDown(self): """Clean up.""" shutil.rmtree(self.temp_dir) def test_settings_workflow(self): """Test complete settings workflow.""" from core.settings import Settings # Create settings settings = Settings(config_file=str(self.config_file)) # Set values settings.set("overlay_enabled", False) settings.set("custom_setting", "custom_value") settings.enable_plugin("test_plugin") # Verify values self.assertFalse(settings.get("overlay_enabled")) self.assertEqual(settings.get("custom_setting"), "custom_value") self.assertTrue(settings.is_plugin_enabled("test_plugin")) # Reset specific key settings.reset("overlay_enabled") self.assertTrue(settings.get("overlay_enabled")) # Back to default # Reset all settings.reset() self.assertIsNone(settings.get("custom_setting")) class TestSystemIntegration(unittest.TestCase): """Test complete system integration.""" def setUp(self): """Set up test environment.""" self.temp_dir = tempfile.mkdtemp() # Reset all singletons from core.event_bus import reset_event_bus from core.plugin_api import PluginAPI from core.data_store import DataStore reset_event_bus() PluginAPI._instance = None DataStore._instance = None def tearDown(self): """Clean up.""" shutil.rmtree(self.temp_dir) # Reset all singletons from core.event_bus import reset_event_bus from core.plugin_api import PluginAPI from core.data_store import DataStore reset_event_bus() PluginAPI._instance = None DataStore._instance = None def test_complete_system_interaction(self): """Test complete interaction between all core systems.""" from core.event_bus import get_event_bus, SkillGainEvent from core.plugin_api import get_api, APIEndpoint, APIType from core.data_store import get_data_store from core.settings import Settings # Initialize all systems event_bus = get_event_bus() api = get_api() data_store = get_data_store(data_dir=self.temp_dir) settings = Settings(config_file=Path(self.temp_dir) / "settings.json") # 1. Register an API endpoint def skill_handler(skill_name): return f"Processed: {skill_name}" endpoint = APIEndpoint( name="process_skill", api_type=APIType.UTILITY, description="Process skill gain", handler=skill_handler, plugin_id="test_plugin" ) api.register_api(endpoint) # 2. Subscribe to events received_skills = [] def event_handler(event): received_skills.append(event.skill_name) # Call the API api.call_api("test_plugin", "process_skill", event.skill_name) event_bus.subscribe_typed(SkillGainEvent, event_handler) # 3. Publish an event event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01) event_bus.publish_sync(event) # 4. Store the result data_store.save("test_plugin", "last_skill", received_skills[0]) # 5. Update settings settings.enable_plugin("test_plugin") # Verify everything worked self.assertEqual(len(received_skills), 1) self.assertEqual(data_store.load("test_plugin", "last_skill"), "Rifle") self.assertTrue(settings.is_plugin_enabled("test_plugin")) if __name__ == '__main__': unittest.main()