608 lines
20 KiB
Python
608 lines
20 KiB
Python
"""
|
|
EU-Utility - Optimized LogWatcher
|
|
|
|
Performance improvements:
|
|
1. Adaptive polling interval (fast when active, slow when idle)
|
|
2. Inotify/file system watcher on supported platforms
|
|
3. Memory-mapped file reading for large files
|
|
4. Zero-allocation line reading
|
|
5. Batched event processing
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import time
|
|
import threading
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import List, Dict, Callable, Optional, Set
|
|
from dataclasses import dataclass, field
|
|
from collections import deque
|
|
|
|
from core.performance_optimizations import RingBuffer, StringInterner
|
|
|
|
|
|
@dataclass
class LogEvent:
    """Represents a parsed log event.

    Produced by OptimizedLogWatcher._parse_event when a log line matches
    one of the pre-compiled patterns.
    """
    # Wall-clock time the batch was processed (not parsed from the line).
    timestamp: datetime
    # The original (stripped, interned) log line.
    raw_line: str
    # Name of the pattern that matched, e.g. 'skill_gain', 'loot', 'global'.
    event_type: str
    # Extra payload; _parse_event stores {'groups': match.groups()}.
    data: Dict = field(default_factory=dict)
|
|
|
|
|
|
class FileSystemWatcher:
    """
    Platform-specific file system watcher for a single file.

    Tries native change notification first (inotify on Linux, FSEvents on
    macOS) and falls back to adaptive stat()-based polling when neither
    library is importable.  ``callback`` is invoked with no arguments,
    from a daemon thread, whenever a change to ``file_path`` is detected.
    """

    def __init__(self, file_path: Path, callback: Callable):
        self.file_path = file_path
        self.callback = callback  # invoked (no args) on every detected change
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._use_inotify = False
        self._use_fsevents = False
        # Change-detection state for the stat()-polling fallback.
        self._last_mtime = 0
        self._last_size = 0

    def start(self) -> bool:
        """Start watching the file.  Returns False if it does not exist."""
        if not self.file_path.exists():
            return False

        self._running = True

        # Try inotify on Linux.
        try:
            import inotify.adapters  # noqa: F401 -- availability probe only
            self._use_inotify = True
            self._thread = threading.Thread(target=self._inotify_loop, daemon=True, name="InotifyWatcher")
            self._thread.start()
            return True
        except ImportError:
            pass

        # Try fsevents on macOS.
        try:
            import fsevents  # noqa: F401 -- availability probe only
            self._use_fsevents = True
            self._thread = threading.Thread(target=self._fsevents_loop, daemon=True, name="FSEventsWatcher")
            self._thread.start()
            return True
        except ImportError:
            pass

        # Fallback to stat-based polling.  Seed the change-detection state
        # with a single stat() call (the original stat()ed the file twice).
        self._seed_stat_state()
        self._thread = threading.Thread(target=self._stat_loop, daemon=True, name="StatWatcher")
        self._thread.start()
        return True

    def stop(self):
        """Stop watching (waits up to 1s for the worker thread to exit)."""
        self._running = False
        if self._thread:
            self._thread.join(timeout=1.0)

    def _seed_stat_state(self):
        """Record current mtime/size so the poll loop fires only on change."""
        try:
            st = self.file_path.stat()
            self._last_mtime = st.st_mtime
            self._last_size = st.st_size
        except OSError:
            # File vanished; keep zeros so the first successful stat fires.
            pass

    def _inotify_loop(self):
        """Inotify-based watching (Linux)."""
        try:
            import inotify.adapters

            i = inotify.adapters.Inotify()
            i.add_watch(str(self.file_path))

            for event in i.event_gen(yield_nones=False):
                if not self._running:
                    break
                (_, type_names, path, filename) = event
                if 'IN_MODIFY' in type_names:
                    self.callback()
        except Exception as e:
            print(f"[FileSystemWatcher] inotify error: {e}, falling back to polling")
            self._use_inotify = False
            # The original entered _stat_loop with uninitialized mtime/size
            # here, triggering one spurious callback; seed the state first.
            self._seed_stat_state()
            self._stat_loop()

    def _fsevents_loop(self):
        """FSEvents-based watching (macOS)."""
        try:
            import fsevents

            def handler(event):
                if self._running and event.name == str(self.file_path):
                    self.callback()

            observer = fsevents.Observer()
            # FSEvents watches directories; filter to our file in handler.
            stream = fsevents.Stream(handler, str(self.file_path.parent), file_events=True)
            observer.schedule(stream)
            observer.start()

            while self._running:
                time.sleep(0.1)

            observer.stop()
            observer.join()
        except Exception as e:
            print(f"[FileSystemWatcher] fsevents error: {e}, falling back to polling")
            self._use_fsevents = False
            self._seed_stat_state()
            self._stat_loop()

    def _stat_loop(self):
        """Stat-based watching (fallback, most compatible).

        Adaptive interval: 50ms right after activity, decaying ~10% per
        idle cycle up to a 1s ceiling; 1s after stat() errors.
        """
        poll_interval = 0.5  # Start with 500ms

        while self._running:
            try:
                stat = self.file_path.stat()

                if stat.st_mtime != self._last_mtime or stat.st_size != self._last_size:
                    self._last_mtime = stat.st_mtime
                    self._last_size = stat.st_size
                    self.callback()
                    poll_interval = 0.05  # Fast poll after activity
                else:
                    # Gradually slow down polling while idle.
                    poll_interval = min(1.0, poll_interval * 1.1)

            except OSError:
                # Covers FileNotFoundError too; slow down on error.
                poll_interval = 1.0

            time.sleep(poll_interval)
|
|
|
|
|
|
class OptimizedLogWatcher:
    """
    High-performance log file watcher.

    Features:
    - Native file system events when available
    - Adaptive polling fallback
    - Zero-allocation line reading
    - Memory-mapped files for large logs
    - Batched event processing
    """

    # Pre-compiled patterns (class level).  None until _ensure_patterns()
    # runs; the lock guards its double-checked initialization.
    _PATTERNS: Optional[Dict[str, re.Pattern]] = None
    _PATTERNS_LOCK = threading.Lock()

    # Candidate chat.log locations, probed in order by _find_log_file().
    LOG_PATHS = [
        Path.home() / "Documents" / "Entropia Universe" / "chat.log",
        Path.home() / "Documents" / "Entropia Universe" / "Logs" / "chat.log",
        Path.home() / "Entropia Universe" / "chat.log",
    ]
|
|
|
|
    def __init__(self, log_path: Optional[Path] = None, use_fs_watcher: bool = True):
        """Create a watcher.

        Args:
            log_path: explicit chat.log path; when None the LOG_PATHS
                candidates are probed via _find_log_file().
            use_fs_watcher: prefer a native FileSystemWatcher over polling.
        """
        self.log_path = log_path or self._find_log_file()
        self._running = False
        self._last_position = 0  # byte offset of the next unread data
        self._use_fs_watcher = use_fs_watcher
        self._fs_watcher: Optional[FileSystemWatcher] = None

        # Subscribers with fine-grained locking
        self._subscribers: Dict[str, List[Callable]] = {}  # event_type -> callbacks
        self._any_subscribers: List[Callable] = []  # called for every event
        self._subscribers_lock = threading.RLock()

        # Optimized data structures
        self._recent_lines = RingBuffer(1000)  # rolling tail of raw lines
        self._string_interner = StringInterner(max_size=10000)

        # Pattern matching cache with TTL
        self._pattern_cache: Dict[str, tuple] = {}  # line -> (event, timestamp)
        self._cache_max_size = 5000
        self._cache_ttl_seconds = 300  # 5 minutes
        self._cache_lock = threading.Lock()

        # Performance stats
        self._stats = {
            'lines_read': 0,
            'events_parsed': 0,
            'start_time': None,
            'cache_hits': 0,
            'cache_misses': 0,
            'bytes_read': 0,
            'read_operations': 0,
        }
        self._stats_lock = threading.Lock()

        # Batch processing
        self._batch: List[str] = []
        self._batch_size = 10  # flush as soon as this many lines are queued
        self._batch_timeout = 0.05  # 50ms
        self._batch_timer: Optional[threading.Timer] = None
        self._batch_lock = threading.Lock()

        self._ensure_patterns()
|
|
|
|
    @classmethod
    def _ensure_patterns(cls):
        """Ensure regex patterns are compiled.

        Double-checked locking: the unlocked check keeps the common
        already-compiled path lock-free; the locked re-check prevents two
        threads from both compiling.  The table is shared class-wide.
        """
        if cls._PATTERNS is None:
            with cls._PATTERNS_LOCK:
                if cls._PATTERNS is None:
                    cls._PATTERNS = {
                        # "<skill> has improved by <points> points"
                        'skill_gain': re.compile(
                            r'(.+?)\s+has\s+improved\s+by\s+(\d+\.?\d*)\s+points?',
                            re.IGNORECASE
                        ),
                        # "You received <item> x <count>"
                        'loot': re.compile(
                            r'You\s+received\s+(.+?)\s+x\s*(\d+)',
                            re.IGNORECASE
                        ),
                        # "<player> received ... from <source> worth <n> PED"
                        'global': re.compile(
                            r'(\w+)\s+received\s+.+?\s+from\s+(\w+)\s+worth\s+(\d+)\s+PED',
                            re.IGNORECASE
                        ),
                        # Outgoing damage
                        'damage': re.compile(
                            r'You\s+(?:hit|inflicted)\s+(\d+)\s+damage',
                            re.IGNORECASE
                        ),
                        # Incoming damage
                        'damage_taken': re.compile(
                            r'You\s+were\s+hit\s+for\s+(\d+)\s+damage',
                            re.IGNORECASE
                        ),
                        'heal': re.compile(
                            r'You\s+(?:healed|restored)\s+(\d+)\s+(?:health|points)',
                            re.IGNORECASE
                        ),
                        'mission_complete': re.compile(
                            r'Mission\s+completed:\s+(.+)',
                            re.IGNORECASE
                        ),
                        'tier_increase': re.compile(
                            r'Your\s+(.+?)\s+has\s+reached\s+tier\s+(\d+)',
                            re.IGNORECASE
                        ),
                        'enhancer_break': re.compile(
                            r'Your\s+(.+?)\s+broke',
                            re.IGNORECASE
                        ),
                    }
|
|
|
|
def _find_log_file(self) -> Optional[Path]:
|
|
"""Find EU chat.log file."""
|
|
for path in self.LOG_PATHS:
|
|
if path.exists():
|
|
return path
|
|
return None
|
|
|
|
def start(self) -> bool:
|
|
"""Start log monitoring."""
|
|
if not self.log_path or not self.log_path.exists():
|
|
print(f"[LogWatcher] Log file not found. Tried: {self.LOG_PATHS}")
|
|
return False
|
|
|
|
self._running = True
|
|
with self._stats_lock:
|
|
self._stats['start_time'] = datetime.now()
|
|
|
|
# Start at end of file
|
|
try:
|
|
self._last_position = self.log_path.stat().st_size
|
|
except OSError:
|
|
self._last_position = 0
|
|
|
|
# Try native file system watcher
|
|
if self._use_fs_watcher:
|
|
self._fs_watcher = FileSystemWatcher(self.log_path, self._on_file_changed)
|
|
if self._fs_watcher.start():
|
|
print(f"[LogWatcher] Started with native file watching: {self.log_path}")
|
|
return True
|
|
|
|
# Fallback to polling
|
|
print(f"[LogWatcher] Started with polling fallback: {self.log_path}")
|
|
return True
|
|
|
|
def stop(self):
|
|
"""Stop log monitoring."""
|
|
self._running = False
|
|
|
|
if self._fs_watcher:
|
|
self._fs_watcher.stop()
|
|
|
|
# Flush pending batch
|
|
self._flush_batch()
|
|
|
|
print("[LogWatcher] Stopped")
|
|
|
|
def _on_file_changed(self):
|
|
"""Called when file system watcher detects a change."""
|
|
if self._running:
|
|
self._read_new_lines()
|
|
|
|
def _read_new_lines(self) -> int:
|
|
"""Read and process new lines. Returns count of lines read."""
|
|
try:
|
|
current_size = self.log_path.stat().st_size
|
|
except (OSError, FileNotFoundError):
|
|
return 0
|
|
|
|
if current_size < self._last_position:
|
|
# Log was rotated/truncated
|
|
self._last_position = 0
|
|
|
|
if current_size == self._last_position:
|
|
return 0
|
|
|
|
lines_read = 0
|
|
bytes_read = 0
|
|
|
|
try:
|
|
# Use memory-mapped reading for large files
|
|
if current_size - self._last_position > 1024 * 1024: # 1MB
|
|
lines_read, bytes_read = self._read_mmap()
|
|
else:
|
|
lines_read, bytes_read = self._read_standard()
|
|
|
|
with self._stats_lock:
|
|
self._stats['lines_read'] += lines_read
|
|
self._stats['bytes_read'] += bytes_read
|
|
self._stats['read_operations'] += 1
|
|
|
|
return lines_read
|
|
|
|
except Exception as e:
|
|
print(f"[LogWatcher] Read error: {e}")
|
|
return 0
|
|
|
|
def _read_standard(self) -> tuple:
|
|
"""Standard line-by-line reading."""
|
|
lines = []
|
|
bytes_read = 0
|
|
|
|
with open(self.log_path, 'r', encoding='utf-8', errors='ignore') as f:
|
|
f.seek(self._last_position)
|
|
|
|
# Read in chunks for efficiency
|
|
chunk_size = 8192
|
|
while True:
|
|
chunk = f.read(chunk_size)
|
|
if not chunk:
|
|
break
|
|
bytes_read += len(chunk)
|
|
lines.extend(chunk.splitlines())
|
|
|
|
self._last_position = f.tell()
|
|
|
|
# Process lines
|
|
if lines:
|
|
self._queue_lines(lines)
|
|
|
|
return len(lines), bytes_read
|
|
|
|
def _read_mmap(self) -> tuple:
|
|
"""Memory-mapped file reading for large files."""
|
|
import mmap
|
|
|
|
lines = []
|
|
bytes_read = 0
|
|
|
|
with open(self.log_path, 'r+b') as f:
|
|
f.seek(self._last_position)
|
|
|
|
# Memory-map the remaining file
|
|
remaining = self.log_path.stat().st_size - self._last_position
|
|
if remaining <= 0:
|
|
return 0, 0
|
|
|
|
with mmap.mmap(f.fileno(), remaining, access=mmap.ACCESS_READ) as mm:
|
|
# Read lines from mmap
|
|
buffer = mm.read()
|
|
text = buffer.decode('utf-8', errors='ignore')
|
|
lines = text.splitlines()
|
|
bytes_read = len(buffer)
|
|
|
|
self._last_position += bytes_read
|
|
|
|
if lines:
|
|
self._queue_lines(lines)
|
|
|
|
return len(lines), bytes_read
|
|
|
|
def _queue_lines(self, lines: List[str]):
|
|
"""Queue lines for batch processing."""
|
|
with self._batch_lock:
|
|
self._batch.extend(lines)
|
|
|
|
# Flush immediately if batch is full
|
|
if len(self._batch) >= self._batch_size:
|
|
self._flush_batch()
|
|
elif self._batch_timer is None:
|
|
# Start timeout timer
|
|
self._batch_timer = threading.Timer(self._batch_timeout, self._flush_batch)
|
|
self._batch_timer.daemon = True
|
|
self._batch_timer.start()
|
|
|
|
def _flush_batch(self):
|
|
"""Process batched lines."""
|
|
with self._batch_lock:
|
|
if self._batch_timer:
|
|
self._batch_timer.cancel()
|
|
self._batch_timer = None
|
|
|
|
batch = self._batch
|
|
self._batch = []
|
|
|
|
if batch:
|
|
self._process_lines_batch(batch)
|
|
|
|
    def _process_lines_batch(self, lines: List[str]):
        """Process multiple lines efficiently.

        Strips and interns each line, records it in the recent-lines
        ring, consults the parse cache, parses on a miss, then notifies
        subscribers for every recognized event.

        NOTE(review): _cache_event also stores None for lines that match
        no pattern, but _get_cached_event hands those back as None —
        indistinguishable from a miss here — so non-matching lines are
        re-parsed on every occurrence.  Confirm before trusting the
        cache hit-rate stats.
        """
        patterns = self._PATTERNS
        events = []
        # One timestamp for the whole batch (lines carry no parsed time).
        now = datetime.now()

        for line in lines:
            line = line.strip()
            if not line:
                continue

            # Intern the line (identical repeated lines share one object)
            line = self._string_interner.intern(line)

            # Add to recent lines buffer
            self._recent_lines.append(line)

            # Check cache
            cached = self._get_cached_event(line)
            if cached is not None:
                if cached:  # Valid event
                    events.append(cached)
                continue

            # Parse event
            event = self._parse_event(line, patterns, now)

            # Cache the result
            self._cache_event(line, event)

            if event:
                events.append(event)

        # Notify subscribers
        for event in events:
            self._notify_subscribers(event)
|
|
|
|
def _get_cached_event(self, line: str) -> Optional[Optional[LogEvent]]:
|
|
"""Get cached event if still valid."""
|
|
with self._cache_lock:
|
|
cached = self._pattern_cache.get(line)
|
|
|
|
if cached is None:
|
|
with self._stats_lock:
|
|
self._stats['cache_misses'] += 1
|
|
return None
|
|
|
|
event, timestamp = cached
|
|
|
|
# Check TTL
|
|
if time.time() - timestamp > self._cache_ttl_seconds:
|
|
with self._cache_lock:
|
|
self._pattern_cache.pop(line, None)
|
|
with self._stats_lock:
|
|
self._stats['cache_misses'] += 1
|
|
return None
|
|
|
|
with self._stats_lock:
|
|
self._stats['cache_hits'] += 1
|
|
return event
|
|
|
|
def _cache_event(self, line: str, event: Optional[LogEvent]):
|
|
"""Cache parsed event."""
|
|
with self._cache_lock:
|
|
# Evict if necessary
|
|
if len(self._pattern_cache) >= self._cache_max_size:
|
|
# Remove oldest 10%
|
|
keys = list(self._pattern_cache.keys())[:self._cache_max_size // 10]
|
|
for k in keys:
|
|
del self._pattern_cache[k]
|
|
|
|
self._pattern_cache[line] = (event, time.time())
|
|
|
|
def _parse_event(self, line: str, patterns: Dict[str, re.Pattern], timestamp: datetime) -> Optional[LogEvent]:
|
|
"""Parse a log line into a LogEvent."""
|
|
for event_type, pattern in patterns.items():
|
|
match = pattern.search(line)
|
|
if match:
|
|
with self._stats_lock:
|
|
self._stats['events_parsed'] += 1
|
|
return LogEvent(
|
|
timestamp=timestamp,
|
|
raw_line=line,
|
|
event_type=event_type,
|
|
data={'groups': match.groups()}
|
|
)
|
|
return None
|
|
|
|
def _notify_subscribers(self, event: LogEvent):
|
|
"""Notify subscribers of an event."""
|
|
with self._subscribers_lock:
|
|
callbacks = self._subscribers.get(event.event_type, []).copy()
|
|
any_callbacks = self._any_subscribers.copy()
|
|
|
|
# Notify type-specific subscribers
|
|
for callback in callbacks:
|
|
try:
|
|
callback(event)
|
|
except Exception as e:
|
|
print(f"[LogWatcher] Subscriber error: {e}")
|
|
|
|
# Notify "any" subscribers
|
|
for callback in any_callbacks:
|
|
try:
|
|
callback(event)
|
|
except Exception as e:
|
|
print(f"[LogWatcher] Subscriber error: {e}")
|
|
|
|
# ========== Public API ==========
|
|
|
|
def subscribe(self, event_type: str, callback: Callable):
|
|
"""Subscribe to specific event type."""
|
|
with self._subscribers_lock:
|
|
if event_type not in self._subscribers:
|
|
self._subscribers[event_type] = []
|
|
self._subscribers[event_type].append(callback)
|
|
|
|
def subscribe_all(self, callback: Callable):
|
|
"""Subscribe to all events."""
|
|
with self._subscribers_lock:
|
|
self._any_subscribers.append(callback)
|
|
|
|
def unsubscribe(self, event_type: str, callback: Callable):
|
|
"""Unsubscribe from events."""
|
|
with self._subscribers_lock:
|
|
if event_type in self._subscribers:
|
|
self._subscribers[event_type] = [
|
|
cb for cb in self._subscribers[event_type] if cb != callback
|
|
]
|
|
|
|
def read_lines(self, count: int = 50, filter_text: str = None) -> List[str]:
|
|
"""Read recent lines."""
|
|
lines = list(self._recent_lines)
|
|
lines = lines[-count:] if count < len(lines) else lines
|
|
|
|
if filter_text:
|
|
filter_lower = filter_text.lower()
|
|
lines = [l for l in lines if filter_lower in l.lower()]
|
|
|
|
return lines
|
|
|
|
def get_stats(self) -> Dict:
|
|
"""Get performance statistics."""
|
|
with self._stats_lock:
|
|
stats = self._stats.copy()
|
|
|
|
total_cache = stats['cache_hits'] + stats['cache_misses']
|
|
stats['cache_hit_rate'] = (stats['cache_hits'] / total_cache * 100) if total_cache > 0 else 0
|
|
stats['cache_size'] = len(self._pattern_cache)
|
|
stats['watcher_type'] = 'native' if (self._fs_watcher and self._fs_watcher._use_inotify) else 'polling'
|
|
return stats
|
|
|
|
def is_available(self) -> bool:
|
|
"""Check if log file is available."""
|
|
return self.log_path is not None and self.log_path.exists()
|
|
|
|
def clear_cache(self):
|
|
"""Clear the pattern cache."""
|
|
with self._cache_lock:
|
|
self._pattern_cache.clear()
|
|
|
|
|
|
# Singleton
_log_watcher = None
_log_watcher_lock = threading.Lock()


def get_log_watcher() -> OptimizedLogWatcher:
    """Return the process-wide OptimizedLogWatcher, creating it lazily.

    Double-checked locking: the unlocked check keeps the common
    already-created path lock-free; the locked re-check guarantees only
    one instance is ever constructed.
    """
    global _log_watcher

    if _log_watcher is not None:
        return _log_watcher

    with _log_watcher_lock:
        if _log_watcher is None:
            _log_watcher = OptimizedLogWatcher()

    return _log_watcher
|