EU-Utility/tests/performance/test_benchmarks.py

365 lines
11 KiB
Python

"""
Performance Benchmarks
======================
Performance tests for critical operations.
"""
import pytest
import time
from unittest.mock import Mock, patch
@pytest.mark.slow
class TestPluginManagerPerformance:
"""Benchmark Plugin Manager performance."""
def test_plugin_discovery_performance(self, benchmark, mock_overlay):
"""Benchmark plugin discovery speed."""
from core.plugin_manager import PluginManager
pm = PluginManager(mock_overlay)
# Benchmark discovery
result = benchmark(pm.discover_plugins)
# Should complete in reasonable time
assert result is not None
def test_plugin_load_performance(self, benchmark, mock_overlay):
"""Benchmark plugin load speed."""
from core.plugin_manager import PluginManager
from plugins.base_plugin import BasePlugin
class BenchmarkPlugin(BasePlugin):
name = "Benchmark Plugin"
def initialize(self):
pass
def get_ui(self):
return None
pm = PluginManager(mock_overlay)
# Benchmark loading
result = benchmark(pm.load_plugin, BenchmarkPlugin)
assert result is True
@pytest.mark.slow
class TestAPIPerformance:
"""Benchmark API performance."""
def test_log_reading_performance(self, benchmark):
"""Benchmark log reading speed."""
from core.plugin_api import PluginAPI
api = PluginAPI()
# Create mock log reader with large dataset
large_log = [f"[2024-02-15 14:{i:02d}:00] Log line {i}" for i in range(1000)]
mock_reader = Mock(return_value=large_log)
api.register_log_service(mock_reader)
# Benchmark reading
result = benchmark(api.read_log_lines, 1000)
assert len(result) == 1000
def test_nexus_search_performance(self, benchmark):
"""Benchmark Nexus search speed."""
from core.plugin_api import PluginAPI
api = PluginAPI()
# Mock Nexus API
mock_results = [
{"Id": i, "Name": f"Item {i}", "Value": i * 10.0}
for i in range(100)
]
mock_nexus = Mock()
mock_nexus.search_items.return_value = mock_results
api.register_nexus_service(mock_nexus)
# Benchmark search
result = benchmark(api.search_items, "test", limit=100)
assert len(result) == 100
def test_data_store_performance(self, benchmark, data_store):
"""Benchmark data store operations."""
from core.plugin_api import PluginAPI
api = PluginAPI()
api.register_data_service(data_store)
# Benchmark write
def write_data():
for i in range(100):
api.set_data(f"key_{i}", {"data": i * 100})
benchmark(write_data)
# Benchmark read
def read_data():
for i in range(100):
api.get_data(f"key_{i}")
benchmark(read_data)
@pytest.mark.slow
class TestUIPerformance:
"""Benchmark UI performance."""
def test_overlay_creation_performance(self, benchmark, mock_plugin_manager):
"""Benchmark overlay window creation."""
pytest.importorskip("PyQt6")
from PyQt6.QtWidgets import QApplication
from core.overlay_window import OverlayWindow
app = QApplication.instance() or QApplication([])
def create_overlay():
with patch.object(OverlayWindow, '_setup_tray'):
return OverlayWindow(mock_plugin_manager)
window = benchmark(create_overlay)
assert window is not None
def test_dashboard_render_performance(self, benchmark):
"""Benchmark dashboard render speed."""
pytest.importorskip("PyQt6")
from PyQt6.QtWidgets import QApplication
from core.dashboard import Dashboard
app = QApplication.instance() or QApplication([])
dashboard = Dashboard()
def render_dashboard():
dashboard.update()
benchmark(render_dashboard)
def test_plugin_switch_performance(self, benchmark, mock_plugin_manager):
"""Benchmark plugin switching speed."""
pytest.importorskip("PyQt6")
from PyQt6.QtWidgets import QApplication, QWidget
from core.overlay_window import OverlayWindow
app = QApplication.instance() or QApplication([])
# Create mock plugins
plugins = {}
for i in range(10):
mock_plugin = Mock()
mock_plugin.name = f"Plugin {i}"
mock_ui = QWidget()
mock_plugin.get_ui.return_value = mock_ui
plugins[f"plugin_{i}"] = mock_plugin
mock_plugin_manager.get_all_plugins.return_value = plugins
with patch.object(OverlayWindow, '_setup_tray'):
window = OverlayWindow(mock_plugin_manager)
def switch_plugins():
for i in range(min(10, len(window.sidebar_buttons))):
window._on_plugin_selected(i)
benchmark(switch_plugins)
@pytest.mark.slow
class TestMemoryPerformance:
"""Benchmark memory usage."""
def test_memory_usage_plugin_loading(self):
"""Test memory usage during plugin loading."""
import tracemalloc
from core.plugin_manager import PluginManager
from plugins.base_plugin import BasePlugin
tracemalloc.start()
# Initial memory
snapshot1 = tracemalloc.take_snapshot()
# Create plugin manager and load plugins
pm = PluginManager(Mock())
# Load multiple plugins
for i in range(50):
class TestPlugin(BasePlugin):
name = f"Test Plugin {i}"
pm.plugin_classes[f"plugin_{i}"] = TestPlugin
# Memory after loading
snapshot2 = tracemalloc.take_snapshot()
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
# Should not have excessive memory growth
total_size = sum(stat.size for stat in top_stats[:10])
assert total_size < 100 * 1024 * 1024 # Less than 100MB
tracemalloc.stop()
def test_memory_usage_data_storage(self):
"""Test memory usage during data storage."""
import tracemalloc
from core.data_store import DataStore
tracemalloc.start()
snapshot1 = tracemalloc.take_snapshot()
store = DataStore(":memory:")
# Store large dataset
for i in range(10000):
store.set(f"key_{i}", {
"id": i,
"data": "x" * 100,
"nested": {"value": i * 2}
})
snapshot2 = tracemalloc.take_snapshot()
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
total_size = sum(stat.size for stat in top_stats[:10])
# Memory should be reasonable
assert total_size < 500 * 1024 * 1024 # Less than 500MB
tracemalloc.stop()
@pytest.mark.slow
class TestStartupPerformance:
"""Benchmark startup performance."""
def test_application_startup_time(self):
"""Test total application startup time."""
import time
start = time.time()
# Import main modules
from core.plugin_manager import PluginManager
from core.plugin_api import PluginAPI
from core.data_store import DataStore
from core.event_bus import EventBus
end = time.time()
import_time = end - start
# Imports should be fast
assert import_time < 5.0 # Less than 5 seconds
def test_plugin_manager_initialization_time(self, benchmark):
"""Benchmark plugin manager initialization."""
from core.plugin_manager import PluginManager
mock_overlay = Mock()
pm = benchmark(PluginManager, mock_overlay)
assert pm is not None
def test_api_initialization_time(self, benchmark):
"""Benchmark API initialization."""
from core.plugin_api import PluginAPI
api = benchmark(PluginAPI)
assert api is not None
@pytest.mark.slow
class TestCachePerformance:
"""Benchmark caching performance."""
def test_http_cache_performance(self, benchmark):
"""Benchmark HTTP cache performance."""
from core.http_client import HTTPClient
client = HTTPClient()
# Mock request
with patch('requests.get') as mock_get:
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {"data": "test" * 1000}
mock_get.return_value = mock_response
# First call - hits network
client.get("https://test.com/api")
# Second call - should use cache
def cached_request():
client.get("https://test.com/api", cache=True)
benchmark(cached_request)
# Should only have made one actual request
assert mock_get.call_count == 1
def test_data_store_cache_performance(self, benchmark, data_store):
"""Benchmark data store cache performance."""
from core.plugin_api import PluginAPI
api = PluginAPI()
api.register_data_service(data_store)
# Pre-populate data
for i in range(1000):
api.set_data(f"key_{i}", f"value_{i}")
# Benchmark reads
def read_random():
import random
for _ in range(100):
key = f"key_{random.randint(0, 999)}"
api.get_data(key)
benchmark(read_random)
@pytest.mark.slow
class TestConcurrentPerformance:
"""Benchmark concurrent operations."""
def test_concurrent_event_publishing(self, benchmark):
"""Benchmark concurrent event publishing."""
from core.event_bus import EventBus
from concurrent.futures import ThreadPoolExecutor
event_bus = EventBus()
# Add subscriber
received = []
def handler(event):
received.append(event.data)
event_bus.subscribe("test", handler)
def publish_events():
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(event_bus.publish, "test", i)
for i in range(100)
]
for f in futures:
f.result()
benchmark(publish_events)
assert len(received) == 100