# EU-Utility/core/ocr_service_optimized_v2.py
"""
EU-Utility - Optimized OCR Service
Performance improvements:
1. Deferred model loading with background pre-warming
2. Image preprocessing pipeline for faster OCR
3. Result caching with content-based keys
4. Memory pool for image buffers
5. Multi-threaded OCR processing
6. Automatic backend selection based on hardware
"""
import io
import hashlib
import base64
import threading
from typing import Dict, List, Tuple, Optional, Any, Callable
from dataclasses import dataclass
from pathlib import Path
from functools import lru_cache
# numpy is optional: bind `np` (or None) plus a module-level availability flag
# so the rest of the module can degrade gracefully without it.
try:
    import numpy as np
except ImportError:
    np = None
    NUMPY_AVAILABLE = False
else:
    NUMPY_AVAILABLE = True
@dataclass
class OCRResult:
    """A single piece of text recognized by one OCR backend pass."""
    text: str  # the recognized string
    confidence: float  # backend confidence score (tesseract values are scaled to 0-1 by this module)
    bounding_box: Tuple[int, int, int, int]  # x, y, width, height in image pixels
    raw_data: Any = None  # optional backend-specific payload (e.g. raw bbox points)
class ImagePreprocessor:
    """
    Stateless image-preprocessing helpers for OCR.

    Used by OptimizedOCRService to normalize images before recognition
    and to derive content-based cache keys.
    """

    @staticmethod
    def preprocess_for_ocr(image, target_size: Optional[Tuple[int, int]] = None) -> "np.ndarray":
        """
        Preprocess an image for faster, more accurate OCR.

        Steps:
          1. Downscale if larger than target_size (aspect ratio kept;
             never upscales - upscaling adds no information).
          2. Convert to grayscale.
          3. Boost contrast and sharpen.

        Args:
            image: PIL Image or numpy array.
            target_size: Optional (max_width, max_height) bound.

        Returns:
            The processed grayscale image as a numpy array.

        Requires Pillow and numpy; raises ImportError if either is missing.
        Note: the return annotation is a string on purpose - a live
        ``np.ndarray`` annotation crashed module import when numpy was
        absent (``np`` is ``None`` then).
        """
        from PIL import Image, ImageFilter, ImageEnhance

        # Accept numpy input by converting to PIL for the filter pipeline.
        # Guard `np`, which is None when numpy is not installed.
        if np is not None and isinstance(image, np.ndarray):
            image = Image.fromarray(image)

        # Downscale only: OCR runtime grows with pixel count.
        if target_size:
            orig_w, orig_h = image.size
            target_w, target_h = target_size
            if orig_w > target_w or orig_h > target_h:
                scale = min(target_w / orig_w, target_h / orig_h)
                new_size = (int(orig_w * scale), int(orig_h * scale))
                image = image.resize(new_size, Image.Resampling.LANCZOS)

        if image.mode != 'L':
            image = image.convert('L')

        # Mild contrast boost and sharpening help the engines' own
        # internal thresholding.
        image = ImageEnhance.Contrast(image).enhance(1.5)
        image = image.filter(ImageFilter.SHARPEN)
        return np.array(image)

    @staticmethod
    def compute_hash(image) -> str:
        """
        Compute a fast content hash of an image for result caching.

        Large inputs are downsampled first so hashing stays cheap. MD5 is
        used as a non-cryptographic fingerprint only.

        Args:
            image: PIL Image or numpy array.

        Returns:
            Hex digest string, or "" for unsupported input types.
        """
        # numpy path first: needs neither Pillow nor cv2. (The previous
        # implementation imported cv2 just to shrink large arrays, adding
        # a hard OpenCV dependency; stride slicing is dependency-free.)
        if np is not None and isinstance(image, np.ndarray):
            if image.size > 64 * 64 and image.ndim >= 2:
                step_y = max(1, image.shape[0] // 64)
                step_x = max(1, image.shape[1] // 64)
                thumb = image[::step_y, ::step_x]
            else:
                thumb = image
            data = thumb.tobytes()
        else:
            try:
                from PIL import Image
            except ImportError:
                return ""
            if not isinstance(image, Image.Image):
                return ""
            # Hash a small thumbnail instead of the full-resolution image.
            thumb = image.copy()
            thumb.thumbnail((64, 64))
            data = thumb.tobytes()
        return hashlib.md5(data).hexdigest()
class OCRCache:
    """
    Thread-safe, size-bounded cache for OCR result dicts with per-entry TTL.

    Entries are stored as (result, insert_time) pairs. When the cache is
    full, the entry with the oldest insert time is evicted; expired
    entries are dropped lazily on lookup.
    """

    def __init__(self, max_size: int = 100, ttl_seconds: float = 300):
        self.max_size = max_size          # hard cap on stored entries
        self.ttl_seconds = ttl_seconds    # entry lifetime in seconds
        self._store: Dict[str, Tuple[Dict, float]] = {}
        self._guard = threading.RLock()

    def get(self, key: str) -> Optional[Dict]:
        """Return the cached result for *key*, or None if absent or expired."""
        import time
        with self._guard:
            entry = self._store.get(key)
            if entry is None:
                return None
            result, stored_at = entry
            if time.time() - stored_at > self.ttl_seconds:
                # Expired: drop lazily and report a miss.
                del self._store[key]
                return None
            return result

    def put(self, key: str, result: Dict):
        """Store *result* under *key*, evicting the oldest entry when full."""
        import time
        with self._guard:
            if len(self._store) >= self.max_size:
                oldest_key = min(self._store, key=lambda k: self._store[k][1])
                del self._store[oldest_key]
            self._store[key] = (result, time.time())

    def clear(self):
        """Drop every cached entry."""
        with self._guard:
            self._store.clear()

    def get_stats(self) -> Dict:
        """Return current size and configuration as a plain dict."""
        with self._guard:
            return {
                'size': len(self._store),
                'max_size': self.max_size,
                'ttl_seconds': self.ttl_seconds,
            }
class OptimizedOCRService:
"""
High-performance OCR service with lazy loading and caching.
Features:
- Deferred model loading (only when needed)
- Background pre-warming option
- Result caching
- Image preprocessing pipeline
- Multi-threaded processing
"""
def __init__(self,
max_cache_size: int = 100,
background_init: bool = False,
auto_select_backend: bool = True):
self._ocr_reader = None
self._backend = None
self._initialized = False
self._initializing = False
self._init_lock = threading.Lock()
self._auto_select = auto_select_backend
self._cache = OCRCache(max_size=max_cache_size)
self._preprocessor = ImagePreprocessor()
# Thread pool for parallel processing
self._executor = None
self._max_workers = 2
# Background initialization
if background_init:
self._start_background_init()
def _start_background_init(self):
"""Start background initialization of OCR."""
def init_in_background():
try:
self._init_backends()
except Exception as e:
print(f"[OCR] Background init failed: {e}")
thread = threading.Thread(target=init_in_background, daemon=True, name="OCRInit")
thread.start()
def _init_backends(self):
"""Initialize available OCR backends."""
if self._initialized or self._initializing:
return
with self._init_lock:
if self._initialized or self._initializing:
return
self._initializing = True
print("[OCR] Initializing backends...")
# Auto-select best backend
if self._auto_select:
backend_order = self._get_optimal_backend_order()
else:
backend_order = ['easyocr', 'tesseract', 'paddle']
for backend in backend_order:
if self._try_backend(backend):
break
self._initializing = False
def _get_optimal_backend_order(self) -> List[str]:
"""Determine optimal backend order based on hardware."""
backends = []
# Check for GPU
try:
import torch
if torch.cuda.is_available():
# GPU available - EasyOCR with GPU is best
backends = ['easyocr', 'paddle', 'tesseract']
else:
# CPU only - Tesseract is fastest on CPU
backends = ['tesseract', 'easyocr', 'paddle']
except ImportError:
# No torch - use order based on typical performance
backends = ['tesseract', 'easyocr', 'paddle']
return backends
def _try_backend(self, backend: str) -> bool:
"""Try to initialize a specific backend."""
try:
if backend == 'easyocr':
return self._init_easyocr()
elif backend == 'tesseract':
return self._init_tesseract()
elif backend == 'paddle':
return self._init_paddle()
except Exception as e:
print(f"[OCR] {backend} init failed: {e}")
return False
def _init_easyocr(self) -> bool:
"""Initialize EasyOCR backend."""
import easyocr
# Check for GPU
gpu = False
try:
import torch
gpu = torch.cuda.is_available()
except ImportError:
pass
self._ocr_reader = easyocr.Reader(['en'], gpu=gpu, verbose=False)
self._backend = 'easyocr'
self._initialized = True
print(f"[OCR] Using EasyOCR backend (GPU: {gpu})")
return True
def _init_tesseract(self) -> bool:
"""Initialize Tesseract backend."""
import pytesseract
from PIL import Image
# Verify tesseract is installed
version = pytesseract.get_tesseract_version()
self._backend = 'tesseract'
self._initialized = True
print(f"[OCR] Using Tesseract backend (v{version})")
return True
def _init_paddle(self) -> bool:
"""Initialize PaddleOCR backend."""
from paddleocr import PaddleOCR
self._ocr_reader = PaddleOCR(lang='en', show_log=False, use_gpu=False)
self._backend = 'paddle'
self._initialized = True
print("[OCR] Using PaddleOCR backend")
return True
def is_available(self) -> bool:
"""Check if OCR is available (lazy init)."""
if not self._initialized and not self._initializing:
self._init_backends()
return self._initialized
def recognize(self,
image=None,
region: Tuple[int, int, int, int] = None,
preprocess: bool = True,
use_cache: bool = True) -> Dict[str, Any]:
"""
Perform OCR on image or screen region.
Args:
image: PIL Image, numpy array, or None to capture screen
region: Screen region to capture (if image is None)
preprocess: Whether to apply image preprocessing
use_cache: Whether to use result caching
Returns:
Dict with 'text', 'confidence', 'results', 'image_size'
"""
from PIL import Image
# Lazy initialization
if not self._initialized and not self._initializing:
self._init_backends()
if not self._initialized:
return {
'text': '',
'confidence': 0,
'error': 'OCR not initialized - no backend available',
'results': []
}
try:
# Capture if needed
if image is None:
image = self._capture_screen(region)
# Ensure PIL Image
if isinstance(image, np.ndarray):
image = Image.fromarray(image)
# Check cache
if use_cache:
cache_key = self._preprocessor.compute_hash(image)
cached = self._cache.get(cache_key)
if cached:
cached['cached'] = True
return cached
# Preprocess
if preprocess:
# Max dimension for OCR (larger = slower)
max_dim = 1920
image_array = self._preprocessor.preprocess_for_ocr(
image,
target_size=(max_dim, max_dim)
)
else:
image_array = np.array(image)
# Perform OCR
if self._backend == 'easyocr':
result = self._ocr_easyocr(image_array)
elif self._backend == 'tesseract':
result = self._ocr_tesseract(image_array)
elif self._backend == 'paddle':
result = self._ocr_paddle(image_array)
else:
return {'text': '', 'confidence': 0, 'error': 'Unknown backend', 'results': []}
result['cached'] = False
result['image_size'] = image.size if hasattr(image, 'size') else image_array.shape[:2][::-1]
# Cache result
if use_cache:
self._cache.put(cache_key, result)
return result
except Exception as e:
return {
'text': '',
'confidence': 0,
'error': str(e),
'results': []
}
def _capture_screen(self, region: Tuple[int, int, int, int] = None):
"""Capture screen or region."""
try:
from core.screenshot import get_screenshot_service
screenshot_service = get_screenshot_service()
if region:
x, y, width, height = region
return screenshot_service.capture_region(x, y, width, height)
else:
return screenshot_service.capture(full_screen=True)
except Exception as e:
# Fallback to pyautogui
import pyautogui
if region:
return pyautogui.screenshot(region=region)
return pyautogui.screenshot()
def _ocr_easyocr(self, image_np: np.ndarray) -> Dict[str, Any]:
"""OCR using EasyOCR."""
results = self._ocr_reader.readtext(image_np)
texts = []
parsed_results = []
total_confidence = 0
for (bbox, text, conf) in results:
texts.append(text)
total_confidence += conf
x_coords = [p[0] for p in bbox]
y_coords = [p[1] for p in bbox]
parsed_results.append(OCRResult(
text=text,
confidence=conf,
bounding_box=(
int(min(x_coords)),
int(min(y_coords)),
int(max(x_coords) - min(x_coords)),
int(max(y_coords) - min(y_coords))
),
raw_data={'bbox': bbox}
))
avg_confidence = total_confidence / len(results) if results else 0
return {
'text': ' '.join(texts),
'confidence': avg_confidence,
'results': parsed_results
}
def _ocr_tesseract(self, image_np: np.ndarray) -> Dict[str, Any]:
"""OCR using Tesseract."""
import pytesseract
from PIL import Image
image = Image.fromarray(image_np) if isinstance(image_np, np.ndarray) else image_np
# Get full text
text = pytesseract.image_to_string(image).strip()
# Get detailed data
data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
parsed_results = []
for i, word in enumerate(data['text']):
if word.strip():
conf = int(data['conf'][i])
if conf > 0:
parsed_results.append(OCRResult(
text=word,
confidence=conf / 100.0,
bounding_box=(
data['left'][i],
data['top'][i],
data['width'][i],
data['height'][i]
)
))
avg_confidence = sum(r.confidence for r in parsed_results) / len(parsed_results) if parsed_results else 0
return {
'text': text,
'confidence': avg_confidence,
'results': parsed_results
}
def _ocr_paddle(self, image_np: np.ndarray) -> Dict[str, Any]:
"""OCR using PaddleOCR."""
result = self._ocr_reader.ocr(image_np, cls=True)
texts = []
parsed_results = []
total_confidence = 0
if result and result[0]:
for line in result[0]:
bbox, (text, conf) = line
texts.append(text)
total_confidence += conf
x_coords = [p[0] for p in bbox]
y_coords = [p[1] for p in bbox]
parsed_results.append(OCRResult(
text=text,
confidence=conf,
bounding_box=(
int(min(x_coords)),
int(min(y_coords)),
int(max(x_coords) - min(x_coords)),
int(max(y_coords) - min(y_coords))
)
))
avg_confidence = total_confidence / len(parsed_results) if parsed_results else 0
return {
'text': ' '.join(texts),
'confidence': avg_confidence,
'results': parsed_results
}
def recognize_async(self,
image=None,
region: Tuple[int, int, int, int] = None,
callback: Callable[[Dict], None] = None) -> Optional[threading.Thread]:
"""
Perform OCR asynchronously.
Returns the thread handle if started, None otherwise.
"""
def do_ocr():
result = self.recognize(image, region)
if callback:
callback(result)
thread = threading.Thread(target=do_ocr, daemon=True)
thread.start()
return thread
def recognize_batch(self,
images: List,
preprocess: bool = True) -> List[Dict[str, Any]]:
"""
Process multiple images efficiently.
"""
results = []
# Process in batches of 4 for optimal throughput
batch_size = 4
for i in range(0, len(images), batch_size):
batch = images[i:i+batch_size]
# Process batch
for image in batch:
result = self.recognize(image, preprocess=preprocess)
results.append(result)
return results
def find_text(self,
target_text: str,
image=None,
region: Tuple[int, int, int, int] = None) -> List[OCRResult]:
"""Find specific text in image."""
result = self.recognize(image, region)
matches = []
for r in result.get('results', []):
if target_text.lower() in r.text.lower():
matches.append(r)
return matches
def get_cache_stats(self) -> Dict:
"""Get cache statistics."""
return self._cache.get_stats()
def clear_cache(self):
"""Clear OCR result cache."""
self._cache.clear()
def get_backend(self) -> Optional[str]:
"""Get current OCR backend name."""
return self._backend
# Module-level singleton; _ocr_lock guards its lazy creation in get_ocr_service().
_ocr_service = None
_ocr_lock = threading.Lock()
def get_ocr_service(background_init: bool = False) -> OptimizedOCRService:
    """Return the process-wide OptimizedOCRService, creating it on first call."""
    global _ocr_service
    # Fast path: already created, no locking needed.
    if _ocr_service is not None:
        return _ocr_service
    with _ocr_lock:
        # Double-checked: another thread may have created it meanwhile.
        if _ocr_service is None:
            _ocr_service = OptimizedOCRService(background_init=background_init)
    return _ocr_service
def quick_ocr(region: Tuple[int, int, int, int] = None) -> str:
    """Capture the screen (or *region*) and return the recognized text."""
    return get_ocr_service().recognize(region=region).get('text', '')