432 lines
14 KiB
Python
432 lines
14 KiB
Python
"""
|
|
End-to-End tests for EU-Utility.
|
|
|
|
Tests cover:
|
|
- Full application initialization
|
|
- Plugin loading and execution
|
|
- Event flow through the system
|
|
- Data persistence across sessions
|
|
"""
|
|
import sys
|
|
import unittest
|
|
import tempfile
|
|
import shutil
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
# Put the project root (the directory two levels above this file's folder) at
# the front of sys.path, presumably so the ``core.*`` imports used by the
# tests resolve when the module is run directly.
project_root = Path(__file__).parent.parent.parent
if sys.path.count(str(project_root)) == 0:
    sys.path.insert(0, str(project_root))
|
|
|
|
|
|
class TestApplicationInitialization(unittest.TestCase):
    """Test full application initialization."""

    @staticmethod
    def _reset_singletons():
        """Reset the event bus and clear the PluginAPI/DataStore ``_instance`` slots."""
        from core.event_bus import reset_event_bus
        from core.plugin_api import PluginAPI
        from core.data_store import DataStore

        reset_event_bus()
        PluginAPI._instance = None
        DataStore._instance = None

    def setUp(self):
        """Create a scratch directory and start from freshly reset singletons."""
        self.temp_dir = tempfile.mkdtemp()
        # Register removal right away: cleanups still run when a later line of
        # setUp raises (tearDown does not), so the directory can never leak.
        self.addCleanup(shutil.rmtree, self.temp_dir)

        self._reset_singletons()

    def tearDown(self):
        """Reset singletons; the scratch directory is removed afterwards by the cleanup."""
        # Runs before the rmtree cleanup, so a failing directory removal can no
        # longer prevent the singletons from being reset for the next test.
        self._reset_singletons()

    def test_core_services_initialization(self):
        """Test that all core services can be initialized."""
        from core.event_bus import get_event_bus
        from core.plugin_api import get_api
        from core.data_store import get_data_store
        from core.settings import Settings
        from core.tasks import get_task_manager

        # Initialize all services
        event_bus = get_event_bus()
        api = get_api()
        data_store = get_data_store(data_dir=self.temp_dir)
        settings = Settings(config_file=Path(self.temp_dir) / "settings.json")
        task_manager = get_task_manager()

        # Verify all are initialized
        self.assertIsNotNone(event_bus)
        self.assertIsNotNone(api)
        self.assertIsNotNone(data_store)
        self.assertIsNotNone(settings)
        self.assertIsNotNone(task_manager)

    def test_singleton_consistency(self):
        """Test that repeated accessor calls return the very same object."""
        from core.event_bus import get_event_bus
        from core.plugin_api import get_api

        # Get instances multiple times
        bus1 = get_event_bus()
        bus2 = get_event_bus()
        api1 = get_api()
        api2 = get_api()

        # Verify they are the same
        self.assertIs(bus1, bus2)
        self.assertIs(api1, api2)
|
|
|
|
|
|
class TestEventFlow(unittest.TestCase):
    """Exercise publish/subscribe delivery and history queries on the event bus."""

    def setUp(self):
        """Reset the event bus and keep a handle to the fresh instance."""
        from core.event_bus import reset_event_bus, get_event_bus

        reset_event_bus()
        self.event_bus = get_event_bus()

    def tearDown(self):
        """Reset the event bus again once the test has finished."""
        from core.event_bus import reset_event_bus

        reset_event_bus()

    def test_event_publish_and_subscribe(self):
        """A typed subscriber receives one synchronously published event."""
        from core.event_bus import SkillGainEvent

        captured = []

        def on_skill_gain(evt):
            captured.append(evt)

        # Register the listener, then push a single event through the bus.
        self.event_bus.subscribe_typed(SkillGainEvent, on_skill_gain)
        skill_gain = SkillGainEvent(
            skill_name="Rifle",
            skill_value=25.0,
            gain_amount=0.01,
        )
        self.event_bus.publish_sync(skill_gain)

        # Exactly one delivery, carrying the published skill name.
        self.assertEqual(len(captured), 1)
        self.assertEqual(captured[0].skill_name, "Rifle")

    def test_multiple_subscribers_receive_event(self):
        """Two subscribers of the same event type each get the event once."""
        from core.event_bus import LootEvent

        first_inbox = []
        second_inbox = []

        def first_listener(evt):
            first_inbox.append(evt)

        def second_listener(evt):
            second_inbox.append(evt)

        for listener in (first_listener, second_listener):
            self.event_bus.subscribe_typed(LootEvent, listener)

        loot = LootEvent(
            mob_name="Daikiba",
            items=[{"name": "Animal Oil"}],
            total_tt_value=0.05,
        )
        self.event_bus.publish_sync(loot)

        for inbox in (first_inbox, second_inbox):
            self.assertEqual(len(inbox), 1)

    def test_event_history(self):
        """Five published events are all returned by a history query of ten."""
        from core.event_bus import DamageEvent

        # Damage values 100, 110, ..., 140 paired with Target0 .. Target4.
        for idx, amount in enumerate(range(100, 150, 10)):
            self.event_bus.publish_sync(
                DamageEvent(damage_amount=amount, target_name=f"Target{idx}")
            )

        history = self.event_bus.get_recent_events(count=10)
        self.assertEqual(len(history), 5)

    def test_event_filtering_by_type(self):
        """A history query restricted to one event type ignores the others."""
        from core.event_bus import SkillGainEvent, LootEvent, DamageEvent

        # Mixed stream: two skill gains with a loot and a damage event between.
        stream = (
            (SkillGainEvent, {"skill_name": "Rifle", "skill_value": 25.0, "gain_amount": 0.01}),
            (LootEvent, {"mob_name": "Mob1", "items": [], "total_tt_value": 0.0}),
            (DamageEvent, {"damage_amount": 100, "target_name": "Target1"}),
            (SkillGainEvent, {"skill_name": "Pistol", "skill_value": 30.0, "gain_amount": 0.02}),
        )
        for event_cls, fields in stream:
            self.event_bus.publish_sync(event_cls(**fields))

        only_skill_gains = self.event_bus.get_recent_events(event_type=SkillGainEvent)
        self.assertEqual(len(only_skill_gains), 2)
|
|
|
|
|
|
class TestDataPersistence(unittest.TestCase):
    """Test data persistence across operations."""

    @staticmethod
    def _clear_store_instance():
        """Set ``DataStore._instance`` back to ``None``."""
        from core.data_store import DataStore

        DataStore._instance = None

    def _reopen_store(self):
        """Simulate a restart: clear ``DataStore._instance`` and build a new store.

        Returns:
            A newly constructed ``DataStore`` pointing at ``self.temp_dir``.
        """
        from core.data_store import DataStore

        DataStore._instance = None
        return DataStore(data_dir=self.temp_dir)

    def setUp(self):
        """Create a scratch directory and clear the stored DataStore instance."""
        self.temp_dir = tempfile.mkdtemp()
        # Registered immediately so the directory is removed even when the
        # rest of setUp raises (tearDown is skipped in that case).
        self.addCleanup(shutil.rmtree, self.temp_dir)

        self._clear_store_instance()

    def tearDown(self):
        """Clear the stored DataStore instance; the directory is removed by the cleanup."""
        self._clear_store_instance()

    def test_data_persistence(self):
        """Test that a saved value can be loaded by a second DataStore instance."""
        from core.data_store import DataStore

        # Create data store and save data
        ds1 = DataStore(data_dir=self.temp_dir)
        ds1.save("plugin1", "key1", {"data": "value1"})

        # Create new data store instance (simulating restart)
        ds2 = self._reopen_store()

        # Load data
        loaded = ds2.load("plugin1", "key1")

        self.assertEqual(loaded, {"data": "value1"})

    def test_multiple_plugins_persistence(self):
        """Test that values saved under several plugins/keys all load back."""
        from core.data_store import DataStore

        ds = DataStore(data_dir=self.temp_dir)

        # Save data for multiple plugins
        ds.save("plugin1", "config", {"enabled": True})
        ds.save("plugin2", "config", {"enabled": False})
        ds.save("plugin1", "data", [1, 2, 3])

        # Reset and reload
        ds2 = self._reopen_store()

        # Verify all data
        self.assertEqual(ds2.load("plugin1", "config"), {"enabled": True})
        self.assertEqual(ds2.load("plugin2", "config"), {"enabled": False})
        self.assertEqual(ds2.load("plugin1", "data"), [1, 2, 3])
|
|
|
|
|
|
class TestAPIIntegration(unittest.TestCase):
    """Test API integration with core services."""

    @staticmethod
    def _reset_singletons():
        """Reset the event bus and clear the PluginAPI/DataStore ``_instance`` slots."""
        from core.event_bus import reset_event_bus
        from core.plugin_api import PluginAPI
        from core.data_store import DataStore

        reset_event_bus()
        PluginAPI._instance = None
        DataStore._instance = None

    def setUp(self):
        """Start every test from freshly reset singletons."""
        self._reset_singletons()

    def tearDown(self):
        """Reset the same singletons that setUp resets, including DataStore."""
        self._reset_singletons()

    def test_api_event_integration(self):
        """Test that a handler subscribed through the API receives bus events."""
        from core.plugin_api import get_api
        from core.event_bus import SkillGainEvent, get_event_bus

        api = get_api()

        received = []

        def handler(event):
            received.append(event)

        api.subscribe_typed(SkillGainEvent, handler)

        event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)
        api.publish_typed(event)

        # Use sync publish to ensure immediate delivery
        get_event_bus().publish_sync(event)

        # The event is published twice (once through the API, once
        # synchronously). NOTE(review): publish_typed presumably delivers
        # asynchronously, so its delivery may or may not have happened yet;
        # asserting an exact count of 1 would make this test timing-dependent.
        delivered = list(received)
        self.assertGreaterEqual(len(delivered), 1)
        for item in delivered:
            self.assertEqual(item.skill_name, "Rifle")

    def test_api_utility_functions(self):
        """Test the API's PED/PEC formatting and DPP/markup helpers."""
        from core.plugin_api import get_api

        api = get_api()

        # Test PED formatting
        self.assertEqual(api.format_ped(10.5), "10.50 PED")
        self.assertEqual(api.format_ped(0), "0.00 PED")

        # Test PEC formatting
        self.assertEqual(api.format_pec(50), "50 PEC")
        self.assertEqual(api.format_pec(0), "0 PEC")

        # Test DPP calculation
        dpp = api.calculate_dpp(damage=100, ammo=50, decay=0.5)
        self.assertIsInstance(dpp, float)

        # Test markup calculation. Compared approximately because the result
        # is a float: e.g. 110 / 100 * 100 evaluates to 110.00000000000001.
        markup = api.calculate_markup(price=110, tt=100)
        self.assertAlmostEqual(markup, 110.0)
|
|
|
|
|
|
class TestSettingsWorkflow(unittest.TestCase):
    """Walk a Settings object through set, get, plugin-enable and reset calls."""

    def setUp(self):
        """Create a scratch directory and the path of the settings file inside it."""
        scratch = tempfile.mkdtemp()
        self.temp_dir = scratch
        self.config_file = Path(scratch) / "settings.json"

    def tearDown(self):
        """Delete the scratch directory."""
        shutil.rmtree(self.temp_dir)

    def test_settings_workflow(self):
        """Set values, read them back, then reset one key and finally everything."""
        from core.settings import Settings

        prefs = Settings(config_file=str(self.config_file))

        # Write two plain values and enable one plugin.
        for key, value in (
            ("overlay_enabled", False),
            ("custom_setting", "custom_value"),
        ):
            prefs.set(key, value)
        prefs.enable_plugin("test_plugin")

        # Everything written above must be readable again.
        self.assertFalse(prefs.get("overlay_enabled"))
        self.assertEqual(prefs.get("custom_setting"), "custom_value")
        self.assertTrue(prefs.is_plugin_enabled("test_plugin"))

        # Resetting a single key is expected to restore a truthy default.
        prefs.reset("overlay_enabled")
        self.assertTrue(prefs.get("overlay_enabled"))

        # A full reset is expected to drop the custom key altogether.
        prefs.reset()
        self.assertIsNone(prefs.get("custom_setting"))
|
|
|
|
|
|
class TestSystemIntegration(unittest.TestCase):
    """Test complete system integration."""

    @staticmethod
    def _reset_singletons():
        """Reset the event bus and clear the PluginAPI/DataStore ``_instance`` slots."""
        from core.event_bus import reset_event_bus
        from core.plugin_api import PluginAPI
        from core.data_store import DataStore

        reset_event_bus()
        PluginAPI._instance = None
        DataStore._instance = None

    def setUp(self):
        """Create a scratch directory and start from freshly reset singletons."""
        self.temp_dir = tempfile.mkdtemp()
        # Register removal right away: cleanups still run when a later line of
        # setUp raises (tearDown does not), so the directory can never leak.
        self.addCleanup(shutil.rmtree, self.temp_dir)

        self._reset_singletons()

    def tearDown(self):
        """Reset singletons; the scratch directory is removed afterwards by the cleanup."""
        # Runs before the rmtree cleanup, so a failing directory removal can no
        # longer prevent the singletons from being reset for the next test.
        self._reset_singletons()

    def test_complete_system_interaction(self):
        """Test complete interaction between all core systems."""
        from core.event_bus import get_event_bus, SkillGainEvent
        from core.plugin_api import get_api, APIEndpoint, APIType
        from core.data_store import get_data_store
        from core.settings import Settings

        # Initialize all systems
        event_bus = get_event_bus()
        api = get_api()
        data_store = get_data_store(data_dir=self.temp_dir)
        settings = Settings(config_file=Path(self.temp_dir) / "settings.json")

        # 1. Register an API endpoint
        def skill_handler(skill_name):
            return f"Processed: {skill_name}"

        endpoint = APIEndpoint(
            name="process_skill",
            api_type=APIType.UTILITY,
            description="Process skill gain",
            handler=skill_handler,
            plugin_id="test_plugin"
        )
        api.register_api(endpoint)

        # 2. Subscribe to events
        received_skills = []

        def event_handler(event):
            received_skills.append(event.skill_name)
            # Call the API
            api.call_api("test_plugin", "process_skill", event.skill_name)

        event_bus.subscribe_typed(SkillGainEvent, event_handler)

        # 3. Publish an event
        event = SkillGainEvent(skill_name="Rifle", skill_value=25.0, gain_amount=0.01)
        event_bus.publish_sync(event)

        # Check delivery before indexing into the list, so a missing event
        # shows up as a clear assertion failure instead of an IndexError.
        self.assertEqual(len(received_skills), 1)

        # 4. Store the result
        data_store.save("test_plugin", "last_skill", received_skills[0])

        # 5. Update settings
        settings.enable_plugin("test_plugin")

        # Verify everything worked
        self.assertEqual(data_store.load("test_plugin", "last_skill"), "Rifle")
        self.assertTrue(settings.is_plugin_enabled("test_plugin"))
|
|
|
|
|
|
# Run the unittest command-line runner when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|