414 lines
12 KiB
Python
414 lines
12 KiB
Python
"""
|
|
Unit tests for Task Manager service.
|
|
|
|
Tests cover:
|
|
- Singleton pattern
|
|
- Task creation and execution
|
|
- Task priorities
|
|
- Task status tracking
|
|
- Delayed and periodic tasks
|
|
- Task cancellation
|
|
- Signal emission
|
|
"""
|
|
|
|
import pytest
|
|
import time
|
|
from unittest.mock import MagicMock, patch
|
|
from datetime import datetime
|
|
|
|
from core.tasks import TaskManager, Task, TaskPriority, TaskStatus, get_task_manager
|
|
|
|
|
|
@pytest.mark.unit
class TestTaskManagerSingleton:
    """Checks on the shared instance returned by ``get_task_manager``."""

    def test_singleton_instance(self):
        """Two accessor calls must yield the very same ``TaskManager`` object."""
        first, second = get_task_manager(), get_task_manager()

        # Identity, not equality: both names must point at one object.
        assert first is second
        assert isinstance(first, TaskManager)
|
|
|
|
|
|
@pytest.mark.unit
class TestTaskCreation:
    """Construction of ``Task`` records and the values of the task enums."""

    def test_task_initialization(self):
        """A new ``Task`` keeps what it was given and starts out pending."""
        def dummy_func():
            return "result"

        init_kwargs = dict(
            id="test_123",
            func=dummy_func,
            args=(),
            kwargs={},
            priority=TaskPriority.NORMAL,
        )
        created = Task(**init_kwargs)

        # Explicitly supplied fields are stored untouched.
        assert created.id == "test_123"
        assert created.func is dummy_func
        assert created.priority == TaskPriority.NORMAL
        # Fields that were not supplied take their defaults.
        assert created.status == TaskStatus.PENDING
        assert created.created_at is not None

    def test_task_priority_enum(self):
        """Priority members map to 1 (HIGH), 2 (NORMAL) and 3 (LOW)."""
        for member, expected in (
            (TaskPriority.HIGH, 1),
            (TaskPriority.NORMAL, 2),
            (TaskPriority.LOW, 3),
        ):
            assert member.value == expected

    def test_task_status_enum(self):
        """Each status member carries its lowercase string value."""
        for member, expected in (
            (TaskStatus.PENDING, "pending"),
            (TaskStatus.RUNNING, "running"),
            (TaskStatus.COMPLETED, "completed"),
            (TaskStatus.FAILED, "failed"),
            (TaskStatus.CANCELLED, "cancelled"),
        ):
            assert member.value == expected
|
|
|
|
|
|
@pytest.mark.unit
class TestTaskExecution:
    """Tests for submitting callables through ``TaskManager.run_in_thread``."""

    def test_run_in_thread(self):
        """``run_in_thread`` returns an 8-character string id and registers the task."""
        tm = TaskManager()

        def test_func():
            return "success"

        task_id = tm.run_in_thread(test_func)

        assert isinstance(task_id, str)
        assert len(task_id) == 8  # UUID first 8 chars
        assert task_id in tm._tasks

    def test_task_execution_success(self):
        """The ``on_complete`` callback receives the task's return value.

        The test waits on the task's future, then emits ``task_completed``
        by hand and checks that the callback was handed ``"success"``.
        """
        tm = TaskManager()
        result_received = []

        def test_func():
            return "success"

        def on_complete(result):
            result_received.append(result)

        task_id = tm.run_in_thread(test_func, on_complete=on_complete)

        # Wait for task to complete
        tm._futures[task_id].result(timeout=2)

        # Trigger signal manually for testing
        task = tm._tasks[task_id]
        tm._signals.task_completed.emit(task_id, task.result)

        assert result_received == ["success"]

    def test_task_execution_error(self):
        """The ``on_error`` callback receives the exception raised by the task.

        The test waits on the task's future, emits ``task_failed`` by hand
        when the task recorded an error, and checks that the callback was
        handed exactly one ``ValueError``.
        """
        tm = TaskManager()
        error_received = []

        def error_func():
            raise ValueError("Test error")

        def on_error(error):
            error_received.append(error)

        task_id = tm.run_in_thread(error_func, on_error=on_error)

        # Wait for task to complete (with error)
        try:
            tm._futures[task_id].result(timeout=2)
        except Exception:
            # Deliberately best effort: waiting on the future may raise
            # (presumably the task's own error - confirm against the manager);
            # the outcome is verified through ``task.error`` below.
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt and SystemExit still abort the test run.
            pass

        # Trigger signal manually for testing
        task = tm._tasks[task_id]
        if task.error:
            tm._signals.task_failed.emit(task_id, task.error)

        assert len(error_received) == 1
        assert isinstance(error_received[0], ValueError)

    def test_task_priority(self):
        """The ``priority`` argument is stored on the created task."""
        tm = TaskManager()

        task_id = tm.run_in_thread(lambda: None, priority=TaskPriority.HIGH)

        task = tm._tasks[task_id]
        assert task.priority == TaskPriority.HIGH
|
|
|
|
|
|
@pytest.mark.unit
class TestTaskStatus:
    """Status and result lookups on the task manager."""

    def test_get_task_status_pending(self):
        """Right after submission a slow task is either pending or running."""
        tm = TaskManager()

        def slow_func():
            # Sleep briefly so the task is still in flight when queried.
            time.sleep(0.1)
            return "done"

        task_id = tm.run_in_thread(slow_func)

        in_flight_states = [TaskStatus.PENDING, TaskStatus.RUNNING]
        assert tm.get_task_status(task_id) in in_flight_states

    def test_get_task_status_completed(self):
        """A task marked COMPLETED is reported as COMPLETED."""
        tm = TaskManager()
        task_id = tm.run_in_thread(lambda: "done")

        # Block until the worker has finished.
        tm._futures[task_id].result(timeout=2)

        # The status is forced by hand for the purpose of this test.
        tm._tasks[task_id].status = TaskStatus.COMPLETED

        assert tm.get_task_status(task_id) == TaskStatus.COMPLETED

    def test_get_task_result(self):
        """``get_task_result`` returns the result stored on the task."""
        tm = TaskManager()
        task_id = tm.run_in_thread(lambda: "test_result")

        # Block until the worker has finished.
        tm._futures[task_id].result(timeout=2)

        # Result and status are forced by hand for the purpose of this test.
        record = tm._tasks[task_id]
        record.result = "test_result"
        record.status = TaskStatus.COMPLETED

        assert tm.get_task_result(task_id) == "test_result"

    def test_get_task_status_invalid_id(self):
        """An unknown task id yields ``None`` instead of a status."""
        tm = TaskManager()

        assert tm.get_task_status("invalid_id") is None
|
|
|
|
|
|
@pytest.mark.unit
class TestDelayedTasks:
    """Timer-based scheduling, exercised against a mocked ``QTimer``."""

    @patch('core.tasks.QTimer')
    def test_run_later(self, mock_timer_class):
        """``run_later`` configures a single-shot timer with the given delay."""
        tm = TaskManager()
        # Install the stand-in timer only after the manager has been built.
        fake_timer = MagicMock()
        mock_timer_class.return_value = fake_timer

        def callback():
            return "done"

        handle = tm.run_later(1000, callback)

        assert isinstance(handle, str)
        fake_timer.setSingleShot.assert_called_once_with(True)
        fake_timer.start.assert_called_once_with(1000)

    @patch('core.tasks.QTimer')
    def test_run_periodic(self, mock_timer_class):
        """``run_periodic`` configures a repeating timer with the given interval."""
        tm = TaskManager()
        # Install the stand-in timer only after the manager has been built.
        fake_timer = MagicMock()
        mock_timer_class.return_value = fake_timer

        def callback():
            return "done"

        handle = tm.run_periodic(5000, callback)

        assert isinstance(handle, str)
        fake_timer.setSingleShot.assert_called_once_with(False)
        fake_timer.start.assert_called_once_with(5000)
|
|
|
|
|
|
@pytest.mark.unit
class TestTaskCancellation:
    """Tests for ``TaskManager.cancel_task`` on threaded tasks and timers."""

    def test_cancel_pending_task(self):
        """``cancel_task`` on a just-submitted task returns a bool.

        Whether the cancellation succeeds depends on timing, so only the
        return type is checked.
        """
        # Local import keeps this test self-contained.
        import threading

        tm = TaskManager()

        # The worker blocks on an event instead of sleeping for a fixed 10 s,
        # so that it can be released as soon as the test is done. Otherwise a
        # task that had already started would keep running for 10 s after the
        # test had returned.
        release = threading.Event()

        def slow_func():
            release.wait(timeout=10)
            return "done"

        task_id = tm.run_in_thread(slow_func)

        try:
            # Cancel it
            result = tm.cancel_task(task_id)
        finally:
            # Let the worker finish even if cancellation failed or raised.
            release.set()

        # Note: Cancellation may or may not succeed depending on timing
        assert isinstance(result, bool)

    @patch('core.tasks.QTimer')
    def test_cancel_timer(self, mock_timer_class):
        """Cancelling a timer id returns ``True`` and stops the timer once."""
        tm = TaskManager()
        mock_timer = MagicMock()
        mock_timer_class.return_value = mock_timer

        timer_id = tm.run_later(1000, lambda: None)

        result = tm.cancel_task(timer_id)

        assert result is True
        mock_timer.stop.assert_called_once()

    def test_cancel_invalid_task(self):
        """Cancelling an unknown id returns ``False``."""
        tm = TaskManager()

        result = tm.cancel_task("invalid_id")

        assert result is False
|
|
|
|
|
|
@pytest.mark.unit
class TestActiveTasks:
    """Behaviour of ``get_active_tasks``."""

    def test_get_active_tasks(self):
        """No task is reported as active once all have been marked completed."""
        tm = TaskManager()

        # Submit both tasks first, then wait for each of them.
        submitted = [
            tm.run_in_thread(lambda: "done1"),
            tm.run_in_thread(lambda: "done2"),
        ]
        for task_id in submitted:
            tm._futures[task_id].result(timeout=2)

        # Statuses are forced by hand for the purpose of this test.
        for task_id in submitted:
            tm._tasks[task_id].status = TaskStatus.COMPLETED

        # Nothing should remain active.
        assert len(tm.get_active_tasks()) == 0

    def test_get_active_tasks_with_pending(self):
        """A task registered in the PENDING state is reported as active."""
        tm = TaskManager()

        # Register a task directly so that it never leaves the pending state.
        pending = Task(
            id="test_task",
            func=lambda: None,
            args=(),
            kwargs={},
            priority=TaskPriority.NORMAL,
            status=TaskStatus.PENDING,
        )
        tm._tasks["test_task"] = pending

        active = tm.get_active_tasks()

        assert len(active) == 1
        assert active[0].id == "test_task"
|
|
|
|
|
|
@pytest.mark.unit
class TestSignals:
    """Delivery of the task manager's signals to connected handlers."""

    def test_task_completed_signal(self):
        """``task_completed`` passes the task id and result to its handler."""
        tm = TaskManager()
        deliveries = []

        def record_completed(task_id, result):
            deliveries.append((task_id, result))

        completed_signal = tm._signals.task_completed
        completed_signal.connect(record_completed)
        completed_signal.emit("task_123", "result")

        assert len(deliveries) == 1
        assert deliveries[0] == ("task_123", "result")

    def test_task_failed_signal(self):
        """``task_failed`` passes the task id and error to its handler."""
        tm = TaskManager()
        deliveries = []

        def record_failed(task_id, error):
            deliveries.append((task_id, error))

        tm._signals.task_failed.connect(record_failed)
        failure = ValueError("Test error")
        tm._signals.task_failed.emit("task_123", failure)

        assert len(deliveries) == 1
        delivered_id, delivered_error = deliveries[0]
        assert delivered_id == "task_123"
        assert isinstance(delivered_error, ValueError)

    def test_task_started_signal(self):
        """``task_started`` passes the task id to its handler."""
        tm = TaskManager()
        deliveries = []

        def record_started(task_id):
            deliveries.append(task_id)

        started_signal = tm._signals.task_started
        started_signal.connect(record_started)
        started_signal.emit("task_123")

        assert len(deliveries) == 1
        assert deliveries[0] == "task_123"
|
|
|
|
|
|
@pytest.mark.unit
class TestShutdown:
    """Behaviour of ``TaskManager.shutdown``."""

    @patch('core.tasks.QTimer')
    def test_shutdown_stops_timers(self, mock_timer_class):
        """Shutting down stops every timer that was scheduled."""
        tm = TaskManager()
        # Install the stand-in timer only after the manager has been built.
        fake_timer = MagicMock()
        mock_timer_class.return_value = fake_timer

        # Schedule two timers; both are backed by the same mock object.
        for delay_ms in (1000, 2000):
            tm.run_later(delay_ms, lambda: None)

        tm.shutdown(wait=False)

        # One stop() call per scheduled timer.
        assert fake_timer.stop.call_count == 2

    def test_shutdown_clears_tasks(self):
        """Shutting down with a submitted task completes without raising."""
        tm = TaskManager()

        # Submit a single task before shutting down.
        task_id = tm.run_in_thread(lambda: None)

        tm.shutdown(wait=False)

        # Tasks should be cleared
        # Note: In real implementation, this may vary
|