"""
Event Bus Test Plugin

Comprehensive testing of the Event Bus pub/sub system:
- Subscribe/unsubscribe functionality
- Message publishing and delivery
- Multiple subscribers
- Message filtering
- Performance under load
- Cross-plugin communication
"""

import threading
import time
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

from core.api.plugin_api import get_api
from core.api.widget_api import WidgetType, get_widget_api
from core.base_plugin import BasePlugin


@dataclass
class EventTestResult:
    """Result of an event bus test."""

    # Human-readable name of the test, shown in the results table.
    test_name: str
    # True when the test's success condition held.
    passed: bool
    # Elapsed time of the test in milliseconds.
    duration_ms: float
    # Number of events the test published.
    events_sent: int = 0
    # Number of handler invocations the test observed.
    events_received: int = 0
    # Free-form extra information; a fresh dict per instance.
    details: Dict[str, Any] = field(default_factory=dict)
    # Error message when the test raised, otherwise None.
    error: Optional[str] = None


class EventBusTestPlugin(BasePlugin):
    """
    Test suite for Event Bus functionality.

    Tests pub/sub patterns, message delivery guarantees,
    and performance characteristics.

    Each test records an ``EventTestResult`` in ``self.results``; the
    results are rendered in a control widget built with PyQt6 when PyQt6
    can be imported.
    """

    def __init__(self):
        """Create empty result/event stores; APIs are resolved in ``initialize``."""
        super().__init__()
        self.api = None
        self.widget_api = None
        self.results: List[EventTestResult] = []
        self.widget = None
        self.received_events: List[Dict] = []
        self.subscriptions: List[str] = []
        # Qt views; (re)created on every call to _update_widget_display.
        self.results_table = None
        self.event_log = None

    def initialize(self):
        """Initialize and run event bus tests."""
        self.api = get_api()
        self.widget_api = get_widget_api()

        self._create_control_widget()
        self._run_all_tests()

    def _create_control_widget(self):
        """Create control widget for test visualization."""
        self.widget = self.widget_api.create_widget(
            name="event_bus_test",
            title="📡 Event Bus Test",
            size=(800, 600),
            position=(200, 200),
            widget_type=WidgetType.CUSTOM,
        )
        self._update_widget_display()
        self.widget.show()

    def _update_widget_display(self):
        """Rebuild the widget content from the current results.

        Does nothing when the widget has not been created yet. When PyQt6
        cannot be imported the error is printed and the widget is left
        untouched.
        """
        if self.widget is None:
            return

        try:
            from PyQt6.QtCore import Qt
            from PyQt6.QtWidgets import (
                QGroupBox,
                QHBoxLayout,
                QHeaderView,
                QLabel,
                QPushButton,
                QTableWidget,
                QTextBrowser,
                QVBoxLayout,
                QWidget,
            )
        except ImportError as e:
            print(f"Widget error: {e}")
            return

        container = QWidget()
        main_layout = QVBoxLayout(container)

        # Header
        header = QLabel("📡 Event Bus Test Suite")
        header.setStyleSheet("font-size: 20px; font-weight: bold; color: #ff8c42;")
        main_layout.addWidget(header)

        # Summary stats
        stats_layout = QHBoxLayout()

        total_tests = len(self.results)
        passed_tests = sum(1 for r in self.results if r.passed)
        total_sent = sum(r.events_sent for r in self.results)
        total_received = sum(r.events_received for r in self.results)

        # NOTE: fan-out tests receive more events than they send, so the
        # aggregate delivery rate can exceed 100%.
        stats = [
            ("Tests", f"{passed_tests}/{total_tests}"),
            ("Events Sent", str(total_sent)),
            ("Events Received", str(total_received)),
            ("Delivery Rate", f"{(total_received/max(total_sent,1)*100):.1f}%"),
        ]

        for label, value in stats:
            group = QGroupBox(label)
            group_layout = QVBoxLayout(group)
            value_label = QLabel(value)
            value_label.setStyleSheet("font-size: 24px; font-weight: bold; color: #4ecca3;")
            value_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
            group_layout.addWidget(value_label)
            stats_layout.addWidget(group)

        main_layout.addLayout(stats_layout)

        # Results table
        self.results_table = QTableWidget()
        self.results_table.setColumnCount(6)
        self.results_table.setHorizontalHeaderLabels([
            "Test Name", "Status", "Duration", "Sent", "Received", "Details"
        ])
        self.results_table.horizontalHeader().setSectionResizeMode(
            QHeaderView.ResizeMode.Stretch
        )

        self._populate_results_table()
        main_layout.addWidget(self.results_table)

        # Control buttons
        btn_layout = QHBoxLayout()

        btn_run = QPushButton("▶ Run All Tests")
        btn_run.clicked.connect(self._run_all_tests)
        btn_layout.addWidget(btn_run)

        btn_publish = QPushButton("📤 Test Publish")
        btn_publish.clicked.connect(self._manual_publish_test)
        btn_layout.addWidget(btn_publish)

        btn_clear = QPushButton("🧹 Clear Results")
        btn_clear.clicked.connect(self._clear_results)
        btn_layout.addWidget(btn_clear)

        main_layout.addLayout(btn_layout)

        # Event log
        log_label = QLabel("Recent Events:")
        main_layout.addWidget(log_label)

        self.event_log = QTextBrowser()
        self.event_log.setMaximumHeight(150)
        self._update_event_log()
        main_layout.addWidget(self.event_log)

        self.widget.set_content(container)

    def _populate_results_table(self):
        """Populate results table with one row per recorded result."""
        if self.results_table is None:
            return

        # PyQt6 is only imported lazily inside methods in this module, so the
        # names used here must be imported in this scope as well.
        from PyQt6.QtCore import Qt
        from PyQt6.QtWidgets import QTableWidgetItem

        self.results_table.setRowCount(len(self.results))

        for i, result in enumerate(self.results):
            self.results_table.setItem(i, 0, QTableWidgetItem(result.test_name))

            status_item = QTableWidgetItem("✅ PASS" if result.passed else "❌ FAIL")
            status_item.setForeground(
                Qt.GlobalColor.green if result.passed else Qt.GlobalColor.red
            )
            self.results_table.setItem(i, 1, status_item)

            self.results_table.setItem(i, 2, QTableWidgetItem(f"{result.duration_ms:.2f}ms"))
            self.results_table.setItem(i, 3, QTableWidgetItem(str(result.events_sent)))
            self.results_table.setItem(i, 4, QTableWidgetItem(str(result.events_received)))

            # Show the error if there is one, otherwise a truncated details dict.
            details = result.error if result.error else str(result.details)[:50]
            self.results_table.setItem(i, 5, QTableWidgetItem(details))

    def _update_event_log(self):
        """Show the last 10 received events in the event log view."""
        if self.event_log is None or not self.received_events:
            return

        lines = [
            f"[{event.get('time', '?')}] {event.get('type', '?')}: {event.get('data', {})}\n"
            for event in self.received_events[-10:]  # Last 10 events
        ]
        self.event_log.setText("".join(lines))

    @staticmethod
    def _elapsed_ms(start: float) -> float:
        """Return milliseconds elapsed since *start* (a ``time.perf_counter()`` value)."""
        return (time.perf_counter() - start) * 1000

    def _record_result(self, test_name: str, passed: bool, duration_ms: float,
                       events_sent: int = 0, events_received: int = 0,
                       details: Optional[Dict] = None, error: Optional[str] = None):
        """Append a test result and refresh the widget.

        Args:
            test_name: Name shown in the results table.
            passed: Whether the test's success condition held.
            duration_ms: Elapsed time of the test in milliseconds.
            events_sent: Number of events the test published.
            events_received: Number of handler invocations observed.
            details: Optional extra information; an empty dict is stored
                when omitted.
            error: Optional error message for a failed test.
        """
        result = EventTestResult(
            test_name=test_name,
            passed=passed,
            duration_ms=duration_ms,
            events_sent=events_sent,
            events_received=events_received,
            details=details or {},
            error=error,
        )
        self.results.append(result)
        self._update_widget_display()

    def _run_all_tests(self):
        """Execute all event bus tests, then release the test subscriptions."""
        self.results.clear()
        self.received_events.clear()

        try:
            # Basic functionality
            self._test_basic_subscribe_publish()
            self._test_unsubscribe()
            self._test_multiple_subscribers()
            self._test_event_data_types()
            self._test_wildcard_subscriptions()

            # Performance
            self._test_high_volume_publishing()
            self._test_rapid_subscribe_unsubscribe()

            # Edge cases
            self._test_empty_event_data()
            self._test_large_event_data()
            self._test_special_characters()
        finally:
            # Cleanup subscriptions even if a test let an exception escape.
            self._cleanup_subscriptions()

    def _test_basic_subscribe_publish(self):
        """Test basic subscribe and publish functionality."""
        start = time.perf_counter()
        events_received = []

        def handler(data):
            events_received.append(data)
            self.received_events.append({
                'type': 'basic_test',
                'data': data,
                'time': datetime.now().strftime('%H:%M:%S')
            })
            self._update_event_log()

        try:
            sub_id = self.api.subscribe("test.basic", handler)
            self.subscriptions.append(sub_id)

            # Publish event
            self.api.publish("test.basic", {"message": "hello"})
            time.sleep(0.1)  # Allow for delivery

            self._record_result(
                "Basic Subscribe/Publish",
                len(events_received) == 1,
                self._elapsed_ms(start),
                events_sent=1,
                events_received=len(events_received),
                details={"subscription_id": sub_id[:8] if sub_id else None}
            )
        except Exception as e:
            self._record_result(
                "Basic Subscribe/Publish", False, self._elapsed_ms(start), error=str(e)
            )

    def _test_unsubscribe(self):
        """Test that no events are delivered after unsubscribing."""
        start = time.perf_counter()
        events_received = []
        sub_id = None
        unsubscribed = False

        def handler(data):
            events_received.append(data)

        try:
            sub_id = self.api.subscribe("test.unsub", handler)

            # Publish and receive
            self.api.publish("test.unsub", {"seq": 1})
            time.sleep(0.05)

            # Unsubscribe
            unsubscribed = self.api.unsubscribe(sub_id)

            # Publish again - should not receive
            self.api.publish("test.unsub", {"seq": 2})
            time.sleep(0.05)

            self._record_result(
                "Unsubscribe",
                bool(unsubscribed) and len(events_received) == 1,
                self._elapsed_ms(start),
                events_sent=2,
                events_received=len(events_received),
                details={"unsubscribed": unsubscribed}
            )
        except Exception as e:
            self._record_result("Unsubscribe", False, self._elapsed_ms(start), error=str(e))
        finally:
            # If the subscription is still active, hand it to the shared
            # cleanup so it does not outlive the test run.
            if sub_id is not None and not unsubscribed:
                self.subscriptions.append(sub_id)

    def _test_multiple_subscribers(self):
        """Test multiple subscribers to same event type."""
        start = time.perf_counter()
        received_by_sub = defaultdict(list)
        subscriber_count = 5

        def make_handler(sub_name):
            return lambda data: received_by_sub[sub_name].append(data)

        try:
            # Create 5 subscribers
            for i in range(subscriber_count):
                sub_id = self.api.subscribe("test.multi", make_handler(f"sub_{i}"))
                self.subscriptions.append(sub_id)

            # Publish single event
            self.api.publish("test.multi", {"test": "data"})
            time.sleep(0.1)

            total_received = sum(len(v) for v in received_by_sub.values())

            self._record_result(
                "Multiple Subscribers",
                total_received == subscriber_count,
                self._elapsed_ms(start),
                events_sent=1,
                events_received=total_received,
                details={"subscribers": subscriber_count}
            )
        except Exception as e:
            self._record_result(
                "Multiple Subscribers", False, self._elapsed_ms(start), error=str(e)
            )

    def _test_event_data_types(self):
        """Test various event data types."""
        start = time.perf_counter()
        received_data = []

        def handler(data):
            received_data.append(data)

        try:
            sub_id = self.api.subscribe("test.types", handler)
            self.subscriptions.append(sub_id)

            test_data = [
                "string data",
                123,
                45.67,
                True,
                None,
                [1, 2, 3],
                {"nested": {"key": "value"}},
                {"mixed": [1, "two", 3.0, False]}
            ]

            for data in test_data:
                self.api.publish("test.types", data)

            time.sleep(0.2)

            self._record_result(
                "Event Data Types",
                len(received_data) == len(test_data),
                self._elapsed_ms(start),
                events_sent=len(test_data),
                events_received=len(received_data),
                details={"types_tested": len(test_data)}
            )
        except Exception as e:
            self._record_result("Event Data Types", False, self._elapsed_ms(start), error=str(e))

    def _test_wildcard_subscriptions(self):
        """Test wildcard/pattern subscriptions.

        Subscribes to ``test.wildcard`` and publishes to two sub-events plus
        one unrelated event. The result is always recorded as passed unless
        an exception is raised; the number of events received is reported
        in the details only.
        """
        start = time.perf_counter()
        received = []

        # Subscribe to event pattern
        def handler(data):
            received.append(data)

        try:
            sub_id = self.api.subscribe("test.wildcard", handler)
            self.subscriptions.append(sub_id)

            # Publish to different sub-events
            self.api.publish("test.wildcard.sub1", {"n": 1})
            self.api.publish("test.wildcard.sub2", {"n": 2})
            self.api.publish("other.event", {"n": 3})  # Should not match

            time.sleep(0.1)

            self._record_result(
                "Wildcard Subscriptions",
                True,  # Implementation dependent
                self._elapsed_ms(start),
                events_sent=3,
                events_received=len(received),
                details={"received": len(received)}
            )
        except Exception as e:
            self._record_result(
                "Wildcard Subscriptions", False, self._elapsed_ms(start), error=str(e)
            )

    def _test_high_volume_publishing(self):
        """Test high-volume event publishing (passes when loss is below 5%)."""
        start = time.perf_counter()
        received = []
        event_count = 100

        def handler(data):
            received.append(data)

        try:
            sub_id = self.api.subscribe("test.volume", handler)
            self.subscriptions.append(sub_id)

            # Publish many events rapidly
            for i in range(event_count):
                self.api.publish("test.volume", {"seq": i, "timestamp": time.time()})

            time.sleep(0.5)  # Allow processing

            duration = self._elapsed_ms(start)
            loss_rate = (event_count - len(received)) / event_count * 100

            self._record_result(
                "High Volume Publishing",
                loss_rate < 5,  # Allow 5% loss
                duration,
                events_sent=event_count,
                events_received=len(received),
                # The average includes the fixed processing sleep above.
                details={"loss_rate": f"{loss_rate:.1f}%", "avg_latency_ms": f"{duration/event_count:.2f}"}
            )
        except Exception as e:
            self._record_result(
                "High Volume Publishing", False, self._elapsed_ms(start), error=str(e)
            )

    def _test_rapid_subscribe_unsubscribe(self):
        """Test rapid subscribe/unsubscribe cycles.

        Passes when all cycles complete without raising.
        """
        start = time.perf_counter()
        cycles = 50

        def handler(data):
            pass

        try:
            for i in range(cycles):
                sub_id = self.api.subscribe(f"test.rapid.{i}", handler)
                self.api.unsubscribe(sub_id)

            duration = self._elapsed_ms(start)

            self._record_result(
                "Rapid Subscribe/Unsubscribe",
                True,
                duration,
                details={"cycles": cycles, "avg_time_ms": f"{duration/cycles:.2f}"}
            )
        except Exception as e:
            self._record_result(
                "Rapid Subscribe/Unsubscribe", False, self._elapsed_ms(start), error=str(e)
            )

    def _test_empty_event_data(self):
        """Test publishing empty/null event data."""
        start = time.perf_counter()
        received = []

        def handler(data):
            received.append(data)

        try:
            sub_id = self.api.subscribe("test.empty", handler)
            self.subscriptions.append(sub_id)

            empty_payloads = [None, {}, [], ""]
            for payload in empty_payloads:
                self.api.publish("test.empty", payload)

            time.sleep(0.1)

            self._record_result(
                "Empty Event Data",
                len(received) == len(empty_payloads),
                self._elapsed_ms(start),
                events_sent=len(empty_payloads),
                events_received=len(received)
            )
        except Exception as e:
            self._record_result("Empty Event Data", False, self._elapsed_ms(start), error=str(e))

    def _test_large_event_data(self):
        """Test publishing large event payloads."""
        start = time.perf_counter()
        received = []

        def handler(data):
            received.append(data)

        try:
            sub_id = self.api.subscribe("test.large", handler)
            self.subscriptions.append(sub_id)

            # Large payload
            large_data = {
                "items": [{"id": i, "data": "x" * 100} for i in range(1000)]
            }

            self.api.publish("test.large", large_data)
            time.sleep(0.2)

            self._record_result(
                "Large Event Data",
                len(received) == 1,
                self._elapsed_ms(start),
                events_sent=1,
                events_received=len(received),
                # Size of the payload's str() representation, in KiB.
                details={"payload_size_kb": len(str(large_data)) / 1024}
            )
        except Exception as e:
            self._record_result("Large Event Data", False, self._elapsed_ms(start), error=str(e))

    def _test_special_characters(self):
        """Test event types with special characters."""
        start = time.perf_counter()
        received = []

        def handler(data):
            received.append(data)

        try:
            special_events = [
                "test.special.dot",
                "test-special-dash",
                "test_special_underscore",
                "test:special:colon",
                "test/special/slash"
            ]

            for event_type in special_events:
                sub_id = self.api.subscribe(event_type, handler)
                self.subscriptions.append(sub_id)
                self.api.publish(event_type, {"event": event_type})

            time.sleep(0.2)

            self._record_result(
                "Special Characters",
                len(received) == len(special_events),
                self._elapsed_ms(start),
                events_sent=len(special_events),
                events_received=len(received)
            )
        except Exception as e:
            self._record_result("Special Characters", False, self._elapsed_ms(start), error=str(e))

    def _manual_publish_test(self):
        """Manual test - publish a test event."""
        self.api.publish("test.manual", {
            "timestamp": datetime.now().isoformat(),
            "message": "Manual test event"
        })
        self.api.show_notification("Event Published", "Test event sent to 'test.manual'")

    def _clear_results(self):
        """Clear all test results."""
        self.results.clear()
        self.received_events.clear()
        self._update_widget_display()

    def _cleanup_subscriptions(self):
        """Clean up all test subscriptions.

        Best effort: a failure to unsubscribe one id is reported and does
        not stop the remaining ids from being processed.
        """
        for sub_id in self.subscriptions:
            try:
                self.api.unsubscribe(sub_id)
            except Exception as e:
                print(f"Failed to unsubscribe {sub_id}: {e}")
        self.subscriptions.clear()

    def shutdown(self):
        """Clean up on shutdown."""
        self._cleanup_subscriptions()
        if self.widget:
            self.widget.close()


# Plugin entry point
plugin_class = EventBusTestPlugin