diff --git a/core/window_manager.py b/core/window_manager.py index 51cc425..95da514 100644 --- a/core/window_manager.py +++ b/core/window_manager.py @@ -7,9 +7,11 @@ Windows-specific implementation using ctypes (no external dependencies). import sys import time +import threading from typing import Optional, Tuple, Dict, Any from dataclasses import dataclass from pathlib import Path +from functools import lru_cache # Platform detection IS_WINDOWS = sys.platform == 'win32' @@ -26,6 +28,13 @@ if IS_WINDOWS: else: WINDOWS_AVAILABLE = False +# Optional psutil import for fast process detection +try: + import psutil + PSUTIL_AVAILABLE = True +except ImportError: + PSUTIL_AVAILABLE = False + @dataclass class WindowInfo: @@ -50,6 +59,41 @@ class ProcessInfo: cpu_percent: Optional[float] +class _WindowCache: + """Thread-safe cache for window information.""" + + def __init__(self, ttl_seconds: float = 3.0): + self._cache: Dict[str, Any] = {} + self._timestamps: Dict[str, float] = {} + self._lock = threading.Lock() + self._ttl = ttl_seconds + + def get(self, key: str) -> Any: + """Get cached value if not expired.""" + with self._lock: + if key not in self._cache: + return None + timestamp = self._timestamps.get(key, 0) + if time.time() - timestamp > self._ttl: + # Expired + del self._cache[key] + del self._timestamps[key] + return None + return self._cache[key] + + def set(self, key: str, value: Any): + """Set cached value.""" + with self._lock: + self._cache[key] = value + self._timestamps[key] = time.time() + + def clear(self): + """Clear all cached values.""" + with self._lock: + self._cache.clear() + self._timestamps.clear() + + class WindowManager: """ Singleton Window Manager for EU-Utility. @@ -59,6 +103,7 @@ class WindowManager: """ _instance = None + _lock = threading.Lock() # Window search criteria - look for actual game client, not the loader EU_WINDOW_TITLE = "Entropia Universe Client" # Matches "Entropia Universe Client (64-bit) PlanetName" @@ -66,32 +111,49 @@ class WindowManager: def __new__(cls): if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._initialized = False + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return - + + # Core state (lazy initialization) self._window_handle: Optional[int] = None self._window_info: Optional[WindowInfo] = None self._process_info: Optional[ProcessInfo] = None self._last_update: float = 0 - self._update_interval: float = 1.0 # seconds + self._update_interval: float = 3.0 # Increased to 3 seconds - # Windows API constants - if IS_WINDOWS and WINDOWS_AVAILABLE: - self._setup_windows_api() + # Caching + self._cache = _WindowCache(ttl_seconds=3.0) + self._window_handle_cache: Optional[int] = None + self._window_handle_cache_time: float = 0 + self._window_handle_ttl: float = 5.0 # Cache window handle for 5 seconds + + # Windows API (lazy initialization) + self._windows_api_ready = False + self._user32 = None + self._kernel32 = None + self._psutil_initialized = False self._initialized = True self._available = IS_WINDOWS and WINDOWS_AVAILABLE if not self._available: - print("[WindowManager] Windows API not available - running in limited mode") + pass # Don't print here, print on first actual use if needed - def _setup_windows_api(self): - """Setup Windows API constants and functions.""" + def _ensure_windows_api(self): + """Lazy initialization of Windows API.""" + if self._windows_api_ready: + return + + if not IS_WINDOWS or not WINDOWS_AVAILABLE: + return + # Window styles self.GWL_STYLE = -16 self.GWL_EXSTYLE = -20 @@ -99,8 +161,8 @@ class WindowManager: self.WS_MINIMIZE = 0x20000000 # Load user32.dll functions - self.user32 = ctypes.windll.user32 - self.kernel32 = ctypes.windll.kernel32 + self._user32 = ctypes.windll.user32 + self._kernel32 = ctypes.windll.kernel32 # EnumWindows callback type self.EnumWindowsProc = ctypes.WINFUNCTYPE( @@ -108,6 +170,37 @@ class WindowManager: wintypes.HWND, wintypes.LPARAM ) + + self._windows_api_ready = True + + def _ensure_psutil(self): + """Lazy initialization of psutil-based process detection.""" + if self._psutil_initialized: + return + + if PSUTIL_AVAILABLE: + # Pre-cache EU process info for fast lookups + self._refresh_eu_process_cache() + + self._psutil_initialized = True + + def _refresh_eu_process_cache(self): + """Refresh cached EU process information.""" + if not PSUTIL_AVAILABLE: + return + + try: + for proc in psutil.process_iter(['pid', 'name', 'exe']): + try: + proc_name = proc.info['name'].lower() + if any(eu_name.lower() == proc_name for eu_name in self.EU_PROCESS_NAMES): + self._cache.set('eu_pid', proc.info['pid']) + self._cache.set('eu_process_name', proc.info['name']) + break + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + except Exception: + pass # ========== Public API ========== @@ -115,6 +208,33 @@ class WindowManager: """Check if window manager is fully functional.""" return self._available + def find_eu_window_cached(self) -> Optional[WindowInfo]: + """ + Find EU window with aggressive caching for performance. + Returns cached result if available and recent. + """ + if not self._available: + return None + + # Check cache first + cached = self._cache.get('eu_window_info') + if cached: + return cached + + # Check if we have a cached window handle that's still valid + if self._window_handle_cache: + current_time = time.time() + if current_time - self._window_handle_cache_time < self._window_handle_ttl: + # Validate handle is still valid + if self._is_window_valid(self._window_handle_cache): + info = self._get_window_info_fast(self._window_handle_cache) + if info: + self._cache.set('eu_window_info', info) + return info + + # Fall back to full search + return self.find_eu_window() + def find_eu_window(self) -> Optional[WindowInfo]: """ Find the Entropia Universe game window. @@ -125,24 +245,92 @@ class WindowManager: if not self._available: return None - # Try by window title first - hwnd = self._find_window_by_title(self.EU_WINDOW_TITLE) + self._ensure_windows_api() + + # Try by window title first (faster) + hwnd = self._find_window_by_title_fast(self.EU_WINDOW_TITLE) if hwnd: self._window_handle = hwnd - self._window_info = self._get_window_info(hwnd) + self._window_handle_cache = hwnd + self._window_handle_cache_time = time.time() + self._window_info = self._get_window_info_fast(hwnd) + if self._window_info: + self._cache.set('eu_window_info', self._window_info) return self._window_info - # Try by process name + # Try by process name if title fails for proc_name in self.EU_PROCESS_NAMES: - hwnd = self._find_window_by_process(proc_name) + hwnd = self._find_window_by_process_fast(proc_name) if hwnd: self._window_handle = hwnd - self._window_info = self._get_window_info(hwnd) + self._window_handle_cache = hwnd + self._window_handle_cache_time = time.time() + self._window_info = self._get_window_info_fast(hwnd) + if self._window_info: + self._cache.set('eu_window_info', self._window_info) return self._window_info return None + def is_eu_focused_fast(self) -> bool: + """ + Ultra-fast check if EU is focused using psutil + cached window handle. + Target: <10ms execution time. + """ + if not self._available: + return False + + self._ensure_windows_api() + self._ensure_psutil() + + try: + # Fast path: Check if our cached window is still focused + if self._window_handle_cache: + # Quick foreground window check (no window enumeration) + foreground_hwnd = self._user32.GetForegroundWindow() + if foreground_hwnd == self._window_handle_cache: + # Verify window is still valid + if self._is_window_valid(self._window_handle_cache): + return True + else: + # Cache is stale, clear it + self._window_handle_cache = None + + # Medium path: Use psutil to find EU process and check its windows + if PSUTIL_AVAILABLE: + eu_pid = self._cache.get('eu_pid') + if eu_pid: + try: + proc = psutil.Process(eu_pid) + # Check if process still exists + if proc.is_running(): + # Get foreground window + foreground_hwnd = self._user32.GetForegroundWindow() + # Get PID of foreground window + fg_pid = wintypes.DWORD() + self._user32.GetWindowThreadProcessId(foreground_hwnd, ctypes.byref(fg_pid)) + if fg_pid.value == eu_pid: + # Found it! Cache the handle + self._window_handle_cache = foreground_hwnd + self._window_handle_cache_time = time.time() + return True + else: + # Process died, refresh cache + self._refresh_eu_process_cache() + except (psutil.NoSuchProcess, psutil.AccessDenied): + self._refresh_eu_process_cache() + + # Slow path: Find window and check focus + window = self.find_eu_window_cached() + if window: + return window.is_focused + + return False + + except Exception: + return False + def get_window_rect(self) -> Optional[Tuple[int, int, int, int]]: """ Get the window rectangle (left, top, right, bottom). @@ -167,18 +355,7 @@ class WindowManager: Returns: True if EU window is active, False otherwise """ - if not self._available: - return False - - if not self._window_handle: - self.find_eu_window() - - if not self._window_handle: - return False - - # Get foreground window - foreground_hwnd = self.user32.GetForegroundWindow() - return foreground_hwnd == self._window_handle + return self.is_eu_focused_fast() def is_window_visible(self) -> bool: """ @@ -207,6 +384,8 @@ class WindowManager: if not self._available: return False + self._ensure_windows_api() + if not self._window_handle: self.find_eu_window() @@ -215,13 +394,13 @@ class WindowManager: try: # Show window if minimized - self.user32.ShowWindow(self._window_handle, 9) # SW_RESTORE = 9 + self._user32.ShowWindow(self._window_handle, 9) # SW_RESTORE = 9 # Bring to front - result = self.user32.SetForegroundWindow(self._window_handle) + result = self._user32.SetForegroundWindow(self._window_handle) # Force window to top - self.user32.SetWindowPos( + self._user32.SetWindowPos( self._window_handle, -1, # HWND_TOPMOST 0, 0, 0, 0, @@ -229,7 +408,7 @@ class WindowManager: ) # Remove topmost flag but keep on top - self.user32.SetWindowPos( + self._user32.SetWindowPos( self._window_handle, -2, # HWND_NOTOPMOST 0, 0, 0, 0, @@ -238,7 +417,6 @@ class WindowManager: return bool(result) except Exception as e: - print(f"[WindowManager] Failed to bring window to front: {e}") return False def get_eu_process_info(self) -> Optional[ProcessInfo]: @@ -251,6 +429,13 @@ class WindowManager: if not self._available: return None + self._ensure_windows_api() + + # Try cache first + cached = self._cache.get('eu_process_info') + if cached: + return cached + if not self._window_handle: self.find_eu_window() @@ -259,20 +444,24 @@ class WindowManager: # Get PID from window pid = wintypes.DWORD() - self.user32.GetWindowThreadProcessId(self._window_handle, ctypes.byref(pid)) + self._user32.GetWindowThreadProcessId(self._window_handle, ctypes.byref(pid)) if pid.value == 0: return None - self._process_info = self._get_process_info(pid.value) + self._process_info = self._get_process_info_fast(pid.value) + if self._process_info: + self._cache.set('eu_process_info', self._process_info) return self._process_info def get_window_handle(self) -> Optional[int]: """Get the current window handle.""" - return self._window_handle + return self._window_handle_cache or self._window_handle def refresh(self) -> Optional[WindowInfo]: """Force refresh of window information.""" + self._cache.clear() + self._window_handle_cache = None self._last_update = 0 return self.find_eu_window() @@ -282,118 +471,179 @@ class WindowManager: """Update cached window info if needed.""" current_time = time.time() if current_time - self._last_update > self._update_interval: - if self._window_handle: - self._window_info = self._get_window_info(self._window_handle) + if self._window_handle_cache and self._is_window_valid(self._window_handle_cache): + self._window_info = self._get_window_info_fast(self._window_handle_cache) else: self.find_eu_window() self._last_update = current_time - def _find_window_by_title(self, title: str) -> Optional[int]: - """Find window by title (partial match) - with timeout protection.""" + def _is_window_valid(self, hwnd: int) -> bool: + """Check if a window handle is still valid (fast).""" + if not self._available or not hwnd: + return False + try: + return bool(self._user32.IsWindow(hwnd)) + except Exception: + return False + + def _find_window_by_title_fast(self, title: str) -> Optional[int]: + """ + Find window by title with strict timeout and limits. + Target: <50ms execution time. + """ found_hwnd = [None] - start_time = time.time() + start_time = time.perf_counter() window_count = [0] - MAX_WINDOWS = 500 # Limit windows to check - MAX_TIME = 0.1 # 100ms timeout + MAX_WINDOWS = 200 # Reduced from 500 + MAX_TIME = 0.025 # 25ms timeout (reduced from 100ms) + + title_lower = title.lower() def callback(hwnd, extra): - # Check timeout - if time.time() - start_time > MAX_TIME: - return False # Stop enumeration - timeout + # Check timeout first + if time.perf_counter() - start_time > MAX_TIME: + return False - # Check window limit window_count[0] += 1 if window_count[0] > MAX_WINDOWS: - return False # Stop enumeration - too many windows + return False - if not self.user32.IsWindowVisible(hwnd): + # Quick visibility check + if not self._user32.IsWindowVisible(hwnd): return True - # Quick check - skip windows with no title - text = ctypes.create_unicode_buffer(256) - self.user32.GetWindowTextW(hwnd, text, 256) - window_title = text.value + # Get window text with smaller buffer + text = ctypes.create_unicode_buffer(128) # Reduced from 256 + length = self._user32.GetWindowTextW(hwnd, text, 128) - if not window_title: # Skip empty titles + if length == 0: # Skip empty titles quickly return True - # Fast case-insensitive check - if title.lower() in window_title.lower(): + # Fast lowercase comparison + if title_lower in text.value.lower(): found_hwnd[0] = hwnd return False # Stop enumeration - found! return True proc = self.EnumWindowsProc(callback) - self.user32.EnumWindows(proc, 0) + self._user32.EnumWindows(proc, 0) return found_hwnd[0] - def _find_window_by_process(self, process_name: str) -> Optional[int]: - """Find window by process name - with timeout protection.""" + def _find_window_by_process_fast(self, process_name: str) -> Optional[int]: + """ + Find window by process name with strict timeout. + Uses psutil if available for faster process detection. + """ + # Try psutil first for faster process-based lookup + if PSUTIL_AVAILABLE: + try: + for proc in psutil.process_iter(['pid', 'name']): + try: + if process_name.lower() in proc.info['name'].lower(): + # Found process, now find its window + target_pid = proc.info['pid'] + # Look for window with this PID + found = self._find_window_by_pid(target_pid) + if found: + return found + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + except Exception: + pass + + # Fall back to enumeration found_hwnd = [None] - start_time = time.time() + start_time = time.perf_counter() window_count = [0] - MAX_WINDOWS = 300 # Lower limit for process check (slower) - MAX_TIME = 0.15 # 150ms timeout + MAX_WINDOWS = 150 # Reduced from 300 + MAX_TIME = 0.030 # 30ms timeout (reduced from 150ms) + + process_name_lower = process_name.lower() def callback(hwnd, extra): - # Check timeout - if time.time() - start_time > MAX_TIME: - return False # Stop enumeration - timeout + if time.perf_counter() - start_time > MAX_TIME: + return False - # Check window limit window_count[0] += 1 if window_count[0] > MAX_WINDOWS: - return False # Stop enumeration - too many windows + return False - if not self.user32.IsWindowVisible(hwnd): + if not self._user32.IsWindowVisible(hwnd): return True # Get process ID pid = wintypes.DWORD() - self.user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) + self._user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) if pid.value == 0: return True - # Check process name (this is slower, so we limit more) - try: - proc_info = self._get_process_info(pid.value) - if proc_info and process_name.lower() in proc_info.name.lower(): - found_hwnd[0] = hwnd - return False # Stop enumeration - found! - except Exception: - pass # Skip on error + # Use cached process info if available + cache_key = f"proc_{pid.value}" + proc_info = self._cache.get(cache_key) + + if proc_info is None: + # Get process info (cached) + proc_info = self._get_process_info_fast(pid.value) + if proc_info: + self._cache.set(cache_key, proc_info) + + if proc_info and process_name_lower in proc_info.name.lower(): + found_hwnd[0] = hwnd + return False return True proc = self.EnumWindowsProc(callback) - self.user32.EnumWindows(proc, 0) + self._user32.EnumWindows(proc, 0) return found_hwnd[0] - def _get_window_info(self, hwnd: int) -> Optional[WindowInfo]: - """Get detailed information about a window.""" + def _find_window_by_pid(self, target_pid: int) -> Optional[int]: + """Find a window by its process ID.""" + found_hwnd = [None] + + def callback(hwnd, extra): + if not self._user32.IsWindowVisible(hwnd): + return True + + pid = wintypes.DWORD() + self._user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) + + if pid.value == target_pid: + found_hwnd[0] = hwnd + return False + + return True + + proc = self.EnumWindowsProc(callback) + self._user32.EnumWindows(proc, 0) + + return found_hwnd[0] + + def _get_window_info_fast(self, hwnd: int) -> Optional[WindowInfo]: + """Get window info with minimal overhead and timeout protection.""" try: # Get window rect rect = wintypes.RECT() - if not self.user32.GetWindowRect(hwnd, ctypes.byref(rect)): + if not self._user32.GetWindowRect(hwnd, ctypes.byref(rect)): return None - # Get window text - text = ctypes.create_unicode_buffer(256) - self.user32.GetWindowTextW(hwnd, text, 256) + # Get window text (smaller buffer for speed) + text = ctypes.create_unicode_buffer(128) + self._user32.GetWindowTextW(hwnd, text, 128) # Get PID pid = wintypes.DWORD() - self.user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) + self._user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid)) # Check visibility - is_visible = bool(self.user32.IsWindowVisible(hwnd)) + is_visible = bool(self._user32.IsWindowVisible(hwnd)) # Check if focused - foreground = self.user32.GetForegroundWindow() + foreground = self._user32.GetForegroundWindow() is_focused = (foreground == hwnd) return WindowInfo( @@ -406,26 +656,38 @@ class WindowManager: is_visible=is_visible, is_focused=is_focused ) - except Exception as e: - print(f"[WindowManager] Error getting window info: {e}") + except Exception: return None - def _get_process_info(self, pid: int) -> Optional[ProcessInfo]: - """Get process information.""" + def _get_process_info_fast(self, pid: int) -> Optional[ProcessInfo]: + """Get process info with caching and minimal overhead.""" + # Try psutil first (much faster) + if PSUTIL_AVAILABLE: + try: + proc = psutil.Process(pid) + info = proc.as_dict(attrs=['pid', 'name', 'exe', 'memory_info']) + return ProcessInfo( + pid=info['pid'], + name=info['name'], + executable_path=info.get('exe'), + memory_usage=info['memory_info'].rss if info.get('memory_info') else None, + cpu_percent=None + ) + except (psutil.NoSuchProcess, psutil.AccessDenied): + return None + except Exception: + pass + + # Fall back to tasklist (slower) try: - # Use Windows WMI or tasklist for process info - import subprocess - - # Try tasklist first result = subprocess.run( ['tasklist', '/FI', f'PID eq {pid}', '/FO', 'CSV', '/NH'], capture_output=True, text=True, - timeout=5 + timeout=2 # Reduced from 5 ) if result.returncode == 0 and result.stdout.strip(): - # Parse CSV output lines = result.stdout.strip().split('\n') for line in lines: if str(pid) in line: @@ -447,15 +709,8 @@ class WindowManager: memory_usage=None, cpu_percent=None ) - except Exception as e: - print(f"[WindowManager] Error getting process info: {e}") + except Exception: return None - - def _check_window_exists(self, hwnd: int) -> bool: - """Check if a window handle is still valid.""" - if not self._available: - return False - return bool(self.user32.IsWindow(hwnd)) # Singleton instance @@ -473,7 +728,7 @@ def get_window_manager() -> WindowManager: def is_eu_running() -> bool: """Quick check if Entropia Universe is running.""" wm = get_window_manager() - return wm.find_eu_window() is not None + return wm.find_eu_window_cached() is not None def get_eu_window_rect() -> Optional[Tuple[int, int, int, int]]: @@ -496,7 +751,7 @@ def wait_for_eu(timeout: float = 30.0) -> bool: start_time = time.time() while time.time() - start_time < timeout: - if wm.find_eu_window(): + if wm.find_eu_window_cached(): return True time.sleep(0.5)