511 lines
16 KiB
Python
511 lines
16 KiB
Python
"""
|
|
Unit Tests - API Integration
|
|
=============================
|
|
|
|
Tests for Plugin API, Nexus API, and external service integration.
|
|
"""
|
|
|
|
import pytest
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
|
|
|
|
class TestPluginAPI:
|
|
"""Test Plugin API functionality."""
|
|
|
|
def test_plugin_api_singleton(self):
|
|
"""Test PluginAPI is a singleton."""
|
|
from core.plugin_api import PluginAPI, get_api
|
|
|
|
api1 = get_api()
|
|
api2 = get_api()
|
|
|
|
assert api1 is api2
|
|
|
|
def test_plugin_api_service_registration(self):
|
|
"""Test service registration."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_service = Mock()
|
|
|
|
api.register_log_service(mock_service)
|
|
|
|
assert api._services['log_reader'] == mock_service
|
|
|
|
def test_read_log_lines(self):
|
|
"""Test reading log lines."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_reader = Mock(return_value=["line1", "line2", "line3"])
|
|
api.register_log_service(mock_reader)
|
|
|
|
lines = api.read_log_lines(3)
|
|
|
|
assert len(lines) == 3
|
|
assert lines[0] == "line1"
|
|
mock_reader.assert_called_once_with(3)
|
|
|
|
def test_read_log_lines_service_unavailable(self):
|
|
"""Test reading log lines when service unavailable."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
# No service registered
|
|
|
|
lines = api.read_log_lines(10)
|
|
|
|
assert lines == []
|
|
|
|
def test_get_eu_window(self, mock_window_info):
|
|
"""Test getting EU window info."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_wm = Mock()
|
|
mock_wm.is_available.return_value = True
|
|
|
|
window_mock = Mock()
|
|
window_mock.title = mock_window_info["title"]
|
|
window_mock.hwnd = mock_window_info["handle"]
|
|
window_mock.x = mock_window_info["rect"][0]
|
|
window_mock.y = mock_window_info["rect"][1]
|
|
window_mock.width = mock_window_info["width"]
|
|
window_mock.height = mock_window_info["height"]
|
|
window_mock.is_focused.return_value = True
|
|
window_mock.is_visible.return_value = True
|
|
|
|
mock_wm.find_eu_window.return_value = window_mock
|
|
api.register_window_service(mock_wm)
|
|
|
|
info = api.get_eu_window()
|
|
|
|
assert info is not None
|
|
assert info['title'] == "Entropia Universe"
|
|
assert info['is_focused'] is True
|
|
|
|
def test_get_eu_window_not_available(self):
|
|
"""Test getting EU window when unavailable."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_wm = Mock()
|
|
mock_wm.is_available.return_value = False
|
|
api.register_window_service(mock_wm)
|
|
|
|
info = api.get_eu_window()
|
|
|
|
assert info is None
|
|
|
|
def test_is_eu_focused_true(self):
|
|
"""Test checking if EU is focused - true case."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_wm = Mock()
|
|
mock_wm.is_available.return_value = True
|
|
|
|
window_mock = Mock()
|
|
window_mock.is_focused.return_value = True
|
|
mock_wm.find_eu_window.return_value = window_mock
|
|
api.register_window_service(mock_wm)
|
|
|
|
result = api.is_eu_focused()
|
|
|
|
assert result is True
|
|
|
|
def test_is_eu_focused_false(self):
|
|
"""Test checking if EU is focused - false case."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_wm = Mock()
|
|
mock_wm.is_available.return_value = True
|
|
|
|
window_mock = Mock()
|
|
window_mock.is_focused.return_value = False
|
|
mock_wm.find_eu_window.return_value = window_mock
|
|
api.register_window_service(mock_wm)
|
|
|
|
result = api.is_eu_focused()
|
|
|
|
assert result is False
|
|
|
|
def test_recognize_text(self, mock_ocr_result):
|
|
"""Test OCR text recognition."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_ocr = Mock(return_value=mock_ocr_result)
|
|
api.register_ocr_service(mock_ocr)
|
|
|
|
text = api.recognize_text(region=(0, 0, 100, 50))
|
|
|
|
assert "Inventory" in text
|
|
assert "PED: 1500.00" in text
|
|
|
|
def test_recognize_text_service_unavailable(self):
|
|
"""Test OCR when service unavailable."""
|
|
from core.plugin_api import PluginAPI, ServiceNotAvailableError
|
|
|
|
api = PluginAPI()
|
|
# No OCR service registered
|
|
|
|
with pytest.raises(ServiceNotAvailableError):
|
|
api.recognize_text(region=(0, 0, 100, 50))
|
|
|
|
def test_ocr_available_true(self):
|
|
"""Test OCR availability check - true."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
api.register_ocr_service(Mock())
|
|
|
|
assert api.ocr_available() is True
|
|
|
|
def test_ocr_available_false(self):
|
|
"""Test OCR availability check - false."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
|
|
assert api.ocr_available() is False
|
|
|
|
def test_capture_screen(self):
|
|
"""Test screen capture."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_screenshot = Mock()
|
|
mock_screenshot.is_available.return_value = True
|
|
mock_image = Mock()
|
|
mock_image.size = (1920, 1080)
|
|
mock_screenshot.capture.return_value = mock_image
|
|
api.register_screenshot_service(mock_screenshot)
|
|
|
|
result = api.capture_screen(region=(0, 0, 1920, 1080))
|
|
|
|
assert result is not None
|
|
assert result.size == (1920, 1080)
|
|
|
|
def test_screenshot_available_true(self):
|
|
"""Test screenshot availability - true."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_screenshot = Mock()
|
|
mock_screenshot.is_available.return_value = True
|
|
api.register_screenshot_service(mock_screenshot)
|
|
|
|
assert api.screenshot_available() is True
|
|
|
|
def test_search_items(self, mock_nexus_response):
|
|
"""Test Nexus item search."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_nexus = Mock()
|
|
mock_nexus.search_items.return_value = mock_nexus_response["data"]
|
|
api.register_nexus_service(mock_nexus)
|
|
|
|
items = api.search_items("omegaton", limit=5)
|
|
|
|
assert len(items) == 2
|
|
assert items[0]["Name"] == "Omegaton A104"
|
|
|
|
def test_get_item_details(self, mock_nexus_response):
|
|
"""Test getting item details."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_nexus = Mock()
|
|
mock_nexus.get_item.return_value = mock_nexus_response["data"][0]
|
|
api.register_nexus_service(mock_nexus)
|
|
|
|
item = api.get_item_details(12345)
|
|
|
|
assert item is not None
|
|
assert item["Name"] == "Omegaton A104"
|
|
|
|
def test_http_get(self, mock_http_client):
|
|
"""Test HTTP GET request."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
api.register_http_service(mock_http_client)
|
|
|
|
result = api.http_get("https://api.example.com/data")
|
|
|
|
assert result['success'] is True
|
|
assert result['data'] == {"test": "data"}
|
|
|
|
def test_http_get_service_unavailable(self):
|
|
"""Test HTTP GET when service unavailable."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
|
|
result = api.http_get("https://api.example.com/data")
|
|
|
|
assert result['success'] is False
|
|
assert 'error' in result
|
|
|
|
def test_play_sound(self):
|
|
"""Test playing sound."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_audio = Mock()
|
|
api.register_audio_service(mock_audio)
|
|
|
|
result = api.play_sound("alert.wav", volume=0.7)
|
|
|
|
assert result is True
|
|
mock_audio.play.assert_called_once_with("alert.wav", volume=0.7)
|
|
|
|
def test_show_notification(self):
|
|
"""Test showing notification."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_notification = Mock()
|
|
api.register_notification_service(mock_notification)
|
|
|
|
result = api.show_notification("Title", "Message", duration=3000, sound=True)
|
|
|
|
assert result is True
|
|
mock_notification.show.assert_called_once_with(
|
|
"Title", "Message", duration=3000, sound=True
|
|
)
|
|
|
|
def test_copy_to_clipboard(self):
|
|
"""Test clipboard copy."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_clipboard = Mock()
|
|
api.register_clipboard_service(mock_clipboard)
|
|
|
|
result = api.copy_to_clipboard("Test text")
|
|
|
|
assert result is True
|
|
mock_clipboard.copy.assert_called_once_with("Test text")
|
|
|
|
def test_paste_from_clipboard(self):
|
|
"""Test clipboard paste."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_clipboard = Mock()
|
|
mock_clipboard.paste.return_value = "Pasted text"
|
|
api.register_clipboard_service(mock_clipboard)
|
|
|
|
result = api.paste_from_clipboard()
|
|
|
|
assert result == "Pasted text"
|
|
|
|
def test_event_bus_subscribe(self, event_bus):
|
|
"""Test event subscription."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
api.register_event_bus(event_bus)
|
|
|
|
callback = Mock()
|
|
sub_id = api.subscribe("test_event", callback)
|
|
|
|
assert sub_id != ""
|
|
|
|
# Test publishing triggers callback
|
|
api.publish("test_event", {"data": "test"})
|
|
callback.assert_called_once()
|
|
|
|
def test_event_bus_unsubscribe(self, event_bus):
|
|
"""Test event unsubscription."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
api.register_event_bus(event_bus)
|
|
|
|
callback = Mock()
|
|
sub_id = api.subscribe("test_event", callback)
|
|
|
|
result = api.unsubscribe(sub_id)
|
|
|
|
assert result is True
|
|
|
|
def test_data_store_get_set(self, data_store):
|
|
"""Test data store get/set operations."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
api.register_data_service(data_store)
|
|
|
|
# Set data
|
|
result = api.set_data("test_key", {"value": 123})
|
|
assert result is True
|
|
|
|
# Get data
|
|
value = api.get_data("test_key")
|
|
assert value == {"value": 123}
|
|
|
|
def test_data_store_get_default(self, data_store):
|
|
"""Test data store get with default."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
api.register_data_service(data_store)
|
|
|
|
value = api.get_data("nonexistent_key", default="default_value")
|
|
assert value == "default_value"
|
|
|
|
def test_run_task(self):
|
|
"""Test running background task."""
|
|
from core.plugin_api import PluginAPI
|
|
|
|
api = PluginAPI()
|
|
mock_tasks = Mock()
|
|
mock_tasks.submit.return_value = "task_123"
|
|
api.register_task_service(mock_tasks)
|
|
|
|
def task_func():
|
|
return "result"
|
|
|
|
task_id = api.run_task(task_func)
|
|
|
|
assert task_id == "task_123"
|
|
|
|
|
|
class TestNexusAPI:
|
|
"""Test Nexus API integration."""
|
|
|
|
def test_nexus_api_initialization(self):
|
|
"""Test Nexus API initialization."""
|
|
from core.nexus_api import NexusAPI
|
|
|
|
api = NexusAPI()
|
|
|
|
assert api.base_url == "https://api.entropianexus.com"
|
|
assert api.api_key is None # No key set by default
|
|
|
|
def test_nexus_api_search_items(self, mock_nexus_response):
|
|
"""Test Nexus API item search."""
|
|
from core.nexus_api import NexusAPI
|
|
|
|
api = NexusAPI()
|
|
|
|
with patch.object(api, '_make_request') as mock_request:
|
|
mock_request.return_value = mock_nexus_response["data"]
|
|
|
|
items = api.search_items("omegaton", limit=5)
|
|
|
|
assert len(items) == 2
|
|
mock_request.assert_called_once()
|
|
|
|
def test_nexus_api_get_item(self, mock_nexus_response):
|
|
"""Test Nexus API get item details."""
|
|
from core.nexus_api import NexusAPI
|
|
|
|
api = NexusAPI()
|
|
|
|
with patch.object(api, '_make_request') as mock_request:
|
|
mock_request.return_value = mock_nexus_response["data"][0]
|
|
|
|
item = api.get_item(12345)
|
|
|
|
assert item["Name"] == "Omegaton A104"
|
|
mock_request.assert_called_once()
|
|
|
|
def test_nexus_api_error_handling(self):
|
|
"""Test Nexus API error handling."""
|
|
from core.nexus_api import NexusAPI
|
|
|
|
api = NexusAPI()
|
|
|
|
with patch.object(api, '_make_request') as mock_request:
|
|
mock_request.side_effect = Exception("Network error")
|
|
|
|
items = api.search_items("test")
|
|
|
|
assert items == []
|
|
|
|
|
|
class TestHTTPClient:
|
|
"""Test HTTP Client functionality."""
|
|
|
|
def test_http_client_initialization(self):
|
|
"""Test HTTP client initialization."""
|
|
from core.http_client import HTTPClient
|
|
|
|
client = HTTPClient()
|
|
|
|
assert client.cache_enabled is True
|
|
assert client.timeout == 30
|
|
|
|
def test_http_get_success(self):
|
|
"""Test successful HTTP GET."""
|
|
from core.http_client import HTTPClient
|
|
|
|
client = HTTPClient()
|
|
|
|
with patch('requests.get') as mock_get:
|
|
mock_response = Mock()
|
|
mock_response.status_code = 200
|
|
mock_response.json.return_value = {"success": True}
|
|
mock_get.return_value = mock_response
|
|
|
|
result = client.get("https://api.example.com/test")
|
|
|
|
assert result['success'] is True
|
|
|
|
def test_http_get_failure(self):
|
|
"""Test failed HTTP GET."""
|
|
from core.http_client import HTTPClient
|
|
|
|
client = HTTPClient()
|
|
|
|
with patch('requests.get') as mock_get:
|
|
mock_get.side_effect = Exception("Connection error")
|
|
|
|
result = client.get("https://api.example.com/test")
|
|
|
|
assert result['success'] is False
|
|
assert 'error' in result
|
|
|
|
def test_http_post_success(self):
|
|
"""Test successful HTTP POST."""
|
|
from core.http_client import HTTPClient
|
|
|
|
client = HTTPClient()
|
|
|
|
with patch('requests.post') as mock_post:
|
|
mock_response = Mock()
|
|
mock_response.status_code = 200
|
|
mock_response.json.return_value = {"success": True}
|
|
mock_post.return_value = mock_response
|
|
|
|
result = client.post("https://api.example.com/test", data={"key": "value"})
|
|
|
|
assert result['success'] is True
|
|
|
|
def test_cache_functionality(self):
|
|
"""Test HTTP client caching."""
|
|
from core.http_client import HTTPClient
|
|
|
|
client = HTTPClient()
|
|
|
|
with patch('requests.get') as mock_get:
|
|
mock_response = Mock()
|
|
mock_response.status_code = 200
|
|
mock_response.json.return_value = {"data": "test"}
|
|
mock_get.return_value = mock_response
|
|
|
|
# First call should hit the API
|
|
result1 = client.get("https://api.example.com/test", cache=True)
|
|
|
|
# Second call should use cache
|
|
result2 = client.get("https://api.example.com/test", cache=True)
|
|
|
|
# Request should only be made once
|
|
assert mock_get.call_count == 1
|