diff --git a/core/log_watcher.py b/core/log_watcher.py new file mode 100644 index 0000000..3f3914a --- /dev/null +++ b/core/log_watcher.py @@ -0,0 +1,486 @@ +# Description: LogWatcher implementing Observer Pattern for real-time log parsing +# Monitors chat.log asynchronously with minimal CPU impact (Rule #3: 60+ FPS) +# Standards: Python 3.11+, asyncio, regex patterns for Entropia Universe + +import asyncio +import re +import os +from pathlib import Path +from typing import Callable, List, Dict, Any, Optional +from dataclasses import dataclass +from datetime import datetime +from decimal import Decimal +import logging +import time + +logger = logging.getLogger(__name__) + + +@dataclass +class LogEvent: + """Represents a parsed event from chat.log.""" + timestamp: datetime + event_type: str + raw_line: str + data: Dict[str, Any] + + +class LogWatcher: + """ + Watches Entropia Universe chat.log and notifies observers of events. + + Implements Observer Pattern: Multiple modules can subscribe to specific + event types without tight coupling. + + Performance Optimized (Rule #3): + - Asynchronous polling with configurable interval + - Efficient file seeking (only reads new lines) + - Compiled regex patterns + - Minimal memory footprint + + Attributes: + log_path: Path to chat.log file + poll_interval: Polling interval in seconds (default: 1.0) + observers: Dict of event_type -> list of callback functions + _running: Whether the watcher is active + _file_position: Current read position in file + """ + + # ======================================================================== + # REGEX PATTERNS (Compiled for performance) + # ======================================================================== + + # Global/Hall of Fame patterns + PATTERN_GLOBAL = re.compile( + r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' + r'([\w\s]+)\s+globals\s+in\s+([\w\s]+)\s+for\s+(\d+(?:\.\d+)?)\s+PED', + re.IGNORECASE + ) + + PATTERN_HOF = re.compile( + r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' + r'([\w\s]+)\s+is\s+in\s+the\s+Hall\s+of\s+Fame\s+.*?(\d+(?:\.\d+)?)\s+PED', + re.IGNORECASE + ) + + # Regular loot pattern + PATTERN_LOOT = re.compile( + r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' + r'You\s+received\s+([\w\s]+)\s+x\s*(\d+)\s+.*?(?:Value:\s+(\d+(?:\.\d+)?)\s+PED)?', + re.IGNORECASE + ) + + # Alternative loot pattern (different wording in EU) + PATTERN_LOOT_ALT = re.compile( + r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' + r'You\s+received\s+([\w\s]+)\s+\(.*?\d+\s+items\)', + re.IGNORECASE + ) + + # Skill gain pattern + PATTERN_SKILL = re.compile( + r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' + r'You\s+gained\s+(\d+(?:\.\d+)?)\s+experience\s+in\s+your\s+([\w\s]+)\s+skill', + re.IGNORECASE + ) + + # Skill level up pattern + PATTERN_SKILL_LEVEL = re.compile( + r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' + r'Congratulations!\s+You\s+have\s+advanced\s+to\s+level\s+(\d+)\s+in\s+([\w\s]+)', + re.IGNORECASE + ) + + # Weapon decay pattern + PATTERN_DECAY = re.compile( + r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' + r'Your\s+([\w\s]+)\s+has\s+decayed\s+(\d+(?:\.\d+)?)\s+PEC', + re.IGNORECASE + ) + + # Creature killed / target info (useful for context) + PATTERN_KILL = re.compile( + r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' + r'You\s+killed\s+a\s+([\w\s]+)', + re.IGNORECASE + ) + + # Enhancer break pattern + PATTERN_ENHANCER = re.compile( + r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' + r'Your\s+([\w\s]+)\s+enhancer\s+has\s+broken', + re.IGNORECASE + ) + + EVENT_PATTERNS = { + 'global': PATTERN_GLOBAL, + 'hof': PATTERN_HOF, + 'loot': PATTERN_LOOT, + 'loot_alt': PATTERN_LOOT_ALT, + 'skill': PATTERN_SKILL, + 'skill_level': PATTERN_SKILL_LEVEL, + 'decay': PATTERN_DECAY, + 'kill': PATTERN_KILL, + 'enhancer_break': PATTERN_ENHANCER, + } + + def __init__(self, log_path: Optional[str] = None, + poll_interval: float = 1.0, + mock_mode: bool = False): + """ + Initialize LogWatcher. + + Args: + log_path: Path to chat.log. Defaults to EU standard location or ./test-data/ + poll_interval: Seconds between polls (default: 1.0 for 60+ FPS compliance) + mock_mode: Use mock data instead of real log file + """ + self.mock_mode = mock_mode + + if log_path is None: + if mock_mode: + core_dir = Path(__file__).parent + log_path = core_dir.parent / "test-data" / "mock-chat.log" + else: + # Try to find EU log path + log_path = self._find_eu_log_path() + + self.log_path = Path(log_path) + self.poll_interval = poll_interval + + # Observer registry: event_type -> list of callbacks + self.observers: Dict[str, List[Callable]] = { + 'global': [], + 'hof': [], + 'loot': [], + 'skill': [], + 'skill_level': [], + 'decay': [], + 'kill': [], + 'enhancer_break': [], + 'any': [], # Catch-all for all events + } + + self._running = False + self._file_position = 0 + self._last_file_size = 0 + self._task: Optional[asyncio.Task] = None + + logger.info(f"LogWatcher initialized: {self.log_path} (mock={mock_mode})") + + def _find_eu_log_path(self) -> Path: + """ + Attempt to find Entropia Universe chat.log. + + Returns: + Path to log file or test-data fallback + """ + # Common Windows paths + possible_paths = [ + Path.home() / "Documents" / "Entropia Universe" / "chat.log", + Path("C:") / "Users" / os.getenv("USERNAME", "User") / "Documents" / "Entropia Universe" / "chat.log", + ] + + # Linux/Wine paths + wine_prefix = Path.home() / ".wine" / "drive_c" + possible_paths.extend([ + wine_prefix / "users" / os.getenv("USER", "user") / "Documents" / "Entropia Universe" / "chat.log", + ]) + + for path in possible_paths: + if path.exists(): + logger.info(f"Found EU log: {path}") + return path + + # Fallback to test data + fallback = Path(__file__).parent.parent / "test-data" / "chat.log" + logger.warning(f"EU log not found, using fallback: {fallback}") + return fallback + + # ======================================================================== + # OBSERVER PATTERN METHODS + # ======================================================================== + + def subscribe(self, event_type: str, callback: Callable[[LogEvent], None]) -> None: + """ + Subscribe to an event type. + + Args: + event_type: Type of event to listen for + callback: Function to call when event occurs + """ + if event_type not in self.observers: + self.observers[event_type] = [] + + self.observers[event_type].append(callback) + logger.debug(f"Subscribed to {event_type}: {callback.__name__}") + + def unsubscribe(self, event_type: str, callback: Callable[[LogEvent], None]) -> None: + """ + Unsubscribe from an event type. + + Args: + event_type: Type of event + callback: Function to remove + """ + if event_type in self.observers: + if callback in self.observers[event_type]: + self.observers[event_type].remove(callback) + logger.debug(f"Unsubscribed from {event_type}: {callback.__name__}") + + def _notify(self, event: LogEvent) -> None: + """ + Notify all observers of an event. + + Args: + event: LogEvent to broadcast + """ + # Notify specific observers + if event.event_type in self.observers: + for callback in self.observers[event.event_type]: + try: + callback(event) + except Exception as e: + logger.error(f"Observer error for {event.event_type}: {e}") + + # Notify catch-all observers + for callback in self.observers['any']: + try: + callback(event) + except Exception as e: + logger.error(f"Observer error for 'any': {e}") + + # ======================================================================== + # PARSING METHODS + # ======================================================================== + + def _parse_timestamp(self, ts_str: str) -> datetime: + """Parse EU timestamp format.""" + return datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S") + + def _parse_line(self, line: str) -> Optional[LogEvent]: + """ + Parse a single log line. + + Args: + line: Raw log line + + Returns: + LogEvent if parsed, None otherwise + """ + line = line.strip() + if not line: + return None + + # Try each pattern + for event_type, pattern in self.EVENT_PATTERNS.items(): + match = pattern.match(line) + if match: + return self._create_event(event_type, match, line) + + return None + + def _create_event(self, event_type: str, match: re.Match, raw_line: str) -> LogEvent: + """ + Create LogEvent from regex match. + + Args: + event_type: Type of event matched + match: Regex match object + raw_line: Original log line + + Returns: + Populated LogEvent + """ + groups = match.groups() + timestamp = self._parse_timestamp(groups[0]) + data: Dict[str, Any] = {} + + if event_type == 'global': + data = { + 'player_name': groups[1].strip(), + 'zone': groups[2].strip(), + 'value_ped': Decimal(groups[3]), + } + + elif event_type == 'hof': + data = { + 'player_name': groups[1].strip(), + 'value_ped': Decimal(groups[2]), + } + + elif event_type == 'loot': + data = { + 'item_name': groups[1].strip(), + 'quantity': int(groups[2]) if groups[2] else 1, + 'value_ped': Decimal(groups[3]) if groups[3] else Decimal("0.0"), + } + + elif event_type == 'skill': + data = { + 'gained': Decimal(groups[1]), + 'skill_name': groups[2].strip(), + } + + elif event_type == 'skill_level': + data = { + 'new_level': int(groups[1]), + 'skill_name': groups[2].strip(), + } + + elif event_type == 'decay': + data = { + 'item_name': groups[1].strip(), + 'decay_pec': Decimal(groups[2]), + } + + elif event_type == 'kill': + data = { + 'creature_name': groups[1].strip(), + } + + elif event_type == 'enhancer_break': + data = { + 'enhancer_type': groups[1].strip(), + } + + return LogEvent( + timestamp=timestamp, + event_type=event_type, + raw_line=raw_line, + data=data + ) + + # ======================================================================== + # ASYNC POLLING LOOP (Performance Optimized) + # ======================================================================== + + async def start(self) -> None: + """Start watching log file asynchronously.""" + if self._running: + logger.warning("LogWatcher already running") + return + + self._running = True + + # Initialize file position + if self.log_path.exists(): + self._last_file_size = self.log_path.stat().st_size + self._file_position = self._last_file_size # Start at end (new entries only) + + self._task = asyncio.create_task(self._watch_loop()) + logger.info("LogWatcher started") + + async def stop(self) -> None: + """Stop watching log file.""" + self._running = False + + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + + logger.info("LogWatcher stopped") + + async def _watch_loop(self) -> None: + """ + Main watching loop. + + Efficiently polls file for new content with minimal CPU usage. + """ + while self._running: + try: + await self._poll_once() + except Exception as e: + logger.error(f"Poll error: {e}") + + # Non-blocking sleep (Rule #3: preserve FPS) + await asyncio.sleep(self.poll_interval) + + async def _poll_once(self) -> None: + """ + Single poll iteration. + + Reads new lines from file and processes them. + """ + if not self.log_path.exists(): + return + + current_size = self.log_path.stat().st_size + + # Check if file was truncated (new session) + if current_size < self._file_position: + logger.info("Log file truncated, resetting position") + self._file_position = 0 + + # Check if new content exists + if current_size == self._file_position: + return # No new content + + # Read new lines + with open(self.log_path, 'r', encoding='utf-8', errors='ignore') as f: + f.seek(self._file_position) + new_lines = f.readlines() + self._file_position = f.tell() + + # Parse and notify + for line in new_lines: + event = self._parse_line(line) + if event: + self._notify(event) + + self._last_file_size = current_size + + +# ============================================================================ +# MOCK MODE SUPPORT +# ============================================================================ + +class MockLogGenerator: + """ + Generates mock log entries for testing. + + Simulates Entropia Universe chat.log output. + """ + + MOCK_LINES = [ + "2026-02-08 14:23:15 [System] You received Shrapnel x 123 (Value: 1.23 PED)", + "2026-02-08 14:23:45 [System] You gained 0.45 experience in your Rifle skill", + "2026-02-08 14:24:02 [System] Your Omegaton M2100 has decayed 15 PEC", + "2026-02-08 14:25:30 [System] PlayerOne globals in Twin Peaks for 150.00 PED", + "2026-02-08 14:26:10 [System] You received Animal Thyroid Oil x 5", + "2026-02-08 14:27:55 [System] Congratulations! You have advanced to level 45 in Rifle", + "2026-02-08 14:30:00 [System] PlayerTwo is in the Hall of Fame! Loot of 2500.00 PED", + ] + + @classmethod + def create_mock_file(cls, path: Path, lines: int = 100) -> None: + """ + Create a mock chat.log file. + + Args: + path: Path for mock file + lines: Number of lines to generate + """ + path.parent.mkdir(parents=True, exist_ok=True) + + with open(path, 'w') as f: + for i in range(lines): + line = cls.MOCK_LINES[i % len(cls.MOCK_LINES)] + # Vary timestamps slightly + f.write(f"{line}\n") + + logger.info(f"Created mock log: {path} ({lines} lines)") + + +# ============================================================================ +# MODULE EXPORTS +# ============================================================================ + +__all__ = [ + 'LogWatcher', + 'LogEvent', + 'MockLogGenerator' +]