From b6d33127e4483b1033b1e87ce02317403b1dcc9a Mon Sep 17 00:00:00 2001 From: devmatrix Date: Tue, 17 Feb 2026 00:01:12 +0000 Subject: [PATCH] fix: Add thread-safe event emission and fix background thread errors --- premium/__init__.py | 9 ++++- premium/core/event_bus.py | 55 ++++++++++++++++++++++++--- premium/eu_integration/game_client.py | 6 ++- 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/premium/__init__.py b/premium/__init__.py index f077184..0130566 100644 --- a/premium/__init__.py +++ b/premium/__init__.py @@ -104,6 +104,9 @@ class EUUtilityApp: if not self._initialized: self.initialize() + # Start event bus first (captures main event loop) + self.event_bus.start() + # Discover and load plugins self.plugin_manager.discover_all() self.plugin_manager.load_all(auto_activate=True) @@ -119,10 +122,12 @@ class EUUtilityApp: def shutdown(self): """Shutdown the application gracefully.""" - if self.plugin_manager: - self.plugin_manager.shutdown() if self.game_client: self.game_client.stop() + if self.plugin_manager: + self.plugin_manager.shutdown() + if self.event_bus: + self.event_bus.stop() def _root_reducer(self, state, action): """Root state reducer.""" diff --git a/premium/core/event_bus.py b/premium/core/event_bus.py index 173b8c2..cbe3101 100644 --- a/premium/core/event_bus.py +++ b/premium/core/event_bus.py @@ -37,6 +37,8 @@ from abc import ABC, abstractmethod from collections import defaultdict, deque from dataclasses import dataclass, field from enum import Enum, auto +from queue import Queue +from threading import Lock from typing import ( Any, Callable, Coroutine, Dict, List, Optional, Set, Type, TypeVar, Union, Generic, Protocol, runtime_checkable @@ -237,6 +239,11 @@ class EventBus: self._lock = asyncio.Lock() self._event_count = 0 self._dropped_count = 0 + + # Thread-safe queue for cross-thread event emission + self._thread_queue: Queue = Queue() + self._thread_lock = Lock() + self._main_loop: Optional[asyncio.AbstractEventLoop] = None # ========== Subscription Methods ========== @@ -321,6 +328,24 @@ class EventBus: return sub_id + def start(self) -> None: + """Start the event bus and capture the main event loop. + + This should be called from the main thread's async context. + """ + try: + self._main_loop = asyncio.get_running_loop() + self._running = True + self._logger.info("Event bus started") + except RuntimeError: + self._logger.warning("Event bus started without event loop") + + def stop(self) -> None: + """Stop the event bus.""" + self._running = False + self._main_loop = None + self._logger.info("Event bus stopped") + def unsubscribe(self, subscription_id: str) -> bool: """Unsubscribe from events. @@ -413,17 +438,37 @@ class EventBus: self._queue.append(event) self._event_count += 1 - # Process immediately in async context (thread-safe) + # Try to process in async context try: loop = asyncio.get_running_loop() - if loop.is_running(): - asyncio.create_task(self._process_event(event)) + # We're in an async context - safe to create task + asyncio.create_task(self._process_event(event)) except RuntimeError: - # No event loop running in this thread - queue for later processing - pass + # No event loop in this thread - queue for main thread + with self._thread_lock: + self._thread_queue.put(event) + # Try to notify main loop if available + if self._main_loop and self._main_loop.is_running(): + try: + self._main_loop.call_soon_threadsafe( + lambda: asyncio.create_task(self._process_thread_queue()) + ) + except: + pass return event + async def _process_thread_queue(self) -> None: + """Process events queued from other threads.""" + while not self._thread_queue.empty(): + try: + with self._thread_lock: + if not self._thread_queue.empty(): + event = self._thread_queue.get_nowait() + await self._process_event(event) + except: + break + async def emit_async( self, event_type: str, diff --git a/premium/eu_integration/game_client.py b/premium/eu_integration/game_client.py index 80766fd..5bf5c1a 100644 --- a/premium/eu_integration/game_client.py +++ b/premium/eu_integration/game_client.py @@ -188,10 +188,14 @@ class GameClient: except Exception as e: self._logger.error(f"Error in callback: {e}") - # Emit to event bus + # Emit to event bus (thread-safe) if self.event_bus: try: self.event_bus.emit(f"game.{event_type}", data, source="game_client") + except RuntimeError as e: + # No event loop in this thread - event is queued for main thread + # This is expected when emitting from background threads + pass except Exception as e: self._logger.error(f"Error emitting to event bus: {e}")