EU-Utility/tests/unit/test_tasks.py

414 lines
12 KiB
Python

"""
Unit tests for Task Manager service.
Tests cover:
- Singleton pattern
- Task creation and execution
- Task priorities
- Task status tracking
- Delayed and periodic tasks
- Task cancellation
- Signal emission
"""
import pytest
import time
from unittest.mock import MagicMock, patch
from datetime import datetime
from core.tasks import TaskManager, Task, TaskPriority, TaskStatus, get_task_manager
@pytest.mark.unit
class TestTaskManagerSingleton:
"""Test TaskManager singleton behavior."""
def test_singleton_instance(self):
"""Test that TaskManager is a proper singleton."""
tm1 = get_task_manager()
tm2 = get_task_manager()
assert tm1 is tm2
assert isinstance(tm1, TaskManager)
@pytest.mark.unit
class TestTaskCreation:
"""Test task creation."""
def test_task_initialization(self):
"""Test Task dataclass initialization."""
def dummy_func():
return "result"
task = Task(
id="test_123",
func=dummy_func,
args=(),
kwargs={},
priority=TaskPriority.NORMAL
)
assert task.id == "test_123"
assert task.func is dummy_func
assert task.priority == TaskPriority.NORMAL
assert task.status == TaskStatus.PENDING
assert task.created_at is not None
def test_task_priority_enum(self):
"""Test TaskPriority enumeration."""
assert TaskPriority.HIGH.value == 1
assert TaskPriority.NORMAL.value == 2
assert TaskPriority.LOW.value == 3
def test_task_status_enum(self):
"""Test TaskStatus enumeration."""
assert TaskStatus.PENDING.value == "pending"
assert TaskStatus.RUNNING.value == "running"
assert TaskStatus.COMPLETED.value == "completed"
assert TaskStatus.FAILED.value == "failed"
assert TaskStatus.CANCELLED.value == "cancelled"
@pytest.mark.unit
class TestTaskExecution:
"""Test task execution."""
def test_run_in_thread(self):
"""Test running a function in a thread."""
tm = TaskManager()
def test_func():
return "success"
task_id = tm.run_in_thread(test_func)
assert isinstance(task_id, str)
assert len(task_id) == 8 # UUID first 8 chars
assert task_id in tm._tasks
def test_task_execution_success(self):
"""Test successful task execution."""
tm = TaskManager()
result_received = []
def test_func():
return "success"
def on_complete(result):
result_received.append(result)
task_id = tm.run_in_thread(test_func, on_complete=on_complete)
# Wait for task to complete
tm._futures[task_id].result(timeout=2)
# Trigger signal manually for testing
task = tm._tasks[task_id]
tm._signals.task_completed.emit(task_id, task.result)
assert result_received == ["success"]
def test_task_execution_error(self):
"""Test task execution with error."""
tm = TaskManager()
error_received = []
def error_func():
raise ValueError("Test error")
def on_error(error):
error_received.append(error)
task_id = tm.run_in_thread(error_func, on_error=on_error)
# Wait for task to complete (with error)
try:
tm._futures[task_id].result(timeout=2)
except:
pass
# Trigger signal manually for testing
task = tm._tasks[task_id]
if task.error:
tm._signals.task_failed.emit(task_id, task.error)
assert len(error_received) == 1
assert isinstance(error_received[0], ValueError)
def test_task_priority(self):
"""Test task priority assignment."""
tm = TaskManager()
task_id = tm.run_in_thread(lambda: None, priority=TaskPriority.HIGH)
task = tm._tasks[task_id]
assert task.priority == TaskPriority.HIGH
@pytest.mark.unit
class TestTaskStatus:
"""Test task status tracking."""
def test_get_task_status_pending(self):
"""Test getting status of pending task."""
tm = TaskManager()
# Create a task that will take some time
def slow_func():
time.sleep(0.1)
return "done"
task_id = tm.run_in_thread(slow_func)
# Status should be pending or running
status = tm.get_task_status(task_id)
assert status in [TaskStatus.PENDING, TaskStatus.RUNNING]
def test_get_task_status_completed(self):
"""Test getting status of completed task."""
tm = TaskManager()
task_id = tm.run_in_thread(lambda: "done")
# Wait for completion
tm._futures[task_id].result(timeout=2)
# Manually set status for testing
tm._tasks[task_id].status = TaskStatus.COMPLETED
status = tm.get_task_status(task_id)
assert status == TaskStatus.COMPLETED
def test_get_task_result(self):
"""Test getting task result."""
tm = TaskManager()
task_id = tm.run_in_thread(lambda: "test_result")
# Wait for completion
tm._futures[task_id].result(timeout=2)
# Manually set result for testing
tm._tasks[task_id].result = "test_result"
tm._tasks[task_id].status = TaskStatus.COMPLETED
result = tm.get_task_result(task_id)
assert result == "test_result"
def test_get_task_status_invalid_id(self):
"""Test getting status of non-existent task."""
tm = TaskManager()
status = tm.get_task_status("invalid_id")
assert status is None
@pytest.mark.unit
class TestDelayedTasks:
"""Test delayed task execution."""
@patch('core.tasks.QTimer')
def test_run_later(self, mock_timer_class):
"""Test scheduling a delayed task."""
tm = TaskManager()
mock_timer = MagicMock()
mock_timer_class.return_value = mock_timer
def test_func():
return "done"
timer_id = tm.run_later(1000, test_func)
assert isinstance(timer_id, str)
mock_timer.setSingleShot.assert_called_once_with(True)
mock_timer.start.assert_called_once_with(1000)
@patch('core.tasks.QTimer')
def test_run_periodic(self, mock_timer_class):
"""Test scheduling a periodic task."""
tm = TaskManager()
mock_timer = MagicMock()
mock_timer_class.return_value = mock_timer
def test_func():
return "done"
timer_id = tm.run_periodic(5000, test_func)
assert isinstance(timer_id, str)
mock_timer.setSingleShot.assert_called_once_with(False)
mock_timer.start.assert_called_once_with(5000)
@pytest.mark.unit
class TestTaskCancellation:
"""Test task cancellation."""
def test_cancel_pending_task(self):
"""Test cancelling a pending task."""
tm = TaskManager()
# Create a task that hasn't started yet
def slow_func():
time.sleep(10)
return "done"
task_id = tm.run_in_thread(slow_func)
# Cancel it
result = tm.cancel_task(task_id)
# Note: Cancellation may or may not succeed depending on timing
assert isinstance(result, bool)
@patch('core.tasks.QTimer')
def test_cancel_timer(self, mock_timer_class):
"""Test cancelling a timer."""
tm = TaskManager()
mock_timer = MagicMock()
mock_timer_class.return_value = mock_timer
timer_id = tm.run_later(1000, lambda: None)
result = tm.cancel_task(timer_id)
assert result is True
mock_timer.stop.assert_called_once()
def test_cancel_invalid_task(self):
"""Test cancelling non-existent task."""
tm = TaskManager()
result = tm.cancel_task("invalid_id")
assert result is False
@pytest.mark.unit
class TestActiveTasks:
"""Test active task retrieval."""
def test_get_active_tasks(self):
"""Test getting list of active tasks."""
tm = TaskManager()
# Create some tasks
task_id1 = tm.run_in_thread(lambda: "done1")
task_id2 = tm.run_in_thread(lambda: "done2")
# Wait for completion
tm._futures[task_id1].result(timeout=2)
tm._futures[task_id2].result(timeout=2)
# Update statuses
tm._tasks[task_id1].status = TaskStatus.COMPLETED
tm._tasks[task_id2].status = TaskStatus.COMPLETED
active_tasks = tm.get_active_tasks()
# Should be empty since tasks completed
assert len(active_tasks) == 0
def test_get_active_tasks_with_pending(self):
"""Test getting active tasks with pending tasks."""
tm = TaskManager()
# Create a task that stays pending
task = Task(
id="test_task",
func=lambda: None,
args=(),
kwargs={},
priority=TaskPriority.NORMAL,
status=TaskStatus.PENDING
)
tm._tasks["test_task"] = task
active_tasks = tm.get_active_tasks()
assert len(active_tasks) == 1
assert active_tasks[0].id == "test_task"
@pytest.mark.unit
class TestSignals:
"""Test Qt signal emission."""
def test_task_completed_signal(self):
"""Test task completed signal."""
tm = TaskManager()
received = []
def on_completed(task_id, result):
received.append((task_id, result))
tm._signals.task_completed.connect(on_completed)
tm._signals.task_completed.emit("task_123", "result")
assert len(received) == 1
assert received[0] == ("task_123", "result")
def test_task_failed_signal(self):
"""Test task failed signal."""
tm = TaskManager()
received = []
def on_failed(task_id, error):
received.append((task_id, error))
tm._signals.task_failed.connect(on_failed)
error = ValueError("Test error")
tm._signals.task_failed.emit("task_123", error)
assert len(received) == 1
assert received[0][0] == "task_123"
assert isinstance(received[0][1], ValueError)
def test_task_started_signal(self):
"""Test task started signal."""
tm = TaskManager()
received = []
def on_started(task_id):
received.append(task_id)
tm._signals.task_started.connect(on_started)
tm._signals.task_started.emit("task_123")
assert len(received) == 1
assert received[0] == "task_123"
@pytest.mark.unit
class TestShutdown:
"""Test task manager shutdown."""
@patch('core.tasks.QTimer')
def test_shutdown_stops_timers(self, mock_timer_class):
"""Test that shutdown stops all timers."""
tm = TaskManager()
mock_timer = MagicMock()
mock_timer_class.return_value = mock_timer
# Create some timers
tm.run_later(1000, lambda: None)
tm.run_later(2000, lambda: None)
tm.shutdown(wait=False)
# Each timer should be stopped
assert mock_timer.stop.call_count == 2
def test_shutdown_clears_tasks(self):
"""Test that shutdown clears tasks."""
tm = TaskManager()
# Create a task
task_id = tm.run_in_thread(lambda: None)
tm.shutdown(wait=False)
# Tasks should be cleared
# Note: In real implementation, this may vary