""" Event Bus Test Plugin Comprehensive testing of the Event Bus pub/sub system: - Subscribe/unsubscribe functionality - Message publishing and delivery - Multiple subscribers - Message filtering - Performance under load - Cross-plugin communication """ import time import threading from datetime import datetime from typing import Dict, List, Any, Callable from dataclasses import dataclass, field from collections import defaultdict from core.base_plugin import BasePlugin from core.api.plugin_api import get_api from core.api.widget_api import get_widget_api, WidgetType @dataclass class EventTestResult: """Result of an event bus test.""" test_name: str passed: bool duration_ms: float events_sent: int = 0 events_received: int = 0 details: Dict = field(default_factory=dict) error: str = None class EventBusTestPlugin(BasePlugin): """ Test suite for Event Bus functionality. Tests pub/sub patterns, message delivery guarantees, and performance characteristics. """ def __init__(self): super().__init__() self.api = None self.widget_api = None self.results: List[EventTestResult] = [] self.widget = None self.received_events: List[Dict] = [] self.subscriptions: List[str] = [] def initialize(self): """Initialize and run event bus tests.""" self.api = get_api() self.widget_api = get_widget_api() self._create_control_widget() self._run_all_tests() def _create_control_widget(self): """Create control widget for test visualization.""" self.widget = self.widget_api.create_widget( name="event_bus_test", title="๐Ÿ“ก Event Bus Test", size=(800, 600), position=(200, 200), widget_type=WidgetType.CUSTOM ) self._update_widget_display() self.widget.show() def _update_widget_display(self): """Update widget with current results.""" try: from PyQt6.QtWidgets import ( QWidget, QVBoxLayout, QHBoxLayout, QLabel, QPushButton, QTextBrowser, QTableWidget, QTableWidgetItem, QHeaderView, QGroupBox, QGridLayout ) from PyQt6.QtCore import Qt container = QWidget() main_layout = QVBoxLayout(container) # Header header = QLabel("๐Ÿ“ก Event Bus Test Suite") header.setStyleSheet("font-size: 20px; font-weight: bold; color: #ff8c42;") main_layout.addWidget(header) # Summary stats stats_layout = QHBoxLayout() total_tests = len(self.results) passed_tests = sum(1 for r in self.results if r.passed) total_sent = sum(r.events_sent for r in self.results) total_received = sum(r.events_received for r in self.results) stats = [ ("Tests", f"{passed_tests}/{total_tests}"), ("Events Sent", str(total_sent)), ("Events Received", str(total_received)), ("Delivery Rate", f"{(total_received/max(total_sent,1)*100):.1f}%") ] for label, value in stats: group = QGroupBox(label) group_layout = QVBoxLayout(group) value_label = QLabel(value) value_label.setStyleSheet("font-size: 24px; font-weight: bold; color: #4ecca3;") value_label.setAlignment(Qt.AlignmentFlag.AlignCenter) group_layout.addWidget(value_label) stats_layout.addWidget(group) main_layout.addLayout(stats_layout) # Results table self.results_table = QTableWidget() self.results_table.setColumnCount(6) self.results_table.setHorizontalHeaderLabels([ "Test Name", "Status", "Duration", "Sent", "Received", "Details" ]) self.results_table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Stretch) self._populate_results_table() main_layout.addWidget(self.results_table) # Control buttons btn_layout = QHBoxLayout() btn_run = QPushButton("โ–ถ Run All Tests") btn_run.clicked.connect(self._run_all_tests) btn_layout.addWidget(btn_run) btn_publish = QPushButton("๐Ÿ“ค Test Publish") btn_publish.clicked.connect(self._manual_publish_test) btn_layout.addWidget(btn_publish) btn_clear = QPushButton("๐Ÿงน Clear Results") btn_clear.clicked.connect(self._clear_results) btn_layout.addWidget(btn_clear) main_layout.addLayout(btn_layout) # Event log log_label = QLabel("Recent Events:") main_layout.addWidget(log_label) self.event_log = QTextBrowser() self.event_log.setMaximumHeight(150) self._update_event_log() main_layout.addWidget(self.event_log) self.widget.set_content(container) except ImportError as e: print(f"Widget error: {e}") def _populate_results_table(self): """Populate results table with data.""" if not hasattr(self, 'results_table'): return self.results_table.setRowCount(len(self.results)) for i, result in enumerate(self.results): self.results_table.setItem(i, 0, QTableWidgetItem(result.test_name)) status_item = QTableWidgetItem("โœ… PASS" if result.passed else "โŒ FAIL") status_item.setForeground( Qt.GlobalColor.green if result.passed else Qt.GlobalColor.red ) self.results_table.setItem(i, 1, status_item) self.results_table.setItem(i, 2, QTableWidgetItem(f"{result.duration_ms:.2f}ms")) self.results_table.setItem(i, 3, QTableWidgetItem(str(result.events_sent))) self.results_table.setItem(i, 4, QTableWidgetItem(str(result.events_received))) details = result.error if result.error else str(result.details)[:50] self.results_table.setItem(i, 5, QTableWidgetItem(details)) def _update_event_log(self): """Update event log display.""" if hasattr(self, 'event_log') and self.received_events: log_text = "" for event in self.received_events[-10:]: # Last 10 events log_text += f"[{event.get('time', '?')}] {event.get('type', '?')}: {event.get('data', {})}\n" self.event_log.setText(log_text) def _record_result(self, test_name: str, passed: bool, duration_ms: float, events_sent: int = 0, events_received: int = 0, details: Dict = None, error: str = None): """Record test result.""" result = EventTestResult( test_name=test_name, passed=passed, duration_ms=duration_ms, events_sent=events_sent, events_received=events_received, details=details or {}, error=error ) self.results.append(result) self._update_widget_display() def _run_all_tests(self): """Execute all event bus tests.""" self.results.clear() self.received_events.clear() # Basic functionality self._test_basic_subscribe_publish() self._test_unsubscribe() self._test_multiple_subscribers() self._test_event_data_types() self._test_wildcard_subscriptions() # Performance self._test_high_volume_publishing() self._test_rapid_subscribe_unsubscribe() # Edge cases self._test_empty_event_data() self._test_large_event_data() self._test_special_characters() # Cleanup subscriptions self._cleanup_subscriptions() def _test_basic_subscribe_publish(self): """Test basic subscribe and publish functionality.""" start = time.time() events_received = [] try: def handler(data): events_received.append(data) self.received_events.append({ 'type': 'basic_test', 'data': data, 'time': datetime.now().strftime('%H:%M:%S') }) self._update_event_log() sub_id = self.api.subscribe("test.basic", handler) self.subscriptions.append(sub_id) # Publish event self.api.publish("test.basic", {"message": "hello"}) time.sleep(0.1) # Allow for delivery duration = (time.time() - start) * 1000 success = len(events_received) == 1 self._record_result( "Basic Subscribe/Publish", success, duration, events_sent=1, events_received=len(events_received), details={"subscription_id": sub_id[:8] if sub_id else None} ) except Exception as e: duration = (time.time() - start) * 1000 self._record_result("Basic Subscribe/Publish", False, duration, error=str(e)) def _test_unsubscribe(self): """Test unsubscribe functionality.""" start = time.time() events_received = [] try: def handler(data): events_received.append(data) sub_id = self.api.subscribe("test.unsub", handler) # Publish and receive self.api.publish("test.unsub", {"seq": 1}) time.sleep(0.05) # Unsubscribe unsubscribed = self.api.unsubscribe(sub_id) # Publish again - should not receive self.api.publish("test.unsub", {"seq": 2}) time.sleep(0.05) duration = (time.time() - start) * 1000 success = unsubscribed and len(events_received) == 1 self._record_result( "Unsubscribe", success, duration, events_sent=2, events_received=len(events_received), details={"unsubscribed": unsubscribed} ) except Exception as e: duration = (time.time() - start) * 1000 self._record_result("Unsubscribe", False, duration, error=str(e)) def _test_multiple_subscribers(self): """Test multiple subscribers to same event type.""" start = time.time() received_by_sub = defaultdict(list) try: def make_handler(sub_name): return lambda data: received_by_sub[sub_name].append(data) # Create 5 subscribers sub_ids = [] for i in range(5): sub_id = self.api.subscribe("test.multi", make_handler(f"sub_{i}")) sub_ids.append(sub_id) self.subscriptions.append(sub_id) # Publish single event self.api.publish("test.multi", {"test": "data"}) time.sleep(0.1) duration = (time.time() - start) * 1000 total_received = sum(len(v) for v in received_by_sub.values()) success = total_received == 5 self._record_result( "Multiple Subscribers", success, duration, events_sent=1, events_received=total_received, details={"subscribers": 5} ) except Exception as e: duration = (time.time() - start) * 1000 self._record_result("Multiple Subscribers", False, duration, error=str(e)) def _test_event_data_types(self): """Test various event data types.""" start = time.time() received_data = [] try: def handler(data): received_data.append(data) sub_id = self.api.subscribe("test.types", handler) self.subscriptions.append(sub_id) test_data = [ "string data", 123, 45.67, True, None, [1, 2, 3], {"nested": {"key": "value"}}, {"mixed": [1, "two", 3.0, False]} ] for data in test_data: self.api.publish("test.types", data) time.sleep(0.2) duration = (time.time() - start) * 1000 success = len(received_data) == len(test_data) self._record_result( "Event Data Types", success, duration, events_sent=len(test_data), events_received=len(received_data), details={"types_tested": len(test_data)} ) except Exception as e: duration = (time.time() - start) * 1000 self._record_result("Event Data Types", False, duration, error=str(e)) def _test_wildcard_subscriptions(self): """Test wildcard/pattern subscriptions.""" start = time.time() received = [] try: # Subscribe to event pattern def handler(data): received.append(data) sub_id = self.api.subscribe("test.wildcard", handler) self.subscriptions.append(sub_id) # Publish to different sub-events self.api.publish("test.wildcard.sub1", {"n": 1}) self.api.publish("test.wildcard.sub2", {"n": 2}) self.api.publish("other.event", {"n": 3}) # Should not match time.sleep(0.1) duration = (time.time() - start) * 1000 self._record_result( "Wildcard Subscriptions", True, # Implementation dependent duration, events_sent=3, events_received=len(received), details={"received": len(received)} ) except Exception as e: duration = (time.time() - start) * 1000 self._record_result("Wildcard Subscriptions", False, duration, error=str(e)) def _test_high_volume_publishing(self): """Test high-volume event publishing.""" start = time.time() received = [] event_count = 100 try: def handler(data): received.append(data) sub_id = self.api.subscribe("test.volume", handler) self.subscriptions.append(sub_id) # Publish many events rapidly for i in range(event_count): self.api.publish("test.volume", {"seq": i, "timestamp": time.time()}) time.sleep(0.5) # Allow processing duration = (time.time() - start) * 1000 loss_rate = (event_count - len(received)) / event_count * 100 self._record_result( "High Volume Publishing", loss_rate < 5, # Allow 5% loss duration, events_sent=event_count, events_received=len(received), details={"loss_rate": f"{loss_rate:.1f}%", "avg_latency_ms": f"{duration/event_count:.2f}"} ) except Exception as e: duration = (time.time() - start) * 1000 self._record_result("High Volume Publishing", False, duration, error=str(e)) def _test_rapid_subscribe_unsubscribe(self): """Test rapid subscribe/unsubscribe cycles.""" start = time.time() cycles = 50 try: def handler(data): pass for i in range(cycles): sub_id = self.api.subscribe(f"test.rapid.{i}", handler) self.api.unsubscribe(sub_id) duration = (time.time() - start) * 1000 self._record_result( "Rapid Subscribe/Unsubscribe", True, duration, details={"cycles": cycles, "avg_time_ms": f"{duration/cycles:.2f}"} ) except Exception as e: duration = (time.time() - start) * 1000 self._record_result("Rapid Subscribe/Unsubscribe", False, duration, error=str(e)) def _test_empty_event_data(self): """Test publishing empty/null event data.""" start = time.time() received = [] try: def handler(data): received.append(data) sub_id = self.api.subscribe("test.empty", handler) self.subscriptions.append(sub_id) self.api.publish("test.empty", None) self.api.publish("test.empty", {}) self.api.publish("test.empty", []) self.api.publish("test.empty", "") time.sleep(0.1) duration = (time.time() - start) * 1000 self._record_result( "Empty Event Data", len(received) == 4, duration, events_sent=4, events_received=len(received) ) except Exception as e: duration = (time.time() - start) * 1000 self._record_result("Empty Event Data", False, duration, error=str(e)) def _test_large_event_data(self): """Test publishing large event payloads.""" start = time.time() received = [] try: def handler(data): received.append(data) sub_id = self.api.subscribe("test.large", handler) self.subscriptions.append(sub_id) # Large payload large_data = { "items": [{"id": i, "data": "x" * 100} for i in range(1000)] } self.api.publish("test.large", large_data) time.sleep(0.2) duration = (time.time() - start) * 1000 self._record_result( "Large Event Data", len(received) == 1, duration, events_sent=1, events_received=len(received), details={"payload_size_kb": len(str(large_data)) / 1024} ) except Exception as e: duration = (time.time() - start) * 1000 self._record_result("Large Event Data", False, duration, error=str(e)) def _test_special_characters(self): """Test event types with special characters.""" start = time.time() received = [] try: def handler(data): received.append(data) special_events = [ "test.special.dot", "test-special-dash", "test_special_underscore", "test:special:colon", "test/special/slash" ] for event_type in special_events: sub_id = self.api.subscribe(event_type, handler) self.subscriptions.append(sub_id) self.api.publish(event_type, {"event": event_type}) time.sleep(0.2) duration = (time.time() - start) * 1000 self._record_result( "Special Characters", len(received) == len(special_events), duration, events_sent=len(special_events), events_received=len(received) ) except Exception as e: duration = (time.time() - start) * 1000 self._record_result("Special Characters", False, duration, error=str(e)) def _manual_publish_test(self): """Manual test - publish a test event.""" self.api.publish("test.manual", { "timestamp": datetime.now().isoformat(), "message": "Manual test event" }) self.api.show_notification("Event Published", "Test event sent to 'test.manual'") def _clear_results(self): """Clear all test results.""" self.results.clear() self.received_events.clear() self._update_widget_display() def _cleanup_subscriptions(self): """Clean up all test subscriptions.""" for sub_id in self.subscriptions: try: self.api.unsubscribe(sub_id) except: pass self.subscriptions.clear() def shutdown(self): """Clean up on shutdown.""" self._cleanup_subscriptions() if self.widget: self.widget.close() # Plugin entry point plugin_class = EventBusTestPlugin