# Source: Lemontropia-Suite/modules/game_vision_ai.py
"""
Lemontropia Suite - Game Vision AI Module
Advanced computer vision with local GPU-accelerated AI models.
Supports OCR (PaddleOCR) and icon detection for game UI analysis.
"""
import cv2
import numpy as np
import logging
import torch
import time
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, Tuple, List, Dict, Any, Union
from enum import Enum
import json
import hashlib
logger = logging.getLogger(__name__)
class GPUBackend(Enum):
    """Compute backends the vision stack can run on.

    Listed roughly in order of preference: NVIDIA CUDA, Apple Metal,
    Windows DirectML, and a CPU fallback that is always available.
    """

    CUDA = "cuda"          # NVIDIA CUDA
    MPS = "mps"            # Apple Metal Performance Shaders
    DIRECTML = "directml"  # Windows DirectML
    CPU = "cpu"            # Fallback CPU
@dataclass
class TextRegion:
    """One piece of text found by OCR, with its location and score."""

    text: str                        # recognized string
    confidence: float                # recognizer confidence score
    bbox: Tuple[int, int, int, int]  # x, y, w, h in image pixels
    language: str = "en"             # language code used for recognition

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this region to a plain, JSON-friendly dict."""
        return {
            'text': self.text,
            'confidence': self.confidence,
            'bbox': self.bbox,
            'language': self.language,
        }
@dataclass
class IconRegion:
    """An icon cropped out of the game UI, identified by a perceptual hash."""

    image: np.ndarray                # crop of the icon (BGR or grayscale)
    bbox: Tuple[int, int, int, int]  # x, y, w, h in the source image
    confidence: float                # detector confidence
    icon_hash: str = ""              # 256-char '0'/'1' average-hash; computed lazily

    def __post_init__(self):
        # Fill in the hash only when the caller did not supply one.
        if not self.icon_hash:
            self.icon_hash = self._compute_hash()

    def _compute_hash(self) -> str:
        """Return a 16x16 average-hash of the icon as a '0'/'1' string."""
        if self.image is None or self.image.size == 0:
            return ""
        # Shrink to a tiny thumbnail, convert to grayscale if needed, then
        # threshold every pixel against the mean brightness.
        thumb = cv2.resize(self.image, (16, 16), interpolation=cv2.INTER_AREA)
        if len(thumb.shape) == 3:
            thumb = cv2.cvtColor(thumb, cv2.COLOR_BGR2GRAY)
        mean_val = thumb.mean()
        return ''.join('1' if px else '0' for px in (thumb > mean_val).flatten())
@dataclass
class ItemMatch:
    """Outcome of looking an extracted icon up in the item database."""

    name: str                       # matched item name
    confidence: float               # match score
    item_id: Optional[str] = None   # database identifier, when known
    category: Optional[str] = None  # item category, when known
    matched_hash: str = ""          # hash of the database icon that matched
@dataclass
class VisionResult:
    """Aggregate output of one screenshot pass: text, icons, and timing."""

    text_regions: List[TextRegion] = field(default_factory=list)  # OCR hits
    icon_regions: List[IconRegion] = field(default_factory=list)  # icon crops
    processing_time_ms: float = 0.0  # wall-clock time spent processing
    gpu_backend: str = "cpu"         # backend name used for this pass
    timestamp: float = field(default_factory=time.time)  # creation time (epoch s)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize summary data; icons are reported as a count only."""
        summary = {
            'text_regions': [region.to_dict() for region in self.text_regions],
            'icon_count': len(self.icon_regions),
            'processing_time_ms': self.processing_time_ms,
            'gpu_backend': self.gpu_backend,
            'timestamp': self.timestamp,
        }
        return summary
class GPUDetector:
    """Probe the machine for GPU acceleration and report what it finds."""

    @staticmethod
    def detect_backend() -> GPUBackend:
        """Detect best available GPU backend."""
        # CUDA first: it is the most common acceleration setup.
        if torch.cuda.is_available():
            logger.info(f"CUDA available: {torch.cuda.get_device_name(0)}")
            return GPUBackend.CUDA
        # Apple Silicon / Metal Performance Shaders.
        if hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
            logger.info("Apple MPS (Metal) available")
            return GPUBackend.MPS
        # DirectML is an optional extra; probe only when the package exists.
        try:
            import torch_directml
            if torch_directml.is_available():
                logger.info("DirectML available")
                return GPUBackend.DIRECTML
        except ImportError:
            pass
        logger.info("No GPU backend available, using CPU")
        return GPUBackend.CPU

    @staticmethod
    def get_device_string(backend: GPUBackend) -> str:
        """Translate a backend enum into a PyTorch device string."""
        device_map = {
            GPUBackend.CUDA: "cuda:0",
            GPUBackend.MPS: "mps",
            GPUBackend.DIRECTML: "privateuseone:0",  # DirectML device
        }
        return device_map.get(backend, "cpu")

    @staticmethod
    def get_gpu_info() -> Dict[str, Any]:
        """Collect a summary of backend availability and CUDA devices."""
        cuda_ok = torch.cuda.is_available()
        info = {
            'backend': GPUDetector.detect_backend().value,
            'cuda_available': cuda_ok,
            'mps_available': hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(),
            'devices': [],
        }
        if cuda_ok:
            # Enumerate every CUDA device with its name and total memory.
            for idx in range(torch.cuda.device_count()):
                props = torch.cuda.get_device_properties(idx)
                info['devices'].append({
                    'id': idx,
                    'name': torch.cuda.get_device_name(idx),
                    'memory_total': props.total_memory,
                })
        return info
class OCRProcessor:
    """OCR text extraction using PaddleOCR with GPU support."""

    # English, Swedish, Latin script
    SUPPORTED_LANGUAGES = ['en', 'sv', 'latin']

    def __init__(self, use_gpu: bool = True, lang: str = 'en'):
        """Create the processor and eagerly initialize the OCR engine.

        Args:
            use_gpu: Ask PaddleOCR for a GPU backend when one is available.
            lang: OCR language code; unsupported codes fall back to 'en'.
        """
        self.use_gpu = use_gpu
        self.lang = lang if lang in self.SUPPORTED_LANGUAGES else 'en'
        self.ocr = None  # stays None when PaddleOCR is missing or fails
        self.backend = GPUBackend.CPU
        self._init_ocr()

    def _init_ocr(self):
        """Initialize PaddleOCR with appropriate backend."""
        try:
            from paddleocr import PaddleOCR

            # Decide whether PaddleOCR should be asked for a GPU.
            use_gpu_flag = False
            if self.use_gpu:
                self.backend = GPUDetector.detect_backend()
                use_gpu_flag = self.backend != GPUBackend.CPU

            # Swedish has no dedicated model; the latin-script model covers it.
            lang_map = {'en': 'en', 'sv': 'latin', 'latin': 'latin'}
            paddle_lang = lang_map.get(self.lang, 'en')

            logger.info(f"Initializing PaddleOCR (lang={paddle_lang}, gpu={use_gpu_flag})")
            self.ocr = PaddleOCR(
                lang=paddle_lang,
                use_gpu=use_gpu_flag,
                show_log=False,
                use_angle_cls=True,
                det_db_thresh=0.3,
                det_db_box_thresh=0.5,
                rec_thresh=0.5,
            )
            logger.info(f"PaddleOCR initialized successfully (backend: {self.backend.value})")
        except ImportError:
            logger.error("PaddleOCR not installed. Install with: pip install paddleocr")
            self.ocr = None
        except Exception as e:
            logger.error(f"Failed to initialize PaddleOCR: {e}")
            self.ocr = None

    def preprocess_for_ocr(self, image: np.ndarray) -> np.ndarray:
        """Return a denoised, binarized copy of *image* tuned for OCR."""
        # Collapse color images to a single channel first.
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image
        # Non-local-means denoising, then adaptive thresholding to boost
        # text/background contrast.
        denoised = cv2.fastNlMeansDenoising(gray, None, 10, 7, 21)
        return cv2.adaptiveThreshold(
            denoised, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY, 11, 2
        )

    def extract_text(self, image: Union[str, np.ndarray, Path]) -> List[TextRegion]:
        """Extract text from an image.

        Args:
            image: Image path or numpy array.

        Returns:
            List of detected text regions (empty when OCR is unavailable,
            the image cannot be loaded, or recognition fails).
        """
        if self.ocr is None:
            logger.warning("OCR not available")
            return []
        # Accept either a path-like or an in-memory image.
        if isinstance(image, (str, Path)):
            img = cv2.imread(str(image))
            if img is None:
                logger.error(f"Failed to load image: {image}")
                return []
        else:
            img = image.copy()
        processed = self.preprocess_for_ocr(img)
        try:
            raw = self.ocr.ocr(processed, cls=True)
            regions = []
            if raw and raw[0]:
                for entry in raw[0]:
                    if entry is None:
                        continue
                    quad, (text, confidence) = entry
                    # Convert the 4-point quad into an axis-aligned box.
                    xs = [pt[0] for pt in quad]
                    ys = [pt[1] for pt in quad]
                    x, y = int(min(xs)), int(min(ys))
                    w = int(max(xs) - x)
                    h = int(max(ys) - y)
                    regions.append(TextRegion(
                        text=text.strip(),
                        confidence=float(confidence),
                        bbox=(x, y, w, h),
                        language=self.lang
                    ))
            return regions
        except Exception as e:
            logger.error(f"OCR processing failed: {e}")
            return []

    def extract_text_from_region(self, image: np.ndarray,
                                 region: Tuple[int, int, int, int]) -> List[TextRegion]:
        """Run OCR on one sub-rectangle and map results to full-image coords."""
        x, y, w, h = region
        crop = image[y:y+h, x:x+w]
        if crop.size == 0:
            return []
        found = self.extract_text(crop)
        # Shift each bbox from crop-local back to full-image coordinates.
        for item in found:
            bx, by, bw, bh = item.bbox
            item.bbox = (x + bx, y + by, bw, bh)
        return found
class IconDetector:
    """Detect and extract item icons from game UI.

    Detection is heuristic: thresholding plus contour analysis tuned to the
    near-square, fixed-size icons listed in ICON_SIZES. Optional PNG templates
    (loaded from *template_dir*) enable template matching as a first pass.
    """
    # Typical Entropia Universe loot window icon sizes
    ICON_SIZES = {
        'small': (32, 32),
        'medium': (48, 48),
        'large': (64, 64),
        'hud': (40, 40)
    }

    def __init__(self, template_dir: Optional[Path] = None):
        # Default to a "templates/icons" directory next to this module.
        self.template_dir = template_dir or Path(__file__).parent / "templates" / "icons"
        # Template name (file stem) -> BGR image.
        self.templates: Dict[str, np.ndarray] = {}
        self._load_templates()

    def _load_templates(self):
        """Load icon templates for matching.

        Missing directory is tolerated (warning only); unreadable PNGs are
        skipped with an error log.
        """
        if not self.template_dir.exists():
            logger.warning(f"Template directory not found: {self.template_dir}")
            return
        for template_file in self.template_dir.glob("*.png"):
            try:
                name = template_file.stem
                template = cv2.imread(str(template_file), cv2.IMREAD_COLOR)
                # cv2.imread returns None on failure rather than raising.
                if template is not None:
                    self.templates[name] = template
                    logger.debug(f"Loaded icon template: {name}")
            except Exception as e:
                logger.error(f"Failed to load template {template_file}: {e}")

    def detect_loot_window(self, image: np.ndarray) -> Optional[Tuple[int, int, int, int]]:
        """
        Detect loot window in screenshot.
        Returns bounding box of loot window or None if not found.

        Tries two strategies in order: template matching (when a
        'loot_window' template is loaded) and a heuristic that looks for a
        cluster of icon-sized, near-square bright regions.
        """
        # Look for common loot window indicators
        # Method 1: Template matching for "Loot" text or window frame
        if 'loot_window' in self.templates:
            result = cv2.matchTemplate(
                image, self.templates['loot_window'], cv2.TM_CCOEFF_NORMED
            )
            _, max_val, _, max_loc = cv2.minMaxLoc(result)
            # 0.7 normalized correlation is treated as a confident hit.
            if max_val > 0.7:
                h, w = self.templates['loot_window'].shape[:2]
                return (*max_loc, w, h)
        # Method 2: Detect based on typical loot window characteristics
        # Loot windows usually have a grid of items with consistent spacing
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        # Look for high-contrast regions that could be icons
        _, thresh = cv2.threshold(gray, 200, 255, cv2.THRESH_BINARY)
        # Find contours
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        # Filter for icon-sized squares
        potential_icons = []
        for cnt in contours:
            x, y, w, h = cv2.boundingRect(cnt)
            aspect = w / h if h > 0 else 0
            # Check if dimensions match typical icon sizes
            # (±5 px size tolerance, near-square aspect ratio).
            for size_name, (sw, sh) in self.ICON_SIZES.items():
                if abs(w - sw) < 5 and abs(h - sh) < 5 and 0.8 < aspect < 1.2:
                    potential_icons.append((x, y, w, h))
                    break
        # If we found multiple icons in a grid pattern, assume loot window
        # NOTE(review): only the count (>= 2) is checked, not an actual grid
        # layout — confirm this is sufficient for the target UI.
        if len(potential_icons) >= 2:
            # Calculate bounding box of all icons
            xs = [p[0] for p in potential_icons]
            ys = [p[1] for p in potential_icons]
            ws = [p[2] for p in potential_icons]
            hs = [p[3] for p in potential_icons]
            # max(ws)/max(hs) may pair the widest icon with a different icon's
            # origin, so the box can slightly overshoot; the padding below
            # absorbs this.
            min_x, max_x = min(xs), max(xs) + max(ws)
            min_y, max_y = min(ys), max(ys) + max(hs)
            # Add padding
            padding = 20
            return (
                max(0, min_x - padding),
                max(0, min_y - padding),
                max_x - min_x + padding * 2,
                max_y - min_y + padding * 2
            )
        return None

    def extract_icons_from_region(self, image: np.ndarray,
                                  region: Tuple[int, int, int, int],
                                  icon_size: str = 'medium') -> List[IconRegion]:
        """
        Extract icons from a specific region (e.g., loot window).
        Args:
            image: Full screenshot
            region: Bounding box (x, y, w, h)
            icon_size: Size preset ('small', 'medium', 'large')
        Returns:
            List of detected icon regions (bboxes in full-image coordinates,
            duplicates across threshold passes removed by IoU)
        """
        x, y, w, h = region
        roi = image[y:y+h, x:x+w]
        if roi.size == 0:
            return []
        # Unknown preset falls back to the 'medium' 48x48 size.
        target_size = self.ICON_SIZES.get(icon_size, (48, 48))
        gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
        # Multiple threshold attempts for different icon styles
        icons = []
        thresholds = [(200, 255), (180, 255), (150, 255)]
        for thresh_low, thresh_high in thresholds:
            _, thresh = cv2.threshold(gray, thresh_low, thresh_high, cv2.THRESH_BINARY)
            contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
            for cnt in contours:
                cx, cy, cw, ch = cv2.boundingRect(cnt)
                aspect = cw / ch if ch > 0 else 0
                # Match icon size with tolerance
                # (±8 px, looser aspect range than detect_loot_window).
                if (abs(cw - target_size[0]) < 8 and
                        abs(ch - target_size[1]) < 8 and
                        0.7 < aspect < 1.3):
                    # Extract icon image
                    icon_img = roi[cy:cy+ch, cx:cx+cw]
                    # Resize to standard size
                    icon_img = cv2.resize(icon_img, target_size, interpolation=cv2.INTER_AREA)
                    icons.append(IconRegion(
                        image=icon_img,
                        # bbox is reported in full-image coordinates, but with
                        # the original (pre-resize) width/height.
                        bbox=(x + cx, y + cy, cw, ch),
                        confidence=0.8  # Placeholder confidence
                    ))
        # Remove duplicates (icons that overlap significantly)
        unique_icons = self._remove_duplicate_icons(icons)
        return unique_icons

    def _remove_duplicate_icons(self, icons: List[IconRegion],
                                iou_threshold: float = 0.5) -> List[IconRegion]:
        """Remove duplicate icons based on IoU.

        Greedy non-maximum suppression: icons are visited in descending
        confidence (stable sort, so insertion order breaks ties) and dropped
        when they overlap an already-kept icon above *iou_threshold*.
        """
        if not icons:
            return []
        # Sort by confidence
        sorted_icons = sorted(icons, key=lambda x: x.confidence, reverse=True)
        kept = []
        for icon in sorted_icons:
            is_duplicate = False
            for kept_icon in kept:
                if self._calculate_iou(icon.bbox, kept_icon.bbox) > iou_threshold:
                    is_duplicate = True
                    break
            if not is_duplicate:
                kept.append(icon)
        return kept

    def _calculate_iou(self, box1: Tuple[int, int, int, int],
                       box2: Tuple[int, int, int, int]) -> float:
        """Calculate Intersection over Union of two bounding boxes.

        Boxes are (x, y, w, h); returns 0.0 for disjoint boxes or when the
        union area is zero.
        """
        x1, y1, w1, h1 = box1
        x2, y2, w2, h2 = box2
        # Intersection rectangle (clamped to zero when boxes are disjoint).
        xi1 = max(x1, x2)
        yi1 = max(y1, y2)
        xi2 = min(x1 + w1, x2 + w2)
        yi2 = min(y1 + h1, y2 + h2)
        inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)
        box1_area = w1 * h1
        box2_area = w2 * h2
        union_area = box1_area + box2_area - inter_area
        return inter_area / union_area if union_area > 0 else 0

    def detect_icons_yolo(self, image: np.ndarray,
                          model_path: Optional[str] = None) -> List[IconRegion]:
        """
        Detect icons using YOLO model (if available).
        This is a placeholder for future YOLO integration; it currently
        always returns an empty list.
        """
        # TODO: Implement YOLO detection when model is trained
        logger.debug("YOLO detection not yet implemented")
        return []
class GameVisionAI:
    """
    Main AI vision interface for game screenshot analysis.
    Combines OCR and icon detection with GPU acceleration.
    """

    def __init__(self, use_gpu: bool = True, ocr_lang: str = 'en',
                 data_dir: Optional[Path] = None):
        """
        Initialize Game Vision AI.
        Args:
            use_gpu: Enable GPU acceleration if available
            ocr_lang: Language for OCR ('en', 'sv', 'latin')
            data_dir: Directory for storing extracted data
        """
        self.use_gpu = use_gpu
        self.data_dir = data_dir or Path.home() / ".lemontropia"
        self.extracted_icons_dir = self.data_dir / "extracted_icons"
        self.extracted_icons_dir.mkdir(parents=True, exist_ok=True)
        # Detect GPU
        self.backend = GPUDetector.detect_backend() if use_gpu else GPUBackend.CPU
        # Initialize processors
        self.ocr = OCRProcessor(use_gpu=use_gpu, lang=ocr_lang)
        self.icon_detector = IconDetector()
        # Icon matching cache
        self.icon_cache: Dict[str, ItemMatch] = {}
        logger.info(f"GameVisionAI initialized (GPU: {self.backend.value})")

    def extract_text_from_image(self, image_path: Union[str, Path]) -> List[TextRegion]:
        """
        Extract all text from an image.
        Args:
            image_path: Path to screenshot image
        Returns:
            List of detected text regions
        """
        return self.ocr.extract_text(image_path)

    def _extract_icons(self, image: np.ndarray,
                       auto_detect_window: bool = True) -> List[IconRegion]:
        """Extract icons from an already-loaded BGR image.

        Shared by extract_icons_from_image and process_screenshot so the
        screenshot is decoded from disk only once per call site.
        """
        if auto_detect_window:
            window_region = self.icon_detector.detect_loot_window(image)
            if window_region:
                logger.debug(f"Detected loot window: {window_region}")
                return self.icon_detector.extract_icons_from_region(
                    image, window_region
                )
            logger.debug("No loot window detected, scanning full image")
        # Fall back to scanning the whole frame.
        h, w = image.shape[:2]
        return self.icon_detector.extract_icons_from_region(
            image, (0, 0, w, h)
        )

    def extract_icons_from_image(self, image_path: Union[str, Path],
                                 auto_detect_window: bool = True) -> List[IconRegion]:
        """
        Extract item icons from image.
        Args:
            image_path: Path to screenshot image
            auto_detect_window: Automatically detect loot window
        Returns:
            List of detected icon regions (empty if the image cannot be read)
        """
        image = cv2.imread(str(image_path))
        if image is None:
            logger.error(f"Failed to load image: {image_path}")
            return []
        return self._extract_icons(image, auto_detect_window)

    def match_icon_to_database(self, icon_image: np.ndarray,
                               database_path: Optional[Path] = None) -> Optional[ItemMatch]:
        """
        Match extracted icon to item database.
        Args:
            icon_image: Icon image (numpy array)
            database_path: Path to icon database directory
        Returns:
            ItemMatch if found, None otherwise
        """
        from .icon_matcher import IconMatcher
        # Lazy load matcher; database_path only takes effect on first call.
        if not hasattr(self, '_icon_matcher'):
            self._icon_matcher = IconMatcher(database_path)
        return self._icon_matcher.match_icon(icon_image)

    def process_screenshot(self, image_path: Union[str, Path],
                           extract_text: bool = True,
                           extract_icons: bool = True) -> VisionResult:
        """
        Process screenshot with all vision capabilities.
        Args:
            image_path: Path to screenshot
            extract_text: Enable text extraction
            extract_icons: Enable icon extraction
        Returns:
            VisionResult with all detections (empty on load failure)
        """
        start_time = time.time()
        result = VisionResult(gpu_backend=self.backend.value)
        # Load image once and reuse it for both text and icon extraction.
        image = cv2.imread(str(image_path))
        if image is None:
            logger.error(f"Failed to load image: {image_path}")
            return result
        # Extract text
        if extract_text:
            result.text_regions = self.ocr.extract_text(image)
            logger.debug(f"Extracted {len(result.text_regions)} text regions")
        # Extract icons from the already-loaded frame (previously this
        # re-read the file from disk via extract_icons_from_image).
        if extract_icons:
            result.icon_regions = self._extract_icons(image)
            logger.debug(f"Extracted {len(result.icon_regions)} icons")
            # Save extracted icons
            self._save_extracted_icons(result.icon_regions)
        result.processing_time_ms = (time.time() - start_time) * 1000
        return result

    def _save_extracted_icons(self, icons: List[IconRegion]):
        """Save extracted icons to disk.

        Filenames combine hash prefix, epoch second and batch index;
        NOTE(review): two batches within the same second could still collide
        on identical hashes — confirm whether overwriting is acceptable.
        """
        for i, icon in enumerate(icons):
            filename = f"icon_{icon.icon_hash[:16]}_{int(time.time())}_{i}.png"
            filepath = self.extracted_icons_dir / filename
            cv2.imwrite(str(filepath), icon.image)
            logger.debug(f"Saved icon: {filepath}")

    def get_gpu_info(self) -> Dict[str, Any]:
        """Get GPU information."""
        return GPUDetector.get_gpu_info()

    def is_gpu_available(self) -> bool:
        """Check if GPU acceleration is available."""
        return self.backend != GPUBackend.CPU

    def calibrate_for_game(self, sample_screenshots: List[Path]) -> Dict[str, Any]:
        """
        Calibrate vision system using sample screenshots.
        Args:
            sample_screenshots: List of sample game screenshots
        Returns:
            Calibration results (counts and average per-screenshot time;
            failed screenshots are logged and excluded from the averages)
        """
        calibration = {
            'screenshots_processed': 0,
            'text_regions_detected': 0,
            'icons_detected': 0,
            'average_processing_time_ms': 0,
            'detected_regions': {}
        }
        total_time = 0
        for screenshot_path in sample_screenshots:
            try:
                start = time.time()
                result = self.process_screenshot(screenshot_path)
                elapsed = (time.time() - start) * 1000
                calibration['screenshots_processed'] += 1
                calibration['text_regions_detected'] += len(result.text_regions)
                calibration['icons_detected'] += len(result.icon_regions)
                total_time += elapsed
            except Exception as e:
                logger.error(f"Failed to process {screenshot_path}: {e}")
        # Guard against division by zero when every screenshot failed.
        if calibration['screenshots_processed'] > 0:
            calibration['average_processing_time_ms'] = (
                total_time / calibration['screenshots_processed']
            )
        return calibration
# Public API of this module: main facade, result/data types, and the
# individual detector/processor classes for standalone use.
__all__ = [
    'GameVisionAI',
    'TextRegion',
    'IconRegion',
    'ItemMatch',
    'VisionResult',
    'GPUBackend',
    'GPUDetector',
    'OCRProcessor',
    'IconDetector'
]