"""
EU-Utility - Optimized OCR Service

Performance improvements:
1. Image preprocessing pipeline
2. Result caching for repeated regions
3. Connection pooling for backend resources
4. Memory-efficient image handling
5. Async processing support
"""

import io
import base64
import time
import threading
import hashlib
from typing import Dict, List, Tuple, Optional, Any, Union
from dataclasses import dataclass
from pathlib import Path
from functools import lru_cache
from collections import OrderedDict

import numpy as np
from PIL import Image, ImageEnhance, ImageFilter


@dataclass
class OCRResult:
    """Result from OCR operation."""
    text: str
    confidence: float
    bounding_box: Tuple[int, int, int, int]  # x, y, width, height
    raw_data: Any = None


def _polygon_to_box(points) -> Tuple[int, int, int, int]:
    """Convert a sequence of (x, y) corner points to (x, y, width, height)."""
    x_coords = [p[0] for p in points]
    y_coords = [p[1] for p in points]
    x_min, x_max = min(x_coords), max(x_coords)
    y_min, y_max = min(y_coords), max(y_coords)
    return (int(x_min), int(y_min), int(x_max - x_min), int(y_max - y_min))


class LRUCache:
    """Simple thread-safe LRU cache for OCR results."""

    def __init__(self, capacity: int = 100):
        self.capacity = capacity
        self.cache: OrderedDict = OrderedDict()
        self.lock = threading.Lock()

    def get(self, key: str) -> Optional[Any]:
        """Return the value stored under *key* (marking it most recently used), or None."""
        with self.lock:
            if key in self.cache:
                # Move to end (most recently used)
                self.cache.move_to_end(key)
                return self.cache[key]
            return None

    def put(self, key: str, value: Any):
        """Store *value* under *key*, evicting the least recently used entry when full."""
        with self.lock:
            if self.capacity <= 0:
                # A zero/negative capacity means "caching disabled"; without this
                # guard popitem() would be called on an empty dict and raise.
                return
            if key in self.cache:
                self.cache.move_to_end(key)
            elif len(self.cache) >= self.capacity:
                # Remove oldest
                self.cache.popitem(last=False)
            self.cache[key] = value

    def clear(self):
        """Remove every cached entry."""
        with self.lock:
            self.cache.clear()


class ImagePreprocessor:
    """
    Image preprocessing pipeline for better OCR accuracy and speed.
    """

    @staticmethod
    def preprocess(image: Image.Image,
                   grayscale: bool = True,
                   contrast: float = 1.5,
                   sharpness: float = 1.2,
                   denoise: bool = False) -> Image.Image:
        """
        Preprocess image for OCR.

        Args:
            image: Input PIL Image
            grayscale: Convert to grayscale
            contrast: Contrast enhancement factor (1.0 leaves the image as is)
            sharpness: Sharpness enhancement factor (1.0 leaves the image as is)
            denoise: Apply a 3x3 median filter

        Returns:
            Preprocessed PIL Image
        """
        # Convert to RGB if needed
        if image.mode not in ('RGB', 'L'):
            image = image.convert('RGB')

        # Grayscale conversion
        if grayscale and image.mode != 'L':
            image = image.convert('L')

        # Contrast enhancement
        if contrast != 1.0:
            image = ImageEnhance.Contrast(image).enhance(contrast)

        # Sharpness enhancement
        if sharpness != 1.0:
            image = ImageEnhance.Sharpness(image).enhance(sharpness)

        # Denoising
        if denoise:
            image = image.filter(ImageFilter.MedianFilter(size=3))

        return image

    @staticmethod
    def resize_for_ocr(image: Image.Image,
                       max_dimension: int = 1024,
                       min_dimension: int = 32) -> Image.Image:
        """
        Resize image to optimal size for OCR.

        Oversized images are scaled down so the longest side equals
        ``max_dimension``; undersized images are scaled up so the shortest
        side reaches ``min_dimension`` (without pushing the longest side past
        ``max_dimension``). Each resulting side is clamped to at least
        ``min_dimension``, which can alter the aspect ratio of extremely
        elongated images.

        Args:
            image: Input PIL Image
            max_dimension: Maximum width or height
            min_dimension: Minimum width or height

        Returns:
            Resized PIL Image (the input object itself when no resize is needed)
        """
        width, height = image.size
        longest, shortest = max(width, height), min(width, height)

        # Degenerate (empty) images cannot be scaled meaningfully.
        if shortest <= 0:
            return image

        # Check if resize needed
        if longest <= max_dimension and shortest >= min_dimension:
            return image

        if longest > max_dimension:
            # Downscale based on the longest side only. Mixing in
            # min_dimension / shortest here would shrink large images all the
            # way down to the minimum size.
            scale = max_dimension / longest
        else:
            # Upscale a too-small image, capped by the maximum dimension.
            scale = min(min_dimension / shortest, max_dimension / longest)

        new_width = max(int(width * scale), min_dimension)
        new_height = max(int(height * scale), min_dimension)
        return image.resize((new_width, new_height), Image.Resampling.LANCZOS)


class OCRService:
    """
    Optimized OCR service with caching and preprocessing.

    Features:
    - LRU cache for repeated images
    - Image preprocessing pipeline
    - Lazy backend initialization (EasyOCR, Tesseract, PaddleOCR)
    - Request / cache statistics
    """

    def __init__(self, cache_size: int = 50):
        self._ocr_reader = None
        self._backend = None
        self._initialized = False
        self._initializing = False
        self._init_lock = threading.Lock()

        # Result cache
        self._cache = LRUCache(capacity=cache_size)

        # Preprocessor
        self._preprocessor = ImagePreprocessor()

        # Stats
        self._stats = {
            'cache_hits': 0,
            'cache_misses': 0,
            'total_requests': 0,
            'total_time_ms': 0,
        }
        self._stats_lock = threading.Lock()

    def _init_backends(self):
        """Initialize available OCR backends (lazy - called on first use).

        Backends are probed in order: EasyOCR, Tesseract, PaddleOCR. The first
        one that loads is kept. Callers that arrive while another thread is
        initializing return immediately instead of blocking.
        """
        if self._initialized or self._initializing:
            return

        with self._init_lock:
            if self._initialized or self._initializing:
                return

            self._initializing = True
            try:
                print("[OCR] Initializing backends...")

                # Try EasyOCR first (best accuracy)
                try:
                    import easyocr
                    self._ocr_reader = easyocr.Reader(['en'], gpu=False, verbose=False)
                    self._backend = 'easyocr'
                    self._initialized = True
                    print("[OCR] Using EasyOCR backend")
                    return
                except ImportError:
                    pass
                except Exception as e:
                    print(f"[OCR] EasyOCR failed: {e}")

                # Try Tesseract (most common)
                try:
                    import pytesseract
                    pytesseract.get_tesseract_version()
                    self._backend = 'tesseract'
                    self._initialized = True
                    print("[OCR] Using Tesseract backend")
                    return
                except Exception as e:
                    print(f"[OCR] Tesseract failed: {e}")

                # Try PaddleOCR (fallback)
                try:
                    from paddleocr import PaddleOCR
                    self._ocr_reader = PaddleOCR(lang='en', show_log=False)
                    self._backend = 'paddle'
                    self._initialized = True
                    print("[OCR] Using PaddleOCR backend")
                    return
                except Exception as e:
                    print(f"[OCR] PaddleOCR failed: {e}")

                print("[OCR] WARNING: No OCR backend available!")
            finally:
                # Always clear the in-progress flag, including on the early
                # returns taken when a backend loads successfully.
                self._initializing = False

    def is_available(self) -> bool:
        """Check if OCR is available (lazy init)."""
        if not self._initialized and not self._initializing:
            self._init_backends()
        return self._initialized

    def _get_cache_key(self,
                       image: Optional[Image.Image] = None,
                       region: Optional[Tuple[int, int, int, int]] = None) -> str:
        """Generate cache key for image/region.

        When an image is available the key is a hash of its PNG-encoded
        content, so a screen region whose pixels changed does not hit a stale
        entry. The region is only used as the key when no image is given.
        """
        if image is not None:
            # Hash image content
            img_bytes = io.BytesIO()
            image.save(img_bytes, format='PNG')
            return hashlib.md5(img_bytes.getvalue()).hexdigest()
        if region:
            return f"region:{region}"
        return ""

    @staticmethod
    def _copy_result(result: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of a result dict whose 'results' list is also copied."""
        clone = dict(result)
        if isinstance(clone.get('results'), list):
            clone['results'] = list(clone['results'])
        return clone

    def recognize(self,
                  image: Optional[Union[Image.Image, np.ndarray]] = None,
                  region: Optional[Tuple[int, int, int, int]] = None,
                  use_cache: bool = True,
                  preprocess: bool = True) -> Dict[str, Any]:
        """
        Perform OCR on image or screen region.

        Args:
            image: PIL Image, numpy array, or None to capture screen
            region: Screen region to capture (if image is None)
            use_cache: Whether to use result caching
            preprocess: Whether to apply image preprocessing

        Returns:
            Dict with 'text', 'confidence', 'results', 'image_size', 'cached'
            and 'time_ms'. On failure the dict carries an 'error' message,
            empty 'text' and an empty 'results' list instead of raising.
        """
        start_time = time.perf_counter()

        with self._stats_lock:
            self._stats['total_requests'] += 1

        # Lazy initialization
        if not self._initialized and not self._initializing:
            self._init_backends()

        if not self._initialized:
            return {
                'text': '',
                'confidence': 0,
                'error': 'OCR not initialized - no backend available',
                'results': [],
                'cached': False
            }

        try:
            # Capture if needed
            if image is None:
                image = self.capture_screen(region)
            elif isinstance(image, np.ndarray):
                # The rest of the pipeline relies on PIL attributes.
                image = Image.fromarray(image)

            # Check cache
            cache_key = ""
            if use_cache:
                # Preprocessing changes the OCR input, so it is part of the key.
                cache_key = f"{self._get_cache_key(image, region)}:pre={int(preprocess)}"
                cached_result = self._cache.get(cache_key)
                if cached_result is not None:
                    elapsed_ms = (time.perf_counter() - start_time) * 1000
                    with self._stats_lock:
                        self._stats['cache_hits'] += 1
                        self._stats['total_time_ms'] += elapsed_ms
                    # Hand out a copy so callers cannot mutate the cached entry.
                    hit = self._copy_result(cached_result)
                    hit['cached'] = True
                    hit['time_ms'] = elapsed_ms
                    return hit

                with self._stats_lock:
                    self._stats['cache_misses'] += 1

            # Preprocess image
            if preprocess:
                image = self._preprocessor.preprocess(image)
                image = self._preprocessor.resize_for_ocr(image)

            # Perform OCR
            backends = {
                'easyocr': self._ocr_easyocr,
                'tesseract': self._ocr_tesseract,
                'paddle': self._ocr_paddle,
            }
            run_backend = backends.get(self._backend)
            if run_backend is None:
                return {
                    'text': '',
                    'confidence': 0,
                    'error': 'Unknown backend',
                    'results': [],
                    'cached': False
                }
            result = run_backend(image)
            result['cached'] = False

            # Cache result
            if use_cache:
                self._cache.put(cache_key, self._copy_result(result))

            # Update stats
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            with self._stats_lock:
                self._stats['total_time_ms'] += elapsed_ms

            result['time_ms'] = elapsed_ms
            return result

        except Exception as e:
            return {
                'text': '',
                'confidence': 0,
                'error': str(e),
                'results': [],
                'cached': False
            }

    def capture_screen(self,
                       region: Optional[Tuple[int, int, int, int]] = None) -> Image.Image:
        """
        Capture screen or region using the ScreenshotService.

        Falls back to a direct pyautogui capture when the screenshot service
        cannot be used.

        Args:
            region: (x, y, width, height) or None for full screen

        Returns:
            PIL Image

        Raises:
            RuntimeError: If the fallback is needed and pyautogui is not installed.
        """
        try:
            from core.screenshot import get_screenshot_service
            screenshot_service = get_screenshot_service()

            if region:
                x, y, width, height = region
                return screenshot_service.capture_region(x, y, width, height)
            return screenshot_service.capture(full_screen=True)
        except Exception as e:
            print(f"[OCR] Screenshot service failed, falling back: {e}")

        # Fallback to direct pyautogui capture
        try:
            import pyautogui
        except ImportError as exc:
            raise RuntimeError("pyautogui not installed. Run: pip install pyautogui") from exc

        if region:
            x, y, width, height = region
            return pyautogui.screenshot(region=(x, y, width, height))
        return pyautogui.screenshot()

    def _ocr_easyocr(self, image: Image.Image) -> Dict[str, Any]:
        """OCR using EasyOCR."""
        # EasyOCR's reader is fed a numpy array built from the PIL image
        image_np = np.array(image)

        results = self._ocr_reader.readtext(image_np)

        # Parse results
        texts = []
        total_confidence = 0
        parsed_results = []

        for (bbox, text, conf) in results:
            texts.append(text)
            total_confidence += conf
            parsed_results.append(OCRResult(
                text=text,
                confidence=conf,
                bounding_box=_polygon_to_box(bbox),
                raw_data={'bbox': bbox}
            ))

        avg_confidence = total_confidence / len(results) if results else 0

        return {
            'text': ' '.join(texts),
            'confidence': avg_confidence,
            'results': parsed_results,
            'image_size': image.size
        }

    def _ocr_tesseract(self, image: Image.Image) -> Dict[str, Any]:
        """OCR using Tesseract."""
        import pytesseract

        # Ensure grayscale for tesseract
        if image.mode != 'L':
            image = image.convert('L')

        # Get full text
        text = pytesseract.image_to_string(image).strip()

        # Get detailed data
        data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)

        parsed_results = []
        for i, word in enumerate(data['text']):
            if not word.strip():
                continue
            # Parse via float: int() would raise on fractional confidence
            # strings such as '96.5'.
            conf = float(data['conf'][i])
            if conf > 0:  # Valid confidence
                parsed_results.append(OCRResult(
                    text=word,
                    confidence=conf / 100.0,
                    bounding_box=(
                        data['left'][i],
                        data['top'][i],
                        data['width'][i],
                        data['height'][i]
                    ),
                    raw_data={'block_num': data['block_num'][i]}
                ))

        if parsed_results:
            avg_confidence = sum(r.confidence for r in parsed_results) / len(parsed_results)
        else:
            avg_confidence = 0

        return {
            'text': text,
            'confidence': avg_confidence,
            'results': parsed_results,
            'image_size': image.size
        }

    def _ocr_paddle(self, image: Image.Image) -> Dict[str, Any]:
        """OCR using PaddleOCR."""
        image_np = np.array(image)

        result = self._ocr_reader.ocr(image_np, cls=True)

        texts = []
        parsed_results = []
        total_confidence = 0

        if result and result[0]:
            for line in result[0]:
                bbox, (text, conf) = line
                texts.append(text)
                total_confidence += conf
                parsed_results.append(OCRResult(
                    text=text,
                    confidence=conf,
                    bounding_box=_polygon_to_box(bbox),
                    raw_data={'bbox': bbox}
                ))

        avg_confidence = total_confidence / len(parsed_results) if parsed_results else 0

        return {
            'text': ' '.join(texts),
            'confidence': avg_confidence,
            'results': parsed_results,
            'image_size': image.size
        }

    def recognize_region(self, x: int, y: int, width: int, height: int,
                         use_cache: bool = True) -> Dict[str, Any]:
        """Convenience method for region OCR."""
        return self.recognize(region=(x, y, width, height), use_cache=use_cache)

    def find_text(self,
                  target_text: str,
                  image: Optional[Image.Image] = None,
                  region: Optional[Tuple[int, int, int, int]] = None) -> List[OCRResult]:
        """
        Find specific text in image.

        The match is a case-insensitive substring test against each
        recognized item. The result cache is bypassed.

        Returns list of OCRResult where target_text is found.
        """
        result = self.recognize(image, region, use_cache=False)
        target_lower = target_text.lower()
        return [r for r in result.get('results', []) if target_lower in r.text.lower()]

    def get_text_at_position(self, x: int, y: int,
                             image: Optional[Image.Image] = None) -> Optional[str]:
        """Get text at specific screen position.

        A 100x20 window centred on (x, y) is examined, clipped so it never
        starts at a negative coordinate. When *image* is given, the window is
        cropped out of that image; otherwise it is captured from the screen.

        Returns:
            The recognized text, or None when nothing was recognized or the
            window lies completely outside the image.
        """
        # Small region around point
        left, top = max(x - 50, 0), max(y - 10, 0)
        right, bottom = x + 50, y + 10

        if image is not None:
            if isinstance(image, np.ndarray):
                image = Image.fromarray(image)
            img_width, img_height = image.size
            right, bottom = min(right, img_width), min(bottom, img_height)

        if right <= left or bottom <= top:
            return None

        if image is not None:
            # Without the crop the whole image would be OCR'd and x/y ignored.
            image = image.crop((left, top, right, bottom))

        region = (left, top, right - left, bottom - top)
        result = self.recognize(image, region, use_cache=False)
        text = result.get('text')
        return text if text else None

    def get_stats(self) -> Dict[str, Any]:
        """Get OCR service statistics.

        Returns:
            A copy of the counters plus 'cache_hit_rate' (percent),
            'avg_time_ms' and the active 'backend' name.
        """
        with self._stats_lock:
            stats = self._stats.copy()

        total = stats['cache_hits'] + stats['cache_misses']
        stats['cache_hit_rate'] = (stats['cache_hits'] / total * 100) if total > 0 else 0
        stats['avg_time_ms'] = (
            (stats['total_time_ms'] / stats['total_requests'])
            if stats['total_requests'] > 0 else 0
        )
        stats['backend'] = self._backend

        return stats

    def clear_cache(self):
        """Clear the OCR result cache."""
        self._cache.clear()


# Singleton instance
_ocr_service = None
_ocr_lock = threading.Lock()


def get_ocr_service() -> OCRService:
    """Get global OCRService instance (created on first call)."""
    global _ocr_service
    if _ocr_service is None:
        with _ocr_lock:
            # Re-check under the lock so only one instance is ever created.
            if _ocr_service is None:
                _ocr_service = OCRService()
    return _ocr_service


# Convenience function for quick OCR
def quick_ocr(region: Optional[Tuple[int, int, int, int]] = None,
              use_cache: bool = True) -> str:
    """
    Quick OCR - capture and get text.

    Usage:
        text = quick_ocr()  # Full screen
        text = quick_ocr((100, 100, 200, 50))  # Region
    """
    service = get_ocr_service()
    result = service.recognize(region=region, use_cache=use_cache)
    return result.get('text', '')