EU-Utility/core/window_manager.py

759 lines
25 KiB
Python

"""
EU-Utility - Window Manager Core Service
Manages interaction with the Entropia Universe game window.
Windows-specific implementation using ctypes (no external dependencies).
"""
import sys
import time
import threading
from typing import Optional, Tuple, Dict, Any
from dataclasses import dataclass
from pathlib import Path
from functools import lru_cache
# Platform detection
IS_WINDOWS = sys.platform == 'win32'
# Windows-specific imports with graceful fallback
if IS_WINDOWS:
try:
import ctypes
from ctypes import wintypes
import subprocess
WINDOWS_AVAILABLE = True
except ImportError:
WINDOWS_AVAILABLE = False
else:
WINDOWS_AVAILABLE = False
# Optional psutil import for fast process detection
try:
import psutil
PSUTIL_AVAILABLE = True
except ImportError:
PSUTIL_AVAILABLE = False
@dataclass
class WindowInfo:
"""Information about a window."""
handle: int
title: str
pid: int
rect: Tuple[int, int, int, int] # left, top, right, bottom
width: int
height: int
is_visible: bool
is_focused: bool
@dataclass
class ProcessInfo:
"""Information about a process."""
pid: int
name: str
executable_path: Optional[str]
memory_usage: Optional[int] # in bytes
cpu_percent: Optional[float]
class _WindowCache:
"""Thread-safe cache for window information."""
def __init__(self, ttl_seconds: float = 3.0):
self._cache: Dict[str, Any] = {}
self._timestamps: Dict[str, float] = {}
self._lock = threading.Lock()
self._ttl = ttl_seconds
def get(self, key: str) -> Any:
"""Get cached value if not expired."""
with self._lock:
if key not in self._cache:
return None
timestamp = self._timestamps.get(key, 0)
if time.time() - timestamp > self._ttl:
# Expired
del self._cache[key]
del self._timestamps[key]
return None
return self._cache[key]
def set(self, key: str, value: Any):
"""Set cached value."""
with self._lock:
self._cache[key] = value
self._timestamps[key] = time.time()
def clear(self):
"""Clear all cached values."""
with self._lock:
self._cache.clear()
self._timestamps.clear()
class WindowManager:
"""
Singleton Window Manager for EU-Utility.
Manages finding and interacting with the Entropia Universe game window.
Windows-specific implementation with graceful fallback on Linux.
"""
_instance = None
_lock = threading.Lock()
# Window search criteria - look for actual game client, not the loader
EU_WINDOW_TITLE = "Entropia Universe Client" # Matches "Entropia Universe Client (64-bit) PlanetName"
EU_PROCESS_NAMES = ["entropia.exe", "entropiauniverse.exe"] # Excludes clientloader.exe (launcher only)
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# Core state (lazy initialization)
self._window_handle: Optional[int] = None
self._window_info: Optional[WindowInfo] = None
self._process_info: Optional[ProcessInfo] = None
self._last_update: float = 0
self._update_interval: float = 3.0 # Increased to 3 seconds
# Caching
self._cache = _WindowCache(ttl_seconds=3.0)
self._window_handle_cache: Optional[int] = None
self._window_handle_cache_time: float = 0
self._window_handle_ttl: float = 5.0 # Cache window handle for 5 seconds
# Windows API (lazy initialization)
self._windows_api_ready = False
self._user32 = None
self._kernel32 = None
self._psutil_initialized = False
self._initialized = True
self._available = IS_WINDOWS and WINDOWS_AVAILABLE
if not self._available:
pass # Don't print here, print on first actual use if needed
def _ensure_windows_api(self):
"""Lazy initialization of Windows API."""
if self._windows_api_ready:
return
if not IS_WINDOWS or not WINDOWS_AVAILABLE:
return
# Window styles
self.GWL_STYLE = -16
self.GWL_EXSTYLE = -20
self.WS_VISIBLE = 0x10000000
self.WS_MINIMIZE = 0x20000000
# Load user32.dll functions
self._user32 = ctypes.windll.user32
self._kernel32 = ctypes.windll.kernel32
# EnumWindows callback type
self.EnumWindowsProc = ctypes.WINFUNCTYPE(
wintypes.BOOL,
wintypes.HWND,
wintypes.LPARAM
)
self._windows_api_ready = True
def _ensure_psutil(self):
"""Lazy initialization of psutil-based process detection."""
if self._psutil_initialized:
return
if PSUTIL_AVAILABLE:
# Pre-cache EU process info for fast lookups
self._refresh_eu_process_cache()
self._psutil_initialized = True
def _refresh_eu_process_cache(self):
"""Refresh cached EU process information."""
if not PSUTIL_AVAILABLE:
return
try:
for proc in psutil.process_iter(['pid', 'name', 'exe']):
try:
proc_name = proc.info['name'].lower()
if any(eu_name.lower() == proc_name for eu_name in self.EU_PROCESS_NAMES):
self._cache.set('eu_pid', proc.info['pid'])
self._cache.set('eu_process_name', proc.info['name'])
break
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
except Exception:
pass
# ========== Public API ==========
def is_available(self) -> bool:
"""Check if window manager is fully functional."""
return self._available
def find_eu_window_cached(self) -> Optional[WindowInfo]:
"""
Find EU window with aggressive caching for performance.
Returns cached result if available and recent.
"""
if not self._available:
return None
# Check cache first
cached = self._cache.get('eu_window_info')
if cached:
return cached
# Check if we have a cached window handle that's still valid
if self._window_handle_cache:
current_time = time.time()
if current_time - self._window_handle_cache_time < self._window_handle_ttl:
# Validate handle is still valid
if self._is_window_valid(self._window_handle_cache):
info = self._get_window_info_fast(self._window_handle_cache)
if info:
self._cache.set('eu_window_info', info)
return info
# Fall back to full search
return self.find_eu_window()
def find_eu_window(self) -> Optional[WindowInfo]:
"""
Find the Entropia Universe game window.
Returns:
WindowInfo if found, None otherwise
"""
if not self._available:
return None
self._ensure_windows_api()
# Try by window title first (faster)
hwnd = self._find_window_by_title_fast(self.EU_WINDOW_TITLE)
if hwnd:
self._window_handle = hwnd
self._window_handle_cache = hwnd
self._window_handle_cache_time = time.time()
self._window_info = self._get_window_info_fast(hwnd)
if self._window_info:
self._cache.set('eu_window_info', self._window_info)
return self._window_info
# Try by process name if title fails
for proc_name in self.EU_PROCESS_NAMES:
hwnd = self._find_window_by_process_fast(proc_name)
if hwnd:
self._window_handle = hwnd
self._window_handle_cache = hwnd
self._window_handle_cache_time = time.time()
self._window_info = self._get_window_info_fast(hwnd)
if self._window_info:
self._cache.set('eu_window_info', self._window_info)
return self._window_info
return None
def is_eu_focused_fast(self) -> bool:
"""
Ultra-fast check if EU is focused using psutil + cached window handle.
Target: <10ms execution time.
"""
if not self._available:
return False
self._ensure_windows_api()
self._ensure_psutil()
try:
# Fast path: Check if our cached window is still focused
if self._window_handle_cache:
# Quick foreground window check (no window enumeration)
foreground_hwnd = self._user32.GetForegroundWindow()
if foreground_hwnd == self._window_handle_cache:
# Verify window is still valid
if self._is_window_valid(self._window_handle_cache):
return True
else:
# Cache is stale, clear it
self._window_handle_cache = None
# Medium path: Use psutil to find EU process and check its windows
if PSUTIL_AVAILABLE:
eu_pid = self._cache.get('eu_pid')
if eu_pid:
try:
proc = psutil.Process(eu_pid)
# Check if process still exists
if proc.is_running():
# Get foreground window
foreground_hwnd = self._user32.GetForegroundWindow()
# Get PID of foreground window
fg_pid = wintypes.DWORD()
self._user32.GetWindowThreadProcessId(foreground_hwnd, ctypes.byref(fg_pid))
if fg_pid.value == eu_pid:
# Found it! Cache the handle
self._window_handle_cache = foreground_hwnd
self._window_handle_cache_time = time.time()
return True
else:
# Process died, refresh cache
self._refresh_eu_process_cache()
except (psutil.NoSuchProcess, psutil.AccessDenied):
self._refresh_eu_process_cache()
# Slow path: Find window and check focus
window = self.find_eu_window_cached()
if window:
return window.is_focused
return False
except Exception:
return False
def get_window_rect(self) -> Optional[Tuple[int, int, int, int]]:
"""
Get the window rectangle (left, top, right, bottom).
Returns:
Tuple of (left, top, right, bottom) or None
"""
if not self._available:
return None
# Refresh window info
self._update_window_info()
if self._window_info:
return self._window_info.rect
return None
def is_window_focused(self) -> bool:
"""
Check if the EU window is currently focused/active.
Returns:
True if EU window is active, False otherwise
"""
return self.is_eu_focused_fast()
def is_window_visible(self) -> bool:
"""
Check if the EU window is visible.
Returns:
True if visible, False otherwise
"""
if not self._available:
return False
# Refresh window info
self._update_window_info()
if self._window_info:
return self._window_info.is_visible
return False
def bring_to_front(self) -> bool:
"""
Bring the EU window to the front and focus it.
Returns:
True if successful, False otherwise
"""
if not self._available:
return False
self._ensure_windows_api()
if not self._window_handle:
self.find_eu_window()
if not self._window_handle:
return False
try:
# Show window if minimized
self._user32.ShowWindow(self._window_handle, 9) # SW_RESTORE = 9
# Bring to front
result = self._user32.SetForegroundWindow(self._window_handle)
# Force window to top
self._user32.SetWindowPos(
self._window_handle,
-1, # HWND_TOPMOST
0, 0, 0, 0,
0x0001 | 0x0002 # SWP_NOSIZE | SWP_NOMOVE
)
# Remove topmost flag but keep on top
self._user32.SetWindowPos(
self._window_handle,
-2, # HWND_NOTOPMOST
0, 0, 0, 0,
0x0001 | 0x0002 # SWP_NOSIZE | SWP_NOMOVE
)
return bool(result)
except Exception as e:
return False
def get_eu_process_info(self) -> Optional[ProcessInfo]:
"""
Get process information for Entropia Universe.
Returns:
ProcessInfo if found, None otherwise
"""
if not self._available:
return None
self._ensure_windows_api()
# Try cache first
cached = self._cache.get('eu_process_info')
if cached:
return cached
if not self._window_handle:
self.find_eu_window()
if not self._window_handle:
return None
# Get PID from window
pid = wintypes.DWORD()
self._user32.GetWindowThreadProcessId(self._window_handle, ctypes.byref(pid))
if pid.value == 0:
return None
self._process_info = self._get_process_info_fast(pid.value)
if self._process_info:
self._cache.set('eu_process_info', self._process_info)
return self._process_info
def get_window_handle(self) -> Optional[int]:
"""Get the current window handle."""
return self._window_handle_cache or self._window_handle
def refresh(self) -> Optional[WindowInfo]:
"""Force refresh of window information."""
self._cache.clear()
self._window_handle_cache = None
self._last_update = 0
return self.find_eu_window()
# ========== Private Methods ==========
def _update_window_info(self):
"""Update cached window info if needed."""
current_time = time.time()
if current_time - self._last_update > self._update_interval:
if self._window_handle_cache and self._is_window_valid(self._window_handle_cache):
self._window_info = self._get_window_info_fast(self._window_handle_cache)
else:
self.find_eu_window()
self._last_update = current_time
def _is_window_valid(self, hwnd: int) -> bool:
"""Check if a window handle is still valid (fast)."""
if not self._available or not hwnd:
return False
try:
return bool(self._user32.IsWindow(hwnd))
except Exception:
return False
def _find_window_by_title_fast(self, title: str) -> Optional[int]:
"""
Find window by title with strict timeout and limits.
Target: <50ms execution time.
"""
found_hwnd = [None]
start_time = time.perf_counter()
window_count = [0]
MAX_WINDOWS = 200 # Reduced from 500
MAX_TIME = 0.025 # 25ms timeout (reduced from 100ms)
title_lower = title.lower()
def callback(hwnd, extra):
# Check timeout first
if time.perf_counter() - start_time > MAX_TIME:
return False
window_count[0] += 1
if window_count[0] > MAX_WINDOWS:
return False
# Quick visibility check
if not self._user32.IsWindowVisible(hwnd):
return True
# Get window text with smaller buffer
text = ctypes.create_unicode_buffer(128) # Reduced from 256
length = self._user32.GetWindowTextW(hwnd, text, 128)
if length == 0: # Skip empty titles quickly
return True
# Fast lowercase comparison
if title_lower in text.value.lower():
found_hwnd[0] = hwnd
return False # Stop enumeration - found!
return True
proc = self.EnumWindowsProc(callback)
self._user32.EnumWindows(proc, 0)
return found_hwnd[0]
def _find_window_by_process_fast(self, process_name: str) -> Optional[int]:
"""
Find window by process name with strict timeout.
Uses psutil if available for faster process detection.
"""
# Try psutil first for faster process-based lookup
if PSUTIL_AVAILABLE:
try:
for proc in psutil.process_iter(['pid', 'name']):
try:
if process_name.lower() in proc.info['name'].lower():
# Found process, now find its window
target_pid = proc.info['pid']
# Look for window with this PID
found = self._find_window_by_pid(target_pid)
if found:
return found
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
except Exception:
pass
# Fall back to enumeration
found_hwnd = [None]
start_time = time.perf_counter()
window_count = [0]
MAX_WINDOWS = 150 # Reduced from 300
MAX_TIME = 0.030 # 30ms timeout (reduced from 150ms)
process_name_lower = process_name.lower()
def callback(hwnd, extra):
if time.perf_counter() - start_time > MAX_TIME:
return False
window_count[0] += 1
if window_count[0] > MAX_WINDOWS:
return False
if not self._user32.IsWindowVisible(hwnd):
return True
# Get process ID
pid = wintypes.DWORD()
self._user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
if pid.value == 0:
return True
# Use cached process info if available
cache_key = f"proc_{pid.value}"
proc_info = self._cache.get(cache_key)
if proc_info is None:
# Get process info (cached)
proc_info = self._get_process_info_fast(pid.value)
if proc_info:
self._cache.set(cache_key, proc_info)
if proc_info and process_name_lower in proc_info.name.lower():
found_hwnd[0] = hwnd
return False
return True
proc = self.EnumWindowsProc(callback)
self._user32.EnumWindows(proc, 0)
return found_hwnd[0]
def _find_window_by_pid(self, target_pid: int) -> Optional[int]:
"""Find a window by its process ID."""
found_hwnd = [None]
def callback(hwnd, extra):
if not self._user32.IsWindowVisible(hwnd):
return True
pid = wintypes.DWORD()
self._user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
if pid.value == target_pid:
found_hwnd[0] = hwnd
return False
return True
proc = self.EnumWindowsProc(callback)
self._user32.EnumWindows(proc, 0)
return found_hwnd[0]
def _get_window_info_fast(self, hwnd: int) -> Optional[WindowInfo]:
"""Get window info with minimal overhead and timeout protection."""
try:
# Get window rect
rect = wintypes.RECT()
if not self._user32.GetWindowRect(hwnd, ctypes.byref(rect)):
return None
# Get window text (smaller buffer for speed)
text = ctypes.create_unicode_buffer(128)
self._user32.GetWindowTextW(hwnd, text, 128)
# Get PID
pid = wintypes.DWORD()
self._user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
# Check visibility
is_visible = bool(self._user32.IsWindowVisible(hwnd))
# Check if focused
foreground = self._user32.GetForegroundWindow()
is_focused = (foreground == hwnd)
return WindowInfo(
handle=hwnd,
title=text.value,
pid=pid.value,
rect=(rect.left, rect.top, rect.right, rect.bottom),
width=rect.right - rect.left,
height=rect.bottom - rect.top,
is_visible=is_visible,
is_focused=is_focused
)
except Exception:
return None
def _get_process_info_fast(self, pid: int) -> Optional[ProcessInfo]:
"""Get process info with caching and minimal overhead."""
# Try psutil first (much faster)
if PSUTIL_AVAILABLE:
try:
proc = psutil.Process(pid)
info = proc.as_dict(attrs=['pid', 'name', 'exe', 'memory_info'])
return ProcessInfo(
pid=info['pid'],
name=info['name'],
executable_path=info.get('exe'),
memory_usage=info['memory_info'].rss if info.get('memory_info') else None,
cpu_percent=None
)
except (psutil.NoSuchProcess, psutil.AccessDenied):
return None
except Exception:
pass
# Fall back to tasklist (slower)
try:
result = subprocess.run(
['tasklist', '/FI', f'PID eq {pid}', '/FO', 'CSV', '/NH'],
capture_output=True,
text=True,
timeout=2 # Reduced from 5
)
if result.returncode == 0 and result.stdout.strip():
lines = result.stdout.strip().split('\n')
for line in lines:
if str(pid) in line:
parts = line.split('","')
if len(parts) >= 2:
name = parts[0].replace('"', '')
return ProcessInfo(
pid=pid,
name=name,
executable_path=None,
memory_usage=None,
cpu_percent=None
)
return ProcessInfo(
pid=pid,
name="Unknown",
executable_path=None,
memory_usage=None,
cpu_percent=None
)
except Exception:
return None
# Singleton instance
_window_manager = None
def get_window_manager() -> WindowManager:
"""Get the global WindowManager instance."""
global _window_manager
if _window_manager is None:
_window_manager = WindowManager()
return _window_manager
def is_eu_running() -> bool:
"""Quick check if Entropia Universe is running."""
wm = get_window_manager()
return wm.find_eu_window_cached() is not None
def get_eu_window_rect() -> Optional[Tuple[int, int, int, int]]:
"""Quick access to EU window rectangle."""
wm = get_window_manager()
return wm.get_window_rect()
def wait_for_eu(timeout: float = 30.0) -> bool:
"""
Wait for Entropia Universe to start.
Args:
timeout: Maximum seconds to wait
Returns:
True if EU found, False if timeout
"""
wm = get_window_manager()
start_time = time.time()
while time.time() - start_time < timeout:
if wm.find_eu_window_cached():
return True
time.sleep(0.5)
return False