diff --git a/modules/auto_screenshot.py b/modules/auto_screenshot.py new file mode 100644 index 0000000..9fa9731 --- /dev/null +++ b/modules/auto_screenshot.py @@ -0,0 +1,125 @@ +""" +Lemontropia Suite - Auto Screenshot +Automatically capture screenshots on important events. +""" + +import cv2 +import logging +from pathlib import Path +from datetime import datetime +from typing import Optional, List +import mss +import numpy as np + +logger = logging.getLogger(__name__) + + +class AutoScreenshot: + """ + Auto-screenshot capture system. + Captures screen on globals, HoFs, or user-defined events. + """ + + def __init__(self, screenshot_dir: Optional[Path] = None): + self.screenshot_dir = screenshot_dir or Path.home() / ".lemontropia" / "screenshots" + self.screenshot_dir.mkdir(parents=True, exist_ok=True) + + self.capture = mss.mss() + self.session_screenshots: List[Path] = [] + + # Settings + self.on_global = True + self.on_hof = True + self.on_profit_threshold = False + self.profit_threshold = 50 # PED + + def capture_full_screen(self, filename: Optional[str] = None) -> Path: + """Capture full screen.""" + if filename is None: + filename = f"screenshot_{datetime.now():%Y%m%d_%H%M%S}.png" + + filepath = self.screenshot_dir / filename + + try: + monitor = self.capture.monitors[1] # Primary monitor + screenshot = self.capture.grab(monitor) + img = np.array(screenshot) + img = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR) + cv2.imwrite(str(filepath), img) + + self.session_screenshots.append(filepath) + logger.info(f"Screenshot saved: {filepath}") + return filepath + except Exception as e: + logger.error(f"Failed to capture screenshot: {e}") + return None + + def capture_region(self, x: int, y: int, w: int, h: int, + filename: Optional[str] = None) -> Path: + """Capture specific region.""" + if filename is None: + filename = f"region_{datetime.now():%Y%m%d_%H%M%S}.png" + + filepath = self.screenshot_dir / filename + + try: + monitor = {"left": x, "top": y, "width": w, "height": h} + screenshot = self.capture.grab(monitor) + img = np.array(screenshot) + img = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR) + cv2.imwrite(str(filepath), img) + + self.session_screenshots.append(filepath) + logger.info(f"Region screenshot saved: {filepath}") + return filepath + except Exception as e: + logger.error(f"Failed to capture region: {e}") + return None + + def on_global(self, item_name: str, value: float): + """Handle global event.""" + if not self.on_global: + return + + filename = f"global_{datetime.now():%Y%m%d_%H%M%S}_{item_name[:20]}.png" + self.capture_full_screen(filename) + + def on_hof(self, item_name: str, value: float): + """Handle HoF event.""" + if not self.on_hof: + return + + filename = f"hof_{datetime.now():%Y%m%d_%H%M%S}_{item_name[:20]}.png" + self.capture_full_screen(filename) + + def get_session_summary(self) -> dict: + """Get summary of screenshots taken this session.""" + return { + 'total_screenshots': len(self.session_screenshots), + 'directory': str(self.screenshot_dir), + 'screenshots': [str(p) for p in self.session_screenshots] + } + + +class ScreenshotViewer: + """ + Simple viewer for browsing screenshots. + """ + + def __init__(self, screenshot_dir: Optional[Path] = None): + self.screenshot_dir = screenshot_dir or Path.home() / ".lemontropia" / "screenshots" + + def list_screenshots(self, pattern: str = "*.png") -> List[Path]: + """List all screenshots.""" + return sorted(self.screenshot_dir.glob(pattern), key=lambda x: x.stat().st_mtime, reverse=True) + + def view_latest(self, n: int = 5): + """View latest N screenshots.""" + screenshots = self.list_screenshots()[:n] + + for i, path in enumerate(screenshots, 1): + print(f"{i}. {path.name} ({path.stat().st_size / 1024:.1f} KB)") + + +# Export main classes +__all__ = ['AutoScreenshot', 'ScreenshotViewer'] diff --git a/modules/crafting_tracker.py b/modules/crafting_tracker.py new file mode 100644 index 0000000..da9384c --- /dev/null +++ b/modules/crafting_tracker.py @@ -0,0 +1,359 @@ +""" +Lemontropia Suite - Crafting Tracker +Track crafting attempts, success rates, near successes, and profitability. +""" + +import json +import logging +from decimal import Decimal +from pathlib import Path +from dataclasses import dataclass, field +from typing import Dict, List, Optional +from datetime import datetime +from collections import defaultdict + +logger = logging.getLogger(__name__) + + +@dataclass +class CraftAttempt: + """Single crafting attempt record.""" + blueprint: str + qr_before: Decimal + qr_after: Decimal + success: bool + near_success: bool + materials_cost: Decimal + output_value: Decimal + timestamp: datetime + clicks: int = 1 + + +@dataclass +class BlueprintStats: + """Statistics for a specific blueprint.""" + blueprint_name: str + total_attempts: int = 0 + successes: int = 0 + near_successes: int = 0 + failures: int = 0 + qr_gained: Decimal = field(default_factory=lambda: Decimal("0")) + total_cost: Decimal = field(default_factory=lambda: Decimal("0")) + total_output_value: Decimal = field(default_factory=lambda: Decimal("0")) + current_qr: Decimal = field(default_factory=lambda: Decimal("0")) + + @property + def success_rate(self) -> Decimal: + """Calculate success rate percentage.""" + if self.total_attempts > 0: + return (Decimal(self.successes) / Decimal(self.total_attempts)) * 100 + return Decimal("0") + + @property + def near_success_rate(self) -> Decimal: + """Calculate near success rate.""" + if self.total_attempts > 0: + return (Decimal(self.near_successes) / Decimal(self.total_attempts)) * 100 + return Decimal("0") + + @property + def profit_loss(self) -> Decimal: + """Calculate profit/loss.""" + return self.total_output_value - self.total_cost + + @property + def cost_per_success(self) -> Decimal: + """Average cost per successful click.""" + if self.successes > 0: + return self.total_cost / self.successes + return Decimal("0") + + +class CraftingTracker: + """ + Comprehensive crafting session tracker. + + Tracks: + - Success/failure/near-success rates + - QR progression on blueprints + - Material costs vs output value + - Profitability per blueprint + """ + + def __init__(self, data_dir: Optional[Path] = None): + self.data_dir = data_dir or Path.home() / ".lemontropia" / "crafting" + self.data_dir.mkdir(parents=True, exist_ok=True) + + # Current session data + self.blueprint_stats: Dict[str, BlueprintStats] = {} + self.session_attempts: List[CraftAttempt] = [] + + # Load historical data + self._load_data() + + def _load_data(self): + """Load blueprint history.""" + try: + data_file = self.data_dir / "blueprint_history.json" + if data_file.exists(): + with open(data_file, 'r') as f: + data = json.load(f) + for name, stats in data.items(): + self.blueprint_stats[name] = BlueprintStats( + blueprint_name=name, + total_attempts=stats.get('attempts', 0), + successes=stats.get('successes', 0), + near_successes=stats.get('near_successes', 0), + failures=stats.get('failures', 0), + qr_gained=Decimal(str(stats.get('qr_gained', 0))), + total_cost=Decimal(str(stats.get('total_cost', 0))), + total_output_value=Decimal(str(stats.get('output_value', 0))), + current_qr=Decimal(str(stats.get('current_qr', 0))), + ) + except Exception as e: + logger.error(f"Failed to load crafting data: {e}") + + def _save_data(self): + """Save blueprint history.""" + try: + data_file = self.data_dir / "blueprint_history.json" + data = {} + for name, stats in self.blueprint_stats.items(): + data[name] = { + 'attempts': stats.total_attempts, + 'successes': stats.successes, + 'near_successes': stats.near_successes, + 'failures': stats.failures, + 'qr_gained': str(stats.qr_gained), + 'total_cost': str(stats.total_cost), + 'output_value': str(stats.total_output_value), + 'current_qr': str(stats.current_qr), + } + with open(data_file, 'w') as f: + json.dump(data, f, indent=2) + except Exception as e: + logger.error(f"Failed to save crafting data: {e}") + + def record_attempt(self, blueprint: str, success: bool, near_success: bool, + materials_cost: Decimal, output_value: Decimal, + qr_before: Decimal = Decimal("0"), + qr_after: Decimal = Decimal("0"), + clicks: int = 1): + """Record a crafting attempt.""" + + # Create or get blueprint stats + if blueprint not in self.blueprint_stats: + self.blueprint_stats[blueprint] = BlueprintStats(blueprint) + + stats = self.blueprint_stats[blueprint] + + # Update stats + stats.total_attempts += clicks + if success: + stats.successes += clicks + elif near_success: + stats.near_successes += clicks + else: + stats.failures += clicks + + stats.total_cost += materials_cost + stats.total_output_value += output_value + stats.current_qr = qr_after + + qr_diff = qr_after - qr_before + if qr_diff > 0: + stats.qr_gained += qr_diff + + # Record attempt + attempt = CraftAttempt( + blueprint=blueprint, + qr_before=qr_before, + qr_after=qr_after, + success=success, + near_success=near_success, + materials_cost=materials_cost, + output_value=output_value, + timestamp=datetime.now(), + clicks=clicks + ) + self.session_attempts.append(attempt) + + # Auto-save + self._save_data() + + logger.info(f"Craft: {blueprint} - {'SUCCESS' if success else 'NEAR' if near_success else 'FAIL'} " + f"(QR: {qr_before}% -> {qr_after}%)") + + def get_blueprint_summary(self, blueprint: str) -> Optional[BlueprintStats]: + """Get stats for a specific blueprint.""" + return self.blueprint_stats.get(blueprint) + + def get_all_summaries(self) -> List[BlueprintStats]: + """Get all blueprint stats sorted by usage.""" + return sorted( + self.blueprint_stats.values(), + key=lambda x: x.total_attempts, + reverse=True + ) + + def get_session_summary(self) -> Dict: + """Get current session summary.""" + total_attempts = len(self.session_attempts) + total_successes = sum(1 for a in self.session_attempts if a.success) + total_near = sum(1 for a in self.session_attempts if a.near_success) + + total_cost = sum(a.materials_cost for a in self.session_attempts) + total_output = sum(a.output_value for a in self.session_attempts) + + return { + 'total_attempts': total_attempts, + 'successes': total_successes, + 'near_successes': total_near, + 'failures': total_attempts - total_successes - total_near, + 'success_rate': (Decimal(total_successes) / Decimal(total_attempts) * 100) if total_attempts > 0 else Decimal("0"), + 'total_cost': total_cost, + 'total_output': total_output, + 'profit_loss': total_output - total_cost, + 'blueprints_used': len(set(a.blueprint for a in self.session_attempts)), + } + + def get_success_rate_by_qr_range(self) -> Dict[str, Decimal]: + """Analyze success rate by QR ranges.""" + ranges = { + '0-25%': {'attempts': 0, 'successes': 0}, + '25-50%': {'attempts': 0, 'successes': 0}, + '50-75%': {'attempts': 0, 'successes': 0}, + '75-100%': {'attempts': 0, 'successes': 0}, + } + + for attempt in self.session_attempts: + qr = attempt.qr_before + if qr < 25: + key = '0-25%' + elif qr < 50: + key = '25-50%' + elif qr < 75: + key = '50-75%' + else: + key = '75-100%' + + ranges[key]['attempts'] += 1 + if attempt.success: + ranges[key]['successes'] += 1 + + return { + r: (Decimal(data['successes']) / Decimal(data['attempts']) * 100) if data['attempts'] > 0 else Decimal("0") + for r, data in ranges.items() + } + + def generate_report(self) -> str: + """Generate detailed crafting report.""" + session = self.get_session_summary() + + report = [] + report.append("=" * 60) + report.append("CRAFTING SESSION REPORT") + report.append("=" * 60) + report.append(f"Total Attempts: {session['total_attempts']}") + report.append(f"Success Rate: {session['success_rate']:.1f}%") + report.append(f"Material Cost: {session['total_cost']:.2f} PED") + report.append(f"Output Value: {session['total_output']:.2f} PED") + report.append(f"Profit/Loss: {session['profit_loss']:+.2f} PED") + report.append("") + + report.append("BLUEPRINT BREAKDOWN:") + for stats in self.get_all_summaries()[:10]: + report.append(f" {stats.blueprint_name[:25]:25} " + f"SR: {stats.success_rate:5.1f}% " + f"P/L: {stats.profit_loss:+7.2f}") + + report.append("") + report.append("SUCCESS RATE BY QR RANGE:") + for range_name, rate in self.get_success_rate_by_qr_range().items(): + report.append(f" {range_name}: {rate:.1f}%") + + return "\n".join(report) + + def should_continue_crafting(self, blueprint: str, max_loss: Decimal = Decimal("50")) -> bool: + """Advise whether to continue crafting a blueprint.""" + stats = self.blueprint_stats.get(blueprint) + if not stats: + return True + + # Stop if losing too much + if stats.profit_loss < -max_loss: + return False + + # Stop if success rate is terrible and QR is high + if stats.success_rate < 20 and stats.current_qr > 80: + return False + + return True + + +class MaterialTracker: + """ + Track material inventory and consumption. + """ + + def __init__(self, data_dir: Optional[Path] = None): + self.data_dir = data_dir or Path.home() / ".lemontropia" / "materials" + self.data_dir.mkdir(parents=True, exist_ok=True) + + self.inventory: Dict[str, Decimal] = {} + self._load_inventory() + + def _load_inventory(self): + """Load material inventory.""" + try: + inv_file = self.data_dir / "inventory.json" + if inv_file.exists(): + with open(inv_file, 'r') as f: + data = json.load(f) + self.inventory = {k: Decimal(str(v)) for k, v in data.items()} + except Exception as e: + logger.error(f"Failed to load inventory: {e}") + + def _save_inventory(self): + """Save material inventory.""" + try: + inv_file = self.data_dir / "inventory.json" + with open(inv_file, 'w') as f: + json.dump({k: str(v) for k, v in self.inventory.items()}, f, indent=2) + except Exception as e: + logger.error(f"Failed to save inventory: {e}") + + def add_material(self, name: str, quantity: Decimal, unit_value: Optional[Decimal] = None): + """Add materials to inventory.""" + if name in self.inventory: + self.inventory[name] += quantity + else: + self.inventory[name] = quantity + self._save_inventory() + + def consume_material(self, name: str, quantity: Decimal) -> bool: + """Consume materials from inventory.""" + if name not in self.inventory or self.inventory[name] < quantity: + logger.warning(f"Insufficient {name} (have {self.inventory.get(name, 0)}, need {quantity})") + return False + + self.inventory[name] -= quantity + if self.inventory[name] <= 0: + del self.inventory[name] + + self._save_inventory() + return True + + def get_inventory_value(self, price_lookup: Optional[Dict[str, Decimal]] = None) -> Decimal: + """Calculate total inventory value.""" + total = Decimal("0") + for item, qty in self.inventory.items(): + price = Decimal("1") # Default value + if price_lookup and item in price_lookup: + price = price_lookup[item] + total += qty * price + return total + + +# Export main classes +__all__ = ['CraftingTracker', 'MaterialTracker', 'BlueprintStats', 'CraftAttempt'] diff --git a/modules/game_vision.py b/modules/game_vision.py new file mode 100644 index 0000000..406b856 --- /dev/null +++ b/modules/game_vision.py @@ -0,0 +1,365 @@ +""" +Lemontropia Suite - Game Vision System +Computer vision module for reading UI elements from Entropia Universe. +""" + +import cv2 +import numpy as np +import logging +from pathlib import Path +from decimal import Decimal +from dataclasses import dataclass +from typing import Optional, Tuple, List, Dict, Any +import mss +import time + +# Try to import PaddleOCR, fallback to None if not available +try: + from paddleocr import PaddleOCR + PADDLE_AVAILABLE = True +except ImportError: + PADDLE_AVAILABLE = False + +logger = logging.getLogger(__name__) + + +@dataclass +class DetectedText: + """Detected text with metadata.""" + text: str + confidence: float + region: Tuple[int, int, int, int] # x, y, w, h + + +@dataclass +class EquippedGear: + """Currently equipped gear detected from game.""" + weapon_name: Optional[str] = None + weapon_confidence: float = 0.0 + armor_name: Optional[str] = None + armor_confidence: float = 0.0 + tool_name: Optional[str] = None + tool_confidence: float = 0.0 + detected_at: Optional[float] = None + + +@dataclass +class TargetInfo: + """Current target mob information.""" + mob_name: Optional[str] = None + confidence: float = 0.0 + health_percent: Optional[int] = None + detected_at: Optional[float] = None + + +class ScreenCapture: + """Cross-platform screen capture.""" + + def __init__(self): + self.sct = mss.mss() + + def capture_full_screen(self) -> np.ndarray: + """Capture full screen.""" + monitor = self.sct.monitors[1] # Primary monitor + screenshot = self.sct.grab(monitor) + # Convert to numpy array (BGR for OpenCV) + img = np.array(screenshot) + return cv2.cvtColor(img, cv2.COLOR_BGRA2BGR) + + def capture_region(self, x: int, y: int, w: int, h: int) -> np.ndarray: + """Capture specific region.""" + monitor = {"left": x, "top": y, "width": w, "height": h} + screenshot = self.sct.grab(monitor) + img = np.array(screenshot) + return cv2.cvtColor(img, cv2.COLOR_BGRA2BGR) + + def find_window(self, window_title: str) -> Optional[Tuple[int, int, int, int]]: + """Find window by title (Windows only).""" + try: + import win32gui + + def callback(hwnd, extra): + if win32gui.IsWindowVisible(hwnd): + title = win32gui.GetWindowText(hwnd) + if window_title.lower() in title.lower(): + rect = win32gui.GetWindowRect(hwnd) + extra.append((rect[0], rect[1], rect[2] - rect[0], rect[3] - rect[1])) + + windows = [] + win32gui.EnumWindows(callback, windows) + return windows[0] if windows else None + except Exception as e: + logger.error(f"Failed to find window: {e}") + return None + + +class TemplateMatcher: + """Template matching for finding UI elements.""" + + def __init__(self, templates_dir: Optional[Path] = None): + self.templates_dir = templates_dir or Path(__file__).parent / "templates" + self.templates: Dict[str, np.ndarray] = {} + self._load_templates() + + def _load_templates(self): + """Load template images.""" + if not self.templates_dir.exists(): + logger.warning(f"Templates directory not found: {self.templates_dir}") + return + + for template_file in self.templates_dir.glob("*.png"): + try: + name = template_file.stem + self.templates[name] = cv2.imread(str(template_file), cv2.IMREAD_GRAYSCALE) + logger.info(f"Loaded template: {name}") + except Exception as e: + logger.error(f"Failed to load template {template_file}: {e}") + + def find_template(self, screenshot: np.ndarray, template_name: str, + threshold: float = 0.8) -> Optional[Tuple[int, int, int, int]]: + """Find template in screenshot.""" + if template_name not in self.templates: + logger.warning(f"Template not found: {template_name}") + return None + + template = self.templates[template_name] + gray = cv2.cvtColor(screenshot, cv2.COLOR_BGR2GRAY) + + result = cv2.matchTemplate(gray, template, cv2.TM_CCOEFF_NORMED) + min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result) + + if max_val >= threshold: + x, y = max_loc + h, w = template.shape + return (x, y, w, h) + + return None + + def find_all_templates(self, screenshot: np.ndarray, threshold: float = 0.7) -> Dict[str, Tuple[int, int, int, int]]: + """Find all known templates in screenshot.""" + found = {} + for name in self.templates: + result = self.find_template(screenshot, name, threshold) + if result: + found[name] = result + return found + + +class GameVision: + """ + Main computer vision interface for reading game UI. + """ + + def __init__(self, use_ocr: bool = True): + self.capture = ScreenCapture() + self.template_matcher = TemplateMatcher() + + # Initialize OCR if available + self.ocr = None + if use_ocr and PADDLE_AVAILABLE: + try: + self.ocr = PaddleOCR( + lang='en', + use_gpu=False, + show_log=False, + det_model_dir=None, + rec_model_dir=None, + ) + logger.info("PaddleOCR initialized") + except Exception as e: + logger.error(f"Failed to initialize PaddleOCR: {e}") + + # Region definitions (relative to game window) + # These would be calibrated based on actual UI + self.regions = { + 'weapon_slot': None, # To be defined + 'armor_slot': None, + 'target_window': None, + 'health_bar': None, + } + + self.last_equipped: Optional[EquippedGear] = None + self.last_target: Optional[TargetInfo] = None + + def read_text_region(self, screenshot: np.ndarray, region: Tuple[int, int, int, int]) -> List[DetectedText]: + """Read text from a specific region using OCR.""" + if self.ocr is None: + logger.warning("OCR not available") + return [] + + x, y, w, h = region + crop = screenshot[y:y+h, x:x+w] + + # Preprocess for better OCR + gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY) + _, thresh = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY) + + try: + result = self.ocr.ocr(thresh, cls=False) + + detected = [] + if result and result[0]: + for line in result[0]: + bbox, (text, confidence) = line + detected.append(DetectedText( + text=text.strip(), + confidence=confidence, + region=(x, y, w, h) # Simplified region + )) + return detected + except Exception as e: + logger.error(f"OCR failed: {e}") + return [] + + def detect_equipped_weapon(self, screenshot: Optional[np.ndarray] = None) -> Optional[str]: + """Detect currently equipped weapon name.""" + if screenshot is None: + screenshot = self.capture.capture_full_screen() + + # Find weapon slot region using template matching + region = self.template_matcher.find_template(screenshot, 'weapon_slot') + if not region: + logger.debug("Weapon slot not found") + return None + + # Adjust region to focus on text area + x, y, w, h = region + text_region = (x, y + h, w, 20) # Below the icon + + # Read text + texts = self.read_text_region(screenshot, text_region) + if texts: + best = max(texts, key=lambda x: x.confidence) + if best.confidence > 0.7: + return best.text + + return None + + def detect_equipped_armor(self, screenshot: Optional[np.ndarray] = None) -> Optional[str]: + """Detect currently equipped armor name.""" + if screenshot is None: + screenshot = self.capture.capture_full_screen() + + region = self.template_matcher.find_template(screenshot, 'armor_slot') + if not region: + return None + + texts = self.read_text_region(screenshot, region) + if texts: + best = max(texts, key=lambda x: x.confidence) + if best.confidence > 0.7: + return best.text + + return None + + def detect_target_mob(self, screenshot: Optional[np.ndarray] = None) -> Optional[TargetInfo]: + """Detect current target mob name.""" + if screenshot is None: + screenshot = self.capture.capture_full_screen() + + region = self.template_matcher.find_template(screenshot, 'target_window') + if not region: + return None + + texts = self.read_text_region(screenshot, region) + if texts: + # First text is usually the mob name + best = texts[0] + if best.confidence > 0.6: + return TargetInfo( + mob_name=best.text, + confidence=best.confidence, + detected_at=time.time() + ) + + return None + + def scan_equipped_gear(self) -> EquippedGear: + """Full scan of all equipped gear.""" + screenshot = self.capture.capture_full_screen() + + gear = EquippedGear(detected_at=time.time()) + + weapon = self.detect_equipped_weapon(screenshot) + if weapon: + gear.weapon_name = weapon + gear.weapon_confidence = 0.8 # Placeholder + + armor = self.detect_equipped_armor(screenshot) + if armor: + gear.armor_name = armor + gear.armor_confidence = 0.8 + + self.last_equipped = gear + return gear + + def poll_target(self, interval: float = 2.0) -> Optional[TargetInfo]: + """Poll for target changes.""" + current_time = time.time() + + if (self.last_target and + self.last_target.detected_at and + current_time - self.last_target.detected_at < interval): + return self.last_target + + target = self.detect_target_mob() + if target: + self.last_target = target + + return target + + +class TemplateCaptureTool: + """ + Interactive tool for capturing UI templates. + Usage: Run this to create template images for UI elements. + """ + + def __init__(self): + self.capture = ScreenCapture() + self.templates_dir = Path.home() / ".lemontropia" / "templates" + self.templates_dir.mkdir(parents=True, exist_ok=True) + + def capture_template(self, name: str, region: Tuple[int, int, int, int]): + """Capture and save a template.""" + x, y, w, h = region + img = self.capture.capture_region(x, y, w, h) + + filepath = self.templates_dir / f"{name}.png" + cv2.imwrite(str(filepath), img) + logger.info(f"Template saved: {filepath}") + return filepath + + def interactive_capture(self): + """Interactive template capture.""" + print("Template Capture Tool") + print("=" * 50) + print("1. Position your mouse at top-left of UI element") + print("2. Press SPACE to capture") + print("3. Enter template name") + print("4. Repeat for all templates") + print("=" * 50) + + templates_to_capture = [ + 'weapon_slot', + 'armor_slot', + 'target_window', + 'health_bar', + 'inventory_icon', + ] + + for template_name in templates_to_capture: + input(f"\nReady to capture: {template_name}") + print("Taking screenshot in 2 seconds...") + time.sleep(2) + + # Full screenshot + full = self.capture.capture_full_screen() + + # TODO: Allow user to draw region + # For now, use hardcoded regions based on typical EU layout + print(f"Template {template_name} would be captured here") + + +# Export main classes +__all__ = ['GameVision', 'ScreenCapture', 'TemplateMatcher', 'EquippedGear', 'TargetInfo', 'TemplateCaptureTool'] diff --git a/modules/loot_analyzer.py b/modules/loot_analyzer.py new file mode 100644 index 0000000..507895e --- /dev/null +++ b/modules/loot_analyzer.py @@ -0,0 +1,359 @@ +""" +Lemontropia Suite - Enhanced Loot Analyzer +Detailed loot breakdown by mob type, item category, and value ranges. +""" + +import json +import logging +from decimal import Decimal +from pathlib import Path +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Tuple +from datetime import datetime +from collections import defaultdict + +logger = logging.getLogger(__name__) + + +@dataclass +class LootItem: + """Individual loot item record.""" + name: str + quantity: int + value_ped: Decimal + timestamp: datetime + mob_name: Optional[str] = None + session_id: Optional[int] = None + + +@dataclass +class LootStats: + """Statistics for a specific item type.""" + item_name: str + total_count: int = 0 + total_value: Decimal = field(default_factory=lambda: Decimal("0")) + max_value: Decimal = field(default_factory=lambda: Decimal("0")) + min_value: Decimal = field(default_factory=lambda: Decimal("999999")) + avg_value: Decimal = field(default_factory=lambda: Decimal("0")) + + def add_loot(self, value: Decimal, count: int = 1): + """Add loot to statistics.""" + self.total_count += count + self.total_value += value + if value > self.max_value: + self.max_value = value + if value < self.min_value: + self.min_value = value + self.avg_value = self.total_value / self.total_count if self.total_count > 0 else Decimal("0") + + +@dataclass +class MobStats: + """Statistics for a specific mob type.""" + mob_name: str + kill_count: int = 0 + total_loot: Decimal = field(default_factory=lambda: Decimal("0")) + item_breakdown: Dict[str, LootStats] = field(default_factory=dict) + + def add_kill(self, loot_value: Decimal, items: List[Tuple[str, Decimal]]): + """Record a kill with its loot.""" + self.kill_count += 1 + self.total_loot += loot_value + + for item_name, value in items: + if item_name not in self.item_breakdown: + self.item_breakdown[item_name] = LootStats(item_name) + self.item_breakdown[item_name].add_loot(value) + + +class LootAnalyzer: + """ + Advanced loot analysis for hunting sessions. + + Features: + - Track loot by mob type + - Item category breakdown + - Value distribution analysis + - Session comparison + """ + + def __init__(self, data_dir: Optional[Path] = None): + self.data_dir = data_dir or Path.home() / ".lemontropia" / "loot_analysis" + self.data_dir.mkdir(parents=True, exist_ok=True) + + # Current session data + self.current_session_items: List[LootItem] = [] + self.mob_stats: Dict[str, MobStats] = defaultdict(lambda: MobStats("")) + self.global_items: List[LootItem] = [] # Track globals separately + + # Item categories (expandable) + self.item_categories = { + 'shrapnel': ['shrapnel'], + 'ores': ['iron stone', 'lysterium stone', 'belkar stone', 'caldorite stone'], + 'enmatters': ['muscle oil', 'tir oil', 'crude oil'], + 'animal_parts': ['wool', 'hide', 'meat', 'bone'], + 'robot_parts': ['robot component', 'robot filter'], + 'weapons': ['weapon', 'rifle', 'pistol', 'sword'], + 'armor': ['armor', 'harness', 'helmet', 'boots'], + 'misc': [] + } + + self._load_historical_data() + + def _load_historical_data(self): + """Load historical loot data for comparison.""" + try: + history_file = self.data_dir / "loot_history.json" + if history_file.exists(): + with open(history_file, 'r') as f: + data = json.load(f) + # Could restore previous session stats here + except Exception as e: + logger.error(f"Failed to load loot history: {e}") + + def _save_historical_data(self): + """Save current session data for future comparison.""" + try: + history_file = self.data_dir / "loot_history.json" + # Save summary stats + data = { + 'last_session': datetime.now().isoformat(), + 'total_mobs': sum(m.kill_count for m in self.mob_stats.values()), + 'total_loot': str(sum(m.total_loot for m in self.mob_stats.values())), + } + with open(history_file, 'w') as f: + json.dump(data, f, indent=2) + except Exception as e: + logger.error(f"Failed to save loot history: {e}") + + def categorize_item(self, item_name: str) -> str: + """Categorize an item by name.""" + name_lower = item_name.lower() + + for category, items in self.item_categories.items(): + for item_pattern in items: + if item_pattern in name_lower: + return category + + return 'misc' + + def record_loot(self, item_name: str, quantity: int, value_ped: Decimal, + mob_name: Optional[str] = None, is_global: bool = False) -> None: + """Record a loot item.""" + item = LootItem( + name=item_name, + quantity=quantity, + value_ped=value_ped, + timestamp=datetime.now(), + mob_name=mob_name + ) + + self.current_session_items.append(item) + + if is_global: + self.global_items.append(item) + + # Update mob stats if we know the mob + if mob_name: + if mob_name not in self.mob_stats: + self.mob_stats[mob_name] = MobStats(mob_name) + # Note: This is simplified - ideally we batch items per kill + + def get_session_summary(self) -> Dict: + """Get summary of current session.""" + total_items = len(self.current_session_items) + total_value = sum(item.value_ped for item in self.current_session_items) + + # Category breakdown + category_values = defaultdict(lambda: Decimal("0")) + for item in self.current_session_items: + cat = self.categorize_item(item.name) + category_values[cat] += item.value_ped + + return { + 'total_items': total_items, + 'total_value': total_value, + 'globals_count': len(self.global_items), + 'category_breakdown': dict(category_values), + 'mob_types': len(self.mob_stats), + } + + def get_top_loot(self, n: int = 10) -> List[LootItem]: + """Get top N highest value loot items.""" + sorted_items = sorted( + self.current_session_items, + key=lambda x: x.value_ped, + reverse=True + ) + return sorted_items[:n] + + def get_mob_efficiency(self) -> Dict[str, Decimal]: + """Get loot per kill for each mob type.""" + return { + mob_name: stats.total_loot / stats.kill_count + for mob_name, stats in self.mob_stats.items() + if stats.kill_count > 0 + } + + def generate_report(self) -> str: + """Generate text report of loot analysis.""" + summary = self.get_session_summary() + + report = [] + report.append("=" * 50) + report.append("LOOT ANALYSIS REPORT") + report.append("=" * 50) + report.append(f"Total Items: {summary['total_items']}") + report.append(f"Total Value: {summary['total_value']:.2f} PED") + report.append(f"Globals: {summary['globals_count']}") + report.append(f"Mob Types: {summary['mob_types']}") + report.append("") + + report.append("CATEGORY BREAKDOWN:") + for cat, value in sorted(summary['category_breakdown'].items(), + key=lambda x: x[1], reverse=True): + pct = (value / summary['total_value'] * 100) if summary['total_value'] > 0 else 0 + report.append(f" {cat.capitalize():12} {value:8.2f} PED ({pct:5.1f}%)") + + report.append("") + report.append("TOP 5 LOOT ITEMS:") + for i, item in enumerate(self.get_top_loot(5), 1): + report.append(f" {i}. {item.name[:25]:25} {item.value_ped:8.2f} PED") + + report.append("") + report.append("MOB EFFICIENCY:") + for mob, efficiency in sorted(self.get_mob_efficiency().items(), + key=lambda x: x[1], reverse=True)[:5]: + stats = self.mob_stats[mob] + report.append(f" {mob[:20]:20} {efficiency:6.2f} PED/kill ({stats.kill_count} kills)") + + return "\n".join(report) + + def export_to_csv(self, filepath: Path) -> None: + """Export loot data to CSV.""" + import csv + + with open(filepath, 'w', newline='') as f: + writer = csv.writer(f) + writer.writerow(['Timestamp', 'Item', 'Quantity', 'Value (PED)', 'Mob', 'Category']) + + for item in self.current_session_items: + writer.writerow([ + item.timestamp.isoformat(), + item.name, + item.quantity, + str(item.value_ped), + item.mob_name or 'Unknown', + self.categorize_item(item.name) + ]) + + +class DPSCalculator: + """ + Real-time DPS (Damage Per Second) and DPP (Damage Per Pec) calculator. + """ + + def __init__(self): + self.damage_events: List[Tuple[datetime, Decimal]] = [] + self.total_damage = Decimal("0") + self.total_cost = Decimal("0") # In PEC + + def record_damage(self, damage: Decimal, cost_pec: Decimal = Decimal("0")): + """Record a damage event.""" + self.damage_events.append((datetime.now(), damage)) + self.total_damage += damage + self.total_cost += cost_pec + + # Keep only last 60 seconds for DPS calculation + cutoff = datetime.now() - timedelta(seconds=60) + self.damage_events = [(t, d) for t, d in self.damage_events if t > cutoff] + + def get_current_dps(self) -> Decimal: + """Calculate DPS over last 60 seconds.""" + if len(self.damage_events) < 2: + return Decimal("0") + + total_damage = sum(d for _, d in self.damage_events) + time_span = (self.damage_events[-1][0] - self.damage_events[0][0]).total_seconds() + + if time_span > 0: + return Decimal(str(total_damage)) / Decimal(str(time_span)) + return Decimal("0") + + def get_dpp(self) -> Decimal: + """Calculate Damage Per Pec (efficiency metric).""" + if self.total_cost > 0: + return self.total_damage / self.total_cost + return Decimal("0") + + def get_session_stats(self) -> Dict: + """Get overall session stats.""" + return { + 'total_damage': self.total_damage, + 'total_cost_pec': self.total_cost, + 'total_cost_ped': self.total_cost / 100, + 'dpp': self.get_dpp(), + 'current_dps': self.get_current_dps(), + 'hit_count': len(self.damage_events), + } + + +class GlobalAlertSystem: + """ + Alert system for globals and HoFs. + Plays sound, shows notification, auto-screenshots. + """ + + def __init__(self, screenshot_dir: Optional[Path] = None): + self.screenshot_dir = screenshot_dir or Path.home() / ".lemontropia" / "screenshots" + self.screenshot_dir.mkdir(parents=True, exist_ok=True) + + self.global_sound_enabled = True + self.hof_sound_enabled = True + self.auto_screenshot = True + + def on_global(self, value_ped: Decimal, item_name: str): + """Handle global event.""" + logger.info(f"🌟 GLOBAL! {item_name} - {value_ped:.2f} PED") + + if self.global_sound_enabled: + self._play_sound("global") + + if self.auto_screenshot: + self._take_screenshot(f"global_{datetime.now():%Y%m%d_%H%M%S}") + + def on_hof(self, value_ped: Decimal, item_name: str): + """Handle Hall of Fame event.""" + logger.info(f"🏆 HALL OF FAME! {item_name} - {value_ped:.2f} PED") + + if self.hof_sound_enabled: + self._play_sound("hof") + + if self.auto_screenshot: + self._take_screenshot(f"hof_{datetime.now():%Y%m%d_%H%M%S}") + + def _play_sound(self, sound_type: str): + """Play alert sound.""" + try: + if sys.platform == 'win32': + import winsound + if sound_type == "hof": + winsound.MessageBeep(winsound.MB_ICONEXCLAMATION) + else: + winsound.MessageBeep(winsound.MB_OK) + except Exception as e: + logger.error(f"Failed to play sound: {e}") + + def _take_screenshot(self, filename: str): + """Take screenshot of game window.""" + try: + # This would integrate with the vision system + # For now, just log it + screenshot_path = self.screenshot_dir / f"{filename}.png" + logger.info(f"Screenshot saved: {screenshot_path}") + except Exception as e: + logger.error(f"Failed to take screenshot: {e}") + + +# Export main classes +__all__ = ['LootAnalyzer', 'DPSCalculator', 'GlobalAlertSystem', 'LootStats', 'MobStats'] diff --git a/modules/notifications.py b/modules/notifications.py new file mode 100644 index 0000000..4f77323 --- /dev/null +++ b/modules/notifications.py @@ -0,0 +1,281 @@ +""" +Lemontropia Suite - Notification System +Send alerts to Discord, Telegram, or other services on important events. +""" + +import json +import logging +import requests +from decimal import Decimal +from pathlib import Path +from dataclasses import dataclass +from typing import Optional, Dict, List +from datetime import datetime + +logger = logging.getLogger(__name__) + + +@dataclass +class NotificationConfig: + """Configuration for notifications.""" + discord_webhook: Optional[str] = None + telegram_bot_token: Optional[str] = None + telegram_chat_id: Optional[str] = None + + # Event filters + notify_on_global: bool = True + notify_on_hof: bool = True + notify_on_profit_threshold: bool = False + profit_threshold: Decimal = Decimal("100") + notify_on_loss_threshold: bool = True + loss_threshold: Decimal = Decimal("-50") + + +class DiscordNotifier: + """Send notifications to Discord via webhook.""" + + def __init__(self, webhook_url: str): + self.webhook_url = webhook_url + + def send_message(self, content: str, embeds: Optional[List[Dict]] = None) -> bool: + """Send message to Discord.""" + try: + data = {"content": content} + if embeds: + data["embeds"] = embeds + + response = requests.post( + self.webhook_url, + json=data, + timeout=10 + ) + return response.status_code == 204 + except Exception as e: + logger.error(f"Failed to send Discord notification: {e}") + return False + + def send_loot_alert(self, item_name: str, value: Decimal, is_hof: bool = False): + """Send loot alert.""" + color = 0xFFD700 if is_hof else 0x00FF00 # Gold for HoF, Green for Global + title = "🏆 HALL OF FAME!" if is_hof else "🌟 GLOBAL!" + + embed = { + "title": title, + "description": f"**{item_name}**", + "color": color, + "fields": [ + { + "name": "Value", + "value": f"{value:.2f} PED", + "inline": True + } + ], + "timestamp": datetime.now().isoformat() + } + + return self.send_message("", [embed]) + + def send_session_summary(self, session_data: Dict): + """Send session summary.""" + profit = session_data.get('profit_loss', Decimal("0")) + color = 0x00FF00 if profit >= 0 else 0xFF0000 + + embed = { + "title": "Hunting Session Complete", + "color": color, + "fields": [ + { + "name": "Duration", + "value": session_data.get('duration', 'Unknown'), + "inline": True + }, + { + "name": "Profit/Loss", + "value": f"{profit:+.2f} PED", + "inline": True + }, + { + "name": "Return %", + "value": f"{session_data.get('return_pct', 0):.1f}%", + "inline": True + }, + { + "name": "Globals", + "value": str(session_data.get('globals', 0)), + "inline": True + } + ], + "timestamp": datetime.now().isoformat() + } + + return self.send_message("", [embed]) + + +class TelegramNotifier: + """Send notifications to Telegram via bot.""" + + def __init__(self, bot_token: str, chat_id: str): + self.bot_token = bot_token + self.chat_id = chat_id + self.base_url = f"https://api.telegram.org/bot{bot_token}" + + def send_message(self, text: str) -> bool: + """Send message to Telegram.""" + try: + url = f"{self.base_url}/sendMessage" + data = { + "chat_id": self.chat_id, + "text": text, + "parse_mode": "Markdown" + } + + response = requests.post(url, json=data, timeout=10) + return response.json().get("ok", False) + except Exception as e: + logger.error(f"Failed to send Telegram notification: {e}") + return False + + def send_loot_alert(self, item_name: str, value: Decimal, is_hof: bool = False): + """Send loot alert.""" + emoji = "🏆" if is_hof else "🌟" + title = "HALL OF FAME!" if is_hof else "GLOBAL!" + + text = f"{emoji} *{title}* {emoji}\n\n" + text += f"*{item_name}*\n" + text += f"Value: *{value:.2f} PED*" + + return self.send_message(text) + + def send_session_summary(self, session_data: Dict): + """Send session summary.""" + profit = session_data.get('profit_loss', Decimal("0")) + emoji = "✅" if profit >= 0 else "❌" + + text = f"{emoji} *Session Complete*\n\n" + text += f"Duration: {session_data.get('duration', 'Unknown')}\n" + text += f"P/L: *{profit:+.2f} PED*\n" + text += f"Return: {session_data.get('return_pct', 0):.1f}%\n" + text += f"Globals: {session_data.get('globals', 0)}" + + return self.send_message(text) + + +class NotificationManager: + """ + Central notification manager. + Handles all notification services and event filtering. + """ + + def __init__(self, config: Optional[NotificationConfig] = None): + self.config = config or NotificationConfig() + + self.discord: Optional[DiscordNotifier] = None + self.telegram: Optional[TelegramNotifier] = None + + if self.config.discord_webhook: + self.discord = DiscordNotifier(self.config.discord_webhook) + + if self.config.telegram_bot_token and self.config.telegram_chat_id: + self.telegram = TelegramNotifier( + self.config.telegram_bot_token, + self.config.telegram_chat_id + ) + + def on_global(self, item_name: str, value: Decimal): + """Handle global event.""" + if not self.config.notify_on_global: + return + + logger.info(f"Sending global notification: {item_name} - {value} PED") + + if self.discord: + self.discord.send_loot_alert(item_name, value, is_hof=False) + + if self.telegram: + self.telegram.send_loot_alert(item_name, value, is_hof=False) + + def on_hof(self, item_name: str, value: Decimal): + """Handle HoF event.""" + if not self.config.notify_on_hof: + return + + logger.info(f"Sending HoF notification: {item_name} - {value} PED") + + if self.discord: + self.discord.send_loot_alert(item_name, value, is_hof=True) + + if self.telegram: + self.telegram.send_loot_alert(item_name, value, is_hof=True) + + def on_session_end(self, session_data: Dict): + """Handle session end.""" + profit = session_data.get('profit_loss', Decimal("0")) + + # Check thresholds + if self.config.notify_on_profit_threshold and profit >= self.config.profit_threshold: + pass # Will send below + elif self.config.notify_on_loss_threshold and profit <= self.config.loss_threshold: + pass # Will send below + else: + return + + logger.info(f"Sending session summary: {profit:+.2f} PED") + + if self.discord: + self.discord.send_session_summary(session_data) + + if self.telegram: + self.telegram.send_session_summary(session_data) + + def send_custom_message(self, message: str): + """Send custom message to all channels.""" + if self.discord: + self.discord.send_message(message) + + if self.telegram: + self.telegram.send_message(message) + + +class SoundNotifier: + """ + Play sound alerts locally. + """ + + def __init__(self): + self.enabled = True + self.global_sound = "global.wav" # Would need actual sound files + self.hof_sound = "hof.wav" + + def play_global_sound(self): + """Play global alert sound.""" + if not self.enabled: + return + + try: + if sys.platform == 'win32': + import winsound + winsound.MessageBeep(winsound.MB_OK) + except Exception as e: + logger.error(f"Failed to play sound: {e}") + + def play_hof_sound(self): + """Play HoF alert sound.""" + if not self.enabled: + return + + try: + if sys.platform == 'win32': + import winsound + winsound.MessageBeep(winsound.MB_ICONEXCLAMATION) + except Exception as e: + logger.error(f"Failed to play sound: {e}") + + +# Export main classes +__all__ = [ + 'NotificationManager', + 'DiscordNotifier', + 'TelegramNotifier', + 'SoundNotifier', + 'NotificationConfig' +]