""" EU-Utility - Window Manager Core Service Manages interaction with the Entropia Universe game window. Windows-specific implementation using ctypes (no external dependencies). """ import sys import time import threading from typing import Optional, Tuple, Dict, Any from dataclasses import dataclass from pathlib import Path from functools import lru_cache # Platform detection IS_WINDOWS = sys.platform == 'win32' # Windows-specific imports with graceful fallback if IS_WINDOWS: try: import ctypes from ctypes import wintypes import subprocess WINDOWS_AVAILABLE = True except ImportError: WINDOWS_AVAILABLE = False else: WINDOWS_AVAILABLE = False # Optional psutil import for fast process detection try: import psutil PSUTIL_AVAILABLE = True except ImportError: PSUTIL_AVAILABLE = False @dataclass class WindowInfo: """Information about a window.""" handle: int title: str pid: int rect: Tuple[int, int, int, int] # left, top, right, bottom width: int height: int is_visible: bool is_focused: bool @dataclass class ProcessInfo: """Information about a process.""" pid: int name: str executable_path: Optional[str] memory_usage: Optional[int] # in bytes cpu_percent: Optional[float] class _WindowCache: """Thread-safe cache for window information.""" def __init__(self, ttl_seconds: float = 3.0): self._cache: Dict[str, Any] = {} self._timestamps: Dict[str, float] = {} self._lock = threading.Lock() self._ttl = ttl_seconds def get(self, key: str) -> Any: """Get cached value if not expired.""" with self._lock: if key not in self._cache: return None timestamp = self._timestamps.get(key, 0) if time.time() - timestamp > self._ttl: # Expired del self._cache[key] del self._timestamps[key] return None return self._cache[key] def set(self, key: str, value: Any): """Set cached value.""" with self._lock: self._cache[key] = value self._timestamps[key] = time.time() def clear(self): """Clear all cached values.""" with self._lock: self._cache.clear() self._timestamps.clear() class WindowManager: """ Singleton Window Manager for EU-Utility. Manages finding and interacting with the Entropia Universe game window. Windows-specific implementation with graceful fallback on Linux. """ _instance = None _lock = threading.Lock() # Window search criteria - look for actual game client, not the loader EU_WINDOW_TITLE = "Entropia Universe Client" # Matches "Entropia Universe Client (64-bit) PlanetName" EU_PROCESS_NAMES = ["entropia.exe", "entropiauniverse.exe"] # Excludes clientloader.exe (launcher only) def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return # Core state (lazy initialization) self._window_handle: Optional[int] = None self._window_info: Optional[WindowInfo] = None self._process_info: Optional[ProcessInfo] = None self._last_update: float = 0 self._update_interval: float = 3.0 # Increased to 3 seconds # Caching self._cache = _WindowCache(ttl_seconds=3.0) self._window_handle_cache: Optional[int] = None self._window_handle_cache_time: float = 0 self._window_handle_ttl: float = 5.0 # Cache window handle for 5 seconds # Windows API (lazy initialization) self._windows_api_ready = False self._user32 = None self._kernel32 = None self._psutil_initialized = False self._initialized = True self._available = IS_WINDOWS and WINDOWS_AVAILABLE if not self._available: pass # Don't print here, print on first actual use if needed def _ensure_windows_api(self): """Lazy initialization of Windows API.""" if self._windows_api_ready: return if not IS_WINDOWS or not WINDOWS_AVAILABLE: return # Window styles self.GWL_STYLE = -16 self.GWL_EXSTYLE = -20 self.WS_VISIBLE = 0x10000000 self.WS_MINIMIZE = 0x20000000 # Load user32.dll functions self._user32 = ctypes.windll.user32 self._kernel32 = ctypes.windll.kernel32 # EnumWindows callback type self.EnumWindowsProc = ctypes.WINFUNCTYPE( wintypes.BOOL, wintypes.HWND, wintypes.LPARAM ) self._windows_api_ready = True def _ensure_psutil(self): """Lazy initialization of psutil-based process detection.""" if self._psutil_initialized: return if PSUTIL_AVAILABLE: # Pre-cache EU process info for fast lookups self._refresh_eu_process_cache() self._psutil_initialized = True def _refresh_eu_process_cache(self): """Refresh cached EU process information.""" if not PSUTIL_AVAILABLE: return try: for proc in psutil.process_iter(['pid', 'name', 'exe']): try: proc_name = proc.info['name'].lower() if any(eu_name.lower() == proc_name for eu_name in self.EU_PROCESS_NAMES): self._cache.set('eu_pid', proc.info['pid']) self._cache.set('eu_process_name', proc.info['name']) break except (psutil.NoSuchProcess, psutil.AccessDenied): continue except Exception: pass # ========== Public API ========== def is_available(self) -> bool: """Check if window manager is fully functional.""" return self._available def find_eu_window_cached(self) -> Optional[WindowInfo]: """ Find EU window with aggressive caching for performance. Returns cached result if available and recent. """ if not self._available: return None # Check cache first cached = self._cache.get('eu_window_info') if cached: return cached # Check if we have a cached window handle that's still valid if self._window_handle_cache: current_time = time.time() if current_time - self._window_handle_cache_time < self._window_handle_ttl: # Validate handle is still valid if self._is_window_valid(self._window_handle_cache): info = self._get_window_info_fast(self._window_handle_cache) if info: self._cache.set('eu_window_info', info) return info # Fall back to full search return self.find_eu_window() def find_eu_window(self) -> Optional[WindowInfo]: """ Find the Entropia Universe game window. Returns: WindowInfo if found, None otherwise """ if not self._available: return None self._ensure_windows_api() # Try by window title first (faster) hwnd = self._find_window_by_title_fast(self.EU_WINDOW_TITLE) if hwnd: self._window_handle = hwnd self._window_handle_cache = hwnd self._window_handle_cache_time = time.time() self._window_info = self._get_window_info_fast(hwnd) if self._window_info: self._cache.set('eu_window_info', self._window_info) return self._window_info # Try by process name if title fails for proc_name in self.EU_PROCESS_NAMES: hwnd = self._find_window_by_process_fast(proc_name) if hwnd: self._window_handle = hwnd self._window_handle_cache = hwnd self._window_handle_cache_time = time.time() self._window_info = self._get_window_info_fast(hwnd) if self._window_info: self._cache.set('eu_window_info', self._window_info) return self._window_info return None def is_eu_focused_fast(self) -> bool: """ Ultra-fast check if EU is focused using psutil + cached window handle. Target: <10ms execution time. """ if not self._available: return False self._ensure_windows_api() self._ensure_psutil() try: # Fast path: Check if our cached window is still focused if self._window_handle_cache: # Quick foreground window check (no window enumeration) foreground_hwnd = self._user32.GetForegroundWindow() if foreground_hwnd == self._window_handle_cache: # Verify window is still valid if self._is_window_valid(self._window_handle_cache): return True else: # Cache is stale, clear it self._window_handle_cache = None # Medium path: Use psutil to find EU process and check its windows if PSUTIL_AVAILABLE: eu_pid = self._cache.get('eu_pid') if eu_pid: try: proc = psutil.Process(eu_pid) # Check if process still exists if proc.is_running(): # Get foreground window foreground_hwnd = self._user32.GetForegroundWindow() # Get PID of foreground window fg_pid = wintypes.DWORD() self._user32.GetWindowThreadProcessId(foreground_hwnd, ctypes.byref(fg_pid)) if fg_pid.value == eu_pid: # Found it! Cache the handle self._window_handle_cache = foreground_hwnd self._window_handle_cache_time = time.time() return True else: # Process died, refresh cache self._refresh_eu_process_cache() except (psutil.NoSuchProcess, psutil.AccessDenied): self._refresh_eu_process_cache() # Slow path: Find window and check focus window = self.find_eu_window_cached() if window: return window.is_focused return False except Exception: return False def get_window_rect(self) -> Optional[Tuple[int, int, int, int]]: """ Get the window rectangle (left, top, right, bottom). Returns: Tuple of (left, top, right, bottom) or None """ if not self._available: return None # Refresh window info self._update_window_info() if self._window_info: return self._window_info.rect return None def is_window_focused(self) -> bool: """ Check if the EU window is currently focused/active. Returns: True if EU window is active, False otherwise """ return self.is_eu_focused_fast() def is_window_visible(self) -> bool: """ Check if the EU window is visible. Returns: True if visible, False otherwise """ if not self._available: return False # Refresh window info self._update_window_info() if self._window_info: return self._window_info.is_visible return False def bring_to_front(self) -> bool: """ Bring the EU window to the front and focus it. Returns: True if successful, False otherwise """ if not self._available: return False self._ensure_windows_api() if not self._window_handle: self.find_eu_window() if not self._window_handle: return False try: # Show window if minimized self._user32.ShowWindow(self._window_handle, 9) # SW_RESTORE = 9 # Bring to front result = self._user32.SetForegroundWindow(self._window_handle) # Force window to top self._user32.SetWindowPos( self._window_handle, -1, # HWND_TOPMOST 0, 0, 0, 0, 0x0001 | 0x0002 # SWP_NOSIZE | SWP_NOMOVE ) # Remove topmost flag but keep on top self._user32.SetWindowPos( self._window_handle, -2, # HWND_NOTOPMOST 0, 0, 0, 0, 0x0001 | 0x0002 # SWP_NOSIZE | SWP_NOMOVE ) return bool(result) except Exception as e: return False def get_eu_process_info(self) -> Optional[ProcessInfo]: """ Get process information for Entropia Universe. Returns: ProcessInfo if found, None otherwise """ if not self._available: return None self._ensure_windows_api() # Try cache first cached = self._cache.get('eu_process_info') if cached: return cached if not self._window_handle: self.find_eu_window() if not self._window_handle: return None # Get PID from window pid = wintypes.DWORD() self._user32.GetWindowThreadProcessId(self._window_handle, ctypes.byref(pid)) if pid.value == 0: return None self._process_info = self._get_process_info_fast(pid.value) if self._process_info: self._cache.set('eu_process_info', self._process_info) return self._process_info def get_window_handle(self) -> Optional[int]: """Get the current window handle.""" return self._window_handle_cache or self._window_handle def refresh(self) -> Optional[WindowInfo]: """Force refresh of window information.""" self._cache.clear() self._window_handle_cache = None self._last_update = 0 return self.find_eu_window() # ========== Private Methods ========== def _update_window_info(self): """Update cached window info if needed.""" current_time = time.time() if current_time - self._last_update > self._update_interval: if self._window_handle_cache and self._is_window_valid(self._window_handle_cache): self._window_info = self._get_window_info_fast(self._window_handle_cache) else: self.find_eu_window() self._last_update = current_time def _is_window_valid(self, hwnd: int) -> bool: """Check if a window handle is still valid (fast).""" if not self._available or not hwnd: return False try: return bool(self._user32.IsWindow(hwnd)) except Exception: return False def _find_window_by_title_fast(self, title: str) -> Optional[int]: """ Find window by title with strict timeout and limits. Target: <50ms execution time. """ found_hwnd = [None] start_time = time.perf_counter() window_count = [0] MAX_WINDOWS = 200 # Reduced from 500 MAX_TIME = 0.025 # 25ms timeout (reduced from 100ms) title_lower = title.lower() def callback(hwnd, extra): # Check timeout first if time.perf_counter() - start_time > MAX_TIME: return False window_count[0] += 1 if window_count[0] > MAX_WINDOWS: return False # Quick visibility check if not self._user32.IsWindowVisible(hwnd): return True # Get window text with smaller buffer text = ctypes.create_unicode_buffer(128) # Reduced from 256 length = self._user32.GetWindowTextW(hwnd, text, 128) if length == 0: # Skip empty titles quickly return True # Fast lowercase comparison if title_lower in text.value.lower(): found_hwnd[0] = hwnd return False # Stop enumeration - found! return True proc = self.EnumWindowsProc(callback) self._user32.EnumWindows(proc, 0) return found_hwnd[0] def _find_window_by_process_fast(self, process_name: str) -> Optional[int]: """ Find window by process name with strict timeout. Uses psutil if available for faster process detection. """ # Try psutil first for faster process-based lookup if PSUTIL_AVAILABLE: try: for proc in psutil.process_iter(['pid', 'name']): try: if process_name.lower() in proc.info['name'].lower(): # Found process, now find its window target_pid = proc.info['pid'] # Look for window with this PID found = self._find_window_by_pid(target_pid) if found: return found except (psutil.NoSuchProcess, psutil.AccessDenied): continue except Exception: pass # Fall back to enumeration found_hwnd = [None] start_time = time.perf_counter() window_count = [0] MAX_WINDOWS = 150 # Reduced from 300 MAX_TIME = 0.030 # 30ms timeout (reduced from 150ms) process_name_lower = process_name.lower() def callback(hwnd, extra): if time.perf_counter() - start_time > MAX_TIME: return False window_count[0] += 1 if window_count[0] > MAX_WINDOWS: return False if not self._user32.IsWindowVisible(hwnd): return True # Get process ID pid = wintypes.DWORD() self._user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) if pid.value == 0: return True # Use cached process info if available cache_key = f"proc_{pid.value}" proc_info = self._cache.get(cache_key) if proc_info is None: # Get process info (cached) proc_info = self._get_process_info_fast(pid.value) if proc_info: self._cache.set(cache_key, proc_info) if proc_info and process_name_lower in proc_info.name.lower(): found_hwnd[0] = hwnd return False return True proc = self.EnumWindowsProc(callback) self._user32.EnumWindows(proc, 0) return found_hwnd[0] def _find_window_by_pid(self, target_pid: int) -> Optional[int]: """Find a window by its process ID.""" found_hwnd = [None] def callback(hwnd, extra): if not self._user32.IsWindowVisible(hwnd): return True pid = wintypes.DWORD() self._user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) if pid.value == target_pid: found_hwnd[0] = hwnd return False return True proc = self.EnumWindowsProc(callback) self._user32.EnumWindows(proc, 0) return found_hwnd[0] def _get_window_info_fast(self, hwnd: int) -> Optional[WindowInfo]: """Get window info with minimal overhead and timeout protection.""" try: # Get window rect rect = wintypes.RECT() if not self._user32.GetWindowRect(hwnd, ctypes.byref(rect)): return None # Get window text (smaller buffer for speed) text = ctypes.create_unicode_buffer(128) self._user32.GetWindowTextW(hwnd, text, 128) # Get PID pid = wintypes.DWORD() self._user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) # Check visibility is_visible = bool(self._user32.IsWindowVisible(hwnd)) # Check if focused foreground = self._user32.GetForegroundWindow() is_focused = (foreground == hwnd) return WindowInfo( handle=hwnd, title=text.value, pid=pid.value, rect=(rect.left, rect.top, rect.right, rect.bottom), width=rect.right - rect.left, height=rect.bottom - rect.top, is_visible=is_visible, is_focused=is_focused ) except Exception: return None def _get_process_info_fast(self, pid: int) -> Optional[ProcessInfo]: """Get process info with caching and minimal overhead.""" # Try psutil first (much faster) if PSUTIL_AVAILABLE: try: proc = psutil.Process(pid) info = proc.as_dict(attrs=['pid', 'name', 'exe', 'memory_info']) return ProcessInfo( pid=info['pid'], name=info['name'], executable_path=info.get('exe'), memory_usage=info['memory_info'].rss if info.get('memory_info') else None, cpu_percent=None ) except (psutil.NoSuchProcess, psutil.AccessDenied): return None except Exception: pass # Fall back to tasklist (slower) try: result = subprocess.run( ['tasklist', '/FI', f'PID eq {pid}', '/FO', 'CSV', '/NH'], capture_output=True, text=True, timeout=2 # Reduced from 5 ) if result.returncode == 0 and result.stdout.strip(): lines = result.stdout.strip().split('\n') for line in lines: if str(pid) in line: parts = line.split('","') if len(parts) >= 2: name = parts[0].replace('"', '') return ProcessInfo( pid=pid, name=name, executable_path=None, memory_usage=None, cpu_percent=None ) return ProcessInfo( pid=pid, name="Unknown", executable_path=None, memory_usage=None, cpu_percent=None ) except Exception: return None # Singleton instance _window_manager = None def get_window_manager() -> WindowManager: """Get the global WindowManager instance.""" global _window_manager if _window_manager is None: _window_manager = WindowManager() return _window_manager def is_eu_running() -> bool: """Quick check if Entropia Universe is running.""" wm = get_window_manager() return wm.find_eu_window_cached() is not None def get_eu_window_rect() -> Optional[Tuple[int, int, int, int]]: """Quick access to EU window rectangle.""" wm = get_window_manager() return wm.get_window_rect() def wait_for_eu(timeout: float = 30.0) -> bool: """ Wait for Entropia Universe to start. Args: timeout: Maximum seconds to wait Returns: True if EU found, False if timeout """ wm = get_window_manager() start_time = time.time() while time.time() - start_time < timeout: if wm.find_eu_window_cached(): return True time.sleep(0.5) return False