""" Lemontropia Suite - Game Vision AI Module Advanced computer vision with local GPU-accelerated AI models. Supports OCR (PaddleOCR) and icon detection for game UI analysis. """ import cv2 import numpy as np import logging import time from pathlib import Path from dataclasses import dataclass, field from typing import Optional, Tuple, List, Dict, Any, Union from enum import Enum import json import hashlib logger = logging.getLogger(__name__) # Optional PyTorch import with fallback try: import torch TORCH_AVAILABLE = True except Exception as e: logger.warning(f"PyTorch not available: {e}") TORCH_AVAILABLE = False torch = None # Import OpenCV text detector as fallback from .opencv_text_detector import OpenCVTextDetector, TextDetection as OpenCVTextDetection # Optional PaddleOCR import with fallback try: from paddleocr import PaddleOCR PADDLE_AVAILABLE = True except Exception as e: logger.warning(f"PaddleOCR not available: {e}") PADDLE_AVAILABLE = False PaddleOCR = None class GPUBackend(Enum): """Supported GPU backends.""" CUDA = "cuda" # NVIDIA CUDA MPS = "mps" # Apple Metal Performance Shaders DIRECTML = "directml" # Windows DirectML CPU = "cpu" # Fallback CPU @dataclass class TextRegion: """Detected text region with metadata.""" text: str confidence: float bbox: Tuple[int, int, int, int] # x, y, w, h language: str = "en" def to_dict(self) -> Dict[str, Any]: return { 'text': self.text, 'confidence': self.confidence, 'bbox': self.bbox, 'language': self.language } @dataclass class IconRegion: """Detected icon region with metadata.""" image: np.ndarray bbox: Tuple[int, int, int, int] # x, y, w, h confidence: float icon_hash: str = "" def __post_init__(self): if not self.icon_hash: self.icon_hash = self._compute_hash() def _compute_hash(self) -> str: """Compute perceptual hash of icon.""" if self.image is None or self.image.size == 0: return "" # Resize to standard size and compute average hash small = cv2.resize(self.image, (16, 16), interpolation=cv2.INTER_AREA) gray = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY) if len(small.shape) == 3 else small avg = gray.mean() hash_bits = (gray > avg).flatten() return ''.join(['1' if b else '0' for b in hash_bits]) @dataclass class ItemMatch: """Result of matching an icon to database.""" name: str confidence: float item_id: Optional[str] = None category: Optional[str] = None matched_hash: str = "" @dataclass class VisionResult: """Complete vision processing result.""" text_regions: List[TextRegion] = field(default_factory=list) icon_regions: List[IconRegion] = field(default_factory=list) processing_time_ms: float = 0.0 gpu_backend: str = "cpu" timestamp: float = field(default_factory=time.time) def to_dict(self) -> Dict[str, Any]: return { 'text_regions': [t.to_dict() for t in self.text_regions], 'icon_count': len(self.icon_regions), 'processing_time_ms': self.processing_time_ms, 'gpu_backend': self.gpu_backend, 'timestamp': self.timestamp } class GPUDetector: """Detect and manage GPU availability.""" @staticmethod def detect_backend() -> GPUBackend: """Detect best available GPU backend.""" # Check CUDA first (most common) if torch.cuda.is_available(): logger.info(f"CUDA available: {torch.cuda.get_device_name(0)}") return GPUBackend.CUDA # Check Apple MPS if hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(): logger.info("Apple MPS (Metal) available") return GPUBackend.MPS # Check DirectML on Windows try: import torch_directml if torch_directml.is_available(): logger.info("DirectML available") return GPUBackend.DIRECTML except ImportError: pass logger.info("No GPU backend available, using CPU") return GPUBackend.CPU @staticmethod def get_device_string(backend: GPUBackend) -> str: """Get PyTorch device string for backend.""" if backend == GPUBackend.CUDA: return "cuda:0" elif backend == GPUBackend.MPS: return "mps" elif backend == GPUBackend.DIRECTML: return "privateuseone:0" # DirectML device return "cpu" @staticmethod def get_gpu_info() -> Dict[str, Any]: """Get detailed GPU information.""" info = { 'backend': GPUDetector.detect_backend().value, 'cuda_available': torch.cuda.is_available(), 'mps_available': hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(), 'devices': [] } if torch.cuda.is_available(): for i in range(torch.cuda.device_count()): info['devices'].append({ 'id': i, 'name': torch.cuda.get_device_name(i), 'memory_total': torch.cuda.get_device_properties(i).total_memory }) return info class OCRProcessor: """OCR text extraction using PaddleOCR or OpenCV fallback with GPU support.""" SUPPORTED_LANGUAGES = ['en', 'sv', 'latin'] # English, Swedish, Latin script def __init__(self, use_gpu: bool = True, lang: str = 'en'): self.use_gpu = use_gpu self.lang = lang if lang in self.SUPPORTED_LANGUAGES else 'en' self.ocr = None self.backend = GPUBackend.CPU self.opencv_detector = None self._primary_backend = None # 'paddle' or 'opencv' self._init_ocr() def _init_ocr(self): """Initialize OCR with PaddleOCR or OpenCV fallback.""" # Try PaddleOCR first (better accuracy) if PADDLE_AVAILABLE: try: self._init_paddle() if self.ocr is not None: self._primary_backend = 'paddle' return except Exception as e: logger.warning(f"PaddleOCR init failed: {e}") # Fallback to OpenCV text detection logger.info("Using OpenCV text detection as fallback") self.opencv_detector = OpenCVTextDetector(use_gpu=self.use_gpu) if self.opencv_detector.is_available(): self._primary_backend = 'opencv' self.backend = GPUBackend.CUDA if self.opencv_detector.check_gpu_available() else GPUBackend.CPU logger.info(f"OpenCV text detector ready (GPU: {self.backend == GPUBackend.CUDA})") else: logger.error("No OCR backend available") def _init_paddle(self): """Initialize PaddleOCR with appropriate backend.""" # Detect GPU if self.use_gpu: self.backend = GPUDetector.detect_backend() use_gpu_flag = self.backend != GPUBackend.CPU else: use_gpu_flag = False # Map language codes lang_map = { 'en': 'en', 'sv': 'latin', # Swedish uses latin script model 'latin': 'latin' } paddle_lang = lang_map.get(self.lang, 'en') logger.info(f"Initializing PaddleOCR (lang={paddle_lang}, gpu={use_gpu_flag})") self.ocr = PaddleOCR( lang=paddle_lang, use_gpu=use_gpu_flag, show_log=False, use_angle_cls=True, det_db_thresh=0.3, det_db_box_thresh=0.5, rec_thresh=0.5, ) logger.info(f"PaddleOCR initialized successfully (backend: {self.backend.value})") def preprocess_for_ocr(self, image: np.ndarray) -> np.ndarray: """Preprocess image for better OCR results.""" # Convert to grayscale if needed if len(image.shape) == 3: gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) else: gray = image # Denoise denoised = cv2.fastNlMeansDenoising(gray, None, 10, 7, 21) # Adaptive threshold for better text contrast binary = cv2.adaptiveThreshold( denoised, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2 ) return binary def extract_text(self, image: Union[str, np.ndarray, Path]) -> List[TextRegion]: """ Extract text from image using PaddleOCR or OpenCV fallback. Args: image: Image path or numpy array Returns: List of detected text regions """ # Load image if path provided if isinstance(image, (str, Path)): img = cv2.imread(str(image)) if img is None: logger.error(f"Failed to load image: {image}") return [] else: img = image.copy() # Use appropriate backend if self._primary_backend == 'paddle' and self.ocr is not None: return self._extract_text_paddle(img) elif self._primary_backend == 'opencv' and self.opencv_detector is not None: return self._extract_text_opencv(img) else: logger.warning("No OCR backend available") return [] def _extract_text_opencv(self, img: np.ndarray) -> List[TextRegion]: """Extract text using OpenCV EAST detector.""" detections = self.opencv_detector.detect_text(img) # Convert to TextRegion format (no text recognition, just detection) regions = [] for det in detections: regions.append(TextRegion( text="", # OpenCV detector doesn't recognize text, just finds regions confidence=det.confidence, bbox=det.bbox, language=self.lang )) return regions def _extract_text_paddle(self, img: np.ndarray) -> List[TextRegion]: """Extract text using PaddleOCR.""" # Preprocess processed = self.preprocess_for_ocr(img) try: # Run OCR result = self.ocr.ocr(processed, cls=True) detected = [] if result and result[0]: for line in result[0]: if line is None: continue bbox, (text, confidence) = line # Calculate bounding box x_coords = [p[0] for p in bbox] y_coords = [p[1] for p in bbox] x, y = int(min(x_coords)), int(min(y_coords)) w = int(max(x_coords) - x) h = int(max(y_coords) - y) detected.append(TextRegion( text=text.strip(), confidence=float(confidence), bbox=(x, y, w, h), language=self.lang )) return detected except Exception as e: logger.error(f"OCR processing failed: {e}") return [] def extract_text_from_region(self, image: np.ndarray, region: Tuple[int, int, int, int]) -> List[TextRegion]: """Extract text from specific region of image.""" x, y, w, h = region roi = image[y:y+h, x:x+w] if roi.size == 0: return [] regions = self.extract_text(roi) # Adjust coordinates back to original image for r in regions: rx, ry, rw, rh = r.bbox r.bbox = (x + rx, y + ry, rw, rh) return regions class IconDetector: """Detect and extract item icons from game UI.""" # Typical Entropia Universe loot window icon sizes ICON_SIZES = { 'small': (32, 32), 'medium': (48, 48), 'large': (64, 64), 'hud': (40, 40) } def __init__(self, template_dir: Optional[Path] = None): self.template_dir = template_dir or Path(__file__).parent / "templates" / "icons" self.templates: Dict[str, np.ndarray] = {} self._load_templates() def _load_templates(self): """Load icon templates for matching.""" if not self.template_dir.exists(): logger.warning(f"Template directory not found: {self.template_dir}") return for template_file in self.template_dir.glob("*.png"): try: name = template_file.stem template = cv2.imread(str(template_file), cv2.IMREAD_COLOR) if template is not None: self.templates[name] = template logger.debug(f"Loaded icon template: {name}") except Exception as e: logger.error(f"Failed to load template {template_file}: {e}") def detect_loot_window(self, image: np.ndarray) -> Optional[Tuple[int, int, int, int]]: """ Detect loot window in screenshot. Returns bounding box of loot window or None if not found. """ # Look for common loot window indicators # Method 1: Template matching for "Loot" text or window frame if 'loot_window' in self.templates: result = cv2.matchTemplate( image, self.templates['loot_window'], cv2.TM_CCOEFF_NORMED ) _, max_val, _, max_loc = cv2.minMaxLoc(result) if max_val > 0.7: h, w = self.templates['loot_window'].shape[:2] return (*max_loc, w, h) # Method 2: Detect based on typical loot window characteristics # Loot windows usually have a grid of items with consistent spacing gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # Look for high-contrast regions that could be icons _, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY) # Find contours contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) # Filter for icon-sized squares potential_icons = [] for cnt in contours: x, y, w, h = cv2.boundingRect(cnt) aspect = w / h if h > 0 else 0 # Check if dimensions match typical icon sizes for size_name, (sw, sh) in self.ICON_SIZES.items(): if abs(w - sw) < 5 and abs(h - sh) < 5 and 0.8 < aspect < 1.2: potential_icons.append((x, y, w, h)) break # If we found multiple icons in a grid pattern, assume loot window if len(potential_icons) >= 2: # Calculate bounding box of all icons xs = [p[0] for p in potential_icons] ys = [p[1] for p in potential_icons] ws = [p[2] for p in potential_icons] hs = [p[3] for p in potential_icons] min_x, max_x = min(xs), max(xs) + max(ws) min_y, max_y = min(ys), max(ys) + max(hs) # Add padding padding = 20 return ( max(0, min_x - padding), max(0, min_y - padding), max_x - min_x + padding * 2, max_y - min_y + padding * 2 ) return None def extract_icons_from_region(self, image: np.ndarray, region: Tuple[int, int, int, int], icon_size: str = 'medium') -> List[IconRegion]: """ Extract icons from a specific region (e.g., loot window). Args: image: Full screenshot region: Bounding box (x, y, w, h) icon_size: Size preset ('small', 'medium', 'large') Returns: List of detected icon regions """ x, y, w, h = region roi = image[y:y+h, x:x+w] if roi.size == 0: return [] target_size = self.ICON_SIZES.get(icon_size, (48, 48)) gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) # Multiple threshold attempts for different icon styles icons = [] thresholds = [(200, 255), (180, 255), (150, 255)] for thresh_low, thresh_high in thresholds: _, thresh = cv2.threshold(gray, thresh_low, thresh_high, cv2.THRESH_BINARY) contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) for cnt in contours: cx, cy, cw, ch = cv2.boundingRect(cnt) aspect = cw / ch if ch > 0 else 0 # Match icon size with tolerance if (abs(cw - target_size[0]) < 8 and abs(ch - target_size[1]) < 8 and 0.7 < aspect < 1.3): # Extract icon image icon_img = roi[cy:cy+ch, cx:cx+cw] # Resize to standard size icon_img = cv2.resize(icon_img, target_size, interpolation=cv2.INTER_AREA) icons.append(IconRegion( image=icon_img, bbox=(x + cx, y + cy, cw, ch), confidence=0.8 # Placeholder confidence )) # Remove duplicates (icons that overlap significantly) unique_icons = self._remove_duplicate_icons(icons) return unique_icons def _remove_duplicate_icons(self, icons: List[IconRegion], iou_threshold: float = 0.5) -> List[IconRegion]: """Remove duplicate icons based on IoU.""" if not icons: return [] # Sort by confidence sorted_icons = sorted(icons, key=lambda x: x.confidence, reverse=True) kept = [] for icon in sorted_icons: is_duplicate = False for kept_icon in kept: if self._calculate_iou(icon.bbox, kept_icon.bbox) > iou_threshold: is_duplicate = True break if not is_duplicate: kept.append(icon) return kept def _calculate_iou(self, box1: Tuple[int, int, int, int], box2: Tuple[int, int, int, int]) -> float: """Calculate Intersection over Union of two bounding boxes.""" x1, y1, w1, h1 = box1 x2, y2, w2, h2 = box2 xi1 = max(x1, x2) yi1 = max(y1, y2) xi2 = min(x1 + w1, x2 + w2) yi2 = min(y1 + h1, y2 + h2) inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1) box1_area = w1 * h1 box2_area = w2 * h2 union_area = box1_area + box2_area - inter_area return inter_area / union_area if union_area > 0 else 0 def detect_icons_yolo(self, image: np.ndarray, model_path: Optional[str] = None) -> List[IconRegion]: """ Detect icons using YOLO model (if available). This is a placeholder for future YOLO integration. """ # TODO: Implement YOLO detection when model is trained logger.debug("YOLO detection not yet implemented") return [] class GameVisionAI: """ Main AI vision interface for game screenshot analysis. Combines OCR and icon detection with GPU acceleration. """ def __init__(self, use_gpu: bool = True, ocr_lang: str = 'en', data_dir: Optional[Path] = None): """ Initialize Game Vision AI. Args: use_gpu: Enable GPU acceleration if available ocr_lang: Language for OCR ('en', 'sv', 'latin') data_dir: Directory for storing extracted data """ self.use_gpu = use_gpu self.data_dir = data_dir or Path.home() / ".lemontropia" self.extracted_icons_dir = self.data_dir / "extracted_icons" self.extracted_icons_dir.mkdir(parents=True, exist_ok=True) # Detect GPU self.backend = GPUDetector.detect_backend() if use_gpu else GPUBackend.CPU # Initialize processors self.ocr = OCRProcessor(use_gpu=use_gpu, lang=ocr_lang) self.icon_detector = IconDetector() # Icon matching cache self.icon_cache: Dict[str, ItemMatch] = {} logger.info(f"GameVisionAI initialized (GPU: {self.backend.value})") def extract_text_from_image(self, image_path: Union[str, Path]) -> List[TextRegion]: """ Extract all text from an image. Args: image_path: Path to screenshot image Returns: List of detected text regions """ return self.ocr.extract_text(image_path) def extract_icons_from_image(self, image_path: Union[str, Path], auto_detect_window: bool = True) -> List[IconRegion]: """ Extract item icons from image. Args: image_path: Path to screenshot image auto_detect_window: Automatically detect loot window Returns: List of detected icon regions """ image = cv2.imread(str(image_path)) if image is None: logger.error(f"Failed to load image: {image_path}") return [] if auto_detect_window: window_region = self.icon_detector.detect_loot_window(image) if window_region: logger.debug(f"Detected loot window: {window_region}") return self.icon_detector.extract_icons_from_region( image, window_region ) else: logger.debug("No loot window detected, scanning full image") # Scan full image h, w = image.shape[:2] return self.icon_detector.extract_icons_from_region( image, (0, 0, w, h) ) else: h, w = image.shape[:2] return self.icon_detector.extract_icons_from_region( image, (0, 0, w, h) ) def match_icon_to_database(self, icon_image: np.ndarray, database_path: Optional[Path] = None) -> Optional[ItemMatch]: """ Match extracted icon to item database. Args: icon_image: Icon image (numpy array) database_path: Path to icon database directory Returns: ItemMatch if found, None otherwise """ from .icon_matcher import IconMatcher # Lazy load matcher if not hasattr(self, '_icon_matcher'): self._icon_matcher = IconMatcher(database_path) return self._icon_matcher.match_icon(icon_image) def process_screenshot(self, image_path: Union[str, Path], extract_text: bool = True, extract_icons: bool = True) -> VisionResult: """ Process screenshot with all vision capabilities. Args: image_path: Path to screenshot extract_text: Enable text extraction extract_icons: Enable icon extraction Returns: VisionResult with all detections """ start_time = time.time() result = VisionResult(gpu_backend=self.backend.value) # Load image once image = cv2.imread(str(image_path)) if image is None: logger.error(f"Failed to load image: {image_path}") return result # Extract text if extract_text: result.text_regions = self.ocr.extract_text(image) logger.debug(f"Extracted {len(result.text_regions)} text regions") # Extract icons if extract_icons: result.icon_regions = self.extract_icons_from_image(image_path) logger.debug(f"Extracted {len(result.icon_regions)} icons") # Save extracted icons self._save_extracted_icons(result.icon_regions) result.processing_time_ms = (time.time() - start_time) * 1000 return result def _save_extracted_icons(self, icons: List[IconRegion]): """Save extracted icons to disk.""" for i, icon in enumerate(icons): filename = f"icon_{icon.icon_hash[:16]}_{int(time.time())}_{i}.png" filepath = self.extracted_icons_dir / filename cv2.imwrite(str(filepath), icon.image) logger.debug(f"Saved icon: {filepath}") def get_gpu_info(self) -> Dict[str, Any]: """Get GPU information.""" return GPUDetector.get_gpu_info() def is_gpu_available(self) -> bool: """Check if GPU acceleration is available.""" return self.backend != GPUBackend.CPU def calibrate_for_game(self, sample_screenshots: List[Path]) -> Dict[str, Any]: """ Calibrate vision system using sample screenshots. Args: sample_screenshots: List of sample game screenshots Returns: Calibration results """ calibration = { 'screenshots_processed': 0, 'text_regions_detected': 0, 'icons_detected': 0, 'average_processing_time_ms': 0, 'detected_regions': {} } total_time = 0 for screenshot_path in sample_screenshots: try: start = time.time() result = self.process_screenshot(screenshot_path) elapsed = (time.time() - start) * 1000 calibration['screenshots_processed'] += 1 calibration['text_regions_detected'] += len(result.text_regions) calibration['icons_detected'] += len(result.icon_regions) total_time += elapsed except Exception as e: logger.error(f"Failed to process {screenshot_path}: {e}") if calibration['screenshots_processed'] > 0: calibration['average_processing_time_ms'] = ( total_time / calibration['screenshots_processed'] ) return calibration # Export main classes __all__ = [ 'GameVisionAI', 'TextRegion', 'IconRegion', 'ItemMatch', 'VisionResult', 'GPUBackend', 'GPUDetector', 'OCRProcessor', 'IconDetector' ]