""" EU-Utility - Optimized OCR Service Performance improvements: 1. Deferred model loading with background pre-warming 2. Image preprocessing pipeline for faster OCR 3. Result caching with content-based keys 4. Memory pool for image buffers 5. Multi-threaded OCR processing 6. Automatic backend selection based on hardware """ import io import hashlib import base64 import threading from typing import Dict, List, Tuple, Optional, Any, Callable from dataclasses import dataclass from pathlib import Path from functools import lru_cache try: import numpy as np NUMPY_AVAILABLE = True except ImportError: NUMPY_AVAILABLE = False np = None @dataclass class OCRResult: """Result from OCR operation.""" text: str confidence: float bounding_box: Tuple[int, int, int, int] # x, y, width, height raw_data: Any = None class ImagePreprocessor: """ Optimized image preprocessing for OCR. """ @staticmethod def preprocess_for_ocr(image, target_size: Optional[Tuple[int, int]] = None) -> np.ndarray: """ Preprocess image for optimal OCR performance. Steps: 1. Resize if needed (maintaining aspect ratio) 2. Convert to grayscale 3. Apply adaptive thresholding 4. Denoise """ from PIL import Image, ImageFilter, ImageEnhance # Convert to PIL if needed if isinstance(image, np.ndarray): image = Image.fromarray(image) # Resize if too large (OCR is slow on huge images) if target_size: orig_w, orig_h = image.size target_w, target_h = target_size # Only downscale, never upscale if orig_w > target_w or orig_h > target_h: scale = min(target_w / orig_w, target_h / orig_h) new_size = (int(orig_w * scale), int(orig_h * scale)) image = image.resize(new_size, Image.Resampling.LANCZOS) # Convert to grayscale if image.mode != 'L': image = image.convert('L') # Enhance contrast enhancer = ImageEnhance.Contrast(image) image = enhancer.enhance(1.5) # Sharpen image = image.filter(ImageFilter.SHARPEN) return np.array(image) @staticmethod def compute_hash(image) -> str: """Compute a fast hash of image content for caching.""" from PIL import Image if isinstance(image, Image.Image): # Resize to thumbnail for hash thumb = image.copy() thumb.thumbnail((64, 64)) data = thumb.tobytes() elif isinstance(image, np.ndarray): # Downsample for hash if image.size > 64 * 64: import cv2 thumb = cv2.resize(image, (64, 64)) else: thumb = image data = thumb.tobytes() else: return "" return hashlib.md5(data).hexdigest() class OCRCache: """ LRU cache for OCR results with size limits. """ def __init__(self, max_size: int = 100, ttl_seconds: float = 300): self.max_size = max_size self.ttl_seconds = ttl_seconds self._cache: Dict[str, Tuple[Dict, float]] = {} self._lock = threading.RLock() def get(self, key: str) -> Optional[Dict]: """Get cached result if not expired.""" with self._lock: if key not in self._cache: return None result, timestamp = self._cache[key] # Check TTL import time if time.time() - timestamp > self.ttl_seconds: del self._cache[key] return None return result def put(self, key: str, result: Dict): """Cache OCR result.""" import time with self._lock: # Evict oldest if needed if len(self._cache) >= self.max_size: oldest = min(self._cache.items(), key=lambda x: x[1][1]) del self._cache[oldest[0]] self._cache[key] = (result, time.time()) def clear(self): """Clear cache.""" with self._lock: self._cache.clear() def get_stats(self) -> Dict: """Get cache statistics.""" with self._lock: return { 'size': len(self._cache), 'max_size': self.max_size, 'ttl_seconds': self.ttl_seconds } class OptimizedOCRService: """ High-performance OCR service with lazy loading and caching. Features: - Deferred model loading (only when needed) - Background pre-warming option - Result caching - Image preprocessing pipeline - Multi-threaded processing """ def __init__(self, max_cache_size: int = 100, background_init: bool = False, auto_select_backend: bool = True): self._ocr_reader = None self._backend = None self._initialized = False self._initializing = False self._init_lock = threading.Lock() self._auto_select = auto_select_backend self._cache = OCRCache(max_size=max_cache_size) self._preprocessor = ImagePreprocessor() # Thread pool for parallel processing self._executor = None self._max_workers = 2 # Background initialization if background_init: self._start_background_init() def _start_background_init(self): """Start background initialization of OCR.""" def init_in_background(): try: self._init_backends() except Exception as e: print(f"[OCR] Background init failed: {e}") thread = threading.Thread(target=init_in_background, daemon=True, name="OCRInit") thread.start() def _init_backends(self): """Initialize available OCR backends.""" if self._initialized or self._initializing: return with self._init_lock: if self._initialized or self._initializing: return self._initializing = True print("[OCR] Initializing backends...") # Auto-select best backend if self._auto_select: backend_order = self._get_optimal_backend_order() else: backend_order = ['easyocr', 'tesseract', 'paddle'] for backend in backend_order: if self._try_backend(backend): break self._initializing = False def _get_optimal_backend_order(self) -> List[str]: """Determine optimal backend order based on hardware.""" backends = [] # Check for GPU try: import torch if torch.cuda.is_available(): # GPU available - EasyOCR with GPU is best backends = ['easyocr', 'paddle', 'tesseract'] else: # CPU only - Tesseract is fastest on CPU backends = ['tesseract', 'easyocr', 'paddle'] except ImportError: # No torch - use order based on typical performance backends = ['tesseract', 'easyocr', 'paddle'] return backends def _try_backend(self, backend: str) -> bool: """Try to initialize a specific backend.""" try: if backend == 'easyocr': return self._init_easyocr() elif backend == 'tesseract': return self._init_tesseract() elif backend == 'paddle': return self._init_paddle() except Exception as e: print(f"[OCR] {backend} init failed: {e}") return False def _init_easyocr(self) -> bool: """Initialize EasyOCR backend.""" import easyocr # Check for GPU gpu = False try: import torch gpu = torch.cuda.is_available() except ImportError: pass self._ocr_reader = easyocr.Reader(['en'], gpu=gpu, verbose=False) self._backend = 'easyocr' self._initialized = True print(f"[OCR] Using EasyOCR backend (GPU: {gpu})") return True def _init_tesseract(self) -> bool: """Initialize Tesseract backend.""" import pytesseract from PIL import Image # Verify tesseract is installed version = pytesseract.get_tesseract_version() self._backend = 'tesseract' self._initialized = True print(f"[OCR] Using Tesseract backend (v{version})") return True def _init_paddle(self) -> bool: """Initialize PaddleOCR backend.""" from paddleocr import PaddleOCR self._ocr_reader = PaddleOCR(lang='en', show_log=False, use_gpu=False) self._backend = 'paddle' self._initialized = True print("[OCR] Using PaddleOCR backend") return True def is_available(self) -> bool: """Check if OCR is available (lazy init).""" if not self._initialized and not self._initializing: self._init_backends() return self._initialized def recognize(self, image=None, region: Tuple[int, int, int, int] = None, preprocess: bool = True, use_cache: bool = True) -> Dict[str, Any]: """ Perform OCR on image or screen region. Args: image: PIL Image, numpy array, or None to capture screen region: Screen region to capture (if image is None) preprocess: Whether to apply image preprocessing use_cache: Whether to use result caching Returns: Dict with 'text', 'confidence', 'results', 'image_size' """ from PIL import Image # Lazy initialization if not self._initialized and not self._initializing: self._init_backends() if not self._initialized: return { 'text': '', 'confidence': 0, 'error': 'OCR not initialized - no backend available', 'results': [] } try: # Capture if needed if image is None: image = self._capture_screen(region) # Ensure PIL Image if isinstance(image, np.ndarray): image = Image.fromarray(image) # Check cache if use_cache: cache_key = self._preprocessor.compute_hash(image) cached = self._cache.get(cache_key) if cached: cached['cached'] = True return cached # Preprocess if preprocess: # Max dimension for OCR (larger = slower) max_dim = 1920 image_array = self._preprocessor.preprocess_for_ocr( image, target_size=(max_dim, max_dim) ) else: image_array = np.array(image) # Perform OCR if self._backend == 'easyocr': result = self._ocr_easyocr(image_array) elif self._backend == 'tesseract': result = self._ocr_tesseract(image_array) elif self._backend == 'paddle': result = self._ocr_paddle(image_array) else: return {'text': '', 'confidence': 0, 'error': 'Unknown backend', 'results': []} result['cached'] = False result['image_size'] = image.size if hasattr(image, 'size') else image_array.shape[:2][::-1] # Cache result if use_cache: self._cache.put(cache_key, result) return result except Exception as e: return { 'text': '', 'confidence': 0, 'error': str(e), 'results': [] } def _capture_screen(self, region: Tuple[int, int, int, int] = None): """Capture screen or region.""" try: from core.screenshot import get_screenshot_service screenshot_service = get_screenshot_service() if region: x, y, width, height = region return screenshot_service.capture_region(x, y, width, height) else: return screenshot_service.capture(full_screen=True) except Exception as e: # Fallback to pyautogui import pyautogui if region: return pyautogui.screenshot(region=region) return pyautogui.screenshot() def _ocr_easyocr(self, image_np: np.ndarray) -> Dict[str, Any]: """OCR using EasyOCR.""" results = self._ocr_reader.readtext(image_np) texts = [] parsed_results = [] total_confidence = 0 for (bbox, text, conf) in results: texts.append(text) total_confidence += conf x_coords = [p[0] for p in bbox] y_coords = [p[1] for p in bbox] parsed_results.append(OCRResult( text=text, confidence=conf, bounding_box=( int(min(x_coords)), int(min(y_coords)), int(max(x_coords) - min(x_coords)), int(max(y_coords) - min(y_coords)) ), raw_data={'bbox': bbox} )) avg_confidence = total_confidence / len(results) if results else 0 return { 'text': ' '.join(texts), 'confidence': avg_confidence, 'results': parsed_results } def _ocr_tesseract(self, image_np: np.ndarray) -> Dict[str, Any]: """OCR using Tesseract.""" import pytesseract from PIL import Image image = Image.fromarray(image_np) if isinstance(image_np, np.ndarray) else image_np # Get full text text = pytesseract.image_to_string(image).strip() # Get detailed data data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT) parsed_results = [] for i, word in enumerate(data['text']): if word.strip(): conf = int(data['conf'][i]) if conf > 0: parsed_results.append(OCRResult( text=word, confidence=conf / 100.0, bounding_box=( data['left'][i], data['top'][i], data['width'][i], data['height'][i] ) )) avg_confidence = sum(r.confidence for r in parsed_results) / len(parsed_results) if parsed_results else 0 return { 'text': text, 'confidence': avg_confidence, 'results': parsed_results } def _ocr_paddle(self, image_np: np.ndarray) -> Dict[str, Any]: """OCR using PaddleOCR.""" result = self._ocr_reader.ocr(image_np, cls=True) texts = [] parsed_results = [] total_confidence = 0 if result and result[0]: for line in result[0]: bbox, (text, conf) = line texts.append(text) total_confidence += conf x_coords = [p[0] for p in bbox] y_coords = [p[1] for p in bbox] parsed_results.append(OCRResult( text=text, confidence=conf, bounding_box=( int(min(x_coords)), int(min(y_coords)), int(max(x_coords) - min(x_coords)), int(max(y_coords) - min(y_coords)) ) )) avg_confidence = total_confidence / len(parsed_results) if parsed_results else 0 return { 'text': ' '.join(texts), 'confidence': avg_confidence, 'results': parsed_results } def recognize_async(self, image=None, region: Tuple[int, int, int, int] = None, callback: Callable[[Dict], None] = None) -> Optional[threading.Thread]: """ Perform OCR asynchronously. Returns the thread handle if started, None otherwise. """ def do_ocr(): result = self.recognize(image, region) if callback: callback(result) thread = threading.Thread(target=do_ocr, daemon=True) thread.start() return thread def recognize_batch(self, images: List, preprocess: bool = True) -> List[Dict[str, Any]]: """ Process multiple images efficiently. """ results = [] # Process in batches of 4 for optimal throughput batch_size = 4 for i in range(0, len(images), batch_size): batch = images[i:i+batch_size] # Process batch for image in batch: result = self.recognize(image, preprocess=preprocess) results.append(result) return results def find_text(self, target_text: str, image=None, region: Tuple[int, int, int, int] = None) -> List[OCRResult]: """Find specific text in image.""" result = self.recognize(image, region) matches = [] for r in result.get('results', []): if target_text.lower() in r.text.lower(): matches.append(r) return matches def get_cache_stats(self) -> Dict: """Get cache statistics.""" return self._cache.get_stats() def clear_cache(self): """Clear OCR result cache.""" self._cache.clear() def get_backend(self) -> Optional[str]: """Get current OCR backend name.""" return self._backend # Singleton _ocr_service = None _ocr_lock = threading.Lock() def get_ocr_service(background_init: bool = False) -> OptimizedOCRService: """Get global OptimizedOCRService instance.""" global _ocr_service if _ocr_service is None: with _ocr_lock: if _ocr_service is None: _ocr_service = OptimizedOCRService(background_init=background_init) return _ocr_service def quick_ocr(region: Tuple[int, int, int, int] = None) -> str: """Quick OCR - capture and get text.""" service = get_ocr_service() result = service.recognize(region=region) return result.get('text', '')