# EU-Utility/core/log_watcher_optimized.py
# (recovered from a file listing: 608 lines, 20 KiB, Python)
"""
EU-Utility - Optimized LogWatcher
Performance improvements:
1. Adaptive polling interval (fast when active, slow when idle)
2. Inotify/file system watcher on supported platforms
3. Memory-mapped file reading for large files
4. Zero-allocation line reading
5. Batched event processing
"""
import os
import re
import time
import threading
from pathlib import Path
from datetime import datetime
from typing import List, Dict, Callable, Optional, Set
from dataclasses import dataclass, field
from collections import deque
from core.performance_optimizations import RingBuffer, StringInterner
@dataclass
class LogEvent:
    """Represents a parsed log event."""
    timestamp: datetime  # when the batch containing the line was processed, not an in-game time
    raw_line: str  # the stripped (and interned) log line the event was parsed from
    event_type: str  # pattern key, e.g. 'skill_gain', 'loot', 'global'
    data: Dict = field(default_factory=dict)  # parser payload, e.g. {'groups': match.groups()}
class FileSystemWatcher:
    """
    Platform-specific file system watcher.
    Falls back to polling if native watcher unavailable.

    Invokes ``callback`` (a zero-argument callable) from a background daemon
    thread whenever the watched file appears to change. Backend priority:
    Linux inotify, macOS fsevents, then adaptive stat()-based polling.
    """

    def __init__(self, file_path: Path, callback: Callable):
        self.file_path = file_path
        self.callback = callback
        self._running = False
        self._thread: Optional[threading.Thread] = None
        # Which native backend (if any) ended up in use; read externally
        # (e.g. by OptimizedLogWatcher.get_stats()).
        self._use_inotify = False
        self._use_fsevents = False
        # Last observed stat() values — used only by the polling fallback.
        self._last_mtime = 0
        self._last_size = 0

    def start(self) -> bool:
        """Start watching the file.

        Returns False if the file does not exist. Otherwise spawns exactly
        one daemon watcher thread, choosing the first backend whose
        (third-party) module imports successfully.
        """
        if not self.file_path.exists():
            return False
        self._running = True
        # Try inotify on Linux
        try:
            import inotify.adapters
            self._use_inotify = True
            self._thread = threading.Thread(target=self._inotify_loop, daemon=True, name="InotifyWatcher")
            self._thread.start()
            return True
        except ImportError:
            pass
        # Try fsevents on macOS
        try:
            import fsevents
            self._use_fsevents = True
            self._thread = threading.Thread(target=self._fsevents_loop, daemon=True, name="FSEventsWatcher")
            self._thread.start()
            return True
        except ImportError:
            pass
        # Fallback to stat-based polling
        self._last_mtime = self.file_path.stat().st_mtime
        self._last_size = self.file_path.stat().st_size
        self._thread = threading.Thread(target=self._stat_loop, daemon=True, name="StatWatcher")
        self._thread.start()
        return True

    def stop(self):
        """Stop watching.

        Joins the watcher thread with a 1s timeout; the thread is a daemon,
        so a join that times out (e.g. inotify blocked in event_gen) does
        not prevent interpreter exit.
        """
        self._running = False
        if self._thread:
            self._thread.join(timeout=1.0)

    def _inotify_loop(self):
        """Inotify-based watching (Linux). Falls back to polling on error."""
        try:
            import inotify.adapters
            i = inotify.adapters.Inotify()
            i.add_watch(str(self.file_path))
            for event in i.event_gen(yield_nones=False):
                if not self._running:
                    break
                (_, type_names, path, filename) = event
                # Only modifications matter; creations/attrib changes are ignored.
                if 'IN_MODIFY' in type_names:
                    self.callback()
        except Exception as e:
            print(f"[FileSystemWatcher] inotify error: {e}, falling back to polling")
            self._use_inotify = False
            self._stat_loop()

    def _fsevents_loop(self):
        """FSEvents-based watching (macOS). Falls back to polling on error."""
        try:
            import fsevents

            def handler(event):
                # FSEvents watches the parent directory; filter to our file.
                if self._running and event.name == str(self.file_path):
                    self.callback()
            observer = fsevents.Observer()
            stream = fsevents.Stream(handler, str(self.file_path.parent), file_events=True)
            observer.schedule(stream)
            observer.start()
            # FSEvents delivers on its own thread; just idle until stopped.
            while self._running:
                time.sleep(0.1)
            observer.stop()
            observer.join()
        except Exception as e:
            print(f"[FileSystemWatcher] fsevents error: {e}, falling back to polling")
            self._use_fsevents = False
            self._stat_loop()

    def _stat_loop(self):
        """Stat-based watching (fallback, most compatible).

        Adaptive interval: drops to 50ms immediately after a detected
        change, then decays ~10% per idle cycle up to a 1s ceiling.
        """
        poll_interval = 0.5  # Start with 500ms
        while self._running:
            try:
                stat = self.file_path.stat()
                if stat.st_mtime != self._last_mtime or stat.st_size != self._last_size:
                    self._last_mtime = stat.st_mtime
                    self._last_size = stat.st_size
                    self.callback()
                    poll_interval = 0.05  # Fast poll after activity
                else:
                    # Gradually slow down polling
                    poll_interval = min(1.0, poll_interval * 1.1)
            except (OSError, FileNotFoundError):
                poll_interval = 1.0  # Slow down on error
            time.sleep(poll_interval)
class OptimizedLogWatcher:
"""
High-performance log file watcher.
Features:
- Native file system events when available
- Adaptive polling fallback
- Zero-allocation line reading
- Memory-mapped files for large logs
- Batched event processing
"""
# Pre-compiled patterns (class level)
_PATTERNS: Optional[Dict[str, re.Pattern]] = None
_PATTERNS_LOCK = threading.Lock()
LOG_PATHS = [
Path.home() / "Documents" / "Entropia Universe" / "chat.log",
Path.home() / "Documents" / "Entropia Universe" / "Logs" / "chat.log",
Path.home() / "Entropia Universe" / "chat.log",
]
def __init__(self, log_path: Path = None, use_fs_watcher: bool = True):
self.log_path = log_path or self._find_log_file()
self._running = False
self._last_position = 0
self._use_fs_watcher = use_fs_watcher
self._fs_watcher: Optional[FileSystemWatcher] = None
# Subscribers with fine-grained locking
self._subscribers: Dict[str, List[Callable]] = {}
self._any_subscribers: List[Callable] = []
self._subscribers_lock = threading.RLock()
# Optimized data structures
self._recent_lines = RingBuffer(1000)
self._string_interner = StringInterner(max_size=10000)
# Pattern matching cache with TTL
self._pattern_cache: Dict[str, tuple] = {} # line -> (event, timestamp)
self._cache_max_size = 5000
self._cache_ttl_seconds = 300 # 5 minutes
self._cache_lock = threading.Lock()
# Performance stats
self._stats = {
'lines_read': 0,
'events_parsed': 0,
'start_time': None,
'cache_hits': 0,
'cache_misses': 0,
'bytes_read': 0,
'read_operations': 0,
}
self._stats_lock = threading.Lock()
# Batch processing
self._batch: List[str] = []
self._batch_size = 10
self._batch_timeout = 0.05 # 50ms
self._batch_timer: Optional[threading.Timer] = None
self._batch_lock = threading.Lock()
self._ensure_patterns()
@classmethod
def _ensure_patterns(cls):
"""Ensure regex patterns are compiled."""
if cls._PATTERNS is None:
with cls._PATTERNS_LOCK:
if cls._PATTERNS is None:
cls._PATTERNS = {
'skill_gain': re.compile(
r'(.+?)\s+has\s+improved\s+by\s+(\d+\.?\d*)\s+points?',
re.IGNORECASE
),
'loot': re.compile(
r'You\s+received\s+(.+?)\s+x\s*(\d+)',
re.IGNORECASE
),
'global': re.compile(
r'(\w+)\s+received\s+.+?\s+from\s+(\w+)\s+worth\s+(\d+)\s+PED',
re.IGNORECASE
),
'damage': re.compile(
r'You\s+(?:hit|inflicted)\s+(\d+)\s+damage',
re.IGNORECASE
),
'damage_taken': re.compile(
r'You\s+were\s+hit\s+for\s+(\d+)\s+damage',
re.IGNORECASE
),
'heal': re.compile(
r'You\s+(?:healed|restored)\s+(\d+)\s+(?:health|points)',
re.IGNORECASE
),
'mission_complete': re.compile(
r'Mission\s+completed:\s+(.+)',
re.IGNORECASE
),
'tier_increase': re.compile(
r'Your\s+(.+?)\s+has\s+reached\s+tier\s+(\d+)',
re.IGNORECASE
),
'enhancer_break': re.compile(
r'Your\s+(.+?)\s+broke',
re.IGNORECASE
),
}
def _find_log_file(self) -> Optional[Path]:
"""Find EU chat.log file."""
for path in self.LOG_PATHS:
if path.exists():
return path
return None
def start(self) -> bool:
"""Start log monitoring."""
if not self.log_path or not self.log_path.exists():
print(f"[LogWatcher] Log file not found. Tried: {self.LOG_PATHS}")
return False
self._running = True
with self._stats_lock:
self._stats['start_time'] = datetime.now()
# Start at end of file
try:
self._last_position = self.log_path.stat().st_size
except OSError:
self._last_position = 0
# Try native file system watcher
if self._use_fs_watcher:
self._fs_watcher = FileSystemWatcher(self.log_path, self._on_file_changed)
if self._fs_watcher.start():
print(f"[LogWatcher] Started with native file watching: {self.log_path}")
return True
# Fallback to polling
print(f"[LogWatcher] Started with polling fallback: {self.log_path}")
return True
def stop(self):
"""Stop log monitoring."""
self._running = False
if self._fs_watcher:
self._fs_watcher.stop()
# Flush pending batch
self._flush_batch()
print("[LogWatcher] Stopped")
def _on_file_changed(self):
"""Called when file system watcher detects a change."""
if self._running:
self._read_new_lines()
def _read_new_lines(self) -> int:
"""Read and process new lines. Returns count of lines read."""
try:
current_size = self.log_path.stat().st_size
except (OSError, FileNotFoundError):
return 0
if current_size < self._last_position:
# Log was rotated/truncated
self._last_position = 0
if current_size == self._last_position:
return 0
lines_read = 0
bytes_read = 0
try:
# Use memory-mapped reading for large files
if current_size - self._last_position > 1024 * 1024: # 1MB
lines_read, bytes_read = self._read_mmap()
else:
lines_read, bytes_read = self._read_standard()
with self._stats_lock:
self._stats['lines_read'] += lines_read
self._stats['bytes_read'] += bytes_read
self._stats['read_operations'] += 1
return lines_read
except Exception as e:
print(f"[LogWatcher] Read error: {e}")
return 0
def _read_standard(self) -> tuple:
"""Standard line-by-line reading."""
lines = []
bytes_read = 0
with open(self.log_path, 'r', encoding='utf-8', errors='ignore') as f:
f.seek(self._last_position)
# Read in chunks for efficiency
chunk_size = 8192
while True:
chunk = f.read(chunk_size)
if not chunk:
break
bytes_read += len(chunk)
lines.extend(chunk.splitlines())
self._last_position = f.tell()
# Process lines
if lines:
self._queue_lines(lines)
return len(lines), bytes_read
def _read_mmap(self) -> tuple:
"""Memory-mapped file reading for large files."""
import mmap
lines = []
bytes_read = 0
with open(self.log_path, 'r+b') as f:
f.seek(self._last_position)
# Memory-map the remaining file
remaining = self.log_path.stat().st_size - self._last_position
if remaining <= 0:
return 0, 0
with mmap.mmap(f.fileno(), remaining, access=mmap.ACCESS_READ) as mm:
# Read lines from mmap
buffer = mm.read()
text = buffer.decode('utf-8', errors='ignore')
lines = text.splitlines()
bytes_read = len(buffer)
self._last_position += bytes_read
if lines:
self._queue_lines(lines)
return len(lines), bytes_read
def _queue_lines(self, lines: List[str]):
"""Queue lines for batch processing."""
with self._batch_lock:
self._batch.extend(lines)
# Flush immediately if batch is full
if len(self._batch) >= self._batch_size:
self._flush_batch()
elif self._batch_timer is None:
# Start timeout timer
self._batch_timer = threading.Timer(self._batch_timeout, self._flush_batch)
self._batch_timer.daemon = True
self._batch_timer.start()
def _flush_batch(self):
"""Process batched lines."""
with self._batch_lock:
if self._batch_timer:
self._batch_timer.cancel()
self._batch_timer = None
batch = self._batch
self._batch = []
if batch:
self._process_lines_batch(batch)
def _process_lines_batch(self, lines: List[str]):
"""Process multiple lines efficiently."""
patterns = self._PATTERNS
events = []
now = datetime.now()
for line in lines:
line = line.strip()
if not line:
continue
# Intern the line
line = self._string_interner.intern(line)
# Add to recent lines buffer
self._recent_lines.append(line)
# Check cache
cached = self._get_cached_event(line)
if cached is not None:
if cached: # Valid event
events.append(cached)
continue
# Parse event
event = self._parse_event(line, patterns, now)
# Cache the result
self._cache_event(line, event)
if event:
events.append(event)
# Notify subscribers
for event in events:
self._notify_subscribers(event)
def _get_cached_event(self, line: str) -> Optional[Optional[LogEvent]]:
"""Get cached event if still valid."""
with self._cache_lock:
cached = self._pattern_cache.get(line)
if cached is None:
with self._stats_lock:
self._stats['cache_misses'] += 1
return None
event, timestamp = cached
# Check TTL
if time.time() - timestamp > self._cache_ttl_seconds:
with self._cache_lock:
self._pattern_cache.pop(line, None)
with self._stats_lock:
self._stats['cache_misses'] += 1
return None
with self._stats_lock:
self._stats['cache_hits'] += 1
return event
def _cache_event(self, line: str, event: Optional[LogEvent]):
"""Cache parsed event."""
with self._cache_lock:
# Evict if necessary
if len(self._pattern_cache) >= self._cache_max_size:
# Remove oldest 10%
keys = list(self._pattern_cache.keys())[:self._cache_max_size // 10]
for k in keys:
del self._pattern_cache[k]
self._pattern_cache[line] = (event, time.time())
def _parse_event(self, line: str, patterns: Dict[str, re.Pattern], timestamp: datetime) -> Optional[LogEvent]:
"""Parse a log line into a LogEvent."""
for event_type, pattern in patterns.items():
match = pattern.search(line)
if match:
with self._stats_lock:
self._stats['events_parsed'] += 1
return LogEvent(
timestamp=timestamp,
raw_line=line,
event_type=event_type,
data={'groups': match.groups()}
)
return None
def _notify_subscribers(self, event: LogEvent):
"""Notify subscribers of an event."""
with self._subscribers_lock:
callbacks = self._subscribers.get(event.event_type, []).copy()
any_callbacks = self._any_subscribers.copy()
# Notify type-specific subscribers
for callback in callbacks:
try:
callback(event)
except Exception as e:
print(f"[LogWatcher] Subscriber error: {e}")
# Notify "any" subscribers
for callback in any_callbacks:
try:
callback(event)
except Exception as e:
print(f"[LogWatcher] Subscriber error: {e}")
# ========== Public API ==========
def subscribe(self, event_type: str, callback: Callable):
"""Subscribe to specific event type."""
with self._subscribers_lock:
if event_type not in self._subscribers:
self._subscribers[event_type] = []
self._subscribers[event_type].append(callback)
def subscribe_all(self, callback: Callable):
"""Subscribe to all events."""
with self._subscribers_lock:
self._any_subscribers.append(callback)
def unsubscribe(self, event_type: str, callback: Callable):
"""Unsubscribe from events."""
with self._subscribers_lock:
if event_type in self._subscribers:
self._subscribers[event_type] = [
cb for cb in self._subscribers[event_type] if cb != callback
]
def read_lines(self, count: int = 50, filter_text: str = None) -> List[str]:
"""Read recent lines."""
lines = list(self._recent_lines)
lines = lines[-count:] if count < len(lines) else lines
if filter_text:
filter_lower = filter_text.lower()
lines = [l for l in lines if filter_lower in l.lower()]
return lines
def get_stats(self) -> Dict:
"""Get performance statistics."""
with self._stats_lock:
stats = self._stats.copy()
total_cache = stats['cache_hits'] + stats['cache_misses']
stats['cache_hit_rate'] = (stats['cache_hits'] / total_cache * 100) if total_cache > 0 else 0
stats['cache_size'] = len(self._pattern_cache)
stats['watcher_type'] = 'native' if (self._fs_watcher and self._fs_watcher._use_inotify) else 'polling'
return stats
def is_available(self) -> bool:
"""Check if log file is available."""
return self.log_path is not None and self.log_path.exists()
def clear_cache(self):
"""Clear the pattern cache."""
with self._cache_lock:
self._pattern_cache.clear()
# Module-level singleton storage.
_log_watcher = None
_log_watcher_lock = threading.Lock()


def get_log_watcher() -> OptimizedLogWatcher:
    """Return the process-wide OptimizedLogWatcher, creating it on first use.

    Uses double-checked locking: the fast path skips the lock entirely once
    the instance exists; the lock only guards first-time construction.
    """
    global _log_watcher
    if _log_watcher is not None:
        return _log_watcher
    with _log_watcher_lock:
        if _log_watcher is None:
            _log_watcher = OptimizedLogWatcher()
    return _log_watcher