# EU-Utility/core/memory_leak_detector.py
"""
EU-Utility - Memory Leak Detector
Tools for detecting and preventing memory leaks:
1. Object tracking and lifecycle monitoring
2. Reference counting
3. Memory snapshot comparison
4. Automatic leak detection in background
"""
import gc
import inspect
import sys
import threading
import time
import tracemalloc
import weakref
from collections import defaultdict
from dataclasses import dataclass, field
from functools import wraps
from typing import Dict, List, Set, Optional, Any, Type, Callable
@dataclass
class MemorySnapshot:
    """Snapshot of memory state at a single point in time."""
    # Wall-clock time (time.time()) when the snapshot was taken.
    timestamp: float
    # Raw tracemalloc snapshot captured at that moment.
    snapshot: tracemalloc.Snapshot
    # Mapping of type name -> object count observed at snapshot time.
    tracked_objects: Dict[str, int]

    def compare_to(self, other: 'MemorySnapshot') -> 'MemoryComparison':
        """Compare this snapshot to another.

        Note: ``self`` becomes the 'before' state and ``other`` the
        'after' state of the resulting comparison.
        """
        return MemoryComparison(self, other)
@dataclass
class MemoryComparison:
    """Difference between two MemorySnapshot objects ('before' vs 'after')."""
    before: MemorySnapshot
    after: MemorySnapshot

    def get_top_growth(self, limit: int = 10) -> List[tuple]:
        """Return the `limit` largest allocation deltas, grouped by source line."""
        per_line = self.after.snapshot.compare_to(self.before.snapshot, 'lineno')
        return per_line[:limit]

    def get_total_growth(self) -> int:
        """Return the net change in traced allocations, in bytes."""
        def traced_bytes(snap: tracemalloc.Snapshot) -> int:
            # Sum the sizes of every per-line allocation statistic.
            return sum(stat.size for stat in snap.statistics('lineno'))

        return traced_bytes(self.after.snapshot) - traced_bytes(self.before.snapshot)

    def print_report(self):
        """Write a human-readable growth report to stdout."""
        rule = "=" * 80
        print("\n" + rule)
        print("MEMORY GROWTH REPORT")
        print(rule)
        growth = self.get_total_growth()
        print(f"Total growth: {growth / 1024 / 1024:.2f} MB")
        print("\nTop growth by line:")
        for stat in self.get_top_growth(10):
            print(f" {stat}")
        print(rule)
class ObjectTracker:
    """Track object lifecycle and detect potential leaks.

    Objects are held only through weak references, so tracking an object
    never keeps it alive.
    """

    def __init__(self):
        # track_id -> weak reference to the tracked object.
        self._tracked: Dict[str, weakref.ref] = {}
        # type name -> number of live tracked instances.
        self._counts: Dict[str, int] = defaultdict(int)
        self._lock = threading.RLock()
        # Lifecycle listeners: callable(event, track_id, type_name).
        self._callbacks: List[Callable] = []

    def track(self, obj: Any, name: Optional[str] = None) -> str:
        """Start monitoring `obj` for lifecycle events.

        Returns the tracking ID (the given name, or a generated
        "<Type>_<id>" string when no name is supplied).
        """
        type_name = type(obj).__name__
        track_id = name or f"{type_name}_{id(obj)}"

        def _finalized(_ref):
            # Invoked by the weakref machinery once the object is collected.
            with self._lock:
                self._counts[type_name] -= 1
                self._tracked.pop(track_id, None)
            # Notify listeners outside the critical section; a broken
            # listener must not disturb bookkeeping.
            for listener in self._callbacks:
                try:
                    listener('destroyed', track_id, type_name)
                except Exception:
                    pass

        weak = weakref.ref(obj, _finalized)
        with self._lock:
            self._tracked[track_id] = weak
            self._counts[type_name] += 1
        return track_id

    def get_counts(self) -> Dict[str, int]:
        """Return a plain dict of tracked-object counts keyed by type name."""
        with self._lock:
            # Sweep entries whose referent has already been collected.
            stale = [tid for tid, ref in self._tracked.items() if ref() is None]
            for tid in stale:
                del self._tracked[tid]
            return dict(self._counts)

    def get_alive(self, type_name: Optional[str] = None) -> List[str]:
        """Return tracking IDs of objects still alive, optionally filtered by type."""
        with self._lock:
            survivors = []
            for tid, ref in self._tracked.items():
                target = ref()
                if target is None:
                    continue
                if type_name is None or type(target).__name__ == type_name:
                    survivors.append(tid)
            return survivors

    def register_callback(self, callback: Callable[[str, str, str], None]):
        """Subscribe `callback(event, track_id, type_name)` to lifecycle events."""
        self._callbacks.append(callback)

    def clear(self):
        """Forget every tracked object and reset all counters."""
        with self._lock:
            self._tracked.clear()
            self._counts.clear()
class MemoryLeakDetector:
    """
    Automatic memory leak detection.
    Monitors memory usage over time and reports potential leaks.

    A daemon thread periodically takes tracemalloc snapshots; when total
    growth between the oldest and newest retained snapshot exceeds
    `growth_threshold_mb`, the registered callbacks are invoked.
    """
    def __init__(self,
                 check_interval: float = 60.0,
                 growth_threshold_mb: float = 50.0,
                 sample_count: int = 5):
        # Seconds between background snapshots.
        self.check_interval = check_interval
        # Growth (MB) between oldest and newest sample that triggers callbacks.
        self.growth_threshold_mb = growth_threshold_mb
        # How many snapshots to retain for trend analysis.
        self.sample_count = sample_count
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._snapshots: List[MemorySnapshot] = []
        self._lock = threading.Lock()
        # Leak listeners: callable(comparison, growth_mb).
        self._callbacks: List[Callable] = []
        # Statistics
        self._peak_memory = 0
        self._total_growth = 0

    def start(self):
        """Start background leak detection (idempotent)."""
        if self._running:
            return
        self._running = True
        # NOTE(review): enables tracemalloc tracing process-wide.
        tracemalloc.start()
        self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._thread.start()
        print(f"[MemoryLeakDetector] Started (interval={self.check_interval}s)")

    def stop(self):
        """Stop leak detection and disable tracemalloc tracing."""
        self._running = False
        if self._thread:
            # Monitor thread may be sleeping a full interval; bounded join.
            self._thread.join(timeout=2.0)
        # NOTE(review): called even if start() never ran — presumably relies
        # on tracemalloc.stop() being harmless when not tracing; verify.
        tracemalloc.stop()

    def _monitor_loop(self):
        """Main monitoring loop (runs on the daemon thread)."""
        while self._running:
            try:
                self._take_snapshot()
                self._analyze_trend()
            except Exception as e:
                # Keep the monitor alive across transient errors.
                print(f"[MemoryLeakDetector] Error: {e}")
            time.sleep(self.check_interval)

    def _take_snapshot(self):
        """Take a memory snapshot and append it to the retained window."""
        gc.collect()  # Force collection first
        snapshot = tracemalloc.take_snapshot()
        tracked = self._get_tracked_objects()
        with self._lock:
            self._snapshots.append(MemorySnapshot(
                timestamp=time.time(),
                snapshot=snapshot,
                tracked_objects=tracked
            ))
            # Keep only recent samples
            if len(self._snapshots) > self.sample_count:
                self._snapshots.pop(0)
            # Update peak
            stats = snapshot.statistics('lineno')
            total = sum(s.size for s in stats)
            self._peak_memory = max(self._peak_memory, total)

    def _get_tracked_objects(self) -> Dict[str, int]:
        """Count all gc-visible objects, grouped by type name."""
        counts = defaultdict(int)
        # Count objects by type
        gc.collect()
        for obj in gc.get_objects():
            try:
                type_name = type(obj).__name__
                counts[type_name] += 1
            except ReferenceError:
                # Objects may die while being inspected.
                pass
        return dict(counts)

    def _analyze_trend(self):
        """Compare oldest vs newest snapshot; fire callbacks on excess growth."""
        with self._lock:
            if len(self._snapshots) < 2:
                return
            first = self._snapshots[0]
            last = self._snapshots[-1]
            comparison = first.compare_to(last)
            growth = comparison.get_total_growth()
            growth_mb = growth / 1024 / 1024
            self._total_growth = growth
            if growth_mb > self.growth_threshold_mb:
                # NOTE(review): callbacks run while _lock is held; a callback
                # that calls get_current_stats() would deadlock — verify.
                self._on_leak_detected(comparison, growth_mb)

    def _on_leak_detected(self, comparison: MemoryComparison, growth_mb: float):
        """Called when potential leak is detected; fans out to callbacks."""
        for callback in self._callbacks:
            try:
                callback(comparison, growth_mb)
            except Exception:
                # Best-effort: one failing callback must not break the rest.
                pass

    def get_current_stats(self) -> Dict[str, Any]:
        """Get current memory statistics.

        Returns an empty dict until the first snapshot has been taken.
        """
        with self._lock:
            if not self._snapshots:
                return {}
            latest = self._snapshots[-1]
            stats = latest.snapshot.statistics('lineno')
            total = sum(s.size for s in stats)
            return {
                'current_mb': total / 1024 / 1024,
                'peak_mb': self._peak_memory / 1024 / 1024,
                'growth_mb': self._total_growth / 1024 / 1024,
                'samples': len(self._snapshots),
                'top_allocations': [
                    {'file': s.traceback.format()[-1], 'size_mb': s.size / 1024 / 1024}
                    for s in stats[:5]
                ]
            }

    def register_callback(self, callback: Callable[[MemoryComparison, float], None]):
        """Register callback for leak detection events."""
        self._callbacks.append(callback)

    def force_check(self):
        """Force an immediate leak check on the caller's thread.

        NOTE(review): requires tracemalloc tracing to be active (see start()).
        """
        self._take_snapshot()
        self._analyze_trend()
class MemoryOptimizer:
    """
    Utilities for reducing memory footprint.
    """

    @staticmethod
    def compact_objects(objects: List[Any]) -> int:
        """
        Compact dict objects by interning duplicate strings.

        String keys are re-inserted under a single shared instance, and long
        duplicate string values (> 20 chars) are replaced by the shared copy
        so equal strings no longer occupy memory twice.

        Args:
            objects: Containers to compact; only dicts are processed.

        Returns:
            Approximate number of bytes saved (total length of duplicate
            long string values encountered).
        """
        interned: Dict[str, str] = {}
        saved = 0
        for obj in objects:
            if not isinstance(obj, dict):
                continue
            # Iterate over a snapshot of the keys: the previous version
            # popped/re-inserted keys while iterating obj.items(), and
            # mutating a dict during iteration is unreliable (entries may
            # be skipped or a RuntimeError raised).
            for key in list(obj.keys()):
                value = obj[key]
                if isinstance(key, str):
                    if key in interned:
                        # Re-insert under the shared key instance so all
                        # dicts reference a single string object.
                        obj[interned[key]] = obj.pop(key)
                    else:
                        interned[key] = key
                if isinstance(value, str) and len(value) > 20:
                    if value in interned:
                        # Duplicate long value: point at the shared copy
                        # (equal string, so behavior is unchanged).
                        obj[key] = interned[value]
                        saved += len(value)
                    else:
                        interned[value] = value
        return saved

    @staticmethod
    def get_object_size(obj: Any) -> int:
        """Get approximate deep memory size of `obj` in bytes.

        Recurses into dicts, lists, tuples and sets; each object is counted
        at most once even when referenced multiple times (this also guards
        against cycles). Other types are measured shallowly via
        sys.getsizeof.
        """
        seen: Set[int] = set()

        def sizeof(o: Any) -> int:
            if id(o) in seen:
                return 0
            seen.add(id(o))
            size = sys.getsizeof(o)
            if isinstance(o, dict):
                size += sum(sizeof(k) + sizeof(v) for k, v in o.items())
            elif isinstance(o, (list, tuple, set)):
                size += sum(sizeof(x) for x in o)
            return size

        return sizeof(obj)

    @staticmethod
    def find_largest_objects(limit: int = 20) -> List[tuple]:
        """Find the `limit` largest gc-tracked objects by shallow size.

        Returns:
            List of (type_name, object_id, size_bytes) tuples, largest first.
        """
        gc.collect()
        objects = []
        for obj in gc.get_objects():
            try:
                objects.append((type(obj).__name__, id(obj), sys.getsizeof(obj)))
            except Exception:
                # Some extension types may not support sizeof inspection.
                pass
        objects.sort(key=lambda item: item[2], reverse=True)
        return objects[:limit]

    @staticmethod
    def force_garbage_collection():
        """Force aggressive garbage collection across all generations."""
        # Collect each generation individually, then run a final full pass
        # so objects freed by the earlier passes are reclaimed as well.
        gc.collect(0)
        gc.collect(1)
        gc.collect(2)
        gc.collect()

    @staticmethod
    def print_memory_summary():
        """Print a summary of memory usage: per-type counts, per-type sizes,
        and the largest individual objects."""
        gc.collect()
        print("\n" + "=" * 80)
        print("MEMORY USAGE SUMMARY")
        print("=" * 80)
        # Count objects by type
        counts = defaultdict(int)
        sizes = defaultdict(int)
        for obj in gc.get_objects():
            try:
                type_name = type(obj).__name__
                counts[type_name] += 1
                sizes[type_name] += sys.getsizeof(obj)
            except Exception:
                pass
        # Top by count
        print("\nTop 10 object types by count:")
        for type_name, count in sorted(counts.items(), key=lambda x: x[1], reverse=True)[:10]:
            size_mb = sizes[type_name] / 1024 / 1024
            print(f" {type_name:30s} {count:8d} objects ({size_mb:6.2f} MB)")
        # Top by size
        print("\nTop 10 object types by size:")
        for type_name, size in sorted(sizes.items(), key=lambda x: x[1], reverse=True)[:10]:
            count = counts[type_name]
            size_mb = size / 1024 / 1024
            print(f" {type_name:30s} {size_mb:8.2f} MB ({count} objects)")
        # Largest individual objects
        print("\nTop 10 largest individual objects:")
        for type_name, obj_id, size in MemoryOptimizer.find_largest_objects(10):
            size_kb = size / 1024
            print(f" {type_name:30s} {size_kb:10.2f} KB (id={obj_id})")
        print("=" * 80)
# Global instances
# Module-level singletons: the tracker is created eagerly, the leak
# detector lazily (constructed on first access).
_object_tracker = ObjectTracker()
_leak_detector: Optional[MemoryLeakDetector] = None


def get_object_tracker() -> ObjectTracker:
    """Get global ObjectTracker instance."""
    return _object_tracker
def get_leak_detector() -> MemoryLeakDetector:
    """Get the global MemoryLeakDetector instance, creating it on first use.

    Returns:
        The singleton detector; never None, since it is lazily constructed
        here (the previous Optional return annotation was misleading).
    """
    global _leak_detector
    if _leak_detector is None:
        _leak_detector = MemoryLeakDetector()
    return _leak_detector
def start_leak_detection(check_interval: float = 60.0):
    """Begin background leak monitoring and return the global detector.

    Args:
        check_interval: Seconds between background memory checks.
    """
    monitor = get_leak_detector()
    monitor.check_interval = check_interval
    monitor.start()
    return monitor
def stop_leak_detection():
    """Stop the global leak detector, if one was ever created."""
    global _leak_detector
    if _leak_detector is not None:
        _leak_detector.stop()
# Decorator for tracking object lifecycle
def track_lifecycle(name: Optional[str] = None):
    """Class decorator that registers every new instance with the global
    ObjectTracker.

    Args:
        name: Optional tracking name; defaults to the class name.

    Returns:
        The decorated class with a wrapped ``__init__``.

    Fix: the original used ``wraps`` without importing it from functools,
    raising NameError at decoration time.

    NOTE(review): all instances share one tracking name here, so each new
    instance overwrites the previous entry in the tracker — confirm this
    is intended.
    """
    def decorator(cls):
        original_init = cls.__init__

        @wraps(original_init)
        def new_init(self, *args, **kwargs):
            original_init(self, *args, **kwargs)
            # Register the freshly constructed instance for leak tracking.
            tracker = get_object_tracker()
            tracker.track(self, name or cls.__name__)

        cls.__init__ = new_init
        return cls
    return decorator
if __name__ == "__main__":
    # Smoke test: allocate ~1 MB of dicts, summarize, free, summarize again.
    print("Testing MemoryOptimizer...")
    demo_payload = [{"id": n, "data": "x" * 1000} for n in range(1000)]
    MemoryOptimizer.print_memory_summary()
    del demo_payload
    MemoryOptimizer.force_garbage_collection()
    print("\nAfter cleanup:")
    MemoryOptimizer.print_memory_summary()