""" EU-Utility - Memory Leak Detector Tools for detecting and preventing memory leaks: 1. Object tracking and lifecycle monitoring 2. Reference counting 3. Memory snapshot comparison 4. Automatic leak detection in background """ import gc import sys import weakref import tracemalloc import threading import time from typing import Dict, List, Set, Optional, Any, Type, Callable from dataclasses import dataclass, field from collections import defaultdict import inspect @dataclass class MemorySnapshot: """Snapshot of memory state.""" timestamp: float snapshot: tracemalloc.Snapshot tracked_objects: Dict[str, int] def compare_to(self, other: 'MemorySnapshot') -> 'MemoryComparison': """Compare this snapshot to another.""" return MemoryComparison(self, other) @dataclass class MemoryComparison: """Comparison between two memory snapshots.""" before: MemorySnapshot after: MemorySnapshot def get_top_growth(self, limit: int = 10) -> List[tuple]: """Get top memory growth by file.""" diff = self.after.snapshot.compare_to(self.before.snapshot, 'lineno') return diff[:limit] def get_total_growth(self) -> int: """Get total memory growth in bytes.""" stats = self.after.snapshot.statistics('lineno') total_after = sum(s.size for s in stats) stats = self.before.snapshot.statistics('lineno') total_before = sum(s.size for s in stats) return total_after - total_before def print_report(self): """Print memory growth report.""" print("\n" + "=" * 80) print("MEMORY GROWTH REPORT") print("=" * 80) growth = self.get_total_growth() print(f"Total growth: {growth / 1024 / 1024:.2f} MB") print("\nTop growth by line:") for stat in self.get_top_growth(10): print(f" {stat}") print("=" * 80) class ObjectTracker: """ Track object lifecycle and detect potential leaks. """ def __init__(self): self._tracked: Dict[str, weakref.ref] = {} self._counts: Dict[str, int] = defaultdict(int) self._lock = threading.RLock() self._callbacks: List[Callable] = [] def track(self, obj: Any, name: Optional[str] = None) -> str: """ Track an object for lifecycle monitoring. Returns tracking ID. """ obj_id = id(obj) type_name = type(obj).__name__ track_id = name or f"{type_name}_{obj_id}" def on_destroyed(ref): with self._lock: self._counts[type_name] -= 1 if track_id in self._tracked: del self._tracked[track_id] # Notify callbacks for callback in self._callbacks: try: callback('destroyed', track_id, type_name) except Exception: pass ref = weakref.ref(obj, on_destroyed) with self._lock: self._tracked[track_id] = ref self._counts[type_name] += 1 return track_id def get_counts(self) -> Dict[str, int]: """Get count of tracked objects by type.""" with self._lock: # Clean up dead references dead = [] for track_id, ref in self._tracked.items(): if ref() is None: dead.append(track_id) for track_id in dead: del self._tracked[track_id] return dict(self._counts) def get_alive(self, type_name: Optional[str] = None) -> List[str]: """Get list of alive tracked objects.""" with self._lock: alive = [] for track_id, ref in self._tracked.items(): obj = ref() if obj is not None: if type_name is None or type(obj).__name__ == type_name: alive.append(track_id) return alive def register_callback(self, callback: Callable[[str, str, str], None]): """Register callback for lifecycle events.""" self._callbacks.append(callback) def clear(self): """Clear all tracked objects.""" with self._lock: self._tracked.clear() self._counts.clear() class MemoryLeakDetector: """ Automatic memory leak detection. Monitors memory usage over time and reports potential leaks. """ def __init__(self, check_interval: float = 60.0, growth_threshold_mb: float = 50.0, sample_count: int = 5): self.check_interval = check_interval self.growth_threshold_mb = growth_threshold_mb self.sample_count = sample_count self._running = False self._thread: Optional[threading.Thread] = None self._snapshots: List[MemorySnapshot] = [] self._lock = threading.Lock() self._callbacks: List[Callable] = [] # Statistics self._peak_memory = 0 self._total_growth = 0 def start(self): """Start background leak detection.""" if self._running: return self._running = True tracemalloc.start() self._thread = threading.Thread(target=self._monitor_loop, daemon=True) self._thread.start() print(f"[MemoryLeakDetector] Started (interval={self.check_interval}s)") def stop(self): """Stop leak detection.""" self._running = False if self._thread: self._thread.join(timeout=2.0) tracemalloc.stop() def _monitor_loop(self): """Main monitoring loop.""" while self._running: try: self._take_snapshot() self._analyze_trend() except Exception as e: print(f"[MemoryLeakDetector] Error: {e}") time.sleep(self.check_interval) def _take_snapshot(self): """Take a memory snapshot.""" gc.collect() # Force collection first snapshot = tracemalloc.take_snapshot() tracked = self._get_tracked_objects() with self._lock: self._snapshots.append(MemorySnapshot( timestamp=time.time(), snapshot=snapshot, tracked_objects=tracked )) # Keep only recent samples if len(self._snapshots) > self.sample_count: self._snapshots.pop(0) # Update peak stats = snapshot.statistics('lineno') total = sum(s.size for s in stats) self._peak_memory = max(self._peak_memory, total) def _get_tracked_objects(self) -> Dict[str, int]: """Get counts of tracked object types.""" counts = defaultdict(int) # Count objects by type gc.collect() for obj in gc.get_objects(): try: type_name = type(obj).__name__ counts[type_name] += 1 except ReferenceError: pass return dict(counts) def _analyze_trend(self): """Analyze memory trend for potential leaks.""" with self._lock: if len(self._snapshots) < 2: return first = self._snapshots[0] last = self._snapshots[-1] comparison = first.compare_to(last) growth = comparison.get_total_growth() growth_mb = growth / 1024 / 1024 self._total_growth = growth if growth_mb > self.growth_threshold_mb: self._on_leak_detected(comparison, growth_mb) def _on_leak_detected(self, comparison: MemoryComparison, growth_mb: float): """Called when potential leak is detected.""" for callback in self._callbacks: try: callback(comparison, growth_mb) except Exception: pass def get_current_stats(self) -> Dict[str, Any]: """Get current memory statistics.""" with self._lock: if not self._snapshots: return {} latest = self._snapshots[-1] stats = latest.snapshot.statistics('lineno') total = sum(s.size for s in stats) return { 'current_mb': total / 1024 / 1024, 'peak_mb': self._peak_memory / 1024 / 1024, 'growth_mb': self._total_growth / 1024 / 1024, 'samples': len(self._snapshots), 'top_allocations': [ {'file': s.traceback.format()[-1], 'size_mb': s.size / 1024 / 1024} for s in stats[:5] ] } def register_callback(self, callback: Callable[[MemoryComparison, float], None]): """Register callback for leak detection.""" self._callbacks.append(callback) def force_check(self): """Force immediate leak check.""" self._take_snapshot() self._analyze_trend() class MemoryOptimizer: """ Utilities for reducing memory footprint. """ @staticmethod def compact_objects(objects: List[Any]) -> int: """ Compact list/dict objects by removing duplicates. Returns number of bytes saved. """ # Implement string interning for memory savings interned = {} saved = 0 for obj in objects: if isinstance(obj, dict): for key, value in obj.items(): if isinstance(key, str): if key in interned: obj[interned[key]] = obj.pop(key) else: interned[key] = key if isinstance(value, str) and len(value) > 20: if value in interned: saved += len(value) else: interned[value] = value return saved @staticmethod def get_object_size(obj: Any) -> int: """Get approximate memory size of object.""" seen = set() def sizeof(o): if id(o) in seen: return 0 seen.add(id(o)) size = sys.getsizeof(o) if isinstance(o, dict): size += sum(sizeof(k) + sizeof(v) for k, v in o.items()) elif isinstance(o, (list, tuple, set)): size += sum(sizeof(x) for x in o) return size return sizeof(obj) @staticmethod def find_largest_objects(limit: int = 20) -> List[tuple]: """Find largest objects in memory.""" gc.collect() objects = [] for obj in gc.get_objects(): try: size = sys.getsizeof(obj) objects.append((type(obj).__name__, id(obj), size)) except Exception: pass objects.sort(key=lambda x: x[2], reverse=True) return objects[:limit] @staticmethod def force_garbage_collection(): """Force aggressive garbage collection.""" gc.collect(0) gc.collect(1) gc.collect(2) # Clear freelists gc.collect() @staticmethod def print_memory_summary(): """Print summary of memory usage.""" gc.collect() print("\n" + "=" * 80) print("MEMORY USAGE SUMMARY") print("=" * 80) # Count objects by type counts = defaultdict(int) sizes = defaultdict(int) for obj in gc.get_objects(): try: type_name = type(obj).__name__ counts[type_name] += 1 sizes[type_name] += sys.getsizeof(obj) except Exception: pass # Top by count print("\nTop 10 object types by count:") for type_name, count in sorted(counts.items(), key=lambda x: x[1], reverse=True)[:10]: size_mb = sizes[type_name] / 1024 / 1024 print(f" {type_name:30s} {count:8d} objects ({size_mb:6.2f} MB)") # Top by size print("\nTop 10 object types by size:") for type_name, size in sorted(sizes.items(), key=lambda x: x[1], reverse=True)[:10]: count = counts[type_name] size_mb = size / 1024 / 1024 print(f" {type_name:30s} {size_mb:8.2f} MB ({count} objects)") # Largest individual objects print("\nTop 10 largest individual objects:") for type_name, obj_id, size in MemoryOptimizer.find_largest_objects(10): size_kb = size / 1024 print(f" {type_name:30s} {size_kb:10.2f} KB (id={obj_id})") print("=" * 80) # Global instances _object_tracker = ObjectTracker() _leak_detector: Optional[MemoryLeakDetector] = None def get_object_tracker() -> ObjectTracker: """Get global ObjectTracker instance.""" return _object_tracker def get_leak_detector() -> Optional[MemoryLeakDetector]: """Get global MemoryLeakDetector instance.""" global _leak_detector if _leak_detector is None: _leak_detector = MemoryLeakDetector() return _leak_detector def start_leak_detection(check_interval: float = 60.0): """Start automatic leak detection.""" detector = get_leak_detector() detector.check_interval = check_interval detector.start() return detector def stop_leak_detection(): """Stop automatic leak detection.""" global _leak_detector if _leak_detector: _leak_detector.stop() # Decorator for tracking object lifecycle def track_lifecycle(name: Optional[str] = None): """Decorator to track an object's lifecycle.""" def decorator(cls): original_init = cls.__init__ @wraps(original_init) def new_init(self, *args, **kwargs): original_init(self, *args, **kwargs) tracker = get_object_tracker() tracker.track(self, name or cls.__name__) cls.__init__ = new_init return cls return decorator if __name__ == "__main__": # Test memory tracking print("Testing MemoryOptimizer...") # Create some test objects test_objects = [] for i in range(1000): test_objects.append({"id": i, "data": "x" * 1000}) MemoryOptimizer.print_memory_summary() # Clean up del test_objects MemoryOptimizer.force_garbage_collection() print("\nAfter cleanup:") MemoryOptimizer.print_memory_summary()