504 lines
15 KiB
Python
504 lines
15 KiB
Python
"""
|
|
EU-Utility - Window Manager Core Service
|
|
|
|
Manages interaction with the Entropia Universe game window.
|
|
Windows-specific implementation using ctypes (no external dependencies).
|
|
"""
|
|
|
|
import sys
|
|
import time
|
|
from typing import Optional, Tuple, Dict, Any
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
# Platform detection: the Win32 bindings below are only attempted on Windows.
IS_WINDOWS = sys.platform == 'win32'

# Stays False unless every Windows-only import below succeeds.
WINDOWS_AVAILABLE = False

if IS_WINDOWS:
    try:
        import ctypes
        from ctypes import wintypes
        import subprocess
    except ImportError:
        # Leave WINDOWS_AVAILABLE False so callers degrade gracefully.
        pass
    else:
        WINDOWS_AVAILABLE = True
|
|
|
|
|
|
@dataclass
class WindowInfo:
    """Snapshot of a single window's state at the time it was queried.

    Attributes:
        handle: Native window handle, stored as an integer.
        title: Window title text.
        pid: ID of the process that owns the window.
        rect: Window rectangle as ``(left, top, right, bottom)``.
        width: Window width.
        height: Window height.
        is_visible: Whether the window was visible when queried.
        is_focused: Whether the window was focused when queried.
    """

    handle: int
    title: str
    pid: int
    rect: Tuple[int, int, int, int]  # (left, top, right, bottom)
    width: int
    height: int
    is_visible: bool
    is_focused: bool
|
|
|
|
|
|
@dataclass
class ProcessInfo:
    """Description of a single operating-system process.

    Attributes:
        pid: Process ID.
        name: Process name.
        executable_path: Path of the executable, or ``None`` when unknown.
        memory_usage: Memory usage in bytes, or ``None`` when unknown.
        cpu_percent: CPU usage as a percentage, or ``None`` when unknown.
    """

    pid: int
    name: str
    executable_path: Optional[str]
    memory_usage: Optional[int]  # bytes
    cpu_percent: Optional[float]
|
|
|
|
|
|
class WindowManager:
    """
    Singleton Window Manager for EU-Utility.

    Manages finding and interacting with the Entropia Universe game window.
    Windows-specific implementation with graceful fallback elsewhere: when
    the Windows API is unavailable every query returns ``None``/``False``
    instead of raising.

    The located window handle and its ``WindowInfo`` are cached. The handle
    is re-validated before it is used, so a closed or restarted game client
    is searched for again instead of being tracked through a dead handle.
    """

    _instance = None

    # Window search criteria
    EU_WINDOW_TITLE = "Entropia Universe"
    EU_PROCESS_NAMES = ["entropia.exe", "entropiauniverse.exe", "clientloader.exe"]

    # Win32 constants used by the calls below
    SW_RESTORE = 9
    SWP_NOSIZE = 0x0001
    SWP_NOMOVE = 0x0002
    HWND_TOPMOST = -1
    HWND_NOTOPMOST = -2
    PROCESS_QUERY_LIMITED_INFORMATION = 0x1000

    # Buffer sizes (in characters) for text returned by the Win32 API
    _TITLE_BUFFER_CHARS = 256
    _PATH_BUFFER_CHARS = 1024

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every WindowManager() call; only set up once.
        if self._initialized:
            return

        self._window_handle: Optional[int] = None
        self._window_info: "Optional[WindowInfo]" = None
        self._process_info: "Optional[ProcessInfo]" = None
        self._last_update: float = 0
        self._update_interval: float = 1.0  # seconds

        # Fall back to limited mode rather than leaving a half-initialised
        # singleton behind if the Win32 bindings cannot be set up.
        self._available = False
        if IS_WINDOWS and WINDOWS_AVAILABLE:
            try:
                self._setup_windows_api()
                self._available = True
            except (AttributeError, OSError) as e:
                print(f"[WindowManager] Failed to initialise Windows API: {e}")

        self._initialized = True

        if not self._available:
            print("[WindowManager] Windows API not available - running in limited mode")

    def _setup_windows_api(self):
        """Setup Windows API constants and functions."""
        # Window styles
        self.GWL_STYLE = -16
        self.GWL_EXSTYLE = -20
        self.WS_VISIBLE = 0x10000000
        self.WS_MINIMIZE = 0x20000000

        # Private DLL objects: the prototypes declared below stay local to
        # this manager instead of being attached to the function objects
        # shared through ctypes.windll.
        self.user32 = ctypes.WinDLL("user32", use_last_error=True)
        self.kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

        # EnumWindows callback type
        self.EnumWindowsProc = ctypes.WINFUNCTYPE(
            wintypes.BOOL,
            wintypes.HWND,
            wintypes.LPARAM,
        )

        self._declare_prototypes()

    def _declare_prototypes(self):
        """Declare argument/return types for every Win32 function used.

        Without prototypes ctypes treats arguments and return values as C
        ``int``. A handle returned that way can differ from the same handle
        delivered to the EnumWindows callback as ``HWND``, which breaks the
        equality checks used for focus detection.
        """
        hwnd_t = wintypes.HWND
        bool_t = wintypes.BOOL
        dword_t = wintypes.DWORD
        handle_t = wintypes.HANDLE
        c_int = ctypes.c_int

        user32_prototypes = {
            "IsWindow": (bool_t, [hwnd_t]),
            "IsWindowVisible": (bool_t, [hwnd_t]),
            "IsIconic": (bool_t, [hwnd_t]),
            "GetForegroundWindow": (hwnd_t, []),
            "SetForegroundWindow": (bool_t, [hwnd_t]),
            "ShowWindow": (bool_t, [hwnd_t, c_int]),
            "SetWindowPos": (
                bool_t,
                [hwnd_t, hwnd_t, c_int, c_int, c_int, c_int, ctypes.c_uint],
            ),
            "GetWindowRect": (bool_t, [hwnd_t, ctypes.POINTER(wintypes.RECT)]),
            "GetWindowTextW": (c_int, [hwnd_t, wintypes.LPWSTR, c_int]),
            "GetWindowThreadProcessId": (
                dword_t,
                [hwnd_t, ctypes.POINTER(dword_t)],
            ),
            "EnumWindows": (bool_t, [self.EnumWindowsProc, wintypes.LPARAM]),
        }
        kernel32_prototypes = {
            "OpenProcess": (handle_t, [dword_t, bool_t, dword_t]),
            "QueryFullProcessImageNameW": (
                bool_t,
                [handle_t, dword_t, wintypes.LPWSTR, ctypes.POINTER(dword_t)],
            ),
            "CloseHandle": (bool_t, [handle_t]),
        }

        for dll, prototypes in (
            (self.user32, user32_prototypes),
            (self.kernel32, kernel32_prototypes),
        ):
            for func_name, (restype, argtypes) in prototypes.items():
                func = getattr(dll, func_name)
                func.restype = restype
                func.argtypes = argtypes

    # ========== Public API ==========

    def is_available(self) -> bool:
        """Check if window manager is fully functional."""
        return self._available

    def find_eu_window(self) -> "Optional[WindowInfo]":
        """
        Find the Entropia Universe game window.

        Searches by window title first, then by each known process name.
        A successful search replaces the cached handle and info. A failed
        search drops the cached handle only if it no longer refers to an
        existing window.

        Returns:
            WindowInfo if found, None otherwise
        """
        if not self._available:
            return None

        hwnd = self._find_window_by_title(self.EU_WINDOW_TITLE)
        if not hwnd:
            for proc_name in self.EU_PROCESS_NAMES:
                hwnd = self._find_window_by_process(proc_name)
                if hwnd:
                    break

        if not hwnd:
            # The searches are time-boxed, so a miss is not proof that the
            # window is gone; only discard a handle that is really dead.
            if self._window_handle and not self._check_window_exists(self._window_handle):
                self._window_handle = None
                self._window_info = None
            return None

        self._window_handle = hwnd
        self._window_info = self._get_window_info(hwnd)
        return self._window_info

    def get_window_rect(self) -> Optional[Tuple[int, int, int, int]]:
        """
        Get the window rectangle (left, top, right, bottom).

        Returns:
            Tuple of (left, top, right, bottom) or None
        """
        if not self._available:
            return None

        self._update_window_info()
        return self._window_info.rect if self._window_info else None

    def is_window_focused(self) -> bool:
        """
        Check if the EU window is currently focused/active.

        Returns:
            True if EU window is active, False otherwise
        """
        if not self._available:
            return False

        hwnd = self._ensure_window_handle()
        if not hwnd:
            return False

        return self.user32.GetForegroundWindow() == hwnd

    def is_window_visible(self) -> bool:
        """
        Check if the EU window is visible.

        Returns:
            True if visible, False otherwise
        """
        if not self._available:
            return False

        self._update_window_info()
        return self._window_info.is_visible if self._window_info else False

    def bring_to_front(self) -> bool:
        """
        Bring the EU window to the front and focus it.

        Returns:
            True if successful, False otherwise
        """
        if not self._available:
            return False

        hwnd = self._ensure_window_handle()
        if not hwnd:
            return False

        try:
            # SW_RESTORE also un-maximizes a maximized window, so only use
            # it when the window is actually minimized.
            if self.user32.IsIconic(hwnd):
                self.user32.ShowWindow(hwnd, self.SW_RESTORE)

            result = self.user32.SetForegroundWindow(hwnd)

            # Raise to topmost, then drop the topmost flag again so the
            # window ends up on top without staying always-on-top.
            flags = self.SWP_NOSIZE | self.SWP_NOMOVE
            for insert_after in (self.HWND_TOPMOST, self.HWND_NOTOPMOST):
                self.user32.SetWindowPos(
                    hwnd,
                    wintypes.HWND(insert_after),  # pointer-sized pseudo handle
                    0, 0, 0, 0,
                    flags,
                )

            return bool(result)
        except Exception as e:
            print(f"[WindowManager] Failed to bring window to front: {e}")
            return False

    def get_eu_process_info(self) -> "Optional[ProcessInfo]":
        """
        Get process information for Entropia Universe.

        Returns:
            ProcessInfo if found, None otherwise
        """
        if not self._available:
            return None

        hwnd = self._ensure_window_handle()
        if not hwnd:
            return None

        pid = wintypes.DWORD()
        self.user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
        if pid.value == 0:
            return None

        self._process_info = self._get_process_info(pid.value)
        return self._process_info

    def get_window_handle(self) -> Optional[int]:
        """Get the current window handle."""
        return self._window_handle

    def refresh(self) -> "Optional[WindowInfo]":
        """Force refresh of window information."""
        self._last_update = 0
        return self.find_eu_window()

    # ========== Private Methods ==========

    def _ensure_window_handle(self) -> Optional[int]:
        """Return a handle to a live EU window, searching again if needed.

        A cached handle that no longer refers to an existing window is
        discarded together with its cached info before the search runs.
        """
        if self._window_handle and not self._check_window_exists(self._window_handle):
            self._window_handle = None
            self._window_info = None

        if not self._window_handle:
            self.find_eu_window()

        return self._window_handle

    def _update_window_info(self):
        """Update cached window info if the update interval has elapsed."""
        current_time = time.time()
        if current_time - self._last_update <= self._update_interval:
            return

        hwnd = self._window_handle
        info = None
        if hwnd and self._check_window_exists(hwnd):
            info = self._get_window_info(hwnd)

        if info is not None:
            self._window_info = info
        else:
            # No handle yet, or the window went away: forget it and search
            # again so a restarted game client is picked up.
            self._window_handle = None
            self._window_info = None
            self.find_eu_window()

        self._last_update = current_time

    def _enum_visible_windows(self, predicate, max_windows: int, max_time: float) -> Optional[int]:
        """Return the first visible top-level window accepted by ``predicate``.

        Args:
            predicate: Callable taking a window handle and returning a
                truthy value when the window is the one being searched for.
            max_windows: Stop after this many windows have been enumerated.
            max_time: Stop once this many seconds have elapsed.

        Returns:
            The matching window handle, or None if nothing matched within
            the limits.
        """
        found = None
        seen = 0
        deadline = time.monotonic() + max_time

        def callback(hwnd, _lparam):
            nonlocal found, seen

            # Returning False stops the enumeration.
            if time.monotonic() > deadline:
                return False

            seen += 1
            if seen > max_windows:
                return False

            if not self.user32.IsWindowVisible(hwnd):
                return True

            try:
                matched = predicate(hwnd)
            except Exception:
                # Best effort: a window that cannot be inspected is skipped
                # instead of aborting the whole search.
                return True

            if matched:
                found = hwnd
                return False
            return True

        # Keep a reference to the ctypes callback for the whole call.
        proc = self.EnumWindowsProc(callback)
        self.user32.EnumWindows(proc, 0)
        return found

    def _find_window_by_title(self, title: str) -> Optional[int]:
        """Find window by title (case-insensitive partial match).

        The search is limited to 500 windows and 100 ms.
        """
        needle = title.lower()
        capacity = self._TITLE_BUFFER_CHARS

        def title_matches(hwnd):
            text = ctypes.create_unicode_buffer(capacity)
            self.user32.GetWindowTextW(hwnd, text, capacity)
            window_title = text.value
            # Untitled windows never match.
            return bool(window_title) and needle in window_title.lower()

        return self._enum_visible_windows(title_matches, max_windows=500, max_time=0.1)

    def _find_window_by_process(self, process_name: str) -> Optional[int]:
        """Find window by owning process name (case-insensitive partial match).

        The search is limited to 300 windows and 150 ms. Process lookups are
        cached per PID for the duration of one search, because a process
        commonly owns several windows.
        """
        wanted = process_name.lower()
        info_by_pid: "Dict[int, Optional[ProcessInfo]]" = {}

        def process_matches(hwnd):
            pid = wintypes.DWORD()
            self.user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
            if pid.value == 0:
                return False

            if pid.value not in info_by_pid:
                info_by_pid[pid.value] = self._get_process_info(pid.value)
            proc_info = info_by_pid[pid.value]

            return bool(proc_info) and wanted in proc_info.name.lower()

        return self._enum_visible_windows(process_matches, max_windows=300, max_time=0.15)

    def _get_window_info(self, hwnd: int) -> "Optional[WindowInfo]":
        """Get detailed information about a window.

        Returns:
            WindowInfo for ``hwnd``, or None if its rectangle cannot be
            read or any API call raises.
        """
        try:
            rect = wintypes.RECT()
            if not self.user32.GetWindowRect(hwnd, ctypes.byref(rect)):
                return None

            text = ctypes.create_unicode_buffer(self._TITLE_BUFFER_CHARS)
            self.user32.GetWindowTextW(hwnd, text, self._TITLE_BUFFER_CHARS)

            pid = wintypes.DWORD()
            self.user32.GetWindowThreadProcessId(hwnd, ctypes.byref(pid))

            return WindowInfo(
                handle=hwnd,
                title=text.value,
                pid=pid.value,
                rect=(rect.left, rect.top, rect.right, rect.bottom),
                width=rect.right - rect.left,
                height=rect.bottom - rect.top,
                is_visible=bool(self.user32.IsWindowVisible(hwnd)),
                is_focused=(self.user32.GetForegroundWindow() == hwnd),
            )
        except Exception as e:
            print(f"[WindowManager] Error getting window info: {e}")
            return None

    def _query_process_image_path(self, pid: int) -> Optional[str]:
        """Return the full executable path of ``pid`` via the Win32 API.

        Returns None when the process cannot be opened or the path does not
        fit the buffer.
        """
        handle = self.kernel32.OpenProcess(
            self.PROCESS_QUERY_LIMITED_INFORMATION, False, pid
        )
        if not handle:
            return None

        try:
            # In: buffer capacity in characters. Out: characters written.
            size = wintypes.DWORD(self._PATH_BUFFER_CHARS)
            buffer = ctypes.create_unicode_buffer(size.value)
            ok = self.kernel32.QueryFullProcessImageNameW(
                handle, 0, buffer, ctypes.byref(size)
            )
            return buffer.value if ok and buffer.value else None
        finally:
            self.kernel32.CloseHandle(handle)

    @staticmethod
    def _parse_tasklist_csv(output: str, pid: int) -> Optional[str]:
        """Extract the image name for ``pid`` from ``tasklist /FO CSV /NH`` output.

        Rows are parsed with the csv module and the PID column is compared
        exactly, so a PID that merely appears inside another field (or is a
        prefix of another PID) is not mistaken for a match.
        """
        import csv

        wanted = str(pid)
        for row in csv.reader(output.splitlines()):
            if len(row) >= 2 and row[1].strip() == wanted:
                return row[0]
        return None

    def _query_tasklist_name(self, pid: int) -> Optional[str]:
        """Return the image name of ``pid`` as reported by ``tasklist``.

        Slow fallback (spawns a process); used only when the Win32 query
        fails.
        """
        import subprocess

        result = subprocess.run(
            ['tasklist', '/FI', f'PID eq {pid}', '/FO', 'CSV', '/NH'],
            capture_output=True,
            text=True,
            timeout=5,
            # Avoid flashing a console window when running inside a GUI app.
            creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
        )
        if result.returncode != 0 or not result.stdout.strip():
            return None
        return self._parse_tasklist_csv(result.stdout, pid)

    def _get_process_info(self, pid: int) -> "Optional[ProcessInfo]":
        """Get process information.

        The executable path is read through the Win32 API; if that fails
        the image name is looked up with ``tasklist``. When neither source
        yields a name it is reported as ``"Unknown"``.

        Returns:
            ProcessInfo for ``pid``, or None if the lookup raised.
        """
        try:
            executable_path = self._query_process_image_path(pid)
            if executable_path:
                # Last path component, accepting either separator.
                name = executable_path.replace("/", "\\").rsplit("\\", 1)[-1]
            else:
                name = self._query_tasklist_name(pid) or "Unknown"

            return ProcessInfo(
                pid=pid,
                name=name,
                executable_path=executable_path,
                memory_usage=None,
                cpu_percent=None,
            )
        except Exception as e:
            print(f"[WindowManager] Error getting process info: {e}")
            return None

    def _check_window_exists(self, hwnd: int) -> bool:
        """Check if a window handle is still valid."""
        if not self._available:
            return False
        return bool(self.user32.IsWindow(hwnd))
|
|
|
|
|
|
# Module-wide instance, created on first use by get_window_manager().
_window_manager = None


def get_window_manager() -> WindowManager:
    """Return the shared WindowManager, creating it on the first call."""
    global _window_manager
    manager = _window_manager
    if manager is None:
        manager = _window_manager = WindowManager()
    return manager
|
|
|
|
|
|
def is_eu_running() -> bool:
    """Report whether an Entropia Universe window can currently be found."""
    window = get_window_manager().find_eu_window()
    return window is not None
|
|
|
|
|
|
def get_eu_window_rect() -> Optional[Tuple[int, int, int, int]]:
    """Return the EU window rectangle from the shared manager, or None."""
    return get_window_manager().get_window_rect()
|
|
|
|
|
|
def wait_for_eu(timeout: float = 30.0, poll_interval: float = 0.5) -> bool:
    """
    Wait for Entropia Universe to start.

    The window is always checked at least once, even with a zero or
    negative timeout, and once more when the timeout expires. The wait
    never sleeps past the deadline.

    Args:
        timeout: Maximum seconds to wait
        poll_interval: Seconds to sleep between checks

    Returns:
        True if EU found, False if timeout (or if window detection is not
        available on this platform, in which case there is no waiting)
    """
    wm = get_window_manager()

    # Detection can never succeed without the Windows API, so do not block
    # the caller for the whole timeout.
    if not wm.is_available():
        return False

    # Monotonic clock: unaffected by system clock adjustments.
    deadline = time.monotonic() + timeout

    while True:
        if wm.find_eu_window():
            return True

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False

        # Clamp so a negative poll_interval cannot make sleep() raise.
        time.sleep(max(0.0, min(poll_interval, remaining)))
|