4.0 KiB
4.0 KiB
Task Management Service - Implementation Summary
Overview
Implemented a thread pool-based background task service for EU-Utility that replaces the need for plugins to create their own QThreads.
Files Created
1. core/tasks.py
The core TaskManager implementation with:
- Singleton TaskManager class - Shared across the application
- ThreadPoolExecutor - Configurable max workers (default: 4)
- Task priority levels - HIGH, NORMAL, LOW
- Task scheduling:
run_in_thread()- Immediate background executionrun_later()- Delayed execution (ms)run_periodic()- Repeating execution at interval
- Callback support -
on_complete,on_error - Task ID tracking - UUID-based unique IDs
- Graceful shutdown - Wait for tasks or cancel with timeout
- Qt signal integration - Thread-safe UI updates
Key signals:
task_completed- (task_id, result)task_failed- (task_id, error_message)task_started- (task_id)task_cancelled- (task_id)task_periodic- (task_id, iteration, result)
Files Modified
2. core/plugin_api.py
Added to PluginAPI:
register_task_service()- Register TaskManager instancerun_in_background()- Run function in thread poolschedule_task()- Delayed/periodic task schedulingcancel_task()- Cancel pending/running taskget_task_status()- Query task statuswait_for_task()- Wait for completionconnect_task_signal()- Connect to Qt signals
3. plugins/base_plugin.py
Added to BasePlugin:
run_in_background()- Background task executionschedule_task()- Delayed/periodic schedulingcancel_task()- Task cancellationconnect_task_signals()- Batch signal connection
4. core/main.py
Changes:
- Import
get_task_managerfrom tasks module - Initialize TaskManager in
_setup_api_services() - Register task service with API
- Graceful shutdown in
quit()- waits for tasks (30s timeout)
Usage Examples
Simple Background Task
def heavy_calculation(data):
return process(data)
task_id = self.run_in_background(
heavy_calculation,
large_dataset,
priority='high',
on_complete=lambda result: print(f"Done: {result}"),
on_error=lambda e: print(f"Error: {e}")
)
Delayed Task
task_id = self.schedule_task(
delay_ms=5000, # 5 seconds
func=lambda: print("Hello!"),
on_complete=lambda _: print("Complete")
)
Periodic Task
task_id = self.schedule_task(
delay_ms=0,
func=fetch_data,
periodic=True,
interval_ms=30000, # Every 30 seconds
on_complete=lambda data: update_ui(data)
)
# Cancel later
self.cancel_task(task_id)
UI Updates (Thread-Safe)
def initialize(self):
# Connect signals once
self.connect_task_signals(
on_completed=self._on_done,
on_failed=self._on_error
)
def start_work(self):
# Run in background
self.run_in_background(
self.process_data,
priority='normal'
)
def _on_done(self, task_id, result):
# Runs in main thread - safe to update UI!
self.status_label.setText(f"Complete: {result}")
def _on_error(self, task_id, error):
self.status_label.setText(f"Error: {error}")
Benefits
- No more QThreads - Plugins use shared thread pool
- Qt signal integration - Thread-safe UI updates
- Priority levels - HIGH tasks run before LOW
- Task tracking - Monitor and cancel tasks
- Graceful shutdown - Wait for tasks on exit
- Error handling - Centralized error callbacks
- Less code - Simple API vs managing QThreads
Migration Guide
Before (with QThread):
class Worker(QThread):
finished = pyqtSignal(object)
def run(self):
result = heavy_work()
self.finished.emit(result)
worker = Worker()
worker.finished.connect(self.handle_result)
worker.start()
After (with TaskManager):
task_id = self.run_in_background(
heavy_work,
on_complete=self.handle_result
)