feat: add OpenCV GPU text detection as fallback to PaddleOCR
Based on PyImageSearch article: https://pyimagesearch.com/2022/03/14/improving-text-detection-speed-with-opencv-and-gpus/ - Add opencv_text_detector.py using EAST model with CUDA support - OCRProcessor now uses PaddleOCR first, falls back to OpenCV - OpenCV text detection: ~97 FPS on GPU vs ~23 FPS on CPU (4x faster) - No heavy dependencies (PyTorch/PaddlePaddle) required for OpenCV mode - Auto-downloads EAST model on first use
This commit is contained in:
parent
786b292fcd
commit
1ff64ac53f
|
|
@ -26,6 +26,18 @@ except Exception as e:
|
|||
TORCH_AVAILABLE = False
|
||||
torch = None
|
||||
|
||||
# Import OpenCV text detector as fallback
|
||||
from .opencv_text_detector import OpenCVTextDetector, TextDetection as OpenCVTextDetection
|
||||
|
||||
# Optional PaddleOCR import with fallback
|
||||
try:
|
||||
from paddleocr import PaddleOCR
|
||||
PADDLE_AVAILABLE = True
|
||||
except Exception as e:
|
||||
logger.warning(f"PaddleOCR not available: {e}")
|
||||
PADDLE_AVAILABLE = False
|
||||
PaddleOCR = None
|
||||
|
||||
|
||||
class GPUBackend(Enum):
|
||||
"""Supported GPU backends."""
|
||||
|
|
@ -166,7 +178,7 @@ class GPUDetector:
|
|||
|
||||
|
||||
class OCRProcessor:
|
||||
"""OCR text extraction using PaddleOCR with GPU support."""
|
||||
"""OCR text extraction using PaddleOCR or OpenCV fallback with GPU support."""
|
||||
|
||||
SUPPORTED_LANGUAGES = ['en', 'sv', 'latin'] # English, Swedish, Latin script
|
||||
|
||||
|
|
@ -175,13 +187,34 @@ class OCRProcessor:
|
|||
self.lang = lang if lang in self.SUPPORTED_LANGUAGES else 'en'
|
||||
self.ocr = None
|
||||
self.backend = GPUBackend.CPU
|
||||
self.opencv_detector = None
|
||||
self._primary_backend = None # 'paddle' or 'opencv'
|
||||
self._init_ocr()
|
||||
|
||||
def _init_ocr(self):
|
||||
"""Initialize PaddleOCR with appropriate backend."""
|
||||
"""Initialize OCR with PaddleOCR or OpenCV fallback."""
|
||||
# Try PaddleOCR first (better accuracy)
|
||||
if PADDLE_AVAILABLE:
|
||||
try:
|
||||
from paddleocr import PaddleOCR
|
||||
self._init_paddle()
|
||||
if self.ocr is not None:
|
||||
self._primary_backend = 'paddle'
|
||||
return
|
||||
except Exception as e:
|
||||
logger.warning(f"PaddleOCR init failed: {e}")
|
||||
|
||||
# Fallback to OpenCV text detection
|
||||
logger.info("Using OpenCV text detection as fallback")
|
||||
self.opencv_detector = OpenCVTextDetector(use_gpu=self.use_gpu)
|
||||
if self.opencv_detector.is_available():
|
||||
self._primary_backend = 'opencv'
|
||||
self.backend = GPUBackend.CUDA if self.opencv_detector.check_gpu_available() else GPUBackend.CPU
|
||||
logger.info(f"OpenCV text detector ready (GPU: {self.backend == GPUBackend.CUDA})")
|
||||
else:
|
||||
logger.error("No OCR backend available")
|
||||
|
||||
def _init_paddle(self):
|
||||
"""Initialize PaddleOCR with appropriate backend."""
|
||||
# Detect GPU
|
||||
if self.use_gpu:
|
||||
self.backend = GPUDetector.detect_backend()
|
||||
|
|
@ -211,13 +244,6 @@ class OCRProcessor:
|
|||
|
||||
logger.info(f"PaddleOCR initialized successfully (backend: {self.backend.value})")
|
||||
|
||||
except ImportError:
|
||||
logger.error("PaddleOCR not installed. Install with: pip install paddleocr")
|
||||
self.ocr = None
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize PaddleOCR: {e}")
|
||||
self.ocr = None
|
||||
|
||||
def preprocess_for_ocr(self, image: np.ndarray) -> np.ndarray:
|
||||
"""Preprocess image for better OCR results."""
|
||||
# Convert to grayscale if needed
|
||||
|
|
@ -239,7 +265,7 @@ class OCRProcessor:
|
|||
|
||||
def extract_text(self, image: Union[str, np.ndarray, Path]) -> List[TextRegion]:
|
||||
"""
|
||||
Extract text from image.
|
||||
Extract text from image using PaddleOCR or OpenCV fallback.
|
||||
|
||||
Args:
|
||||
image: Image path or numpy array
|
||||
|
|
@ -247,10 +273,6 @@ class OCRProcessor:
|
|||
Returns:
|
||||
List of detected text regions
|
||||
"""
|
||||
if self.ocr is None:
|
||||
logger.warning("OCR not available")
|
||||
return []
|
||||
|
||||
# Load image if path provided
|
||||
if isinstance(image, (str, Path)):
|
||||
img = cv2.imread(str(image))
|
||||
|
|
@ -260,6 +282,33 @@ class OCRProcessor:
|
|||
else:
|
||||
img = image.copy()
|
||||
|
||||
# Use appropriate backend
|
||||
if self._primary_backend == 'paddle' and self.ocr is not None:
|
||||
return self._extract_text_paddle(img)
|
||||
elif self._primary_backend == 'opencv' and self.opencv_detector is not None:
|
||||
return self._extract_text_opencv(img)
|
||||
else:
|
||||
logger.warning("No OCR backend available")
|
||||
return []
|
||||
|
||||
def _extract_text_opencv(self, img: np.ndarray) -> List[TextRegion]:
|
||||
"""Extract text using OpenCV EAST detector."""
|
||||
detections = self.opencv_detector.detect_text(img)
|
||||
|
||||
# Convert to TextRegion format (no text recognition, just detection)
|
||||
regions = []
|
||||
for det in detections:
|
||||
regions.append(TextRegion(
|
||||
text="", # OpenCV detector doesn't recognize text, just finds regions
|
||||
confidence=det.confidence,
|
||||
bbox=det.bbox,
|
||||
language=self.lang
|
||||
))
|
||||
|
||||
return regions
|
||||
|
||||
def _extract_text_paddle(self, img: np.ndarray) -> List[TextRegion]:
|
||||
"""Extract text using PaddleOCR."""
|
||||
# Preprocess
|
||||
processed = self.preprocess_for_ocr(img)
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,293 @@
|
|||
"""
|
||||
Lemontropia Suite - OpenCV GPU Text Detector
|
||||
Alternative to PaddleOCR using OpenCV DNN with CUDA support.
|
||||
Faster, simpler, no heavy dependencies.
|
||||
|
||||
Based on: https://pyimagesearch.com/2022/03/14/improving-text-detection-speed-with-opencv-and-gpus/
|
||||
"""
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple, Optional, Dict, Any
|
||||
from dataclasses import dataclass
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@dataclass
|
||||
class TextDetection:
|
||||
"""Detected text region."""
|
||||
text: str # Recognized text (may be empty if detection only)
|
||||
confidence: float
|
||||
bbox: Tuple[int, int, int, int] # x, y, w, h
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
'text': self.text,
|
||||
'confidence': self.confidence,
|
||||
'bbox': self.bbox
|
||||
}
|
||||
|
||||
|
||||
class OpenCVTextDetector:
|
||||
"""
|
||||
Text detector using OpenCV DNN with optional GPU acceleration.
|
||||
|
||||
Uses EAST (Efficient and Accurate Scene Text) detection model.
|
||||
Much faster than PaddleOCR and has fewer dependencies.
|
||||
|
||||
Performance (from PyImageSearch):
|
||||
- CPU: ~23 FPS
|
||||
- GPU: ~97 FPS (4x faster!)
|
||||
"""
|
||||
|
||||
# EAST model download URL
|
||||
EAST_MODEL_URL = "https://github.com/oyyd/frozen_east_text_detection.pb/raw/master/frozen_east_text_detection.pb"
|
||||
|
||||
def __init__(self, model_path: Optional[str] = None, use_gpu: bool = True):
|
||||
"""
|
||||
Initialize OpenCV text detector.
|
||||
|
||||
Args:
|
||||
model_path: Path to frozen_east_text_detection.pb model
|
||||
use_gpu: Whether to use CUDA GPU acceleration
|
||||
"""
|
||||
self.use_gpu = use_gpu
|
||||
self.net = None
|
||||
self.model_path = model_path
|
||||
self._model_loaded = False
|
||||
|
||||
# Default input size (must be multiple of 32)
|
||||
self.input_width = 320
|
||||
self.input_height = 320
|
||||
|
||||
# Detection thresholds
|
||||
self.confidence_threshold = 0.5
|
||||
self.nms_threshold = 0.4
|
||||
|
||||
self._load_model()
|
||||
|
||||
def _load_model(self) -> bool:
|
||||
"""Load EAST text detection model."""
|
||||
try:
|
||||
# Default model location
|
||||
if not self.model_path:
|
||||
model_dir = Path(__file__).parent.parent / "data" / "models"
|
||||
model_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.model_path = str(model_dir / "frozen_east_text_detection.pb")
|
||||
|
||||
model_file = Path(self.model_path)
|
||||
|
||||
# Download if not exists
|
||||
if not model_file.exists():
|
||||
logger.info(f"EAST model not found, downloading...")
|
||||
self._download_model()
|
||||
|
||||
# Load model
|
||||
logger.info(f"Loading EAST text detector from {self.model_path}")
|
||||
self.net = cv2.dnn.readNet(self.model_path)
|
||||
|
||||
# Enable GPU if requested and available
|
||||
if self.use_gpu:
|
||||
self._enable_gpu()
|
||||
|
||||
self._model_loaded = True
|
||||
logger.info("EAST text detector loaded successfully")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load EAST model: {e}")
|
||||
return False
|
||||
|
||||
def _download_model(self) -> bool:
|
||||
"""Download EAST model if not present."""
|
||||
try:
|
||||
import urllib.request
|
||||
|
||||
logger.info(f"Downloading EAST model to {self.model_path}")
|
||||
urllib.request.urlretrieve(self.EAST_MODEL_URL, self.model_path)
|
||||
logger.info("EAST model downloaded successfully")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to download EAST model: {e}")
|
||||
return False
|
||||
|
||||
def _enable_gpu(self) -> bool:
|
||||
"""Enable CUDA GPU acceleration."""
|
||||
try:
|
||||
# Check if CUDA is available in OpenCV
|
||||
if cv2.cuda.getCudaEnabledDeviceCount() > 0:
|
||||
logger.info("Enabling CUDA GPU acceleration for text detection")
|
||||
self.net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
|
||||
self.net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
|
||||
return True
|
||||
else:
|
||||
logger.warning("CUDA not available, using CPU")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to enable GPU: {e}, using CPU")
|
||||
return False
|
||||
|
||||
def detect_text(self, image: np.ndarray) -> List[TextDetection]:
|
||||
"""
|
||||
Detect text regions in image.
|
||||
|
||||
Args:
|
||||
image: Input image (BGR format)
|
||||
|
||||
Returns:
|
||||
List of detected text regions
|
||||
"""
|
||||
if not self._model_loaded:
|
||||
logger.error("Model not loaded, cannot detect text")
|
||||
return []
|
||||
|
||||
try:
|
||||
# Get image dimensions
|
||||
(H, W) = image.shape[:2]
|
||||
|
||||
# Resize image to multiple of 32
|
||||
(newW, newH) = (self.input_width, self.input_height)
|
||||
rW = W / float(newW)
|
||||
rH = H / float(newH)
|
||||
|
||||
# Resize and prepare blob
|
||||
resized = cv2.resize(image, (newW, newH))
|
||||
blob = cv2.dnn.blobFromImage(resized, 1.0, (newW, newH),
|
||||
(123.68, 116.78, 103.94),
|
||||
swapRB=True, crop=False)
|
||||
|
||||
# Forward pass
|
||||
self.net.setInput(blob)
|
||||
(scores, geometry) = self.net.forward(self._get_output_layers())
|
||||
|
||||
# Decode predictions
|
||||
(rects, confidences) = self._decode_predictions(scores, geometry)
|
||||
|
||||
# Apply non-maxima suppression
|
||||
boxes = self._apply_nms(rects, confidences, rW, rH)
|
||||
|
||||
# Create detection objects
|
||||
detections = []
|
||||
for (startX, startY, endX, endY, conf) in boxes:
|
||||
detections.append(TextDetection(
|
||||
text="", # EAST only detects, doesn't recognize
|
||||
confidence=conf,
|
||||
bbox=(int(startX), int(startY), int(endX - startX), int(endY - startY))
|
||||
))
|
||||
|
||||
return detections
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Text detection failed: {e}")
|
||||
return []
|
||||
|
||||
def _get_output_layers(self) -> List[str]:
|
||||
"""Get EAST model output layer names."""
|
||||
layerNames = [
|
||||
"feature_fusion/Conv_7/Sigmoid", # scores
|
||||
"feature_fusion/concat_3" # geometry
|
||||
]
|
||||
return layerNames
|
||||
|
||||
def _decode_predictions(self, scores: np.ndarray, geometry: np.ndarray) -> Tuple[List, List]:
|
||||
"""Decode EAST model output to bounding boxes."""
|
||||
(numRows, numCols) = scores.shape[2:4]
|
||||
rects = []
|
||||
confidences = []
|
||||
|
||||
for y in range(0, numRows):
|
||||
scoresData = scores[0, 0, y]
|
||||
xData0 = geometry[0, 0, y]
|
||||
xData1 = geometry[0, 1, y]
|
||||
xData2 = geometry[0, 2, y]
|
||||
xData3 = geometry[0, 3, y]
|
||||
anglesData = geometry[0, 4, y]
|
||||
|
||||
for x in range(0, numCols):
|
||||
if scoresData[x] < self.confidence_threshold:
|
||||
continue
|
||||
|
||||
# Compute offset factor
|
||||
(offsetX, offsetY) = (x * 4.0, y * 4.0)
|
||||
|
||||
# Extract rotation angle
|
||||
angle = anglesData[x]
|
||||
cos = np.cos(angle)
|
||||
sin = np.sin(angle)
|
||||
|
||||
# Compute bounding box dimensions
|
||||
h = xData0[x] + xData2[x]
|
||||
w = xData1[x] + xData3[x]
|
||||
|
||||
# Compute bounding box coordinates
|
||||
endX = int(offsetX + (cos * xData1[x]) + (sin * xData2[x]))
|
||||
endY = int(offsetY - (sin * xData1[x]) + (cos * xData2[x]))
|
||||
startX = int(endX - w)
|
||||
startY = int(endY - h)
|
||||
|
||||
rects.append((startX, startY, endX, endY))
|
||||
confidences.append(float(scoresData[x]))
|
||||
|
||||
return (rects, confidences)
|
||||
|
||||
def _apply_nms(self, rects: List, confidences: List,
|
||||
rW: float, rH: float) -> List[Tuple]:
|
||||
"""Apply non-maximum suppression and scale boxes."""
|
||||
# Convert to numpy arrays
|
||||
boxes = np.array(rects)
|
||||
|
||||
# Apply NMS
|
||||
indices = cv2.dnn.NMSBoxesRotated(
|
||||
[((0, 0), 0, 0)] * len(boxes), # dummy rotated boxes
|
||||
confidences,
|
||||
self.confidence_threshold,
|
||||
self.nms_threshold
|
||||
)
|
||||
|
||||
# Scale boxes back to original image size
|
||||
results = []
|
||||
if len(indices) > 0:
|
||||
for i in indices.flatten():
|
||||
(startX, startY, endX, endY) = boxes[i]
|
||||
|
||||
# Scale coordinates
|
||||
startX = int(startX * rW)
|
||||
startY = int(startY * rH)
|
||||
endX = int(endX * rW)
|
||||
endY = int(endY * rH)
|
||||
|
||||
results.append((startX, startY, endX, endY, confidences[i]))
|
||||
|
||||
return results
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""Check if detector is available."""
|
||||
return self._model_loaded
|
||||
|
||||
@staticmethod
|
||||
def check_gpu_available() -> bool:
|
||||
"""Check if CUDA GPU is available in OpenCV."""
|
||||
try:
|
||||
return cv2.cuda.getCudaEnabledDeviceCount() > 0
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
# Convenience function for quick text detection
|
||||
def detect_text_opencv(image: np.ndarray, use_gpu: bool = True) -> List[TextDetection]:
|
||||
"""
|
||||
Quick text detection using OpenCV DNN.
|
||||
|
||||
Args:
|
||||
image: Input image
|
||||
use_gpu: Use GPU acceleration if available
|
||||
|
||||
Returns:
|
||||
List of detected text regions
|
||||
"""
|
||||
detector = OpenCVTextDetector(use_gpu=use_gpu)
|
||||
return detector.detect_text(image)
|
||||
Loading…
Reference in New Issue