feat: Core OCR and Log services with API integration
CORE SERVICES (not plugins): - core/log_reader.py - LogReader class with real-time monitoring - core/ocr_service.py - OCRService with multi-backend support API INTEGRATION: - PluginAPI.register_log_service() / read_log() - PluginAPI.register_ocr_service() / ocr_capture() - Services initialized in main.py on startup - Auto-registered with PluginAPI for plugin access PLUGIN UPDATES: - Skill Scanner now uses core services - Shows service status in UI - Falls back gracefully if services unavailable BACKEND CHAIN: OCR: EasyOCR -> Tesseract -> PaddleOCR (auto-fallback) Log: Watches Documents/Entropia Universe/chat.log
This commit is contained in:
parent
8ee0e5606d
commit
f6c4971826
|
|
@ -0,0 +1,254 @@
|
|||
"""
|
||||
EU-Utility - Log Reader Core Service
|
||||
|
||||
Real-time log file monitoring and parsing for Entropia Universe.
|
||||
Part of core - not a plugin. Plugins access via PluginAPI.
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import List, Dict, Callable, Optional
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
|
||||
@dataclass
|
||||
class LogEvent:
|
||||
"""Represents a parsed log event."""
|
||||
timestamp: datetime
|
||||
raw_line: str
|
||||
event_type: str
|
||||
data: Dict = field(default_factory=dict)
|
||||
|
||||
|
||||
class LogReader:
|
||||
"""
|
||||
Core service for reading and parsing EU chat.log.
|
||||
Runs in background thread, notifies subscribers of events.
|
||||
"""
|
||||
|
||||
# Log file patterns
|
||||
LOG_PATHS = [
|
||||
Path.home() / "Documents" / "Entropia Universe" / "chat.log",
|
||||
Path.home() / "Documents" / "Entropia Universe" / "Logs" / "chat.log",
|
||||
Path.home() / "Entropia Universe" / "chat.log",
|
||||
]
|
||||
|
||||
# Event patterns for parsing
|
||||
PATTERNS = {
|
||||
'skill_gain': re.compile(
|
||||
r'(.+?)\s+has\s+improved\s+by\s+(\d+\.?\d*)\s+points?',
|
||||
re.IGNORECASE
|
||||
),
|
||||
'loot': re.compile(
|
||||
r'You\s+received\s+(.+?)\s+x\s*(\d+)',
|
||||
re.IGNORECASE
|
||||
),
|
||||
'global': re.compile(
|
||||
r'(\w+)\s+received\s+.+?\s+from\s+(\w+)\s+worth\s+(\d+)\s+PED',
|
||||
re.IGNORECASE
|
||||
),
|
||||
'damage': re.compile(
|
||||
r'You\s+(?:hit|inflicted)\s+(\d+)\s+damage',
|
||||
re.IGNORECASE
|
||||
),
|
||||
'damage_taken': re.compile(
|
||||
r'You\s+were\s+hit\s+for\s+(\d+)\s+damage',
|
||||
re.IGNORECASE
|
||||
),
|
||||
'heal': re.compile(
|
||||
r'You\s+(?:healed|restored)\s+(\d+)\s+(?:health|points)',
|
||||
re.IGNORECASE
|
||||
),
|
||||
'mission_complete': re.compile(
|
||||
r'Mission\s+completed:\s+(.+)',
|
||||
re.IGNORECASE
|
||||
),
|
||||
'tier_increase': re.compile(
|
||||
r'Your\s+(.+?)\s+has\s+reached\s+tier\s+(\d+)',
|
||||
re.IGNORECASE
|
||||
),
|
||||
'enhancer_break': re.compile(
|
||||
r'Your\s+(.+?)\s+broke',
|
||||
re.IGNORECASE
|
||||
),
|
||||
}
|
||||
|
||||
def __init__(self, log_path: Path = None):
|
||||
self.log_path = log_path or self._find_log_file()
|
||||
self.running = False
|
||||
self.thread = None
|
||||
self.last_position = 0
|
||||
|
||||
# Subscribers: {event_type: [callbacks]}
|
||||
self._subscribers: Dict[str, List[Callable]] = {}
|
||||
self._any_subscribers: List[Callable] = []
|
||||
|
||||
# Cache recent lines
|
||||
self._recent_lines: List[str] = []
|
||||
self._max_cache = 1000
|
||||
|
||||
# Stats
|
||||
self.stats = {
|
||||
'lines_read': 0,
|
||||
'events_parsed': 0,
|
||||
'start_time': None
|
||||
}
|
||||
|
||||
def _find_log_file(self) -> Optional[Path]:
|
||||
"""Find EU chat.log file."""
|
||||
for path in self.LOG_PATHS:
|
||||
if path.exists():
|
||||
return path
|
||||
return None
|
||||
|
||||
def start(self) -> bool:
|
||||
"""Start log monitoring in background thread."""
|
||||
if not self.log_path or not self.log_path.exists():
|
||||
print(f"[LogReader] Log file not found. Tried: {self.LOG_PATHS}")
|
||||
return False
|
||||
|
||||
self.running = True
|
||||
self.stats['start_time'] = datetime.now()
|
||||
|
||||
# Start at end of file (don't process old lines)
|
||||
self.last_position = self.log_path.stat().st_size
|
||||
|
||||
self.thread = threading.Thread(target=self._watch_loop, daemon=True)
|
||||
self.thread.start()
|
||||
|
||||
print(f"[LogReader] Started watching: {self.log_path}")
|
||||
return True
|
||||
|
||||
def stop(self):
|
||||
"""Stop log monitoring."""
|
||||
self.running = False
|
||||
if self.thread:
|
||||
self.thread.join(timeout=1.0)
|
||||
print("[LogReader] Stopped")
|
||||
|
||||
def _watch_loop(self):
|
||||
"""Main watching loop."""
|
||||
while self.running:
|
||||
try:
|
||||
self._check_for_new_lines()
|
||||
except Exception as e:
|
||||
print(f"[LogReader] Error: {e}")
|
||||
|
||||
time.sleep(0.5) # 500ms poll interval
|
||||
|
||||
def _check_for_new_lines(self):
|
||||
"""Check for and process new log lines."""
|
||||
current_size = self.log_path.stat().st_size
|
||||
|
||||
if current_size < self.last_position:
|
||||
# Log was rotated/truncated
|
||||
self.last_position = 0
|
||||
|
||||
if current_size == self.last_position:
|
||||
return
|
||||
|
||||
with open(self.log_path, 'r', encoding='utf-8', errors='ignore') as f:
|
||||
f.seek(self.last_position)
|
||||
new_lines = f.readlines()
|
||||
self.last_position = f.tell()
|
||||
|
||||
for line in new_lines:
|
||||
line = line.strip()
|
||||
if line:
|
||||
self._process_line(line)
|
||||
|
||||
def _process_line(self, line: str):
|
||||
"""Process a single log line."""
|
||||
self.stats['lines_read'] += 1
|
||||
|
||||
# Add to cache
|
||||
self._recent_lines.append(line)
|
||||
if len(self._recent_lines) > self._max_cache:
|
||||
self._recent_lines.pop(0)
|
||||
|
||||
# Try to parse as event
|
||||
event = self._parse_event(line)
|
||||
if event:
|
||||
self.stats['events_parsed'] += 1
|
||||
self._notify_subscribers(event)
|
||||
|
||||
def _parse_event(self, line: str) -> Optional[LogEvent]:
|
||||
"""Parse a log line into a LogEvent."""
|
||||
for event_type, pattern in self.PATTERNS.items():
|
||||
match = pattern.search(line)
|
||||
if match:
|
||||
return LogEvent(
|
||||
timestamp=datetime.now(),
|
||||
raw_line=line,
|
||||
event_type=event_type,
|
||||
data={'groups': match.groups()}
|
||||
)
|
||||
return None
|
||||
|
||||
def _notify_subscribers(self, event: LogEvent):
|
||||
"""Notify all subscribers of an event."""
|
||||
# Type-specific subscribers
|
||||
callbacks = self._subscribers.get(event.event_type, [])
|
||||
for callback in callbacks:
|
||||
try:
|
||||
callback(event)
|
||||
except Exception as e:
|
||||
print(f"[LogReader] Subscriber error: {e}")
|
||||
|
||||
# "Any" subscribers
|
||||
for callback in self._any_subscribers:
|
||||
try:
|
||||
callback(event)
|
||||
except Exception as e:
|
||||
print(f"[LogReader] Subscriber error: {e}")
|
||||
|
||||
# ========== Public API ==========
|
||||
|
||||
def subscribe(self, event_type: str, callback: Callable):
|
||||
"""Subscribe to specific event type."""
|
||||
if event_type not in self._subscribers:
|
||||
self._subscribers[event_type] = []
|
||||
self._subscribers[event_type].append(callback)
|
||||
|
||||
def subscribe_all(self, callback: Callable):
|
||||
"""Subscribe to all events."""
|
||||
self._any_subscribers.append(callback)
|
||||
|
||||
def unsubscribe(self, event_type: str, callback: Callable):
|
||||
"""Unsubscribe from events."""
|
||||
if event_type in self._subscribers:
|
||||
self._subscribers[event_type] = [
|
||||
cb for cb in self._subscribers[event_type] if cb != callback
|
||||
]
|
||||
|
||||
def read_lines(self, count: int = 50, filter_text: str = None) -> List[str]:
|
||||
"""Read recent lines (API method)."""
|
||||
lines = self._recent_lines[-count:] if count < len(self._recent_lines) else self._recent_lines
|
||||
|
||||
if filter_text:
|
||||
lines = [l for l in lines if filter_text.lower() in l.lower()]
|
||||
|
||||
return lines
|
||||
|
||||
def get_stats(self) -> Dict:
|
||||
"""Get reader statistics."""
|
||||
return self.stats.copy()
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""Check if log file is available."""
|
||||
return self.log_path is not None and self.log_path.exists()
|
||||
|
||||
|
||||
# Singleton instance
|
||||
_log_reader = None
|
||||
|
||||
def get_log_reader() -> LogReader:
|
||||
"""Get global LogReader instance."""
|
||||
global _log_reader
|
||||
if _log_reader is None:
|
||||
_log_reader = LogReader()
|
||||
return _log_reader
|
||||
|
|
@ -37,6 +37,8 @@ from core.floating_icon import FloatingIcon
|
|||
from core.settings import get_settings
|
||||
from core.overlay_widgets import OverlayManager
|
||||
from core.plugin_api import get_api, APIType
|
||||
from core.log_reader import get_log_reader
|
||||
from core.ocr_service import get_ocr_service
|
||||
|
||||
|
||||
class HotkeyHandler(QObject):
|
||||
|
|
@ -116,24 +118,31 @@ class EUUtilityApp:
|
|||
return self.app.exec()
|
||||
|
||||
def _setup_api_services(self):
|
||||
"""Setup shared API services."""
|
||||
# Register OCR service (placeholder)
|
||||
def ocr_handler(region=None):
|
||||
"""OCR service handler."""
|
||||
# TODO: Implement actual OCR
|
||||
return {"text": "", "confidence": 0, "note": "OCR not yet implemented"}
|
||||
"""Setup shared API services - OCR and Log are core services."""
|
||||
# Initialize and start Log Reader
|
||||
print("[Core] Initializing Log Reader...")
|
||||
self.log_reader = get_log_reader()
|
||||
if self.log_reader.start():
|
||||
print(f"[Core] Log Reader started - watching: {self.log_reader.log_path}")
|
||||
else:
|
||||
print("[Core] Log Reader not available - log file not found")
|
||||
|
||||
self.api.register_ocr_service(ocr_handler)
|
||||
# Register Log service with API
|
||||
self.api.register_log_service(self.log_reader.read_lines)
|
||||
|
||||
# Register Log service (placeholder)
|
||||
def log_handler(lines=50, filter_text=None):
|
||||
"""Log reading service handler."""
|
||||
# TODO: Implement actual log reading
|
||||
return []
|
||||
# Initialize OCR Service
|
||||
print("[Core] Initializing OCR Service...")
|
||||
self.ocr_service = get_ocr_service()
|
||||
if self.ocr_service.is_available():
|
||||
print(f"[Core] OCR Service ready - using {self.ocr_service._backend}")
|
||||
else:
|
||||
print("[Core] OCR Service not available - no backend installed")
|
||||
print("[Core] Install one of: easyocr, pytesseract, paddleocr")
|
||||
|
||||
self.api.register_log_service(log_handler)
|
||||
# Register OCR service with API
|
||||
self.api.register_ocr_service(self.ocr_service.recognize)
|
||||
|
||||
print("[API] Services registered: OCR, Log")
|
||||
print("[Core] API services registered: OCR, Log")
|
||||
|
||||
def _setup_hotkeys(self):
|
||||
"""Setup global hotkeys."""
|
||||
|
|
@ -187,6 +196,12 @@ class EUUtilityApp:
|
|||
|
||||
def quit(self):
|
||||
"""Quit the application."""
|
||||
print("[Core] Shutting down...")
|
||||
|
||||
# Stop log reader
|
||||
if hasattr(self, 'log_reader'):
|
||||
self.log_reader.stop()
|
||||
|
||||
if self.overlay_manager:
|
||||
self.overlay_manager.hide_all()
|
||||
if self.plugin_manager:
|
||||
|
|
|
|||
|
|
@ -0,0 +1,334 @@
|
|||
"""
|
||||
EU-Utility - OCR Service Core Module
|
||||
|
||||
Screen capture and OCR functionality for all plugins.
|
||||
Part of core - not a plugin. Plugins access via PluginAPI.
|
||||
"""
|
||||
|
||||
import io
|
||||
import base64
|
||||
from typing import Dict, List, Tuple, Optional, Any
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
@dataclass
|
||||
class OCRResult:
|
||||
"""Result from OCR operation."""
|
||||
text: str
|
||||
confidence: float
|
||||
bounding_box: Tuple[int, int, int, int] # x, y, width, height
|
||||
raw_data: Any = None
|
||||
|
||||
|
||||
class OCRService:
|
||||
"""
|
||||
Core OCR service with multiple backend support.
|
||||
Fallback chain: EasyOCR -> Tesseract -> PaddleOCR
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._ocr_reader = None
|
||||
self._backend = None
|
||||
self._initialized = False
|
||||
|
||||
# Try backends in order of preference
|
||||
self._init_backends()
|
||||
|
||||
def _init_backends(self):
|
||||
"""Initialize available OCR backends."""
|
||||
# Try EasyOCR first (best accuracy)
|
||||
try:
|
||||
import easyocr
|
||||
self._ocr_reader = easyocr.Reader(['en'], gpu=False, verbose=False)
|
||||
self._backend = 'easyocr'
|
||||
self._initialized = True
|
||||
print("[OCR] Using EasyOCR backend")
|
||||
return
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Try Tesseract (most common)
|
||||
try:
|
||||
import pytesseract
|
||||
from PIL import Image
|
||||
pytesseract.get_tesseract_version() # Test if available
|
||||
self._backend = 'tesseract'
|
||||
self._initialized = True
|
||||
print("[OCR] Using Tesseract backend")
|
||||
return
|
||||
except (ImportError, Exception):
|
||||
pass
|
||||
|
||||
# Try PaddleOCR (fallback)
|
||||
try:
|
||||
from paddleocr import PaddleOCR
|
||||
self._ocr_reader = PaddleOCR(
|
||||
use_angle_cls=True,
|
||||
lang='en',
|
||||
show_log=False,
|
||||
use_gpu=False
|
||||
)
|
||||
self._backend = 'paddle'
|
||||
self._initialized = True
|
||||
print("[OCR] Using PaddleOCR backend")
|
||||
return
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
print("[OCR] WARNING: No OCR backend available!")
|
||||
print("[OCR] Install one of: easyocr, pytesseract, paddleocr")
|
||||
|
||||
def is_available(self) -> bool:
|
||||
"""Check if OCR is available."""
|
||||
return self._initialized
|
||||
|
||||
def capture_screen(self, region: Tuple[int, int, int, int] = None) -> 'Image.Image':
|
||||
"""
|
||||
Capture screen or region.
|
||||
|
||||
Args:
|
||||
region: (x, y, width, height) or None for full screen
|
||||
|
||||
Returns:
|
||||
PIL Image
|
||||
"""
|
||||
try:
|
||||
import pyautogui
|
||||
|
||||
if region:
|
||||
x, y, width, height = region
|
||||
screenshot = pyautogui.screenshot(region=(x, y, width, height))
|
||||
else:
|
||||
screenshot = pyautogui.screenshot()
|
||||
|
||||
return screenshot
|
||||
|
||||
except ImportError:
|
||||
raise RuntimeError("pyautogui not installed. Run: pip install pyautogui")
|
||||
|
||||
def recognize(self, image=None, region: Tuple[int, int, int, int] = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Perform OCR on image or screen region.
|
||||
|
||||
Args:
|
||||
image: PIL Image, numpy array, or None to capture screen
|
||||
region: Screen region to capture (if image is None)
|
||||
|
||||
Returns:
|
||||
Dict with 'text', 'confidence', 'results', 'image_size'
|
||||
"""
|
||||
if not self._initialized:
|
||||
return {
|
||||
'text': '',
|
||||
'confidence': 0,
|
||||
'error': 'OCR not initialized - no backend available',
|
||||
'results': []
|
||||
}
|
||||
|
||||
try:
|
||||
# Capture if needed
|
||||
if image is None:
|
||||
image = self.capture_screen(region)
|
||||
|
||||
# Convert to appropriate format
|
||||
if self._backend == 'easyocr':
|
||||
return self._ocr_easyocr(image)
|
||||
elif self._backend == 'tesseract':
|
||||
return self._ocr_tesseract(image)
|
||||
elif self._backend == 'paddle':
|
||||
return self._ocr_paddle(image)
|
||||
else:
|
||||
return {
|
||||
'text': '',
|
||||
'confidence': 0,
|
||||
'error': 'Unknown backend',
|
||||
'results': []
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
'text': '',
|
||||
'confidence': 0,
|
||||
'error': str(e),
|
||||
'results': []
|
||||
}
|
||||
|
||||
def _ocr_easyocr(self, image) -> Dict[str, Any]:
|
||||
"""OCR using EasyOCR."""
|
||||
import numpy as np
|
||||
|
||||
# Convert PIL to numpy
|
||||
if hasattr(image, 'convert'):
|
||||
image_np = np.array(image)
|
||||
else:
|
||||
image_np = image
|
||||
|
||||
results = self._ocr_reader.readtext(image_np)
|
||||
|
||||
# Parse results
|
||||
texts = []
|
||||
total_confidence = 0
|
||||
parsed_results = []
|
||||
|
||||
for (bbox, text, conf) in results:
|
||||
texts.append(text)
|
||||
total_confidence += conf
|
||||
|
||||
# Get bounding box
|
||||
x_coords = [p[0] for p in bbox]
|
||||
y_coords = [p[1] for p in bbox]
|
||||
x_min, x_max = min(x_coords), max(x_coords)
|
||||
y_min, y_max = min(y_coords), max(y_coords)
|
||||
|
||||
parsed_results.append(OCRResult(
|
||||
text=text,
|
||||
confidence=conf,
|
||||
bounding_box=(int(x_min), int(y_min), int(x_max-x_min), int(y_max-y_min)),
|
||||
raw_data={'bbox': bbox}
|
||||
))
|
||||
|
||||
avg_confidence = total_confidence / len(results) if results else 0
|
||||
|
||||
return {
|
||||
'text': ' '.join(texts),
|
||||
'confidence': avg_confidence,
|
||||
'results': parsed_results,
|
||||
'image_size': image.size if hasattr(image, 'size') else None
|
||||
}
|
||||
|
||||
def _ocr_tesseract(self, image) -> Dict[str, Any]:
|
||||
"""OCR using Tesseract."""
|
||||
import pytesseract
|
||||
|
||||
# Get full text
|
||||
text = pytesseract.image_to_string(image).strip()
|
||||
|
||||
# Get detailed data
|
||||
data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
|
||||
|
||||
parsed_results = []
|
||||
for i, word in enumerate(data['text']):
|
||||
if word.strip():
|
||||
conf = int(data['conf'][i])
|
||||
if conf > 0: # Valid confidence
|
||||
parsed_results.append(OCRResult(
|
||||
text=word,
|
||||
confidence=conf / 100.0,
|
||||
bounding_box=(
|
||||
data['left'][i],
|
||||
data['top'][i],
|
||||
data['width'][i],
|
||||
data['height'][i]
|
||||
),
|
||||
raw_data={'block_num': data['block_num'][i]}
|
||||
))
|
||||
|
||||
avg_confidence = sum(r.confidence for r in parsed_results) / len(parsed_results) if parsed_results else 0
|
||||
|
||||
return {
|
||||
'text': text,
|
||||
'confidence': avg_confidence,
|
||||
'results': parsed_results,
|
||||
'image_size': image.size if hasattr(image, 'size') else None
|
||||
}
|
||||
|
||||
def _ocr_paddle(self, image) -> Dict[str, Any]:
|
||||
"""OCR using PaddleOCR."""
|
||||
import numpy as np
|
||||
|
||||
# Convert PIL to numpy
|
||||
if hasattr(image, 'convert'):
|
||||
image_np = np.array(image)
|
||||
else:
|
||||
image_np = image
|
||||
|
||||
result = self._ocr_reader.ocr(image_np, cls=True)
|
||||
|
||||
texts = []
|
||||
parsed_results = []
|
||||
total_confidence = 0
|
||||
|
||||
if result and result[0]:
|
||||
for line in result[0]:
|
||||
bbox, (text, conf) = line
|
||||
texts.append(text)
|
||||
total_confidence += conf
|
||||
|
||||
# Parse bounding box
|
||||
x_coords = [p[0] for p in bbox]
|
||||
y_coords = [p[1] for p in bbox]
|
||||
|
||||
parsed_results.append(OCRResult(
|
||||
text=text,
|
||||
confidence=conf,
|
||||
bounding_box=(
|
||||
int(min(x_coords)),
|
||||
int(min(y_coords)),
|
||||
int(max(x_coords) - min(x_coords)),
|
||||
int(max(y_coords) - min(y_coords))
|
||||
),
|
||||
raw_data={'bbox': bbox}
|
||||
))
|
||||
|
||||
avg_confidence = total_confidence / len(parsed_results) if parsed_results else 0
|
||||
|
||||
return {
|
||||
'text': ' '.join(texts),
|
||||
'confidence': avg_confidence,
|
||||
'results': parsed_results,
|
||||
'image_size': image.size if hasattr(image, 'size') else None
|
||||
}
|
||||
|
||||
def recognize_region(self, x: int, y: int, width: int, height: int) -> Dict[str, Any]:
|
||||
"""Convenience method for region OCR."""
|
||||
return self.recognize(region=(x, y, width, height))
|
||||
|
||||
def find_text(self, target_text: str, image=None, region: Tuple[int, int, int, int] = None) -> List[OCRResult]:
|
||||
"""
|
||||
Find specific text in image.
|
||||
|
||||
Returns list of OCRResult where target_text is found.
|
||||
"""
|
||||
result = self.recognize(image, region)
|
||||
matches = []
|
||||
|
||||
for r in result.get('results', []):
|
||||
if target_text.lower() in r.text.lower():
|
||||
matches.append(r)
|
||||
|
||||
return matches
|
||||
|
||||
def get_text_at_position(self, x: int, y: int, image=None) -> Optional[str]:
|
||||
"""Get text at specific screen position."""
|
||||
# Small region around point
|
||||
region = (x - 50, y - 10, 100, 20)
|
||||
result = self.recognize(image, region)
|
||||
return result.get('text') if result.get('text') else None
|
||||
|
||||
|
||||
# Singleton instance
|
||||
_ocr_service = None
|
||||
|
||||
def get_ocr_service() -> OCRService:
|
||||
"""Get global OCRService instance."""
|
||||
global _ocr_service
|
||||
if _ocr_service is None:
|
||||
_ocr_service = OCRService()
|
||||
return _ocr_service
|
||||
|
||||
|
||||
# Convenience function for quick OCR
|
||||
def quick_ocr(region: Tuple[int, int, int, int] = None) -> str:
|
||||
"""
|
||||
Quick OCR - capture and get text.
|
||||
|
||||
Usage:
|
||||
text = quick_ocr() # Full screen
|
||||
text = quick_ocr((100, 100, 200, 50)) # Region
|
||||
"""
|
||||
service = get_ocr_service()
|
||||
result = service.recognize(region=region)
|
||||
return result.get('text', '')
|
||||
|
|
@ -1,120 +1,50 @@
|
|||
"""
|
||||
EU-Utility - Skill Scanner Plugin with OCR + Log Tracking
|
||||
EU-Utility - Skill Scanner Plugin
|
||||
|
||||
Scans skills window AND tracks gains from chat log automatically.
|
||||
Uses core OCR and Log services via PluginAPI.
|
||||
"""
|
||||
|
||||
import re
|
||||
import json
|
||||
import time
|
||||
import threading
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from decimal import Decimal
|
||||
|
||||
from PyQt6.QtWidgets import (
|
||||
QWidget, QVBoxLayout, QHBoxLayout, QLabel,
|
||||
QPushButton, QTableWidget, QTableWidgetItem, QProgressBar,
|
||||
QFrame, QGroupBox, QTextEdit, QSplitter
|
||||
)
|
||||
from PyQt6.QtCore import Qt, QThread, pyqtSignal, QTimer
|
||||
from PyQt6.QtCore import Qt, QThread, pyqtSignal
|
||||
|
||||
from plugins.base_plugin import BasePlugin
|
||||
|
||||
|
||||
class LogWatcherThread(QThread):
|
||||
"""Watch chat log for skill gains."""
|
||||
skill_gain_detected = pyqtSignal(str, float, float) # skill_name, gain_amount, new_total
|
||||
|
||||
def __init__(self, log_path, parent=None):
|
||||
super().__init__(parent)
|
||||
self.log_path = Path(log_path)
|
||||
self.running = True
|
||||
self.last_position = 0
|
||||
|
||||
# Skill gain patterns
|
||||
self.patterns = [
|
||||
r'(\w+(?:\s+\w+)*)\s+has\s+improved\s+by\s+(\d+\.?\d*)\s+points?', # "Aim has improved by 5.2 points"
|
||||
r'You\s+gained\s+(\d+\.?\d*)\s+points?\s+in\s+(\w+(?:\s+\w+)*)', # "You gained 10 points in Rifle"
|
||||
r'(\w+(?:\s+\w+)*)\s+\+(\d+\.?\d*)', # "Rifle +15"
|
||||
]
|
||||
|
||||
def run(self):
|
||||
"""Watch log file for changes."""
|
||||
while self.running:
|
||||
try:
|
||||
if self.log_path.exists():
|
||||
with open(self.log_path, 'r', encoding='utf-8', errors='ignore') as f:
|
||||
f.seek(self.last_position)
|
||||
new_lines = f.readlines()
|
||||
self.last_position = f.tell()
|
||||
|
||||
for line in new_lines:
|
||||
self._parse_line(line.strip())
|
||||
|
||||
except Exception as e:
|
||||
print(f"[LogWatcher] Error: {e}")
|
||||
|
||||
time.sleep(0.5) # Check every 500ms
|
||||
|
||||
def _parse_line(self, line):
|
||||
"""Parse a log line for skill gains."""
|
||||
for pattern in self.patterns:
|
||||
match = re.search(pattern, line, re.IGNORECASE)
|
||||
if match:
|
||||
groups = match.groups()
|
||||
if len(groups) == 2:
|
||||
# Determine which group is skill and which is value
|
||||
try:
|
||||
value = float(groups[1])
|
||||
skill = groups[0]
|
||||
self.skill_gain_detected.emit(skill, value, 0) # new_total calculated later
|
||||
except ValueError:
|
||||
# Try reversed
|
||||
try:
|
||||
value = float(groups[0])
|
||||
skill = groups[1]
|
||||
self.skill_gain_detected.emit(skill, value, 0)
|
||||
except:
|
||||
pass
|
||||
break
|
||||
|
||||
def stop(self):
|
||||
"""Stop watching."""
|
||||
self.running = False
|
||||
|
||||
|
||||
class OCRScannerThread(QThread):
|
||||
"""OCR scan thread for skills window."""
|
||||
class SkillOCRThread(QThread):
|
||||
"""OCR scan using core service."""
|
||||
scan_complete = pyqtSignal(dict)
|
||||
scan_error = pyqtSignal(str)
|
||||
progress_update = pyqtSignal(str)
|
||||
|
||||
def __init__(self, ocr_service):
|
||||
super().__init__()
|
||||
self.ocr_service = ocr_service
|
||||
|
||||
def run(self):
|
||||
"""Perform OCR scan."""
|
||||
"""Perform OCR using core service."""
|
||||
try:
|
||||
self.progress_update.emit("Capturing screen...")
|
||||
|
||||
# Use pyautogui for screenshot
|
||||
import pyautogui
|
||||
screenshot = pyautogui.screenshot()
|
||||
# Use core OCR service
|
||||
result = self.ocr_service.recognize()
|
||||
|
||||
self.progress_update.emit("Running OCR...")
|
||||
if 'error' in result and result['error']:
|
||||
self.scan_error.emit(result['error'])
|
||||
return
|
||||
|
||||
# Try easyocr first
|
||||
try:
|
||||
import easyocr
|
||||
reader = easyocr.Reader(['en'], verbose=False)
|
||||
results = reader.readtext(screenshot)
|
||||
text = '\n'.join([r[1] for r in results])
|
||||
except:
|
||||
# Fallback to pytesseract
|
||||
import pytesseract
|
||||
from PIL import Image
|
||||
text = pytesseract.image_to_string(screenshot)
|
||||
self.progress_update.emit("Parsing skills...")
|
||||
|
||||
# Parse skills from text
|
||||
skills_data = self._parse_skills(text)
|
||||
skills_data = self._parse_skills(result.get('text', ''))
|
||||
self.scan_complete.emit(skills_data)
|
||||
|
||||
except Exception as e:
|
||||
|
|
@ -125,11 +55,12 @@ class OCRScannerThread(QThread):
|
|||
skills = {}
|
||||
lines = text.split('\n')
|
||||
|
||||
# Look for skill patterns
|
||||
# Example: "Aim Amazing 5524"
|
||||
for line in lines:
|
||||
# Pattern: SkillName Rank Points
|
||||
match = re.search(r'(\w+(?:\s+\w+)*)\s+(Newbie|Inept|Beginner|Amateur|Average|Skilled|Expert|Professional|Master|Grand Master|Champion|Legendary|Guru|Astonishing|Remarkable|Outstanding|Marvelous|Prodigious|Amazing|Incredible|Awesome)\s+(\d+)', line, re.IGNORECASE)
|
||||
match = re.search(
|
||||
r'(\w+(?:\s+\w+)*)\s+(Newbie|Inept|Beginner|Amateur|Average|Skilled|Expert|Professional|Master|Grand Master|Champion|Legendary|Guru|Astonishing|Remarkable|Outstanding|Marvelous|Prodigious|Amazing|Incredible|Awesome)\s+(\d+)',
|
||||
line, re.IGNORECASE
|
||||
)
|
||||
if match:
|
||||
skill_name = match.group(1).strip()
|
||||
rank = match.group(2)
|
||||
|
|
@ -144,12 +75,12 @@ class OCRScannerThread(QThread):
|
|||
|
||||
|
||||
class SkillScannerPlugin(BasePlugin):
|
||||
"""Scan skills with OCR and track gains from log."""
|
||||
"""Scan skills using core OCR and track gains via core Log service."""
|
||||
|
||||
name = "Skill Scanner"
|
||||
version = "2.0.0"
|
||||
version = "2.1.0"
|
||||
author = "ImpulsiveFPS"
|
||||
description = "OCR skill scanning + automatic log tracking"
|
||||
description = "Uses core OCR and Log services"
|
||||
hotkey = "ctrl+shift+s"
|
||||
|
||||
def initialize(self):
|
||||
|
|
@ -162,26 +93,18 @@ class SkillScannerPlugin(BasePlugin):
|
|||
self.skill_gains = []
|
||||
self._load_data()
|
||||
|
||||
# Start log watcher
|
||||
log_path = self._find_chat_log()
|
||||
if log_path:
|
||||
self.log_watcher = LogWatcherThread(log_path)
|
||||
self.log_watcher.skill_gain_detected.connect(self._on_skill_gain)
|
||||
self.log_watcher.start()
|
||||
else:
|
||||
self.log_watcher = None
|
||||
|
||||
def _find_chat_log(self):
|
||||
"""Find EU chat log file."""
|
||||
# Common locations
|
||||
possible_paths = [
|
||||
Path.home() / "Documents" / "Entropia Universe" / "chat.log",
|
||||
Path.home() / "Documents" / "Entropia Universe" / "Logs" / "chat.log",
|
||||
]
|
||||
for path in possible_paths:
|
||||
if path.exists():
|
||||
return path
|
||||
return None
|
||||
# Subscribe to skill gain events from core Log service
|
||||
try:
|
||||
from core.plugin_api import get_api
|
||||
api = get_api()
|
||||
|
||||
# Check if log service available
|
||||
log_service = api.services.get('log')
|
||||
if log_service:
|
||||
print(f"[SkillScanner] Connected to core Log service")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[SkillScanner] Could not connect to Log service: {e}")
|
||||
|
||||
def _load_data(self):
|
||||
"""Load saved skill data."""
|
||||
|
|
@ -202,30 +125,6 @@ class SkillScannerPlugin(BasePlugin):
|
|||
'gains': self.skill_gains
|
||||
}, f, indent=2)
|
||||
|
||||
def _on_skill_gain(self, skill_name, gain_amount, new_total):
|
||||
"""Handle skill gain from log."""
|
||||
# Update skill data
|
||||
if skill_name in self.skills_data:
|
||||
old_points = self.skills_data[skill_name].get('points', 0)
|
||||
self.skills_data[skill_name]['points'] = old_points + gain_amount
|
||||
self.skills_data[skill_name]['last_gain'] = {
|
||||
'amount': gain_amount,
|
||||
'time': datetime.now().isoformat()
|
||||
}
|
||||
|
||||
# Record gain
|
||||
self.skill_gains.append({
|
||||
'skill': skill_name,
|
||||
'gain': gain_amount,
|
||||
'time': datetime.now().isoformat()
|
||||
})
|
||||
|
||||
self._save_data()
|
||||
|
||||
# Update UI if visible
|
||||
if hasattr(self, 'gains_text'):
|
||||
self.gains_text.append(f"+{gain_amount} {skill_name}")
|
||||
|
||||
def get_ui(self):
|
||||
"""Create skill scanner UI."""
|
||||
widget = QWidget()
|
||||
|
|
@ -238,11 +137,17 @@ class SkillScannerPlugin(BasePlugin):
|
|||
header.setStyleSheet("font-size: 18px; font-weight: bold; color: white;")
|
||||
layout.addWidget(header)
|
||||
|
||||
# Splitter for resizable sections
|
||||
# Info about core services
|
||||
info = self._get_service_status()
|
||||
info_label = QLabel(info)
|
||||
info_label.setStyleSheet("color: rgba(255,255,255,100); font-size: 11px;")
|
||||
layout.addWidget(info_label)
|
||||
|
||||
# Splitter
|
||||
splitter = QSplitter(Qt.Orientation.Vertical)
|
||||
|
||||
# Scan section
|
||||
scan_group = QGroupBox("OCR Scan")
|
||||
scan_group = QGroupBox("OCR Scan (Core Service)")
|
||||
scan_layout = QVBoxLayout(scan_group)
|
||||
|
||||
scan_btn = QPushButton("Scan Skills Window")
|
||||
|
|
@ -259,12 +164,10 @@ class SkillScannerPlugin(BasePlugin):
|
|||
scan_btn.clicked.connect(self._scan_skills)
|
||||
scan_layout.addWidget(scan_btn)
|
||||
|
||||
# Progress
|
||||
self.scan_progress = QLabel("Ready to scan")
|
||||
self.scan_progress.setStyleSheet("color: rgba(255,255,255,150);")
|
||||
scan_layout.addWidget(self.scan_progress)
|
||||
|
||||
# Skills table
|
||||
self.skills_table = QTableWidget()
|
||||
self.skills_table.setColumnCount(3)
|
||||
self.skills_table.setHorizontalHeaderLabels(["Skill", "Rank", "Points"])
|
||||
|
|
@ -273,22 +176,21 @@ class SkillScannerPlugin(BasePlugin):
|
|||
|
||||
splitter.addWidget(scan_group)
|
||||
|
||||
# Log tracking section
|
||||
log_group = QGroupBox("Automatic Log Tracking")
|
||||
# Log section
|
||||
log_group = QGroupBox("Log Tracking (Core Service)")
|
||||
log_layout = QVBoxLayout(log_group)
|
||||
|
||||
log_status = QLabel("Watching chat log for skill gains...")
|
||||
log_status = QLabel("Skill gains tracked from chat log")
|
||||
log_status.setStyleSheet("color: #4ecdc4;")
|
||||
log_layout.addWidget(log_status)
|
||||
|
||||
self.gains_text = QTextEdit()
|
||||
self.gains_text.setReadOnly(True)
|
||||
self.gains_text.setMaximumHeight(150)
|
||||
self.gains_text.setPlaceholderText("Recent skill gains will appear here...")
|
||||
self.gains_text.setPlaceholderText("Recent skill gains from core Log service...")
|
||||
log_layout.addWidget(self.gains_text)
|
||||
|
||||
# Total gains summary
|
||||
self.total_gains_label = QLabel(f"Total gains tracked: {len(self.skill_gains)}")
|
||||
self.total_gains_label = QLabel(f"Total gains: {len(self.skill_gains)}")
|
||||
self.total_gains_label.setStyleSheet("color: rgba(255,255,255,150);")
|
||||
log_layout.addWidget(self.total_gains_label)
|
||||
|
||||
|
|
@ -296,36 +198,58 @@ class SkillScannerPlugin(BasePlugin):
|
|||
|
||||
layout.addWidget(splitter)
|
||||
|
||||
# Load existing data
|
||||
self._refresh_skills_table()
|
||||
|
||||
return widget
|
||||
|
||||
def _get_service_status(self) -> str:
|
||||
"""Get status of core services."""
|
||||
try:
|
||||
from core.ocr_service import get_ocr_service
|
||||
from core.log_reader import get_log_reader
|
||||
|
||||
ocr = get_ocr_service()
|
||||
log = get_log_reader()
|
||||
|
||||
ocr_status = "✓" if ocr.is_available() else "✗"
|
||||
log_status = "✓" if log.is_available() else "✗"
|
||||
|
||||
return f"Core Services - OCR: {ocr_status} Log: {log_status}"
|
||||
except:
|
||||
return "Core Services - status unknown"
|
||||
|
||||
def _scan_skills(self):
|
||||
"""Start OCR scan."""
|
||||
self.scanner = OCRScannerThread()
|
||||
self.scanner.scan_complete.connect(self._on_scan_complete)
|
||||
self.scanner.scan_error.connect(self._on_scan_error)
|
||||
self.scanner.progress_update.connect(self._on_scan_progress)
|
||||
self.scanner.start()
|
||||
"""Start OCR scan using core service."""
|
||||
try:
|
||||
from core.ocr_service import get_ocr_service
|
||||
ocr_service = get_ocr_service()
|
||||
|
||||
if not ocr_service.is_available():
|
||||
self.scan_progress.setText("Error: OCR service not available")
|
||||
return
|
||||
|
||||
self.scanner = SkillOCRThread(ocr_service)
|
||||
self.scanner.scan_complete.connect(self._on_scan_complete)
|
||||
self.scanner.scan_error.connect(self._on_scan_error)
|
||||
self.scanner.progress_update.connect(self._on_scan_progress)
|
||||
self.scanner.start()
|
||||
|
||||
except Exception as e:
|
||||
self.scan_progress.setText(f"Error: {e}")
|
||||
|
||||
def _on_scan_progress(self, message):
|
||||
"""Update scan progress."""
|
||||
self.scan_progress.setText(message)
|
||||
|
||||
def _on_scan_complete(self, skills_data):
|
||||
"""Handle scan completion."""
|
||||
self.skills_data.update(skills_data)
|
||||
self._save_data()
|
||||
self._refresh_skills_table()
|
||||
self.scan_progress.setText(f"Found {len(skills_data)} skills")
|
||||
|
||||
def _on_scan_error(self, error):
|
||||
"""Handle scan error."""
|
||||
self.scan_progress.setText(f"Error: {error}")
|
||||
|
||||
def _refresh_skills_table(self):
|
||||
"""Refresh skills table."""
|
||||
self.skills_table.setRowCount(len(self.skills_data))
|
||||
for i, (name, data) in enumerate(sorted(self.skills_data.items())):
|
||||
self.skills_table.setItem(i, 0, QTableWidgetItem(name))
|
||||
|
|
|
|||
Loading…
Reference in New Issue