"""
Lemontropia Suite - OpenCV EAST OCR Backend

Fast text detection using OpenCV DNN with EAST model.
No heavy dependencies, works with Windows Store Python.
"""

import logging
import urllib.request
from pathlib import Path
from typing import List, Tuple, Optional

import cv2
import numpy as np

from . import BaseOCRBackend, OCRTextRegion

logger = logging.getLogger(__name__)


class OpenCVEASTBackend(BaseOCRBackend):
    """
    Text detector using OpenCV DNN with EAST model.

    This is the primary fallback backend because:
    - Pure OpenCV, no PyTorch/TensorFlow dependencies
    - Fast (CPU: ~23 FPS, GPU: ~97 FPS)
    - Works with Windows Store Python
    - Detects text regions (does not recognize text)

    Based on:
    https://pyimagesearch.com/2022/03/14/improving-text-detection-speed-with-opencv-and-gpus/

    Recognised keyword arguments (all optional):
        model_path: Location of the frozen EAST graph. When omitted,
            ``~/.lemontropia/models/frozen_east_text_detection.pb`` is used
            and the model is downloaded there on first use.
        input_width / input_height: Network input size (default 320). Values
            that are not a multiple of 32 are snapped to the nearest one.
        confidence_threshold: Minimum score for a cell to yield a box
            (default 0.5).
        nms_threshold: Overlap threshold for non-maximum suppression
            (default 0.4).
    """

    NAME = "opencv_east"
    SUPPORTS_GPU = True

    # EAST model download URL (frozen inference graph)
    EAST_MODEL_URL = "https://github.com/oyyd/frozen_east_text_detection.pb/raw/master/frozen_east_text_detection.pb"

    # Network input sides must be a multiple of this value.
    _INPUT_MULTIPLE = 32
    _DEFAULT_INPUT_SIZE = 320
    # Each output cell corresponds to this many input pixels per axis.
    _FEATURE_STRIDE = 4.0

    def __init__(self, use_gpu: bool = True, lang: str = 'en', **kwargs):
        """Store configuration; the network itself is loaded in `_initialize`."""
        super().__init__(use_gpu=use_gpu, lang=lang, **kwargs)

        self.net = None
        self.model_path = kwargs.get('model_path')

        # Input size (must be multiple of 32)
        self.input_width = self._normalize_input_dim(
            kwargs.get('input_width', self._DEFAULT_INPUT_SIZE), 'input_width'
        )
        self.input_height = self._normalize_input_dim(
            kwargs.get('input_height', self._DEFAULT_INPUT_SIZE), 'input_height'
        )

        # Detection thresholds
        self.confidence_threshold = kwargs.get('confidence_threshold', 0.5)
        self.nms_threshold = kwargs.get('nms_threshold', 0.4)

        # GPU status
        self._gpu_enabled = False

    @classmethod
    def _normalize_input_dim(cls, value, name: str) -> int:
        """Return *value* snapped to the nearest positive multiple of 32.

        Unusable values (``None``, non-numeric) fall back to the default size
        instead of raising, so constructing the backend never fails because of
        a bad size argument.
        """
        multiple = cls._INPUT_MULTIPLE
        try:
            requested = int(value)
        except (TypeError, ValueError):
            logger.warning(
                f"Invalid {name}={value!r}, using {cls._DEFAULT_INPUT_SIZE}"
            )
            return cls._DEFAULT_INPUT_SIZE

        snapped = max(multiple, ((requested + multiple // 2) // multiple) * multiple)
        if snapped != requested:
            logger.warning(
                f"{name}={requested} is not a multiple of {multiple}, using {snapped}"
            )
        return snapped

    def _initialize(self) -> bool:
        """Initialize EAST text detector.

        Resolves the model path, downloads the model when the file is missing,
        loads it with OpenCV DNN and optionally enables CUDA.

        Returns:
            True when the network was loaded; False otherwise (the reason is
            stored in ``self._error_msg``).
        """
        try:
            # Determine model path
            if not self.model_path:
                model_dir = Path.home() / ".lemontropia" / "models"
                model_dir.mkdir(parents=True, exist_ok=True)
                self.model_path = str(model_dir / "frozen_east_text_detection.pb")

            # Download model if needed
            if not Path(self.model_path).exists() and not self._download_model():
                return False

            # Load the model
            logger.info(f"Loading EAST model from {self.model_path}")
            self.net = cv2.dnn.readNet(str(self.model_path))

            # Enable GPU if requested
            if self.use_gpu:
                self._gpu_enabled = self._enable_gpu()

            self._available = True
            self._version = cv2.__version__

            logger.info(f"OpenCV EAST backend initialized (GPU: {self._gpu_enabled})")
            return True

        except Exception as e:
            self._error_msg = f"Failed to initialize EAST: {e}"
            logger.error(self._error_msg)
            return False

    def _download_model(self) -> bool:
        """Download EAST model if not present.

        The payload is written to a temporary ``.part`` file next to the
        target and renamed only after the transfer completed, so an
        interrupted download never leaves a truncated file at
        ``self.model_path``.

        Returns:
            True on success, False on any failure (the reason is stored in
            ``self._error_msg``).
        """
        target = Path(self.model_path)
        partial = target.with_name(target.name + ".part")

        try:
            logger.info(f"Downloading EAST model from {self.EAST_MODEL_URL}")
            logger.info("This is a one-time download (~95 MB)...")

            # A caller-supplied model_path may point into a missing directory.
            target.parent.mkdir(parents=True, exist_ok=True)

            last_logged = -1

            def progress_hook(count, block_size, total_size):
                """Log progress once per 10% step."""
                nonlocal last_logged
                # urlretrieve reports a non-positive total when the server
                # does not announce the size; no percentage can be computed.
                if total_size <= 0:
                    return
                percent = min(100, count * block_size * 100 // total_size)
                step = percent - percent % 10
                if step > last_logged:
                    last_logged = step
                    logger.info(f"Download progress: {step}%")

            # NOTE(review): the file is fetched over HTTPS but its content is
            # not verified against a known checksum.
            urllib.request.urlretrieve(
                self.EAST_MODEL_URL,
                str(partial),
                reporthook=progress_hook
            )

            if partial.stat().st_size == 0:
                raise IOError("downloaded file is empty")

            # Path.replace overwrites an existing destination in one step.
            partial.replace(target)

            logger.info("EAST model downloaded successfully")
            return True

        except Exception as e:
            self._error_msg = f"Failed to download EAST model: {e}"
            logger.error(self._error_msg)
            # Best-effort cleanup of the incomplete temporary file.
            try:
                if partial.exists():
                    partial.unlink()
            except OSError:
                pass
            return False

    def _enable_gpu(self) -> bool:
        """Enable CUDA GPU acceleration.

        Returns:
            True when at least one CUDA device is reported and the CUDA
            backend/target were selected on the network; False otherwise.
        """
        try:
            # Check CUDA availability
            cuda_count = cv2.cuda.getCudaEnabledDeviceCount()

            if cuda_count <= 0:
                logger.warning("CUDA not available in OpenCV, using CPU")
                return False

            self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
            self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)

            # Get device info (best effort; falls back to the device count
            # when the name cannot be obtained from this OpenCV build).
            try:
                device_name = cv2.cuda.getDevice().name()
                logger.info(f"CUDA enabled: {device_name}")
            except Exception:
                logger.info(f"CUDA enabled ({cuda_count} device(s))")

            return True

        except Exception as e:
            logger.warning(f"Failed to enable CUDA: {e}, using CPU")
            return False

    @staticmethod
    def _ensure_bgr(image: np.ndarray) -> np.ndarray:
        """Return *image* as a 3-channel array.

        Single-channel input is expanded with GRAY2BGR. Four-channel input is
        assumed to be BGRA (TODO confirm against callers) and has its alpha
        channel dropped. Anything else is returned unchanged.
        """
        if image.ndim == 2 or (image.ndim == 3 and image.shape[2] == 1):
            return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
        if image.ndim == 3 and image.shape[2] == 4:
            return cv2.cvtColor(image, cv2.COLOR_BGRA2BGR)
        return image

    def extract_text(self, image: np.ndarray) -> List[OCRTextRegion]:
        """
        Detect text regions in image.

        Note: EAST only detects text regions, it does not recognize text.
        The 'text' field will be empty, but bbox and confidence are accurate.

        Args:
            image: Input image (BGR format from OpenCV)

        Returns:
            List of detected text regions; empty when the backend is not
            initialized, the image is empty, or detection fails.
        """
        if not self._available or self.net is None:
            logger.error("EAST backend not initialized")
            return []

        if image is None or getattr(image, "size", 0) == 0:
            logger.error("EAST detection failed: empty image")
            return []

        try:
            bgr = self._ensure_bgr(image)

            # Get image dimensions
            img_h, img_w = bgr.shape[:2]

            # Create blob from image; blobFromImage itself resizes to `size`
            # (crop=False), so no separate resize step is needed.
            blob = cv2.dnn.blobFromImage(
                bgr,
                scalefactor=1.0,
                size=(self.input_width, self.input_height),
                mean=(123.68, 116.78, 103.94),  # ImageNet means
                swapRB=True,
                crop=False
            )

            # Forward pass
            self.net.setInput(blob)
            layer_names = [
                "feature_fusion/Conv_7/Sigmoid",  # Scores
                "feature_fusion/concat_3"         # Geometry
            ]
            scores, geometry = self.net.forward(layer_names)

            # Decode predictions
            rectangles, confidences = self._decode_predictions(scores, geometry)

            # Apply non-maximum suppression
            boxes = self._apply_nms(rectangles, confidences)

            # Scale boxes back to original image size
            ratio_w = img_w / float(self.input_width)
            ratio_h = img_h / float(self.input_height)

            regions = []
            for (start_x, start_y, end_x, end_y, conf) in boxes:
                # Scale coordinates and clamp them to the image
                left = max(0, int(start_x * ratio_w))
                top = max(0, int(start_y * ratio_h))
                right = min(img_w, int(end_x * ratio_w))
                bottom = min(img_h, int(end_y * ratio_h))

                w = right - left
                h = bottom - top

                # Boxes that collapse after clamping are dropped.
                if w > 0 and h > 0:
                    regions.append(OCRTextRegion(
                        text="",  # EAST doesn't recognize text
                        confidence=float(conf),
                        bbox=(left, top, w, h),
                        language=self.lang
                    ))

            logger.debug(f"EAST detected {len(regions)} text regions")
            return regions

        except Exception as e:
            logger.error(f"EAST detection failed: {e}")
            return []

    def _decode_predictions(self, scores: np.ndarray,
                            geometry: np.ndarray) -> Tuple[List, List]:
        """Decode EAST model output to bounding boxes.

        Args:
            scores: Score volume; ``scores[0, 0]`` is the per-cell confidence
                map.
            geometry: Geometry volume; ``geometry[0, 0..3]`` are per-cell
                distances (channels 0 and 2 sum to the box height, 1 and 3 to
                the width) and ``geometry[0, 4]`` is the rotation angle.

        Returns:
            ``(rectangles, confidences)`` where rectangles is a list of
            ``(start_x, start_y, end_x, end_y)`` integer tuples in network
            input coordinates and confidences the matching list of scores,
            both in row-major cell order.
        """
        score_map = scores[0, 0]

        # Cells at or above the threshold, in row-major (y outer, x inner) order.
        rows, cols = np.nonzero(score_map >= self.confidence_threshold)
        if rows.size == 0:
            return [], []

        d0 = geometry[0, 0][rows, cols]
        d1 = geometry[0, 1][rows, cols]
        d2 = geometry[0, 2][rows, cols]
        d3 = geometry[0, 3][rows, cols]
        angle = geometry[0, 4][rows, cols]

        cos = np.cos(angle)
        sin = np.sin(angle)

        # Compute box dimensions
        box_h = d0 + d2
        box_w = d1 + d3

        # Each output cell maps back to a 4x4 pixel patch of the input.
        offset_x = cols * self._FEATURE_STRIDE
        offset_y = rows * self._FEATURE_STRIDE

        # astype(int64) truncates toward zero, like int() on a float.
        end_x = (offset_x + (cos * d1) + (sin * d2)).astype(np.int64)
        end_y = (offset_y - (sin * d1) + (cos * d2)).astype(np.int64)
        start_x = (end_x - box_w).astype(np.int64)
        start_y = (end_y - box_h).astype(np.int64)

        rectangles = list(zip(
            start_x.tolist(), start_y.tolist(), end_x.tolist(), end_y.tolist()
        ))
        confidences = score_map[rows, cols].tolist()

        return rectangles, confidences

    def _apply_nms(self, rectangles: List, confidences: List) -> List[Tuple]:
        """Apply non-maximum suppression.

        Args:
            rectangles: ``(x1, y1, x2, y2)`` boxes.
            confidences: One score per rectangle.

        Returns:
            List of ``(x1, y1, x2, y2, confidence)`` tuples for the boxes that
            survive suppression.
        """
        if not rectangles:
            return []

        # OpenCV NMSBoxes expects (x, y, w, h) format
        nms_boxes = [
            [float(x1), float(y1), float(x2 - x1), float(y2 - y1)]
            for (x1, y1, x2, y2) in rectangles
        ]
        box_scores = [float(c) for c in confidences]

        # Apply NMS
        indices = cv2.dnn.NMSBoxes(
            nms_boxes,
            box_scores,
            self.confidence_threshold,
            self.nms_threshold
        )

        # Handle different OpenCV versions: the result may be an empty tuple,
        # an (N, 1) array or a flat array. Flattening covers all of them.
        kept = np.asarray(indices, dtype=np.int64).reshape(-1)

        return [(*rectangles[i], box_scores[i]) for i in kept]

    def get_info(self):
        """Get backend information, including whether CUDA was enabled."""
        info = super().get_info()
        info.gpu_accelerated = self._gpu_enabled
        return info

    @staticmethod
    def is_opencv_cuda_available() -> bool:
        """Check if OpenCV was built with CUDA support."""
        try:
            return cv2.cuda.getCudaEnabledDeviceCount() > 0
        except Exception:
            return False