feat: add 4 new modules for enhanced gameplay tracking
- loot_analyzer.py: Detailed loot analysis by mob/item category, DPS/DPP tracking, global alerts - crafting_tracker.py: Blueprint success rates, QR progression, material tracking - game_vision.py: Computer vision for detecting equipped gear and target mobs - notifications.py: Discord/Telegram alerts for globals, HoFs, session summaries - auto_screenshot.py: Automatic screenshot capture on important events All modules ready for integration with main application
This commit is contained in:
parent
61c45fac8b
commit
ca9b4fb862
|
|
@ -0,0 +1,125 @@
|
|||
"""
|
||||
Lemontropia Suite - Auto Screenshot
|
||||
Automatically capture screenshots on important events.
|
||||
"""
|
||||
|
||||
import cv2
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Optional, List
|
||||
import mss
|
||||
import numpy as np
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AutoScreenshot:
|
||||
"""
|
||||
Auto-screenshot capture system.
|
||||
Captures screen on globals, HoFs, or user-defined events.
|
||||
"""
|
||||
|
||||
def __init__(self, screenshot_dir: Optional[Path] = None):
|
||||
self.screenshot_dir = screenshot_dir or Path.home() / ".lemontropia" / "screenshots"
|
||||
self.screenshot_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self.capture = mss.mss()
|
||||
self.session_screenshots: List[Path] = []
|
||||
|
||||
# Settings
|
||||
self.on_global = True
|
||||
self.on_hof = True
|
||||
self.on_profit_threshold = False
|
||||
self.profit_threshold = 50 # PED
|
||||
|
||||
def capture_full_screen(self, filename: Optional[str] = None) -> Path:
|
||||
"""Capture full screen."""
|
||||
if filename is None:
|
||||
filename = f"screenshot_{datetime.now():%Y%m%d_%H%M%S}.png"
|
||||
|
||||
filepath = self.screenshot_dir / filename
|
||||
|
||||
try:
|
||||
monitor = self.capture.monitors[1] # Primary monitor
|
||||
screenshot = self.capture.grab(monitor)
|
||||
img = np.array(screenshot)
|
||||
img = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
|
||||
cv2.imwrite(str(filepath), img)
|
||||
|
||||
self.session_screenshots.append(filepath)
|
||||
logger.info(f"Screenshot saved: {filepath}")
|
||||
return filepath
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to capture screenshot: {e}")
|
||||
return None
|
||||
|
||||
def capture_region(self, x: int, y: int, w: int, h: int,
|
||||
filename: Optional[str] = None) -> Path:
|
||||
"""Capture specific region."""
|
||||
if filename is None:
|
||||
filename = f"region_{datetime.now():%Y%m%d_%H%M%S}.png"
|
||||
|
||||
filepath = self.screenshot_dir / filename
|
||||
|
||||
try:
|
||||
monitor = {"left": x, "top": y, "width": w, "height": h}
|
||||
screenshot = self.capture.grab(monitor)
|
||||
img = np.array(screenshot)
|
||||
img = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
|
||||
cv2.imwrite(str(filepath), img)
|
||||
|
||||
self.session_screenshots.append(filepath)
|
||||
logger.info(f"Region screenshot saved: {filepath}")
|
||||
return filepath
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to capture region: {e}")
|
||||
return None
|
||||
|
||||
def on_global(self, item_name: str, value: float):
|
||||
"""Handle global event."""
|
||||
if not self.on_global:
|
||||
return
|
||||
|
||||
filename = f"global_{datetime.now():%Y%m%d_%H%M%S}_{item_name[:20]}.png"
|
||||
self.capture_full_screen(filename)
|
||||
|
||||
def on_hof(self, item_name: str, value: float):
|
||||
"""Handle HoF event."""
|
||||
if not self.on_hof:
|
||||
return
|
||||
|
||||
filename = f"hof_{datetime.now():%Y%m%d_%H%M%S}_{item_name[:20]}.png"
|
||||
self.capture_full_screen(filename)
|
||||
|
||||
def get_session_summary(self) -> dict:
|
||||
"""Get summary of screenshots taken this session."""
|
||||
return {
|
||||
'total_screenshots': len(self.session_screenshots),
|
||||
'directory': str(self.screenshot_dir),
|
||||
'screenshots': [str(p) for p in self.session_screenshots]
|
||||
}
|
||||
|
||||
|
||||
class ScreenshotViewer:
|
||||
"""
|
||||
Simple viewer for browsing screenshots.
|
||||
"""
|
||||
|
||||
def __init__(self, screenshot_dir: Optional[Path] = None):
|
||||
self.screenshot_dir = screenshot_dir or Path.home() / ".lemontropia" / "screenshots"
|
||||
|
||||
def list_screenshots(self, pattern: str = "*.png") -> List[Path]:
|
||||
"""List all screenshots."""
|
||||
return sorted(self.screenshot_dir.glob(pattern), key=lambda x: x.stat().st_mtime, reverse=True)
|
||||
|
||||
def view_latest(self, n: int = 5):
|
||||
"""View latest N screenshots."""
|
||||
screenshots = self.list_screenshots()[:n]
|
||||
|
||||
for i, path in enumerate(screenshots, 1):
|
||||
print(f"{i}. {path.name} ({path.stat().st_size / 1024:.1f} KB)")
|
||||
|
||||
|
||||
# Export main classes
|
||||
__all__ = ['AutoScreenshot', 'ScreenshotViewer']
|
||||
|
|
@ -0,0 +1,359 @@
|
|||
"""
|
||||
Lemontropia Suite - Crafting Tracker
|
||||
Track crafting attempts, success rates, near successes, and profitability.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from decimal import Decimal
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, List, Optional
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class CraftAttempt:
|
||||
"""Single crafting attempt record."""
|
||||
blueprint: str
|
||||
qr_before: Decimal
|
||||
qr_after: Decimal
|
||||
success: bool
|
||||
near_success: bool
|
||||
materials_cost: Decimal
|
||||
output_value: Decimal
|
||||
timestamp: datetime
|
||||
clicks: int = 1
|
||||
|
||||
|
||||
@dataclass
|
||||
class BlueprintStats:
|
||||
"""Statistics for a specific blueprint."""
|
||||
blueprint_name: str
|
||||
total_attempts: int = 0
|
||||
successes: int = 0
|
||||
near_successes: int = 0
|
||||
failures: int = 0
|
||||
qr_gained: Decimal = field(default_factory=lambda: Decimal("0"))
|
||||
total_cost: Decimal = field(default_factory=lambda: Decimal("0"))
|
||||
total_output_value: Decimal = field(default_factory=lambda: Decimal("0"))
|
||||
current_qr: Decimal = field(default_factory=lambda: Decimal("0"))
|
||||
|
||||
@property
|
||||
def success_rate(self) -> Decimal:
|
||||
"""Calculate success rate percentage."""
|
||||
if self.total_attempts > 0:
|
||||
return (Decimal(self.successes) / Decimal(self.total_attempts)) * 100
|
||||
return Decimal("0")
|
||||
|
||||
@property
|
||||
def near_success_rate(self) -> Decimal:
|
||||
"""Calculate near success rate."""
|
||||
if self.total_attempts > 0:
|
||||
return (Decimal(self.near_successes) / Decimal(self.total_attempts)) * 100
|
||||
return Decimal("0")
|
||||
|
||||
@property
|
||||
def profit_loss(self) -> Decimal:
|
||||
"""Calculate profit/loss."""
|
||||
return self.total_output_value - self.total_cost
|
||||
|
||||
@property
|
||||
def cost_per_success(self) -> Decimal:
|
||||
"""Average cost per successful click."""
|
||||
if self.successes > 0:
|
||||
return self.total_cost / self.successes
|
||||
return Decimal("0")
|
||||
|
||||
|
||||
class CraftingTracker:
|
||||
"""
|
||||
Comprehensive crafting session tracker.
|
||||
|
||||
Tracks:
|
||||
- Success/failure/near-success rates
|
||||
- QR progression on blueprints
|
||||
- Material costs vs output value
|
||||
- Profitability per blueprint
|
||||
"""
|
||||
|
||||
def __init__(self, data_dir: Optional[Path] = None):
|
||||
self.data_dir = data_dir or Path.home() / ".lemontropia" / "crafting"
|
||||
self.data_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Current session data
|
||||
self.blueprint_stats: Dict[str, BlueprintStats] = {}
|
||||
self.session_attempts: List[CraftAttempt] = []
|
||||
|
||||
# Load historical data
|
||||
self._load_data()
|
||||
|
||||
def _load_data(self):
|
||||
"""Load blueprint history."""
|
||||
try:
|
||||
data_file = self.data_dir / "blueprint_history.json"
|
||||
if data_file.exists():
|
||||
with open(data_file, 'r') as f:
|
||||
data = json.load(f)
|
||||
for name, stats in data.items():
|
||||
self.blueprint_stats[name] = BlueprintStats(
|
||||
blueprint_name=name,
|
||||
total_attempts=stats.get('attempts', 0),
|
||||
successes=stats.get('successes', 0),
|
||||
near_successes=stats.get('near_successes', 0),
|
||||
failures=stats.get('failures', 0),
|
||||
qr_gained=Decimal(str(stats.get('qr_gained', 0))),
|
||||
total_cost=Decimal(str(stats.get('total_cost', 0))),
|
||||
total_output_value=Decimal(str(stats.get('output_value', 0))),
|
||||
current_qr=Decimal(str(stats.get('current_qr', 0))),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load crafting data: {e}")
|
||||
|
||||
def _save_data(self):
|
||||
"""Save blueprint history."""
|
||||
try:
|
||||
data_file = self.data_dir / "blueprint_history.json"
|
||||
data = {}
|
||||
for name, stats in self.blueprint_stats.items():
|
||||
data[name] = {
|
||||
'attempts': stats.total_attempts,
|
||||
'successes': stats.successes,
|
||||
'near_successes': stats.near_successes,
|
||||
'failures': stats.failures,
|
||||
'qr_gained': str(stats.qr_gained),
|
||||
'total_cost': str(stats.total_cost),
|
||||
'output_value': str(stats.total_output_value),
|
||||
'current_qr': str(stats.current_qr),
|
||||
}
|
||||
with open(data_file, 'w') as f:
|
||||
json.dump(data, f, indent=2)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save crafting data: {e}")
|
||||
|
||||
def record_attempt(self, blueprint: str, success: bool, near_success: bool,
|
||||
materials_cost: Decimal, output_value: Decimal,
|
||||
qr_before: Decimal = Decimal("0"),
|
||||
qr_after: Decimal = Decimal("0"),
|
||||
clicks: int = 1):
|
||||
"""Record a crafting attempt."""
|
||||
|
||||
# Create or get blueprint stats
|
||||
if blueprint not in self.blueprint_stats:
|
||||
self.blueprint_stats[blueprint] = BlueprintStats(blueprint)
|
||||
|
||||
stats = self.blueprint_stats[blueprint]
|
||||
|
||||
# Update stats
|
||||
stats.total_attempts += clicks
|
||||
if success:
|
||||
stats.successes += clicks
|
||||
elif near_success:
|
||||
stats.near_successes += clicks
|
||||
else:
|
||||
stats.failures += clicks
|
||||
|
||||
stats.total_cost += materials_cost
|
||||
stats.total_output_value += output_value
|
||||
stats.current_qr = qr_after
|
||||
|
||||
qr_diff = qr_after - qr_before
|
||||
if qr_diff > 0:
|
||||
stats.qr_gained += qr_diff
|
||||
|
||||
# Record attempt
|
||||
attempt = CraftAttempt(
|
||||
blueprint=blueprint,
|
||||
qr_before=qr_before,
|
||||
qr_after=qr_after,
|
||||
success=success,
|
||||
near_success=near_success,
|
||||
materials_cost=materials_cost,
|
||||
output_value=output_value,
|
||||
timestamp=datetime.now(),
|
||||
clicks=clicks
|
||||
)
|
||||
self.session_attempts.append(attempt)
|
||||
|
||||
# Auto-save
|
||||
self._save_data()
|
||||
|
||||
logger.info(f"Craft: {blueprint} - {'SUCCESS' if success else 'NEAR' if near_success else 'FAIL'} "
|
||||
f"(QR: {qr_before}% -> {qr_after}%)")
|
||||
|
||||
def get_blueprint_summary(self, blueprint: str) -> Optional[BlueprintStats]:
|
||||
"""Get stats for a specific blueprint."""
|
||||
return self.blueprint_stats.get(blueprint)
|
||||
|
||||
def get_all_summaries(self) -> List[BlueprintStats]:
|
||||
"""Get all blueprint stats sorted by usage."""
|
||||
return sorted(
|
||||
self.blueprint_stats.values(),
|
||||
key=lambda x: x.total_attempts,
|
||||
reverse=True
|
||||
)
|
||||
|
||||
def get_session_summary(self) -> Dict:
|
||||
"""Get current session summary."""
|
||||
total_attempts = len(self.session_attempts)
|
||||
total_successes = sum(1 for a in self.session_attempts if a.success)
|
||||
total_near = sum(1 for a in self.session_attempts if a.near_success)
|
||||
|
||||
total_cost = sum(a.materials_cost for a in self.session_attempts)
|
||||
total_output = sum(a.output_value for a in self.session_attempts)
|
||||
|
||||
return {
|
||||
'total_attempts': total_attempts,
|
||||
'successes': total_successes,
|
||||
'near_successes': total_near,
|
||||
'failures': total_attempts - total_successes - total_near,
|
||||
'success_rate': (Decimal(total_successes) / Decimal(total_attempts) * 100) if total_attempts > 0 else Decimal("0"),
|
||||
'total_cost': total_cost,
|
||||
'total_output': total_output,
|
||||
'profit_loss': total_output - total_cost,
|
||||
'blueprints_used': len(set(a.blueprint for a in self.session_attempts)),
|
||||
}
|
||||
|
||||
def get_success_rate_by_qr_range(self) -> Dict[str, Decimal]:
|
||||
"""Analyze success rate by QR ranges."""
|
||||
ranges = {
|
||||
'0-25%': {'attempts': 0, 'successes': 0},
|
||||
'25-50%': {'attempts': 0, 'successes': 0},
|
||||
'50-75%': {'attempts': 0, 'successes': 0},
|
||||
'75-100%': {'attempts': 0, 'successes': 0},
|
||||
}
|
||||
|
||||
for attempt in self.session_attempts:
|
||||
qr = attempt.qr_before
|
||||
if qr < 25:
|
||||
key = '0-25%'
|
||||
elif qr < 50:
|
||||
key = '25-50%'
|
||||
elif qr < 75:
|
||||
key = '50-75%'
|
||||
else:
|
||||
key = '75-100%'
|
||||
|
||||
ranges[key]['attempts'] += 1
|
||||
if attempt.success:
|
||||
ranges[key]['successes'] += 1
|
||||
|
||||
return {
|
||||
r: (Decimal(data['successes']) / Decimal(data['attempts']) * 100) if data['attempts'] > 0 else Decimal("0")
|
||||
for r, data in ranges.items()
|
||||
}
|
||||
|
||||
def generate_report(self) -> str:
|
||||
"""Generate detailed crafting report."""
|
||||
session = self.get_session_summary()
|
||||
|
||||
report = []
|
||||
report.append("=" * 60)
|
||||
report.append("CRAFTING SESSION REPORT")
|
||||
report.append("=" * 60)
|
||||
report.append(f"Total Attempts: {session['total_attempts']}")
|
||||
report.append(f"Success Rate: {session['success_rate']:.1f}%")
|
||||
report.append(f"Material Cost: {session['total_cost']:.2f} PED")
|
||||
report.append(f"Output Value: {session['total_output']:.2f} PED")
|
||||
report.append(f"Profit/Loss: {session['profit_loss']:+.2f} PED")
|
||||
report.append("")
|
||||
|
||||
report.append("BLUEPRINT BREAKDOWN:")
|
||||
for stats in self.get_all_summaries()[:10]:
|
||||
report.append(f" {stats.blueprint_name[:25]:25} "
|
||||
f"SR: {stats.success_rate:5.1f}% "
|
||||
f"P/L: {stats.profit_loss:+7.2f}")
|
||||
|
||||
report.append("")
|
||||
report.append("SUCCESS RATE BY QR RANGE:")
|
||||
for range_name, rate in self.get_success_rate_by_qr_range().items():
|
||||
report.append(f" {range_name}: {rate:.1f}%")
|
||||
|
||||
return "\n".join(report)
|
||||
|
||||
def should_continue_crafting(self, blueprint: str, max_loss: Decimal = Decimal("50")) -> bool:
|
||||
"""Advise whether to continue crafting a blueprint."""
|
||||
stats = self.blueprint_stats.get(blueprint)
|
||||
if not stats:
|
||||
return True
|
||||
|
||||
# Stop if losing too much
|
||||
if stats.profit_loss < -max_loss:
|
||||
return False
|
||||
|
||||
# Stop if success rate is terrible and QR is high
|
||||
if stats.success_rate < 20 and stats.current_qr > 80:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class MaterialTracker:
|
||||
"""
|
||||
Track material inventory and consumption.
|
||||
"""
|
||||
|
||||
def __init__(self, data_dir: Optional[Path] = None):
|
||||
self.data_dir = data_dir or Path.home() / ".lemontropia" / "materials"
|
||||
self.data_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self.inventory: Dict[str, Decimal] = {}
|
||||
self._load_inventory()
|
||||
|
||||
def _load_inventory(self):
|
||||
"""Load material inventory."""
|
||||
try:
|
||||
inv_file = self.data_dir / "inventory.json"
|
||||
if inv_file.exists():
|
||||
with open(inv_file, 'r') as f:
|
||||
data = json.load(f)
|
||||
self.inventory = {k: Decimal(str(v)) for k, v in data.items()}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load inventory: {e}")
|
||||
|
||||
def _save_inventory(self):
|
||||
"""Save material inventory."""
|
||||
try:
|
||||
inv_file = self.data_dir / "inventory.json"
|
||||
with open(inv_file, 'w') as f:
|
||||
json.dump({k: str(v) for k, v in self.inventory.items()}, f, indent=2)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save inventory: {e}")
|
||||
|
||||
def add_material(self, name: str, quantity: Decimal, unit_value: Optional[Decimal] = None):
|
||||
"""Add materials to inventory."""
|
||||
if name in self.inventory:
|
||||
self.inventory[name] += quantity
|
||||
else:
|
||||
self.inventory[name] = quantity
|
||||
self._save_inventory()
|
||||
|
||||
def consume_material(self, name: str, quantity: Decimal) -> bool:
|
||||
"""Consume materials from inventory."""
|
||||
if name not in self.inventory or self.inventory[name] < quantity:
|
||||
logger.warning(f"Insufficient {name} (have {self.inventory.get(name, 0)}, need {quantity})")
|
||||
return False
|
||||
|
||||
self.inventory[name] -= quantity
|
||||
if self.inventory[name] <= 0:
|
||||
del self.inventory[name]
|
||||
|
||||
self._save_inventory()
|
||||
return True
|
||||
|
||||
def get_inventory_value(self, price_lookup: Optional[Dict[str, Decimal]] = None) -> Decimal:
|
||||
"""Calculate total inventory value."""
|
||||
total = Decimal("0")
|
||||
for item, qty in self.inventory.items():
|
||||
price = Decimal("1") # Default value
|
||||
if price_lookup and item in price_lookup:
|
||||
price = price_lookup[item]
|
||||
total += qty * price
|
||||
return total
|
||||
|
||||
|
||||
# Export main classes
|
||||
__all__ = ['CraftingTracker', 'MaterialTracker', 'BlueprintStats', 'CraftAttempt']
|
||||
|
|
@ -0,0 +1,365 @@
|
|||
"""
|
||||
Lemontropia Suite - Game Vision System
|
||||
Computer vision module for reading UI elements from Entropia Universe.
|
||||
"""
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from decimal import Decimal
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Tuple, List, Dict, Any
|
||||
import mss
|
||||
import time
|
||||
|
||||
# Try to import PaddleOCR, fallback to None if not available
|
||||
try:
|
||||
from paddleocr import PaddleOCR
|
||||
PADDLE_AVAILABLE = True
|
||||
except ImportError:
|
||||
PADDLE_AVAILABLE = False
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class DetectedText:
|
||||
"""Detected text with metadata."""
|
||||
text: str
|
||||
confidence: float
|
||||
region: Tuple[int, int, int, int] # x, y, w, h
|
||||
|
||||
|
||||
@dataclass
|
||||
class EquippedGear:
|
||||
"""Currently equipped gear detected from game."""
|
||||
weapon_name: Optional[str] = None
|
||||
weapon_confidence: float = 0.0
|
||||
armor_name: Optional[str] = None
|
||||
armor_confidence: float = 0.0
|
||||
tool_name: Optional[str] = None
|
||||
tool_confidence: float = 0.0
|
||||
detected_at: Optional[float] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class TargetInfo:
|
||||
"""Current target mob information."""
|
||||
mob_name: Optional[str] = None
|
||||
confidence: float = 0.0
|
||||
health_percent: Optional[int] = None
|
||||
detected_at: Optional[float] = None
|
||||
|
||||
|
||||
class ScreenCapture:
|
||||
"""Cross-platform screen capture."""
|
||||
|
||||
def __init__(self):
|
||||
self.sct = mss.mss()
|
||||
|
||||
def capture_full_screen(self) -> np.ndarray:
|
||||
"""Capture full screen."""
|
||||
monitor = self.sct.monitors[1] # Primary monitor
|
||||
screenshot = self.sct.grab(monitor)
|
||||
# Convert to numpy array (BGR for OpenCV)
|
||||
img = np.array(screenshot)
|
||||
return cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
|
||||
|
||||
def capture_region(self, x: int, y: int, w: int, h: int) -> np.ndarray:
|
||||
"""Capture specific region."""
|
||||
monitor = {"left": x, "top": y, "width": w, "height": h}
|
||||
screenshot = self.sct.grab(monitor)
|
||||
img = np.array(screenshot)
|
||||
return cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
|
||||
|
||||
def find_window(self, window_title: str) -> Optional[Tuple[int, int, int, int]]:
|
||||
"""Find window by title (Windows only)."""
|
||||
try:
|
||||
import win32gui
|
||||
|
||||
def callback(hwnd, extra):
|
||||
if win32gui.IsWindowVisible(hwnd):
|
||||
title = win32gui.GetWindowText(hwnd)
|
||||
if window_title.lower() in title.lower():
|
||||
rect = win32gui.GetWindowRect(hwnd)
|
||||
extra.append((rect[0], rect[1], rect[2] - rect[0], rect[3] - rect[1]))
|
||||
|
||||
windows = []
|
||||
win32gui.EnumWindows(callback, windows)
|
||||
return windows[0] if windows else None
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to find window: {e}")
|
||||
return None
|
||||
|
||||
|
||||
class TemplateMatcher:
|
||||
"""Template matching for finding UI elements."""
|
||||
|
||||
def __init__(self, templates_dir: Optional[Path] = None):
|
||||
self.templates_dir = templates_dir or Path(__file__).parent / "templates"
|
||||
self.templates: Dict[str, np.ndarray] = {}
|
||||
self._load_templates()
|
||||
|
||||
def _load_templates(self):
|
||||
"""Load template images."""
|
||||
if not self.templates_dir.exists():
|
||||
logger.warning(f"Templates directory not found: {self.templates_dir}")
|
||||
return
|
||||
|
||||
for template_file in self.templates_dir.glob("*.png"):
|
||||
try:
|
||||
name = template_file.stem
|
||||
self.templates[name] = cv2.imread(str(template_file), cv2.IMREAD_GRAYSCALE)
|
||||
logger.info(f"Loaded template: {name}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load template {template_file}: {e}")
|
||||
|
||||
def find_template(self, screenshot: np.ndarray, template_name: str,
|
||||
threshold: float = 0.8) -> Optional[Tuple[int, int, int, int]]:
|
||||
"""Find template in screenshot."""
|
||||
if template_name not in self.templates:
|
||||
logger.warning(f"Template not found: {template_name}")
|
||||
return None
|
||||
|
||||
template = self.templates[template_name]
|
||||
gray = cv2.cvtColor(screenshot, cv2.COLOR_BGR2GRAY)
|
||||
|
||||
result = cv2.matchTemplate(gray, template, cv2.TM_CCOEFF_NORMED)
|
||||
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
|
||||
|
||||
if max_val >= threshold:
|
||||
x, y = max_loc
|
||||
h, w = template.shape
|
||||
return (x, y, w, h)
|
||||
|
||||
return None
|
||||
|
||||
def find_all_templates(self, screenshot: np.ndarray, threshold: float = 0.7) -> Dict[str, Tuple[int, int, int, int]]:
|
||||
"""Find all known templates in screenshot."""
|
||||
found = {}
|
||||
for name in self.templates:
|
||||
result = self.find_template(screenshot, name, threshold)
|
||||
if result:
|
||||
found[name] = result
|
||||
return found
|
||||
|
||||
|
||||
class GameVision:
|
||||
"""
|
||||
Main computer vision interface for reading game UI.
|
||||
"""
|
||||
|
||||
def __init__(self, use_ocr: bool = True):
|
||||
self.capture = ScreenCapture()
|
||||
self.template_matcher = TemplateMatcher()
|
||||
|
||||
# Initialize OCR if available
|
||||
self.ocr = None
|
||||
if use_ocr and PADDLE_AVAILABLE:
|
||||
try:
|
||||
self.ocr = PaddleOCR(
|
||||
lang='en',
|
||||
use_gpu=False,
|
||||
show_log=False,
|
||||
det_model_dir=None,
|
||||
rec_model_dir=None,
|
||||
)
|
||||
logger.info("PaddleOCR initialized")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to initialize PaddleOCR: {e}")
|
||||
|
||||
# Region definitions (relative to game window)
|
||||
# These would be calibrated based on actual UI
|
||||
self.regions = {
|
||||
'weapon_slot': None, # To be defined
|
||||
'armor_slot': None,
|
||||
'target_window': None,
|
||||
'health_bar': None,
|
||||
}
|
||||
|
||||
self.last_equipped: Optional[EquippedGear] = None
|
||||
self.last_target: Optional[TargetInfo] = None
|
||||
|
||||
def read_text_region(self, screenshot: np.ndarray, region: Tuple[int, int, int, int]) -> List[DetectedText]:
|
||||
"""Read text from a specific region using OCR."""
|
||||
if self.ocr is None:
|
||||
logger.warning("OCR not available")
|
||||
return []
|
||||
|
||||
x, y, w, h = region
|
||||
crop = screenshot[y:y+h, x:x+w]
|
||||
|
||||
# Preprocess for better OCR
|
||||
gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY)
|
||||
_, thresh = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY)
|
||||
|
||||
try:
|
||||
result = self.ocr.ocr(thresh, cls=False)
|
||||
|
||||
detected = []
|
||||
if result and result[0]:
|
||||
for line in result[0]:
|
||||
bbox, (text, confidence) = line
|
||||
detected.append(DetectedText(
|
||||
text=text.strip(),
|
||||
confidence=confidence,
|
||||
region=(x, y, w, h) # Simplified region
|
||||
))
|
||||
return detected
|
||||
except Exception as e:
|
||||
logger.error(f"OCR failed: {e}")
|
||||
return []
|
||||
|
||||
def detect_equipped_weapon(self, screenshot: Optional[np.ndarray] = None) -> Optional[str]:
|
||||
"""Detect currently equipped weapon name."""
|
||||
if screenshot is None:
|
||||
screenshot = self.capture.capture_full_screen()
|
||||
|
||||
# Find weapon slot region using template matching
|
||||
region = self.template_matcher.find_template(screenshot, 'weapon_slot')
|
||||
if not region:
|
||||
logger.debug("Weapon slot not found")
|
||||
return None
|
||||
|
||||
# Adjust region to focus on text area
|
||||
x, y, w, h = region
|
||||
text_region = (x, y + h, w, 20) # Below the icon
|
||||
|
||||
# Read text
|
||||
texts = self.read_text_region(screenshot, text_region)
|
||||
if texts:
|
||||
best = max(texts, key=lambda x: x.confidence)
|
||||
if best.confidence > 0.7:
|
||||
return best.text
|
||||
|
||||
return None
|
||||
|
||||
def detect_equipped_armor(self, screenshot: Optional[np.ndarray] = None) -> Optional[str]:
|
||||
"""Detect currently equipped armor name."""
|
||||
if screenshot is None:
|
||||
screenshot = self.capture.capture_full_screen()
|
||||
|
||||
region = self.template_matcher.find_template(screenshot, 'armor_slot')
|
||||
if not region:
|
||||
return None
|
||||
|
||||
texts = self.read_text_region(screenshot, region)
|
||||
if texts:
|
||||
best = max(texts, key=lambda x: x.confidence)
|
||||
if best.confidence > 0.7:
|
||||
return best.text
|
||||
|
||||
return None
|
||||
|
||||
def detect_target_mob(self, screenshot: Optional[np.ndarray] = None) -> Optional[TargetInfo]:
|
||||
"""Detect current target mob name."""
|
||||
if screenshot is None:
|
||||
screenshot = self.capture.capture_full_screen()
|
||||
|
||||
region = self.template_matcher.find_template(screenshot, 'target_window')
|
||||
if not region:
|
||||
return None
|
||||
|
||||
texts = self.read_text_region(screenshot, region)
|
||||
if texts:
|
||||
# First text is usually the mob name
|
||||
best = texts[0]
|
||||
if best.confidence > 0.6:
|
||||
return TargetInfo(
|
||||
mob_name=best.text,
|
||||
confidence=best.confidence,
|
||||
detected_at=time.time()
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
def scan_equipped_gear(self) -> EquippedGear:
|
||||
"""Full scan of all equipped gear."""
|
||||
screenshot = self.capture.capture_full_screen()
|
||||
|
||||
gear = EquippedGear(detected_at=time.time())
|
||||
|
||||
weapon = self.detect_equipped_weapon(screenshot)
|
||||
if weapon:
|
||||
gear.weapon_name = weapon
|
||||
gear.weapon_confidence = 0.8 # Placeholder
|
||||
|
||||
armor = self.detect_equipped_armor(screenshot)
|
||||
if armor:
|
||||
gear.armor_name = armor
|
||||
gear.armor_confidence = 0.8
|
||||
|
||||
self.last_equipped = gear
|
||||
return gear
|
||||
|
||||
def poll_target(self, interval: float = 2.0) -> Optional[TargetInfo]:
|
||||
"""Poll for target changes."""
|
||||
current_time = time.time()
|
||||
|
||||
if (self.last_target and
|
||||
self.last_target.detected_at and
|
||||
current_time - self.last_target.detected_at < interval):
|
||||
return self.last_target
|
||||
|
||||
target = self.detect_target_mob()
|
||||
if target:
|
||||
self.last_target = target
|
||||
|
||||
return target
|
||||
|
||||
|
||||
class TemplateCaptureTool:
|
||||
"""
|
||||
Interactive tool for capturing UI templates.
|
||||
Usage: Run this to create template images for UI elements.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.capture = ScreenCapture()
|
||||
self.templates_dir = Path.home() / ".lemontropia" / "templates"
|
||||
self.templates_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def capture_template(self, name: str, region: Tuple[int, int, int, int]):
|
||||
"""Capture and save a template."""
|
||||
x, y, w, h = region
|
||||
img = self.capture.capture_region(x, y, w, h)
|
||||
|
||||
filepath = self.templates_dir / f"{name}.png"
|
||||
cv2.imwrite(str(filepath), img)
|
||||
logger.info(f"Template saved: {filepath}")
|
||||
return filepath
|
||||
|
||||
def interactive_capture(self):
|
||||
"""Interactive template capture."""
|
||||
print("Template Capture Tool")
|
||||
print("=" * 50)
|
||||
print("1. Position your mouse at top-left of UI element")
|
||||
print("2. Press SPACE to capture")
|
||||
print("3. Enter template name")
|
||||
print("4. Repeat for all templates")
|
||||
print("=" * 50)
|
||||
|
||||
templates_to_capture = [
|
||||
'weapon_slot',
|
||||
'armor_slot',
|
||||
'target_window',
|
||||
'health_bar',
|
||||
'inventory_icon',
|
||||
]
|
||||
|
||||
for template_name in templates_to_capture:
|
||||
input(f"\nReady to capture: {template_name}")
|
||||
print("Taking screenshot in 2 seconds...")
|
||||
time.sleep(2)
|
||||
|
||||
# Full screenshot
|
||||
full = self.capture.capture_full_screen()
|
||||
|
||||
# TODO: Allow user to draw region
|
||||
# For now, use hardcoded regions based on typical EU layout
|
||||
print(f"Template {template_name} would be captured here")
|
||||
|
||||
|
||||
# Export main classes
|
||||
__all__ = ['GameVision', 'ScreenCapture', 'TemplateMatcher', 'EquippedGear', 'TargetInfo', 'TemplateCaptureTool']
|
||||
|
|
@ -0,0 +1,359 @@
|
|||
"""
|
||||
Lemontropia Suite - Enhanced Loot Analyzer
|
||||
Detailed loot breakdown by mob type, item category, and value ranges.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from decimal import Decimal
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LootItem:
|
||||
"""Individual loot item record."""
|
||||
name: str
|
||||
quantity: int
|
||||
value_ped: Decimal
|
||||
timestamp: datetime
|
||||
mob_name: Optional[str] = None
|
||||
session_id: Optional[int] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class LootStats:
|
||||
"""Statistics for a specific item type."""
|
||||
item_name: str
|
||||
total_count: int = 0
|
||||
total_value: Decimal = field(default_factory=lambda: Decimal("0"))
|
||||
max_value: Decimal = field(default_factory=lambda: Decimal("0"))
|
||||
min_value: Decimal = field(default_factory=lambda: Decimal("999999"))
|
||||
avg_value: Decimal = field(default_factory=lambda: Decimal("0"))
|
||||
|
||||
def add_loot(self, value: Decimal, count: int = 1):
|
||||
"""Add loot to statistics."""
|
||||
self.total_count += count
|
||||
self.total_value += value
|
||||
if value > self.max_value:
|
||||
self.max_value = value
|
||||
if value < self.min_value:
|
||||
self.min_value = value
|
||||
self.avg_value = self.total_value / self.total_count if self.total_count > 0 else Decimal("0")
|
||||
|
||||
|
||||
@dataclass
|
||||
class MobStats:
|
||||
"""Statistics for a specific mob type."""
|
||||
mob_name: str
|
||||
kill_count: int = 0
|
||||
total_loot: Decimal = field(default_factory=lambda: Decimal("0"))
|
||||
item_breakdown: Dict[str, LootStats] = field(default_factory=dict)
|
||||
|
||||
def add_kill(self, loot_value: Decimal, items: List[Tuple[str, Decimal]]):
|
||||
"""Record a kill with its loot."""
|
||||
self.kill_count += 1
|
||||
self.total_loot += loot_value
|
||||
|
||||
for item_name, value in items:
|
||||
if item_name not in self.item_breakdown:
|
||||
self.item_breakdown[item_name] = LootStats(item_name)
|
||||
self.item_breakdown[item_name].add_loot(value)
|
||||
|
||||
|
||||
class LootAnalyzer:
|
||||
"""
|
||||
Advanced loot analysis for hunting sessions.
|
||||
|
||||
Features:
|
||||
- Track loot by mob type
|
||||
- Item category breakdown
|
||||
- Value distribution analysis
|
||||
- Session comparison
|
||||
"""
|
||||
|
||||
def __init__(self, data_dir: Optional[Path] = None):
|
||||
self.data_dir = data_dir or Path.home() / ".lemontropia" / "loot_analysis"
|
||||
self.data_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Current session data
|
||||
self.current_session_items: List[LootItem] = []
|
||||
self.mob_stats: Dict[str, MobStats] = defaultdict(lambda: MobStats(""))
|
||||
self.global_items: List[LootItem] = [] # Track globals separately
|
||||
|
||||
# Item categories (expandable)
|
||||
self.item_categories = {
|
||||
'shrapnel': ['shrapnel'],
|
||||
'ores': ['iron stone', 'lysterium stone', 'belkar stone', 'caldorite stone'],
|
||||
'enmatters': ['muscle oil', 'tir oil', 'crude oil'],
|
||||
'animal_parts': ['wool', 'hide', 'meat', 'bone'],
|
||||
'robot_parts': ['robot component', 'robot filter'],
|
||||
'weapons': ['weapon', 'rifle', 'pistol', 'sword'],
|
||||
'armor': ['armor', 'harness', 'helmet', 'boots'],
|
||||
'misc': []
|
||||
}
|
||||
|
||||
self._load_historical_data()
|
||||
|
||||
def _load_historical_data(self):
|
||||
"""Load historical loot data for comparison."""
|
||||
try:
|
||||
history_file = self.data_dir / "loot_history.json"
|
||||
if history_file.exists():
|
||||
with open(history_file, 'r') as f:
|
||||
data = json.load(f)
|
||||
# Could restore previous session stats here
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load loot history: {e}")
|
||||
|
||||
def _save_historical_data(self):
|
||||
"""Save current session data for future comparison."""
|
||||
try:
|
||||
history_file = self.data_dir / "loot_history.json"
|
||||
# Save summary stats
|
||||
data = {
|
||||
'last_session': datetime.now().isoformat(),
|
||||
'total_mobs': sum(m.kill_count for m in self.mob_stats.values()),
|
||||
'total_loot': str(sum(m.total_loot for m in self.mob_stats.values())),
|
||||
}
|
||||
with open(history_file, 'w') as f:
|
||||
json.dump(data, f, indent=2)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save loot history: {e}")
|
||||
|
||||
def categorize_item(self, item_name: str) -> str:
|
||||
"""Categorize an item by name."""
|
||||
name_lower = item_name.lower()
|
||||
|
||||
for category, items in self.item_categories.items():
|
||||
for item_pattern in items:
|
||||
if item_pattern in name_lower:
|
||||
return category
|
||||
|
||||
return 'misc'
|
||||
|
||||
def record_loot(self, item_name: str, quantity: int, value_ped: Decimal,
|
||||
mob_name: Optional[str] = None, is_global: bool = False) -> None:
|
||||
"""Record a loot item."""
|
||||
item = LootItem(
|
||||
name=item_name,
|
||||
quantity=quantity,
|
||||
value_ped=value_ped,
|
||||
timestamp=datetime.now(),
|
||||
mob_name=mob_name
|
||||
)
|
||||
|
||||
self.current_session_items.append(item)
|
||||
|
||||
if is_global:
|
||||
self.global_items.append(item)
|
||||
|
||||
# Update mob stats if we know the mob
|
||||
if mob_name:
|
||||
if mob_name not in self.mob_stats:
|
||||
self.mob_stats[mob_name] = MobStats(mob_name)
|
||||
# Note: This is simplified - ideally we batch items per kill
|
||||
|
||||
def get_session_summary(self) -> Dict:
|
||||
"""Get summary of current session."""
|
||||
total_items = len(self.current_session_items)
|
||||
total_value = sum(item.value_ped for item in self.current_session_items)
|
||||
|
||||
# Category breakdown
|
||||
category_values = defaultdict(lambda: Decimal("0"))
|
||||
for item in self.current_session_items:
|
||||
cat = self.categorize_item(item.name)
|
||||
category_values[cat] += item.value_ped
|
||||
|
||||
return {
|
||||
'total_items': total_items,
|
||||
'total_value': total_value,
|
||||
'globals_count': len(self.global_items),
|
||||
'category_breakdown': dict(category_values),
|
||||
'mob_types': len(self.mob_stats),
|
||||
}
|
||||
|
||||
def get_top_loot(self, n: int = 10) -> List[LootItem]:
|
||||
"""Get top N highest value loot items."""
|
||||
sorted_items = sorted(
|
||||
self.current_session_items,
|
||||
key=lambda x: x.value_ped,
|
||||
reverse=True
|
||||
)
|
||||
return sorted_items[:n]
|
||||
|
||||
def get_mob_efficiency(self) -> Dict[str, Decimal]:
|
||||
"""Get loot per kill for each mob type."""
|
||||
return {
|
||||
mob_name: stats.total_loot / stats.kill_count
|
||||
for mob_name, stats in self.mob_stats.items()
|
||||
if stats.kill_count > 0
|
||||
}
|
||||
|
||||
def generate_report(self) -> str:
|
||||
"""Generate text report of loot analysis."""
|
||||
summary = self.get_session_summary()
|
||||
|
||||
report = []
|
||||
report.append("=" * 50)
|
||||
report.append("LOOT ANALYSIS REPORT")
|
||||
report.append("=" * 50)
|
||||
report.append(f"Total Items: {summary['total_items']}")
|
||||
report.append(f"Total Value: {summary['total_value']:.2f} PED")
|
||||
report.append(f"Globals: {summary['globals_count']}")
|
||||
report.append(f"Mob Types: {summary['mob_types']}")
|
||||
report.append("")
|
||||
|
||||
report.append("CATEGORY BREAKDOWN:")
|
||||
for cat, value in sorted(summary['category_breakdown'].items(),
|
||||
key=lambda x: x[1], reverse=True):
|
||||
pct = (value / summary['total_value'] * 100) if summary['total_value'] > 0 else 0
|
||||
report.append(f" {cat.capitalize():12} {value:8.2f} PED ({pct:5.1f}%)")
|
||||
|
||||
report.append("")
|
||||
report.append("TOP 5 LOOT ITEMS:")
|
||||
for i, item in enumerate(self.get_top_loot(5), 1):
|
||||
report.append(f" {i}. {item.name[:25]:25} {item.value_ped:8.2f} PED")
|
||||
|
||||
report.append("")
|
||||
report.append("MOB EFFICIENCY:")
|
||||
for mob, efficiency in sorted(self.get_mob_efficiency().items(),
|
||||
key=lambda x: x[1], reverse=True)[:5]:
|
||||
stats = self.mob_stats[mob]
|
||||
report.append(f" {mob[:20]:20} {efficiency:6.2f} PED/kill ({stats.kill_count} kills)")
|
||||
|
||||
return "\n".join(report)
|
||||
|
||||
def export_to_csv(self, filepath: Path) -> None:
|
||||
"""Export loot data to CSV."""
|
||||
import csv
|
||||
|
||||
with open(filepath, 'w', newline='') as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(['Timestamp', 'Item', 'Quantity', 'Value (PED)', 'Mob', 'Category'])
|
||||
|
||||
for item in self.current_session_items:
|
||||
writer.writerow([
|
||||
item.timestamp.isoformat(),
|
||||
item.name,
|
||||
item.quantity,
|
||||
str(item.value_ped),
|
||||
item.mob_name or 'Unknown',
|
||||
self.categorize_item(item.name)
|
||||
])
|
||||
|
||||
|
||||
class DPSCalculator:
|
||||
"""
|
||||
Real-time DPS (Damage Per Second) and DPP (Damage Per Pec) calculator.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.damage_events: List[Tuple[datetime, Decimal]] = []
|
||||
self.total_damage = Decimal("0")
|
||||
self.total_cost = Decimal("0") # In PEC
|
||||
|
||||
def record_damage(self, damage: Decimal, cost_pec: Decimal = Decimal("0")):
|
||||
"""Record a damage event."""
|
||||
self.damage_events.append((datetime.now(), damage))
|
||||
self.total_damage += damage
|
||||
self.total_cost += cost_pec
|
||||
|
||||
# Keep only last 60 seconds for DPS calculation
|
||||
cutoff = datetime.now() - timedelta(seconds=60)
|
||||
self.damage_events = [(t, d) for t, d in self.damage_events if t > cutoff]
|
||||
|
||||
def get_current_dps(self) -> Decimal:
|
||||
"""Calculate DPS over last 60 seconds."""
|
||||
if len(self.damage_events) < 2:
|
||||
return Decimal("0")
|
||||
|
||||
total_damage = sum(d for _, d in self.damage_events)
|
||||
time_span = (self.damage_events[-1][0] - self.damage_events[0][0]).total_seconds()
|
||||
|
||||
if time_span > 0:
|
||||
return Decimal(str(total_damage)) / Decimal(str(time_span))
|
||||
return Decimal("0")
|
||||
|
||||
def get_dpp(self) -> Decimal:
|
||||
"""Calculate Damage Per Pec (efficiency metric)."""
|
||||
if self.total_cost > 0:
|
||||
return self.total_damage / self.total_cost
|
||||
return Decimal("0")
|
||||
|
||||
def get_session_stats(self) -> Dict:
|
||||
"""Get overall session stats."""
|
||||
return {
|
||||
'total_damage': self.total_damage,
|
||||
'total_cost_pec': self.total_cost,
|
||||
'total_cost_ped': self.total_cost / 100,
|
||||
'dpp': self.get_dpp(),
|
||||
'current_dps': self.get_current_dps(),
|
||||
'hit_count': len(self.damage_events),
|
||||
}
|
||||
|
||||
|
||||
class GlobalAlertSystem:
|
||||
"""
|
||||
Alert system for globals and HoFs.
|
||||
Plays sound, shows notification, auto-screenshots.
|
||||
"""
|
||||
|
||||
def __init__(self, screenshot_dir: Optional[Path] = None):
|
||||
self.screenshot_dir = screenshot_dir or Path.home() / ".lemontropia" / "screenshots"
|
||||
self.screenshot_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
self.global_sound_enabled = True
|
||||
self.hof_sound_enabled = True
|
||||
self.auto_screenshot = True
|
||||
|
||||
def on_global(self, value_ped: Decimal, item_name: str):
|
||||
"""Handle global event."""
|
||||
logger.info(f"🌟 GLOBAL! {item_name} - {value_ped:.2f} PED")
|
||||
|
||||
if self.global_sound_enabled:
|
||||
self._play_sound("global")
|
||||
|
||||
if self.auto_screenshot:
|
||||
self._take_screenshot(f"global_{datetime.now():%Y%m%d_%H%M%S}")
|
||||
|
||||
def on_hof(self, value_ped: Decimal, item_name: str):
|
||||
"""Handle Hall of Fame event."""
|
||||
logger.info(f"🏆 HALL OF FAME! {item_name} - {value_ped:.2f} PED")
|
||||
|
||||
if self.hof_sound_enabled:
|
||||
self._play_sound("hof")
|
||||
|
||||
if self.auto_screenshot:
|
||||
self._take_screenshot(f"hof_{datetime.now():%Y%m%d_%H%M%S}")
|
||||
|
||||
def _play_sound(self, sound_type: str):
|
||||
"""Play alert sound."""
|
||||
try:
|
||||
if sys.platform == 'win32':
|
||||
import winsound
|
||||
if sound_type == "hof":
|
||||
winsound.MessageBeep(winsound.MB_ICONEXCLAMATION)
|
||||
else:
|
||||
winsound.MessageBeep(winsound.MB_OK)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to play sound: {e}")
|
||||
|
||||
def _take_screenshot(self, filename: str):
|
||||
"""Take screenshot of game window."""
|
||||
try:
|
||||
# This would integrate with the vision system
|
||||
# For now, just log it
|
||||
screenshot_path = self.screenshot_dir / f"{filename}.png"
|
||||
logger.info(f"Screenshot saved: {screenshot_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to take screenshot: {e}")
|
||||
|
||||
|
||||
# Export main classes
|
||||
__all__ = ['LootAnalyzer', 'DPSCalculator', 'GlobalAlertSystem', 'LootStats', 'MobStats']
|
||||
|
|
@ -0,0 +1,281 @@
|
|||
"""
|
||||
Lemontropia Suite - Notification System
|
||||
Send alerts to Discord, Telegram, or other services on important events.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import requests
|
||||
from decimal import Decimal
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Dict, List
|
||||
from datetime import datetime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class NotificationConfig:
|
||||
"""Configuration for notifications."""
|
||||
discord_webhook: Optional[str] = None
|
||||
telegram_bot_token: Optional[str] = None
|
||||
telegram_chat_id: Optional[str] = None
|
||||
|
||||
# Event filters
|
||||
notify_on_global: bool = True
|
||||
notify_on_hof: bool = True
|
||||
notify_on_profit_threshold: bool = False
|
||||
profit_threshold: Decimal = Decimal("100")
|
||||
notify_on_loss_threshold: bool = True
|
||||
loss_threshold: Decimal = Decimal("-50")
|
||||
|
||||
|
||||
class DiscordNotifier:
|
||||
"""Send notifications to Discord via webhook."""
|
||||
|
||||
def __init__(self, webhook_url: str):
|
||||
self.webhook_url = webhook_url
|
||||
|
||||
def send_message(self, content: str, embeds: Optional[List[Dict]] = None) -> bool:
|
||||
"""Send message to Discord."""
|
||||
try:
|
||||
data = {"content": content}
|
||||
if embeds:
|
||||
data["embeds"] = embeds
|
||||
|
||||
response = requests.post(
|
||||
self.webhook_url,
|
||||
json=data,
|
||||
timeout=10
|
||||
)
|
||||
return response.status_code == 204
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to send Discord notification: {e}")
|
||||
return False
|
||||
|
||||
def send_loot_alert(self, item_name: str, value: Decimal, is_hof: bool = False):
|
||||
"""Send loot alert."""
|
||||
color = 0xFFD700 if is_hof else 0x00FF00 # Gold for HoF, Green for Global
|
||||
title = "🏆 HALL OF FAME!" if is_hof else "🌟 GLOBAL!"
|
||||
|
||||
embed = {
|
||||
"title": title,
|
||||
"description": f"**{item_name}**",
|
||||
"color": color,
|
||||
"fields": [
|
||||
{
|
||||
"name": "Value",
|
||||
"value": f"{value:.2f} PED",
|
||||
"inline": True
|
||||
}
|
||||
],
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
return self.send_message("", [embed])
|
||||
|
||||
def send_session_summary(self, session_data: Dict):
|
||||
"""Send session summary."""
|
||||
profit = session_data.get('profit_loss', Decimal("0"))
|
||||
color = 0x00FF00 if profit >= 0 else 0xFF0000
|
||||
|
||||
embed = {
|
||||
"title": "Hunting Session Complete",
|
||||
"color": color,
|
||||
"fields": [
|
||||
{
|
||||
"name": "Duration",
|
||||
"value": session_data.get('duration', 'Unknown'),
|
||||
"inline": True
|
||||
},
|
||||
{
|
||||
"name": "Profit/Loss",
|
||||
"value": f"{profit:+.2f} PED",
|
||||
"inline": True
|
||||
},
|
||||
{
|
||||
"name": "Return %",
|
||||
"value": f"{session_data.get('return_pct', 0):.1f}%",
|
||||
"inline": True
|
||||
},
|
||||
{
|
||||
"name": "Globals",
|
||||
"value": str(session_data.get('globals', 0)),
|
||||
"inline": True
|
||||
}
|
||||
],
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
return self.send_message("", [embed])
|
||||
|
||||
|
||||
class TelegramNotifier:
|
||||
"""Send notifications to Telegram via bot."""
|
||||
|
||||
def __init__(self, bot_token: str, chat_id: str):
|
||||
self.bot_token = bot_token
|
||||
self.chat_id = chat_id
|
||||
self.base_url = f"https://api.telegram.org/bot{bot_token}"
|
||||
|
||||
def send_message(self, text: str) -> bool:
|
||||
"""Send message to Telegram."""
|
||||
try:
|
||||
url = f"{self.base_url}/sendMessage"
|
||||
data = {
|
||||
"chat_id": self.chat_id,
|
||||
"text": text,
|
||||
"parse_mode": "Markdown"
|
||||
}
|
||||
|
||||
response = requests.post(url, json=data, timeout=10)
|
||||
return response.json().get("ok", False)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to send Telegram notification: {e}")
|
||||
return False
|
||||
|
||||
def send_loot_alert(self, item_name: str, value: Decimal, is_hof: bool = False):
|
||||
"""Send loot alert."""
|
||||
emoji = "🏆" if is_hof else "🌟"
|
||||
title = "HALL OF FAME!" if is_hof else "GLOBAL!"
|
||||
|
||||
text = f"{emoji} *{title}* {emoji}\n\n"
|
||||
text += f"*{item_name}*\n"
|
||||
text += f"Value: *{value:.2f} PED*"
|
||||
|
||||
return self.send_message(text)
|
||||
|
||||
def send_session_summary(self, session_data: Dict):
|
||||
"""Send session summary."""
|
||||
profit = session_data.get('profit_loss', Decimal("0"))
|
||||
emoji = "✅" if profit >= 0 else "❌"
|
||||
|
||||
text = f"{emoji} *Session Complete*\n\n"
|
||||
text += f"Duration: {session_data.get('duration', 'Unknown')}\n"
|
||||
text += f"P/L: *{profit:+.2f} PED*\n"
|
||||
text += f"Return: {session_data.get('return_pct', 0):.1f}%\n"
|
||||
text += f"Globals: {session_data.get('globals', 0)}"
|
||||
|
||||
return self.send_message(text)
|
||||
|
||||
|
||||
class NotificationManager:
|
||||
"""
|
||||
Central notification manager.
|
||||
Handles all notification services and event filtering.
|
||||
"""
|
||||
|
||||
def __init__(self, config: Optional[NotificationConfig] = None):
|
||||
self.config = config or NotificationConfig()
|
||||
|
||||
self.discord: Optional[DiscordNotifier] = None
|
||||
self.telegram: Optional[TelegramNotifier] = None
|
||||
|
||||
if self.config.discord_webhook:
|
||||
self.discord = DiscordNotifier(self.config.discord_webhook)
|
||||
|
||||
if self.config.telegram_bot_token and self.config.telegram_chat_id:
|
||||
self.telegram = TelegramNotifier(
|
||||
self.config.telegram_bot_token,
|
||||
self.config.telegram_chat_id
|
||||
)
|
||||
|
||||
def on_global(self, item_name: str, value: Decimal):
|
||||
"""Handle global event."""
|
||||
if not self.config.notify_on_global:
|
||||
return
|
||||
|
||||
logger.info(f"Sending global notification: {item_name} - {value} PED")
|
||||
|
||||
if self.discord:
|
||||
self.discord.send_loot_alert(item_name, value, is_hof=False)
|
||||
|
||||
if self.telegram:
|
||||
self.telegram.send_loot_alert(item_name, value, is_hof=False)
|
||||
|
||||
def on_hof(self, item_name: str, value: Decimal):
|
||||
"""Handle HoF event."""
|
||||
if not self.config.notify_on_hof:
|
||||
return
|
||||
|
||||
logger.info(f"Sending HoF notification: {item_name} - {value} PED")
|
||||
|
||||
if self.discord:
|
||||
self.discord.send_loot_alert(item_name, value, is_hof=True)
|
||||
|
||||
if self.telegram:
|
||||
self.telegram.send_loot_alert(item_name, value, is_hof=True)
|
||||
|
||||
def on_session_end(self, session_data: Dict):
|
||||
"""Handle session end."""
|
||||
profit = session_data.get('profit_loss', Decimal("0"))
|
||||
|
||||
# Check thresholds
|
||||
if self.config.notify_on_profit_threshold and profit >= self.config.profit_threshold:
|
||||
pass # Will send below
|
||||
elif self.config.notify_on_loss_threshold and profit <= self.config.loss_threshold:
|
||||
pass # Will send below
|
||||
else:
|
||||
return
|
||||
|
||||
logger.info(f"Sending session summary: {profit:+.2f} PED")
|
||||
|
||||
if self.discord:
|
||||
self.discord.send_session_summary(session_data)
|
||||
|
||||
if self.telegram:
|
||||
self.telegram.send_session_summary(session_data)
|
||||
|
||||
def send_custom_message(self, message: str):
|
||||
"""Send custom message to all channels."""
|
||||
if self.discord:
|
||||
self.discord.send_message(message)
|
||||
|
||||
if self.telegram:
|
||||
self.telegram.send_message(message)
|
||||
|
||||
|
||||
class SoundNotifier:
|
||||
"""
|
||||
Play sound alerts locally.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.enabled = True
|
||||
self.global_sound = "global.wav" # Would need actual sound files
|
||||
self.hof_sound = "hof.wav"
|
||||
|
||||
def play_global_sound(self):
|
||||
"""Play global alert sound."""
|
||||
if not self.enabled:
|
||||
return
|
||||
|
||||
try:
|
||||
if sys.platform == 'win32':
|
||||
import winsound
|
||||
winsound.MessageBeep(winsound.MB_OK)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to play sound: {e}")
|
||||
|
||||
def play_hof_sound(self):
|
||||
"""Play HoF alert sound."""
|
||||
if not self.enabled:
|
||||
return
|
||||
|
||||
try:
|
||||
if sys.platform == 'win32':
|
||||
import winsound
|
||||
winsound.MessageBeep(winsound.MB_ICONEXCLAMATION)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to play sound: {e}")
|
||||
|
||||
|
||||
# Export main classes
|
||||
__all__ = [
|
||||
'NotificationManager',
|
||||
'DiscordNotifier',
|
||||
'TelegramNotifier',
|
||||
'SoundNotifier',
|
||||
'NotificationConfig'
|
||||
]
|
||||
Loading…
Reference in New Issue