572 lines
19 KiB
Python
572 lines
19 KiB
Python
"""
|
|
EU-Utility - Optimized OCR Service
|
|
|
|
Performance improvements:
|
|
1. Image preprocessing pipeline
|
|
2. Result caching for repeated regions
|
|
3. Connection pooling for backend resources
|
|
4. Memory-efficient image handling
|
|
5. Async processing support
|
|
"""
|
|
|
|
import io
|
|
import base64
|
|
import time
|
|
import threading
|
|
import hashlib
|
|
from typing import Dict, List, Tuple, Optional, Any
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from functools import lru_cache
|
|
from collections import OrderedDict
|
|
|
|
import numpy as np
|
|
from PIL import Image, ImageEnhance, ImageFilter
|
|
|
|
|
|
@dataclass
class OCRResult:
    """
    One recognized text fragment produced by an OCR backend.

    Attributes:
        text: The recognized string.
        confidence: Backend confidence score for this fragment.
        bounding_box: Location of the fragment as (x, y, width, height).
        raw_data: Optional backend-specific payload; defaults to None.
    """

    text: str
    confidence: float
    # Stored as (x, y, width, height)
    bounding_box: Tuple[int, int, int, int]
    raw_data: Any = None
|
|
|
|
|
|
class LRUCache:
    """
    Simple LRU cache for OCR results.

    Entries live in an OrderedDict ordered from least to most recently
    used; every access is serialized through a single lock. A capacity of
    zero or less disables storage entirely (``put`` becomes a no-op).
    """

    def __init__(self, capacity: int = 100):
        self.capacity = capacity
        self.cache: OrderedDict = OrderedDict()
        self.lock = threading.Lock()

    def get(self, key: str) -> Optional[Any]:
        """Return the value stored under ``key`` or None when absent.

        A successful lookup marks the entry as most recently used.
        """
        with self.lock:
            try:
                self.cache.move_to_end(key)
            except KeyError:
                return None
            return self.cache[key]

    def put(self, key: str, value: Any):
        """Store ``value`` under ``key``, evicting the oldest entries if full."""
        with self.lock:
            if self.capacity <= 0:
                # Nothing may be stored; previously this path tried to evict
                # from an empty dict and raised KeyError.
                return
            self.cache[key] = value
            self.cache.move_to_end(key)
            while len(self.cache) > self.capacity:
                # Drop from the least-recently-used end.
                self.cache.popitem(last=False)

    def clear(self):
        """Remove every cached entry."""
        with self.lock:
            self.cache.clear()
|
|
|
|
|
|
class ImagePreprocessor:
    """
    Image preprocessing pipeline for better OCR accuracy and speed.
    """

    @staticmethod
    def preprocess(image: Image.Image,
                   grayscale: bool = True,
                   contrast: float = 1.5,
                   sharpness: float = 1.2,
                   denoise: bool = False) -> Image.Image:
        """
        Preprocess image for OCR.

        Args:
            image: Input PIL Image
            grayscale: Convert to grayscale
            contrast: Contrast enhancement factor (1.0 leaves it unchanged)
            sharpness: Sharpness enhancement factor (1.0 leaves it unchanged)
            denoise: Apply a 3x3 median filter

        Returns:
            Preprocessed PIL Image
        """
        # Normalize any other mode (palette, RGBA, ...) to RGB first
        if image.mode not in ('RGB', 'L'):
            image = image.convert('RGB')

        if grayscale and image.mode != 'L':
            image = image.convert('L')

        if contrast != 1.0:
            image = ImageEnhance.Contrast(image).enhance(contrast)

        if sharpness != 1.0:
            image = ImageEnhance.Sharpness(image).enhance(sharpness)

        if denoise:
            image = image.filter(ImageFilter.MedianFilter(size=3))

        return image

    @staticmethod
    def _target_size(width: int, height: int,
                     max_dimension: int, min_dimension: int) -> Tuple[int, int]:
        """Compute the (width, height) an image should be resized to.

        Returns the input size unchanged when it already fits the limits or
        when either side is zero (nothing sensible can be scaled).
        """
        longest = max(width, height)
        shortest = min(width, height)
        if shortest <= 0:
            return width, height

        if longest > max_dimension:
            # Too large: shrink so the longest side equals max_dimension.
            # (Taking the min with the upscale ratio here used to collapse
            # large images down to roughly min_dimension pixels.)
            ratio = max_dimension / longest
        elif shortest < min_dimension:
            # Too small: grow the short side, but never past max_dimension.
            ratio = min(min_dimension / shortest, max_dimension / longest)
        else:
            return width, height

        # Each side is still clamped to min_dimension, which may distort
        # extreme aspect ratios but keeps the minimum-size guarantee.
        return (max(int(width * ratio), min_dimension),
                max(int(height * ratio), min_dimension))

    @staticmethod
    def resize_for_ocr(image: Image.Image,
                       max_dimension: int = 1024,
                       min_dimension: int = 32) -> Image.Image:
        """
        Resize image to optimal size for OCR.

        Args:
            image: Input PIL Image
            max_dimension: Maximum width or height
            min_dimension: Minimum width or height

        Returns:
            Resized PIL Image, or the input image itself when no resize is
            needed
        """
        width, height = image.size
        target = ImagePreprocessor._target_size(
            width, height, max_dimension, min_dimension)

        if target == (width, height):
            return image

        return image.resize(target, Image.Resampling.LANCZOS)
|
|
|
|
|
|
class OCRService:
    """
    Optimized OCR service with caching and preprocessing.

    Features:
    - LRU cache for repeated regions
    - Image preprocessing pipeline
    - Lazy backend selection: EasyOCR, then Tesseract, then PaddleOCR
    - Request / cache statistics
    """

    def __init__(self, cache_size: int = 50):
        self._ocr_reader = None
        self._backend = None
        self._initialized = False
        self._initializing = False
        self._init_lock = threading.Lock()

        # Result cache
        self._cache = LRUCache(capacity=cache_size)

        # Preprocessor
        self._preprocessor = ImagePreprocessor()

        # Stats
        self._stats = {
            'cache_hits': 0,
            'cache_misses': 0,
            'total_requests': 0,
            'total_time_ms': 0,
        }
        self._stats_lock = threading.Lock()

    # ------------------------------------------------------------------
    # Backend initialization
    # ------------------------------------------------------------------

    def _init_backends(self):
        """Initialize available OCR backends (lazy - called on first use).

        Callers arriving while another thread is initializing wait on the
        lock instead of returning early; otherwise they would observe
        "not initialized" although a backend is about to become ready.
        If no backend loads, a later call tries again.
        """
        if self._initialized:
            return

        with self._init_lock:
            if self._initialized:
                return

            self._initializing = True
            try:
                print("[OCR] Initializing backends...")

                # Order matters: first backend that loads wins.
                loaders = (self._try_easyocr, self._try_tesseract, self._try_paddle)
                for load in loaders:
                    if load():
                        # Set last, after _backend/_ocr_reader are in place,
                        # because the fast path reads this flag without the lock.
                        self._initialized = True
                        return

                print("[OCR] WARNING: No OCR backend available!")
            finally:
                self._initializing = False

    def _try_easyocr(self) -> bool:
        """Try to load EasyOCR; return True on success."""
        try:
            import easyocr
            self._ocr_reader = easyocr.Reader(['en'], gpu=False, verbose=False)
        except ImportError:
            # Not installed: fall through to the next backend silently.
            return False
        except Exception as e:
            print(f"[OCR] EasyOCR failed: {e}")
            return False

        self._backend = 'easyocr'
        print("[OCR] Using EasyOCR backend")
        return True

    def _try_tesseract(self) -> bool:
        """Try to load pytesseract and its binary; return True on success."""
        try:
            import pytesseract
            pytesseract.get_tesseract_version()
        except Exception as e:
            print(f"[OCR] Tesseract failed: {e}")
            return False

        self._backend = 'tesseract'
        print("[OCR] Using Tesseract backend")
        return True

    def _try_paddle(self) -> bool:
        """Try to load PaddleOCR; return True on success."""
        try:
            from paddleocr import PaddleOCR
            self._ocr_reader = PaddleOCR(lang='en', show_log=False)
        except Exception as e:
            print(f"[OCR] PaddleOCR failed: {e}")
            return False

        self._backend = 'paddle'
        print("[OCR] Using PaddleOCR backend")
        return True

    def is_available(self) -> bool:
        """Check if OCR is available (lazy init)."""
        if not self._initialized:
            self._init_backends()
        return self._initialized

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _error_result(message: str) -> Dict[str, Any]:
        """Build the result dict returned when recognition cannot proceed."""
        return {
            'text': '',
            'confidence': 0,
            'error': message,
            'results': [],
            'cached': False
        }

    @staticmethod
    def _bbox_from_points(points) -> Tuple[int, int, int, int]:
        """Convert a sequence of (x, y) corner points to (x, y, width, height)."""
        xs = [p[0] for p in points]
        ys = [p[1] for p in points]
        x_min, y_min = min(xs), min(ys)
        return (int(x_min), int(y_min),
                int(max(xs) - x_min), int(max(ys) - y_min))

    def _get_cache_key(self, image: Image.Image = None,
                       region: Tuple[int, int, int, int] = None) -> str:
        """Generate cache key for image/region.

        A region takes precedence; otherwise the key is a digest of the
        image's mode, size and raw pixel bytes. Returns "" when neither is
        given.
        """
        if region:
            return f"region:{region}"

        # Explicit None check: truth-testing a numpy array raises.
        if image is not None:
            # Hash raw pixels rather than a PNG encoding: it works for every
            # image mode and avoids the compression step.
            digest = hashlib.md5()
            digest.update(f"{image.mode}:{image.size}".encode())
            digest.update(image.tobytes())
            return digest.hexdigest()

        return ""

    # ------------------------------------------------------------------
    # Recognition
    # ------------------------------------------------------------------

    def recognize(self,
                  image: Image.Image = None,
                  region: Tuple[int, int, int, int] = None,
                  use_cache: bool = True,
                  preprocess: bool = True) -> Dict[str, Any]:
        """
        Perform OCR on image or screen region.

        Args:
            image: PIL Image, numpy array, or None to capture screen
            region: Screen region to capture (if image is None)
            use_cache: Whether to use result caching
            preprocess: Whether to apply image preprocessing

        Returns:
            Dict with 'text', 'confidence', 'results', 'image_size',
            'cached' and 'time_ms'. On failure the dict carries 'error'
            instead of 'image_size'/'time_ms'; this method does not raise.

        Note:
            Screen captures are cached under their region, so a repeated
            region returns the earlier result until clear_cache() is
            called. Caller-supplied images are cached by pixel content.
        """
        start_time = time.perf_counter()

        with self._stats_lock:
            self._stats['total_requests'] += 1

        # Lazy initialization (blocks while another thread is initializing)
        if not self._initialized:
            self._init_backends()

        if not self._initialized:
            return self._error_result('OCR not initialized - no backend available')

        try:
            caller_supplied = image is not None

            if image is None:
                image = self.capture_screen(region)
            elif isinstance(image, np.ndarray):
                # The rest of the pipeline works on PIL images.
                image = Image.fromarray(image)

            cache_key = None
            if use_cache:
                # The region only identifies content that was captured from
                # the screen; a caller-supplied image is keyed by its pixels
                # so different images never share an entry.
                cache_key = self._get_cache_key(
                    image, None if caller_supplied else region)
                cached_result = self._cache.get(cache_key)

                if cached_result is not None:
                    elapsed_ms = (time.perf_counter() - start_time) * 1000
                    with self._stats_lock:
                        self._stats['cache_hits'] += 1
                        self._stats['total_time_ms'] += elapsed_ms

                    # Hand out a copy so callers cannot mutate the cached entry.
                    hit = dict(cached_result)
                    hit['cached'] = True
                    hit['time_ms'] = elapsed_ms
                    return hit

                with self._stats_lock:
                    self._stats['cache_misses'] += 1

            # Preprocess image
            if preprocess:
                image = self._preprocessor.preprocess(image)
                image = self._preprocessor.resize_for_ocr(image)

            # Perform OCR
            backends = {
                'easyocr': self._ocr_easyocr,
                'tesseract': self._ocr_tesseract,
                'paddle': self._ocr_paddle,
            }
            run_backend = backends.get(self._backend)
            if run_backend is None:
                return self._error_result('Unknown backend')

            result = run_backend(image)
            result['cached'] = False

            # Cache result
            if use_cache:
                self._cache.put(cache_key, result.copy())

            # Update stats
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            with self._stats_lock:
                self._stats['total_time_ms'] += elapsed_ms

            result['time_ms'] = elapsed_ms
            return result

        except Exception as e:
            # Service boundary: report the failure in the result instead of raising.
            return self._error_result(str(e))

    def capture_screen(self, region: Tuple[int, int, int, int] = None) -> Image.Image:
        """
        Capture screen or region using the ScreenshotService.

        Falls back to pyautogui when the screenshot service cannot be
        imported or fails.

        Args:
            region: (x, y, width, height) or None for full screen

        Returns:
            PIL Image

        Raises:
            RuntimeError: If the fallback is needed and pyautogui is not
                installed.
        """
        try:
            from core.screenshot import get_screenshot_service
            screenshot_service = get_screenshot_service()

            if region:
                x, y, width, height = region
                return screenshot_service.capture_region(x, y, width, height)
            return screenshot_service.capture(full_screen=True)

        except Exception as e:
            print(f"[OCR] Screenshot service failed, falling back: {e}")

        # Fallback to direct pyautogui capture
        try:
            import pyautogui

            if region:
                x, y, width, height = region
                return pyautogui.screenshot(region=(x, y, width, height))
            return pyautogui.screenshot()

        except ImportError as err:
            raise RuntimeError(
                "pyautogui not installed. Run: pip install pyautogui") from err

    def _ocr_easyocr(self, image: Image.Image) -> Dict[str, Any]:
        """OCR using EasyOCR."""
        # EasyOCR consumes numpy arrays
        image_np = np.array(image)

        results = self._ocr_reader.readtext(image_np)

        texts = []
        total_confidence = 0
        parsed_results = []

        for (bbox, text, conf) in results:
            texts.append(text)
            total_confidence += conf
            parsed_results.append(OCRResult(
                text=text,
                confidence=conf,
                bounding_box=self._bbox_from_points(bbox),
                raw_data={'bbox': bbox}
            ))

        avg_confidence = total_confidence / len(results) if results else 0

        return {
            'text': ' '.join(texts),
            'confidence': avg_confidence,
            'results': parsed_results,
            'image_size': image.size
        }

    def _ocr_tesseract(self, image: Image.Image) -> Dict[str, Any]:
        """OCR using Tesseract."""
        import pytesseract

        # Ensure grayscale for tesseract
        if image.mode != 'L':
            image = image.convert('L')

        # Get full text
        text = pytesseract.image_to_string(image).strip()

        # Get detailed per-word data
        data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)

        parsed_results = []
        for i, word in enumerate(data['text']):
            if not word.strip():
                continue

            # Parse as float: the confidence may arrive as a float-formatted
            # string, which int() rejects.
            conf = float(data['conf'][i])
            if conf <= 0:  # Skip entries without a valid confidence
                continue

            parsed_results.append(OCRResult(
                text=word,
                confidence=conf / 100.0,
                bounding_box=(
                    data['left'][i],
                    data['top'][i],
                    data['width'][i],
                    data['height'][i]
                ),
                raw_data={'block_num': data['block_num'][i]}
            ))

        avg_confidence = (
            sum(r.confidence for r in parsed_results) / len(parsed_results)
            if parsed_results else 0
        )

        return {
            'text': text,
            'confidence': avg_confidence,
            'results': parsed_results,
            'image_size': image.size
        }

    def _ocr_paddle(self, image: Image.Image) -> Dict[str, Any]:
        """OCR using PaddleOCR."""
        image_np = np.array(image)

        result = self._ocr_reader.ocr(image_np, cls=True)

        texts = []
        parsed_results = []
        total_confidence = 0

        # Detections for the image are read from result[0]
        if result and result[0]:
            for line in result[0]:
                bbox, (text, conf) = line
                texts.append(text)
                total_confidence += conf
                parsed_results.append(OCRResult(
                    text=text,
                    confidence=conf,
                    bounding_box=self._bbox_from_points(bbox),
                    raw_data={'bbox': bbox}
                ))

        avg_confidence = total_confidence / len(parsed_results) if parsed_results else 0

        return {
            'text': ' '.join(texts),
            'confidence': avg_confidence,
            'results': parsed_results,
            'image_size': image.size
        }

    # ------------------------------------------------------------------
    # Convenience API
    # ------------------------------------------------------------------

    def recognize_region(self, x: int, y: int, width: int, height: int,
                         use_cache: bool = True) -> Dict[str, Any]:
        """Convenience method for region OCR."""
        return self.recognize(region=(x, y, width, height), use_cache=use_cache)

    def find_text(self, target_text: str, image: Image.Image = None,
                  region: Tuple[int, int, int, int] = None) -> List[OCRResult]:
        """
        Find specific text in image.

        The match is a case-insensitive substring test and the cache is
        bypassed.

        Returns list of OCRResult where target_text is found.
        """
        result = self.recognize(image, region, use_cache=False)
        target_lower = target_text.lower()

        return [r for r in result.get('results', [])
                if target_lower in r.text.lower()]

    def get_text_at_position(self, x: int, y: int, image: Image.Image = None) -> Optional[str]:
        """Get text at specific screen position.

        Looks at a 100x20 box centered on (x, y). Without an image the box
        is captured from the screen; with an image the box is cropped out
        of it, so the coordinates are honored in both cases.

        Returns:
            The recognized text, or None when nothing was recognized.
        """
        # Small region around point
        left, top = x - 50, y - 10
        region = (left, top, 100, 20)

        if image is not None:
            if isinstance(image, np.ndarray):
                image = Image.fromarray(image)
            # recognize() ignores the region when an image is supplied, so
            # restrict the image to the box here.
            image = image.crop((left, top, left + 100, top + 20))

        result = self.recognize(image, region, use_cache=False)
        return result.get('text') or None

    def get_stats(self) -> Dict[str, Any]:
        """Get OCR service statistics.

        Returns a snapshot of the counters plus 'cache_hit_rate' (percent),
        'avg_time_ms' and the active 'backend'.
        """
        with self._stats_lock:
            stats = self._stats.copy()

        total = stats['cache_hits'] + stats['cache_misses']
        stats['cache_hit_rate'] = (stats['cache_hits'] / total * 100) if total > 0 else 0
        stats['avg_time_ms'] = (
            (stats['total_time_ms'] / stats['total_requests'])
            if stats['total_requests'] > 0 else 0
        )
        stats['backend'] = self._backend
        return stats

    def clear_cache(self):
        """Clear the OCR result cache."""
        self._cache.clear()
|
|
|
|
|
|
# Process-wide OCRService instance, created on first request
_ocr_service = None
_ocr_lock = threading.Lock()


def get_ocr_service() -> OCRService:
    """Return the shared OCRService, constructing it on the first call."""
    global _ocr_service

    # Fast path: already built, no locking needed
    if _ocr_service is not None:
        return _ocr_service

    with _ocr_lock:
        # Re-check under the lock; another thread may have built it meanwhile
        if _ocr_service is None:
            _ocr_service = OCRService()

    return _ocr_service
|
|
|
|
|
|
# Convenience function for quick OCR
|
|
def quick_ocr(region: Tuple[int, int, int, int] = None, use_cache: bool = True) -> str:
    """
    Capture the screen (or a region of it) and return the recognized text.

    Runs on the shared OCRService; yields '' when the result has no
    'text' entry.

    Usage:
        text = quick_ocr()  # Full screen
        text = quick_ocr((100, 100, 200, 50))  # Region
    """
    outcome = get_ocr_service().recognize(region=region, use_cache=use_cache)
    return outcome.get('text', '')
|