""" API Comprehensive Test Plugin Tests every method in the three-tier API: - PluginAPI: All 12 services - WidgetAPI: All widget operations - ExternalAPI: REST, webhooks, auth, IPC Usage: This plugin runs automatically on initialization and generates a comprehensive test report visible in its widget. """ import time import json from datetime import datetime from typing import Dict, List, Any, Callable from dataclasses import dataclass, asdict from core.base_plugin import BasePlugin from core.api.plugin_api import get_api, PluginAPIError, ServiceNotAvailableError from core.api.widget_api import get_widget_api, WidgetType, WidgetAnchor, WidgetConfig from core.api.external_api import get_external_api, ExternalAPIError @dataclass class TestResult: """Single test result record.""" name: str api: str passed: bool duration_ms: float error: str = None details: Dict = None class APIComprehensiveTestPlugin(BasePlugin): """ Comprehensive test suite for EU-Utility APIs. Tests every public method across all three API tiers: - PluginAPI: 12 services - WidgetAPI: Widget lifecycle and operations - ExternalAPI: Server, webhooks, IPC """ def __init__(self): super().__init__() self.api = None self.widget_api = None self.external_api = None self.results: List[TestResult] = [] self.widget = None self.test_widget = None def initialize(self): """Initialize plugin and run all tests.""" self.api = get_api() self.widget_api = get_widget_api() self.external_api = get_external_api() self._create_results_widget() self._run_all_tests() self._update_widget_display() def _create_results_widget(self): """Create widget to display test results.""" self.widget = self.widget_api.create_widget( name="api_comprehensive_test", title="🔬 API Comprehensive Test Results", size=(800, 600), position=(50, 50), widget_type=WidgetType.CUSTOM ) # Create simple HTML content for results content = self._create_results_html() self.widget.set_content(content) self.widget.show() def _create_results_html(self): """Create HTML content for results display.""" try: from PyQt6.QtWidgets import QTextBrowser, QVBoxLayout, QWidget container = QWidget() layout = QVBoxLayout(container) self.results_browser = QTextBrowser() self.results_browser.setHtml(self._get_initial_html()) layout.addWidget(self.results_browser) return container except ImportError: return None def _get_initial_html(self) -> str: """Generate initial HTML template.""" return """

🔬 API Comprehensive Test Suite

⏳ Running tests...
""" def _update_widget_display(self): """Update widget with test results.""" if hasattr(self, 'results_browser'): self.results_browser.setHtml(self._generate_results_html()) def _generate_results_html(self) -> str: """Generate full results HTML.""" passed = sum(1 for r in self.results if r.passed) failed = sum(1 for r in self.results if not r.passed) total = len(self.results) html = f"""

🔬 API Comprehensive Test Results

{total}
Total Tests
{passed} ✅
Passed
{failed} ❌
Failed
{(passed/total*100 if total else 0):.1f}%
Success Rate
""" for result in self.results: status_class = "pass" if result.passed else "fail" status_icon = "✅" if result.passed else "❌" error_text = result.error if result.error else "" html += f""" """ html += """
API Test Name Result Duration Error
{result.api} {result.name} {status_icon} {'PASS' if result.passed else 'FAIL'} {result.duration_ms:.2f}ms {error_text}
""" return html def _run_test(self, name: str, api: str, test_func: Callable) -> TestResult: """Run a single test and record result.""" start = time.time() try: test_func() duration = (time.time() - start) * 1000 result = TestResult(name=name, api=api, passed=True, duration_ms=duration) except Exception as e: duration = (time.time() - start) * 1000 result = TestResult(name=name, api=api, passed=False, duration_ms=duration, error=str(e)) self.results.append(result) return result def _run_all_tests(self): """Execute all API tests.""" self._test_plugin_api() self._test_widget_api() self._test_external_api() # ===================================================================== # PluginAPI Tests # ===================================================================== def _test_plugin_api(self): """Test all PluginAPI services.""" # Log Reader API self._run_test("read_log_lines", "PluginAPI", self._test_read_log_lines) self._run_test("read_log_since", "PluginAPI", self._test_read_log_since) # Window Manager API self._run_test("get_eu_window", "PluginAPI", self._test_get_eu_window) self._run_test("is_eu_focused", "PluginAPI", self._test_is_eu_focused) self._run_test("is_eu_visible", "PluginAPI", self._test_is_eu_visible) self._run_test("bring_eu_to_front", "PluginAPI", self._test_bring_eu_to_front) # OCR API self._run_test("ocr_available", "PluginAPI", self._test_ocr_available) self._run_test("recognize_text", "PluginAPI", self._test_recognize_text) # Screenshot API self._run_test("screenshot_available", "PluginAPI", self._test_screenshot_available) self._run_test("capture_screen", "PluginAPI", self._test_capture_screen) # Nexus API self._run_test("search_items", "PluginAPI", self._test_search_items) self._run_test("get_item_details", "PluginAPI", self._test_get_item_details) # HTTP Client API self._run_test("http_get", "PluginAPI", self._test_http_get) self._run_test("http_post", "PluginAPI", self._test_http_post) # Audio API self._run_test("play_sound", "PluginAPI", self._test_play_sound) self._run_test("beep", "PluginAPI", self._test_beep) # Notification API self._run_test("show_notification", "PluginAPI", self._test_show_notification) # Clipboard API self._run_test("copy_to_clipboard", "PluginAPI", self._test_copy_to_clipboard) self._run_test("paste_from_clipboard", "PluginAPI", self._test_paste_from_clipboard) # Event Bus API self._run_test("subscribe", "PluginAPI", self._test_subscribe) self._run_test("unsubscribe", "PluginAPI", self._test_unsubscribe) self._run_test("publish", "PluginAPI", self._test_publish) # Data Store API self._run_test("set_data", "PluginAPI", self._test_set_data) self._run_test("get_data", "PluginAPI", self._test_get_data) self._run_test("delete_data", "PluginAPI", self._test_delete_data) # Task API self._run_test("run_task", "PluginAPI", self._test_run_task) self._run_test("cancel_task", "PluginAPI", self._test_cancel_task) def _test_read_log_lines(self): """Test read_log_lines method.""" lines = self.api.read_log_lines(10) assert isinstance(lines, list) def _test_read_log_since(self): """Test read_log_since method.""" lines = self.api.read_log_since(datetime.now()) assert isinstance(lines, list) def _test_get_eu_window(self): """Test get_eu_window method.""" window = self.api.get_eu_window() # May be None if EU not running if window is not None: assert isinstance(window, dict) assert 'title' in window def _test_is_eu_focused(self): """Test is_eu_focused method.""" result = self.api.is_eu_focused() assert isinstance(result, bool) def _test_is_eu_visible(self): """Test is_eu_visible method.""" result = self.api.is_eu_visible() assert isinstance(result, bool) def _test_bring_eu_to_front(self): """Test bring_eu_to_front method.""" result = self.api.bring_eu_to_front() assert isinstance(result, bool) def _test_ocr_available(self): """Test ocr_available method.""" result = self.api.ocr_available() assert isinstance(result, bool) def _test_recognize_text(self): """Test recognize_text method.""" if not self.api.ocr_available(): return # Skip if OCR not available try: text = self.api.recognize_text((0, 0, 100, 100)) assert isinstance(text, str) except ServiceNotAvailableError: pass # Expected if service not available def _test_screenshot_available(self): """Test screenshot_available method.""" result = self.api.screenshot_available() assert isinstance(result, bool) def _test_capture_screen(self): """Test capture_screen method.""" result = self.api.capture_screen((0, 0, 100, 100)) # May be None if screenshot service unavailable assert result is None or hasattr(result, 'size') def _test_search_items(self): """Test search_items method.""" items = self.api.search_items("test", limit=5) assert isinstance(items, list) def _test_get_item_details(self): """Test get_item_details method.""" details = self.api.get_item_details(12345) # May be None if item not found assert details is None or isinstance(details, dict) def _test_http_get(self): """Test http_get method.""" result = self.api.http_get("https://httpbin.org/get", cache=False) assert isinstance(result, dict) assert 'success' in result def _test_http_post(self): """Test http_post method.""" result = self.api.http_post("https://httpbin.org/post", {"test": "data"}) assert isinstance(result, dict) assert 'success' in result def _test_play_sound(self): """Test play_sound method.""" result = self.api.play_sound("nonexistent.wav") # Returns False for non-existent file, but shouldn't crash assert isinstance(result, bool) def _test_beep(self): """Test beep method.""" result = self.api.beep() assert isinstance(result, bool) def _test_show_notification(self): """Test show_notification method.""" result = self.api.show_notification("Test", "Test message", duration=100) assert isinstance(result, bool) def _test_copy_to_clipboard(self): """Test copy_to_clipboard method.""" result = self.api.copy_to_clipboard("test text") assert isinstance(result, bool) def _test_paste_from_clipboard(self): """Test paste_from_clipboard method.""" result = self.api.paste_from_clipboard() assert isinstance(result, str) def _test_subscribe(self): """Test subscribe method.""" def handler(data): pass sub_id = self.api.subscribe("test_event", handler) assert isinstance(sub_id, str) self._test_sub_id = sub_id def _test_unsubscribe(self): """Test unsubscribe method.""" result = self.api.unsubscribe("invalid_id") assert isinstance(result, bool) def _test_publish(self): """Test publish method.""" result = self.api.publish("test_event", {"key": "value"}) assert isinstance(result, bool) def _test_set_data(self): """Test set_data method.""" result = self.api.set_data("test_key", "test_value") assert isinstance(result, bool) def _test_get_data(self): """Test get_data method.""" result = self.api.get_data("test_key", default="default") assert result is not None def _test_delete_data(self): """Test delete_data method.""" result = self.api.delete_data("test_key") assert isinstance(result, bool) def _test_run_task(self): """Test run_task method.""" def task(): return "done" def callback(result): pass task_id = self.api.run_task(task, callback=callback) assert isinstance(task_id, str) self._test_task_id = task_id def _test_cancel_task(self): """Test cancel_task method.""" result = self.api.cancel_task("invalid_task_id") assert isinstance(result, bool) # ===================================================================== # WidgetAPI Tests # ===================================================================== def _test_widget_api(self): """Test all WidgetAPI methods.""" self._run_test("create_widget", "WidgetAPI", self._test_create_widget) self._run_test("get_widget", "WidgetAPI", self._test_get_widget) self._run_test("widget_exists", "WidgetAPI", self._test_widget_exists) self._run_test("get_all_widgets", "WidgetAPI", self._test_get_all_widgets) self._run_test("get_visible_widgets", "WidgetAPI", self._test_get_visible_widgets) self._run_test("show_widget", "WidgetAPI", self._test_show_widget) self._run_test("hide_widget", "WidgetAPI", self._test_hide_widget) self._run_test("close_widget", "WidgetAPI", self._test_close_widget) self._run_test("show_all_widgets", "WidgetAPI", self._test_show_all_widgets) self._run_test("hide_all_widgets", "WidgetAPI", self._test_hide_all_widgets) self._run_test("set_all_opacity", "WidgetAPI", self._test_set_all_opacity) self._run_test("lock_all", "WidgetAPI", self._test_lock_all) self._run_test("unlock_all", "WidgetAPI", self._test_unlock_all) self._run_test("arrange_widgets", "WidgetAPI", self._test_arrange_widgets) self._run_test("snap_to_grid", "WidgetAPI", self._test_snap_to_grid) self._run_test("register_preset", "WidgetAPI", self._test_register_preset) self._run_test("create_from_preset", "WidgetAPI", self._test_create_from_preset) self._run_test("save_all_states", "WidgetAPI", self._test_save_all_states) self._run_test("load_all_states", "WidgetAPI", self._test_load_all_states) # Widget instance methods self._run_test("widget.show", "WidgetAPI", self._test_widget_show) self._run_test("widget.hide", "WidgetAPI", self._test_widget_hide) self._run_test("widget.move", "WidgetAPI", self._test_widget_move) self._run_test("widget.resize", "WidgetAPI", self._test_widget_resize) self._run_test("widget.set_opacity", "WidgetAPI", self._test_widget_set_opacity) self._run_test("widget.set_title", "WidgetAPI", self._test_widget_set_title) self._run_test("widget.set_locked", "WidgetAPI", self._test_widget_set_locked) self._run_test("widget.minimize", "WidgetAPI", self._test_widget_minimize) self._run_test("widget.restore", "WidgetAPI", self._test_widget_restore) self._run_test("widget.raise_widget", "WidgetAPI", self._test_widget_raise) self._run_test("widget.lower_widget", "WidgetAPI", self._test_widget_lower) self._run_test("widget.save_state", "WidgetAPI", self._test_widget_save_state) self._run_test("widget.load_state", "WidgetAPI", self._test_widget_load_state) def _test_create_widget(self): """Test create_widget method.""" self.test_widget = self.widget_api.create_widget( name="test_widget_123", title="Test Widget", size=(300, 200) ) assert self.test_widget is not None assert self.test_widget.name == "test_widget_123" def _test_get_widget(self): """Test get_widget method.""" widget = self.widget_api.get_widget("test_widget_123") assert widget is not None def _test_widget_exists(self): """Test widget_exists method.""" exists = self.widget_api.widget_exists("test_widget_123") assert exists is True def _test_get_all_widgets(self): """Test get_all_widgets method.""" widgets = self.widget_api.get_all_widgets() assert isinstance(widgets, list) def _test_get_visible_widgets(self): """Test get_visible_widgets method.""" widgets = self.widget_api.get_visible_widgets() assert isinstance(widgets, list) def _test_show_widget(self): """Test show_widget method.""" result = self.widget_api.show_widget("test_widget_123") assert isinstance(result, bool) def _test_hide_widget(self): """Test hide_widget method.""" result = self.widget_api.hide_widget("test_widget_123") assert isinstance(result, bool) def _test_close_widget(self): """Test close_widget method.""" # Create a temp widget to close temp = self.widget_api.create_widget( name="temp_to_close", title="Temp", size=(100, 100) ) result = self.widget_api.close_widget("temp_to_close") assert isinstance(result, bool) def _test_show_all_widgets(self): """Test show_all_widgets method.""" self.widget_api.show_all_widgets() # No return value, just ensure no exception def _test_hide_all_widgets(self): """Test hide_all_widgets method.""" self.widget_api.hide_all_widgets() # No return value self.widget_api.show_all_widgets() # Restore def _test_set_all_opacity(self): """Test set_all_opacity method.""" self.widget_api.set_all_opacity(0.8) # Verify self.widget_api.set_all_opacity(0.95) def _test_lock_all(self): """Test lock_all method.""" self.widget_api.lock_all() def _test_unlock_all(self): """Test unlock_all method.""" self.widget_api.unlock_all() def _test_arrange_widgets(self): """Test arrange_widgets method.""" self.widget_api.arrange_widgets(layout="grid", spacing=10) self.widget_api.arrange_widgets(layout="horizontal") self.widget_api.arrange_widgets(layout="vertical") self.widget_api.arrange_widgets(layout="cascade") def _test_snap_to_grid(self): """Test snap_to_grid method.""" self.widget_api.snap_to_grid(grid_size=10) def _test_register_preset(self): """Test register_preset method.""" preset = WidgetConfig( name="preset_test", title="Preset Test", widget_type=WidgetType.MINI, size=(200, 150) ) self.widget_api.register_preset("test_preset", preset) def _test_create_from_preset(self): """Test create_from_preset method.""" widget = self.widget_api.create_from_preset("test_preset", name="from_preset") assert widget is not None or widget is None # May fail if preset not registered properly if widget: self.widget_api.close_widget("from_preset") def _test_save_all_states(self): """Test save_all_states method.""" states = self.widget_api.save_all_states() assert isinstance(states, dict) def _test_load_all_states(self): """Test load_all_states method.""" states = self.widget_api.save_all_states() self.widget_api.load_all_states(states) # Widget instance methods def _test_widget_show(self): """Test widget.show() method.""" if self.test_widget: self.test_widget.show() def _test_widget_hide(self): """Test widget.hide() method.""" if self.test_widget: self.test_widget.hide() self.test_widget.show() # Restore def _test_widget_move(self): """Test widget.move() method.""" if self.test_widget: self.test_widget.move(200, 200) x, y = self.test_widget.position assert x == 200 and y == 200 def _test_widget_resize(self): """Test widget.resize() method.""" if self.test_widget: self.test_widget.resize(400, 300) w, h = self.test_widget.size assert w == 400 and h == 300 def _test_widget_set_opacity(self): """Test widget.set_opacity() method.""" if self.test_widget: self.test_widget.set_opacity(0.5) assert self.test_widget.config.opacity == 0.5 def _test_widget_set_title(self): """Test widget.set_title() method.""" if self.test_widget: self.test_widget.set_title("New Title") assert self.test_widget.config.title == "New Title" def _test_widget_set_locked(self): """Test widget.set_locked() method.""" if self.test_widget: self.test_widget.set_locked(True) assert self.test_widget.config.locked == True self.test_widget.set_locked(False) def _test_widget_minimize(self): """Test widget.minimize() method.""" if self.test_widget: self.test_widget.minimize() def _test_widget_restore(self): """Test widget.restore() method.""" if self.test_widget: self.test_widget.restore() def _test_widget_raise(self): """Test widget.raise_widget() method.""" if self.test_widget: self.test_widget.raise_widget() def _test_widget_lower(self): """Test widget.lower_widget() method.""" if self.test_widget: self.test_widget.lower_widget() def _test_widget_save_state(self): """Test widget.save_state() method.""" if self.test_widget: state = self.test_widget.save_state() assert isinstance(state, dict) assert 'config' in state def _test_widget_load_state(self): """Test widget.load_state() method.""" if self.test_widget: state = self.test_widget.save_state() self.test_widget.load_state(state) # ===================================================================== # ExternalAPI Tests # ===================================================================== def _test_external_api(self): """Test all ExternalAPI methods.""" self._run_test("start_server", "ExternalAPI", self._test_start_server) self._run_test("get_status", "ExternalAPI", self._test_get_status) self._run_test("get_url", "ExternalAPI", self._test_get_url) self._run_test("register_endpoint", "ExternalAPI", self._test_register_endpoint) self._run_test("unregister_endpoint", "ExternalAPI", self._test_unregister_endpoint) self._run_test("get_endpoints", "ExternalAPI", self._test_get_endpoints) self._run_test("register_webhook", "ExternalAPI", self._test_register_webhook) self._run_test("unregister_webhook", "ExternalAPI", self._test_unregister_webhook) self._run_test("get_webhooks", "ExternalAPI", self._test_get_webhooks) self._run_test("get_webhook_history", "ExternalAPI", self._test_get_webhook_history) self._run_test("post_webhook", "ExternalAPI", self._test_post_webhook) self._run_test("create_api_key", "ExternalAPI", self._test_create_api_key) self._run_test("revoke_api_key", "ExternalAPI", self._test_revoke_api_key) self._run_test("register_ipc_handler", "ExternalAPI", self._test_register_ipc_handler) self._run_test("send_ipc", "ExternalAPI", self._test_send_ipc) self._run_test("stop_server", "ExternalAPI", self._test_stop_server) def _test_start_server(self): """Test start_server method.""" result = self.external_api.start_server(port=8765) assert isinstance(result, bool) def _test_get_status(self): """Test get_status method.""" status = self.external_api.get_status() assert isinstance(status, dict) assert 'server_running' in status def _test_get_url(self): """Test get_url method.""" url = self.external_api.get_url("api/test") assert isinstance(url, str) def _test_register_endpoint(self): """Test register_endpoint method.""" def handler(params): return {"test": "data"} self.external_api.register_endpoint("test_endpoint", handler, methods=["GET"]) def _test_unregister_endpoint(self): """Test unregister_endpoint method.""" result = self.external_api.unregister_endpoint("test_endpoint") assert isinstance(result, bool) def _test_get_endpoints(self): """Test get_endpoints method.""" endpoints = self.external_api.get_endpoints() assert isinstance(endpoints, list) def _test_register_webhook(self): """Test register_webhook method.""" def handler(payload): return {"received": True} self.external_api.register_webhook("test_webhook", handler) def _test_unregister_webhook(self): """Test unregister_webhook method.""" result = self.external_api.unregister_webhook("test_webhook") assert isinstance(result, bool) def _test_get_webhooks(self): """Test get_webhooks method.""" webhooks = self.external_api.get_webhooks() assert isinstance(webhooks, list) def _test_get_webhook_history(self): """Test get_webhook_history method.""" history = self.external_api.get_webhook_history(limit=10) assert isinstance(history, list) def _test_post_webhook(self): """Test post_webhook method.""" result = self.external_api.post_webhook( "https://httpbin.org/post", {"test": "data"}, timeout=5 ) assert isinstance(result, dict) assert 'success' in result def _test_create_api_key(self): """Test create_api_key method.""" key = self.external_api.create_api_key("test_key", permissions=["read"]) assert isinstance(key, str) assert len(key) > 0 self._test_api_key = key def _test_revoke_api_key(self): """Test revoke_api_key method.""" # Create then revoke key = self.external_api.create_api_key("temp_key") result = self.external_api.revoke_api_key(key) assert isinstance(result, bool) def _test_register_ipc_handler(self): """Test register_ipc_handler method.""" def handler(data): pass self.external_api.register_ipc_handler("test_channel", handler) def _test_send_ipc(self): """Test send_ipc method.""" result = self.external_api.send_ipc("test_channel", {"message": "test"}) assert isinstance(result, bool) def _test_stop_server(self): """Test stop_server method.""" result = self.external_api.stop_server() assert isinstance(result, bool) def shutdown(self): """Clean up test resources.""" # Close test widget if self.test_widget: self.test_widget.close() # Save test results results_data = { "timestamp": datetime.now().isoformat(), "results": [asdict(r) for r in self.results], "summary": { "total": len(self.results), "passed": sum(1 for r in self.results if r.passed), "failed": sum(1 for r in self.results if not r.passed) } } try: import json with open("api_comprehensive_test_results.json", "w") as f: json.dump(results_data, f, indent=2) except: pass # Plugin entry point plugin_class = APIComprehensiveTestPlugin