EU-Utility/tests/e2e/test_end_to_end.py

432 lines
14 KiB
Python

"""
End-to-End tests for EU-Utility.
Tests cover:
- Full application initialization
- Plugin loading and execution
- Event flow through the system
- Data persistence across sessions
"""
import sys
import unittest
import tempfile
import shutil
from pathlib import Path
from unittest.mock import MagicMock, patch
# Add project root to path
project_root = Path(__file__).parent.parent.parent
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
class TestApplicationInitialization(unittest.TestCase):
"""Test full application initialization."""
def setUp(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
# Reset singletons
from core.event_bus import reset_event_bus
from core.plugin_api import PluginAPI
from core.data_store import DataStore
from core.settings import get_settings
reset_event_bus()
PluginAPI._instance = None
DataStore._instance = None
def tearDown(self):
"""Clean up test environment."""
shutil.rmtree(self.temp_dir)
# Reset singletons
from core.event_bus import reset_event_bus
from core.plugin_api import PluginAPI
from core.data_store import DataStore
reset_event_bus()
PluginAPI._instance = None
DataStore._instance = None
def test_core_services_initialization(self):
"""Test that all core services can be initialized."""
from core.event_bus import get_event_bus
from core.plugin_api import get_api
from core.data_store import get_data_store
from core.settings import Settings
from core.tasks import get_task_manager
# Initialize all services
event_bus = get_event_bus()
api = get_api()
data_store = get_data_store(data_dir=self.temp_dir)
settings = Settings(config_file=Path(self.temp_dir) / "settings.json")
task_manager = get_task_manager()
# Verify all are initialized
self.assertIsNotNone(event_bus)
self.assertIsNotNone(api)
self.assertIsNotNone(data_store)
self.assertIsNotNone(settings)
self.assertIsNotNone(task_manager)
def test_singleton_consistency(self):
"""Test that singletons maintain consistency."""
from core.event_bus import get_event_bus
from core.plugin_api import get_api
from core.data_store import get_data_store
# Get instances multiple times
bus1 = get_event_bus()
bus2 = get_event_bus()
api1 = get_api()
api2 = get_api()
# Verify they are the same
self.assertIs(bus1, bus2)
self.assertIs(api1, api2)
class TestEventFlow(unittest.TestCase):
"""Test event flow through the system."""
def setUp(self):
"""Set up test environment."""
from core.event_bus import reset_event_bus, get_event_bus
reset_event_bus()
self.event_bus = get_event_bus()
def tearDown(self):
"""Clean up."""
from core.event_bus import reset_event_bus
reset_event_bus()
def test_event_publish_and_subscribe(self):
"""Test publishing and subscribing to events."""
from core.event_bus import SkillGainEvent
received_events = []
def handler(event):
received_events.append(event)
# Subscribe
self.event_bus.subscribe_typed(SkillGainEvent, handler)
# Publish
event = SkillGainEvent(
skill_name="Rifle",
skill_value=25.0,
gain_amount=0.01
)
self.event_bus.publish_sync(event)
# Verify
self.assertEqual(len(received_events), 1)
self.assertEqual(received_events[0].skill_name, "Rifle")
def test_multiple_subscribers_receive_event(self):
"""Test that multiple subscribers receive the same event."""
from core.event_bus import LootEvent
received1 = []
received2 = []
def handler1(event):
received1.append(event)
def handler2(event):
received2.append(event)
self.event_bus.subscribe_typed(LootEvent, handler1)
self.event_bus.subscribe_typed(LootEvent, handler2)
event = LootEvent(
mob_name="Daikiba",
items=[{"name": "Animal Oil"}],
total_tt_value=0.05
)
self.event_bus.publish_sync(event)
self.assertEqual(len(received1), 1)
self.assertEqual(len(received2), 1)
def test_event_history(self):
"""Test that events are stored in history."""
from core.event_bus import DamageEvent
# Publish several events
for i in range(5):
event = DamageEvent(
damage_amount=100 + i * 10,
target_name=f"Target{i}"
)
self.event_bus.publish_sync(event)
# Check history
recent = self.event_bus.get_recent_events(count=10)
self.assertEqual(len(recent), 5)
def test_event_filtering_by_type(self):
"""Test filtering events by type."""
from core.event_bus import SkillGainEvent, LootEvent, DamageEvent
# Publish different event types
self.event_bus.publish_sync(SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01))
self.event_bus.publish_sync(LootEvent(mob_name="Mob1", items=[], total_tt_value=0.0))
self.event_bus.publish_sync(DamageEvent(damage_amount=100, target_name="Target1"))
self.event_bus.publish_sync(SkillGainEvent(skill_name="Pistol", skill_value=30.0, gain_amount=0.02))
# Filter by type
skill_events = self.event_bus.get_recent_events(event_type=SkillGainEvent)
self.assertEqual(len(skill_events), 2)
class TestDataPersistence(unittest.TestCase):
"""Test data persistence across operations."""
def setUp(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
from core.data_store import DataStore
DataStore._instance = None
def tearDown(self):
"""Clean up test environment."""
shutil.rmtree(self.temp_dir)
from core.data_store import DataStore
DataStore._instance = None
def test_data_persistence(self):
"""Test that data persists to disk."""
from core.data_store import DataStore
# Create data store and save data
ds1 = DataStore(data_dir=self.temp_dir)
ds1.save("plugin1", "key1", {"data": "value1"})
# Create new data store instance (simulating restart)
from core.data_store import DataStore
DataStore._instance = None
ds2 = DataStore(data_dir=self.temp_dir)
# Load data
loaded = ds2.load("plugin1", "key1")
self.assertEqual(loaded, {"data": "value1"})
def test_multiple_plugins_persistence(self):
"""Test persistence for multiple plugins."""
from core.data_store import DataStore
ds = DataStore(data_dir=self.temp_dir)
# Save data for multiple plugins
ds.save("plugin1", "config", {"enabled": True})
ds.save("plugin2", "config", {"enabled": False})
ds.save("plugin1", "data", [1, 2, 3])
# Reset and reload
from core.data_store import DataStore
DataStore._instance = None
ds2 = DataStore(data_dir=self.temp_dir)
# Verify all data
self.assertEqual(ds2.load("plugin1", "config"), {"enabled": True})
self.assertEqual(ds2.load("plugin2", "config"), {"enabled": False})
self.assertEqual(ds2.load("plugin1", "data"), [1, 2, 3])
class TestAPIIntegration(unittest.TestCase):
"""Test API integration with core services."""
def setUp(self):
"""Set up test environment."""
from core.event_bus import reset_event_bus
from core.plugin_api import PluginAPI
from core.data_store import DataStore
reset_event_bus()
PluginAPI._instance = None
DataStore._instance = None
def tearDown(self):
"""Clean up."""
from core.event_bus import reset_event_bus
from core.plugin_api import PluginAPI
reset_event_bus()
PluginAPI._instance = None
def test_api_event_integration(self):
"""Test API integration with event bus."""
from core.plugin_api import get_api
from core.event_bus import SkillGainEvent
api = get_api()
received = []
def handler(event):
received.append(event)
api.subscribe_typed(SkillGainEvent, handler)
event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)
api.publish_typed(event)
# Use sync publish to ensure immediate delivery
from core.event_bus import get_event_bus
get_event_bus().publish_sync(event)
self.assertEqual(len(received), 1)
def test_api_utility_functions(self):
"""Test API utility functions."""
from core.plugin_api import get_api
api = get_api()
# Test PED formatting
self.assertEqual(api.format_ped(10.5), "10.50 PED")
self.assertEqual(api.format_ped(0), "0.00 PED")
# Test PEC formatting
self.assertEqual(api.format_pec(50), "50 PEC")
self.assertEqual(api.format_pec(0), "0 PEC")
# Test DPP calculation
dpp = api.calculate_dpp(damage=100, ammo=50, decay=0.5)
self.assertIsInstance(dpp, float)
# Test markup calculation
markup = api.calculate_markup(price=110, tt=100)
self.assertEqual(markup, 110.0)
class TestSettingsWorkflow(unittest.TestCase):
"""Test settings workflows."""
def setUp(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
self.config_file = Path(self.temp_dir) / "settings.json"
def tearDown(self):
"""Clean up."""
shutil.rmtree(self.temp_dir)
def test_settings_workflow(self):
"""Test complete settings workflow."""
from core.settings import Settings
# Create settings
settings = Settings(config_file=str(self.config_file))
# Set values
settings.set("overlay_enabled", False)
settings.set("custom_setting", "custom_value")
settings.enable_plugin("test_plugin")
# Verify values
self.assertFalse(settings.get("overlay_enabled"))
self.assertEqual(settings.get("custom_setting"), "custom_value")
self.assertTrue(settings.is_plugin_enabled("test_plugin"))
# Reset specific key
settings.reset("overlay_enabled")
self.assertTrue(settings.get("overlay_enabled")) # Back to default
# Reset all
settings.reset()
self.assertIsNone(settings.get("custom_setting"))
class TestSystemIntegration(unittest.TestCase):
"""Test complete system integration."""
def setUp(self):
"""Set up test environment."""
self.temp_dir = tempfile.mkdtemp()
# Reset all singletons
from core.event_bus import reset_event_bus
from core.plugin_api import PluginAPI
from core.data_store import DataStore
reset_event_bus()
PluginAPI._instance = None
DataStore._instance = None
def tearDown(self):
"""Clean up."""
shutil.rmtree(self.temp_dir)
# Reset all singletons
from core.event_bus import reset_event_bus
from core.plugin_api import PluginAPI
from core.data_store import DataStore
reset_event_bus()
PluginAPI._instance = None
DataStore._instance = None
def test_complete_system_interaction(self):
"""Test complete interaction between all core systems."""
from core.event_bus import get_event_bus, SkillGainEvent
from core.plugin_api import get_api, APIEndpoint, APIType
from core.data_store import get_data_store
from core.settings import Settings
# Initialize all systems
event_bus = get_event_bus()
api = get_api()
data_store = get_data_store(data_dir=self.temp_dir)
settings = Settings(config_file=Path(self.temp_dir) / "settings.json")
# 1. Register an API endpoint
def skill_handler(skill_name):
return f"Processed: {skill_name}"
endpoint = APIEndpoint(
name="process_skill",
api_type=APIType.UTILITY,
description="Process skill gain",
handler=skill_handler,
plugin_id="test_plugin"
)
api.register_api(endpoint)
# 2. Subscribe to events
received_skills = []
def event_handler(event):
received_skills.append(event.skill_name)
# Call the API
api.call_api("test_plugin", "process_skill", event.skill_name)
event_bus.subscribe_typed(SkillGainEvent, event_handler)
# 3. Publish an event
event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)
event_bus.publish_sync(event)
# 4. Store the result
data_store.save("test_plugin", "last_skill", received_skills[0])
# 5. Update settings
settings.enable_plugin("test_plugin")
# Verify everything worked
self.assertEqual(len(received_skills), 1)
self.assertEqual(data_store.load("test_plugin", "last_skill"), "Rifle")
self.assertTrue(settings.is_plugin_enabled("test_plugin"))
if __name__ == '__main__':
unittest.main()