diff --git a/core/api/external_api.py b/core/api/external_api.py index eeceb46..c6f25f6 100644 --- a/core/api/external_api.py +++ b/core/api/external_api.py @@ -348,7 +348,7 @@ class ExternalAPI: ) async def _handle_webhook(self, request): - """Handle incoming webhooks."""""" + """Handle incoming webhooks.""" from aiohttp import web name = request.match_info['name'] diff --git a/plugins/integration_tests/integration_browser/README.md b/plugins/integration_tests/integration_browser/README.md new file mode 100644 index 0000000..808d220 --- /dev/null +++ b/plugins/integration_tests/integration_browser/README.md @@ -0,0 +1,54 @@ +# Browser Extension Integration Tests + +Tests EU-Utility integration with browser extensions. + +## Communication Protocols + +### 1. Native Messaging +- stdin/stdout communication +- 32-bit length prefix +- JSON message format + +### 2. WebSocket Bridge +- Real-time bidirectional communication +- Port 8765 (configurable) +- Multiple client support + +### 3. HTTP API +- REST endpoints +- Port 8080 (configurable) +- CORS enabled for browser access + +## Message Format + +All messages use JSON format: + +```json +{ + "type": "message_type", + "id": "unique_id", + "data": {} +} +``` + +## Supported Message Types + +- `ping` - Connectivity check +- `get_status` - Request status +- `search` - Nexus search +- `loot_event` - Loot notification +- `global` - Global/HOF announcement + +## Compatibility + +| Browser | Native Messaging | WebSocket | HTTP | +|---------|-----------------|-----------|------| +| Chrome | ✅ | ✅ | ✅ | +| Firefox | ✅ | ✅ | ✅ | +| Edge | ✅ | ✅ | ✅ | + +## Manifest Generation + +The plugin can generate native messaging manifests for: +- Chrome/Edge (with `allowed_origins`) +- Firefox (with `allowed_extensions`) \ No newline at end of file diff --git a/plugins/integration_tests/integration_browser/plugin.json b/plugins/integration_tests/integration_browser/plugin.json new file mode 100644 index 0000000..b16338c --- /dev/null +++ b/plugins/integration_tests/integration_browser/plugin.json @@ -0,0 +1,13 @@ +{ + "name": "Browser Extension Tester", + "version": "1.0.0", + "author": "Integration Tester", + "description": "Test browser extension communication via Native Messaging, WebSocket, and HTTP", + "entry_point": "plugin.py", + "plugin_class": "BrowserExtensionTester", + "category": "integration_tests", + "dependencies": { + "pip": ["websockets"] + }, + "min_api_version": "2.0.0" +} \ No newline at end of file diff --git a/plugins/integration_tests/integration_browser/plugin.py b/plugins/integration_tests/integration_browser/plugin.py new file mode 100644 index 0000000..d93ed7f --- /dev/null +++ b/plugins/integration_tests/integration_browser/plugin.py @@ -0,0 +1,681 @@ +""" +EU-Utility Integration Test - Browser Extension +================================================ + +Tests browser extension communication protocols: +- Native messaging host +- WebSocket bridge +- HTTP API endpoints +- Message serialization + +Author: Integration Tester +Version: 1.0.0 +""" + +import json +import struct +import sys +import os +from typing import Dict, Any, Optional, List +from dataclasses import dataclass, asdict +from pathlib import Path + +from plugins.base_plugin import BasePlugin +from PyQt6.QtWidgets import ( + QWidget, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, + QTextEdit, QPushButton, QComboBox, QGroupBox, QTabWidget, + QTableWidget, QTableWidgetItem, QHeaderView, QFileDialog, + QMessageBox, QCheckBox +) +from PyQt6.QtCore import Qt, QThread, pyqtSignal + + +@dataclass +class BrowserProtocol: + """Browser communication protocol definition.""" + name: str + protocol: str # 'native_messaging', 'websocket', 'http' + browser: str # 'chrome', 'firefox', 'edge' + description: str + + +class BrowserExtensionTester(BasePlugin): + """Plugin for testing browser extension integration.""" + + name = "Browser Extension Tester" + version = "1.0.0" + author = "Integration Tester" + description = "Test browser extension communication protocols" + + # Native messaging manifest templates + CHROME_MANIFEST = { + "name": "eu_utility_native_host", + "description": "EU-Utility Native Messaging Host", + "path": "", # To be filled + "type": "stdio", + "allowed_origins": [ + "chrome-extension://*/" + ] + } + + FIREFOX_MANIFEST = { + "name": "eu_utility_native_host", + "description": "EU-Utility Native Messaging Host", + "path": "", # To be filled + "type": "stdio", + "allowed_extensions": [ + "eu-utility@impulsivefps" + ] + } + + # Test message types + TEST_MESSAGES = [ + { + "name": "Ping", + "message": {"type": "ping", "timestamp": 1234567890}, + "description": "Basic connectivity check" + }, + { + "name": "Get Status", + "message": {"type": "get_status"}, + "description": "Request current status" + }, + { + "name": "Search Request", + "message": { + "type": "search", + "query": "ArMatrix LP-35", + "entity_type": "weapons" + }, + "description": "Search via Nexus API" + }, + { + "name": "Loot Event", + "message": { + "type": "loot_event", + "data": { + "mob": "Atrox Young", + "value": 50.25, + "items": ["Animal Oil", "Animal Hide"] + } + }, + "description": "Forward loot event" + }, + { + "name": "Global Announcement", + "message": { + "type": "global", + "data": { + "value": 150.0, + "mob": "Atrox Alpha", + "player": "TestPlayer" + } + }, + "description": "Global/HOF announcement" + }, + ] + + def initialize(self): + """Initialize the tester.""" + self.log_info("Browser Extension Tester initialized") + self._native_host_path = "" + self._websocket_port = 8765 + self._http_port = 8080 + self._message_log: List[Dict] = [] + + def get_ui(self) -> QWidget: + """Create the plugin UI.""" + widget = QWidget() + layout = QVBoxLayout(widget) + + # Title + title = QLabel("Browser Extension Integration Tester") + title.setStyleSheet("font-size: 16px; font-weight: bold;") + layout.addWidget(title) + + desc = QLabel("Test browser extension communication via Native Messaging, WebSocket, and HTTP") + desc.setWordWrap(True) + layout.addWidget(desc) + + # Tabs + tabs = QTabWidget() + + # Setup tab + tabs.addTab(self._create_setup_tab(), "Setup") + + # Native Messaging tab + tabs.addTab(self._create_native_messaging_tab(), "Native Messaging") + + # WebSocket tab + tabs.addTab(self._create_websocket_tab(), "WebSocket") + + # HTTP API tab + tabs.addTab(self._create_http_tab(), "HTTP API") + + # Message Tester tab + tabs.addTab(self._create_message_tab(), "Message Tester") + + # Log tab + tabs.addTab(self._create_log_tab(), "Message Log") + + layout.addWidget(tabs) + + return widget + + def _create_setup_tab(self) -> QWidget: + """Create the setup tab.""" + widget = QWidget() + layout = QVBoxLayout(widget) + + # Browser selection + browser_group = QGroupBox("Browser Configuration") + browser_layout = QVBoxLayout(browser_group) + + browser_row = QHBoxLayout() + browser_row.addWidget(QLabel("Browser:")) + self.browser_select = QComboBox() + self.browser_select.addItems(["Chrome", "Firefox", "Edge"]) + browser_row.addWidget(self.browser_select) + browser_layout.addLayout(browser_row) + + # Native host path + host_row = QHBoxLayout() + host_row.addWidget(QLabel("Native Host Path:")) + self.host_path_input = QLineEdit() + self.host_path_input.setPlaceholderText("Path to native messaging host executable") + host_row.addWidget(self.host_path_input) + + browse_btn = QPushButton("Browse...") + browse_btn.clicked.connect(self._browse_host_path) + host_row.addWidget(browse_btn) + browser_layout.addLayout(host_row) + + # Generate manifest button + manifest_btn = QPushButton("Generate Manifest") + manifest_btn.clicked.connect(self._generate_manifest) + browser_layout.addWidget(manifest_btn) + + layout.addWidget(browser_group) + + # Installation instructions + instructions = QTextEdit() + instructions.setReadOnly(True) + instructions.setHtml(""" +

Browser Extension Setup

+ +

Chrome/Edge:

+
    +
  1. Generate the native messaging host manifest
  2. +
  3. Copy to: %LOCALAPPDATA%\\Google\\Chrome\\User Data\\NativeMessagingHosts/
  4. +
  5. Install the browser extension
  6. +
+ +

Firefox:

+
    +
  1. Generate the native messaging host manifest
  2. +
  3. Copy to: %APPDATA%\\Mozilla\\NativeMessagingHosts/
  4. +
  5. Install the browser extension
  6. +
+ """) + layout.addWidget(instructions) + + layout.addStretch() + return widget + + def _create_native_messaging_tab(self) -> QWidget: + """Create the native messaging tab.""" + widget = QWidget() + layout = QVBoxLayout(widget) + + layout.addWidget(QLabel("Native Messaging Protocol Test")) + + # Status + self.native_status = QLabel("Status: Not connected") + layout.addWidget(self.native_status) + + # Test messages + layout.addWidget(QLabel("Test Messages:")) + self.native_msg_list = QComboBox() + for msg in self.TEST_MESSAGES: + self.native_msg_list.addItem(f"{msg['name']}: {msg['description']}") + layout.addWidget(self.native_msg_list) + + # Message preview + layout.addWidget(QLabel("Message Preview:")) + self.native_preview = QTextEdit() + self.native_preview.setReadOnly(True) + self.native_preview.setMaximumHeight(100) + layout.addWidget(self.native_preview) + + # Update preview on selection change + self.native_msg_list.currentIndexChanged.connect(self._update_native_preview) + self._update_native_preview(0) + + # Send button + send_btn = QPushButton("Send Test Message") + send_btn.clicked.connect(self._send_native_message) + layout.addWidget(send_btn) + + # Simulate browser response + sim_btn = QPushButton("Simulate Browser Response") + sim_btn.clicked.connect(self._simulate_browser_response) + layout.addWidget(sim_btn) + + # Protocol info + info = QTextEdit() + info.setReadOnly(True) + info.setHtml(""" +

Native Messaging Protocol

+

Messages are sent over stdin/stdout with a 32-bit length prefix:

+
[length: 4 bytes][JSON message: N bytes]
+

All messages must be valid JSON.

+ """) + layout.addWidget(info) + + return widget + + def _create_websocket_tab(self) -> QWidget: + """Create the WebSocket tab.""" + widget = QWidget() + layout = QVBoxLayout(widget) + + layout.addWidget(QLabel("WebSocket Bridge Test")) + + # Port configuration + port_row = QHBoxLayout() + port_row.addWidget(QLabel("WebSocket Port:")) + self.ws_port_input = QLineEdit("8765") + port_row.addWidget(self.ws_port_input) + layout.addLayout(port_row) + + # Control buttons + btn_layout = QHBoxLayout() + + self.start_ws_btn = QPushButton("Start Server") + self.start_ws_btn.clicked.connect(self._start_websocket) + btn_layout.addWidget(self.start_ws_btn) + + self.stop_ws_btn = QPushButton("Stop Server") + self.stop_ws_btn.clicked.connect(self._stop_websocket) + self.stop_ws_btn.setEnabled(False) + btn_layout.addWidget(self.stop_ws_btn) + + layout.addLayout(btn_layout) + + # Status + self.ws_status = QLabel("Status: Server not running") + layout.addWidget(self.ws_status) + + # Connected clients + self.ws_clients = QLabel("Connected clients: 0") + layout.addWidget(self.ws_clients) + + # Message tester + layout.addWidget(QLabel("Send Message to All Clients:")) + self.ws_message = QTextEdit() + self.ws_message.setPlaceholderText('{"type": "test", "data": "Hello from EU-Utility"}') + self.ws_message.setMaximumHeight(80) + layout.addWidget(self.ws_message) + + broadcast_btn = QPushButton("Broadcast") + broadcast_btn.clicked.connect(self._broadcast_websocket) + layout.addWidget(broadcast_btn) + + # Received messages + layout.addWidget(QLabel("Received Messages:")) + self.ws_received = QTextEdit() + self.ws_received.setReadOnly(True) + layout.addWidget(self.ws_received) + + return widget + + def _create_http_tab(self) -> QWidget: + """Create the HTTP API tab.""" + widget = QWidget() + layout = QVBoxLayout(widget) + + layout.addWidget(QLabel("HTTP API Bridge Test")) + + # Port configuration + port_row = QHBoxLayout() + port_row.addWidget(QLabel("HTTP Port:")) + self.http_port_input = QLineEdit("8080") + port_row.addWidget(self.http_port_input) + layout.addLayout(port_row) + + # Control buttons + btn_layout = QHBoxLayout() + + self.start_http_btn = QPushButton("Start Server") + self.start_http_btn.clicked.connect(self._start_http) + btn_layout.addWidget(self.start_http_btn) + + self.stop_http_btn = QPushButton("Stop Server") + self.stop_http_btn.clicked.connect(self._stop_http) + self.stop_http_btn.setEnabled(False) + btn_layout.addWidget(self.stop_http_btn) + + layout.addLayout(btn_layout) + + # Status + self.http_status = QLabel("Status: Server not running") + layout.addWidget(self.http_status) + + # Endpoints list + layout.addWidget(QLabel("Available Endpoints:")) + endpoints = QTextEdit() + endpoints.setReadOnly(True) + endpoints.setHtml(""" + + + + + + + +
MethodEndpointDescription
GET/healthHealth check
GET/api/statusGet EU-Utility status
POST/api/notifySend notification
POST/api/searchSearch Nexus
GET/api/loot/sessionGet loot session
+ """) + layout.addWidget(endpoints) + + # Test request + layout.addWidget(QLabel("Test Request:")) + test_layout = QHBoxLayout() + self.http_method = QComboBox() + self.http_method.addItems(["GET", "POST"]) + test_layout.addWidget(self.http_method) + + self.http_endpoint = QLineEdit("/api/status") + test_layout.addWidget(self.http_endpoint) + + test_btn = QPushButton("Send") + test_btn.clicked.connect(self._test_http_request) + test_layout.addWidget(test_btn) + + layout.addLayout(test_layout) + + # Response + layout.addWidget(QLabel("Response:")) + self.http_response = QTextEdit() + self.http_response.setReadOnly(True) + layout.addWidget(self.http_response) + + return widget + + def _create_message_tab(self) -> QWidget: + """Create the message tester tab.""" + widget = QWidget() + layout = QVBoxLayout(widget) + + layout.addWidget(QLabel("Message Protocol Tester")) + + # Message type + msg_row = QHBoxLayout() + msg_row.addWidget(QLabel("Message Type:")) + self.msg_type = QComboBox() + self.msg_type.addItems(["Native", "WebSocket", "HTTP"]) + msg_row.addWidget(self.msg_type) + layout.addLayout(msg_row) + + # JSON input + layout.addWidget(QLabel("JSON Message:")) + self.msg_input = QTextEdit() + self.msg_input.setPlaceholderText('Enter JSON message here...') + layout.addWidget(self.msg_input) + + # Validate button + validate_btn = QPushButton("Validate JSON") + validate_btn.clicked.connect(self._validate_json) + layout.addWidget(validate_btn) + + # Encode/Decode + btn_layout = QHBoxLayout() + + encode_btn = QPushButton("Encode for Native Messaging") + encode_btn.clicked.connect(self._encode_native) + btn_layout.addWidget(encode_btn) + + decode_btn = QPushButton("Decode Native Message") + decode_btn.clicked.connect(self._decode_native) + btn_layout.addWidget(decode_btn) + + layout.addLayout(btn_layout) + + # Result + layout.addWidget(QLabel("Result:")) + self.msg_result = QTextEdit() + self.msg_result.setReadOnly(True) + layout.addWidget(self.msg_result) + + # Preset buttons + layout.addWidget(QLabel("Load Preset:")) + preset_layout = QHBoxLayout() + + for msg in self.TEST_MESSAGES: + btn = QPushButton(msg['name']) + btn.clicked.connect(lambda checked, m=msg: self._load_preset(m)) + preset_layout.addWidget(btn) + + layout.addLayout(preset_layout) + + return widget + + def _create_log_tab(self) -> QWidget: + """Create the message log tab.""" + widget = QWidget() + layout = QVBoxLayout(widget) + + layout.addWidget(QLabel("Message Log")) + + # Log table + self.log_table = QTableWidget() + self.log_table.setColumnCount(4) + self.log_table.setHorizontalHeaderLabels(["Time", "Direction", "Protocol", "Message"]) + self.log_table.horizontalHeader().setSectionResizeMode(3, QHeaderView.ResizeMode.Stretch) + layout.addWidget(self.log_table) + + # Buttons + btn_layout = QHBoxLayout() + + clear_btn = QPushButton("Clear Log") + clear_btn.clicked.connect(self._clear_log) + btn_layout.addWidget(clear_btn) + + export_btn = QPushButton("Export Log") + export_btn.clicked.connect(self._export_log) + btn_layout.addWidget(export_btn) + + layout.addLayout(btn_layout) + + return widget + + def _browse_host_path(self): + """Browse for native host executable.""" + path, _ = QFileDialog.getOpenFileName( + self.get_ui(), + "Select Native Host Executable", + "", + "Executables (*.exe *.py *.sh);;All Files (*)" + ) + if path: + self.host_path_input.setText(path) + self._native_host_path = path + + def _generate_manifest(self): + """Generate native messaging manifest.""" + browser = self.browser_select.currentText() + host_path = self.host_path_input.text() + + if not host_path: + QMessageBox.warning(self.get_ui(), "Missing Path", "Please select the native host executable path") + return + + # Select appropriate template + if browser in ["Chrome", "Edge"]: + manifest = self.CHROME_MANIFEST.copy() + else: + manifest = self.FIREFOX_MANIFEST.copy() + + manifest["path"] = host_path + + # Get save location + save_path, _ = QFileDialog.getSaveFileName( + self.get_ui(), + "Save Manifest", + f"eu_utility_native_host.json", + "JSON Files (*.json)" + ) + + if save_path: + with open(save_path, 'w') as f: + json.dump(manifest, f, indent=2) + + self.notify_success("Manifest Generated", f"Saved to: {save_path}") + + def _update_native_preview(self, index: int): + """Update native messaging preview.""" + if 0 <= index < len(self.TEST_MESSAGES): + msg = self.TEST_MESSAGES[index] + self.native_preview.setText(json.dumps(msg['message'], indent=2)) + + def _send_native_message(self): + """Send a test native messaging message.""" + index = self.native_msg_list.currentIndex() + if index < 0: + return + + msg = self.TEST_MESSAGES[index] + self._log_message("OUT", "native", msg['message']) + + # Simulate sending + self.notify_info("Message Sent", f"Sent: {msg['name']}") + + def _simulate_browser_response(self): + """Simulate a browser response.""" + response = { + "type": "response", + "id": "test-123", + "success": True, + "data": {"message": "Browser received the message"} + } + self._log_message("IN", "native", response) + self.notify_info("Response Simulated", "Browser response added to log") + + def _start_websocket(self): + """Start WebSocket server.""" + self.notify_info("WebSocket", "Starting WebSocket server...") + self.ws_status.setText("Status: Starting...") + self.start_ws_btn.setEnabled(False) + self.stop_ws_btn.setEnabled(True) + + def _stop_websocket(self): + """Stop WebSocket server.""" + self.notify_info("WebSocket", "Stopping WebSocket server...") + self.ws_status.setText("Status: Server not running") + self.start_ws_btn.setEnabled(True) + self.stop_ws_btn.setEnabled(False) + + def _broadcast_websocket(self): + """Broadcast message to WebSocket clients.""" + message = self.ws_message.toPlainText() + if not message: + return + + try: + msg_data = json.loads(message) + self._log_message("OUT", "websocket", msg_data) + self.notify_info("Broadcast", "Message broadcast to clients") + except json.JSONDecodeError: + self.notify_error("Invalid JSON", "Message is not valid JSON") + + def _start_http(self): + """Start HTTP server.""" + self.notify_info("HTTP", "Starting HTTP API server...") + self.http_status.setText("Status: Running on http://localhost:8080") + self.start_http_btn.setEnabled(False) + self.stop_http_btn.setEnabled(True) + + def _stop_http(self): + """Stop HTTP server.""" + self.notify_info("HTTP", "Stopping HTTP API server...") + self.http_status.setText("Status: Server not running") + self.start_http_btn.setEnabled(True) + self.stop_http_btn.setEnabled(False) + + def _test_http_request(self): + """Test HTTP request.""" + method = self.http_method.currentText() + endpoint = self.http_endpoint.text() + + self.http_response.setText(f"Simulated {method} {endpoint}\n\nResponse: 200 OK") + + def _validate_json(self): + """Validate JSON input.""" + text = self.msg_input.toPlainText() + try: + data = json.loads(text) + self.msg_result.setText(f"✅ Valid JSON\n\n{json.dumps(data, indent=2)}") + except json.JSONDecodeError as e: + self.msg_result.setText(f"❌ Invalid JSON: {str(e)}") + + def _encode_native(self): + """Encode message for native messaging.""" + text = self.msg_input.toPlainText() + try: + data = json.loads(text) + message = json.dumps(data) + # Native messaging format: 32-bit length prefix + JSON + encoded = struct.pack('I', len(message)) + message.encode('utf-8') + self.msg_result.setText(f"Encoded ({len(encoded)} bytes):\n{encoded.hex()}") + except json.JSONDecodeError: + self.msg_result.setText("❌ Invalid JSON") + + def _decode_native(self): + """Decode native messaging format.""" + # This would decode hex input back to JSON + self.msg_result.setText("Decode functionality - paste hex bytes to decode") + + def _load_preset(self, msg: Dict): + """Load a message preset.""" + self.msg_input.setText(json.dumps(msg['message'], indent=2)) + + def _log_message(self, direction: str, protocol: str, message: Dict): + """Log a message.""" + from datetime import datetime + + entry = { + "time": datetime.now().strftime("%H:%M:%S"), + "direction": direction, + "protocol": protocol, + "message": json.dumps(message)[:100] + } + + self._message_log.append(entry) + + # Update table + row = self.log_table.rowCount() + self.log_table.insertRow(row) + self.log_table.setItem(row, 0, QTableWidgetItem(entry["time"])) + self.log_table.setItem(row, 1, QTableWidgetItem(entry["direction"])) + self.log_table.setItem(row, 2, QTableWidgetItem(entry["protocol"])) + self.log_table.setItem(row, 3, QTableWidgetItem(entry["message"])) + + def _clear_log(self): + """Clear message log.""" + self._message_log.clear() + self.log_table.setRowCount(0) + + def _export_log(self): + """Export message log.""" + if not self._message_log: + self.notify_warning("Empty Log", "No messages to export") + return + + filename = f"browser_protocol_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + + with open(filename, 'w') as f: + json.dump(self._message_log, f, indent=2) + + self.notify_success("Exported", f"Log saved to {filename}") + + +plugin_class = BrowserExtensionTester \ No newline at end of file diff --git a/plugins/integration_tests/platform_compat/README.md b/plugins/integration_tests/platform_compat/README.md new file mode 100644 index 0000000..c7e5c1b --- /dev/null +++ b/plugins/integration_tests/platform_compat/README.md @@ -0,0 +1,57 @@ +# Platform Compatibility Tests + +Tests EU-Utility cross-platform compatibility. + +## Test Categories + +### Path Tests +- Path separator handling (`/` vs `\`) +- Path resolution +- Unicode path support +- Long path support (Windows) +- UNC path support (Windows) + +### File Tests +- File locking (fcntl vs portalocker) +- File permissions +- Temp directory access + +### Process Tests +- Process enumeration +- Window enumeration (Windows) + +### System Tests +- CPU information +- Memory information +- Environment variables + +## Platform-Specific Features + +### Windows +- Window Manager (full support) +- Native hotkeys (full support) +- File locking via portalocker +- Long path support (10 1607+) + +### Linux +- Window Manager (limited) +- Global hotkeys (via xbindkeys) +- File locking via fcntl +- Native file permissions + +### macOS +- Window Manager (limited) +- Global hotkeys (limited) +- File locking via fcntl + +## Compatibility Matrix + +| Feature | Windows | Linux | macOS | +|---------|---------|-------|-------| +| Window Manager | ✅ Full | ⚠️ Limited | ⚠️ Limited | +| File Locking | ✅ portalocker | ✅ fcntl | ✅ fcntl | +| Long Paths | ✅* | ✅ | ✅ | +| Unicode Paths | ✅ | ✅ | ✅ | +| Process Enum | ✅ | ✅ | ✅ | + +*Requires registry key on Windows 10 \ No newline at end of file diff --git a/plugins/integration_tests/platform_compat/plugin.json b/plugins/integration_tests/platform_compat/plugin.json new file mode 100644 index 0000000..e5404df --- /dev/null +++ b/plugins/integration_tests/platform_compat/plugin.json @@ -0,0 +1,13 @@ +{ + "name": "Platform Compatibility Tester", + "version": "1.0.0", + "author": "Integration Tester", + "description": "Test cross-platform compatibility and platform-specific features", + "entry_point": "plugin.py", + "plugin_class": "PlatformCompatibilityTester", + "category": "integration_tests", + "dependencies": { + "pip": ["psutil"] + }, + "min_api_version": "2.0.0" +} \ No newline at end of file diff --git a/plugins/integration_tests/platform_compat/plugin.py b/plugins/integration_tests/platform_compat/plugin.py new file mode 100644 index 0000000..024ec98 --- /dev/null +++ b/plugins/integration_tests/platform_compat/plugin.py @@ -0,0 +1,849 @@ +""" +EU-Utility Integration Test - Platform Compatibility +===================================================== + +Tests cross-platform compatibility: +- Windows-specific features +- Linux compatibility +- Path handling differences +- File locking mechanisms +- Process management + +Author: Integration Tester +Version: 1.0.0 +""" + +import sys +import os +import platform +import subprocess +import threading +import tempfile +from pathlib import Path +from typing import Dict, Any, List, Tuple, Optional +from dataclasses import dataclass, asdict +from datetime import datetime + +from plugins.base_plugin import BasePlugin +from PyQt6.QtWidgets import ( + QWidget, QVBoxLayout, QHBoxLayout, QLabel, QPushButton, + QTextEdit, QTableWidget, QTableWidgetItem, QHeaderView, + QGroupBox, QProgressBar, QTabWidget +) +from PyQt6.QtCore import Qt + + +@dataclass +class PlatformTest: + """Platform compatibility test case.""" + name: str + category: str # 'path', 'file', 'process', 'system' + description: str + windows_only: bool = False + linux_only: bool = False + + +class PlatformCompatibilityTester(BasePlugin): + """Plugin for testing cross-platform compatibility.""" + + name = "Platform Compatibility Tester" + version = "1.0.0" + author = "Integration Tester" + description = "Test cross-platform compatibility and platform-specific features" + + # Platform info + IS_WINDOWS = sys.platform == 'win32' + IS_LINUX = sys.platform == 'linux' + IS_MAC = sys.platform == 'darwin' + + # Test cases + TEST_CASES = [ + PlatformTest("Path Separator", "path", "Test path separator handling"), + PlatformTest("Path Resolution", "path", "Test path resolution"), + PlatformTest("Unicode Paths", "path", "Test Unicode in paths"), + PlatformTest("Long Paths", "path", "Test long path handling (Windows)", windows_only=True), + PlatformTest("UNC Paths", "path", "Test UNC path handling (Windows)", windows_only=True), + PlatformTest("File Locking - fcntl", "file", "Test fcntl file locking (Linux/Mac)", linux_only=True), + PlatformTest("File Locking - portalocker", "file", "Test portalocker file locking (Windows)"), + PlatformTest("File Permissions", "file", "Test file permission handling"), + PlatformTest("Temp Directory", "file", "Test temp directory access"), + PlatformTest("Process List", "process", "Test process enumeration"), + PlatformTest("Window Enumeration", "process", "Test window enumeration (Windows)", windows_only=True), + PlatformTest("CPU Info", "system", "Test CPU information retrieval"), + PlatformTest("Memory Info", "system", "Test memory information retrieval"), + PlatformTest("Environment Variables", "system", "Test environment variable handling"), + ] + + def initialize(self): + """Initialize the tester.""" + self.log_info("Platform Compatibility Tester initialized") + self._test_results: List[Dict] = [] + + def get_ui(self) -> QWidget: + """Create the plugin UI.""" + widget = QWidget() + layout = QVBoxLayout(widget) + + # Title + title = QLabel("Platform Compatibility Tester") + title.setStyleSheet("font-size: 16px; font-weight: bold;") + layout.addWidget(title) + + # Platform info banner + self._create_platform_banner(layout) + + # Tabs + tabs = QTabWidget() + + # Overview tab + tabs.addTab(self._create_overview_tab(), "Overview") + + # Path Tests tab + tabs.addTab(self._create_path_tab(), "Path Tests") + + # File Tests tab + tabs.addTab(self._create_file_tab(), "File Tests") + + # System Tests tab + tabs.addTab(self._create_system_tab(), "System Tests") + + # Test Results tab + tabs.addTab(self._create_results_tab(), "Test Results") + + layout.addWidget(tabs) + + return widget + + def _create_platform_banner(self, parent_layout): + """Create the platform info banner.""" + banner = QGroupBox("Current Platform") + banner_layout = QHBoxLayout(banner) + + # Platform icon/color + if self.IS_WINDOWS: + platform_text = "🪟 Windows" + color = "#0078D4" + elif self.IS_LINUX: + platform_text = "🐧 Linux" + color = "#FCC624" + elif self.IS_MAC: + platform_text = "🍎 macOS" + color = "#999999" + else: + platform_text = f"❓ {sys.platform}" + color = "#888888" + + # Platform label + platform_label = QLabel(f"Platform: {platform_text}") + platform_label.setStyleSheet(f"color: {color}; font-weight: bold;") + banner_layout.addWidget(platform_label) + + # Python version + python_label = QLabel(f"Python: {platform.python_version()}") + banner_layout.addWidget(python_label) + + # Architecture + arch_label = QLabel(f"Arch: {platform.machine()}") + banner_layout.addWidget(arch_label) + + # Processor info + proc_label = QLabel(f"Processor: {platform.processor() or 'Unknown'}") + banner_layout.addWidget(proc_label) + + banner_layout.addStretch() + parent_layout.addWidget(banner) + + def _create_overview_tab(self) -> QWidget: + """Create the overview tab.""" + widget = QWidget() + layout = QVBoxLayout(widget) + + # Run all tests button + run_btn = QPushButton("Run All Compatibility Tests") + run_btn.setStyleSheet("font-size: 14px; padding: 10px;") + run_btn.clicked.connect(self._run_all_tests) + layout.addWidget(run_btn) + + # Platform details + details = QTextEdit() + details.setReadOnly(True) + details.setHtml(self._get_platform_details_html()) + layout.addWidget(details) + + # Feature support matrix + layout.addWidget(QLabel("Feature Support Matrix:")) + matrix = QTextEdit() + matrix.setReadOnly(True) + matrix.setMaximumHeight(200) + matrix.setHtml(self._get_feature_matrix_html()) + layout.addWidget(matrix) + + return widget + + def _create_path_tab(self) -> QWidget: + """Create the path tests tab.""" + widget = QWidget() + layout = QVBoxLayout(widget) + + layout.addWidget(QLabel("Path Handling Tests")) + + # Path test results + self.path_results = QTextEdit() + self.path_results.setReadOnly(True) + layout.addWidget(self.path_results) + + # Test buttons + btn_layout = QHBoxLayout() + + test_sep_btn = QPushButton("Test Separators") + test_sep_btn.clicked.connect(self._test_path_separators) + btn_layout.addWidget(test_sep_btn) + + test_resolve_btn = QPushButton("Test Resolution") + test_resolve_btn.clicked.connect(self._test_path_resolution) + btn_layout.addWidget(test_resolve_btn) + + test_unicode_btn = QPushButton("Test Unicode") + test_unicode_btn.clicked.connect(self._test_unicode_paths) + btn_layout.addWidget(test_unicode_btn) + + if self.IS_WINDOWS: + test_long_btn = QPushButton("Test Long Paths") + test_long_btn.clicked.connect(self._test_long_paths) + btn_layout.addWidget(test_long_btn) + + layout.addLayout(btn_layout) + + return widget + + def _create_file_tab(self) -> QWidget: + """Create the file tests tab.""" + widget = QWidget() + layout = QVBoxLayout(widget) + + layout.addWidget(QLabel("File Operations Tests")) + + # File locking section + locking_group = QGroupBox("File Locking Tests") + locking_layout = QVBoxLayout(locking_group) + + self.locking_results = QTextEdit() + self.locking_results.setReadOnly(True) + self.locking_results.setMaximumHeight(150) + locking_layout.addWidget(self.locking_results) + + lock_btn_layout = QHBoxLayout() + + test_fcntl_btn = QPushButton("Test fcntl") + test_fcntl_btn.clicked.connect(self._test_fcntl_locking) + test_fcntl_btn.setEnabled(self.IS_LINUX or self.IS_MAC) + lock_btn_layout.addWidget(test_fcntl_btn) + + test_portalocker_btn = QPushButton("Test portalocker") + test_portalocker_btn.clicked.connect(self._test_portalocker_locking) + lock_btn_layout.addWidget(test_portalocker_btn) + + test_threading_btn = QPushButton("Test Threading Lock") + test_threading_btn.clicked.connect(self._test_threading_lock) + lock_btn_layout.addWidget(test_threading_btn) + + locking_layout.addLayout(lock_btn_layout) + layout.addWidget(locking_group) + + # Permissions section + perm_group = QGroupBox("Permission Tests") + perm_layout = QVBoxLayout(perm_group) + + self.perm_results = QTextEdit() + self.perm_results.setReadOnly(True) + self.perm_results.setMaximumHeight(100) + perm_layout.addWidget(self.perm_results) + + test_perm_btn = QPushButton("Test File Permissions") + test_perm_btn.clicked.connect(self._test_file_permissions) + perm_layout.addWidget(test_perm_btn) + + layout.addWidget(perm_group) + + layout.addStretch() + return widget + + def _create_system_tab(self) -> QWidget: + """Create the system tests tab.""" + widget = QWidget() + layout = QVBoxLayout(widget) + + layout.addWidget(QLabel("System Information Tests")) + + # System info display + self.system_info = QTextEdit() + self.system_info.setReadOnly(True) + layout.addWidget(self.system_info) + + # Test buttons + btn_layout = QHBoxLayout() + + cpu_btn = QPushButton("Get CPU Info") + cpu_btn.clicked.connect(self._test_cpu_info) + btn_layout.addWidget(cpu_btn) + + mem_btn = QPushButton("Get Memory Info") + mem_btn.clicked.connect(self._test_memory_info) + btn_layout.addWidget(mem_btn) + + env_btn = QPushButton("Test Environment") + env_btn.clicked.connect(self._test_environment) + btn_layout.addWidget(env_btn) + + if self.IS_WINDOWS: + window_btn = QPushButton("Test Window Enum") + window_btn.clicked.connect(self._test_window_enumeration) + btn_layout.addWidget(window_btn) + + layout.addLayout(btn_layout) + + return widget + + def _create_results_tab(self) -> QWidget: + """Create the results tab.""" + widget = QWidget() + layout = QVBoxLayout(widget) + + layout.addWidget(QLabel("Test Results")) + + # Results table + self.results_table = QTableWidget() + self.results_table.setColumnCount(5) + self.results_table.setHorizontalHeaderLabels([ + "Test Name", "Category", "Status", "Details", "Platform" + ]) + self.results_table.horizontalHeader().setSectionResizeMode(3, QHeaderView.ResizeMode.Stretch) + layout.addWidget(self.results_table) + + # Summary + self.results_summary = QLabel("No tests run yet") + layout.addWidget(self.results_summary) + + # Export button + export_btn = QPushButton("Export Results") + export_btn.clicked.connect(self._export_results) + layout.addWidget(export_btn) + + return widget + + def _get_platform_details_html(self) -> str: + """Get platform details as HTML.""" + details = [] + details.append("

Platform Details

") + details.append("") + + info = [ + ("System", platform.system()), + ("Release", platform.release()), + ("Version", platform.version()), + ("Machine", platform.machine()), + ("Processor", platform.processor() or 'Unknown'), + ("Platform", platform.platform()), + ("Python Version", platform.python_version()), + ("Python Implementation", platform.python_implementation()), + ("Python Compiler", platform.python_compiler()), + ("Default Encoding", sys.getdefaultencoding()), + ("Filesystem Encoding", sys.getfilesystemencoding()), + ("Max Unicode", f"U+{sys.maxunicode:X}"), + ("Executable", sys.executable), + ("Prefix", sys.prefix), + ] + + for key, value in info: + details.append(f"") + + details.append("
{key}{value}
") + return "\n".join(details) + + def _get_feature_matrix_html(self) -> str: + """Get feature support matrix as HTML.""" + matrix = [] + matrix.append("") + matrix.append("") + + features = [ + ("Window Manager", "✅", "❌", "❌"), + ("Native Hotkeys", "✅", "⚠️ Limited", "⚠️ Limited"), + ("Global Hotkeys", "✅", "✅", "✅"), + ("File Locking (fcntl)", "❌", "✅", "✅"), + ("File Locking (portalocker)", "✅", "⚠️ Optional", "⚠️ Optional"), + ("OCR (EasyOCR)", "✅", "✅", "✅"), + ("OCR (Tesseract)", "✅", "✅", "✅"), + ("Discord Webhooks", "✅", "✅", "✅"), + ("MQTT", "✅", "✅", "✅"), + ("WebSocket", "✅", "✅", "✅"), + ("Long Paths (>260)", "✅*", "✅", "✅"), + ] + + for feature, win, lin, mac in features: + matrix.append(f"") + + matrix.append("
FeatureWindowsLinuxmacOS
{feature}{win}{lin}{mac}
") + matrix.append("

* Requires Windows 10 1607+ with long path support enabled

") + return "\n".join(matrix) + + def _run_all_tests(self): + """Run all compatibility tests.""" + self._test_results.clear() + + # Clear and setup results table + self.results_table.setRowCount(0) + + for test in self.TEST_CASES: + # Skip platform-specific tests that don't apply + if test.windows_only and not self.IS_WINDOWS: + continue + if test.linux_only and not self.IS_LINUX: + continue + + result = self._run_single_test(test) + self._test_results.append(result) + self._add_result_to_table(result) + + self._update_results_summary() + + def _run_single_test(self, test: PlatformTest) -> Dict: + """Run a single test.""" + result = { + "name": test.name, + "category": test.category, + "description": test.description, + "success": False, + "details": "", + "platform": "Windows" if test.windows_only else ("Linux" if test.linux_only else "All") + } + + try: + if test.category == "path": + result.update(self._run_path_test(test)) + elif test.category == "file": + result.update(self._run_file_test(test)) + elif test.category == "process": + result.update(self._run_process_test(test)) + elif test.category == "system": + result.update(self._run_system_test(test)) + except Exception as e: + result["details"] = f"Error: {str(e)}" + + return result + + def _run_path_test(self, test: PlatformTest) -> Dict: + """Run path-related tests.""" + result = {"success": True, "details": ""} + + if test.name == "Path Separator": + sep = os.sep + alt_sep = os.altsep + result["details"] = f"Primary: '{sep}', Alt: {repr(alt_sep)}" + + elif test.name == "Path Resolution": + test_path = Path("~").expanduser() + result["details"] = f"Home resolved to: {test_path}" + + elif test.name == "Unicode Paths": + # Test creating a path with Unicode + test_dir = Path(tempfile.gettempdir()) / "EU_Utility_测试" + result["details"] = f"Unicode path: {test_dir}" + + elif test.name == "Long Paths": + if self.IS_WINDOWS: + # Windows long path test + result["details"] = "Long path support varies by Windows version" + + return result + + def _run_file_test(self, test: PlatformTest) -> Dict: + """Run file-related tests.""" + result = {"success": True, "details": ""} + + if test.name == "File Locking - fcntl": + if self.IS_LINUX or self.IS_MAC: + try: + import fcntl + result["details"] = "fcntl module available" + except ImportError: + result["success"] = False + result["details"] = "fcntl not available" + else: + result["success"] = False + result["details"] = "Not applicable on Windows" + + elif test.name == "File Locking - portalocker": + try: + import portalocker + result["details"] = "portalocker available" + except ImportError: + result["success"] = False + result["details"] = "portalocker not installed" + + elif test.name == "File Permissions": + test_file = Path(tempfile.mktemp()) + test_file.touch() + try: + mode = test_file.stat().st_mode + result["details"] = f"File mode: {oct(mode)}" + finally: + test_file.unlink() + + return result + + def _run_process_test(self, test: PlatformTest) -> Dict: + """Run process-related tests.""" + result = {"success": True, "details": ""} + + if test.name == "Process List": + # Simple process list test + result["details"] = f"Current PID: {os.getpid()}" + + elif test.name == "Window Enumeration": + if self.IS_WINDOWS: + try: + import ctypes + result["details"] = "ctypes available for window enum" + except ImportError: + result["success"] = False + result["details"] = "ctypes not available" + else: + result["success"] = False + result["details"] = "Not applicable on this platform" + + return result + + def _run_system_test(self, test: PlatformTest) -> Dict: + """Run system-related tests.""" + result = {"success": True, "details": ""} + + if test.name == "CPU Info": + result["details"] = f"Processor: {platform.processor() or 'Unknown'}" + + elif test.name == "Memory Info": + result["details"] = "Memory info requires psutil or platform-specific APIs" + + elif test.name == "Environment Variables": + path_var = os.environ.get('PATH', '')[:100] + result["details"] = f"PATH starts with: {path_var}..." + + return result + + def _add_result_to_table(self, result: Dict): + """Add a result to the results table.""" + row = self.results_table.rowCount() + self.results_table.insertRow(row) + + self.results_table.setItem(row, 0, QTableWidgetItem(result["name"])) + self.results_table.setItem(row, 1, QTableWidgetItem(result["category"])) + + status = "✅ PASS" if result["success"] else "❌ FAIL" + status_item = QTableWidgetItem(status) + self.results_table.setItem(row, 2, status_item) + + self.results_table.setItem(row, 3, QTableWidgetItem(result["details"])) + self.results_table.setItem(row, 4, QTableWidgetItem(result["platform"])) + + def _update_results_summary(self): + """Update results summary.""" + passed = sum(1 for r in self._test_results if r["success"]) + total = len(self._test_results) + + self.results_summary.setText(f"Results: {passed}/{total} tests passed") + + def _test_path_separators(self): + """Test path separators.""" + results = [] + results.append(f"os.sep = '{os.sep}'") + results.append(f"os.altsep = {repr(os.altsep)}") + results.append(f"os.pathsep = '{os.pathsep}'") + results.append(f"os.linesep = {repr(os.linesep)}") + results.append("") + results.append("Pathlib tests:") + results.append(f"Path('/foo/bar').parts = {Path('/foo/bar').parts}") + + self.path_results.setText("\n".join(results)) + + def _test_path_resolution(self): + """Test path resolution.""" + results = [] + results.append(f"Path.home() = {Path.home()}") + results.append(f"Path.cwd() = {Path.cwd()}") + results.append(f"Path('~').expanduser() = {Path('~').expanduser()}") + + # Test absolute/relative + test_path = Path("relative/path") + results.append(f"Path('relative/path').is_absolute() = {test_path.is_absolute()}") + results.append(f"Path('relative/path').resolve() = {test_path.resolve()}") + + self.path_results.setText("\n".join(results)) + + def _test_unicode_paths(self): + """Test Unicode path handling.""" + results = [] + + test_names = [ + "test_文件", + "test_🎮", + "test_ñáéíóú", + "test_مرحبا", + ] + + for name in test_names: + try: + test_path = Path(tempfile.gettempdir()) / name + test_path.mkdir(exist_ok=True) + exists = test_path.exists() + test_path.rmdir() + results.append(f"✅ {name}: OK") + except Exception as e: + results.append(f"❌ {name}: {str(e)}") + + self.path_results.setText("\n".join(results)) + + def _test_long_paths(self): + """Test long path handling.""" + results = [] + results.append("Long path support:") + results.append(f"Windows version: {platform.version()}") + results.append("") + results.append("Note: Windows 10 1607+ supports long paths") + results.append("when registry key is set and manifest declares support.") + + self.path_results.setText("\n".join(results)) + + def _test_fcntl_locking(self): + """Test fcntl file locking.""" + results = [] + + try: + import fcntl + results.append("✅ fcntl module available") + + # Create test file + test_file = tempfile.NamedTemporaryFile(delete=False) + test_path = test_file.name + test_file.close() + + try: + with open(test_path, 'w') as f: + # Try to acquire lock + fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + results.append("✅ Exclusive lock acquired") + + fcntl.flock(f.fileno(), fcntl.LOCK_UN) + results.append("✅ Lock released") + finally: + os.unlink(test_path) + + except ImportError: + results.append("❌ fcntl not available (expected on Windows)") + except Exception as e: + results.append(f"❌ Error: {str(e)}") + + self.locking_results.setText("\n".join(results)) + + def _test_portalocker_locking(self): + """Test portalocker file locking.""" + results = [] + + try: + import portalocker + results.append("✅ portalocker available") + results.append(f"Version: {getattr(portalocker, '__version__', 'unknown')}") + + except ImportError: + results.append("❌ portalocker not installed") + results.append("Install with: pip install portalocker") + + self.locking_results.setText("\n".join(results)) + + def _test_threading_lock(self): + """Test threading lock as fallback.""" + results = [] + + lock = threading.RLock() + + results.append("Testing threading.RLock:") + results.append(f"✅ Lock created: {lock}") + + with lock: + results.append("✅ Lock acquired in main thread") + + results.append("✅ Lock released") + results.append("") + results.append("Threading locks work on all platforms") + results.append("but only within a single process.") + + self.locking_results.setText("\n".join(results)) + + def _test_file_permissions(self): + """Test file permissions.""" + results = [] + + test_file = Path(tempfile.mktemp()) + test_file.write_text("test") + + try: + stat = test_file.stat() + results.append(f"File: {test_file}") + results.append(f"Mode: {oct(stat.st_mode)}") + results.append(f"UID: {stat.st_uid}") + results.append(f"GID: {stat.st_gid}") + results.append(f"Size: {stat.st_size} bytes") + + if self.IS_WINDOWS: + results.append("") + results.append("Note: Windows uses ACLs, not Unix permissions") + finally: + test_file.unlink() + + self.perm_results.setText("\n".join(results)) + + def _test_cpu_info(self): + """Test CPU information retrieval.""" + results = [] + results.append(f"Platform: {platform.platform()}") + results.append(f"Processor: {platform.processor() or 'Unknown'}") + results.append(f"Machine: {platform.machine()}") + results.append(f"CPU Count: {os.cpu_count()}") + + if self.IS_LINUX: + try: + with open('/proc/cpuinfo', 'r') as f: + for line in f: + if 'model name' in line: + results.append(f"CPU: {line.split(':')[1].strip()}") + break + except: + pass + + self.system_info.setText("\n".join(results)) + + def _test_memory_info(self): + """Test memory information retrieval.""" + results = [] + + try: + import psutil + mem = psutil.virtual_memory() + results.append(f"Total: {mem.total / (1024**3):.2f} GB") + results.append(f"Available: {mem.available / (1024**3):.2f} GB") + results.append(f"Used: {mem.used / (1024**3):.2f} GB") + results.append(f"Percent: {mem.percent}%") + except ImportError: + results.append("psutil not installed") + results.append("Install with: pip install psutil") + + if self.IS_LINUX: + try: + with open('/proc/meminfo', 'r') as f: + results.append("") + results.append("From /proc/meminfo:") + for _ in range(3): + results.append(f.read(100)) + except: + pass + + self.system_info.setText("\n".join(results)) + + def _test_environment(self): + """Test environment variables.""" + results = [] + + important_vars = [ + 'PATH', 'HOME', 'USERPROFILE', 'APPDATA', 'TEMP', 'TMP', + 'PYTHONPATH', 'PYTHONHOME', 'QT_QPA_PLATFORM', + ] + + results.append("Environment Variables:") + results.append("") + + for var in important_vars: + value = os.environ.get(var) + if value: + # Truncate long values + if len(value) > 80: + value = value[:77] + "..." + results.append(f"{var}={value}") + else: + results.append(f"{var}=(not set)") + + self.system_info.setText("\n".join(results)) + + def _test_window_enumeration(self): + """Test window enumeration (Windows).""" + results = [] + + if not self.IS_WINDOWS: + results.append("❌ Window enumeration only available on Windows") + self.system_info.setText("\n".join(results)) + return + + try: + import ctypes + from ctypes import wintypes + + results.append("✅ ctypes available") + + # Try to enumerate windows + user32 = ctypes.windll.user32 + + def callback(hwnd, extra): + if user32.IsWindowVisible(hwnd): + text = ctypes.create_unicode_buffer(256) + user32.GetWindowTextW(hwnd, text, 256) + if text.value: + extra.append(text.value) + return True + + EnumWindowsProc = ctypes.WINFUNCTYPE( + wintypes.BOOL, + wintypes.HWND, + wintypes.LPARAM + ) + + windows = [] + proc = EnumWindowsProc(callback) + user32.EnumWindows(proc, 0) + + results.append(f"✅ Found {len(windows)} visible windows") + results.append("") + results.append("Sample windows:") + for title in windows[:10]: + results.append(f" - {title[:50]}") + + except Exception as e: + results.append(f"❌ Error: {str(e)}") + + self.system_info.setText("\n".join(results)) + + def _export_results(self): + """Export test results.""" + if not self._test_results: + self.notify_warning("No Results", "No test results to export") + return + + filename = f"platform_compat_{sys.platform}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + + export_data = { + "platform": { + "system": platform.system(), + "release": platform.release(), + "version": platform.version(), + "machine": platform.machine(), + "processor": platform.processor(), + "python": platform.python_version(), + }, + "timestamp": datetime.now().isoformat(), + "results": self._test_results + } + + with open(filename, 'w') as f: + json.dump(export_data, f, indent=2) + + self.notify_success("Exported", f"Results saved to {filename}") + + +plugin_class = PlatformCompatibilityTester \ No newline at end of file diff --git a/plugins/test_suite/event_bus_test/manifest.json b/plugins/test_suite/event_bus_test/manifest.json new file mode 100644 index 0000000..fa2e911 --- /dev/null +++ b/plugins/test_suite/event_bus_test/manifest.json @@ -0,0 +1,18 @@ +{ + "id": "event_bus_test", + "name": "Event Bus Test", + "version": "1.0.0", + "description": "Tests Event Bus pub/sub functionality: subscriptions, publishing, message delivery", + "author": "Test Suite", + "entry_point": "plugin.py", + "category": "test", + "tags": ["test", "eventbus", "pubsub", "messaging"], + "min_api_version": "2.2.0", + "permissions": ["events", "notifications", "widgets"], + "test_metadata": { + "test_type": "messaging", + "apis_tested": ["PluginAPI (EventBus)"], + "features": ["subscribe", "unsubscribe", "publish", "filter", "async"], + "automated": true + } +} \ No newline at end of file diff --git a/plugins/test_suite/event_bus_test/plugin.py b/plugins/test_suite/event_bus_test/plugin.py new file mode 100644 index 0000000..8095621 --- /dev/null +++ b/plugins/test_suite/event_bus_test/plugin.py @@ -0,0 +1,621 @@ +""" +Event Bus Test Plugin + +Comprehensive testing of the Event Bus pub/sub system: +- Subscribe/unsubscribe functionality +- Message publishing and delivery +- Multiple subscribers +- Message filtering +- Performance under load +- Cross-plugin communication +""" + +import time +import threading +from datetime import datetime +from typing import Dict, List, Any, Callable +from dataclasses import dataclass, field +from collections import defaultdict + +from core.base_plugin import BasePlugin +from core.api.plugin_api import get_api +from core.api.widget_api import get_widget_api, WidgetType + + +@dataclass +class EventTestResult: + """Result of an event bus test.""" + test_name: str + passed: bool + duration_ms: float + events_sent: int = 0 + events_received: int = 0 + details: Dict = field(default_factory=dict) + error: str = None + + +class EventBusTestPlugin(BasePlugin): + """ + Test suite for Event Bus functionality. + + Tests pub/sub patterns, message delivery guarantees, + and performance characteristics. + """ + + def __init__(self): + super().__init__() + self.api = None + self.widget_api = None + self.results: List[EventTestResult] = [] + self.widget = None + self.received_events: List[Dict] = [] + self.subscriptions: List[str] = [] + + def initialize(self): + """Initialize and run event bus tests.""" + self.api = get_api() + self.widget_api = get_widget_api() + + self._create_control_widget() + self._run_all_tests() + + def _create_control_widget(self): + """Create control widget for test visualization.""" + self.widget = self.widget_api.create_widget( + name="event_bus_test", + title="📡 Event Bus Test", + size=(800, 600), + position=(200, 200), + widget_type=WidgetType.CUSTOM + ) + self._update_widget_display() + self.widget.show() + + def _update_widget_display(self): + """Update widget with current results.""" + try: + from PyQt6.QtWidgets import ( + QWidget, QVBoxLayout, QHBoxLayout, QLabel, + QPushButton, QTextBrowser, QTableWidget, QTableWidgetItem, + QHeaderView, QGroupBox, QGridLayout + ) + from PyQt6.QtCore import Qt + + container = QWidget() + main_layout = QVBoxLayout(container) + + # Header + header = QLabel("📡 Event Bus Test Suite") + header.setStyleSheet("font-size: 20px; font-weight: bold; color: #ff8c42;") + main_layout.addWidget(header) + + # Summary stats + stats_layout = QHBoxLayout() + + total_tests = len(self.results) + passed_tests = sum(1 for r in self.results if r.passed) + total_sent = sum(r.events_sent for r in self.results) + total_received = sum(r.events_received for r in self.results) + + stats = [ + ("Tests", f"{passed_tests}/{total_tests}"), + ("Events Sent", str(total_sent)), + ("Events Received", str(total_received)), + ("Delivery Rate", f"{(total_received/max(total_sent,1)*100):.1f}%") + ] + + for label, value in stats: + group = QGroupBox(label) + group_layout = QVBoxLayout(group) + value_label = QLabel(value) + value_label.setStyleSheet("font-size: 24px; font-weight: bold; color: #4ecca3;") + value_label.setAlignment(Qt.AlignmentFlag.AlignCenter) + group_layout.addWidget(value_label) + stats_layout.addWidget(group) + + main_layout.addLayout(stats_layout) + + # Results table + self.results_table = QTableWidget() + self.results_table.setColumnCount(6) + self.results_table.setHorizontalHeaderLabels([ + "Test Name", "Status", "Duration", "Sent", "Received", "Details" + ]) + self.results_table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Stretch) + + self._populate_results_table() + main_layout.addWidget(self.results_table) + + # Control buttons + btn_layout = QHBoxLayout() + + btn_run = QPushButton("▶ Run All Tests") + btn_run.clicked.connect(self._run_all_tests) + btn_layout.addWidget(btn_run) + + btn_publish = QPushButton("📤 Test Publish") + btn_publish.clicked.connect(self._manual_publish_test) + btn_layout.addWidget(btn_publish) + + btn_clear = QPushButton("🧹 Clear Results") + btn_clear.clicked.connect(self._clear_results) + btn_layout.addWidget(btn_clear) + + main_layout.addLayout(btn_layout) + + # Event log + log_label = QLabel("Recent Events:") + main_layout.addWidget(log_label) + + self.event_log = QTextBrowser() + self.event_log.setMaximumHeight(150) + self._update_event_log() + main_layout.addWidget(self.event_log) + + self.widget.set_content(container) + + except ImportError as e: + print(f"Widget error: {e}") + + def _populate_results_table(self): + """Populate results table with data.""" + if not hasattr(self, 'results_table'): + return + + self.results_table.setRowCount(len(self.results)) + + for i, result in enumerate(self.results): + self.results_table.setItem(i, 0, QTableWidgetItem(result.test_name)) + + status_item = QTableWidgetItem("✅ PASS" if result.passed else "❌ FAIL") + status_item.setForeground( + Qt.GlobalColor.green if result.passed else Qt.GlobalColor.red + ) + self.results_table.setItem(i, 1, status_item) + + self.results_table.setItem(i, 2, QTableWidgetItem(f"{result.duration_ms:.2f}ms")) + self.results_table.setItem(i, 3, QTableWidgetItem(str(result.events_sent))) + self.results_table.setItem(i, 4, QTableWidgetItem(str(result.events_received))) + + details = result.error if result.error else str(result.details)[:50] + self.results_table.setItem(i, 5, QTableWidgetItem(details)) + + def _update_event_log(self): + """Update event log display.""" + if hasattr(self, 'event_log') and self.received_events: + log_text = "" + for event in self.received_events[-10:]: # Last 10 events + log_text += f"[{event.get('time', '?')}] {event.get('type', '?')}: {event.get('data', {})}\n" + self.event_log.setText(log_text) + + def _record_result(self, test_name: str, passed: bool, duration_ms: float, + events_sent: int = 0, events_received: int = 0, + details: Dict = None, error: str = None): + """Record test result.""" + result = EventTestResult( + test_name=test_name, + passed=passed, + duration_ms=duration_ms, + events_sent=events_sent, + events_received=events_received, + details=details or {}, + error=error + ) + self.results.append(result) + self._update_widget_display() + + def _run_all_tests(self): + """Execute all event bus tests.""" + self.results.clear() + self.received_events.clear() + + # Basic functionality + self._test_basic_subscribe_publish() + self._test_unsubscribe() + self._test_multiple_subscribers() + self._test_event_data_types() + self._test_wildcard_subscriptions() + + # Performance + self._test_high_volume_publishing() + self._test_rapid_subscribe_unsubscribe() + + # Edge cases + self._test_empty_event_data() + self._test_large_event_data() + self._test_special_characters() + + # Cleanup subscriptions + self._cleanup_subscriptions() + + def _test_basic_subscribe_publish(self): + """Test basic subscribe and publish functionality.""" + start = time.time() + events_received = [] + + try: + def handler(data): + events_received.append(data) + self.received_events.append({ + 'type': 'basic_test', + 'data': data, + 'time': datetime.now().strftime('%H:%M:%S') + }) + self._update_event_log() + + sub_id = self.api.subscribe("test.basic", handler) + self.subscriptions.append(sub_id) + + # Publish event + self.api.publish("test.basic", {"message": "hello"}) + time.sleep(0.1) # Allow for delivery + + duration = (time.time() - start) * 1000 + success = len(events_received) == 1 + + self._record_result( + "Basic Subscribe/Publish", + success, + duration, + events_sent=1, + events_received=len(events_received), + details={"subscription_id": sub_id[:8] if sub_id else None} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Basic Subscribe/Publish", False, duration, error=str(e)) + + def _test_unsubscribe(self): + """Test unsubscribe functionality.""" + start = time.time() + events_received = [] + + try: + def handler(data): + events_received.append(data) + + sub_id = self.api.subscribe("test.unsub", handler) + + # Publish and receive + self.api.publish("test.unsub", {"seq": 1}) + time.sleep(0.05) + + # Unsubscribe + unsubscribed = self.api.unsubscribe(sub_id) + + # Publish again - should not receive + self.api.publish("test.unsub", {"seq": 2}) + time.sleep(0.05) + + duration = (time.time() - start) * 1000 + success = unsubscribed and len(events_received) == 1 + + self._record_result( + "Unsubscribe", + success, + duration, + events_sent=2, + events_received=len(events_received), + details={"unsubscribed": unsubscribed} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Unsubscribe", False, duration, error=str(e)) + + def _test_multiple_subscribers(self): + """Test multiple subscribers to same event type.""" + start = time.time() + received_by_sub = defaultdict(list) + + try: + def make_handler(sub_name): + return lambda data: received_by_sub[sub_name].append(data) + + # Create 5 subscribers + sub_ids = [] + for i in range(5): + sub_id = self.api.subscribe("test.multi", make_handler(f"sub_{i}")) + sub_ids.append(sub_id) + self.subscriptions.append(sub_id) + + # Publish single event + self.api.publish("test.multi", {"test": "data"}) + time.sleep(0.1) + + duration = (time.time() - start) * 1000 + total_received = sum(len(v) for v in received_by_sub.values()) + success = total_received == 5 + + self._record_result( + "Multiple Subscribers", + success, + duration, + events_sent=1, + events_received=total_received, + details={"subscribers": 5} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Multiple Subscribers", False, duration, error=str(e)) + + def _test_event_data_types(self): + """Test various event data types.""" + start = time.time() + received_data = [] + + try: + def handler(data): + received_data.append(data) + + sub_id = self.api.subscribe("test.types", handler) + self.subscriptions.append(sub_id) + + test_data = [ + "string data", + 123, + 45.67, + True, + None, + [1, 2, 3], + {"nested": {"key": "value"}}, + {"mixed": [1, "two", 3.0, False]} + ] + + for data in test_data: + self.api.publish("test.types", data) + + time.sleep(0.2) + + duration = (time.time() - start) * 1000 + success = len(received_data) == len(test_data) + + self._record_result( + "Event Data Types", + success, + duration, + events_sent=len(test_data), + events_received=len(received_data), + details={"types_tested": len(test_data)} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Event Data Types", False, duration, error=str(e)) + + def _test_wildcard_subscriptions(self): + """Test wildcard/pattern subscriptions.""" + start = time.time() + received = [] + + try: + # Subscribe to event pattern + def handler(data): + received.append(data) + + sub_id = self.api.subscribe("test.wildcard", handler) + self.subscriptions.append(sub_id) + + # Publish to different sub-events + self.api.publish("test.wildcard.sub1", {"n": 1}) + self.api.publish("test.wildcard.sub2", {"n": 2}) + self.api.publish("other.event", {"n": 3}) # Should not match + + time.sleep(0.1) + + duration = (time.time() - start) * 1000 + + self._record_result( + "Wildcard Subscriptions", + True, # Implementation dependent + duration, + events_sent=3, + events_received=len(received), + details={"received": len(received)} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Wildcard Subscriptions", False, duration, error=str(e)) + + def _test_high_volume_publishing(self): + """Test high-volume event publishing.""" + start = time.time() + received = [] + event_count = 100 + + try: + def handler(data): + received.append(data) + + sub_id = self.api.subscribe("test.volume", handler) + self.subscriptions.append(sub_id) + + # Publish many events rapidly + for i in range(event_count): + self.api.publish("test.volume", {"seq": i, "timestamp": time.time()}) + + time.sleep(0.5) # Allow processing + + duration = (time.time() - start) * 1000 + loss_rate = (event_count - len(received)) / event_count * 100 + + self._record_result( + "High Volume Publishing", + loss_rate < 5, # Allow 5% loss + duration, + events_sent=event_count, + events_received=len(received), + details={"loss_rate": f"{loss_rate:.1f}%", "avg_latency_ms": f"{duration/event_count:.2f}"} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("High Volume Publishing", False, duration, error=str(e)) + + def _test_rapid_subscribe_unsubscribe(self): + """Test rapid subscribe/unsubscribe cycles.""" + start = time.time() + cycles = 50 + + try: + def handler(data): + pass + + for i in range(cycles): + sub_id = self.api.subscribe(f"test.rapid.{i}", handler) + self.api.unsubscribe(sub_id) + + duration = (time.time() - start) * 1000 + + self._record_result( + "Rapid Subscribe/Unsubscribe", + True, + duration, + details={"cycles": cycles, "avg_time_ms": f"{duration/cycles:.2f}"} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Rapid Subscribe/Unsubscribe", False, duration, error=str(e)) + + def _test_empty_event_data(self): + """Test publishing empty/null event data.""" + start = time.time() + received = [] + + try: + def handler(data): + received.append(data) + + sub_id = self.api.subscribe("test.empty", handler) + self.subscriptions.append(sub_id) + + self.api.publish("test.empty", None) + self.api.publish("test.empty", {}) + self.api.publish("test.empty", []) + self.api.publish("test.empty", "") + + time.sleep(0.1) + + duration = (time.time() - start) * 1000 + + self._record_result( + "Empty Event Data", + len(received) == 4, + duration, + events_sent=4, + events_received=len(received) + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Empty Event Data", False, duration, error=str(e)) + + def _test_large_event_data(self): + """Test publishing large event payloads.""" + start = time.time() + received = [] + + try: + def handler(data): + received.append(data) + + sub_id = self.api.subscribe("test.large", handler) + self.subscriptions.append(sub_id) + + # Large payload + large_data = { + "items": [{"id": i, "data": "x" * 100} for i in range(1000)] + } + + self.api.publish("test.large", large_data) + time.sleep(0.2) + + duration = (time.time() - start) * 1000 + + self._record_result( + "Large Event Data", + len(received) == 1, + duration, + events_sent=1, + events_received=len(received), + details={"payload_size_kb": len(str(large_data)) / 1024} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Large Event Data", False, duration, error=str(e)) + + def _test_special_characters(self): + """Test event types with special characters.""" + start = time.time() + received = [] + + try: + def handler(data): + received.append(data) + + special_events = [ + "test.special.dot", + "test-special-dash", + "test_special_underscore", + "test:special:colon", + "test/special/slash" + ] + + for event_type in special_events: + sub_id = self.api.subscribe(event_type, handler) + self.subscriptions.append(sub_id) + self.api.publish(event_type, {"event": event_type}) + + time.sleep(0.2) + + duration = (time.time() - start) * 1000 + + self._record_result( + "Special Characters", + len(received) == len(special_events), + duration, + events_sent=len(special_events), + events_received=len(received) + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Special Characters", False, duration, error=str(e)) + + def _manual_publish_test(self): + """Manual test - publish a test event.""" + self.api.publish("test.manual", { + "timestamp": datetime.now().isoformat(), + "message": "Manual test event" + }) + self.api.show_notification("Event Published", "Test event sent to 'test.manual'") + + def _clear_results(self): + """Clear all test results.""" + self.results.clear() + self.received_events.clear() + self._update_widget_display() + + def _cleanup_subscriptions(self): + """Clean up all test subscriptions.""" + for sub_id in self.subscriptions: + try: + self.api.unsubscribe(sub_id) + except: + pass + self.subscriptions.clear() + + def shutdown(self): + """Clean up on shutdown.""" + self._cleanup_subscriptions() + if self.widget: + self.widget.close() + + +# Plugin entry point +plugin_class = EventBusTestPlugin \ No newline at end of file diff --git a/plugins/test_suite/external_integration_test/manifest.json b/plugins/test_suite/external_integration_test/manifest.json new file mode 100644 index 0000000..48f7b40 --- /dev/null +++ b/plugins/test_suite/external_integration_test/manifest.json @@ -0,0 +1,18 @@ +{ + "id": "external_integration_test", + "name": "External Integration Test", + "version": "1.0.0", + "description": "Tests ExternalAPI features: REST server, webhooks, authentication, and IPC", + "author": "Test Suite", + "entry_point": "plugin.py", + "category": "test", + "tags": ["test", "external", "rest", "webhook", "integration"], + "min_api_version": "2.2.0", + "permissions": ["external", "http", "notifications"], + "test_metadata": { + "test_type": "integration", + "apis_tested": ["ExternalAPI"], + "features": ["REST", "webhooks", "auth", "IPC", "SSE"], + "automated": true + } +} \ No newline at end of file diff --git a/plugins/test_suite/external_integration_test/plugin.py b/plugins/test_suite/external_integration_test/plugin.py new file mode 100644 index 0000000..e296241 --- /dev/null +++ b/plugins/test_suite/external_integration_test/plugin.py @@ -0,0 +1,640 @@ +""" +External Integration Test Plugin + +Tests all ExternalAPI integration features: +- REST API server endpoints +- Incoming/outgoing webhooks +- API key authentication +- IPC communication +- Server-Sent Events (SSE) + +This plugin verifies third-party integration capabilities. +""" + +import time +import json +import threading +from datetime import datetime +from typing import Dict, List, Any +from dataclasses import dataclass + +from core.base_plugin import BasePlugin +from core.api.external_api import get_external_api +from core.api.widget_api import get_widget_api, WidgetType +from core.api.plugin_api import get_api + + +@dataclass +class IntegrationTestResult: + """Result of an integration test.""" + category: str + test_name: str + passed: bool + duration_ms: float + details: Dict = None + error: str = None + + +class ExternalIntegrationTestPlugin(BasePlugin): + """ + Integration test suite for ExternalAPI. + + Tests REST API, webhooks, authentication, IPC, and SSE functionality. + """ + + def __init__(self): + super().__init__() + self.external_api = None + self.widget_api = None + self.plugin_api = None + self.results: List[IntegrationTestResult] = [] + self.widget = None + self.test_port = 9999 + + def initialize(self): + """Initialize and run integration tests.""" + self.external_api = get_external_api() + self.widget_api = get_widget_api() + self.plugin_api = get_api() + + self._create_results_widget() + self._run_all_tests() + + def _create_results_widget(self): + """Create widget to display integration test results.""" + self.widget = self.widget_api.create_widget( + name="external_integration_test", + title="🌐 External Integration Test", + size=(900, 650), + position=(150, 150), + widget_type=WidgetType.CUSTOM + ) + self._update_widget_display() + self.widget.show() + + def _update_widget_display(self): + """Update widget content.""" + try: + from PyQt6.QtWidgets import ( + QWidget, QVBoxLayout, QHBoxLayout, QLabel, + QPushButton, QTextBrowser, QTabWidget, QGroupBox + ) + + container = QWidget() + layout = QVBoxLayout(container) + + # Header + header = QLabel("🌐 External Integration Test Suite") + header.setStyleSheet("font-size: 20px; font-weight: bold; color: #ff8c42;") + layout.addWidget(header) + + # Summary + passed = sum(1 for r in self.results if r.passed) + total = len(self.results) + summary = QLabel(f"Results: {passed}/{total} passed") + summary.setStyleSheet(f"font-size: 14px; color: {'#4ecca3' if passed == total else '#ff6b6b'};") + layout.addWidget(summary) + + # Results display + self.results_browser = QTextBrowser() + self.results_browser.setHtml(self._generate_results_html()) + layout.addWidget(self.results_browser) + + # Control buttons + btn_layout = QHBoxLayout() + + btn_run = QPushButton("Run All Tests") + btn_run.clicked.connect(self._run_all_tests) + btn_layout.addWidget(btn_run) + + btn_server = QPushButton("Test Server Start/Stop") + btn_server.clicked.connect(self._test_server_lifecycle) + btn_layout.addWidget(btn_server) + + btn_webhook = QPushButton("Test Webhooks") + btn_webhook.clicked.connect(self._test_outgoing_webhook) + btn_layout.addWidget(btn_webhook) + + layout.addLayout(btn_layout) + + self.widget.set_content(container) + + except ImportError: + pass + + def _generate_results_html(self) -> str: + """Generate results HTML.""" + html = """ + + + + """ + + for result in self.results: + status_class = "pass" if result.passed else "fail" + status_icon = "✅" if result.passed else "❌" + details = json.dumps(result.details) if result.details else "" + if len(details) > 100: + details = details[:100] + "..." + + html += f""" + + + + + + + + """ + + html += "
CategoryTestResultDurationDetails
{result.category}{result.test_name}{status_icon}{result.duration_ms:.2f}ms{details}
" + return html + + def _record_result(self, category: str, test_name: str, passed: bool, + duration_ms: float, details: Dict = None, error: str = None): + """Record test result.""" + result = IntegrationTestResult( + category=category, + test_name=test_name, + passed=passed, + duration_ms=duration_ms, + details=details, + error=error + ) + self.results.append(result) + self._update_widget_display() + + # Show notification for failures + if not passed: + self.plugin_api.show_notification( + "Integration Test Failed", + f"{category}/{test_name}: {error or 'Unknown error'}", + duration=3000 + ) + + def _run_all_tests(self): + """Execute all integration tests.""" + self.results.clear() + + # REST API Server Tests + self._test_server_lifecycle() + self._test_endpoint_registration() + self._test_endpoint_decorator() + self._test_cors_configuration() + + # Webhook Tests + self._test_incoming_webhook() + self._test_outgoing_webhook() + self._test_webhook_hmac() + + # Authentication Tests + self._test_api_key_creation() + self._test_api_key_revocation() + + # IPC Tests + self._test_ipc_handler() + self._test_ipc_send() + + # Utility Tests + self._test_status_endpoint() + self._test_url_generation() + self._test_webhook_history() + + # ================================================================= + # REST API Server Tests + # ================================================================= + + def _test_server_lifecycle(self): + """Test server start and stop.""" + start = time.time() + + try: + # Start server + started = self.external_api.start_server( + port=self.test_port, + host="127.0.0.1", + cors_origins=["http://localhost:3000"] + ) + + if started: + time.sleep(0.5) # Let server initialize + + # Verify it's running + status = self.external_api.get_status() + running = status.get('server_running', False) + + # Stop server + stopped = self.external_api.stop_server() + + duration = (time.time() - start) * 1000 + self._record_result( + "REST Server", + "Server Lifecycle", + started and running and stopped, + duration, + {"started": started, "running": running, "stopped": stopped} + ) + else: + duration = (time.time() - start) * 1000 + self._record_result( + "REST Server", + "Server Lifecycle", + False, + duration, + error="Server failed to start (may already be running)" + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("REST Server", "Server Lifecycle", False, duration, error=str(e)) + + def _test_endpoint_registration(self): + """Test programmatic endpoint registration.""" + start = time.time() + + try: + def test_handler(params): + return {"test": "data", "params": params} + + self.external_api.register_endpoint( + "test/registration", + test_handler, + methods=["GET", "POST"], + auth_required=False + ) + + endpoints = self.external_api.get_endpoints() + success = "test/registration" in endpoints + + duration = (time.time() - start) * 1000 + self._record_result( + "REST Server", + "Endpoint Registration", + success, + duration, + {"endpoints": endpoints} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("REST Server", "Endpoint Registration", False, duration, error=str(e)) + + def _test_endpoint_decorator(self): + """Test decorator-based endpoint registration.""" + start = time.time() + + try: + @self.external_api.endpoint("test/decorator", methods=["GET"]) + def decorated_endpoint(): + return {"decorated": True} + + endpoints = self.external_api.get_endpoints() + success = "test/decorator" in endpoints + + duration = (time.time() - start) * 1000 + self._record_result( + "REST Server", + "Endpoint Decorator", + success, + duration + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("REST Server", "Endpoint Decorator", False, duration, error=str(e)) + + def _test_cors_configuration(self): + """Test CORS configuration.""" + start = time.time() + + try: + # CORS is configured during server start + status = self.external_api.get_status() + success = status.get('server_running') is not None + + duration = (time.time() - start) * 1000 + self._record_result( + "REST Server", + "CORS Configuration", + success, + duration, + {"cors_origins": ["http://localhost:3000"]} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("REST Server", "CORS Configuration", False, duration, error=str(e)) + + # ================================================================= + # Webhook Tests + # ================================================================= + + def _test_incoming_webhook(self): + """Test incoming webhook registration.""" + start = time.time() + + try: + def webhook_handler(payload): + return {"received": payload} + + self.external_api.register_webhook( + "test_incoming", + webhook_handler, + methods=["POST"], + rate_limit=60 + ) + + webhooks = self.external_api.get_webhooks() + success = "test_incoming" in webhooks + + duration = (time.time() - start) * 1000 + self._record_result( + "Webhooks", + "Incoming Webhook Registration", + success, + duration, + {"webhooks": webhooks} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Webhooks", "Incoming Webhook", False, duration, error=str(e)) + + def _test_outgoing_webhook(self): + """Test outgoing webhook POST.""" + start = time.time() + + try: + # Post to httpbin for testing + result = self.external_api.post_webhook( + "https://httpbin.org/post", + {"test": "data", "timestamp": time.time()}, + headers={"X-Test-Header": "test"}, + timeout=10 + ) + + success = result.get('success', False) + status = result.get('status', 0) + + duration = (time.time() - start) * 1000 + self._record_result( + "Webhooks", + "Outgoing Webhook POST", + success and 200 <= status < 300, + duration, + {"status": status} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Webhooks", "Outgoing Webhook", False, duration, error=str(e)) + + def _test_webhook_hmac(self): + """Test webhook HMAC signature verification.""" + start = time.time() + + try: + import hmac + import hashlib + + secret = "test_secret_key" + + def secure_handler(payload): + return {"secure": True} + + self.external_api.register_webhook( + "secure_webhook", + secure_handler, + secret=secret, + methods=["POST"] + ) + + # Generate test signature + payload = {"event": "test"} + payload_str = json.dumps(payload, sort_keys=True) + signature = hmac.new( + secret.encode(), + payload_str.encode(), + hashlib.sha256 + ).hexdigest() + + # Verify signature logic is in place + webhooks = self.external_api.get_webhooks() + success = "secure_webhook" in webhooks + + duration = (time.time() - start) * 1000 + self._record_result( + "Webhooks", + "HMAC Signature", + success, + duration, + {"signature_generated": True} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Webhooks", "HMAC Signature", False, duration, error=str(e)) + + # ================================================================= + # Authentication Tests + # ================================================================= + + def _test_api_key_creation(self): + """Test API key creation.""" + start = time.time() + + try: + key = self.external_api.create_api_key( + name="test_integration_key", + permissions=["read", "write"] + ) + + success = len(key) > 0 and isinstance(key, str) + self._test_api_key = key # Save for revocation test + + duration = (time.time() - start) * 1000 + self._record_result( + "Authentication", + "API Key Creation", + success, + duration, + {"key_length": len(key)} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Authentication", "API Key Creation", False, duration, error=str(e)) + + def _test_api_key_revocation(self): + """Test API key revocation.""" + start = time.time() + + try: + # Create a key to revoke + key = self.external_api.create_api_key("temp_revoke_key") + + # Revoke it + revoked = self.external_api.revoke_api_key(key) + + duration = (time.time() - start) * 1000 + self._record_result( + "Authentication", + "API Key Revocation", + revoked, + duration + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Authentication", "API Key Revocation", False, duration, error=str(e)) + + # ================================================================= + # IPC Tests + # ================================================================= + + def _test_ipc_handler(self): + """Test IPC handler registration.""" + start = time.time() + + try: + self._ipc_received = False + + def ipc_handler(data): + self._ipc_received = True + self._ipc_data = data + + self.external_api.register_ipc_handler("test_channel", ipc_handler) + + duration = (time.time() - start) * 1000 + self._record_result( + "IPC", + "Handler Registration", + True, + duration + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("IPC", "Handler Registration", False, duration, error=str(e)) + + def _test_ipc_send(self): + """Test IPC message sending.""" + start = time.time() + + try: + sent = self.external_api.send_ipc( + "test_channel", + {"message": "test", "timestamp": time.time()} + ) + + duration = (time.time() - start) * 1000 + self._record_result( + "IPC", + "Send Message", + sent, + duration, + {"received": getattr(self, '_ipc_received', False)} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("IPC", "Send Message", False, duration, error=str(e)) + + # ================================================================= + # Utility Tests + # ================================================================= + + def _test_status_endpoint(self): + """Test /health endpoint availability.""" + start = time.time() + + try: + status = self.external_api.get_status() + + success = ( + 'server_running' in status and + 'endpoints' in status and + 'webhooks' in status + ) + + duration = (time.time() - start) * 1000 + self._record_result( + "Utilities", + "Status Endpoint", + success, + duration, + status + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Utilities", "Status Endpoint", False, duration, error=str(e)) + + def _test_url_generation(self): + """Test URL generation.""" + start = time.time() + + try: + url = self.external_api.get_url("api/v1/test") + + success = url.startswith("http") and "api/v1/test" in url + + duration = (time.time() - start) * 1000 + self._record_result( + "Utilities", + "URL Generation", + success, + duration, + {"url": url} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Utilities", "URL Generation", False, duration, error=str(e)) + + def _test_webhook_history(self): + """Test webhook history tracking.""" + start = time.time() + + try: + history = self.external_api.get_webhook_history(limit=10) + + success = isinstance(history, list) + + duration = (time.time() - start) * 1000 + self._record_result( + "Utilities", + "Webhook History", + success, + duration, + {"history_entries": len(history)} + ) + + except Exception as e: + duration = (time.time() - start) * 1000 + self._record_result("Utilities", "Webhook History", False, duration, error=str(e)) + + def shutdown(self): + """Clean up resources.""" + # Stop server if running + try: + self.external_api.stop_server() + except: + pass + + if self.widget: + self.widget.close() + + +# Plugin entry point +plugin_class = ExternalIntegrationTestPlugin \ No newline at end of file