"""
Lemontropia Suite - OpenCV EAST OCR Backend

Fast text detection using OpenCV DNN with the EAST model.
No heavy dependencies; works with Windows Store Python.
"""
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import List, Tuple, Optional
|
|
import urllib.request
|
|
|
|
from . import BaseOCRBackend, OCRTextRegion
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OpenCVEASTBackend(BaseOCRBackend):
    """
    Text detector using OpenCV DNN with the EAST model.

    This is the primary fallback backend because:
    - Pure OpenCV, no PyTorch/TensorFlow dependencies
    - Fast (CPU: ~23 FPS, GPU: ~97 FPS)
    - Works with Windows Store Python
    - Detects text regions (does not recognize text)

    Based on: https://pyimagesearch.com/2022/03/14/improving-text-detection-speed-with-opencv-and-gpus/
    """

    NAME = "opencv_east"
    SUPPORTS_GPU = True

    # EAST model download URL (frozen inference graph)
    EAST_MODEL_URL = "https://github.com/oyyd/frozen_east_text_detection.pb/raw/master/frozen_east_text_detection.pb"

    def __init__(self, use_gpu: bool = True, lang: str = 'en', **kwargs):
        """
        Create the backend (model is not loaded until ``_initialize``).

        Args:
            use_gpu: Try to enable CUDA when the model is loaded.
            lang: Language tag copied onto detected regions (EAST itself
                is language-agnostic; it only finds text boxes).
            **kwargs: Optional overrides:
                model_path: Path to an existing ``frozen_east_text_detection.pb``.
                input_width / input_height: Network input size; EAST
                    requires multiples of 32 (default 320x320).
                confidence_threshold: Minimum score to keep a box (default 0.5).
                nms_threshold: IoU threshold for non-maximum suppression
                    (default 0.4).
        """
        super().__init__(use_gpu=use_gpu, lang=lang, **kwargs)

        self.net = None  # cv2.dnn Net, populated by _initialize()
        self.model_path = kwargs.get('model_path')

        # Input size (must be multiple of 32 for EAST)
        self.input_width = kwargs.get('input_width', 320)
        self.input_height = kwargs.get('input_height', 320)

        # Detection thresholds
        self.confidence_threshold = kwargs.get('confidence_threshold', 0.5)
        self.nms_threshold = kwargs.get('nms_threshold', 0.4)

        # True once CUDA has actually been enabled on the net
        self._gpu_enabled = False

    def _initialize(self) -> bool:
        """
        Initialize the EAST text detector.

        Resolves (or downloads) the frozen model, loads it with
        ``cv2.dnn.readNet``, and optionally enables CUDA.

        Returns:
            True on success; False with ``self._error_msg`` set on failure.
        """
        try:
            # Determine model path; default to a per-user cache directory
            if not self.model_path:
                model_dir = Path.home() / ".lemontropia" / "models"
                model_dir.mkdir(parents=True, exist_ok=True)
                self.model_path = str(model_dir / "frozen_east_text_detection.pb")

            model_file = Path(self.model_path)

            # Download model if needed
            if not model_file.exists():
                if not self._download_model():
                    return False

            # Load the model
            logger.info(f"Loading EAST model from {self.model_path}")
            self.net = cv2.dnn.readNet(self.model_path)

            # Enable GPU if requested (falls back to CPU silently)
            if self.use_gpu:
                self._gpu_enabled = self._enable_gpu()

            self._available = True
            self._version = cv2.__version__

            logger.info(f"OpenCV EAST backend initialized (GPU: {self._gpu_enabled})")
            return True

        except Exception as e:
            self._error_msg = f"Failed to initialize EAST: {e}"
            logger.error(self._error_msg)
            return False

    def _download_model(self) -> bool:
        """
        Download the EAST model to ``self.model_path``.

        Downloads to a temporary ``.part`` file and renames it on success,
        so an interrupted download never leaves a truncated model that a
        later run would mistake for a valid one.

        Returns:
            True on success; False with ``self._error_msg`` set on failure.
        """
        part_path = Path(str(self.model_path) + ".part")
        try:
            logger.info(f"Downloading EAST model from {self.EAST_MODEL_URL}")
            logger.info("This is a one-time download (~95 MB)...")

            last_logged = -1

            def progress_hook(count, block_size, total_size):
                # urlretrieve passes total_size as -1 (or 0) when the
                # server omits Content-Length; skip logging in that case
                # instead of dividing by zero.
                nonlocal last_logged
                if total_size <= 0:
                    return
                percent = min(100, int(count * block_size * 100 / total_size))
                if percent % 10 == 0 and percent != last_logged:  # Log every 10%
                    last_logged = percent
                    logger.info(f"Download progress: {percent}%")

            urllib.request.urlretrieve(
                self.EAST_MODEL_URL,
                str(part_path),
                reporthook=progress_hook
            )

            # Atomically promote the finished download
            part_path.replace(self.model_path)

            logger.info("EAST model downloaded successfully")
            return True

        except Exception as e:
            self._error_msg = f"Failed to download EAST model: {e}"
            logger.error(self._error_msg)
            # Best-effort cleanup of any partial file
            try:
                part_path.unlink()
            except OSError:
                pass
            return False

    def _enable_gpu(self) -> bool:
        """
        Enable CUDA GPU acceleration on the loaded network.

        Returns:
            True if CUDA was enabled; False if unavailable (CPU is used).
        """
        try:
            # Check CUDA availability
            cuda_count = cv2.cuda.getCudaEnabledDeviceCount()

            if cuda_count > 0:
                self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
                self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)

                # Best-effort device-name logging. cv2.cuda.getDevice()
                # returns an int device id, so the name comes from DeviceInfo.
                try:
                    device_name = cv2.cuda.DeviceInfo(cv2.cuda.getDevice()).name()
                    logger.info(f"CUDA enabled: {device_name}")
                except Exception:
                    logger.info(f"CUDA enabled ({cuda_count} device(s))")

                return True
            else:
                logger.warning("CUDA not available in OpenCV, using CPU")
                return False

        except Exception as e:
            logger.warning(f"Failed to enable CUDA: {e}, using CPU")
            return False

    def extract_text(self, image: np.ndarray) -> List[OCRTextRegion]:
        """
        Detect text regions in image.

        Note: EAST only detects text regions, it does not recognize text.
        The 'text' field will be empty, but bbox and confidence are accurate.

        Args:
            image: Input image (BGR format from OpenCV)

        Returns:
            List of detected text regions (empty on error or no detections)
        """
        if not self._available or self.net is None:
            logger.error("EAST backend not initialized")
            return []

        try:
            # Get image dimensions
            (H, W) = image.shape[:2]

            # Resize to the fixed network input size
            resized = cv2.resize(image, (self.input_width, self.input_height))

            # Create blob from image
            blob = cv2.dnn.blobFromImage(
                resized,
                scalefactor=1.0,
                size=(self.input_width, self.input_height),
                mean=(123.68, 116.78, 103.94),  # ImageNet means
                swapRB=True,
                crop=False
            )

            # Forward pass through the two EAST output layers
            self.net.setInput(blob)
            layer_names = [
                "feature_fusion/Conv_7/Sigmoid",  # Scores
                "feature_fusion/concat_3"         # Geometry
            ]
            scores, geometry = self.net.forward(layer_names)

            # Decode predictions into axis-aligned rectangles
            rectangles, confidences = self._decode_predictions(scores, geometry)

            # Apply non-maximum suppression
            boxes = self._apply_nms(rectangles, confidences)

            # Scale boxes back to original image size
            ratio_w = W / float(self.input_width)
            ratio_h = H / float(self.input_height)

            regions = []
            for (startX, startY, endX, endY, conf) in boxes:
                # Scale coordinates back to the source image
                startX = int(startX * ratio_w)
                startY = int(startY * ratio_h)
                endX = int(endX * ratio_w)
                endY = int(endY * ratio_h)

                # Clamp to the image bounds
                startX = max(0, startX)
                startY = max(0, startY)
                endX = min(W, endX)
                endY = min(H, endY)

                w = endX - startX
                h = endY - startY

                # Skip boxes that collapsed to zero size after clamping
                if w > 0 and h > 0:
                    regions.append(OCRTextRegion(
                        text="",  # EAST doesn't recognize text
                        confidence=float(conf),
                        bbox=(startX, startY, w, h),
                        language=self.lang
                    ))

            logger.debug(f"EAST detected {len(regions)} text regions")
            return regions

        except Exception as e:
            logger.error(f"EAST detection failed: {e}")
            return []

    def _decode_predictions(self, scores: np.ndarray,
                            geometry: np.ndarray) -> Tuple[List, List]:
        """
        Decode EAST model output to bounding boxes.

        Args:
            scores: Score map, shape (1, 1, rows, cols).
            geometry: Geometry map, shape (1, 5, rows, cols) — four edge
                distances plus a rotation angle per cell.

        Returns:
            Tuple of (rectangles, confidences) where each rectangle is
            (start_x, start_y, end_x, end_y) in network-input coordinates.
        """
        (num_rows, num_cols) = scores.shape[2:4]
        rectangles = []
        confidences = []

        for y in range(0, num_rows):
            scores_data = scores[0, 0, y]
            x0 = geometry[0, 0, y]
            x1 = geometry[0, 1, y]
            x2 = geometry[0, 2, y]
            x3 = geometry[0, 3, y]
            angles = geometry[0, 4, y]

            for x in range(0, num_cols):
                if scores_data[x] < self.confidence_threshold:
                    continue

                # Feature maps are 4x smaller than the network input
                offset_x = x * 4.0
                offset_y = y * 4.0

                # Extract rotation angle and compute cos/sin
                angle = angles[x]
                cos = np.cos(angle)
                sin = np.sin(angle)

                # Box dimensions from the four edge distances
                h = x0[x] + x2[x]
                w = x1[x] + x3[x]

                # Rotate the offsets to get the box's bottom-right corner,
                # then derive the top-left from width/height
                end_x = int(offset_x + (cos * x1[x]) + (sin * x2[x]))
                end_y = int(offset_y - (sin * x1[x]) + (cos * x2[x]))
                start_x = int(end_x - w)
                start_y = int(end_y - h)

                rectangles.append((start_x, start_y, end_x, end_y))
                confidences.append(scores_data[x])

        return rectangles, confidences

    def _apply_nms(self, rectangles: List, confidences: List) -> List[Tuple]:
        """
        Apply non-maximum suppression to decoded rectangles.

        Args:
            rectangles: List of (x1, y1, x2, y2) corner boxes.
            confidences: Matching per-box scores.

        Returns:
            List of surviving (x1, y1, x2, y2, confidence) tuples.
        """
        if not rectangles:
            return []

        # Convert to float32 for NMS
        boxes = np.array(rectangles, dtype=np.float32)
        confidences = np.array(confidences, dtype=np.float32)

        # OpenCV NMSBoxes expects (x, y, w, h) format
        nms_boxes = []
        for (x1, y1, x2, y2) in boxes:
            nms_boxes.append([x1, y1, x2 - x1, y2 - y1])

        # Apply NMS
        indices = cv2.dnn.NMSBoxes(
            nms_boxes,
            confidences,
            self.confidence_threshold,
            self.nms_threshold
        )

        results = []
        if len(indices) > 0:
            # Handle different OpenCV versions: some return a tuple,
            # some a flat array, some an (N, 1) array
            if isinstance(indices, tuple):
                indices = indices[0]

            for i in indices.flatten() if hasattr(indices, 'flatten') else indices:
                x1, y1, x2, y2 = rectangles[i]
                results.append((x1, y1, x2, y2, confidences[i]))

        return results

    def get_info(self):
        """Get backend information, including whether CUDA is active."""
        info = super().get_info()
        info.gpu_accelerated = self._gpu_enabled
        return info

    @staticmethod
    def is_opencv_cuda_available() -> bool:
        """Check if OpenCV was built with CUDA support and a device is present."""
        try:
            return cv2.cuda.getCudaEnabledDeviceCount() > 0
        except Exception:
            return False