""" Unit tests for Task Manager service. Tests cover: - Singleton pattern - Task creation and execution - Task priorities - Task status tracking - Delayed and periodic tasks - Task cancellation - Signal emission """ import pytest import time from unittest.mock import MagicMock, patch from datetime import datetime from core.tasks import TaskManager, Task, TaskPriority, TaskStatus, get_task_manager @pytest.mark.unit class TestTaskManagerSingleton: """Test TaskManager singleton behavior.""" def test_singleton_instance(self): """Test that TaskManager is a proper singleton.""" tm1 = get_task_manager() tm2 = get_task_manager() assert tm1 is tm2 assert isinstance(tm1, TaskManager) @pytest.mark.unit class TestTaskCreation: """Test task creation.""" def test_task_initialization(self): """Test Task dataclass initialization.""" def dummy_func(): return "result" task = Task( id="test_123", func=dummy_func, args=(), kwargs={}, priority=TaskPriority.NORMAL ) assert task.id == "test_123" assert task.func is dummy_func assert task.priority == TaskPriority.NORMAL assert task.status == TaskStatus.PENDING assert task.created_at is not None def test_task_priority_enum(self): """Test TaskPriority enumeration.""" assert TaskPriority.HIGH.value == 1 assert TaskPriority.NORMAL.value == 2 assert TaskPriority.LOW.value == 3 def test_task_status_enum(self): """Test TaskStatus enumeration.""" assert TaskStatus.PENDING.value == "pending" assert TaskStatus.RUNNING.value == "running" assert TaskStatus.COMPLETED.value == "completed" assert TaskStatus.FAILED.value == "failed" assert TaskStatus.CANCELLED.value == "cancelled" @pytest.mark.unit class TestTaskExecution: """Test task execution.""" def test_run_in_thread(self): """Test running a function in a thread.""" tm = TaskManager() def test_func(): return "success" task_id = tm.run_in_thread(test_func) assert isinstance(task_id, str) assert len(task_id) == 8 # UUID first 8 chars assert task_id in tm._tasks def test_task_execution_success(self): """Test successful task execution.""" tm = TaskManager() result_received = [] def test_func(): return "success" def on_complete(result): result_received.append(result) task_id = tm.run_in_thread(test_func, on_complete=on_complete) # Wait for task to complete tm._futures[task_id].result(timeout=2) # Trigger signal manually for testing task = tm._tasks[task_id] tm._signals.task_completed.emit(task_id, task.result) assert result_received == ["success"] def test_task_execution_error(self): """Test task execution with error.""" tm = TaskManager() error_received = [] def error_func(): raise ValueError("Test error") def on_error(error): error_received.append(error) task_id = tm.run_in_thread(error_func, on_error=on_error) # Wait for task to complete (with error) try: tm._futures[task_id].result(timeout=2) except: pass # Trigger signal manually for testing task = tm._tasks[task_id] if task.error: tm._signals.task_failed.emit(task_id, task.error) assert len(error_received) == 1 assert isinstance(error_received[0], ValueError) def test_task_priority(self): """Test task priority assignment.""" tm = TaskManager() task_id = tm.run_in_thread(lambda: None, priority=TaskPriority.HIGH) task = tm._tasks[task_id] assert task.priority == TaskPriority.HIGH @pytest.mark.unit class TestTaskStatus: """Test task status tracking.""" def test_get_task_status_pending(self): """Test getting status of pending task.""" tm = TaskManager() # Create a task that will take some time def slow_func(): time.sleep(0.1) return "done" task_id = tm.run_in_thread(slow_func) # Status should be pending or running status = tm.get_task_status(task_id) assert status in [TaskStatus.PENDING, TaskStatus.RUNNING] def test_get_task_status_completed(self): """Test getting status of completed task.""" tm = TaskManager() task_id = tm.run_in_thread(lambda: "done") # Wait for completion tm._futures[task_id].result(timeout=2) # Manually set status for testing tm._tasks[task_id].status = TaskStatus.COMPLETED status = tm.get_task_status(task_id) assert status == TaskStatus.COMPLETED def test_get_task_result(self): """Test getting task result.""" tm = TaskManager() task_id = tm.run_in_thread(lambda: "test_result") # Wait for completion tm._futures[task_id].result(timeout=2) # Manually set result for testing tm._tasks[task_id].result = "test_result" tm._tasks[task_id].status = TaskStatus.COMPLETED result = tm.get_task_result(task_id) assert result == "test_result" def test_get_task_status_invalid_id(self): """Test getting status of non-existent task.""" tm = TaskManager() status = tm.get_task_status("invalid_id") assert status is None @pytest.mark.unit class TestDelayedTasks: """Test delayed task execution.""" @patch('core.tasks.QTimer') def test_run_later(self, mock_timer_class): """Test scheduling a delayed task.""" tm = TaskManager() mock_timer = MagicMock() mock_timer_class.return_value = mock_timer def test_func(): return "done" timer_id = tm.run_later(1000, test_func) assert isinstance(timer_id, str) mock_timer.setSingleShot.assert_called_once_with(True) mock_timer.start.assert_called_once_with(1000) @patch('core.tasks.QTimer') def test_run_periodic(self, mock_timer_class): """Test scheduling a periodic task.""" tm = TaskManager() mock_timer = MagicMock() mock_timer_class.return_value = mock_timer def test_func(): return "done" timer_id = tm.run_periodic(5000, test_func) assert isinstance(timer_id, str) mock_timer.setSingleShot.assert_called_once_with(False) mock_timer.start.assert_called_once_with(5000) @pytest.mark.unit class TestTaskCancellation: """Test task cancellation.""" def test_cancel_pending_task(self): """Test cancelling a pending task.""" tm = TaskManager() # Create a task that hasn't started yet def slow_func(): time.sleep(10) return "done" task_id = tm.run_in_thread(slow_func) # Cancel it result = tm.cancel_task(task_id) # Note: Cancellation may or may not succeed depending on timing assert isinstance(result, bool) @patch('core.tasks.QTimer') def test_cancel_timer(self, mock_timer_class): """Test cancelling a timer.""" tm = TaskManager() mock_timer = MagicMock() mock_timer_class.return_value = mock_timer timer_id = tm.run_later(1000, lambda: None) result = tm.cancel_task(timer_id) assert result is True mock_timer.stop.assert_called_once() def test_cancel_invalid_task(self): """Test cancelling non-existent task.""" tm = TaskManager() result = tm.cancel_task("invalid_id") assert result is False @pytest.mark.unit class TestActiveTasks: """Test active task retrieval.""" def test_get_active_tasks(self): """Test getting list of active tasks.""" tm = TaskManager() # Create some tasks task_id1 = tm.run_in_thread(lambda: "done1") task_id2 = tm.run_in_thread(lambda: "done2") # Wait for completion tm._futures[task_id1].result(timeout=2) tm._futures[task_id2].result(timeout=2) # Update statuses tm._tasks[task_id1].status = TaskStatus.COMPLETED tm._tasks[task_id2].status = TaskStatus.COMPLETED active_tasks = tm.get_active_tasks() # Should be empty since tasks completed assert len(active_tasks) == 0 def test_get_active_tasks_with_pending(self): """Test getting active tasks with pending tasks.""" tm = TaskManager() # Create a task that stays pending task = Task( id="test_task", func=lambda: None, args=(), kwargs={}, priority=TaskPriority.NORMAL, status=TaskStatus.PENDING ) tm._tasks["test_task"] = task active_tasks = tm.get_active_tasks() assert len(active_tasks) == 1 assert active_tasks[0].id == "test_task" @pytest.mark.unit class TestSignals: """Test Qt signal emission.""" def test_task_completed_signal(self): """Test task completed signal.""" tm = TaskManager() received = [] def on_completed(task_id, result): received.append((task_id, result)) tm._signals.task_completed.connect(on_completed) tm._signals.task_completed.emit("task_123", "result") assert len(received) == 1 assert received[0] == ("task_123", "result") def test_task_failed_signal(self): """Test task failed signal.""" tm = TaskManager() received = [] def on_failed(task_id, error): received.append((task_id, error)) tm._signals.task_failed.connect(on_failed) error = ValueError("Test error") tm._signals.task_failed.emit("task_123", error) assert len(received) == 1 assert received[0][0] == "task_123" assert isinstance(received[0][1], ValueError) def test_task_started_signal(self): """Test task started signal.""" tm = TaskManager() received = [] def on_started(task_id): received.append(task_id) tm._signals.task_started.connect(on_started) tm._signals.task_started.emit("task_123") assert len(received) == 1 assert received[0] == "task_123" @pytest.mark.unit class TestShutdown: """Test task manager shutdown.""" @patch('core.tasks.QTimer') def test_shutdown_stops_timers(self, mock_timer_class): """Test that shutdown stops all timers.""" tm = TaskManager() mock_timer = MagicMock() mock_timer_class.return_value = mock_timer # Create some timers tm.run_later(1000, lambda: None) tm.run_later(2000, lambda: None) tm.shutdown(wait=False) # Each timer should be stopped assert mock_timer.stop.call_count == 2 def test_shutdown_clears_tasks(self): """Test that shutdown clears tasks.""" tm = TaskManager() # Create a task task_id = tm.run_in_thread(lambda: None) tm.shutdown(wait=False) # Tasks should be cleared # Note: In real implementation, this may vary