""" Example plugin demonstrating background task usage. This shows how to use the Task Manager instead of creating QThreads. """ from plugins.base_plugin import BasePlugin from typing import Any class ExampleTaskPlugin(BasePlugin): """Example plugin showing background task patterns.""" name = "Task Example" version = "1.0.0" author = "EU-Utility" description = "Example of background task usage" def __init__(self, overlay_window, config): super().__init__(overlay_window, config) self._active_tasks = [] self._periodic_task_id = None def initialize(self) -> None: """Set up task signal connections.""" # Connect to task signals for UI updates self.connect_task_signals( on_completed=self._on_task_completed, on_failed=self._on_task_failed, on_started=self._on_task_started ) def get_ui(self) -> Any: """Return the plugin's UI widget.""" # This would normally return a QWidget # For this example, we just return None return None # ========== Example: Simple Background Task ========== def run_heavy_calculation(self, data): """Run a heavy calculation in the background.""" def heavy_calc(): # This runs in a background thread import time time.sleep(2) # Simulate heavy work return sum(x ** 2 for x in data) def on_done(result): # This runs in the main thread via Qt signals print(f"Calculation complete: {result}") self.notify_success("Task Complete", f"Result: {result}") def on_error(exc): print(f"Calculation failed: {exc}") self.notify_error("Task Failed", str(exc)) # Submit the task task_id = self.run_in_background( heavy_calc, priority='high', on_complete=on_done, on_error=on_error ) self._active_tasks.append(task_id) print(f"Started task: {task_id}") return task_id # ========== Example: Delayed Task ========== def schedule_delayed_update(self): """Schedule a task to run after a delay.""" task_id = self.schedule_task( delay_ms=5000, # 5 seconds func=lambda: "Delayed update executed!", on_complete=lambda msg: print(msg), priority='normal' ) print(f"Scheduled delayed task: {task_id}") return task_id # ========== Example: Periodic Task ========== def start_periodic_refresh(self): """Start a periodic background refresh.""" def fetch_data(): # Simulate data fetching import random return { 'timestamp': __import__('time').time(), 'value': random.randint(1, 100) } def on_data(data): print(f"Periodic update: {data}") self._periodic_task_id = self.schedule_task( delay_ms=0, # Start immediately func=fetch_data, periodic=True, interval_ms=10000, # Every 10 seconds on_complete=on_data, priority='low' ) print(f"Started periodic task: {self._periodic_task_id}") def stop_periodic_refresh(self): """Stop the periodic task.""" if self._periodic_task_id: self.cancel_task(self._periodic_task_id) print("Periodic task stopped") self._periodic_task_id = None # ========== Example: Method as Background Task ========== def background_method_example(self, data): """Example of running an instance method in background.""" # Note: When running instance methods, pass 'self' explicitly task_id = self.run_in_background( self._process_data, data, priority='normal', on_complete=self._handle_result ) return task_id def _process_data(self, data): """Method that runs in background.""" import time time.sleep(1) return f"Processed: {len(data)} items" def _handle_result(self, result): """Handle the result in main thread.""" print(f"Got result: {result}") # ========== Task Signal Handlers ========== def _on_task_started(self, task_id): """Called when any task starts.""" print(f"Task {task_id} started") def _on_task_completed(self, task_id, result): """Called when any task completes.""" print(f"Task {task_id} completed with result: {result}") if task_id in self._active_tasks: self._active_tasks.remove(task_id) def _on_task_failed(self, task_id, error): """Called when any task fails.""" print(f"Task {task_id} failed: {error}") if task_id in self._active_tasks: self._active_tasks.remove(task_id) # ========== Cleanup ========== def shutdown(self) -> None: """Clean up tasks when plugin shuts down.""" # Stop periodic task self.stop_periodic_refresh() # Cancel any active tasks for task_id in self._active_tasks[:]: self.cancel_task(task_id) self._active_tasks.clear() super().shutdown() # ========== Quick Usage Examples ========== """ In a plugin, use the task service like this: # 1. Simple background execution task_id = self.run_in_background( some_function, arg1, arg2, priority='high', on_complete=lambda result: print(f"Done: {result}"), on_error=lambda e: print(f"Error: {e}") ) # 2. Delayed execution task_id = self.schedule_task( delay_ms=5000, func=some_function, on_complete=lambda result: print("Delayed task done") ) # 3. Periodic execution task_id = self.schedule_task( delay_ms=0, # Start now func=fetch_data, periodic=True, interval_ms=30000, # Every 30 seconds on_complete=lambda data: update_ui(data) ) # 4. Cancel a task self.cancel_task(task_id) # 5. Connect to task signals for UI updates self.connect_task_signals( on_completed=self._on_task_done, on_failed=self._on_task_error ) def _on_task_done(self, task_id, result): # This runs in main thread - safe to update UI self.status_label.setText(f"Task complete: {result}") """