631 lines
21 KiB
Python
631 lines
21 KiB
Python
"""
|
|
Error Handling Test Plugin
|
|
|
|
Tests error conditions and exception handling:
|
|
- Invalid input validation
|
|
- Service unavailable scenarios
|
|
- Resource not found errors
|
|
- Type errors and malformed data
|
|
- Timeout handling
|
|
- Edge cases and boundary conditions
|
|
|
|
Verifies APIs handle errors gracefully without crashing.
|
|
"""
|
|
|
|
import time
|
|
import sys
|
|
from datetime import datetime
|
|
from typing import Dict, List, Any, Tuple
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
|
|
from core.base_plugin import BasePlugin
|
|
from core.api.plugin_api import get_api, PluginAPIError, ServiceNotAvailableError
|
|
from core.api.widget_api import get_widget_api, WidgetType
|
|
from core.api.external_api import get_external_api, ExternalAPIError
|
|
|
|
|
|
class ErrorType(Enum):
|
|
"""Types of errors tested."""
|
|
INVALID_INPUT = "invalid_input"
|
|
SERVICE_UNAVAILABLE = "service_unavailable"
|
|
RESOURCE_NOT_FOUND = "resource_not_found"
|
|
TYPE_ERROR = "type_error"
|
|
TIMEOUT = "timeout"
|
|
BOUNDARY = "boundary"
|
|
UNEXPECTED = "unexpected"
|
|
|
|
|
|
@dataclass
|
|
class ErrorTestResult:
|
|
"""Result of an error handling test."""
|
|
api: str
|
|
test_name: str
|
|
error_type: ErrorType
|
|
handled_gracefully: bool
|
|
correct_exception: bool
|
|
error_message: str = ""
|
|
details: Dict = None
|
|
|
|
|
|
class ErrorHandlingTestPlugin(BasePlugin):
|
|
"""
|
|
Error handling test suite for EU-Utility APIs.
|
|
|
|
Tests how APIs respond to invalid inputs, missing resources,
|
|
and exceptional conditions.
|
|
"""
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.api = None
|
|
self.widget_api = None
|
|
self.external_api = None
|
|
self.results: List[ErrorTestResult] = []
|
|
self.widget = None
|
|
|
|
def initialize(self):
|
|
"""Initialize and run error handling tests."""
|
|
self.api = get_api()
|
|
self.widget_api = get_widget_api()
|
|
self.external_api = get_external_api()
|
|
|
|
self._create_results_widget()
|
|
self._run_all_tests()
|
|
|
|
def _create_results_widget(self):
|
|
"""Create widget for results display."""
|
|
self.widget = self.widget_api.create_widget(
|
|
name="error_handling_test",
|
|
title="🛡️ Error Handling Test",
|
|
size=(900, 650),
|
|
position=(250, 150),
|
|
widget_type=WidgetType.CUSTOM
|
|
)
|
|
self._update_widget_display()
|
|
self.widget.show()
|
|
|
|
def _update_widget_display(self):
|
|
"""Update widget content."""
|
|
try:
|
|
from PyQt6.QtWidgets import (
|
|
QWidget, QVBoxLayout, QHBoxLayout, QLabel,
|
|
QPushButton, QTableWidget, QTableWidgetItem,
|
|
QHeaderView, QGroupBox, QTextBrowser
|
|
)
|
|
from PyQt6.QtCore import Qt
|
|
from PyQt6.QtGui import QColor
|
|
|
|
container = QWidget()
|
|
main_layout = QVBoxLayout(container)
|
|
|
|
# Header
|
|
header = QLabel("🛡️ Error Handling Test Suite")
|
|
header.setStyleSheet("font-size: 22px; font-weight: bold; color: #ff8c42;")
|
|
main_layout.addWidget(header)
|
|
|
|
# Summary stats
|
|
if self.results:
|
|
summary_layout = QHBoxLayout()
|
|
|
|
total = len(self.results)
|
|
graceful = sum(1 for r in self.results if r.handled_gracefully)
|
|
correct_exc = sum(1 for r in self.results if r.correct_exception)
|
|
|
|
stats = [
|
|
("Tests", str(total)),
|
|
("Graceful", f"{graceful}/{total}"),
|
|
("Correct Exception", f"{correct_exc}/{total}")
|
|
]
|
|
|
|
for title, value in stats:
|
|
group = QGroupBox(title)
|
|
group_layout = QVBoxLayout(group)
|
|
lbl = QLabel(value)
|
|
lbl.setStyleSheet("font-size: 18px; font-weight: bold;")
|
|
lbl.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
|
|
|
if "Graceful" in title:
|
|
lbl.setStyleSheet(f"font-size: 18px; font-weight: bold; color: {'#4ecca3' if graceful == total else '#ffd93d'};")
|
|
elif "Correct" in title:
|
|
lbl.setStyleSheet(f"font-size: 18px; font-weight: bold; color: {'#4ecca3' if correct_exc == total else '#ffd93d'};")
|
|
|
|
group_layout.addWidget(lbl)
|
|
summary_layout.addWidget(group)
|
|
|
|
main_layout.addLayout(summary_layout)
|
|
|
|
# Results table
|
|
self.results_table = QTableWidget()
|
|
self.results_table.setColumnCount(6)
|
|
self.results_table.setHorizontalHeaderLabels([
|
|
"API", "Test", "Error Type", "Handled", "Correct Exc", "Error Message"
|
|
])
|
|
self.results_table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Stretch)
|
|
self._populate_results_table()
|
|
main_layout.addWidget(self.results_table)
|
|
|
|
# Controls
|
|
btn_layout = QHBoxLayout()
|
|
|
|
btn_run = QPushButton("▶ Run All Error Tests")
|
|
btn_run.clicked.connect(self._run_all_tests)
|
|
btn_layout.addWidget(btn_run)
|
|
|
|
btn_summary = QPushButton("📋 View Summary Report")
|
|
btn_summary.clicked.connect(self._show_summary_report)
|
|
btn_layout.addWidget(btn_summary)
|
|
|
|
main_layout.addLayout(btn_layout)
|
|
|
|
self.widget.set_content(container)
|
|
|
|
except ImportError as e:
|
|
print(f"Widget error: {e}")
|
|
|
|
def _populate_results_table(self):
|
|
"""Populate results table."""
|
|
if not hasattr(self, 'results_table'):
|
|
return
|
|
|
|
self.results_table.setRowCount(len(self.results))
|
|
|
|
for i, r in enumerate(self.results):
|
|
self.results_table.setItem(i, 0, QTableWidgetItem(r.api))
|
|
self.results_table.setItem(i, 1, QTableWidgetItem(r.test_name))
|
|
self.results_table.setItem(i, 2, QTableWidgetItem(r.error_type.value))
|
|
|
|
handled_item = QTableWidgetItem("✅" if r.handled_gracefully else "❌")
|
|
handled_item.setForeground(QColor("#4ecca3" if r.handled_gracefully else "#ff6b6b"))
|
|
self.results_table.setItem(i, 3, handled_item)
|
|
|
|
correct_item = QTableWidgetItem("✅" if r.correct_exception else "⚠️")
|
|
self.results_table.setItem(i, 4, correct_item)
|
|
|
|
msg = r.error_message[:50] + "..." if len(r.error_message) > 50 else r.error_message
|
|
self.results_table.setItem(i, 5, QTableWidgetItem(msg))
|
|
|
|
def _run_test(self, api: str, test_name: str, error_type: ErrorType,
|
|
test_func) -> ErrorTestResult:
|
|
"""Run a single error handling test."""
|
|
error_occurred = False
|
|
handled_gracefully = False
|
|
correct_exception = False
|
|
error_message = ""
|
|
details = {}
|
|
|
|
try:
|
|
test_func()
|
|
# If no error occurred, check if we expected one
|
|
error_message = "No error occurred (may be expected)"
|
|
handled_gracefully = True
|
|
|
|
except ServiceNotAvailableError as e:
|
|
error_occurred = True
|
|
handled_gracefully = True
|
|
correct_exception = error_type == ErrorType.SERVICE_UNAVAILABLE
|
|
error_message = str(e)
|
|
|
|
except PluginAPIError as e:
|
|
error_occurred = True
|
|
handled_gracefully = True
|
|
correct_exception = True
|
|
error_message = str(e)
|
|
|
|
except ExternalAPIError as e:
|
|
error_occurred = True
|
|
handled_gracefully = True
|
|
correct_exception = True
|
|
error_message = str(e)
|
|
|
|
except ValueError as e:
|
|
error_occurred = True
|
|
handled_gracefully = True
|
|
correct_exception = error_type in [ErrorType.INVALID_INPUT, ErrorType.BOUNDARY]
|
|
error_message = str(e)
|
|
|
|
except TypeError as e:
|
|
error_occurred = True
|
|
handled_gracefully = True
|
|
correct_exception = error_type == ErrorType.TYPE_ERROR
|
|
error_message = str(e)
|
|
|
|
except TimeoutError as e:
|
|
error_occurred = True
|
|
handled_gracefully = True
|
|
correct_exception = error_type == ErrorType.TIMEOUT
|
|
error_message = str(e)
|
|
|
|
except KeyError as e:
|
|
error_occurred = True
|
|
handled_gracefully = True
|
|
correct_exception = error_type == ErrorType.RESOURCE_NOT_FOUND
|
|
error_message = f"KeyError: {e}"
|
|
|
|
except Exception as e:
|
|
error_occurred = True
|
|
handled_gracefully = False # Unexpected exception type
|
|
correct_exception = False
|
|
error_message = f"{type(e).__name__}: {str(e)[:100]}"
|
|
details["exception_type"] = type(e).__name__
|
|
|
|
result = ErrorTestResult(
|
|
api=api,
|
|
test_name=test_name,
|
|
error_type=error_type,
|
|
handled_gracefully=handled_gracefully,
|
|
correct_exception=correct_exception,
|
|
error_message=error_message,
|
|
details=details
|
|
)
|
|
|
|
self.results.append(result)
|
|
return result
|
|
|
|
def _run_all_tests(self):
|
|
"""Execute all error handling tests."""
|
|
self.results.clear()
|
|
|
|
# PluginAPI Error Tests
|
|
self._test_pluginapi_errors()
|
|
|
|
# WidgetAPI Error Tests
|
|
self._test_widgetapi_errors()
|
|
|
|
# ExternalAPI Error Tests
|
|
self._test_externalapi_errors()
|
|
|
|
self._update_widget_display()
|
|
|
|
def _test_pluginapi_errors(self):
|
|
"""Test PluginAPI error handling."""
|
|
# Test invalid log line count
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"Invalid log line count (negative)",
|
|
ErrorType.INVALID_INPUT,
|
|
lambda: self.api.read_log_lines(-1)
|
|
)
|
|
|
|
# Test invalid log line count (too large)
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"Invalid log line count (excessive)",
|
|
ErrorType.BOUNDARY,
|
|
lambda: self.api.read_log_lines(10000000)
|
|
)
|
|
|
|
# Test OCR with invalid region
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"OCR invalid region",
|
|
ErrorType.INVALID_INPUT,
|
|
lambda: self.api.recognize_text((-1, -1, -1, -1))
|
|
)
|
|
|
|
# Test capture with invalid region
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"Screenshot invalid region",
|
|
ErrorType.INVALID_INPUT,
|
|
lambda: self.api.capture_screen((-100, -100, 0, 0))
|
|
)
|
|
|
|
# Test HTTP with invalid URL
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"HTTP invalid URL",
|
|
ErrorType.INVALID_INPUT,
|
|
lambda: self.api.http_get("not_a_valid_url")
|
|
)
|
|
|
|
# Test HTTP with malformed URL
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"HTTP malformed URL",
|
|
ErrorType.INVALID_INPUT,
|
|
lambda: self.api.http_get("")
|
|
)
|
|
|
|
# Test play_sound with invalid path
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"Play sound invalid path",
|
|
ErrorType.RESOURCE_NOT_FOUND,
|
|
lambda: self.api.play_sound("/nonexistent/path/to/sound.wav")
|
|
)
|
|
|
|
# Test notification with empty title
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"Notification empty title",
|
|
ErrorType.BOUNDARY,
|
|
lambda: self.api.show_notification("", "message")
|
|
)
|
|
|
|
# Test set_data with non-serializable object
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"Set data non-serializable",
|
|
ErrorType.TYPE_ERROR,
|
|
lambda: self.api.set_data("test_key", lambda x: x)
|
|
)
|
|
|
|
# Test subscribe with non-callable
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"Subscribe non-callable",
|
|
ErrorType.TYPE_ERROR,
|
|
lambda: self.api.subscribe("test", "not_a_function")
|
|
)
|
|
|
|
# Test unsubscribe with invalid ID
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"Unsubscribe invalid ID",
|
|
ErrorType.RESOURCE_NOT_FOUND,
|
|
lambda: self.api.unsubscribe("invalid_subscription_id_12345")
|
|
)
|
|
|
|
# Test cancel_task with invalid ID
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"Cancel task invalid ID",
|
|
ErrorType.RESOURCE_NOT_FOUND,
|
|
lambda: self.api.cancel_task("invalid_task_id_12345")
|
|
)
|
|
|
|
# Test get_data with None key
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"Get data None key",
|
|
ErrorType.INVALID_INPUT,
|
|
lambda: self.api.get_data(None)
|
|
)
|
|
|
|
# Test volume out of range
|
|
self._run_test(
|
|
"PluginAPI",
|
|
"Play sound volume out of range",
|
|
ErrorType.BOUNDARY,
|
|
lambda: self.api.play_sound("test.wav", volume=5.0)
|
|
)
|
|
|
|
def _test_widgetapi_errors(self):
|
|
"""Test WidgetAPI error handling."""
|
|
# Test duplicate widget name
|
|
def create_duplicate():
|
|
w1 = self.widget_api.create_widget(name="duplicate_test", title="Test")
|
|
w2 = self.widget_api.create_widget(name="duplicate_test", title="Test 2")
|
|
|
|
self._run_test(
|
|
"WidgetAPI",
|
|
"Duplicate widget name",
|
|
ErrorType.INVALID_INPUT,
|
|
create_duplicate
|
|
)
|
|
|
|
# Cleanup if created
|
|
try:
|
|
self.widget_api.close_widget("duplicate_test")
|
|
except:
|
|
pass
|
|
|
|
# Test get non-existent widget
|
|
self._run_test(
|
|
"WidgetAPI",
|
|
"Get non-existent widget",
|
|
ErrorType.RESOURCE_NOT_FOUND,
|
|
lambda: self.widget_api.get_widget("definitely_does_not_exist_12345")
|
|
)
|
|
|
|
# Test operations on non-existent widget
|
|
self._run_test(
|
|
"WidgetAPI",
|
|
"Show non-existent widget",
|
|
ErrorType.RESOURCE_NOT_FOUND,
|
|
lambda: self.widget_api.show_widget("definitely_does_not_exist_12345")
|
|
)
|
|
|
|
# Test invalid widget size
|
|
self._run_test(
|
|
"WidgetAPI",
|
|
"Create widget with negative size",
|
|
ErrorType.INVALID_INPUT,
|
|
lambda: self.widget_api.create_widget(name="bad_size", title="Bad", size=(-100, -100))
|
|
)
|
|
|
|
# Test invalid opacity
|
|
def test_bad_opacity():
|
|
w = self.widget_api.create_widget(name="opacity_test", title="Test", size=(100, 100))
|
|
w.set_opacity(5.0) # Should be clamped
|
|
|
|
self._run_test(
|
|
"WidgetAPI",
|
|
"Set opacity out of range",
|
|
ErrorType.BOUNDARY,
|
|
test_bad_opacity
|
|
)
|
|
|
|
# Cleanup
|
|
try:
|
|
self.widget_api.close_widget("opacity_test")
|
|
except:
|
|
pass
|
|
|
|
# Test load state with invalid data
|
|
def load_invalid_state():
|
|
w = self.widget_api.create_widget(name="state_test", title="Test")
|
|
w.load_state({"invalid": "state_data"})
|
|
|
|
self._run_test(
|
|
"WidgetAPI",
|
|
"Load invalid state",
|
|
ErrorType.INVALID_INPUT,
|
|
load_invalid_state
|
|
)
|
|
|
|
# Cleanup
|
|
try:
|
|
self.widget_api.close_widget("state_test")
|
|
except:
|
|
pass
|
|
|
|
# Test close already closed widget
|
|
def close_closed():
|
|
w = self.widget_api.create_widget(name="close_test", title="Test")
|
|
w.close()
|
|
w.close() # Second close
|
|
|
|
self._run_test(
|
|
"WidgetAPI",
|
|
"Close already closed widget",
|
|
ErrorType.RESOURCE_NOT_FOUND,
|
|
close_closed
|
|
)
|
|
|
|
def _test_externalapi_errors(self):
|
|
"""Test ExternalAPI error handling."""
|
|
# Test start server on invalid port
|
|
self._run_test(
|
|
"ExternalAPI",
|
|
"Start server invalid port",
|
|
ErrorType.INVALID_INPUT,
|
|
lambda: self.external_api.start_server(port=-1)
|
|
)
|
|
|
|
# Test register endpoint with invalid path
|
|
self._run_test(
|
|
"ExternalAPI",
|
|
"Register endpoint invalid path",
|
|
ErrorType.INVALID_INPUT,
|
|
lambda: self.external_api.register_endpoint("", lambda x: x)
|
|
)
|
|
|
|
# Test register webhook with invalid name
|
|
self._run_test(
|
|
"ExternalAPI",
|
|
"Register webhook invalid name",
|
|
ErrorType.INVALID_INPUT,
|
|
lambda: self.external_api.register_webhook("", lambda x: x)
|
|
)
|
|
|
|
# Test unregister non-existent endpoint
|
|
self._run_test(
|
|
"ExternalAPI",
|
|
"Unregister non-existent endpoint",
|
|
ErrorType.RESOURCE_NOT_FOUND,
|
|
lambda: self.external_api.unregister_endpoint("definitely_not_registered")
|
|
)
|
|
|
|
# Test unregister non-existent webhook
|
|
self._run_test(
|
|
"ExternalAPI",
|
|
"Unregister non-existent webhook",
|
|
ErrorType.RESOURCE_NOT_FOUND,
|
|
lambda: self.external_api.unregister_webhook("definitely_not_registered")
|
|
)
|
|
|
|
# Test revoke non-existent API key
|
|
self._run_test(
|
|
"ExternalAPI",
|
|
"Revoke non-existent API key",
|
|
ErrorType.RESOURCE_NOT_FOUND,
|
|
lambda: self.external_api.revoke_api_key("invalid_key_12345")
|
|
)
|
|
|
|
# Test post_webhook with invalid URL
|
|
self._run_test(
|
|
"ExternalAPI",
|
|
"Post webhook invalid URL",
|
|
ErrorType.INVALID_INPUT,
|
|
lambda: self.external_api.post_webhook("not_a_url", {})
|
|
)
|
|
|
|
# Test post_webhook with unreachable host
|
|
self._run_test(
|
|
"ExternalAPI",
|
|
"Post webhook unreachable host",
|
|
ErrorType.TIMEOUT,
|
|
lambda: self.external_api.post_webhook(
|
|
"http://192.0.2.1:9999/test", # TEST-NET-1, should be unreachable
|
|
{},
|
|
timeout=1
|
|
)
|
|
)
|
|
|
|
# Test IPC send to non-existent channel
|
|
self._run_test(
|
|
"ExternalAPI",
|
|
"IPC send non-existent channel",
|
|
ErrorType.RESOURCE_NOT_FOUND,
|
|
lambda: self.external_api.send_ipc("nonexistent_channel", {})
|
|
)
|
|
|
|
# Test get_url when server not running
|
|
def get_url_not_running():
|
|
# Ensure server is stopped
|
|
self.external_api.stop_server()
|
|
url = self.external_api.get_url("test")
|
|
if not url:
|
|
raise ValueError("Empty URL when server not running")
|
|
|
|
self._run_test(
|
|
"ExternalAPI",
|
|
"Get URL server not running",
|
|
ErrorType.SERVICE_UNAVAILABLE,
|
|
get_url_not_running
|
|
)
|
|
|
|
def _show_summary_report(self):
|
|
"""Display summary report."""
|
|
if not self.results:
|
|
self.api.show_notification("No Results", "Run tests first")
|
|
return
|
|
|
|
total = len(self.results)
|
|
graceful = sum(1 for r in self.results if r.handled_gracefully)
|
|
correct = sum(1 for r in self.results if r.correct_exception)
|
|
|
|
report = f"""
|
|
Error Handling Test Summary
|
|
===========================
|
|
|
|
Total Tests: {total}
|
|
Handled Gracefully: {graceful}/{total} ({graceful/total*100:.1f}%)
|
|
Correct Exception: {correct}/{total} ({correct/total*100:.1f}%)
|
|
|
|
By Error Type:
|
|
"""
|
|
|
|
for error_type in ErrorType:
|
|
type_tests = [r for r in self.results if r.error_type == error_type]
|
|
if type_tests:
|
|
type_graceful = sum(1 for r in type_tests if r.handled_gracefully)
|
|
report += f" {error_type.value}: {type_graceful}/{len(type_tests)} graceful\n"
|
|
|
|
report += "\nBy API:\n"
|
|
for api in ["PluginAPI", "WidgetAPI", "ExternalAPI"]:
|
|
api_tests = [r for r in self.results if r.api == api]
|
|
if api_tests:
|
|
api_graceful = sum(1 for r in api_tests if r.handled_gracefully)
|
|
report += f" {api}: {api_graceful}/{len(api_tests)} graceful\n"
|
|
|
|
print(report)
|
|
self.api.show_notification("Report Generated", "See console for full report")
|
|
|
|
def shutdown(self):
|
|
"""Clean up resources."""
|
|
# Clean up any test widgets
|
|
for name in ["duplicate_test", "opacity_test", "state_test", "close_test"]:
|
|
try:
|
|
self.widget_api.close_widget(name)
|
|
except:
|
|
pass
|
|
|
|
if self.widget:
|
|
self.widget.close()
|
|
|
|
|
|
# Plugin entry point
|
|
plugin_class = ErrorHandlingTestPlugin |