EU-Utility/docs/TASK_SERVICE.md

4.0 KiB

Task Management Service - Implementation Summary

Overview

Implemented a thread pool-based background task service for EU-Utility that replaces the need for plugins to create their own QThreads.

Files Created

1. core/tasks.py

The core TaskManager implementation with:

  • Singleton TaskManager class - Shared across the application
  • ThreadPoolExecutor - Configurable max workers (default: 4)
  • Task priority levels - HIGH, NORMAL, LOW
  • Task scheduling:
    • run_in_thread() - Immediate background execution
    • run_later() - Delayed execution (ms)
    • run_periodic() - Repeating execution at interval
  • Callback support - on_complete, on_error
  • Task ID tracking - UUID-based unique IDs
  • Graceful shutdown - Wait for tasks or cancel with timeout
  • Qt signal integration - Thread-safe UI updates

Key signals:

  • task_completed - (task_id, result)
  • task_failed - (task_id, error_message)
  • task_started - (task_id)
  • task_cancelled - (task_id)
  • task_periodic - (task_id, iteration, result)

Files Modified

2. core/plugin_api.py

Added to PluginAPI:

  • register_task_service() - Register TaskManager instance
  • run_in_background() - Run function in thread pool
  • schedule_task() - Delayed/periodic task scheduling
  • cancel_task() - Cancel pending/running task
  • get_task_status() - Query task status
  • wait_for_task() - Wait for completion
  • connect_task_signal() - Connect to Qt signals

3. plugins/base_plugin.py

Added to BasePlugin:

  • run_in_background() - Background task execution
  • schedule_task() - Delayed/periodic scheduling
  • cancel_task() - Task cancellation
  • connect_task_signals() - Batch signal connection

4. core/main.py

Changes:

  • Import get_task_manager from tasks module
  • Initialize TaskManager in _setup_api_services()
  • Register task service with API
  • Graceful shutdown in quit() - waits for tasks (30s timeout)

Usage Examples

Simple Background Task

def heavy_calculation(data):
    return process(data)

task_id = self.run_in_background(
    heavy_calculation,
    large_dataset,
    priority='high',
    on_complete=lambda result: print(f"Done: {result}"),
    on_error=lambda e: print(f"Error: {e}")
)

Delayed Task

task_id = self.schedule_task(
    delay_ms=5000,  # 5 seconds
    func=lambda: print("Hello!"),
    on_complete=lambda _: print("Complete")
)

Periodic Task

task_id = self.schedule_task(
    delay_ms=0,
    func=fetch_data,
    periodic=True,
    interval_ms=30000,  # Every 30 seconds
    on_complete=lambda data: update_ui(data)
)

# Cancel later
self.cancel_task(task_id)

UI Updates (Thread-Safe)

def initialize(self):
    # Connect signals once
    self.connect_task_signals(
        on_completed=self._on_done,
        on_failed=self._on_error
    )

def start_work(self):
    # Run in background
    self.run_in_background(
        self.process_data,
        priority='normal'
    )

def _on_done(self, task_id, result):
    # Runs in main thread - safe to update UI!
    self.status_label.setText(f"Complete: {result}")

def _on_error(self, task_id, error):
    self.status_label.setText(f"Error: {error}")

Benefits

  1. No more QThreads - Plugins use shared thread pool
  2. Qt signal integration - Thread-safe UI updates
  3. Priority levels - HIGH tasks run before LOW
  4. Task tracking - Monitor and cancel tasks
  5. Graceful shutdown - Wait for tasks on exit
  6. Error handling - Centralized error callbacks
  7. Less code - Simple API vs managing QThreads

Migration Guide

Before (with QThread):

class Worker(QThread):
    finished = pyqtSignal(object)
    
    def run(self):
        result = heavy_work()
        self.finished.emit(result)

worker = Worker()
worker.finished.connect(self.handle_result)
worker.start()

After (with TaskManager):

task_id = self.run_in_background(
    heavy_work,
    on_complete=self.handle_result
)