358 lines
11 KiB
Python
358 lines
11 KiB
Python
"""
|
|
EU-Utility - Task Manager
|
|
|
|
Thread pool and background task execution for plugins.
|
|
Part of core - plugins access via PluginAPI.
|
|
"""
|
|
|
|
import uuid
|
|
import time
|
|
import threading
|
|
from concurrent.futures import ThreadPoolExecutor, Future
|
|
from typing import Callable, Any, Dict, Optional, List
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
|
|
from PyQt6.QtCore import QObject, pyqtSignal, QTimer
|
|
|
|
|
|
class TaskPriority(Enum):
    """Priority levels that can be attached to a background task.

    HIGH carries the smallest numeric value and LOW the largest.
    """

    HIGH = 1    # most urgent
    NORMAL = 2  # default urgency
    LOW = 3     # least urgent
|
|
|
|
|
|
class TaskStatus(Enum):
    """Lifecycle states of a background task.

    Each member's value is the lowercase string form of its name.
    """

    # Not yet picked up by a worker.
    PENDING = "pending"
    # Currently executing.
    RUNNING = "running"
    # Terminal states.
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
|
|
|
|
|
|
@dataclass
class Task:
    """Record describing one unit of background work and its outcome."""

    # --- What to run (required) -------------------------------------------
    id: str                  # identifier handed back to the caller
    func: Callable           # callable to execute
    args: tuple              # positional arguments for ``func``
    kwargs: dict             # keyword arguments for ``func``
    priority: TaskPriority   # priority requested by the caller

    # --- Optional notification hooks --------------------------------------
    callback: Optional[Callable] = None        # called with the result
    error_callback: Optional[Callable] = None  # called with the exception

    # --- Execution state ---------------------------------------------------
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None                  # return value of ``func``, once known
    error: Optional[Exception] = None   # exception raised by ``func``, if any

    # --- Timestamps (naive ``datetime.now()`` values) ----------------------
    created_at: datetime = field(default_factory=datetime.now)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
|
|
|
|
|
|
class TaskSignals(QObject):
    """Qt signal carrier used to publish task lifecycle events."""

    # Emitted with (task_id, result) after a task function returns.
    task_completed = pyqtSignal(str, object)

    # Emitted with (task_id, error) after a task function raises.
    task_failed = pyqtSignal(str, object)

    # Emitted with (task_id) just before a task function is invoked.
    task_started = pyqtSignal(str)
|
|
|
|
|
|
class TaskManager:
    """
    Core task management service with thread pool.

    The class is a singleton: every construction returns the same instance,
    and only the first ``__init__`` call actually configures it.  Work is
    executed on a ``ThreadPoolExecutor``; delayed and periodic calls are
    driven by ``QTimer`` objects.  Results and failures are announced through
    the Qt signals of an internal ``TaskSignals`` object.

    Usage:
        manager = get_task_manager()

        # Run in background
        task_id = manager.run_in_thread(my_function, arg1, arg2)

        # Run later
        manager.run_later(1000, my_function)  # After 1000ms

        # Run periodic
        manager.run_periodic(5000, my_function)  # Every 5 seconds
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use.

        Constructor arguments are accepted and ignored here: Python passes
        the same arguments to ``__new__`` and ``__init__``, so a signature
        without them would make ``TaskManager(max_workers=N)`` raise
        ``TypeError``.
        """
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # Set the flag before publishing the instance so no
                    # other thread can observe it without ``_initialized``.
                    instance._initialized = False
                    cls._instance = instance
        return cls._instance

    def __init__(self, max_workers: int = 4):
        """Configure the manager on first construction; later calls no-op.

        Args:
            max_workers: Size of the worker thread pool.  Only honoured by
                the call that performs the initialisation.
        """
        if self._initialized:
            return

        self._initialized = True
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._tasks: Dict[str, Task] = {}
        self._futures: Dict[str, Future] = {}
        self._timers: Dict[str, QTimer] = {}
        self._signals = TaskSignals()
        self._shutdown = False

        # Route completion/failure signals to the per-task callbacks.
        self._signals.task_completed.connect(self._on_task_completed)
        self._signals.task_failed.connect(self._on_task_failed)

    def run_in_thread(self, func: Callable, *args,
                      priority: TaskPriority = TaskPriority.NORMAL,
                      callback: Optional[Callable] = None,
                      error_callback: Optional[Callable] = None,
                      **kwargs) -> str:
        """Run a function in a background thread.

        Args:
            func: Function to run
            *args: Arguments for function
            priority: Task priority.  It is recorded on the task; the
                thread pool itself does not reorder work by it.
            callback: Called with result on success.
                NOTE(review): intended to run in the main thread via Qt
                signal delivery - confirm the connection type in use.
            error_callback: Called with error on failure (same delivery
                path as ``callback``).
            **kwargs: Keyword arguments for function

        Returns:
            Task ID

        Raises:
            RuntimeError: If the executor refuses new work (for example
                after ``shutdown``).  No task record is left behind.
        """
        task_id = str(uuid.uuid4())[:8]

        task = Task(
            id=task_id,
            func=func,
            args=args,
            kwargs=kwargs,
            priority=priority,
            callback=callback,
            error_callback=error_callback,
        )

        # Register before submitting: the worker looks the task up by ID.
        self._tasks[task_id] = task

        try:
            future = self._executor.submit(self._execute_task, task_id)
        except RuntimeError:
            # Submission was rejected; drop the record so it does not show
            # up forever as a PENDING task.
            self._tasks.pop(task_id, None)
            raise

        self._futures[task_id] = future
        return task_id

    def _execute_task(self, task_id: str):
        """Execute task in a worker thread and publish the outcome."""
        task = self._tasks.get(task_id)
        if not task:
            return

        try:
            task.status = TaskStatus.RUNNING
            task.started_at = datetime.now()
            self._signals.task_started.emit(task_id)

            # Execute function
            result = task.func(*task.args, **task.kwargs)

            task.result = result
            task.status = TaskStatus.COMPLETED
            task.completed_at = datetime.now()

            # Emit completion signal
            self._signals.task_completed.emit(task_id, result)

        except Exception as e:
            # The exception is stored on the task rather than propagated,
            # so the Future itself finishes normally.
            task.error = e
            task.status = TaskStatus.FAILED
            task.completed_at = datetime.now()

            # Emit failure signal
            self._signals.task_failed.emit(task_id, e)

    def _on_task_completed(self, task_id: str, result: Any):
        """Invoke the task's success callback, if it has one."""
        task = self._tasks.get(task_id)
        if task and task.callback:
            try:
                task.callback(result)
            except Exception as e:
                # A failing callback must not break signal delivery.
                print(f"[TaskManager] Callback error: {e}")

    def _on_task_failed(self, task_id: str, error: Exception):
        """Invoke the task's error callback, if it has one."""
        task = self._tasks.get(task_id)
        if task and task.error_callback:
            try:
                task.error_callback(error)
            except Exception as e:
                # A failing callback must not break signal delivery.
                print(f"[TaskManager] Error callback error: {e}")

    def run_later(self, delay_ms: int, func: Callable, *args, **kwargs) -> str:
        """Run a function after a delay.

        Args:
            delay_ms: Delay in milliseconds
            func: Function to run
            *args, **kwargs: Arguments for function

        Returns:
            Timer ID
        """
        timer_id = str(uuid.uuid4())[:8]

        def timeout():
            try:
                func(*args, **kwargs)
            finally:
                # Release the timer even when ``func`` raises, otherwise
                # the finished single-shot timer would stay registered.
                self._timers.pop(timer_id, None)

        timer = QTimer()
        timer.setSingleShot(True)
        timer.timeout.connect(timeout)
        timer.start(delay_ms)

        # Holding a reference keeps the timer alive until it fires.
        self._timers[timer_id] = timer
        return timer_id

    def run_periodic(self, interval_ms: int, func: Callable, *args, **kwargs) -> str:
        """Run a function periodically.

        Args:
            interval_ms: Interval in milliseconds
            func: Function to run
            *args, **kwargs: Arguments for function

        Returns:
            Timer ID (use cancel_task to stop)
        """
        timer_id = str(uuid.uuid4())[:8]

        timer = QTimer()
        timer.timeout.connect(lambda: func(*args, **kwargs))
        timer.start(interval_ms)

        self._timers[timer_id] = timer
        return timer_id

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a task or timer.

        A thread task can only be cancelled while its future is still
        cancellable (``Future.cancel`` decides); timers are always stopped.

        Args:
            task_id: Task or timer ID

        Returns:
            True if cancelled
        """
        # Cancel thread task
        if task_id in self._futures:
            future = self._futures[task_id]
            cancelled = future.cancel()
            if cancelled:
                task = self._tasks.get(task_id)
                if task:
                    task.status = TaskStatus.CANCELLED
            return cancelled

        # Cancel timer
        if task_id in self._timers:
            self._timers[task_id].stop()
            del self._timers[task_id]
            return True

        return False

    def get_task_status(self, task_id: str) -> Optional[TaskStatus]:
        """Get task status, or None for an unknown task ID."""
        task = self._tasks.get(task_id)
        return task.status if task else None

    def get_task_result(self, task_id: str) -> Any:
        """Get task result (if completed); None for an unknown task ID."""
        task = self._tasks.get(task_id)
        return task.result if task else None

    def get_active_tasks(self) -> List[Task]:
        """Get list of active (pending or running) tasks."""
        return [t for t in self._tasks.values()
                if t.status in (TaskStatus.PENDING, TaskStatus.RUNNING)]

    def shutdown(self, wait: bool = True, timeout: Optional[float] = None):
        """Shutdown task manager.

        All timers are stopped and the executor stops accepting new work.

        Args:
            wait: Wait for tasks to complete
            timeout: Maximum time to wait (seconds).  Only used when
                ``wait`` is true; ``None`` waits without limit.
        """
        self._shutdown = True

        # Stop all timers
        for timer in self._timers.values():
            timer.stop()
        self._timers.clear()

        # Shutdown executor
        if wait and timeout is not None:
            # Local import: ``wait`` is also the name of a parameter here.
            from concurrent.futures import wait as wait_for_futures

            # Stop accepting work without blocking, then wait on the
            # submitted futures for at most ``timeout`` seconds.
            self._executor.shutdown(wait=False)
            wait_for_futures(list(self._futures.values()), timeout=timeout)
        else:
            self._executor.shutdown(wait=wait)

    def wait_for_task(self, task_id: str, timeout: Optional[float] = None) -> bool:
        """Wait for a task to complete.

        Args:
            task_id: Task ID to wait for
            timeout: Maximum seconds to wait

        Returns:
            True if the task's future finished (a task whose function
            raised still counts, because the error is stored on the task).
            False on timeout, cancellation, or unknown task ID.
        """
        future = self._futures.get(task_id)
        if not future:
            return False

        try:
            future.result(timeout=timeout)
            return True
        except Exception:
            return False

    def connect_signal(self, signal_name: str, callback: Callable) -> bool:
        """Connect to task signals.

        Args:
            signal_name: One of 'completed', 'failed', 'started'
            callback: Function to call when signal emits

        Returns:
            True if connected
        """
        try:
            if signal_name == 'completed':
                self._signals.task_completed.connect(callback)
            elif signal_name == 'failed':
                self._signals.task_failed.connect(callback)
            elif signal_name == 'started':
                self._signals.task_started.connect(callback)
            else:
                return False
            return True
        except Exception as e:
            print(f"[TaskManager] Error connecting signal: {e}")
            return False

    def initialize(self):
        """Initialize the task manager (called after creation)."""
        # Already initialized in __init__
        pass
|
|
|
|
|
|
def get_task_manager(max_workers: int = 4) -> TaskManager:
    """Return the global TaskManager, constructing it on first use.

    Args:
        max_workers: Thread-pool size passed to the constructor; it is
            only passed when no instance exists yet.

    Returns:
        The shared TaskManager instance.
    """
    singleton_missing = TaskManager._instance is None
    if singleton_missing:
        # First use: build the manager with the requested pool size.
        TaskManager(max_workers=max_workers)

    # Constructing again hands back the shared instance.
    return TaskManager()
|