""" EU-Utility - Performance Optimizations Optimized versions of core modules with performance improvements. """ import re import time import threading from collections import deque from typing import Dict, List, Callable, Optional, Any from functools import lru_cache from pathlib import Path # ========== 1. LogReader Optimizations ========== class OptimizedLogReader: """ Optimized LogReader with: - Compiled regex patterns (cached) - Ring buffer for recent lines (O(1) append) - Batch processing for multiple lines - Memory-efficient line storage """ # Pre-compiled patterns (module level for reuse) _COMPILED_PATTERNS = None @classmethod def _get_patterns(cls) -> Dict[str, re.Pattern]: """Get compiled patterns (cached at class level).""" if cls._COMPILED_PATTERNS is None: cls._COMPILED_PATTERNS = { 'skill_gain': re.compile( r'(.+?)\s+has\s+improved\s+by\s+(\d+\.?\d*)\s+points?', re.IGNORECASE ), 'loot': re.compile( r'You\s+received\s+(.+?)\s+x\s*(\d+)', re.IGNORECASE ), 'global': re.compile( r'(\w+)\s+received\s+.+?\s+from\s+(\w+)\s+worth\s+(\d+)\s+PED', re.IGNORECASE ), 'damage': re.compile( r'You\s+(?:hit|inflicted)\s+(\d+)\s+damage', re.IGNORECASE ), 'damage_taken': re.compile( r'You\s+were\s+hit\s+for\s+(\d+)\s+damage', re.IGNORECASE ), 'heal': re.compile( r'You\s+(?:healed|restored)\s+(\d+)\s+(?:health|points)', re.IGNORECASE ), 'mission_complete': re.compile( r'Mission\s+completed:\s+(.+)', re.IGNORECASE ), 'tier_increase': re.compile( r'Your\s+(.+?)\s+has\s+reached\s+tier\s+(\d+)', re.IGNORECASE ), 'enhancer_break': re.compile( r'Your\s+(.+?)\s+broke', re.IGNORECASE ), } return cls._COMPILED_PATTERNS def __init__(self, log_path: Path = None): self.log_path = log_path or self._find_log_file() self.running = False self.thread = None self.last_position = 0 # Subscribers self._subscribers: Dict[str, List[Callable]] = {} self._any_subscribers: List[Callable] = [] # Optimized: Use deque with maxlen for O(1) append/pop self._recent_lines: deque = deque(maxlen=1000) # Stats self._stats = { 'lines_read': 0, 'events_parsed': 0, 'start_time': None } # Cache for pattern matching self._pattern_cache: Dict[str, Any] = {} self._cache_hits = 0 self._cache_misses = 0 def _find_log_file(self) -> Optional[Path]: """Find EU chat.log file.""" paths = [ Path.home() / "Documents" / "Entropia Universe" / "chat.log", Path.home() / "Documents" / "Entropia Universe" / "Logs" / "chat.log", Path.home() / "Entropia Universe" / "chat.log", ] for path in paths: if path.exists(): return path return None def _process_lines_batch(self, lines: List[str]): """Process multiple lines in batch (more efficient).""" patterns = self._get_patterns() events = [] for line in lines: line = line.strip() if not line: continue self._stats['lines_read'] += 1 self._recent_lines.append(line) # Try cache first cached = self._pattern_cache.get(line) if cached is not None: self._cache_hits += 1 if cached: # Not None and not False events.append(cached) continue self._cache_misses += 1 # Parse event event = None for event_type, pattern in patterns.items(): match = pattern.search(line) if match: from datetime import datetime from dataclasses import dataclass, field @dataclass class LogEvent: timestamp: Any raw_line: str event_type: str data: Dict = field(default_factory=dict) event = LogEvent( timestamp=datetime.now(), raw_line=line, event_type=event_type, data={'groups': match.groups()} ) self._stats['events_parsed'] += 1 break # Cache result (even if None) if len(self._pattern_cache) > 10000: self._pattern_cache.clear() self._pattern_cache[line] = event if event: events.append(event) # Batch notify for event in events: self._notify_subscribers(event) def _notify_subscribers(self, event: Any): """Notify subscribers with error handling.""" callbacks = self._subscribers.get(event.event_type, []) for callback in callbacks: try: callback(event) except Exception: pass for callback in self._any_subscribers: try: callback(event) except Exception: pass def read_lines(self, count: int = 50, filter_text: str = None) -> List[str]: """Read recent lines - O(1) with deque.""" lines = list(self._recent_lines)[-count:] if count < len(self._recent_lines) else list(self._recent_lines) if filter_text: filter_lower = filter_text.lower() lines = [l for l in lines if filter_lower in l.lower()] return lines def get_cache_stats(self) -> Dict[str, int]: """Get pattern cache statistics.""" total = self._cache_hits + self._cache_misses hit_rate = (self._cache_hits / total * 100) if total > 0 else 0 return { 'hits': self._cache_hits, 'misses': self._cache_misses, 'hit_rate': hit_rate, 'size': len(self._pattern_cache) } # ========== 2. Memory-Efficient Data Structures ========== class RingBuffer: """ Fixed-size ring buffer for efficient circular storage. O(1) append, O(n) iteration. """ __slots__ = ['_buffer', '_size', '_index', '_count'] def __init__(self, size: int): self._size = size self._buffer = [None] * size self._index = 0 self._count = 0 def append(self, item: Any): """Add item to buffer (O(1)).""" self._buffer[self._index] = item self._index = (self._index + 1) % self._size self._count = min(self._count + 1, self._size) def __iter__(self): """Iterate over buffer in order (oldest first).""" if self._count < self._size: for i in range(self._count): yield self._buffer[i] else: for i in range(self._count): idx = (self._index + i) % self._size yield self._buffer[idx] def __len__(self) -> int: return self._count def __getitem__(self, idx: int) -> Any: if idx < 0: idx += self._count if idx < 0 or idx >= self._count: raise IndexError("Index out of range") if self._count < self._size: return self._buffer[idx] else: actual_idx = (self._index + idx) % self._size return self._buffer[actual_idx] def clear(self): """Clear the buffer.""" self._buffer = [None] * self._size self._index = 0 self._count = 0 # ========== 3. Lazy Loading Decorator ========== class LazyProperty: """ Lazy property decorator - computes value only once on first access. Useful for expensive initializations. """ def __init__(self, func): self.func = func self.name = func.__name__ self.__doc__ = func.__doc__ def __get__(self, instance, owner): if instance is None: return self # Check if already computed if self.name not in instance.__dict__: instance.__dict__[self.name] = self.func(instance) return instance.__dict__[self.name] def lazy_init(method): """ Decorator for lazy initialization. Thread-safe lazy initialization. """ attr_name = f"_lazy_{method.__name__}" lock_name = f"_lazy_{method.__name__}_lock" def wrapper(self, *args, **kwargs): if not hasattr(self, attr_name): if not hasattr(self, lock_name): setattr(self, lock_name, threading.Lock()) with getattr(self, lock_name): if not hasattr(self, attr_name): setattr(self, attr_name, method(self, *args, **kwargs)) return getattr(self, attr_name) return wrapper # ========== 4. Connection Pool ========== class ConnectionPool: """ Simple connection pool for reusing expensive resources. """ def __init__(self, factory: Callable, max_size: int = 5, timeout: float = 30.0): self.factory = factory self.max_size = max_size self.timeout = timeout self._pool: deque = deque() self._in_use: set = set() self._lock = threading.Lock() self._condition = threading.Condition(self._lock) def acquire(self) -> Any: """Get a connection from the pool.""" with self._condition: # Wait for available connection while len(self._pool) == 0 and len(self._in_use) >= self.max_size: self._condition.wait(timeout=self.timeout) if len(self._pool) > 0: conn = self._pool.popleft() else: conn = self.factory() self._in_use.add(id(conn)) return conn def release(self, conn: Any): """Return a connection to the pool.""" with self._condition: self._in_use.discard(id(conn)) self._pool.append(conn) self._condition.notify() def close_all(self): """Close all connections.""" with self._lock: for conn in self._pool: if hasattr(conn, 'close'): conn.close() self._pool.clear() self._in_use.clear() # ========== 5. Batch Processor ========== class BatchProcessor: """ Batches items for efficient processing. Flushes when batch is full or timeout expires. """ def __init__( self, processor: Callable[[List[Any]], None], batch_size: int = 100, timeout_ms: float = 100.0 ): self.processor = processor self.batch_size = batch_size self.timeout_ms = timeout_ms self._batch: List[Any] = [] self._lock = threading.Lock() self._last_flush = time.time() self._timer: Optional[threading.Timer] = None self._running = True def add(self, item: Any): """Add item to batch.""" with self._lock: self._batch.append(item) if len(self._batch) >= self.batch_size: self._flush() elif self._timer is None: self._start_timer() def _start_timer(self): """Start flush timer.""" self._timer = threading.Timer(self.timeout_ms / 1000.0, self._timer_flush) self._timer.daemon = True self._timer.start() def _timer_flush(self): """Flush on timer expiry.""" with self._lock: self._timer = None if self._batch: self._flush() def _flush(self): """Flush the batch.""" if not self._batch: return batch = self._batch self._batch = [] self._last_flush = time.time() # Cancel timer if active if self._timer: self._timer.cancel() self._timer = None # Process outside lock try: self.processor(batch) except Exception as e: print(f"BatchProcessor error: {e}") def flush(self): """Force flush.""" with self._lock: self._flush() def close(self): """Close processor and flush remaining items.""" self._running = False self.flush() # ========== 6. String Interning for Memory Efficiency ========== class StringInterner: """ Interns frequently used strings to reduce memory usage. """ def __init__(self, max_size: int = 10000): self._interned: Dict[str, str] = {} self._access_count: Dict[str, int] = {} self._max_size = max_size def intern(self, s: str) -> str: """Get interned version of string.""" if s in self._interned: self._access_count[s] += 1 return self._interned[s] # Add to intern pool if len(self._interned) >= self._max_size: # Remove least used min_key = min(self._access_count, key=self._access_count.get) del self._interned[min_key] del self._access_count[min_key] self._interned[s] = s self._access_count[s] = 1 return s # ========== 7. Fast JSON Serialization ========== import json from json import JSONEncoder class FastJSONEncoder(JSONEncoder): """Optimized JSON encoder for common types.""" def default(self, obj): # Fast path for common types if isinstance(obj, (list, dict, str, int, float, bool, type(None))): return obj # Handle datetime if hasattr(obj, 'isoformat'): return obj.isoformat() # Handle dataclasses if hasattr(obj, '__dataclass_fields__'): return {k: getattr(obj, k) for k in obj.__dataclass_fields__} return super().default(obj) def fast_json_dumps(obj, **kwargs) -> str: """Fast JSON serialization.""" return json.dumps(obj, cls=FastJSONEncoder, **kwargs) # ========== Usage Example ========== if __name__ == "__main__": # Test ring buffer print("Testing RingBuffer...") rb = RingBuffer(5) for i in range(10): rb.append(i) print(f"Buffer contents: {list(rb)}") # Should be [5, 6, 7, 8, 9] # Test string interner print("\nTesting StringInterner...") si = StringInterner(max_size=100) s1 = si.intern("test_string") s2 = si.intern("test_string") print(f"Same object: {s1 is s2}") # Should be True # Test batch processor print("\nTesting BatchProcessor...") processed = [] def process_batch(items): processed.extend(items) print(f"Processed batch of {len(items)} items") bp = BatchProcessor(process_batch, batch_size=3, timeout_ms=500) for i in range(7): bp.add(i) time.sleep(0.6) # Wait for timeout bp.close() print(f"Total processed: {processed}")