# Description: LogWatcher implementing Observer Pattern for real-time log parsing # Monitors chat.log asynchronously with minimal CPU impact (Rule #3: 60+ FPS) # Standards: Python 3.11+, asyncio, regex patterns for Entropia Universe import asyncio import re import os from pathlib import Path from typing import Callable, List, Dict, Any, Optional from dataclasses import dataclass from datetime import datetime from decimal import Decimal import logging import time logger = logging.getLogger(__name__) @dataclass class LogEvent: """Represents a parsed event from chat.log.""" timestamp: datetime event_type: str raw_line: str data: Dict[str, Any] class LogWatcher: """ Watches Entropia Universe chat.log and notifies observers of events. Implements Observer Pattern: Multiple modules can subscribe to specific event types without tight coupling. Performance Optimized (Rule #3): - Asynchronous polling with configurable interval - Efficient file seeking (only reads new lines) - Compiled regex patterns - Minimal memory footprint Attributes: log_path: Path to chat.log file poll_interval: Polling interval in seconds (default: 1.0) observers: Dict of event_type -> list of callback functions _running: Whether the watcher is active _file_position: Current read position in file """ # ======================================================================== # REGEX PATTERNS (Compiled for performance) # ======================================================================== # Global/Hall of Fame patterns PATTERN_GLOBAL = re.compile( r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' r'([\w\s]+)\s+globals\s+in\s+([\w\s]+)\s+for\s+(\d+(?:\.\d+)?)\s+PED', re.IGNORECASE ) PATTERN_HOF = re.compile( r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' r'([\w\s]+)\s+is\s+in\s+the\s+Hall\s+of\s+Fame\s+.*?(\d+(?:\.\d+)?)\s+PED', re.IGNORECASE ) # Regular loot pattern PATTERN_LOOT = re.compile( r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' r'You\s+received\s+([\w\s]+)\s+x\s*(\d+)\s+.*?(?:Value:\s+(\d+(?:\.\d+)?)\s+PED)?', re.IGNORECASE ) # Alternative loot pattern (different wording in EU) PATTERN_LOOT_ALT = re.compile( r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' r'You\s+received\s+([\w\s]+)\s+\(.*?\d+\s+items\)', re.IGNORECASE ) # Skill gain pattern PATTERN_SKILL = re.compile( r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' r'You\s+gained\s+(\d+(?:\.\d+)?)\s+experience\s+in\s+your\s+([\w\s]+)\s+skill', re.IGNORECASE ) # Skill level up pattern PATTERN_SKILL_LEVEL = re.compile( r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' r'Congratulations!\s+You\s+have\s+advanced\s+to\s+level\s+(\d+)\s+in\s+([\w\s]+)', re.IGNORECASE ) # Weapon decay pattern PATTERN_DECAY = re.compile( r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' r'Your\s+([\w\s]+)\s+has\s+decayed\s+(\d+(?:\.\d+)?)\s+PEC', re.IGNORECASE ) # Creature killed / target info (useful for context) PATTERN_KILL = re.compile( r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' r'You\s+killed\s+a\s+([\w\s]+)', re.IGNORECASE ) # Enhancer break pattern PATTERN_ENHANCER = re.compile( r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+' r'Your\s+([\w\s]+)\s+enhancer\s+has\s+broken', re.IGNORECASE ) EVENT_PATTERNS = { 'global': PATTERN_GLOBAL, 'hof': PATTERN_HOF, 'loot': PATTERN_LOOT, 'loot_alt': PATTERN_LOOT_ALT, 'skill': PATTERN_SKILL, 'skill_level': PATTERN_SKILL_LEVEL, 'decay': PATTERN_DECAY, 'kill': PATTERN_KILL, 'enhancer_break': PATTERN_ENHANCER, } def __init__(self, log_path: Optional[str] = None, poll_interval: float = 1.0, mock_mode: bool = False): """ Initialize LogWatcher. Args: log_path: Path to chat.log. Defaults to EU standard location or ./test-data/ poll_interval: Seconds between polls (default: 1.0 for 60+ FPS compliance) mock_mode: Use mock data instead of real log file """ self.mock_mode = mock_mode if log_path is None: if mock_mode: core_dir = Path(__file__).parent log_path = core_dir.parent / "test-data" / "mock-chat.log" else: # Try to find EU log path log_path = self._find_eu_log_path() self.log_path = Path(log_path) self.poll_interval = poll_interval # Observer registry: event_type -> list of callbacks self.observers: Dict[str, List[Callable]] = { 'global': [], 'hof': [], 'loot': [], 'skill': [], 'skill_level': [], 'decay': [], 'kill': [], 'enhancer_break': [], 'any': [], # Catch-all for all events } self._running = False self._file_position = 0 self._last_file_size = 0 self._task: Optional[asyncio.Task] = None logger.info(f"LogWatcher initialized: {self.log_path} (mock={mock_mode})") def _find_eu_log_path(self) -> Path: """ Attempt to find Entropia Universe chat.log. Returns: Path to log file or test-data fallback """ # Common Windows paths possible_paths = [ Path.home() / "Documents" / "Entropia Universe" / "chat.log", Path("C:") / "Users" / os.getenv("USERNAME", "User") / "Documents" / "Entropia Universe" / "chat.log", ] # Linux/Wine paths wine_prefix = Path.home() / ".wine" / "drive_c" possible_paths.extend([ wine_prefix / "users" / os.getenv("USER", "user") / "Documents" / "Entropia Universe" / "chat.log", ]) for path in possible_paths: if path.exists(): logger.info(f"Found EU log: {path}") return path # Fallback to test data fallback = Path(__file__).parent.parent / "test-data" / "chat.log" logger.warning(f"EU log not found, using fallback: {fallback}") return fallback # ======================================================================== # OBSERVER PATTERN METHODS # ======================================================================== def subscribe(self, event_type: str, callback: Callable[[LogEvent], None]) -> None: """ Subscribe to an event type. Args: event_type: Type of event to listen for callback: Function to call when event occurs """ if event_type not in self.observers: self.observers[event_type] = [] self.observers[event_type].append(callback) logger.debug(f"Subscribed to {event_type}: {callback.__name__}") def unsubscribe(self, event_type: str, callback: Callable[[LogEvent], None]) -> None: """ Unsubscribe from an event type. Args: event_type: Type of event callback: Function to remove """ if event_type in self.observers: if callback in self.observers[event_type]: self.observers[event_type].remove(callback) logger.debug(f"Unsubscribed from {event_type}: {callback.__name__}") def _notify(self, event: LogEvent) -> None: """ Notify all observers of an event. Args: event: LogEvent to broadcast """ # Notify specific observers if event.event_type in self.observers: for callback in self.observers[event.event_type]: try: callback(event) except Exception as e: logger.error(f"Observer error for {event.event_type}: {e}") # Notify catch-all observers for callback in self.observers['any']: try: callback(event) except Exception as e: logger.error(f"Observer error for 'any': {e}") # ======================================================================== # PARSING METHODS # ======================================================================== def _parse_timestamp(self, ts_str: str) -> datetime: """Parse EU timestamp format.""" return datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S") def _parse_line(self, line: str) -> Optional[LogEvent]: """ Parse a single log line. Args: line: Raw log line Returns: LogEvent if parsed, None otherwise """ line = line.strip() if not line: return None # Try each pattern for event_type, pattern in self.EVENT_PATTERNS.items(): match = pattern.match(line) if match: return self._create_event(event_type, match, line) return None def _create_event(self, event_type: str, match: re.Match, raw_line: str) -> LogEvent: """ Create LogEvent from regex match. Args: event_type: Type of event matched match: Regex match object raw_line: Original log line Returns: Populated LogEvent """ groups = match.groups() timestamp = self._parse_timestamp(groups[0]) data: Dict[str, Any] = {} if event_type == 'global': data = { 'player_name': groups[1].strip(), 'zone': groups[2].strip(), 'value_ped': Decimal(groups[3]), } elif event_type == 'hof': data = { 'player_name': groups[1].strip(), 'value_ped': Decimal(groups[2]), } elif event_type == 'loot': data = { 'item_name': groups[1].strip(), 'quantity': int(groups[2]) if groups[2] else 1, 'value_ped': Decimal(groups[3]) if groups[3] else Decimal("0.0"), } elif event_type == 'skill': data = { 'gained': Decimal(groups[1]), 'skill_name': groups[2].strip(), } elif event_type == 'skill_level': data = { 'new_level': int(groups[1]), 'skill_name': groups[2].strip(), } elif event_type == 'decay': data = { 'item_name': groups[1].strip(), 'decay_pec': Decimal(groups[2]), } elif event_type == 'kill': data = { 'creature_name': groups[1].strip(), } elif event_type == 'enhancer_break': data = { 'enhancer_type': groups[1].strip(), } return LogEvent( timestamp=timestamp, event_type=event_type, raw_line=raw_line, data=data ) # ======================================================================== # ASYNC POLLING LOOP (Performance Optimized) # ======================================================================== async def start(self) -> None: """Start watching log file asynchronously.""" if self._running: logger.warning("LogWatcher already running") return self._running = True # Initialize file position if self.log_path.exists(): self._last_file_size = self.log_path.stat().st_size self._file_position = self._last_file_size # Start at end (new entries only) self._task = asyncio.create_task(self._watch_loop()) logger.info("LogWatcher started") async def stop(self) -> None: """Stop watching log file.""" self._running = False if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass logger.info("LogWatcher stopped") async def _watch_loop(self) -> None: """ Main watching loop. Efficiently polls file for new content with minimal CPU usage. """ while self._running: try: await self._poll_once() except Exception as e: logger.error(f"Poll error: {e}") # Non-blocking sleep (Rule #3: preserve FPS) await asyncio.sleep(self.poll_interval) async def _poll_once(self) -> None: """ Single poll iteration. Reads new lines from file and processes them. """ if not self.log_path.exists(): return current_size = self.log_path.stat().st_size # Check if file was truncated (new session) if current_size < self._file_position: logger.info("Log file truncated, resetting position") self._file_position = 0 # Check if new content exists if current_size == self._file_position: return # No new content # Read new lines with open(self.log_path, 'r', encoding='utf-8', errors='ignore') as f: f.seek(self._file_position) new_lines = f.readlines() self._file_position = f.tell() # Parse and notify for line in new_lines: event = self._parse_line(line) if event: self._notify(event) self._last_file_size = current_size # ============================================================================ # MOCK MODE SUPPORT # ============================================================================ class MockLogGenerator: """ Generates mock log entries for testing. Simulates Entropia Universe chat.log output. """ MOCK_LINES = [ "2026-02-08 14:23:15 [System] You received Shrapnel x 123 (Value: 1.23 PED)", "2026-02-08 14:23:45 [System] You gained 0.45 experience in your Rifle skill", "2026-02-08 14:24:02 [System] Your Omegaton M2100 has decayed 15 PEC", "2026-02-08 14:25:30 [System] PlayerOne globals in Twin Peaks for 150.00 PED", "2026-02-08 14:26:10 [System] You received Animal Thyroid Oil x 5", "2026-02-08 14:27:55 [System] Congratulations! You have advanced to level 45 in Rifle", "2026-02-08 14:30:00 [System] PlayerTwo is in the Hall of Fame! Loot of 2500.00 PED", ] @classmethod def create_mock_file(cls, path: Path, lines: int = 100) -> None: """ Create a mock chat.log file. Args: path: Path for mock file lines: Number of lines to generate """ path.parent.mkdir(parents=True, exist_ok=True) with open(path, 'w') as f: for i in range(lines): line = cls.MOCK_LINES[i % len(cls.MOCK_LINES)] # Vary timestamps slightly f.write(f"{line}\n") logger.info(f"Created mock log: {path} ({lines} lines)") # ============================================================================ # MODULE EXPORTS # ============================================================================ __all__ = [ 'LogWatcher', 'LogEvent', 'MockLogGenerator' ]