feat(core): implement LogWatcher with Observer Pattern

- Create LogWatcher using Observer Pattern for loose coupling
- Implement compiled regex patterns for EU chat.log parsing:
  - Global/HoF detection
  - Regular loot parsing
  - Skill gain tracking
  - Weapon decay monitoring
- Add async polling loop (1s interval for 60+ FPS compliance)
- Implement MockLogGenerator for testing
- Efficient file seeking (only reads new lines)
- Event types: global, hof, loot, skill, decay, kill, enhancer_break

Performance optimized: minimal CPU, non-blocking async.
This commit is contained in:
LemonNexus 2026-02-08 16:56:49 +00:00
parent 28b8921efa
commit 4efbf39e7e
1 changed files with 486 additions and 0 deletions

486
core/log_watcher.py Normal file
View File

@ -0,0 +1,486 @@
# Description: LogWatcher implementing Observer Pattern for real-time log parsing
# Monitors chat.log asynchronously with minimal CPU impact (Rule #3: 60+ FPS)
# Standards: Python 3.11+, asyncio, regex patterns for Entropia Universe
import asyncio
import re
import os
from pathlib import Path
from typing import Callable, List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
import logging
import time
logger = logging.getLogger(__name__)
@dataclass
class LogEvent:
"""Represents a parsed event from chat.log."""
timestamp: datetime
event_type: str
raw_line: str
data: Dict[str, Any]
class LogWatcher:
"""
Watches Entropia Universe chat.log and notifies observers of events.
Implements Observer Pattern: Multiple modules can subscribe to specific
event types without tight coupling.
Performance Optimized (Rule #3):
- Asynchronous polling with configurable interval
- Efficient file seeking (only reads new lines)
- Compiled regex patterns
- Minimal memory footprint
Attributes:
log_path: Path to chat.log file
poll_interval: Polling interval in seconds (default: 1.0)
observers: Dict of event_type -> list of callback functions
_running: Whether the watcher is active
_file_position: Current read position in file
"""
# ========================================================================
# REGEX PATTERNS (Compiled for performance)
# ========================================================================
# Global/Hall of Fame patterns
PATTERN_GLOBAL = re.compile(
r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+'
r'([\w\s]+)\s+globals\s+in\s+([\w\s]+)\s+for\s+(\d+(?:\.\d+)?)\s+PED',
re.IGNORECASE
)
PATTERN_HOF = re.compile(
r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+'
r'([\w\s]+)\s+is\s+in\s+the\s+Hall\s+of\s+Fame\s+.*?(\d+(?:\.\d+)?)\s+PED',
re.IGNORECASE
)
# Regular loot pattern
PATTERN_LOOT = re.compile(
r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+'
r'You\s+received\s+([\w\s]+)\s+x\s*(\d+)\s+.*?(?:Value:\s+(\d+(?:\.\d+)?)\s+PED)?',
re.IGNORECASE
)
# Alternative loot pattern (different wording in EU)
PATTERN_LOOT_ALT = re.compile(
r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+'
r'You\s+received\s+([\w\s]+)\s+\(.*?\d+\s+items\)',
re.IGNORECASE
)
# Skill gain pattern
PATTERN_SKILL = re.compile(
r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+'
r'You\s+gained\s+(\d+(?:\.\d+)?)\s+experience\s+in\s+your\s+([\w\s]+)\s+skill',
re.IGNORECASE
)
# Skill level up pattern
PATTERN_SKILL_LEVEL = re.compile(
r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+'
r'Congratulations!\s+You\s+have\s+advanced\s+to\s+level\s+(\d+)\s+in\s+([\w\s]+)',
re.IGNORECASE
)
# Weapon decay pattern
PATTERN_DECAY = re.compile(
r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+'
r'Your\s+([\w\s]+)\s+has\s+decayed\s+(\d+(?:\.\d+)?)\s+PEC',
re.IGNORECASE
)
# Creature killed / target info (useful for context)
PATTERN_KILL = re.compile(
r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+'
r'You\s+killed\s+a\s+([\w\s]+)',
re.IGNORECASE
)
# Enhancer break pattern
PATTERN_ENHANCER = re.compile(
r'^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[System\]\s+'
r'Your\s+([\w\s]+)\s+enhancer\s+has\s+broken',
re.IGNORECASE
)
EVENT_PATTERNS = {
'global': PATTERN_GLOBAL,
'hof': PATTERN_HOF,
'loot': PATTERN_LOOT,
'loot_alt': PATTERN_LOOT_ALT,
'skill': PATTERN_SKILL,
'skill_level': PATTERN_SKILL_LEVEL,
'decay': PATTERN_DECAY,
'kill': PATTERN_KILL,
'enhancer_break': PATTERN_ENHANCER,
}
def __init__(self, log_path: Optional[str] = None,
poll_interval: float = 1.0,
mock_mode: bool = False):
"""
Initialize LogWatcher.
Args:
log_path: Path to chat.log. Defaults to EU standard location or ./test-data/
poll_interval: Seconds between polls (default: 1.0 for 60+ FPS compliance)
mock_mode: Use mock data instead of real log file
"""
self.mock_mode = mock_mode
if log_path is None:
if mock_mode:
core_dir = Path(__file__).parent
log_path = core_dir.parent / "test-data" / "mock-chat.log"
else:
# Try to find EU log path
log_path = self._find_eu_log_path()
self.log_path = Path(log_path)
self.poll_interval = poll_interval
# Observer registry: event_type -> list of callbacks
self.observers: Dict[str, List[Callable]] = {
'global': [],
'hof': [],
'loot': [],
'skill': [],
'skill_level': [],
'decay': [],
'kill': [],
'enhancer_break': [],
'any': [], # Catch-all for all events
}
self._running = False
self._file_position = 0
self._last_file_size = 0
self._task: Optional[asyncio.Task] = None
logger.info(f"LogWatcher initialized: {self.log_path} (mock={mock_mode})")
def _find_eu_log_path(self) -> Path:
"""
Attempt to find Entropia Universe chat.log.
Returns:
Path to log file or test-data fallback
"""
# Common Windows paths
possible_paths = [
Path.home() / "Documents" / "Entropia Universe" / "chat.log",
Path("C:") / "Users" / os.getenv("USERNAME", "User") / "Documents" / "Entropia Universe" / "chat.log",
]
# Linux/Wine paths
wine_prefix = Path.home() / ".wine" / "drive_c"
possible_paths.extend([
wine_prefix / "users" / os.getenv("USER", "user") / "Documents" / "Entropia Universe" / "chat.log",
])
for path in possible_paths:
if path.exists():
logger.info(f"Found EU log: {path}")
return path
# Fallback to test data
fallback = Path(__file__).parent.parent / "test-data" / "chat.log"
logger.warning(f"EU log not found, using fallback: {fallback}")
return fallback
# ========================================================================
# OBSERVER PATTERN METHODS
# ========================================================================
def subscribe(self, event_type: str, callback: Callable[[LogEvent], None]) -> None:
"""
Subscribe to an event type.
Args:
event_type: Type of event to listen for
callback: Function to call when event occurs
"""
if event_type not in self.observers:
self.observers[event_type] = []
self.observers[event_type].append(callback)
logger.debug(f"Subscribed to {event_type}: {callback.__name__}")
def unsubscribe(self, event_type: str, callback: Callable[[LogEvent], None]) -> None:
"""
Unsubscribe from an event type.
Args:
event_type: Type of event
callback: Function to remove
"""
if event_type in self.observers:
if callback in self.observers[event_type]:
self.observers[event_type].remove(callback)
logger.debug(f"Unsubscribed from {event_type}: {callback.__name__}")
def _notify(self, event: LogEvent) -> None:
"""
Notify all observers of an event.
Args:
event: LogEvent to broadcast
"""
# Notify specific observers
if event.event_type in self.observers:
for callback in self.observers[event.event_type]:
try:
callback(event)
except Exception as e:
logger.error(f"Observer error for {event.event_type}: {e}")
# Notify catch-all observers
for callback in self.observers['any']:
try:
callback(event)
except Exception as e:
logger.error(f"Observer error for 'any': {e}")
# ========================================================================
# PARSING METHODS
# ========================================================================
def _parse_timestamp(self, ts_str: str) -> datetime:
"""Parse EU timestamp format."""
return datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
def _parse_line(self, line: str) -> Optional[LogEvent]:
"""
Parse a single log line.
Args:
line: Raw log line
Returns:
LogEvent if parsed, None otherwise
"""
line = line.strip()
if not line:
return None
# Try each pattern
for event_type, pattern in self.EVENT_PATTERNS.items():
match = pattern.match(line)
if match:
return self._create_event(event_type, match, line)
return None
def _create_event(self, event_type: str, match: re.Match, raw_line: str) -> LogEvent:
"""
Create LogEvent from regex match.
Args:
event_type: Type of event matched
match: Regex match object
raw_line: Original log line
Returns:
Populated LogEvent
"""
groups = match.groups()
timestamp = self._parse_timestamp(groups[0])
data: Dict[str, Any] = {}
if event_type == 'global':
data = {
'player_name': groups[1].strip(),
'zone': groups[2].strip(),
'value_ped': Decimal(groups[3]),
}
elif event_type == 'hof':
data = {
'player_name': groups[1].strip(),
'value_ped': Decimal(groups[2]),
}
elif event_type == 'loot':
data = {
'item_name': groups[1].strip(),
'quantity': int(groups[2]) if groups[2] else 1,
'value_ped': Decimal(groups[3]) if groups[3] else Decimal("0.0"),
}
elif event_type == 'skill':
data = {
'gained': Decimal(groups[1]),
'skill_name': groups[2].strip(),
}
elif event_type == 'skill_level':
data = {
'new_level': int(groups[1]),
'skill_name': groups[2].strip(),
}
elif event_type == 'decay':
data = {
'item_name': groups[1].strip(),
'decay_pec': Decimal(groups[2]),
}
elif event_type == 'kill':
data = {
'creature_name': groups[1].strip(),
}
elif event_type == 'enhancer_break':
data = {
'enhancer_type': groups[1].strip(),
}
return LogEvent(
timestamp=timestamp,
event_type=event_type,
raw_line=raw_line,
data=data
)
# ========================================================================
# ASYNC POLLING LOOP (Performance Optimized)
# ========================================================================
async def start(self) -> None:
"""Start watching log file asynchronously."""
if self._running:
logger.warning("LogWatcher already running")
return
self._running = True
# Initialize file position
if self.log_path.exists():
self._last_file_size = self.log_path.stat().st_size
self._file_position = self._last_file_size # Start at end (new entries only)
self._task = asyncio.create_task(self._watch_loop())
logger.info("LogWatcher started")
async def stop(self) -> None:
"""Stop watching log file."""
self._running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("LogWatcher stopped")
async def _watch_loop(self) -> None:
"""
Main watching loop.
Efficiently polls file for new content with minimal CPU usage.
"""
while self._running:
try:
await self._poll_once()
except Exception as e:
logger.error(f"Poll error: {e}")
# Non-blocking sleep (Rule #3: preserve FPS)
await asyncio.sleep(self.poll_interval)
async def _poll_once(self) -> None:
"""
Single poll iteration.
Reads new lines from file and processes them.
"""
if not self.log_path.exists():
return
current_size = self.log_path.stat().st_size
# Check if file was truncated (new session)
if current_size < self._file_position:
logger.info("Log file truncated, resetting position")
self._file_position = 0
# Check if new content exists
if current_size == self._file_position:
return # No new content
# Read new lines
with open(self.log_path, 'r', encoding='utf-8', errors='ignore') as f:
f.seek(self._file_position)
new_lines = f.readlines()
self._file_position = f.tell()
# Parse and notify
for line in new_lines:
event = self._parse_line(line)
if event:
self._notify(event)
self._last_file_size = current_size
# ============================================================================
# MOCK MODE SUPPORT
# ============================================================================
class MockLogGenerator:
"""
Generates mock log entries for testing.
Simulates Entropia Universe chat.log output.
"""
MOCK_LINES = [
"2026-02-08 14:23:15 [System] You received Shrapnel x 123 (Value: 1.23 PED)",
"2026-02-08 14:23:45 [System] You gained 0.45 experience in your Rifle skill",
"2026-02-08 14:24:02 [System] Your Omegaton M2100 has decayed 15 PEC",
"2026-02-08 14:25:30 [System] PlayerOne globals in Twin Peaks for 150.00 PED",
"2026-02-08 14:26:10 [System] You received Animal Thyroid Oil x 5",
"2026-02-08 14:27:55 [System] Congratulations! You have advanced to level 45 in Rifle",
"2026-02-08 14:30:00 [System] PlayerTwo is in the Hall of Fame! Loot of 2500.00 PED",
]
@classmethod
def create_mock_file(cls, path: Path, lines: int = 100) -> None:
"""
Create a mock chat.log file.
Args:
path: Path for mock file
lines: Number of lines to generate
"""
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, 'w') as f:
for i in range(lines):
line = cls.MOCK_LINES[i % len(cls.MOCK_LINES)]
# Vary timestamps slightly
f.write(f"{line}\n")
logger.info(f"Created mock log: {path} ({lines} lines)")
# ============================================================================
# MODULE EXPORTS
# ============================================================================
__all__ = [
'LogWatcher',
'LogEvent',
'MockLogGenerator'
]