EU-Utility/docs/task_example_plugin.py

220 lines
6.4 KiB
Python

"""
Example plugin demonstrating background task usage.
This shows how to use the Task Manager instead of creating QThreads.
"""
from plugins.base_plugin import BasePlugin
from typing import Any
class ExampleTaskPlugin(BasePlugin):
"""Example plugin showing background task patterns."""
name = "Task Example"
version = "1.0.0"
author = "EU-Utility"
description = "Example of background task usage"
def __init__(self, overlay_window, config):
super().__init__(overlay_window, config)
self._active_tasks = []
self._periodic_task_id = None
def initialize(self) -> None:
"""Set up task signal connections."""
# Connect to task signals for UI updates
self.connect_task_signals(
on_completed=self._on_task_completed,
on_failed=self._on_task_failed,
on_started=self._on_task_started
)
def get_ui(self) -> Any:
"""Return the plugin's UI widget."""
# This would normally return a QWidget
# For this example, we just return None
return None
# ========== Example: Simple Background Task ==========
def run_heavy_calculation(self, data):
"""Run a heavy calculation in the background."""
def heavy_calc():
# This runs in a background thread
import time
time.sleep(2) # Simulate heavy work
return sum(x ** 2 for x in data)
def on_done(result):
# This runs in the main thread via Qt signals
print(f"Calculation complete: {result}")
self.notify_success("Task Complete", f"Result: {result}")
def on_error(exc):
print(f"Calculation failed: {exc}")
self.notify_error("Task Failed", str(exc))
# Submit the task
task_id = self.run_in_background(
heavy_calc,
priority='high',
on_complete=on_done,
on_error=on_error
)
self._active_tasks.append(task_id)
print(f"Started task: {task_id}")
return task_id
# ========== Example: Delayed Task ==========
def schedule_delayed_update(self):
"""Schedule a task to run after a delay."""
task_id = self.schedule_task(
delay_ms=5000, # 5 seconds
func=lambda: "Delayed update executed!",
on_complete=lambda msg: print(msg),
priority='normal'
)
print(f"Scheduled delayed task: {task_id}")
return task_id
# ========== Example: Periodic Task ==========
def start_periodic_refresh(self):
"""Start a periodic background refresh."""
def fetch_data():
# Simulate data fetching
import random
return {
'timestamp': __import__('time').time(),
'value': random.randint(1, 100)
}
def on_data(data):
print(f"Periodic update: {data}")
self._periodic_task_id = self.schedule_task(
delay_ms=0, # Start immediately
func=fetch_data,
periodic=True,
interval_ms=10000, # Every 10 seconds
on_complete=on_data,
priority='low'
)
print(f"Started periodic task: {self._periodic_task_id}")
def stop_periodic_refresh(self):
"""Stop the periodic task."""
if self._periodic_task_id:
self.cancel_task(self._periodic_task_id)
print("Periodic task stopped")
self._periodic_task_id = None
# ========== Example: Method as Background Task ==========
def background_method_example(self, data):
"""Example of running an instance method in background."""
# Note: When running instance methods, pass 'self' explicitly
task_id = self.run_in_background(
self._process_data,
data,
priority='normal',
on_complete=self._handle_result
)
return task_id
def _process_data(self, data):
"""Method that runs in background."""
import time
time.sleep(1)
return f"Processed: {len(data)} items"
def _handle_result(self, result):
"""Handle the result in main thread."""
print(f"Got result: {result}")
# ========== Task Signal Handlers ==========
def _on_task_started(self, task_id):
"""Called when any task starts."""
print(f"Task {task_id} started")
def _on_task_completed(self, task_id, result):
"""Called when any task completes."""
print(f"Task {task_id} completed with result: {result}")
if task_id in self._active_tasks:
self._active_tasks.remove(task_id)
def _on_task_failed(self, task_id, error):
"""Called when any task fails."""
print(f"Task {task_id} failed: {error}")
if task_id in self._active_tasks:
self._active_tasks.remove(task_id)
# ========== Cleanup ==========
def shutdown(self) -> None:
"""Clean up tasks when plugin shuts down."""
# Stop periodic task
self.stop_periodic_refresh()
# Cancel any active tasks
for task_id in self._active_tasks[:]:
self.cancel_task(task_id)
self._active_tasks.clear()
super().shutdown()
# ========== Quick Usage Examples ==========
"""
In a plugin, use the task service like this:
# 1. Simple background execution
task_id = self.run_in_background(
some_function,
arg1, arg2,
priority='high',
on_complete=lambda result: print(f"Done: {result}"),
on_error=lambda e: print(f"Error: {e}")
)
# 2. Delayed execution
task_id = self.schedule_task(
delay_ms=5000,
func=some_function,
on_complete=lambda result: print("Delayed task done")
)
# 3. Periodic execution
task_id = self.schedule_task(
delay_ms=0, # Start now
func=fetch_data,
periodic=True,
interval_ms=30000, # Every 30 seconds
on_complete=lambda data: update_ui(data)
)
# 4. Cancel a task
self.cancel_task(task_id)
# 5. Connect to task signals for UI updates
self.connect_task_signals(
on_completed=self._on_task_done,
on_failed=self._on_task_error
)
def _on_task_done(self, task_id, result):
# This runs in main thread - safe to update UI
self.status_label.setText(f"Task complete: {result}")
"""