"""
Performance Benchmarks
======================

Performance tests for critical operations.
"""
|
|
|
|
import pytest
|
|
import time
|
|
from unittest.mock import Mock, patch
|
|
|
|
|
|
@pytest.mark.slow
class TestPluginManagerPerformance:
    """Benchmark Plugin Manager performance."""

    def test_plugin_discovery_performance(self, benchmark, mock_overlay):
        """Benchmark plugin discovery speed."""
        from core.plugin_manager import PluginManager

        manager = PluginManager(mock_overlay)

        # Time the discovery call itself; discovery correctness is
        # covered by the functional test suite.
        discovered = benchmark(manager.discover_plugins)

        # Should complete in reasonable time and yield a result.
        assert discovered is not None

    def test_plugin_load_performance(self, benchmark, mock_overlay):
        """Benchmark plugin load speed."""
        from core.plugin_manager import PluginManager
        from plugins.base_plugin import BasePlugin

        class BenchmarkPlugin(BasePlugin):
            # Minimal concrete plugin: just enough to be loadable.
            name = "Benchmark Plugin"

            def initialize(self):
                pass

            def get_ui(self):
                return None

        manager = PluginManager(mock_overlay)

        # Time a single end-to-end plugin load.
        loaded = benchmark(manager.load_plugin, BenchmarkPlugin)

        assert loaded is True
|
|
|
|
|
|
@pytest.mark.slow
class TestAPIPerformance:
    """Benchmark API performance."""

    def test_log_reading_performance(self, benchmark):
        """Benchmark log reading speed."""
        from core.plugin_api import PluginAPI

        api = PluginAPI()

        # Mock log reader backed by a large in-memory dataset so the
        # benchmark measures the API layer, not disk I/O.
        large_log = [f"[2024-02-15 14:{i:02d}:00] Log line {i}" for i in range(1000)]
        mock_reader = Mock(return_value=large_log)
        api.register_log_service(mock_reader)

        # Benchmark reading
        result = benchmark(api.read_log_lines, 1000)

        assert len(result) == 1000

    def test_nexus_search_performance(self, benchmark):
        """Benchmark Nexus search speed."""
        from core.plugin_api import PluginAPI

        api = PluginAPI()

        # Mock Nexus API with a canned 100-item result set.
        mock_results = [
            {"Id": i, "Name": f"Item {i}", "Value": i * 10.0}
            for i in range(100)
        ]
        mock_nexus = Mock()
        mock_nexus.search_items.return_value = mock_results
        api.register_nexus_service(mock_nexus)

        # Benchmark search
        result = benchmark(api.search_items, "test", limit=100)

        assert len(result) == 100

    def test_data_store_performance(self, benchmark, data_store):
        """Benchmark data store write+read round-trip.

        pytest-benchmark permits only one ``benchmark()`` invocation per
        test, so writes and reads are measured together as a single
        workload instead of two separate benchmark calls (which would
        raise "benchmark fixture was already used").
        """
        from core.plugin_api import PluginAPI

        api = PluginAPI()
        api.register_data_service(data_store)

        def write_then_read():
            # 100 writes followed by 100 reads of the same keys.
            for i in range(100):
                api.set_data(f"key_{i}", {"data": i * 100})
            for i in range(100):
                api.get_data(f"key_{i}")

        benchmark(write_then_read)
|
|
|
|
|
|
@pytest.mark.slow
class TestUIPerformance:
    """Benchmark UI performance."""

    def test_overlay_creation_performance(self, benchmark, mock_plugin_manager):
        """Benchmark overlay window creation."""
        pytest.importorskip("PyQt6")
        from PyQt6.QtWidgets import QApplication
        from core.overlay_window import OverlayWindow

        # Reuse the process-wide QApplication if one already exists.
        app = QApplication.instance() or QApplication([])

        def create_overlay():
            # Tray setup is patched out: no system tray in CI.
            with patch.object(OverlayWindow, '_setup_tray'):
                return OverlayWindow(mock_plugin_manager)

        window = benchmark(create_overlay)

        assert window is not None

    def test_dashboard_render_performance(self, benchmark):
        """Benchmark dashboard render speed."""
        pytest.importorskip("PyQt6")
        from PyQt6.QtWidgets import QApplication
        from core.dashboard import Dashboard

        app = QApplication.instance() or QApplication([])

        dashboard = Dashboard()

        # Time repeated repaint requests on a single dashboard instance.
        benchmark(dashboard.update)

    def test_plugin_switch_performance(self, benchmark, mock_plugin_manager):
        """Benchmark plugin switching speed."""
        pytest.importorskip("PyQt6")
        from PyQt6.QtWidgets import QApplication, QWidget
        from core.overlay_window import OverlayWindow

        app = QApplication.instance() or QApplication([])

        # Build ten mock plugins, each exposing a real QWidget as its UI.
        plugins = {}
        for index in range(10):
            plugin = Mock()
            plugin.name = f"Plugin {index}"
            plugin.get_ui.return_value = QWidget()
            plugins[f"plugin_{index}"] = plugin

        mock_plugin_manager.get_all_plugins.return_value = plugins

        with patch.object(OverlayWindow, '_setup_tray'):
            window = OverlayWindow(mock_plugin_manager)

        def switch_plugins():
            # Cycle through every available sidebar entry (up to 10).
            count = min(10, len(window.sidebar_buttons))
            for position in range(count):
                window._on_plugin_selected(position)

        benchmark(switch_plugins)
|
|
|
|
|
|
@pytest.mark.slow
class TestMemoryPerformance:
    """Benchmark memory usage."""

    def test_memory_usage_plugin_loading(self):
        """Test memory usage during plugin loading.

        Tracing is stopped in a ``finally`` block so a failing assertion
        cannot leave tracemalloc running and skew later tests.
        """
        import tracemalloc
        from core.plugin_manager import PluginManager
        from plugins.base_plugin import BasePlugin

        tracemalloc.start()
        try:
            # Baseline before any plugin machinery exists.
            snapshot1 = tracemalloc.take_snapshot()

            # Create plugin manager and register many plugin classes.
            pm = PluginManager(Mock())

            for i in range(50):
                class TestPlugin(BasePlugin):
                    # f-string is evaluated at class-creation time, so each
                    # class gets its own distinct name.
                    name = f"Test Plugin {i}"

                pm.plugin_classes[f"plugin_{i}"] = TestPlugin

            # Memory after loading
            snapshot2 = tracemalloc.take_snapshot()
            top_stats = snapshot2.compare_to(snapshot1, 'lineno')

            # Should not have excessive memory growth.
            total_size = sum(stat.size for stat in top_stats[:10])
            assert total_size < 100 * 1024 * 1024  # Less than 100MB
        finally:
            tracemalloc.stop()

    def test_memory_usage_data_storage(self):
        """Test memory usage during data storage.

        Tracing is stopped in a ``finally`` block so a failing assertion
        cannot leave tracemalloc running and skew later tests.
        """
        import tracemalloc
        from core.data_store import DataStore

        tracemalloc.start()
        try:
            snapshot1 = tracemalloc.take_snapshot()

            store = DataStore(":memory:")

            # Store a large dataset of nested records.
            for i in range(10000):
                store.set(f"key_{i}", {
                    "id": i,
                    "data": "x" * 100,
                    "nested": {"value": i * 2}
                })

            snapshot2 = tracemalloc.take_snapshot()
            top_stats = snapshot2.compare_to(snapshot1, 'lineno')
            total_size = sum(stat.size for stat in top_stats[:10])

            # Memory should be reasonable
            assert total_size < 500 * 1024 * 1024  # Less than 500MB
        finally:
            tracemalloc.stop()
|
|
|
|
|
|
@pytest.mark.slow
class TestStartupPerformance:
    """Benchmark startup performance."""

    def test_application_startup_time(self):
        """Test total application startup time.

        Uses ``time.perf_counter`` (monotonic, high-resolution) rather
        than ``time.time`` for interval measurement; the module-level
        ``import time`` already provides the module, so no local import
        is needed.
        """
        start = time.perf_counter()

        # Import main modules (imports are intentionally unused — the
        # point is measuring their import cost).
        from core.plugin_manager import PluginManager
        from core.plugin_api import PluginAPI
        from core.data_store import DataStore
        from core.event_bus import EventBus

        import_time = time.perf_counter() - start

        # Imports should be fast
        assert import_time < 5.0  # Less than 5 seconds

    def test_plugin_manager_initialization_time(self, benchmark):
        """Benchmark plugin manager initialization."""
        from core.plugin_manager import PluginManager

        mock_overlay = Mock()

        pm = benchmark(PluginManager, mock_overlay)

        assert pm is not None

    def test_api_initialization_time(self, benchmark):
        """Benchmark API initialization."""
        from core.plugin_api import PluginAPI

        api = benchmark(PluginAPI)

        assert api is not None
|
|
|
|
|
|
@pytest.mark.slow
class TestCachePerformance:
    """Benchmark caching performance."""

    def test_http_cache_performance(self, benchmark):
        """Benchmark HTTP cache performance."""
        from core.http_client import HTTPClient

        client = HTTPClient()

        # Mock the network layer so only cache behavior is measured.
        with patch('requests.get') as mock_get:
            mock_response = Mock()
            mock_response.status_code = 200
            mock_response.json.return_value = {"data": "test" * 1000}
            mock_get.return_value = mock_response

            # First call - hits network
            client.get("https://test.com/api")

            # Subsequent calls - should use cache
            def cached_request():
                client.get("https://test.com/api", cache=True)

            benchmark(cached_request)

            # Should only have made one actual request, even though the
            # benchmark invoked cached_request many times.
            assert mock_get.call_count == 1

    def test_data_store_cache_performance(self, benchmark, data_store):
        """Benchmark data store cache performance.

        ``import random`` is hoisted out of the benchmarked function so
        each measured round times data-store reads, not repeated module
        lookups inside the hot path.
        """
        import random
        from core.plugin_api import PluginAPI

        api = PluginAPI()
        api.register_data_service(data_store)

        # Pre-populate data
        for i in range(1000):
            api.set_data(f"key_{i}", f"value_{i}")

        # Benchmark random-key reads
        def read_random():
            for _ in range(100):
                api.get_data(f"key_{random.randint(0, 999)}")

        benchmark(read_random)
|
|
|
|
|
|
@pytest.mark.slow
class TestConcurrentPerformance:
    """Benchmark concurrent operations."""

    def test_concurrent_event_publishing(self, benchmark):
        """Benchmark concurrent event publishing.

        The benchmark fixture calls the workload many times, so the
        ``received`` sink is cleared at the start of each round —
        otherwise events accumulate across rounds and the final count
        assertion can never hold.
        """
        from core.event_bus import EventBus
        from concurrent.futures import ThreadPoolExecutor

        event_bus = EventBus()

        # Subscriber collects published payloads; list.append is
        # atomic under CPython, so no extra locking is needed here.
        received = []

        def handler(event):
            received.append(event.data)

        event_bus.subscribe("test", handler)

        def publish_events():
            # Reset the sink each round so the post-benchmark assertion
            # reflects exactly one round of 100 events.
            received.clear()
            with ThreadPoolExecutor(max_workers=10) as executor:
                futures = [
                    executor.submit(event_bus.publish, "test", i)
                    for i in range(100)
                ]
                for f in futures:
                    f.result()

        benchmark(publish_events)

        assert len(received) == 100
|