# EU-Utility/benchmarks/performance_benchmark.py
# (file-viewer metadata: 376 lines · 11 KiB · Python — not part of the module)
"""
EU-Utility - Performance Benchmarks
Comprehensive performance benchmarks for core modules.
Run with: python -m benchmarks.performance_benchmark
"""
import cProfile
import functools
import io
import pstats
import sys
import threading
import time
import tracemalloc
from contextlib import contextmanager
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
@dataclass
class BenchmarkResult:
    """Result of a single benchmark run.

    Attributes:
        name: Human-readable benchmark label used in the results table.
        duration_ms: Total wall-clock time for all iterations, in ms.
        memory_kb: Peak traced memory during the run, in KiB.
        iterations: Number of timed iterations executed.
        ops_per_sec: Throughput (iterations / elapsed seconds).
        details: Extra per-benchmark data (e.g. 'avg_ms' per iteration).
    """
    name: str
    duration_ms: float
    memory_kb: float
    iterations: int
    ops_per_sec: float
    # Original default of None contradicted the Dict annotation and forced
    # readers to None-check; a per-instance empty dict is safe (dataclasses
    # forbid a shared mutable {} default, hence default_factory).
    details: Dict[str, Any] = field(default_factory=dict)
class PerformanceBenchmark:
    """Performance benchmark suite for EU-Utility.

    Collects BenchmarkResult records via the `benchmark` decorator and can
    print a formatted summary table or cProfile a single callable.
    """

    def __init__(self):
        # Results accumulate across every decorated benchmark run.
        self.results: List[BenchmarkResult] = []

    @contextmanager
    def _measure_memory(self):
        """Enable tracemalloc for the enclosed block.

        Call _get_memory_usage() *inside* the `with` block; tracing is
        stopped (and counters reset) on exit.
        """
        tracemalloc.start()
        try:
            yield
        finally:
            tracemalloc.stop()

    def _get_memory_usage(self) -> float:
        """Return peak traced memory in KiB (0.0 when tracing is inactive)."""
        _current, peak = tracemalloc.get_traced_memory()
        return peak / 1024  # bytes -> KiB

    def benchmark(self, name: str, iterations: int = 1000):
        """Decorator factory that times and memory-profiles a function.

        The wrapped function is warmed up (up to 10 calls), then executed
        `iterations` times under tracemalloc.  The wrapper returns the
        BenchmarkResult (NOT the function's own return value) and appends
        it to self.results.

        Args:
            name: Label used in the results table.
            iterations: Number of timed calls.
        """
        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)  # preserve name/docstring of the benchmark body
            def wrapper(*args, **kwargs):
                # Warm up so first-call costs don't skew the timing.
                for _ in range(min(10, iterations // 10)):
                    func(*args, **kwargs)
                # Timed + memory-traced section.
                tracemalloc.start()
                start_time = time.perf_counter()
                for _ in range(iterations):
                    func(*args, **kwargs)
                end_time = time.perf_counter()
                _current, peak = tracemalloc.get_traced_memory()
                tracemalloc.stop()
                elapsed = end_time - start_time
                duration_ms = elapsed * 1000
                # Guard against a zero elapsed reading on trivially fast bodies.
                ops_per_sec = iterations / elapsed if elapsed > 0 else float('inf')
                result = BenchmarkResult(
                    name=name,
                    duration_ms=duration_ms,
                    memory_kb=peak / 1024,
                    iterations=iterations,
                    ops_per_sec=ops_per_sec,
                    details={'avg_ms': duration_ms / iterations},
                )
                self.results.append(result)
                return result
            return wrapper
        return decorator

    def profile(self, name: str, func: Callable, *args, **kwargs) -> str:
        """Run `func` under cProfile and return the top-20 cumulative stats.

        Args:
            name: Label (informational only; not embedded in the output).
            func: Callable to profile; its return value is discarded.

        Returns:
            The pstats report as a string.
        """
        profiler = cProfile.Profile()
        profiler.enable()
        func(*args, **kwargs)  # return value intentionally discarded
        profiler.disable()
        s = io.StringIO()
        stats = pstats.Stats(profiler, stream=s)
        stats.sort_stats('cumulative')
        stats.print_stats(20)
        return s.getvalue()

    def print_results(self):
        """Print all collected benchmark results as a fixed-width table."""
        print("\n" + "=" * 80)
        print("PERFORMANCE BENCHMARK RESULTS")
        print("=" * 80)
        print(f"{'Benchmark':<40} {'Time (ms)':<12} {'Ops/sec':<12} {'Memory (KB)':<12}")
        print("-" * 80)
        for r in self.results:
            print(f"{r.name:<40} {r.duration_ms:>10.2f} {r.ops_per_sec:>10.0f} {r.memory_kb:>10.2f}")
        print("=" * 80)
# ========== LogReader Benchmarks ==========
def benchmark_log_reader():
    """Benchmark LogReader performance."""
    from core.log_reader import LogReader, LogEvent
    from datetime import datetime

    suite = PerformanceBenchmark()
    reader = LogReader()

    # Seven representative log lines, repeated to build a 700-line workload.
    sample_lines = [
        "System: You received Animal Thyroid Oil x 1",
        "System: You have gained 0.05 points in Rifle",
        "System: You inflicted 45 damage",
        "System: You were hit for 23 damage",
        "System: Mission completed: Tutorial",
        "System: Your Omegaton M2100 has reached tier 2",
        "System: Your weapon broke",
    ] * 100

    @suite.benchmark("LogReader: Process Line", iterations=700)
    def run_process_line():
        for entry in sample_lines:
            reader._process_line(entry)

    @suite.benchmark("LogReader: Parse Event", iterations=7000)
    def run_parse_event():
        # Only the seven distinct lines matter for parsing.
        for entry in sample_lines[:7]:
            reader._parse_event(entry)

    @suite.benchmark("LogReader: Subscribe/Unsubscribe", iterations=1000)
    def run_subscribe_cycle():
        def on_event(event):
            pass
        reader.subscribe('loot', on_event)
        reader.unsubscribe('loot', on_event)

    @suite.benchmark("LogReader: Read Lines (cache)", iterations=1000)
    def run_read_lines():
        reader.read_lines(count=50)

    # Execute every benchmark in declaration order.
    for runner in (run_process_line, run_parse_event, run_subscribe_cycle, run_read_lines):
        runner()

    suite.print_results()
    return suite.results
# ========== OCR Service Benchmarks ==========
def benchmark_ocr_service():
    """Benchmark OCRService performance."""
    from PIL import Image
    import numpy as np

    suite = PerformanceBenchmark()

    # Blank RGB canvases at three representative resolutions.
    test_images = {
        'small': Image.new('RGB', (200, 50), color='white'),
        'medium': Image.new('RGB', (800, 600), color='white'),
        'large': Image.new('RGB', (1920, 1080), color='white'),
    }

    # (image key, benchmark label, iteration count) — fewer iterations for
    # larger frames so each size finishes in comparable wall-clock time.
    cases = [
        ('small', "OCR: Image to Numpy (small)", 1000),
        ('medium', "OCR: Image to Numpy (medium)", 100),
        ('large', "OCR: Image to Numpy (large)", 10),
    ]
    for key, label, count in cases:
        @suite.benchmark(label, iterations=count)
        def convert(img=test_images[key]):  # bind image now to avoid late-binding
            np.array(img)
        convert()

    suite.print_results()
    return suite.results
# ========== Plugin Manager Benchmarks ==========
def benchmark_plugin_manager():
    """Benchmark PluginManager performance.

    Only the config serialization round-trip is benchmarked; real plugin
    discovery requires file-system fixtures and is not simulated here.
    """
    bench = PerformanceBenchmark()

    @bench.benchmark("PluginManager: Config Load/Save", iterations=100)
    def config_io():
        # JSON round-trip of a representative plugin config payload.
        import json
        config = {"enabled": ["plugin1", "plugin2"], "settings": {}}
        text = json.dumps(config)
        json.loads(text)

    # NOTE(review): the original file defined a "PluginManager: Plugin
    # Discovery" placeholder benchmark (a no-op `pass` body) but never
    # invoked it, so it contributed no results; the dead code was removed
    # rather than timing an empty function.

    config_io()
    bench.print_results()
    return bench.results
# ========== Event Bus Benchmarks ==========
def benchmark_event_bus():
    """Benchmark EventBus performance."""
    from core.event_bus import get_event_bus, SkillGainEvent, LootEvent, DamageEvent

    suite = PerformanceBenchmark()
    bus = get_event_bus()

    # Start from a clean bus so earlier subscribers/history don't skew timings.
    bus._subscriptions.clear()
    bus._history.clear()

    received_count = [0]

    def on_event(event):
        received_count[0] += 1

    # One persistent subscriber so published events have a receiver.
    bus.subscribe(on_event)

    sample_events = [
        SkillGainEvent(skill_name="Rifle", skill_value=100.0, gain_amount=0.05),
        LootEvent(mob_name="Kerberos", items=[{"name": "Oil", "count": 1}]),
        DamageEvent(damage_amount=45.0, damage_type="impact", is_outgoing=True),
    ]

    @suite.benchmark("EventBus: Publish (sync)", iterations=1000)
    def run_publish():
        for evt in sample_events:
            bus.publish_sync(evt)

    @suite.benchmark("EventBus: Subscribe/Unsubscribe", iterations=100)
    def run_sub_cycle():
        token = bus.subscribe(on_event)
        bus.unsubscribe(token)

    @suite.benchmark("EventBus: Get Recent Events", iterations=1000)
    def run_recent():
        bus.get_recent_events(count=100)

    for step in (run_publish, run_sub_cycle, run_recent):
        step()

    suite.print_results()
    return suite.results
# ========== Screenshot Service Benchmarks ==========
def benchmark_screenshot_service():
    """Benchmark ScreenshotService performance."""
    from PIL import Image
    import io

    suite = PerformanceBenchmark()

    # A full-HD flat-color frame stands in for a captured screenshot.
    frame = Image.new('RGB', (1920, 1080), color=(100, 150, 200))

    @suite.benchmark("Screenshot: Image to Bytes (PNG)", iterations=100)
    def encode_png():
        stream = io.BytesIO()
        frame.save(stream, 'PNG')
        stream.getvalue()

    @suite.benchmark("Screenshot: Image to Bytes (JPEG)", iterations=100)
    def encode_jpeg():
        stream = io.BytesIO()
        frame.convert('RGB').save(stream, 'JPEG', quality=95)
        stream.getvalue()

    @suite.benchmark("Screenshot: Image Copy", iterations=1000)
    def duplicate():
        frame.copy()

    for step in (encode_png, encode_jpeg, duplicate):
        step()

    suite.print_results()
    return suite.results
# ========== Main ==========
def run_all_benchmarks():
    """Run all benchmarks."""
    print("Starting EU-Utility Performance Benchmarks...")
    print("This may take a few minutes...\n")

    all_results = []

    # (section header, failure label, suite function) for each benchmark group.
    suites = [
        ("LogReader Benchmarks", "LogReader", benchmark_log_reader),
        ("OCR Service Benchmarks", "OCR", benchmark_ocr_service),
        ("Plugin Manager Benchmarks", "Plugin Manager", benchmark_plugin_manager),
        ("Event Bus Benchmarks", "Event Bus", benchmark_event_bus),
        ("Screenshot Service Benchmarks", "Screenshot", benchmark_screenshot_service),
    ]
    for header, label, runner in suites:
        # Each suite is best-effort: a failure (e.g. missing optional
        # dependency) is reported and the remaining suites still run.
        try:
            print(f"\n📊 {header}")
            all_results.extend(runner())
        except Exception as e:
            print(f"{label} benchmarks failed: {e}")

    # Print summary
    print("\n" + "=" * 80)
    print("BENCHMARK SUMMARY")
    print("=" * 80)
    print(f"Total benchmarks run: {len(all_results)}")
    if all_results:
        total_time = sum(r.duration_ms for r in all_results)
        print(f"Total time: {total_time:.2f} ms")
        print(f"Average ops/sec: {sum(r.ops_per_sec for r in all_results) / len(all_results):.0f}")
    return all_results
# Script entry point: run the full benchmark suite when executed directly.
if __name__ == "__main__":
    run_all_benchmarks()