Compare commits

..

No commits in common. "665d140b49bc8de64f54e3ef6b9bfe279688911e" and "70b7e9b237424c8492b10d836c48c31ca8935c86" have entirely different histories.

19 changed files with 0 additions and 5567 deletions

View File

@ -1,378 +0,0 @@
# EU-Utility Testing Guide
Complete testing instructions for EU-Utility Premium with Entropia Universe.
## 🚀 Quick Start
### 1. Install Dependencies
```bash
cd ~/EU-Utility
pip install -r requirements.txt
```
**Required packages:**
- PyQt6 (GUI framework)
- psutil (process monitoring)
- pywin32 (Windows API) - Windows only
- pillow (screenshots)
### 2. Configure Game Path
Edit the config file (created automatically on first run):
```bash
# Windows default paths:
# C:\Program Files (x86)\Entropia Universe\ClientLoader.exe
# C:\Users\<username>\AppData\Local\Entropia Universe
# Or create manually:
python -c "
import json
from pathlib import Path
config = {
'game_path': r'C:\\Program Files (x86)\\Entropia Universe',
'overlay_enabled': True,
'overlay_mode': 'overlay_toggle',
'hotkey': 'ctrl+shift+b',
'plugins_enabled': ['dashboard_widget'],
}
Path.home().joinpath('.eu-utility', 'config.json').write_text(json.dumps(config, indent=2))
"
```
### 3. Launch EU-Utility
```bash
python main.py
```
**Command line options:**
```bash
python main.py --verbose # Debug logging
python main.py --test # Test mode (no game required)
python main.py --no-overlay # Headless mode
```
---
## 🎮 Testing With Entropia Universe
### Test 1: Basic Launch
**Steps:**
1. Launch EU-Utility: `python main.py`
2. Check console output - should show "EU-Utility Premium initialized"
3. Look for system tray icon (small EU logo)
4. Press `Ctrl+Shift+B` to toggle overlay
**Expected:** Overlay appears with dashboard widget
### Test 2: Game Detection
**Steps:**
1. Launch Entropia Universe game client
2. Wait 5-10 seconds
3. Check EU-Utility console for "Game client detected"
**Expected:** Log shows process detection working
**Debug:** If not detected, check game path in config
### Test 3: Overlay Modes
Test each overlay mode in config:
**Mode: `overlay_toggle`** (default)
- Press `Ctrl+Shift+B` to show/hide overlay
- Should toggle instantly (<100ms)
**Mode: `overlay_game`**
- Overlay auto-shows when EU window is focused
- Overlay auto-hides when EU loses focus
- Should be smooth (no lag)
**Mode: `overlay_temp`**
- Press hotkey to show for 8 seconds
- Auto-hides after duration
### Test 4: Dashboard Widget
**Steps:**
1. Open overlay with hotkey
2. Verify dashboard widget loads
3. Check for player stats display
**Features to test:**
- [ ] Player name display
- [ ] Skill levels visible
- [ ] Health/Energy bars
- [ ] PED balance (if available)
### Test 5: Log File Parsing
**Steps:**
1. In game, get some loot or skill gain
2. Check EU-Utility console for events
3. Look for: `[GameEvent] Loot: ...` or `[GameEvent] Skill: ...`
**Expected:** Real-time event detection
**Log locations:**
```
%LOCALAPPDATA%\Entropia Universe\chat.log
%LOCALAPPDATA%\Entropia Universe\Entropia.log
```
### Test 6: Performance Test
**Steps:**
1. Run EU-Utility with game for 30 minutes
2. Monitor CPU usage (should be <5%)
3. Check for "Focus check took Xms" warnings
**Expected:**
- No lag spikes
- Smooth 60fps overlay
- CPU usage minimal
**Debug:** If slow, check overlay_controller.py optimizations
---
## 🔧 Troubleshooting
### Issue: "Game not detected"
**Check:**
1. Game path in config is correct
2. Game is actually running
3. Try running EU-Utility as administrator
**Fix:**
```python
# Update config with correct path
config['game_path'] = r'C:\Your\Actual\Path\To\Entropia Universe'
```
### Issue: "Overlay not appearing"
**Check:**
1. Hotkey is not conflicting with other apps
2. Overlay mode is set correctly
3. No Qt/OpenGL errors in console
**Fix:**
```bash
# Try test mode
python main.py --test
# Or disable overlay temporarily
python main.py --no-overlay
```
### Issue: "App is slow/laggy"
**Check:**
1. Focus check timing in logs
2. CPU/memory usage
3. Number of plugins loaded
**Fix:**
- Reduce poll interval in config
- Disable unused plugins
- Check for window manager conflicts
### Issue: "Plugins not loading"
**Check:**
1. Plugin files exist in `plugins/builtin/`
2. plugin.json is valid JSON
3. No import errors in console
**Fix:**
```bash
# Verify plugin structure
ls plugins/builtin/dashboard_widget/
# Should show: plugin.json, main.py
# Check for syntax errors
python -m py_compile plugins/builtin/dashboard_widget/main.py
```
---
## 🧪 Advanced Testing
### Test Plugin Development
Create a test plugin:
```python
# plugins/user/test_plugin/main.py
from premium.plugins.api import BasePlugin, PluginManifest
class TestPlugin(BasePlugin):
manifest = PluginManifest(
name="Test Plugin",
version="1.0.0",
author="Tester"
)
def on_load(self):
print("Test plugin loaded!")
return True
def on_enable(self):
print("Test plugin enabled!")
def on_disable(self):
print("Test plugin disabled!")
```
```json
// plugins/user/test_plugin/plugin.json
{
"name": "Test Plugin",
"version": "1.0.0",
"author": "Tester",
"main": "main.py",
"entry_point": "TestPlugin"
}
```
### Test Event System
```python
# Test event bus
from premium.core.event_bus import EventBus, Event
bus = EventBus()
@bus.on("game.loot")
def on_loot(event):
print(f"Got loot: {event.data}")
bus.emit("game.loot", {"item": "Ammunition", "amount": 100})
```
### Test State Management
```python
# Test state store
from premium.core.state.store import StateStore, ActionBase
class IncrementAction(ActionBase):
type = "INCREMENT"
def counter_reducer(state, action):
if action.type == "INCREMENT":
return state + 1
return state
store = StateStore(counter_reducer, initial_state=0)
store.dispatch(IncrementAction())
print(store.state) # 1
```
---
## 📊 Performance Benchmarks
### Expected Performance
| Metric | Target | Acceptable |
|--------|--------|------------|
| Startup time | <3s | <5s |
| Focus check | <10ms | <50ms |
| Overlay toggle | <100ms | <200ms |
| CPU usage | <3% | <10% |
| Memory usage | <200MB | <500MB |
| Log parsing | Real-time | <1s delay |
### Benchmark Script
```bash
# Run performance test
python -c "
import time
from premium.eu_integration.game_client import GameClient
start = time.time()
client = GameClient()
client.initialize()
print(f'Init time: {time.time() - start:.2f}s')
# Test detection
start = time.time()
found = client.find_game_process()
print(f'Detection time: {time.time() - start:.2f}s')
print(f'Game found: {found}')
"
```
---
## 🐛 Debug Mode
### Enable Debug Logging
```bash
python main.py --verbose
```
### Key Log Messages
| Message | Meaning |
|---------|---------|
| "Game client detected" | Process found |
| "Window focused" | EU window active |
| "Overlay shown/hidden" | Toggle working |
| "Plugin loaded: X" | Plugin loaded |
| "Focus check took Xms" | Performance warning |
| "Event: game.loot" | Game event detected |
### Check Logs
```bash
# Real-time log view
tail -f ~/.eu-utility/logs/app.log
# Search for errors
grep ERROR ~/.eu-utility/logs/app.log
```
---
## ✅ Final Checklist
Before considering ready for production:
- [ ] App launches without errors
- [ ] Game detection works
- [ ] Overlay appears with hotkey
- [ ] Dashboard shows data
- [ ] Log events are captured
- [ ] Performance is smooth
- [ ] No memory leaks (stable after 1 hour)
- [ ] Plugins load/unload correctly
- [ ] Settings save/load properly
- [ ] Exits gracefully on close
---
## 🆘 Getting Help
If issues persist:
1. Check logs: `~/.eu-utility/logs/`
2. Run tests: `python -m pytest tests/`
3. Check config: `~/.eu-utility/config.json`
4. Verify game running and accessible
**Debug info to collect:**
- Console output
- Log files
- System specs (CPU, RAM, GPU)
- Windows version
- Game version

297
main.py
View File

@ -1,297 +0,0 @@
#!/usr/bin/env python3
"""
EU-Utility Premium - Main Entry Point
======================================
Launch the EU-Utility premium overlay system.
Usage:
python main.py # Start with default config
python main.py --config path/to/config.json
python main.py --no-overlay # Run without overlay (headless)
python main.py --test # Run in test mode
For more information, see TESTING.md
"""
import sys
import json
import logging
import argparse
from pathlib import Path
# Add the EU-Utility directory to path
sys.path.insert(0, str(Path(__file__).parent))
def setup_logging(verbose: bool = False) -> logging.Logger:
"""Setup logging configuration."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
]
)
return logging.getLogger("EU-Utility")
def load_config(config_path: Path) -> dict:
"""Load configuration from file."""
if not config_path.exists():
# Create default config
default_config = {
"game_path": None,
"overlay_enabled": True,
"overlay_mode": "overlay_toggle",
"hotkey": "ctrl+shift+b",
"plugins": {
"enabled": [],
"disabled": []
},
"ui": {
"theme": "dark",
"opacity": 0.95,
"scale": 1.0
},
"features": {
"loot_tracker": True,
"skill_tracker": True,
"global_alerts": True,
"analytics": True
}
}
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, 'w') as f:
json.dump(default_config, f, indent=2)
return default_config
with open(config_path, 'r') as f:
return json.load(f)
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description="EU-Utility Premium - Entropia Universe Overlay"
)
parser.add_argument(
'--config',
type=Path,
default=Path.home() / '.eu-utility' / 'config.json',
help='Path to configuration file'
)
parser.add_argument(
'--no-overlay',
action='store_true',
help='Run without overlay (headless mode)'
)
parser.add_argument(
'--test',
action='store_true',
help='Run in test mode (no game required)'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose logging'
)
parser.add_argument(
'--setup',
action='store_true',
help='Run first-time setup'
)
args = parser.parse_args()
# Setup logging
logger = setup_logging(args.verbose)
logger.info("=" * 50)
logger.info("EU-Utility Premium Starting")
logger.info("=" * 50)
# Load config
config = load_config(args.config)
logger.info(f"Loaded config from: {args.config}")
# Run setup if requested
if args.setup:
run_setup(config, args.config)
return 0
# Check for game path
if not config.get('game_path') and not args.test:
logger.error("No game path configured. Run with --setup or edit config.json")
print("\n" + "=" * 50)
print("First-time setup required!")
print("=" * 50)
print("\nPlease run: python main.py --setup")
print("\nOr manually edit the config file:")
print(f" {args.config}")
print("\nAnd set the game_path to your Entropia Universe installation.")
print("=" * 50)
return 1
# Import and start the application
try:
from premium import EUUtilityApp
logger.info("Initializing application...")
app = EUUtilityApp(config_path=args.config)
# Override config with CLI args
if args.no_overlay:
config['overlay_enabled'] = False
if args.test:
logger.info("Running in TEST mode")
return run_test_mode(app, config)
# Normal operation
logger.info("Starting application...")
app.initialize()
app.run()
except KeyboardInterrupt:
logger.info("Interrupted by user")
return 0
except Exception as e:
logger.exception("Fatal error")
print(f"\nError: {e}")
return 1
def run_setup(config: dict, config_path: Path) -> None:
"""Run first-time setup wizard."""
print("\n" + "=" * 50)
print("EU-Utility Setup Wizard")
print("=" * 50 + "\n")
# Detect game path
print("Looking for Entropia Universe installation...")
possible_paths = [
Path("C:/Program Files (x86)/Entropia Universe"),
Path("C:/Program Files/Entropia Universe"),
Path.home() / "AppData" / "Local" / "Entropia Universe",
]
detected = None
for path in possible_paths:
if path.exists():
detected = path
print(f" Found: {path}")
break
if detected:
use = input(f"\nUse this path? [Y/n]: ").strip().lower()
if use in ('', 'y', 'yes'):
config['game_path'] = str(detected)
else:
detected = None
if not detected:
custom = input("Enter your Entropia Universe installation path: ").strip()
if custom:
config['game_path'] = custom
# Save config
config_path.parent.mkdir(parents=True, exist_ok=True)
with open(config_path, 'w') as f:
json.dump(config, f, indent=2)
print(f"\nConfiguration saved to: {config_path}")
print("\nYou can now run: python main.py")
print("=" * 50)
def run_test_mode(app, config: dict) -> int:
"""Run in test mode without requiring game."""
print("\n" + "=" * 50)
print("EU-Utility TEST MODE")
print("=" * 50 + "\n")
try:
# Test imports
print("✓ Checking imports...")
from premium.plugins.api import PluginAPI, PluginManifest
from premium.plugins.manager import PluginManager
from premium.core.event_bus import EventBus
from premium.core.state.store import StateStore
print(" All imports successful")
# Test plugin system
print("\n✓ Testing plugin system...")
event_bus = EventBus()
store = StateStore(reducer=lambda s, a: s or {}, initial_state={})
plugin_manager = PluginManager(
plugin_dirs=[],
data_dir=Path.home() / '.eu-utility' / 'data',
event_bus=event_bus,
state_store=store
)
print(" Plugin manager initialized")
# Test event bus
print("\n✓ Testing event bus...")
test_events = []
@event_bus.on('test')
def on_test(event):
test_events.append(event)
event_bus.emit('test', {'message': 'hello'})
import time
time.sleep(0.1)
if len(test_events) == 1:
print(" Event bus working")
else:
print(" ⚠ Event bus may have issues")
# Test game client (simulated)
print("\n✓ Testing game client integration...")
from premium.eu_integration import GameClient
# Test with dummy path
client = GameClient(
game_path=Path.home(), # Dummy path for testing
event_bus=event_bus
)
print(" Game client initialized (simulated mode)")
# Test widget system
print("\n✓ Testing widget system...")
from premium.widgets import Widget, WidgetConfig
class TestWidget(Widget):
def create_ui(self, parent):
return None # No UI in test mode
widget = TestWidget(WidgetConfig(name="Test"))
print(" Widget system functional")
print("\n" + "=" * 50)
print("All tests passed!")
print("=" * 50)
print("\nThe application is ready to use.")
print("Run 'python main.py' to start with the actual game.")
return 0
except Exception as e:
print(f"\n✗ Test failed: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == '__main__':
sys.exit(main())

View File

@ -1,322 +0,0 @@
"""
EU-Utility Premium - Dashboard Widget Plugin
=============================================
A built-in plugin that provides a dashboard for player statistics.
Shows skills, loot, and game status in a convenient overlay widget.
"""
from pathlib import Path
from typing import Optional, Dict, Any
# Import the plugin API
from premium.plugins.api import (
PluginAPI, PluginContext, PluginManifest, PermissionLevel
)
# Manifest - REQUIRED
manifest = PluginManifest(
name="Player Dashboard",
version="1.0.0",
author="EU-Utility Team",
description="Player statistics dashboard with skills, loot, and game status",
entry_point="main.py",
permissions={PermissionLevel.UI, PermissionLevel.FILE_READ},
tags=["dashboard", "stats", "built-in"],
)
class DashboardPlugin(PluginAPI):
"""Player statistics dashboard plugin.
This plugin displays:
- Current character name and level
- Skill gains (latest and session total)
- Loot tracker (recent items and total value)
- Game connection status
"""
manifest = manifest
def __init__(self):
super().__init__()
self._widget = None
self._stats = {
'character': None,
'session_loot_value': 0.0,
'session_loot_count': 0,
'session_skills': [],
'last_loot': None,
'last_skill': None,
}
def on_init(self, ctx: PluginContext) -> None:
"""Called when plugin is initialized."""
self.ctx = ctx
ctx.logger.info("Dashboard plugin initialized")
# Load saved stats if any
self._load_stats()
# Subscribe to events
if ctx.event_bus:
ctx.event_bus.subscribe('game.loot', self._on_loot)
ctx.event_bus.subscribe('game.skill', self._on_skill)
ctx.event_bus.subscribe('game.connected', self._on_connected)
ctx.event_bus.subscribe('game.disconnected', self._on_disconnected)
def on_activate(self) -> None:
"""Called when plugin is activated."""
self.ctx.logger.info("Dashboard plugin activated")
def on_deactivate(self) -> None:
"""Called when plugin is deactivated."""
self._save_stats()
def on_shutdown(self) -> None:
"""Called when plugin is being unloaded."""
self._save_stats()
def create_widget(self) -> Optional[Any]:
"""Create the dashboard UI widget."""
try:
from PyQt6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QFrame, QPushButton, QGridLayout
)
from PyQt6.QtCore import Qt
from PyQt6.QtGui import QFont
# Main widget
widget = QWidget()
widget.setStyleSheet("""
QWidget {
background-color: #1e1e1e;
color: #ffffff;
font-family: 'Segoe UI', Arial, sans-serif;
}
QFrame {
background-color: #2d2d2d;
border-radius: 6px;
padding: 10px;
}
QLabel {
color: #ffffff;
}
.stat-label {
color: #888888;
font-size: 11px;
}
.stat-value {
font-size: 16px;
font-weight: bold;
}
""")
layout = QVBoxLayout(widget)
layout.setSpacing(15)
layout.setContentsMargins(15, 15, 15, 15)
# Header
header = QLabel("👤 Player Dashboard")
font = QFont("Segoe UI", 14, QFont.Weight.Bold)
header.setFont(font)
layout.addWidget(header)
# Status section
status_frame = QFrame()
status_layout = QHBoxLayout(status_frame)
self.status_label = QLabel("● Disconnected")
self.status_label.setStyleSheet("color: #f44336; font-weight: bold;")
status_layout.addWidget(self.status_label)
self.char_label = QLabel("No character")
status_layout.addStretch()
status_layout.addWidget(self.char_label)
layout.addWidget(status_frame)
# Stats grid
stats_frame = QFrame()
stats_layout = QGridLayout(stats_frame)
stats_layout.setSpacing(10)
# Session loot
loot_title = QLabel("💰 Session Loot")
loot_title.setStyleSheet("font-weight: bold;")
stats_layout.addWidget(loot_title, 0, 0)
self.loot_value_label = QLabel("0.00 PED")
self.loot_value_label.setStyleSheet("font-size: 18px; font-weight: bold; color: #4caf50;")
stats_layout.addWidget(self.loot_value_label, 1, 0)
self.loot_count_label = QLabel("0 items")
self.loot_count_label.setStyleSheet("color: #888888;")
stats_layout.addWidget(self.loot_count_label, 2, 0)
# Skills
skills_title = QLabel("📈 Skills")
skills_title.setStyleSheet("font-weight: bold;")
stats_layout.addWidget(skills_title, 0, 1)
self.skills_count_label = QLabel("0 gains")
self.skills_count_label.setStyleSheet("font-size: 18px; font-weight: bold; color: #2196f3;")
stats_layout.addWidget(self.skills_count_label, 1, 1)
self.last_skill_label = QLabel("-")
self.last_skill_label.setStyleSheet("color: #888888;")
stats_layout.addWidget(self.last_skill_label, 2, 1)
layout.addWidget(stats_frame)
# Recent activity
activity_title = QLabel("📝 Recent Activity")
activity_title.setStyleSheet("font-weight: bold; margin-top: 10px;")
layout.addWidget(activity_title)
self.activity_label = QLabel("No activity yet...")
self.activity_label.setWordWrap(True)
self.activity_label.setStyleSheet("color: #aaaaaa; padding: 5px;")
layout.addWidget(self.activity_label)
layout.addStretch()
# Save reference for updates
self._widget = widget
self._update_display()
return widget
except ImportError:
self.ctx.logger.warning("PyQt6 not available, cannot create widget")
return None
# ========== Event Handlers ==========
def _on_loot(self, event: Dict[str, Any]) -> None:
"""Handle loot event."""
item = event.get('item', 'Unknown')
value = event.get('value', 0.0)
quantity = event.get('quantity', 1)
self._stats['session_loot_value'] += value
self._stats['session_loot_count'] += quantity
self._stats['last_loot'] = {
'item': item,
'value': value,
'quantity': quantity,
}
self.ctx.logger.info(f"Loot: {item} x{quantity} ({value:.2f} PED)")
self._update_display()
def _on_skill(self, event: Dict[str, Any]) -> None:
"""Handle skill gain event."""
skill = event.get('skill', 'Unknown')
gain = event.get('gain', 0.0)
self._stats['session_skills'].append({
'skill': skill,
'gain': gain,
})
self._stats['last_skill'] = {
'skill': skill,
'gain': gain,
}
self.ctx.logger.info(f"Skill: {skill} +{gain:.4f}")
self._update_display()
def _on_connected(self, event: Dict[str, Any]) -> None:
"""Handle game connection."""
self.ctx.logger.info("Game connected")
self._update_status(True)
def _on_disconnected(self, event: Dict[str, Any]) -> None:
"""Handle game disconnection."""
self.ctx.logger.info("Game disconnected")
self._update_status(False)
# ========== Display Updates ==========
def _update_display(self) -> None:
"""Update the widget display."""
if not self._widget:
return
try:
# Update loot stats
self.loot_value_label.setText(
f"{self._stats['session_loot_value']:.2f} PED"
)
self.loot_count_label.setText(
f"{self._stats['session_loot_count']} items"
)
# Update skill stats
self.skills_count_label.setText(
f"{len(self._stats['session_skills'])} gains"
)
if self._stats['last_skill']:
skill = self._stats['last_skill']
self.last_skill_label.setText(
f"{skill['skill']} +{skill['gain']:.4f}"
)
# Update activity
activity_text = ""
if self._stats['last_loot']:
loot = self._stats['last_loot']
activity_text += f"Loot: {loot['item']} ({loot['value']:.2f} PED)\n"
if self._stats['last_skill']:
skill = self._stats['last_skill']
activity_text += f"Skill: {skill['skill']} +{skill['gain']:.4f}"
if activity_text:
self.activity_label.setText(activity_text)
except Exception as e:
self.ctx.logger.error(f"Error updating display: {e}")
def _update_status(self, connected: bool) -> None:
"""Update connection status display."""
if not self._widget:
return
if connected:
self.status_label.setText("● Connected")
self.status_label.setStyleSheet("color: #4caf50; font-weight: bold;")
else:
self.status_label.setText("● Disconnected")
self.status_label.setStyleSheet("color: #f44336; font-weight: bold;")
# ========== Persistence ==========
def _load_stats(self) -> None:
"""Load stats from disk."""
try:
import json
stats_path = self.ctx.data_dir / "session_stats.json"
if stats_path.exists():
with open(stats_path, 'r') as f:
saved = json.load(f)
self._stats.update(saved)
except Exception as e:
self.ctx.logger.error(f"Failed to load stats: {e}")
def _save_stats(self) -> None:
"""Save stats to disk."""
try:
import json
stats_path = self.ctx.data_dir / "session_stats.json"
with open(stats_path, 'w') as f:
json.dump(self._stats, f, indent=2)
except Exception as e:
self.ctx.logger.error(f"Failed to save stats: {e}")
# Export the plugin class
Plugin = DashboardPlugin

View File

@ -1,12 +0,0 @@
{
"name": "Player Dashboard",
"version": "1.0.0",
"author": "EU-Utility Team",
"description": "Player statistics dashboard with skills, loot, and game status",
"entry_point": "main.py",
"permissions": ["ui", "file_read"],
"dependencies": {},
"min_api_version": "3.0.0",
"tags": ["dashboard", "stats", "built-in"],
"icon": "📊"
}

View File

@ -1,173 +0,0 @@
"""
EU-Utility Premium
==================
Enterprise-grade modular overlay system for Entropia Universe.
This package provides the premium/enterprise layer with:
- Advanced plugin system with sandboxing
- Redux-inspired state management
- High-performance event bus
- Widget system for dashboard
- Entropia Universe game integration
Quick Start:
from premium import EUUtilityApp
app = EUUtilityApp()
app.run()
Modules:
plugins - Plugin management system
core - Core infrastructure (state, events)
widgets - Dashboard widget system
eu_integration - Entropia Universe integration
"""
__version__ = "3.0.0"
__author__ = "EU-Utility Team"
# Core exports
from premium.core.state.store import StateStore, ActionBase, create_store, get_store
from premium.core.event_bus import EventBus, Event, EventPriority
from premium.plugins.api import (
PluginAPI, PluginManifest, PluginContext, PluginInstance,
PluginState, PermissionLevel,
PluginError, PluginLoadError, PluginInitError
)
from premium.plugins.manager import PluginManager
# Widget system
from premium.widgets.base import Widget, WidgetConfig
from premium.widgets.dashboard import Dashboard
# EU Integration
from premium.eu_integration.game_client import GameClient
from premium.eu_integration.log_parser import LogParser
from premium.eu_integration.events import GameEvent, LootEvent, SkillEvent
class EUUtilityApp:
"""Main application class for EU-Utility Premium.
This is the high-level API for the entire premium system.
Example:
app = EUUtilityApp()
app.initialize()
app.run()
"""
def __init__(self, config_path=None):
self.config_path = config_path
self.plugin_manager = None
self.state_store = None
self.event_bus = None
self.game_client = None
self.dashboard = None
self._initialized = False
def initialize(self):
"""Initialize all subsystems."""
from pathlib import Path
# Create event bus
self.event_bus = EventBus()
# Create state store
self.state_store = StateStore(
reducer=self._root_reducer,
initial_state=self._get_initial_state()
)
# Create plugin manager
plugin_dirs = [
Path(__file__).parent.parent / "plugins" / "builtin",
Path(__file__).parent.parent / "plugins" / "user",
]
data_dir = Path.home() / ".eu-utility" / "data"
self.plugin_manager = PluginManager(
plugin_dirs=plugin_dirs,
data_dir=data_dir,
event_bus=self.event_bus,
state_store=self.state_store
)
# Create game client
self.game_client = GameClient(event_bus=self.event_bus)
self._initialized = True
def run(self):
"""Run the application."""
if not self._initialized:
self.initialize()
# Discover and load plugins
self.plugin_manager.discover_all()
self.plugin_manager.load_all(auto_activate=True)
# Start game client
self.game_client.start()
# Run main loop
try:
self._main_loop()
except KeyboardInterrupt:
self.shutdown()
def shutdown(self):
"""Shutdown the application gracefully."""
if self.plugin_manager:
self.plugin_manager.shutdown()
if self.game_client:
self.game_client.stop()
def _root_reducer(self, state, action):
"""Root state reducer."""
# Default reducer - just returns state
# Plugins can register their own reducers
return state or {}
def _get_initial_state(self):
"""Get initial application state."""
return {
'app': {
'version': __version__,
'initialized': False,
'game_connected': False,
},
'player': {
'name': None,
'skills': {},
'loot': [],
},
'plugins': {},
}
def _main_loop(self):
"""Main application loop."""
import time
while True:
time.sleep(0.1)
__all__ = [
# Version
'__version__',
# Core classes
'EUUtilityApp',
# State management
'StateStore', 'ActionBase', 'create_store', 'get_store',
# Event system
'EventBus', 'Event', 'EventPriority',
# Plugin system
'PluginManager', 'PluginAPI', 'PluginManifest', 'PluginContext',
'PluginInstance', 'PluginState', 'PermissionLevel',
'PluginError', 'PluginLoadError', 'PluginInitError',
# Widgets
'Widget', 'WidgetConfig', 'Dashboard',
# EU Integration
'GameClient', 'LogParser', 'GameEvent', 'LootEvent', 'SkillEvent',
]

View File

@ -1,599 +0,0 @@
"""
EU-Utility Premium - Event Bus
===============================
High-performance async event system for inter-plugin communication.
Features:
- Async/await support
- Event prioritization
- Event filtering and routing
- Buffered events for high-throughput scenarios
- Type-safe event definitions
Example:
from premium.core.event_bus import EventBus, Event, EventPriority
bus = EventBus()
# Subscribe to events
@bus.on('game.loot')
async def handle_loot(event: Event):
print(f"Got loot: {event.data}")
# Emit events
bus.emit('game.loot', {'item': 'Angel Scales', 'value': 150.50})
"""
from __future__ import annotations
import asyncio
import inspect
import logging
import time
import traceback
import weakref
from abc import ABC, abstractmethod
from collections import defaultdict, deque
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import (
Any, Callable, Coroutine, Dict, List, Optional, Set, Type, TypeVar,
Union, Generic, Protocol, runtime_checkable
)
# =============================================================================
# EVENT PRIORITY
# =============================================================================
class EventPriority(Enum):
"""Priority levels for event handlers.
Higher priority handlers are called first.
"""
CRITICAL = 100 # System-critical events (error handling, shutdown)
HIGH = 75 # Important game events (loot, globals)
NORMAL = 50 # Standard events
LOW = 25 # Background tasks
BACKGROUND = 10 # Analytics, logging
# =============================================================================
# EVENT CLASS
# =============================================================================
@dataclass
class Event:
"""Represents an event in the system.
Attributes:
type: Event type/category
data: Event payload data
source: Source of the event (plugin_id or system)
timestamp: When the event was created
priority: Event priority
id: Unique event identifier
"""
type: str
data: Dict[str, Any] = field(default_factory=dict)
source: str = "system"
timestamp: float = field(default_factory=time.time)
priority: EventPriority = EventPriority.NORMAL
id: str = field(default_factory=lambda: f"evt_{int(time.time()*1000)}_{id(Event) % 10000}")
def get(self, key: str, default: Any = None) -> Any:
"""Get a value from event data."""
return self.data.get(key, default)
def to_dict(self) -> Dict[str, Any]:
"""Convert event to dictionary."""
return {
'id': self.id,
'type': self.type,
'data': self.data,
'source': self.source,
'timestamp': self.timestamp,
'priority': self.priority.name,
}
@classmethod
def create(
cls,
event_type: str,
data: Dict[str, Any],
source: str = "system",
priority: EventPriority = EventPriority.NORMAL
) -> Event:
"""Create a new event."""
return cls(
type=event_type,
data=data,
source=source,
priority=priority
)
# =============================================================================
# EVENT FILTER
# =============================================================================
@dataclass
class EventFilter:
"""Filter for event subscription.
Allows subscribers to receive only events matching specific criteria.
Example:
filter = EventFilter(
event_types=['game.loot', 'game.skill'],
sources=['game_client'],
min_priority=EventPriority.HIGH
)
"""
event_types: Optional[Set[str]] = None
sources: Optional[Set[str]] = None
min_priority: Optional[EventPriority] = None
data_filter: Optional[Callable[[Dict[str, Any]], bool]] = None
def matches(self, event: Event) -> bool:
"""Check if an event matches this filter."""
if self.event_types and event.type not in self.event_types:
return False
if self.sources and event.source not in self.sources:
return False
if self.min_priority and event.priority.value < self.min_priority.value:
return False
if self.data_filter and not self.data_filter(event.data):
return False
return True
# =============================================================================
# SUBSCRIPTION
# =============================================================================
@dataclass
class Subscription:
"""Represents an event subscription."""
id: str
event_types: Set[str]
callback: Callable[[Event], Any]
filter: Optional[EventFilter]
priority: EventPriority
once: bool
def matches(self, event: Event) -> bool:
"""Check if this subscription matches an event."""
if self.event_types and event.type not in self.event_types:
return False
if self.filter and not self.filter.matches(event):
return False
return True
# =============================================================================
# EVENT BUS
# =============================================================================
HandlerType = Callable[[Event], Any]
class EventBus:
"""High-performance async event bus.
Features:
- Async and sync handler support
- Priority-based execution order
- Event filtering
- Buffered event queues
- Wildcard subscriptions
Example:
bus = EventBus()
# Sync handler
@bus.on('game.loot')
def handle_loot(event):
print(f"Loot: {event.data}")
# Async handler
@bus.on('game.global', priority=EventPriority.HIGH)
async def handle_global(event):
await notify_discord(event.data)
# Emit event
bus.emit('game.loot', {'item': 'Uber Item', 'value': 1000})
"""
def __init__(self, max_queue_size: int = 10000):
"""Initialize event bus.
Args:
max_queue_size: Maximum events in queue before dropping
"""
self._subscribers: Dict[str, Set[Subscription]] = defaultdict(set)
self._wildcard_subscribers: Set[Subscription] = set()
self._queue: deque = deque(maxlen=max_queue_size)
self._logger = logging.getLogger("EventBus")
self._sub_counter = 0
self._running = False
self._lock = asyncio.Lock()
self._event_count = 0
self._dropped_count = 0
# ========== Subscription Methods ==========
def on(
self,
event_type: Union[str, List[str]],
priority: EventPriority = EventPriority.NORMAL,
filter: Optional[EventFilter] = None,
once: bool = False
) -> Callable[[HandlerType], HandlerType]:
"""Decorator to subscribe to events.
Args:
event_type: Event type(s) to subscribe to
priority: Handler priority
filter: Optional event filter
once: Remove after first matching event
Example:
@bus.on('game.loot')
def handle_loot(event):
print(event.data)
"""
def decorator(handler: HandlerType) -> HandlerType:
self.subscribe(event_type, handler, priority, filter, once)
return handler
return decorator
def subscribe(
self,
event_type: Union[str, List[str]],
handler: HandlerType,
priority: EventPriority = EventPriority.NORMAL,
filter: Optional[EventFilter] = None,
once: bool = False
) -> str:
"""Subscribe to events.
Args:
event_type: Event type(s) to subscribe to, or '*' for all
handler: Callback function
priority: Handler priority
filter: Optional event filter
once: Remove after first matching event
Returns:
Subscription ID
"""
self._sub_counter += 1
sub_id = f"sub_{self._sub_counter}"
# Handle wildcard subscription
if event_type == '*':
sub = Subscription(
id=sub_id,
event_types=set(),
callback=handler,
filter=filter,
priority=priority,
once=once
)
self._wildcard_subscribers.add(sub)
return sub_id
# Handle single or multiple event types
if isinstance(event_type, str):
event_types = {event_type}
else:
event_types = set(event_type)
sub = Subscription(
id=sub_id,
event_types=event_types,
callback=handler,
filter=filter,
priority=priority,
once=once
)
for et in event_types:
self._subscribers[et].add(sub)
return sub_id
def unsubscribe(self, subscription_id: str) -> bool:
"""Unsubscribe from events.
Args:
subscription_id: ID returned by subscribe()
Returns:
True if subscription was found and removed
"""
# Check wildcard subscribers
for sub in self._wildcard_subscribers:
if sub.id == subscription_id:
self._wildcard_subscribers.remove(sub)
return True
# Check typed subscribers
for event_type, subs in self._subscribers.items():
for sub in list(subs):
if sub.id == subscription_id:
subs.remove(sub)
return True
return False
def once(
self,
event_type: str,
handler: HandlerType,
priority: EventPriority = EventPriority.NORMAL,
timeout: Optional[float] = None
) -> asyncio.Future:
"""Subscribe to a single event.
Args:
event_type: Event type to wait for
handler: Optional handler callback
priority: Handler priority
timeout: Optional timeout in seconds
Returns:
Future that resolves with the event
"""
future = asyncio.Future()
def wrapper(event: Event):
if handler:
handler(event)
if not future.done():
future.set_result(event)
self.subscribe(event_type, wrapper, priority, once=True)
if timeout:
async def timeout_handler():
await asyncio.sleep(timeout)
if not future.done():
future.set_exception(TimeoutError(f"Event {event_type} timed out"))
asyncio.create_task(timeout_handler())
return future
# ========== Event Emission ==========
def emit(
self,
event_type: str,
data: Dict[str, Any],
source: str = "system",
priority: EventPriority = EventPriority.NORMAL
) -> Event:
"""Emit an event.
Args:
event_type: Type of event
data: Event data
source: Event source
priority: Event priority
Returns:
The created event
"""
event = Event.create(
event_type=event_type,
data=data,
source=source,
priority=priority
)
self._queue.append(event)
self._event_count += 1
# Process immediately in async context
if asyncio.get_event_loop().is_running():
asyncio.create_task(self._process_event(event))
return event
async def emit_async(
self,
event_type: str,
data: Dict[str, Any],
source: str = "system",
priority: EventPriority = EventPriority.NORMAL
) -> Event:
"""Emit an event asynchronously."""
event = Event.create(
event_type=event_type,
data=data,
source=source,
priority=priority
)
await self._process_event(event)
return event
async def _process_event(self, event: Event) -> None:
"""Process a single event."""
# Collect all matching handlers
handlers: List[tuple] = []
# Get handlers for specific event type
for sub in self._subscribers.get(event.type, set()):
if sub.matches(event):
handlers.append((sub.priority.value, sub))
# Get wildcard handlers
for sub in self._wildcard_subscribers:
if sub.filter is None or sub.filter.matches(event):
handlers.append((sub.priority.value, sub))
# Sort by priority (highest first)
handlers.sort(key=lambda x: -x[0])
# Execute handlers
once_subs = []
for _, sub in handlers:
try:
if inspect.iscoroutinefunction(sub.callback):
await sub.callback(event)
else:
sub.callback(event)
if sub.once:
once_subs.append(sub)
except Exception as e:
self._logger.error(f"Error in event handler: {e}")
traceback.print_exc()
# Remove once subscriptions
for sub in once_subs:
self.unsubscribe(sub.id)
# ========== Utility Methods ==========
def clear(self) -> None:
"""Clear all subscriptions and queued events."""
self._subscribers.clear()
self._wildcard_subscribers.clear()
self._queue.clear()
def get_stats(self) -> Dict[str, Any]:
"""Get event bus statistics."""
return {
'total_events': self._event_count,
'dropped_events': self._dropped_count,
'queue_size': len(self._queue),
'subscription_count': sum(
len(subs) for subs in self._subscribers.values()
),
'wildcard_subscriptions': len(self._wildcard_subscribers),
}
def wait_for(
self,
event_type: str,
condition: Optional[Callable[[Event], bool]] = None,
timeout: Optional[float] = None
) -> asyncio.Future:
"""Wait for a specific event.
Args:
event_type: Event type to wait for
condition: Optional condition function
timeout: Optional timeout in seconds
Returns:
Future that resolves with the matching event
"""
future = asyncio.Future()
def handler(event: Event):
if condition and not condition(event):
return
if not future.done():
future.set_result(event)
return True # Unsubscribe
self.subscribe(event_type, handler, once=True)
if timeout:
async def timeout_coro():
await asyncio.sleep(timeout)
if not future.done():
future.set_exception(TimeoutError(f"Timeout waiting for {event_type}"))
asyncio.create_task(timeout_coro())
return future
# =============================================================================
# TYPED EVENTS
# =============================================================================
class TypedEventBus(EventBus):
"""Type-safe event bus with predefined event types."""
# Game events
GAME_CONNECTED = "game.connected"
GAME_DISCONNECTED = "game.disconnected"
GAME_FOCUS_CHANGED = "game.focus_changed"
# Loot events
LOOT_RECEIVED = "game.loot"
GLOBAL_HOF = "game.global"
# Skill events
SKILL_GAIN = "game.skill"
PROFESSION_GAIN = "game.profession"
# Chat events
CHAT_MESSAGE = "game.chat"
# System events
PLUGIN_LOADED = "system.plugin.loaded"
PLUGIN_UNLOADED = "system.plugin.unloaded"
ERROR_OCCURRED = "system.error"
def emit_game_connected(self, character_name: str) -> Event:
"""Emit game connected event."""
return self.emit(self.GAME_CONNECTED, {
'character_name': character_name
})
def emit_loot(self, item: str, value: float, quantity: int = 1) -> Event:
"""Emit loot received event."""
return self.emit(self.LOOT_RECEIVED, {
'item': item,
'value': value,
'quantity': quantity,
'timestamp': time.time(),
})
def emit_skill(self, skill_name: str, value: float, gain: float) -> Event:
"""Emit skill gain event."""
return self.emit(self.SKILL_GAIN, {
'skill': skill_name,
'value': value,
'gain': gain,
})
def emit_global(self, mob_name: str, value: float, player: str) -> Event:
"""Emit global/HoF event."""
return self.emit(self.GLOBAL_HOF, {
'mob': mob_name,
'value': value,
'player': player,
'is_hof': value >= 1000,
})
# =============================================================================
# EXPORTS
# =============================================================================
__all__ = [
'EventPriority',
'Event',
'EventFilter',
'Subscription',
'EventBus',
'TypedEventBus',
]

View File

@ -1,711 +0,0 @@
"""
EU-Utility Premium - State Management
======================================
Redux-inspired state store with:
- Immutable state updates
- Time-travel debugging
- Selectors for derived state
- Middleware support
- State persistence
Example:
from premium.core.state import StateStore, Action, Reducer
# Define actions
class IncrementAction(Action):
type = "INCREMENT"
# Define reducer
def counter_reducer(state: int, action: Action) -> int:
if action.type == "INCREMENT":
return state + 1
return state
# Create store
store = StateStore(counter_reducer, initial_state=0)
store.dispatch(IncrementAction())
"""
from __future__ import annotations
import copy
import json
import logging
import threading
import time
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from typing import (
Any, Callable, Dict, Generic, List, Optional, Set, TypeVar, Union,
Protocol, runtime_checkable
)
# =============================================================================
# TYPE DEFINITIONS
# =============================================================================
T = TypeVar('T')
State = TypeVar('State')
@runtime_checkable
class Action(Protocol):
"""Protocol for actions."""
type: str
payload: Optional[Any] = None
meta: Optional[Dict[str, Any]] = None
class ActionBase:
"""Base class for actions."""
type: str = "UNKNOWN"
payload: Any = None
meta: Optional[Dict[str, Any]] = None
timestamp: datetime = field(default_factory=datetime.now)
def __init__(self, payload: Any = None, meta: Optional[Dict[str, Any]] = None):
self.payload = payload
self.meta = meta or {}
self.timestamp = datetime.now()
def to_dict(self) -> Dict[str, Any]:
"""Convert action to dictionary."""
return {
'type': self.type,
'payload': self.payload,
'meta': self.meta,
'timestamp': self.timestamp.isoformat(),
}
# Reducer type: (state, action) -> new_state
Reducer = Callable[[State, Action], State]
# Selector type: state -> derived_value
Selector = Callable[[State], T]
# Subscriber type: (new_state, old_state) -> None
Subscriber = Callable[[State, State], None]
# Middleware type: store -> next -> action -> result
Middleware = Callable[['StateStore', Callable[[Action], Any], Action], Any]
# =============================================================================
# BUILT-IN ACTIONS
# =============================================================================
class StateResetAction(ActionBase):
"""Reset state to initial value."""
type = "@@STATE/RESET"
class StateRestoreAction(ActionBase):
"""Restore state from snapshot."""
type = "@@STATE/RESTORE"
def __init__(self, state: State, meta: Optional[Dict[str, Any]] = None):
super().__init__(payload=state, meta=meta)
class StateBatchAction(ActionBase):
"""Batch multiple actions."""
type = "@@STATE/BATCH"
def __init__(self, actions: List[Action], meta: Optional[Dict[str, Any]] = None):
super().__init__(payload=actions, meta=meta)
class StateHydrateAction(ActionBase):
"""Hydrate state from persisted data."""
type = "@@STATE/HYDRATE"
def __init__(self, state: State, meta: Optional[Dict[str, Any]] = None):
super().__init__(payload=state, meta=meta)
# =============================================================================
# STATE SNAPSHOT
# =============================================================================
@dataclass
class StateSnapshot:
"""Immutable snapshot of state at a point in time."""
state: Any
action: Optional[Action] = None
timestamp: datetime = field(default_factory=datetime.now)
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
def to_dict(self) -> Dict[str, Any]:
"""Convert snapshot to dictionary."""
return {
'id': self.id,
'state': self.state,
'action': self.action.to_dict() if self.action else None,
'timestamp': self.timestamp.isoformat(),
}
# =============================================================================
# STORE SLICE
# =============================================================================
@dataclass
class StoreSlice(Generic[T]):
"""A slice of the store with its own reducer and state."""
name: str
reducer: Reducer
initial_state: T
selectors: Dict[str, Selector] = field(default_factory=dict)
# =============================================================================
# COMBINED REDUCER
# =============================================================================
def combine_reducers(reducers: Dict[str, Reducer]) -> Reducer:
"""Combine multiple reducers into one.
Each reducer manages a slice of the state.
Args:
reducers: Dict mapping slice names to reducers
Returns:
Combined reducer function
Example:
root_reducer = combine_reducers({
'counter': counter_reducer,
'todos': todos_reducer,
})
"""
def combined(state: Dict[str, Any], action: Action) -> Dict[str, Any]:
new_state = {}
has_changed = False
for key, reducer in reducers.items():
previous_state = state.get(key) if isinstance(state, dict) else None
new_slice = reducer(previous_state, action)
new_state[key] = new_slice
if new_slice is not previous_state:
has_changed = True
return new_state if has_changed else state
return combined
# =============================================================================
# MIDDLEWARE
# =============================================================================
def logging_middleware(store: StateStore, next: Callable[[Action], Any], action: Action) -> Any:
"""Middleware that logs all actions."""
print(f"[State] Action: {action.type}")
result = next(action)
print(f"[State] New state: {store.get_state()}")
return result
def thunk_middleware(store: StateStore, next: Callable[[Action], Any], action: Action) -> Any:
"""Middleware that allows thunk actions (functions)."""
if callable(action) and not isinstance(action, ActionBase):
# It's a thunk - call it with dispatch and get_state
return action(store.dispatch, store.get_state)
return next(action)
def persistence_middleware(
storage_path: Path,
debounce_ms: int = 1000
) -> Middleware:
"""Create middleware that persists state to disk.
Args:
storage_path: Path to store state
debounce_ms: Debounce time in milliseconds
Returns:
Middleware function
"""
last_save = 0
pending_save = False
def middleware(store: StateStore, next: Callable[[Action], Any], action: Action) -> Any:
nonlocal last_save, pending_save
result = next(action)
# Debounce saves
current_time = time.time() * 1000
if current_time - last_save > debounce_ms:
try:
state = store.get_state()
with open(storage_path, 'w', encoding='utf-8') as f:
json.dump(state, f, indent=2, default=str)
last_save = current_time
pending_save = False
except Exception as e:
logging.getLogger("StateStore").error(f"Failed to persist state: {e}")
else:
pending_save = True
return result
return middleware
# =============================================================================
# STATE STORE
# =============================================================================
class StateStore(Generic[State]):
"""Redux-inspired state store with time-travel debugging.
Features:
- Immutable state updates
- Action history for debugging
- State snapshots for time travel
- Selectors for derived state
- Middleware support
- State persistence
Example:
store = StateStore(
reducer=root_reducer,
initial_state={'count': 0},
middleware=[logging_middleware]
)
# Subscribe to changes
unsubscribe = store.subscribe(lambda new, old: print(f"Changed: {old} -> {new}"))
# Dispatch actions
store.dispatch(IncrementAction())
# Use selectors
count = store.select(lambda state: state['count'])
# Time travel
store.undo() # Undo last action
store.jump_to_snapshot(0) # Jump to initial state
"""
def __init__(
self,
reducer: Reducer[State, Action],
initial_state: Optional[State] = None,
middleware: Optional[List[Middleware]] = None,
max_history: int = 1000,
enable_time_travel: bool = True
):
"""Initialize state store.
Args:
reducer: Root reducer function
initial_state: Initial state value
middleware: List of middleware functions
max_history: Maximum action history size
enable_time_travel: Enable time-travel debugging
"""
self._reducer = reducer
self._state: State = initial_state
self._initial_state = copy.deepcopy(initial_state)
self._middleware = middleware or []
self._max_history = max_history
self._enable_time_travel = enable_time_travel
# Subscribers
self._subscribers: Dict[str, Subscriber] = {}
self._subscriber_counter = 0
# History for time travel
self._history: List[StateSnapshot] = []
self._current_index = -1
# Lock for thread safety
self._lock = threading.RLock()
self._logger = logging.getLogger("StateStore")
# Create initial snapshot
if enable_time_travel:
self._add_snapshot(StateSnapshot(
state=copy.deepcopy(self._state),
action=None
))
# ========== Core Methods ==========
def get_state(self) -> State:
"""Get current state (immutable)."""
with self._lock:
return copy.deepcopy(self._state)
def dispatch(self, action: Action) -> Action:
"""Dispatch an action to update state.
Args:
action: Action to dispatch
Returns:
The dispatched action
"""
# Apply middleware chain
def dispatch_action(a: Action) -> Action:
return self._apply_reducer(a)
# Build middleware chain
chain = dispatch_action
for mw in reversed(self._middleware):
chain = lambda a, mw=mw, next=chain: mw(self, next, a)
return chain(action)
def _apply_reducer(self, action: Action) -> Action:
"""Apply reducer and update state."""
with self._lock:
old_state = self._state
new_state = self._reducer(copy.deepcopy(old_state), action)
# Only update if state changed
if new_state is not old_state:
self._state = new_state
# Add to history
if self._enable_time_travel:
# Remove any future states if we're not at the end
if self._current_index < len(self._history) - 1:
self._history = self._history[:self._current_index + 1]
self._add_snapshot(StateSnapshot(
state=copy.deepcopy(new_state),
action=action
))
# Notify subscribers
self._notify_subscribers(new_state, old_state)
return action
def _add_snapshot(self, snapshot: StateSnapshot) -> None:
"""Add snapshot to history."""
self._history.append(snapshot)
self._current_index = len(self._history) - 1
# Trim history if needed
if len(self._history) > self._max_history:
self._history = self._history[-self._max_history:]
self._current_index = len(self._history) - 1
# ========== Subscription ==========
def subscribe(self, callback: Subscriber, selector: Optional[Selector] = None) -> Callable[[], None]:
"""Subscribe to state changes.
Args:
callback: Function called when state changes
selector: Optional selector to compare specific parts
Returns:
Unsubscribe function
"""
with self._lock:
self._subscriber_counter += 1
sub_id = f"sub_{self._subscriber_counter}"
# Wrap callback with selector if provided
if selector:
last_value = selector(self._state)
def wrapped_callback(new_state: State, old_state: State) -> None:
nonlocal last_value
new_value = selector(new_state)
if new_value != last_value:
last_value = new_value
callback(new_state, old_state)
self._subscribers[sub_id] = wrapped_callback
else:
self._subscribers[sub_id] = callback
def unsubscribe() -> None:
with self._lock:
self._subscribers.pop(sub_id, None)
return unsubscribe
def _notify_subscribers(self, new_state: State, old_state: State) -> None:
"""Notify all subscribers of state change."""
for callback in list(self._subscribers.values()):
try:
callback(new_state, old_state)
except Exception as e:
self._logger.error(f"Error in subscriber: {e}")
# ========== Selectors ==========
def select(self, selector: Selector[T]) -> T:
"""Select a derived value from state.
Args:
selector: Function that extracts value from state
Returns:
Selected value
"""
with self._lock:
return selector(copy.deepcopy(self._state))
def create_selector(self, *input_selectors: Selector, combiner: Callable) -> Selector:
"""Create a memoized selector.
Args:
input_selectors: Selectors that provide input values
combiner: Function that combines inputs into output
Returns:
Memoized selector function
"""
last_inputs = [None] * len(input_selectors)
last_result = None
def memoized_selector(state: State) -> Any:
nonlocal last_inputs, last_result
inputs = [s(state) for s in input_selectors]
# Check if inputs changed
if inputs != last_inputs:
last_inputs = inputs
last_result = combiner(*inputs)
return last_result
return memoized_selector
# ========== Time Travel ==========
def get_history(self) -> List[StateSnapshot]:
"""Get action history."""
with self._lock:
return self._history.copy()
def get_current_index(self) -> int:
"""Get current position in history."""
return self._current_index
def can_undo(self) -> bool:
"""Check if undo is possible."""
return self._current_index > 0
def can_redo(self) -> bool:
"""Check if redo is possible."""
return self._current_index < len(self._history) - 1
def undo(self) -> bool:
"""Undo last action.
Returns:
True if undo was performed
"""
if not self.can_undo():
return False
with self._lock:
self._current_index -= 1
old_state = self._state
self._state = copy.deepcopy(self._history[self._current_index].state)
self._notify_subscribers(self._state, old_state)
return True
def redo(self) -> bool:
"""Redo last undone action.
Returns:
True if redo was performed
"""
if not self.can_redo():
return False
with self._lock:
self._current_index += 1
old_state = self._state
self._state = copy.deepcopy(self._history[self._current_index].state)
self._notify_subscribers(self._state, old_state)
return True
def jump_to_snapshot(self, index: int) -> bool:
"""Jump to a specific snapshot in history.
Args:
index: Snapshot index
Returns:
True if jump was successful
"""
if index < 0 or index >= len(self._history):
return False
with self._lock:
old_state = self._state
self._current_index = index
self._state = copy.deepcopy(self._history[index].state)
self._notify_subscribers(self._state, old_state)
return True
def reset(self) -> None:
"""Reset to initial state."""
self.dispatch(StateResetAction())
with self._lock:
old_state = self._state
self._state = copy.deepcopy(self._initial_state)
if self._enable_time_travel:
self._history.clear()
self._current_index = -1
self._add_snapshot(StateSnapshot(
state=copy.deepcopy(self._state),
action=None
))
self._notify_subscribers(self._state, old_state)
# ========== Persistence ==========
def save_to_disk(self, path: Path) -> bool:
"""Save current state to disk.
Args:
path: File path to save to
Returns:
True if saved successfully
"""
try:
with self._lock:
state_data = {
'state': self._state,
'timestamp': datetime.now().isoformat(),
'history_count': len(self._history),
}
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, 'w', encoding='utf-8') as f:
json.dump(state_data, f, indent=2, default=str)
return True
except Exception as e:
self._logger.error(f"Failed to save state: {e}")
return False
def load_from_disk(self, path: Path) -> bool:
"""Load state from disk.
Args:
path: File path to load from
Returns:
True if loaded successfully
"""
try:
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
state = data.get('state')
if state is not None:
self.dispatch(StateHydrateAction(state))
return True
except Exception as e:
self._logger.error(f"Failed to load state: {e}")
return False
# =============================================================================
# MODULE-LEVEL STORE (Application-wide state)
# =============================================================================
_module_stores: Dict[str, StateStore] = {}
def create_store(
name: str,
reducer: Reducer,
initial_state: Optional[Any] = None,
**kwargs
) -> StateStore:
"""Create or get a named store.
Args:
name: Unique store name
reducer: Reducer function
initial_state: Initial state
**kwargs: Additional arguments for StateStore
Returns:
StateStore instance
"""
if name not in _module_stores:
_module_stores[name] = StateStore(
reducer=reducer,
initial_state=initial_state,
**kwargs
)
return _module_stores[name]
def get_store(name: str) -> Optional[StateStore]:
"""Get a named store.
Args:
name: Store name
Returns:
StateStore or None if not found
"""
return _module_stores.get(name)
def remove_store(name: str) -> bool:
"""Remove a named store.
Args:
name: Store name
Returns:
True if removed
"""
if name in _module_stores:
del _module_stores[name]
return True
return False
# =============================================================================
# EXPORTS
# =============================================================================
__all__ = [
# Types
'Action', 'ActionBase', 'Reducer', 'Selector', 'Subscriber', 'Middleware',
# Actions
'StateResetAction', 'StateRestoreAction', 'StateBatchAction', 'StateHydrateAction',
# Classes
'StateSnapshot', 'StoreSlice', 'StateStore',
# Functions
'combine_reducers', 'create_store', 'get_store', 'remove_store',
# Middleware
'logging_middleware', 'thunk_middleware', 'persistence_middleware',
]

View File

@ -1,33 +0,0 @@
"""
EU-Utility Premium - Entropia Universe Integration
===================================================
Modules for integrating with the Entropia Universe game client.
Example:
from premium.eu_integration import GameClient
client = GameClient()
client.start()
@client.on_loot
def handle_loot(event):
print(f"Got loot: {event.item}")
"""
from .game_client import GameClient, GameState
from .log_parser import LogParser, LogEvent
from .window_tracker import WindowTracker
from .events import GameEvent, LootEvent, SkillEvent, ChatEvent
__all__ = [
'GameClient',
'GameState',
'LogParser',
'LogEvent',
'WindowTracker',
'GameEvent',
'LootEvent',
'SkillEvent',
'ChatEvent',
]

View File

@ -1,131 +0,0 @@
"""
Typed event definitions for Entropia Universe.
"""
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Dict, Any
@dataclass
class GameEvent:
"""Base game event."""
timestamp: datetime
type: str
data: Dict[str, Any]
@dataclass
class LootEvent(GameEvent):
"""Loot received event."""
item: str
quantity: int
value: float
@classmethod
def from_data(cls, data: Dict[str, Any]) -> 'LootEvent':
return cls(
timestamp=datetime.now(),
type='loot',
data=data,
item=data.get('item', 'Unknown'),
quantity=data.get('quantity', 1),
value=data.get('value', 0.0),
)
@dataclass
class SkillEvent(GameEvent):
"""Skill gain event."""
skill: str
gain: float
new_value: Optional[float] = None
@classmethod
def from_data(cls, data: Dict[str, Any]) -> 'SkillEvent':
return cls(
timestamp=datetime.now(),
type='skill',
data=data,
skill=data.get('skill', 'Unknown'),
gain=data.get('gain', 0.0),
new_value=data.get('new_value'),
)
@dataclass
class GlobalEvent(GameEvent):
"""Global/HoF event."""
player: str
mob: str
item: str
value: float
is_hof: bool
@classmethod
def from_data(cls, data: Dict[str, Any]) -> 'GlobalEvent':
return cls(
timestamp=datetime.now(),
type='global',
data=data,
player=data.get('player', 'Unknown'),
mob=data.get('mob', 'Unknown'),
item=data.get('item', 'Unknown'),
value=data.get('value', 0.0),
is_hof=data.get('is_hof', False),
)
@dataclass
class ChatEvent(GameEvent):
"""Chat message event."""
channel: str
player: str
message: str
@classmethod
def from_data(cls, data: Dict[str, Any]) -> 'ChatEvent':
return cls(
timestamp=datetime.now(),
type='chat',
data=data,
channel=data.get('channel', 'Unknown'),
player=data.get('player', 'Unknown'),
message=data.get('message', ''),
)
@dataclass
class DamageEvent(GameEvent):
"""Damage dealt/received event."""
damage: float
target: Optional[str] = None
is_critical: bool = False
@classmethod
def from_data(cls, data: Dict[str, Any], dealt: bool = True) -> 'DamageEvent':
return cls(
timestamp=datetime.now(),
type='damage_dealt' if dealt else 'damage_received',
data=data,
damage=data.get('damage', 0.0),
target=data.get('target'),
is_critical=data.get('is_critical', False),
)
@dataclass
class KillEvent(GameEvent):
"""Creature killed event."""
creature: str
experience: Optional[float] = None
@classmethod
def from_data(cls, data: Dict[str, Any]) -> 'KillEvent':
return cls(
timestamp=datetime.now(),
type='kill',
data=data,
creature=data.get('creature', 'Unknown'),
experience=data.get('experience'),
)

View File

@ -1,344 +0,0 @@
"""
Entropia Universe Game Client Integration
==========================================
Provides integration with the EU game client:
- Process detection
- Window tracking
- Log file monitoring
- Event emission
"""
import os
import re
import time
import logging
import threading
from dataclasses import dataclass, field
from enum import Enum, auto
from pathlib import Path
from typing import Optional, Callable, Dict, List, Any, Set
class GameState(Enum):
"""Game client connection state."""
DISCONNECTED = auto()
DETECTED = auto() # Process found
CONNECTED = auto() # Log file accessible
PLAYING = auto() # Character active
@dataclass
class CharacterInfo:
"""Information about the current character."""
name: Optional[str] = None
level: Optional[int] = None
profession: Optional[str] = None
health: Optional[float] = None
position: Optional[tuple] = None
class GameClient:
"""Main interface for Entropia Universe game client.
This class monitors the game client process, tracks window state,
parses log files, and emits events for game actions.
Example:
client = GameClient()
@client.on('loot')
def handle_loot(event):
print(f"Got: {event['item']}")
client.start()
"""
# Default paths
DEFAULT_INSTALL_PATHS = [
Path("C:/Program Files (x86)/Entropia Universe"),
Path("C:/Program Files/Entropia Universe"),
Path.home() / "AppData" / "Local" / "Entropia Universe",
]
def __init__(
self,
game_path: Optional[Path] = None,
event_bus: Optional[Any] = None,
poll_interval: float = 1.0
):
"""Initialize game client.
Args:
game_path: Path to EU installation (auto-detected if None)
event_bus: Event bus for emitting events
poll_interval: Seconds between process checks
"""
self.game_path = game_path
self.event_bus = event_bus
self.poll_interval = poll_interval
self._state = GameState.DISCONNECTED
self._character = CharacterInfo()
self._process_id: Optional[int] = None
self._window_handle: Optional[int] = None
self._log_path: Optional[Path] = None
self._running = False
self._monitor_thread: Optional[threading.Thread] = None
self._callbacks: Dict[str, List[Callable]] = {}
self._logger = logging.getLogger("GameClient")
# Sub-components
self._log_parser: Optional[Any] = None
self._window_tracker: Optional[Any] = None
# ========== Properties ==========
@property
def state(self) -> GameState:
"""Current game connection state."""
return self._state
@property
def is_connected(self) -> bool:
"""Whether game client is connected."""
return self._state in (GameState.CONNECTED, GameState.PLAYING)
@property
def is_playing(self) -> bool:
"""Whether character is active in game."""
return self._state == GameState.PLAYING
@property
def character(self) -> CharacterInfo:
"""Current character information."""
return self._character
# ========== Lifecycle ==========
def start(self) -> bool:
"""Start monitoring the game client.
Returns:
True if started successfully
"""
if self._running:
return True
self._running = True
# Auto-detect game path if not provided
if not self.game_path:
self.game_path = self._detect_game_path()
if self.game_path:
self._log_path = self.game_path / "chat.log"
# Start monitoring thread
self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
self._monitor_thread.start()
self._logger.info("Game client monitor started")
return True
def stop(self) -> None:
"""Stop monitoring the game client."""
self._running = False
if self._monitor_thread:
self._monitor_thread.join(timeout=2.0)
if self._log_parser:
self._log_parser.stop()
self._logger.info("Game client monitor stopped")
# ========== Event Handling ==========
def on(self, event_type: str, callback: Callable) -> Callable:
"""Register event callback.
Args:
event_type: Event type ('loot', 'skill', 'global', etc.)
callback: Function to call when event occurs
Returns:
The callback function (for use as decorator)
"""
if event_type not in self._callbacks:
self._callbacks[event_type] = []
self._callbacks[event_type].append(callback)
return callback
def off(self, event_type: str, callback: Callable) -> bool:
"""Unregister event callback."""
if event_type in self._callbacks:
if callback in self._callbacks[event_type]:
self._callbacks[event_type].remove(callback)
return True
return False
def _emit(self, event_type: str, data: Dict[str, Any]) -> None:
"""Emit event to callbacks and event bus."""
# Call local callbacks
for callback in self._callbacks.get(event_type, []):
try:
callback(data)
except Exception as e:
self._logger.error(f"Error in callback: {e}")
# Emit to event bus
if self.event_bus:
try:
self.event_bus.emit(f"game.{event_type}", data, source="game_client")
except Exception as e:
self._logger.error(f"Error emitting to event bus: {e}")
# ========== Detection ==========
def _detect_game_path(self) -> Optional[Path]:
"""Auto-detect EU installation path."""
# Check default paths
for path in self.DEFAULT_INSTALL_PATHS:
if path.exists() and (path / "Entropia.exe").exists():
self._logger.info(f"Found EU at: {path}")
return path
# Check registry on Windows
try:
import winreg
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,
r"SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall") as key:
for i in range(winreg.QueryInfoKey(key)[0]):
try:
subkey_name = winreg.EnumKey(key, i)
with winreg.OpenKey(key, subkey_name) as subkey:
name = winreg.QueryValueEx(subkey, "DisplayName")[0]
if "Entropia" in name:
path = winreg.QueryValueEx(subkey, "InstallLocation")[0]
return Path(path)
except:
continue
except Exception as e:
self._logger.debug(f"Registry search failed: {e}")
return None
def _find_process(self) -> Optional[int]:
"""Find EU game process ID."""
try:
import psutil
for proc in psutil.process_iter(['pid', 'name']):
try:
if proc.info['name'] and 'entropia' in proc.info['name'].lower():
return proc.info['pid']
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
except ImportError:
pass
# Fallback: check if log file exists and is being written
if self._log_path and self._log_path.exists():
# Check if file was modified recently
mtime = self._log_path.stat().st_mtime
if time.time() - mtime < 60: # Modified in last minute
return -1 # Unknown PID but likely running
return None
def _is_window_focused(self) -> bool:
"""Check if EU window is focused."""
try:
import win32gui
import win32process
hwnd = win32gui.GetForegroundWindow()
_, pid = win32process.GetWindowThreadProcessId(hwnd)
return pid == self._process_id
except ImportError:
pass
return False
# ========== Monitoring Loop ==========
def _monitor_loop(self) -> None:
"""Main monitoring loop."""
last_log_position = 0
while self._running:
try:
# Check process
pid = self._find_process()
if pid and self._state == GameState.DISCONNECTED:
self._process_id = pid
self._state = GameState.DETECTED
self._emit('connected', {'pid': pid})
self._start_log_parsing()
elif not pid and self._state != GameState.DISCONNECTED:
self._state = GameState.DISCONNECTED
self._process_id = None
self._emit('disconnected', {})
self._stop_log_parsing()
# Check window focus
if self._state != GameState.DISCONNECTED:
focused = self._is_window_focused()
self._emit('focus_changed', {'focused': focused})
time.sleep(self.poll_interval)
except Exception as e:
self._logger.error(f"Monitor error: {e}")
time.sleep(self.poll_interval)
def _start_log_parsing(self) -> None:
"""Start parsing log files."""
if not self._log_path or not self._log_path.exists():
return
from .log_parser import LogParser
self._log_parser = LogParser(self._log_path)
@self._log_parser.on_event
def handle_log_event(event_type, data):
self._handle_log_event(event_type, data)
self._log_parser.start()
self._state = GameState.CONNECTED
def _stop_log_parsing(self) -> None:
"""Stop parsing log files."""
if self._log_parser:
self._log_parser.stop()
self._log_parser = None
def _handle_log_event(self, event_type: str, data: Dict[str, Any]) -> None:
"""Handle events from log parser."""
# Update character info
if event_type == 'system_message' and 'character' in data:
self._character.name = data.get('character')
self._state = GameState.PLAYING
# Emit to listeners
self._emit(event_type, data)
# ========== Public API ==========
def get_game_path(self) -> Optional[Path]:
"""Get detected game path."""
return self.game_path
def get_log_path(self) -> Optional[Path]:
"""Get path to chat.log."""
return self._log_path
def set_game_path(self, path: Path) -> None:
"""Manually set game path."""
self.game_path = Path(path)
self._log_path = self.game_path / "chat.log"

View File

@ -1,323 +0,0 @@
"""
Log file parser for Entropia Universe chat.log
===============================================
Parses EU chat.log file to extract:
- Loot events
- Skill gains
- Chat messages
- System messages
- Globals/HoFs
"""
import re
import time
import logging
import threading
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional, Callable, Dict, List, Any, Pattern
@dataclass
class LogEvent:
"""Represents a parsed log event."""
timestamp: datetime
type: str
data: Dict[str, Any]
raw: str
class LogParser:
"""Parser for Entropia Universe chat.log file.
Monitors the log file and emits events for interesting game actions.
Example:
parser = LogParser(Path("C:/.../chat.log"))
@parser.on_event
def handle(event_type, data):
if event_type == 'loot':
print(f"Loot: {data['item']}")
parser.start()
"""
# Regex patterns for different event types
PATTERNS = {
# Loot: "2024-01-15 14:30:25 [System] [] [Player] You received Angel Scales x1 Value: 150 PED"
'loot': re.compile(
r'You received\s+(.+?)\s+x(\d+)\s+Value:\s+([\d.]+)'
),
# Skill gain: "Skill increase: Rifle (Gain: 0.5234)"
'skill': re.compile(
r'Skill increase:\s+(.+?)\s+\(Gain:\s+([\d.]+)\)'
),
# Global/HoF: "[Global] [Player] found something rare! Angel Scales (Uber)"
'global': re.compile(
r'\[Global\].*?found something rare!\s+(.+?)\s+\((\d+)\s*PED\)'
),
# HoF (Hall of Fame): "[Hall of Fame] [Player] found something extraordinary!"
'hof': re.compile(
r'\[Hall of Fame\].*?found something extraordinary!\s+(.+?)\s+\((\d+)\s*PED\)'
),
# Chat message: "[Trade] [PlayerName]: Message text"
'chat': re.compile(
r'\[(\w+)\]\s+\[(.+?)\]:\s+(.*)'
),
# Damage dealt: "You inflicted 45.2 points of damage"
'damage_dealt': re.compile(
r'You inflicted\s+([\d.]+)\s+points of damage'
),
# Damage received: "You took 12.3 points of damage"
'damage_received': re.compile(
r'You took\s+([\d.]+)\s+points of damage'
),
# Enemy killed: "You killed [Creature Name]"
'kill': re.compile(
r'You killed\s+\[(.+?)\]'
),
# Item crafted: "You have successfully crafted [Item Name]"
'craft': re.compile(
r'You have successfully crafted\s+\[(.+?)\]'
),
# Mission completed: "Mission "Mission Name" completed!"
'mission_complete': re.compile(
r'Mission\s+"(.+?)"\s+completed!'
),
# Experience gain: "You gained 0.2345 experience in your Rifle skill"
'experience': re.compile(
r'You gained\s+([\d.]+)\s+experience in your\s+(.+?)\s+skill'
),
# Item looted (alt format): "You looted [Item Name]"
'loot_alt': re.compile(
r'You looted\s+\[(.+?)\]\s*(?:x(\d+))?'
),
}
def __init__(
self,
log_path: Path,
poll_interval: float = 0.5,
encoding: str = 'utf-8'
):
"""Initialize log parser.
Args:
log_path: Path to chat.log file
poll_interval: Seconds between file checks
encoding: File encoding
"""
self.log_path = Path(log_path)
self.poll_interval = poll_interval
self.encoding = encoding
self._running = False
self._thread: Optional[threading.Thread] = None
self._file_position = 0
self._callbacks: List[Callable] = []
self._logger = logging.getLogger("LogParser")
# Track last position to detect new content
self._last_size = 0
self._last_modified = 0
def start(self) -> bool:
"""Start parsing log file.
Returns:
True if started successfully
"""
if self._running:
return True
if not self.log_path.exists():
self._logger.error(f"Log file not found: {self.log_path}")
return False
self._running = True
# Start at end of file (only read new content)
try:
self._last_size = self.log_path.stat().st_size
self._file_position = self._last_size
except Exception as e:
self._logger.error(f"Failed to get file size: {e}")
return False
self._thread = threading.Thread(target=self._parse_loop, daemon=True)
self._thread.start()
self._logger.info(f"Started parsing {self.log_path}")
return True
def stop(self) -> None:
"""Stop parsing log file."""
self._running = False
if self._thread:
self._thread.join(timeout=2.0)
self._logger.info("Stopped parsing")
def on_event(self, callback: Callable[[str, Dict[str, Any]], None]) -> Callable:
"""Register event callback.
Args:
callback: Function(event_type, data) called for each event
Returns:
The callback function (for use as decorator)
"""
self._callbacks.append(callback)
return callback
def _emit(self, event_type: str, data: Dict[str, Any]) -> None:
"""Emit event to all callbacks."""
for callback in self._callbacks:
try:
callback(event_type, data)
except Exception as e:
self._logger.error(f"Error in callback: {e}")
def _parse_loop(self) -> None:
"""Main parsing loop."""
while self._running:
try:
self._check_file()
time.sleep(self.poll_interval)
except Exception as e:
self._logger.error(f"Parse error: {e}")
time.sleep(self.poll_interval)
def _check_file(self) -> None:
"""Check log file for new content."""
try:
stat = self.log_path.stat()
current_size = stat.st_size
current_modified = stat.st_mtime
# Check if file has new content
if current_size == self._last_size:
return
if current_size < self._last_size:
# File was truncated or rotated, start from beginning
self._file_position = 0
self._last_size = current_size
self._last_modified = current_modified
# Read new content
with open(self.log_path, 'r', encoding=self.encoding, errors='ignore') as f:
f.seek(self._file_position)
new_lines = f.readlines()
self._file_position = f.tell()
# Parse each new line
for line in new_lines:
self._parse_line(line.strip())
except Exception as e:
self._logger.error(f"Error checking file: {e}")
def _parse_line(self, line: str) -> Optional[LogEvent]:
"""Parse a single log line."""
if not line:
return None
# Extract timestamp if present
timestamp = datetime.now()
ts_match = re.match(r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})', line)
if ts_match:
try:
timestamp = datetime.strptime(ts_match.group(1), '%Y-%m-%d %H:%M:%S')
except ValueError:
pass
# Try each pattern
for event_type, pattern in self.PATTERNS.items():
match = pattern.search(line)
if match:
data = self._extract_data(event_type, match, line)
event = LogEvent(
timestamp=timestamp,
type=event_type,
data=data,
raw=line
)
self._emit(event_type, data)
return event
return None
def _extract_data(
self,
event_type: str,
match: re.Match,
raw_line: str
) -> Dict[str, Any]:
"""Extract data from regex match."""
groups = match.groups()
data = {
'raw': raw_line,
'timestamp': datetime.now().isoformat(),
}
if event_type == 'loot':
data['item'] = groups[0].strip()
data['quantity'] = int(groups[1]) if len(groups) > 1 else 1
data['value'] = float(groups[2]) if len(groups) > 2 else 0.0
elif event_type == 'skill':
data['skill'] = groups[0].strip()
data['gain'] = float(groups[1]) if len(groups) > 1 else 0.0
elif event_type in ('global', 'hof'):
data['item'] = groups[0].strip() if groups else 'Unknown'
data['value'] = float(groups[1]) if len(groups) > 1 else 0.0
data['is_hof'] = event_type == 'hof'
elif event_type == 'chat':
data['channel'] = groups[0] if groups else 'Unknown'
data['player'] = groups[1] if len(groups) > 1 else 'Unknown'
data['message'] = groups[2] if len(groups) > 2 else ''
elif event_type == 'damage_dealt':
data['damage'] = float(groups[0]) if groups else 0.0
elif event_type == 'damage_received':
data['damage'] = float(groups[0]) if groups else 0.0
elif event_type == 'kill':
data['creature'] = groups[0] if groups else 'Unknown'
elif event_type == 'craft':
data['item'] = groups[0] if groups else 'Unknown'
elif event_type == 'mission_complete':
data['mission'] = groups[0] if groups else 'Unknown'
elif event_type == 'experience':
data['amount'] = float(groups[0]) if groups else 0.0
data['skill'] = groups[1].strip() if len(groups) > 1 else 'Unknown'
elif event_type == 'loot_alt':
data['item'] = groups[0] if groups else 'Unknown'
data['quantity'] = int(groups[1]) if len(groups) > 1 and groups[1] else 1
return data

View File

@ -1,107 +0,0 @@
"""
Window tracker for monitoring Entropia Universe window state.
"""
import time
import logging
import threading
from typing import Optional, Callable, List
class WindowTracker:
"""Tracks the EU game window state.
Monitors window focus, visibility, and position without
interfering with the game.
"""
def __init__(self, poll_interval: float = 0.5):
"""Initialize window tracker.
Args:
poll_interval: Seconds between window checks
"""
self.poll_interval = poll_interval
self._running = False
self._thread: Optional[threading.Thread] = None
self._callbacks: List[Callable] = []
self._logger = logging.getLogger("WindowTracker")
self._is_focused = False
self._window_handle: Optional[int] = None
self._process_id: Optional[int] = None
def start(self, process_id: Optional[int] = None) -> bool:
"""Start tracking window."""
if self._running:
return True
self._process_id = process_id
self._running = True
self._thread = threading.Thread(target=self._track_loop, daemon=True)
self._thread.start()
return True
def stop(self) -> None:
"""Stop tracking window."""
self._running = False
if self._thread:
self._thread.join(timeout=1.0)
def on_change(self, callback: Callable[[bool], None]) -> Callable:
"""Register focus change callback."""
self._callbacks.append(callback)
return callback
def is_focused(self) -> bool:
"""Check if game window is focused."""
return self._is_focused
def _track_loop(self) -> None:
"""Main tracking loop."""
while self._running:
try:
focused = self._check_focus()
if focused != self._is_focused:
self._is_focused = focused
self._notify_change(focused)
time.sleep(self.poll_interval)
except Exception as e:
self._logger.error(f"Tracking error: {e}")
time.sleep(self.poll_interval)
def _check_focus(self) -> bool:
"""Check if game window is focused."""
try:
import win32gui
import win32process
hwnd = win32gui.GetForegroundWindow()
if self._process_id:
_, pid = win32process.GetWindowThreadProcessId(hwnd)
return pid == self._process_id
else:
# Check window title
title = win32gui.GetWindowText(hwnd)
return 'entropia' in title.lower()
except ImportError:
pass
return False
def _notify_change(self, focused: bool) -> None:
"""Notify callbacks of focus change."""
for callback in self._callbacks:
try:
callback(focused)
except Exception as e:
self._logger.error(f"Callback error: {e}")

View File

@ -1,465 +0,0 @@
"""
EU-Utility Premium - Plugin API
================================
Plugin API surface for the plugin system.
This module defines the contracts that plugins must implement
to be loaded by the PluginManager.
Example:
from premium.plugins.api import PluginAPI, PluginManifest, PluginContext
class MyPlugin(PluginAPI):
manifest = PluginManifest(
name="My Plugin",
version="1.0.0",
author="Your Name"
)
def on_init(self, ctx: PluginContext):
self.ctx = ctx
ctx.logger.info("Plugin initialized!")
def on_activate(self):
self.ctx.logger.info("Plugin activated!")
"""
from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Type, TYPE_CHECKING
if TYPE_CHECKING:
from PyQt6.QtWidgets import QWidget
# =============================================================================
# PERMISSION LEVELS
# =============================================================================
class PermissionLevel(Enum):
"""Permission levels for plugin sandboxing.
Plugins must declare which permissions they need.
The user must approve these permissions before the plugin runs.
"""
FILE_READ = auto() # Read files from disk
FILE_WRITE = auto() # Write files to disk
NETWORK = auto() # Access network (API calls)
UI = auto() # Create/manipulate UI widgets
MEMORY = auto() # Access game memory (dangerous)
PROCESS = auto() # Access other processes
SYSTEM = auto() # System-level access (very dangerous)
# =============================================================================
# PLUGIN STATE
# =============================================================================
class PluginState(Enum):
"""Lifecycle states for plugins."""
DISCOVERED = auto() # Found but not loaded
LOADING = auto() # Currently loading
LOADED = auto() # Code loaded, not initialized
INITIALIZING = auto() # Currently initializing
INACTIVE = auto() # Initialized but not active
ACTIVATING = auto() # Currently activating
ACTIVE = auto() # Fully active and running
DEACTIVATING = auto() # Currently deactivating
UNLOADING = auto() # Currently unloading
UNLOADED = auto() # Unloaded from memory
ERROR = auto() # Error state
# =============================================================================
# PLUGIN MANIFEST
# =============================================================================
@dataclass
class PluginManifest:
"""Plugin manifest - metadata about a plugin.
This is loaded from plugin.json in the plugin directory.
Example plugin.json:
{
"name": "My Plugin",
"version": "1.0.0",
"author": "Your Name",
"description": "Does cool things",
"entry_point": "main.py",
"permissions": ["file_read", "ui"],
"dependencies": {
"other_plugin": ">=1.0.0"
}
}
"""
name: str
version: str
author: str
description: str = ""
entry_point: str = "main.py"
permissions: Set[PermissionLevel] = field(default_factory=set)
dependencies: Dict[str, str] = field(default_factory=dict)
min_api_version: str = "3.0.0"
tags: List[str] = field(default_factory=list)
homepage: str = ""
icon: str = ""
@classmethod
def from_json(cls, path: Path) -> PluginManifest:
"""Load manifest from JSON file."""
import json
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Parse permissions
permission_map = {
'file_read': PermissionLevel.FILE_READ,
'file_write': PermissionLevel.FILE_WRITE,
'network': PermissionLevel.NETWORK,
'ui': PermissionLevel.UI,
'memory': PermissionLevel.MEMORY,
'process': PermissionLevel.PROCESS,
'system': PermissionLevel.SYSTEM,
}
permissions = set()
for perm_str in data.get('permissions', []):
if perm_str in permission_map:
permissions.add(permission_map[perm_str])
return cls(
name=data['name'],
version=data['version'],
author=data.get('author', 'Unknown'),
description=data.get('description', ''),
entry_point=data.get('entry_point', 'main.py'),
permissions=permissions,
dependencies=data.get('dependencies', {}),
min_api_version=data.get('min_api_version', '3.0.0'),
tags=data.get('tags', []),
homepage=data.get('homepage', ''),
icon=data.get('icon', ''),
)
def to_json(self, path: Path) -> None:
"""Save manifest to JSON file."""
import json
# Convert permissions back to strings
permission_reverse_map = {
PermissionLevel.FILE_READ: 'file_read',
PermissionLevel.FILE_WRITE: 'file_write',
PermissionLevel.NETWORK: 'network',
PermissionLevel.UI: 'ui',
PermissionLevel.MEMORY: 'memory',
PermissionLevel.PROCESS: 'process',
PermissionLevel.SYSTEM: 'system',
}
data = {
'name': self.name,
'version': self.version,
'author': self.author,
'description': self.description,
'entry_point': self.entry_point,
'permissions': [permission_reverse_map[p] for p in self.permissions],
'dependencies': self.dependencies,
'min_api_version': self.min_api_version,
'tags': self.tags,
'homepage': self.homepage,
'icon': self.icon,
}
with open(path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2)
# =============================================================================
# PLUGIN CONTEXT
# =============================================================================
@dataclass
class PluginContext:
"""Context passed to plugins during initialization.
This provides plugins with access to system resources while
maintaining sandbox boundaries.
Attributes:
plugin_id: Unique plugin identifier
manifest: Plugin manifest
data_dir: Directory for plugin data storage
config: Plugin configuration dictionary
logger: Logger instance for this plugin
event_bus: Event bus for publishing/subscribing to events
state_store: State store for accessing global state
widget_api: API for creating UI widgets
nexus_api: API for Entropia Nexus data
permissions: Set of granted permissions
"""
plugin_id: str
manifest: PluginManifest
data_dir: Path
config: Dict[str, Any]
logger: logging.Logger
event_bus: Optional[Any] = None
state_store: Optional[Any] = None
widget_api: Optional[Any] = None
nexus_api: Optional[Any] = None
permissions: Set[PermissionLevel] = field(default_factory=set)
def has_permission(self, permission: PermissionLevel) -> bool:
"""Check if plugin has a specific permission."""
return permission in self.permissions
def require_permission(self, permission: PermissionLevel) -> None:
"""Require a permission or raise an error."""
if not self.has_permission(permission):
raise PluginPermissionError(
f"Plugin '{self.manifest.name}' requires permission: {permission.name}"
)
# =============================================================================
# PLUGIN INSTANCE
# =============================================================================
@dataclass
class PluginInstance:
"""Represents a loaded plugin instance.
Tracks the lifecycle state and metadata of a plugin.
"""
plugin_id: str
manifest: PluginManifest
state: PluginState = PluginState.DISCOVERED
instance: Optional[PluginAPI] = None
load_time: Optional[datetime] = None
activate_time: Optional[datetime] = None
error_message: Optional[str] = None
error_traceback: Optional[str] = None
def is_active(self) -> bool:
"""Check if plugin is currently active."""
return self.state == PluginState.ACTIVE
def has_error(self) -> bool:
"""Check if plugin is in error state."""
return self.state == PluginState.ERROR
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
'plugin_id': self.plugin_id,
'name': self.manifest.name,
'version': self.manifest.version,
'state': self.state.name,
'is_active': self.is_active(),
'has_error': self.has_error(),
'load_time': self.load_time.isoformat() if self.load_time else None,
'activate_time': self.activate_time.isoformat() if self.activate_time else None,
'error_message': self.error_message,
}
# =============================================================================
# PLUGIN ERRORS
# =============================================================================
class PluginError(Exception):
"""Base exception for plugin-related errors."""
pass
class PluginLoadError(PluginError):
"""Error loading a plugin (invalid code, missing files, etc)."""
pass
class PluginInitError(PluginError):
"""Error initializing a plugin."""
pass
class PluginPermissionError(PluginError):
"""Plugin tried to use a permission it doesn't have."""
pass
class PluginDependencyError(PluginError):
"""Error with plugin dependencies."""
pass
class PluginVersionError(PluginError):
"""Error with plugin version compatibility."""
pass
class PluginAPIError(PluginError):
"""Error with plugin API usage."""
pass
# =============================================================================
# PLUGIN API BASE CLASS
# =============================================================================
class PluginAPI(ABC):
"""Base class for all plugins.
Plugins must inherit from this class and implement the lifecycle methods.
Example:
class MyPlugin(PluginAPI):
manifest = PluginManifest(
name="My Plugin",
version="1.0.0",
author="Your Name"
)
def on_init(self, ctx: PluginContext):
self.ctx = ctx
self.config = ctx.config
def on_activate(self):
# Start doing work
pass
def on_deactivate(self):
# Stop doing work
pass
def on_shutdown(self):
# Cleanup resources
pass
def create_widget(self) -> Optional[QWidget]:
# Return UI widget for dashboard
return None
"""
# Must be defined by subclass
manifest: PluginManifest
def __init__(self):
self.ctx: Optional[PluginContext] = None
self._initialized = False
self._active = False
def _set_context(self, ctx: PluginContext) -> None:
"""Set the plugin context (called by PluginManager)."""
self.ctx = ctx
# ========== Lifecycle Methods ==========
@abstractmethod
def on_init(self, ctx: PluginContext) -> None:
"""Called when plugin is initialized.
Use this to set up initial state, load config, etc.
Don't start any background work here - use on_activate for that.
Args:
ctx: Plugin context with resources and configuration
"""
pass
def on_activate(self) -> None:
"""Called when plugin is activated.
Start background tasks, register event handlers, etc.
"""
pass
def on_deactivate(self) -> None:
"""Called when plugin is deactivated.
Stop background tasks, unregister event handlers.
"""
pass
def on_shutdown(self) -> None:
"""Called when plugin is being unloaded.
Clean up all resources, save state, etc.
"""
pass
# ========== UI Methods ==========
def create_widget(self) -> Optional[Any]:
"""Create a widget for the dashboard.
Returns:
QWidget or None if plugin has no UI
"""
return None
def get_settings_widget(self) -> Optional[Any]:
"""Create a settings widget.
Returns:
QWidget or None if plugin has no settings
"""
return None
# ========== Utility Methods ==========
def log(self, level: str, message: str) -> None:
"""Log a message through the plugin's logger."""
if self.ctx and self.ctx.logger:
getattr(self.ctx.logger, level.lower(), self.ctx.logger.info)(message)
def emit_event(self, event_type: str, data: Dict[str, Any]) -> None:
"""Emit an event to the event bus."""
if self.ctx and self.ctx.event_bus:
self.ctx.event_bus.emit(event_type, data, source=self.ctx.plugin_id)
def save_config(self) -> bool:
"""Save plugin configuration to disk."""
if not self.ctx:
return False
config_path = self.ctx.data_dir / "config.json"
try:
import json
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(self.ctx.config, f, indent=2)
return True
except Exception as e:
self.log('error', f"Failed to save config: {e}")
return False
# =============================================================================
# EXPORTS
# =============================================================================
__all__ = [
# Permissions
'PermissionLevel',
# State
'PluginState',
# Manifest
'PluginManifest',
# Context
'PluginContext',
# Instance
'PluginInstance',
# Errors
'PluginError', 'PluginLoadError', 'PluginInitError',
'PluginPermissionError', 'PluginDependencyError',
'PluginVersionError', 'PluginAPIError',
# Base class
'PluginAPI',
]

View File

@ -1,814 +0,0 @@
"""
EU-Utility Premium - Plugin Manager
====================================
Enterprise-grade plugin manager with:
- Sandboxed execution
- Dynamic loading/unloading
- Dependency resolution
- Lifecycle management
- Security validation
Example:
manager = PluginManager()
manager.discover_plugins(Path("./plugins"))
manager.load_all()
# Activate specific plugin
manager.activate_plugin("my_plugin")
"""
from __future__ import annotations
import asyncio
import hashlib
import importlib.util
import json
import logging
import os
import sys
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Type, Union
from uuid import uuid4
from premium.plugins.api import (
PluginAPI, PluginContext, PluginError, PluginInstance, PluginLoadError,
PluginInitError, PluginManifest, PluginPermissionError, PluginState,
PermissionLevel, PluginDependencyError, PluginVersionError
)
# API version for compatibility checking
API_VERSION = "3.0.0"
# =============================================================================
# PLUGIN SANDBOX
# =============================================================================
class PluginSandbox:
"""Sandbox for plugin execution.
Restricts plugin access based on permissions.
"""
def __init__(self, plugin_id: str, permissions: Set[PermissionLevel]):
self.plugin_id = plugin_id
self.permissions = permissions
self._original_open = open
self._restricted_paths: Set[Path] = set()
def can_access_file(self, path: Path, mode: str = 'r') -> bool:
"""Check if plugin can access a file."""
if 'w' in mode or 'a' in mode or 'x' in mode:
if PermissionLevel.FILE_WRITE not in self.permissions:
return False
if PermissionLevel.FILE_READ not in self.permissions:
return False
# Check restricted paths
resolved = path.resolve()
for restricted in self._restricted_paths:
try:
resolved.relative_to(restricted)
return False
except ValueError:
pass
return True
def can_access_network(self) -> bool:
"""Check if plugin can access network."""
return PermissionLevel.NETWORK in self.permissions
def can_access_ui(self) -> bool:
"""Check if plugin can manipulate UI."""
return PermissionLevel.UI in self.permissions
def can_access_memory(self) -> bool:
"""Check if plugin can access memory (dangerous)."""
return PermissionLevel.MEMORY in self.permissions
# =============================================================================
# PLUGIN LOADER
# =============================================================================
class PluginLoader:
"""Loads plugin modules with validation and security checks."""
def __init__(self, sandbox: Optional[PluginSandbox] = None):
self.sandbox = sandbox
self._loaded_modules: Dict[str, Any] = {}
def load_plugin_class(
self,
plugin_path: Path,
manifest: PluginManifest
) -> Type[PluginAPI]:
"""Load a plugin class from a directory.
Args:
plugin_path: Path to plugin directory
manifest: Plugin manifest
Returns:
PluginAPI subclass
Raises:
PluginLoadError: If loading fails
"""
entry_file = plugin_path / manifest.entry_point
if not entry_file.exists():
raise PluginLoadError(
f"Entry point not found: {manifest.entry_point}"
)
# Validate file hash for security
file_hash = self._compute_hash(entry_file)
# Create unique module name
module_name = f"premium_plugin_{manifest.name.lower().replace(' ', '_')}_{file_hash[:8]}"
try:
# Load module
spec = importlib.util.spec_from_file_location(
module_name, entry_file
)
if spec is None or spec.loader is None:
raise PluginLoadError("Failed to create module spec")
module = importlib.util.module_from_spec(spec)
# Add to sys.modules temporarily
sys.modules[module_name] = module
try:
spec.loader.exec_module(module)
except Exception as e:
raise PluginLoadError(f"Failed to execute module: {e}")
# Find PluginAPI subclass
plugin_class = None
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type) and
issubclass(attr, PluginAPI) and
attr is not PluginAPI and
not attr.__name__.startswith('Base')
):
plugin_class = attr
break
if plugin_class is None:
raise PluginLoadError(
f"No PluginAPI subclass found in {manifest.entry_point}"
)
# Validate manifest matches
if not hasattr(plugin_class, 'manifest'):
raise PluginLoadError("Plugin class missing manifest attribute")
return plugin_class
except PluginLoadError:
raise
except Exception as e:
raise PluginLoadError(f"Unexpected error loading plugin: {e}")
def _compute_hash(self, file_path: Path) -> str:
"""Compute SHA256 hash of a file."""
h = hashlib.sha256()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(8192), b''):
h.update(chunk)
return h.hexdigest()
# =============================================================================
# DEPENDENCY RESOLVER
# =============================================================================
@dataclass
class DependencyNode:
"""Node in dependency graph."""
plugin_id: str
manifest: PluginManifest
dependencies: Set[str] = field(default_factory=set)
dependents: Set[str] = field(default_factory=set)
resolved: bool = False
class DependencyResolver:
"""Resolves plugin dependencies using topological sort."""
def __init__(self):
self._nodes: Dict[str, DependencyNode] = {}
def add_plugin(self, plugin_id: str, manifest: PluginManifest) -> None:
"""Add a plugin to the dependency graph."""
if plugin_id not in self._nodes:
self._nodes[plugin_id] = DependencyNode(
plugin_id=plugin_id,
manifest=manifest
)
node = self._nodes[plugin_id]
# Add dependencies
for dep_id in manifest.dependencies.keys():
node.dependencies.add(dep_id)
if dep_id not in self._nodes:
self._nodes[dep_id] = DependencyNode(
plugin_id=dep_id,
manifest=PluginManifest(
name=dep_id,
version="0.0.0",
author="Unknown"
)
)
self._nodes[dep_id].dependents.add(plugin_id)
def resolve_load_order(self, plugin_ids: List[str]) -> List[str]:
"""Resolve plugin load order using topological sort.
Returns:
List of plugin IDs in load order
Raises:
PluginDependencyError: If circular dependency detected
"""
order: List[str] = []
visited: Set[str] = set()
temp_mark: Set[str] = set()
def visit(node_id: str, path: List[str]) -> None:
if node_id in temp_mark:
cycle = " -> ".join(path + [node_id])
raise PluginDependencyError(f"Circular dependency: {cycle}")
if node_id in visited:
return
temp_mark.add(node_id)
path.append(node_id)
node = self._nodes.get(node_id)
if node:
for dep_id in node.dependencies:
visit(dep_id, path.copy())
temp_mark.remove(node_id)
visited.add(node_id)
order.append(node_id)
for plugin_id in plugin_ids:
if plugin_id not in visited:
visit(plugin_id, [])
return order
def get_dependents(self, plugin_id: str) -> Set[str]:
"""Get all plugins that depend on a given plugin."""
node = self._nodes.get(plugin_id)
if node:
return node.dependents.copy()
return set()
def check_conflicts(self) -> List[str]:
"""Check for version conflicts in dependencies."""
conflicts = []
# TODO: Implement version conflict checking
return conflicts
# =============================================================================
# PLUGIN MANAGER
# =============================================================================
class PluginManager:
"""Manages plugin lifecycle with sandboxing and dependency resolution.
This is the main entry point for plugin management. It handles:
- Discovery of plugins in directories
- Loading and unloading
- Dependency resolution
- Lifecycle state management
- Security sandboxing
Example:
manager = PluginManager(
plugin_dirs=[Path("./plugins"), Path("~/.eu-utility/plugins")],
data_dir=Path("~/.eu-utility/data")
)
manager.discover_all()
manager.load_all()
# Get active plugins
for plugin_id, instance in manager.get_active_plugins().items():
print(f"{plugin_id}: {instance.manifest.name}")
"""
def __init__(
self,
plugin_dirs: Optional[List[Path]] = None,
data_dir: Optional[Path] = None,
event_bus: Optional[Any] = None,
state_store: Optional[Any] = None,
widget_api: Optional[Any] = None,
nexus_api: Optional[Any] = None,
max_workers: int = 4
):
"""Initialize plugin manager.
Args:
plugin_dirs: Directories to search for plugins
data_dir: Directory for plugin data storage
event_bus: Event bus for plugin communication
state_store: State store for plugins
widget_api: Widget API for UI plugins
nexus_api: Nexus API for Entropia data
max_workers: Max worker threads for background tasks
"""
self.plugin_dirs = plugin_dirs or []
self.data_dir = data_dir or Path.home() / ".eu-utility" / "data"
self.event_bus = event_bus
self.state_store = state_store
self.widget_api = widget_api
self.nexus_api = nexus_api
# Ensure data directory exists
self.data_dir.mkdir(parents=True, exist_ok=True)
# Plugin storage
self._instances: Dict[str, PluginInstance] = {}
self._classes: Dict[str, Type[PluginAPI]] = {}
self._paths: Dict[str, Path] = {}
# Support systems
self._loader = PluginLoader()
self._resolver = DependencyResolver()
self._executor = ThreadPoolExecutor(max_workers=max_workers)
self._logger = logging.getLogger("PluginManager")
# State
self._discovered = False
self._lock = threading.RLock()
# ========== Discovery ==========
def discover_plugins(self, directory: Path) -> List[str]:
"""Discover plugins in a directory.
Args:
directory: Directory to search for plugins
Returns:
List of discovered plugin IDs
"""
discovered: List[str] = []
if not directory.exists():
self._logger.warning(f"Plugin directory does not exist: {directory}")
return discovered
for item in directory.iterdir():
if not item.is_dir():
continue
if item.name.startswith('.') or item.name.startswith('__'):
continue
manifest_path = item / "plugin.json"
if not manifest_path.exists():
continue
try:
manifest = PluginManifest.from_json(manifest_path)
plugin_id = self._generate_plugin_id(manifest, item)
with self._lock:
self._paths[plugin_id] = item
self._resolver.add_plugin(plugin_id, manifest)
discovered.append(plugin_id)
self._logger.debug(f"Discovered plugin: {manifest.name} ({plugin_id})")
except Exception as e:
self._logger.error(f"Failed to load manifest from {item}: {e}")
return discovered
def discover_all(self) -> int:
"""Discover plugins in all configured directories.
Returns:
Total number of plugins discovered
"""
total = 0
for directory in self.plugin_dirs:
total += len(self.discover_plugins(directory))
self._discovered = True
self._logger.info(f"Discovered {total} plugins")
return total
# ========== Loading ==========
def load_plugin(self, plugin_id: str) -> bool:
"""Load a plugin by ID.
Args:
plugin_id: Unique plugin identifier
Returns:
True if loaded successfully
"""
with self._lock:
if plugin_id in self._instances:
return True
path = self._paths.get(plugin_id)
if not path:
self._logger.error(f"Plugin not found: {plugin_id}")
return False
manifest_path = path / "plugin.json"
manifest = PluginManifest.from_json(manifest_path)
# Create instance record
instance = PluginInstance(
plugin_id=plugin_id,
manifest=manifest,
state=PluginState.LOADING
)
self._instances[plugin_id] = instance
try:
# Load plugin class
plugin_class = self._loader.load_plugin_class(path, manifest)
self._classes[plugin_id] = plugin_class
with self._lock:
instance.state = PluginState.LOADED
instance.load_time = datetime.now()
self._logger.info(f"Loaded plugin: {manifest.name}")
return True
except Exception as e:
with self._lock:
instance.state = PluginState.ERROR
instance.error_message = str(e)
instance.error_traceback = traceback.format_exc()
self._logger.error(f"Failed to load plugin {manifest.name}: {e}")
return False
def load_all(self, auto_activate: bool = False) -> Dict[str, bool]:
"""Load all discovered plugins.
Args:
auto_activate: Automatically activate loaded plugins
Returns:
Dict mapping plugin IDs to success status
"""
if not self._discovered:
self.discover_all()
# Resolve load order
plugin_ids = list(self._paths.keys())
try:
load_order = self._resolver.resolve_load_order(plugin_ids)
except PluginDependencyError as e:
self._logger.error(f"Dependency resolution failed: {e}")
load_order = plugin_ids # Fall back to default order
# Load in order
results: Dict[str, bool] = {}
for plugin_id in load_order:
results[plugin_id] = self.load_plugin(plugin_id)
if auto_activate and results[plugin_id]:
self.activate_plugin(plugin_id)
return results
# ========== Initialization ==========
def init_plugin(self, plugin_id: str, config: Optional[Dict[str, Any]] = None) -> bool:
"""Initialize a loaded plugin.
Args:
plugin_id: Plugin ID
config: Optional configuration override
Returns:
True if initialized successfully
"""
with self._lock:
instance = self._instances.get(plugin_id)
if not instance:
self._logger.error(f"Plugin not loaded: {plugin_id}")
return False
if instance.state not in (PluginState.LOADED, PluginState.INACTIVE):
self._logger.warning(f"Cannot initialize plugin in state: {instance.state}")
return False
plugin_class = self._classes.get(plugin_id)
if not plugin_class:
self._logger.error(f"Plugin class not found: {plugin_id}")
return False
instance.state = PluginState.INITIALIZING
try:
# Create plugin directory for data
plugin_data_dir = self.data_dir / plugin_id
plugin_data_dir.mkdir(parents=True, exist_ok=True)
# Load saved config or use provided
saved_config = self._load_plugin_config(plugin_id)
if config:
saved_config.update(config)
# Create logger
logger = logging.getLogger(f"Plugin.{instance.manifest.name}")
# Create sandbox
sandbox = PluginSandbox(plugin_id, instance.manifest.permissions)
# Create context
ctx = PluginContext(
plugin_id=plugin_id,
manifest=instance.manifest,
data_dir=plugin_data_dir,
config=saved_config,
logger=logger,
event_bus=self.event_bus,
state_store=self.state_store,
widget_api=self.widget_api,
nexus_api=self.nexus_api,
permissions=instance.manifest.permissions
)
# Create and initialize plugin instance
plugin = plugin_class()
plugin._set_context(ctx)
plugin.on_init(ctx)
with self._lock:
instance.instance = plugin
instance.state = PluginState.INACTIVE
self._logger.info(f"Initialized plugin: {instance.manifest.name}")
return True
except Exception as e:
with self._lock:
instance.state = PluginState.ERROR
instance.error_message = str(e)
instance.error_traceback = traceback.format_exc()
self._logger.error(f"Failed to initialize plugin {plugin_id}: {e}")
return False
# ========== Activation ==========
def activate_plugin(self, plugin_id: str) -> bool:
"""Activate an initialized plugin.
Args:
plugin_id: Plugin ID
Returns:
True if activated successfully
"""
with self._lock:
instance = self._instances.get(plugin_id)
if not instance:
return False
if instance.state == PluginState.ACTIVE:
return True
if instance.state != PluginState.INACTIVE:
# Try to initialize first
if instance.state == PluginState.LOADED:
self.init_plugin(plugin_id)
if instance.state != PluginState.INACTIVE:
return False
instance.state = PluginState.ACTIVATING
plugin = instance.instance
try:
plugin.on_activate()
with self._lock:
instance.state = PluginState.ACTIVE
instance.activate_time = datetime.now()
self._logger.info(f"Activated plugin: {instance.manifest.name}")
return True
except Exception as e:
with self._lock:
instance.state = PluginState.ERROR
instance.error_message = str(e)
self._logger.error(f"Failed to activate plugin {plugin_id}: {e}")
return False
def deactivate_plugin(self, plugin_id: str) -> bool:
"""Deactivate an active plugin.
Args:
plugin_id: Plugin ID
Returns:
True if deactivated successfully
"""
with self._lock:
instance = self._instances.get(plugin_id)
if not instance or instance.state != PluginState.ACTIVE:
return False
instance.state = PluginState.DEACTIVATING
plugin = instance.instance
try:
plugin.on_deactivate()
with self._lock:
instance.state = PluginState.INACTIVE
self._logger.info(f"Deactivated plugin: {instance.manifest.name}")
return True
except Exception as e:
self._logger.error(f"Error deactivating plugin {plugin_id}: {e}")
return False
# ========== Unloading ==========
def unload_plugin(self, plugin_id: str, force: bool = False) -> bool:
"""Unload a plugin.
Args:
plugin_id: Plugin ID
force: Force unload even if dependents exist
Returns:
True if unloaded successfully
"""
with self._lock:
instance = self._instances.get(plugin_id)
if not instance:
return True
# Check dependents
if not force:
dependents = self._resolver.get_dependents(plugin_id)
active_dependents = [
d for d in dependents
if d in self._instances and self._instances[d].is_active()
]
if active_dependents:
self._logger.error(
f"Cannot unload {plugin_id}: active dependents {active_dependents}"
)
return False
# Deactivate if active
if instance.state == PluginState.ACTIVE:
self.deactivate_plugin(plugin_id)
instance.state = PluginState.UNLOADING
plugin = instance.instance
try:
if plugin:
# Save config before shutdown
self._save_plugin_config(plugin_id, plugin.ctx.config)
plugin.on_shutdown()
with self._lock:
instance.state = PluginState.UNLOADED
instance.instance = None
del self._instances[plugin_id]
del self._classes[plugin_id]
self._logger.info(f"Unloaded plugin: {instance.manifest.name}")
return True
except Exception as e:
self._logger.error(f"Error unloading plugin {plugin_id}: {e}")
return False
def unload_all(self) -> None:
"""Unload all plugins in reverse dependency order."""
# Get active plugins in reverse load order
plugin_ids = list(self._instances.keys())
for plugin_id in reversed(plugin_ids):
self.unload_plugin(plugin_id, force=True)
# ========== Queries ==========
def get_instance(self, plugin_id: str) -> Optional[PluginInstance]:
"""Get plugin instance info."""
return self._instances.get(plugin_id)
def get_plugin(self, plugin_id: str) -> Optional[PluginAPI]:
"""Get active plugin instance."""
instance = self._instances.get(plugin_id)
if instance and instance.state == PluginState.ACTIVE:
return instance.instance
return None
def get_all_instances(self) -> Dict[str, PluginInstance]:
"""Get all plugin instances."""
return self._instances.copy()
def get_active_plugins(self) -> Dict[str, PluginInstance]:
"""Get all active plugins."""
return {
k: v for k, v in self._instances.items()
if v.state == PluginState.ACTIVE
}
def get_plugin_ui(self, plugin_id: str) -> Optional[Any]:
"""Get plugin's UI widget."""
plugin = self.get_plugin(plugin_id)
if plugin:
return plugin.create_widget()
return None
def get_plugin_states(self) -> Dict[str, PluginState]:
"""Get state of all plugins."""
return {
k: v.state for k, v in self._instances.items()
}
# ========== Configuration ==========
def _load_plugin_config(self, plugin_id: str) -> Dict[str, Any]:
"""Load plugin configuration from disk."""
config_path = self.data_dir / plugin_id / "config.json"
if config_path.exists():
try:
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
self._logger.error(f"Failed to load config for {plugin_id}: {e}")
return {}
def _save_plugin_config(self, plugin_id: str, config: Dict[str, Any]) -> None:
"""Save plugin configuration to disk."""
config_path = self.data_dir / plugin_id / "config.json"
try:
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2)
except Exception as e:
self._logger.error(f"Failed to save config for {plugin_id}: {e}")
# ========== Utility ==========
def _generate_plugin_id(self, manifest: PluginManifest, path: Path) -> str:
"""Generate unique plugin ID."""
# Use path hash for uniqueness
path_hash = hashlib.md5(str(path).encode()).hexdigest()[:8]
name_slug = manifest.name.lower().replace(' ', '_').replace('-', '_')
return f"{name_slug}_{path_hash}"
def shutdown(self) -> None:
"""Shutdown the plugin manager."""
self.unload_all()
self._executor.shutdown(wait=True)
# =============================================================================
# EXPORTS
# =============================================================================
__all__ = [
'PluginManager',
'PluginLoader',
'DependencyResolver',
'PluginSandbox',
]

View File

@ -1,378 +0,0 @@
"""
EU-Utility Premium - Widget System
===================================
Dashboard widget system for creating plugin UIs.
Example:
from premium.widgets import Widget, WidgetConfig
class MyWidget(Widget):
def __init__(self):
super().__init__(WidgetConfig(
name="My Widget",
icon="📊",
size=(300, 200)
))
def create_ui(self, parent):
# Create and return your widget
label = QLabel("Hello World", parent)
return label
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
# =============================================================================
# WIDGET CONFIG
# =============================================================================
class WidgetSize(Enum):
"""Standard widget sizes."""
SMALL = (150, 100)
MEDIUM = (300, 200)
LARGE = (450, 300)
WIDE = (600, 200)
TALL = (300, 400)
FULL = (600, 400)
@dataclass
class WidgetConfig:
"""Configuration for a widget.
Attributes:
name: Display name of the widget
icon: Emoji or icon name
size: Widget size tuple (width, height)
resizable: Whether widget can be resized
collapsible: Whether widget can be collapsed
refresh_interval: Auto-refresh interval in seconds (0 = disabled)
settings: Widget-specific settings
"""
name: str = "Widget"
icon: str = "📦"
size: Tuple[int, int] = (300, 200)
resizable: bool = True
collapsible: bool = True
refresh_interval: float = 0.0
settings: Dict[str, Any] = field(default_factory=dict)
category: str = "General"
description: str = ""
author: str = ""
version: str = "1.0.0"
# =============================================================================
# WIDGET BASE CLASS
# =============================================================================
class Widget(ABC):
"""Base class for dashboard widgets.
Widgets are the UI components that plugins can create.
They are displayed in the dashboard overlay.
Example:
class StatsWidget(Widget):
def __init__(self):
super().__init__(WidgetConfig(
name="Player Stats",
icon="👤",
size=WidgetSize.MEDIUM.value
))
def create_ui(self, parent):
self.widget = QWidget(parent)
layout = QVBoxLayout(self.widget)
self.label = QLabel("Loading...")
layout.addWidget(self.label)
return self.widget
def update(self, data):
self.label.setText(f"Level: {data['level']}")
"""
def __init__(self, config: WidgetConfig):
self.config = config
self._ui: Optional[Any] = None
self._visible = True
self._collapsed = False
self._data: Dict[str, Any] = {}
self._refresh_timer = None
self._callbacks: Dict[str, List[Callable]] = {
'update': [],
'resize': [],
'collapse': [],
'close': [],
}
# ========== Abstract Methods ==========
@abstractmethod
def create_ui(self, parent: Any) -> Any:
"""Create the widget UI.
Args:
parent: Parent widget (QWidget)
Returns:
The created widget
"""
pass
# ========== Lifecycle Methods ==========
def initialize(self, parent: Any) -> Any:
"""Initialize the widget with a parent."""
self._ui = self.create_ui(parent)
if self.config.refresh_interval > 0:
self._start_refresh_timer()
return self._ui
def destroy(self) -> None:
"""Destroy the widget and clean up resources."""
self._stop_refresh_timer()
self._emit('close')
if self._ui:
self._ui.deleteLater()
self._ui = None
# ========== Update Methods ==========
def update(self, data: Dict[str, Any]) -> None:
"""Update widget with new data.
Override this to update your widget's display.
"""
self._data.update(data)
self._emit('update', data)
self.on_update(data)
def on_update(self, data: Dict[str, Any]) -> None:
"""Called when widget receives new data.
Override this instead of update() for custom behavior.
"""
pass
def refresh(self) -> None:
"""Manually trigger a refresh."""
self.on_refresh()
def on_refresh(self) -> None:
"""Called when widget should refresh its display."""
pass
# ========== State Methods ==========
def show(self) -> None:
"""Show the widget."""
self._visible = True
if self._ui:
self._ui.show()
def hide(self) -> None:
"""Hide the widget."""
self._visible = False
if self._ui:
self._ui.hide()
def collapse(self) -> None:
"""Collapse the widget."""
self._collapsed = True
self._emit('collapse', True)
if self._ui:
self._ui.setMaximumHeight(40)
def expand(self) -> None:
"""Expand the widget."""
self._collapsed = False
self._emit('collapse', False)
if self._ui:
self._ui.setMaximumHeight(self.config.size[1])
def toggle_collapse(self) -> None:
"""Toggle collapsed state."""
if self._collapsed:
self.expand()
else:
self.collapse()
# ========== Property Methods ==========
@property
def visible(self) -> bool:
"""Whether widget is visible."""
return self._visible
@property
def collapsed(self) -> bool:
"""Whether widget is collapsed."""
return self._collapsed
@property
def data(self) -> Dict[str, Any]:
"""Current widget data."""
return self._data.copy()
@property
def ui(self) -> Optional[Any]:
"""The UI widget."""
return self._ui
# ========== Event Handling ==========
def on(self, event: str, callback: Callable) -> Callable:
"""Subscribe to a widget event."""
if event in self._callbacks:
self._callbacks[event].append(callback)
return callback
def _emit(self, event: str, *args, **kwargs) -> None:
"""Emit an event to subscribers."""
for callback in self._callbacks.get(event, []):
try:
callback(*args, **kwargs)
except Exception as e:
print(f"Error in widget callback: {e}")
# ========== Timer Methods ==========
def _start_refresh_timer(self) -> None:
"""Start auto-refresh timer."""
if self.config.refresh_interval <= 0:
return
try:
from PyQt6.QtCore import QTimer
self._refresh_timer = QTimer()
self._refresh_timer.timeout.connect(self.refresh)
self._refresh_timer.start(int(self.config.refresh_interval * 1000))
except ImportError:
pass
def _stop_refresh_timer(self) -> None:
"""Stop auto-refresh timer."""
if self._refresh_timer:
self._refresh_timer.stop()
self._refresh_timer = None
# =============================================================================
# DASHBOARD
# =============================================================================
class Dashboard:
"""Dashboard for managing widgets.
The dashboard is the container for all plugin widgets.
It manages layout, visibility, and widget lifecycle.
"""
def __init__(self, parent: Any = None):
self.parent = parent
self._widgets: Dict[str, Widget] = {}
self._layout: Optional[Any] = None
self._visible = False
self._position: Tuple[int, int] = (100, 100)
def initialize(self, parent: Any) -> Any:
"""Initialize the dashboard."""
try:
from PyQt6.QtWidgets import QWidget, QVBoxLayout, QScrollArea
self.container = QWidget(parent)
self.container.setWindowTitle("EU-Utility Dashboard")
self.container.setGeometry(*self._position, 350, 600)
scroll = QScrollArea(self.container)
scroll.setWidgetResizable(True)
self.widget_container = QWidget()
self._layout = QVBoxLayout(self.widget_container)
self._layout.setSpacing(10)
self._layout.addStretch()
scroll.setWidget(self.widget_container)
layout = QVBoxLayout(self.container)
layout.addWidget(scroll)
return self.container
except ImportError:
return None
def add_widget(self, widget_id: str, widget: Widget) -> bool:
"""Add a widget to the dashboard."""
if widget_id in self._widgets:
return False
self._widgets[widget_id] = widget
if self._layout:
ui = widget.initialize(self.widget_container)
if ui:
# Insert before the stretch
self._layout.insertWidget(self._layout.count() - 1, ui)
return True
def remove_widget(self, widget_id: str) -> bool:
"""Remove a widget from the dashboard."""
widget = self._widgets.pop(widget_id, None)
if widget:
widget.destroy()
return True
return False
def get_widget(self, widget_id: str) -> Optional[Widget]:
"""Get a widget by ID."""
return self._widgets.get(widget_id)
def show(self) -> None:
"""Show the dashboard."""
self._visible = True
if hasattr(self, 'container') and self.container:
self.container.show()
def hide(self) -> None:
"""Hide the dashboard."""
self._visible = False
if hasattr(self, 'container') and self.container:
self.container.hide()
def toggle(self) -> None:
"""Toggle dashboard visibility."""
if self._visible:
self.hide()
else:
self.show()
def get_all_widgets(self) -> Dict[str, Widget]:
"""Get all widgets."""
return self._widgets.copy()
# =============================================================================
# EXPORTS
# =============================================================================
__all__ = [
'WidgetSize',
'WidgetConfig',
'Widget',
'Dashboard',
]

View File

@ -1,20 +0,0 @@
"""
EU-Utility Premium - Base Widget Components
============================================
Base widget classes and components for building custom widgets.
"""
from .dashboard_widget import DashboardWidget, WidgetHeader, WidgetContent
from .metrics_card import MetricsCard, StatItem
from .chart_widget import ChartWidget, ChartType
__all__ = [
'DashboardWidget',
'WidgetHeader',
'WidgetContent',
'MetricsCard',
'StatItem',
'ChartWidget',
'ChartType',
]

View File

@ -1,145 +0,0 @@
"""
Chart widget for displaying data visualizations.
"""
from enum import Enum
from typing import List, Dict, Any, Optional
try:
from PyQt6.QtWidgets import QWidget, QVBoxLayout
from PyQt6.QtCore import Qt
from PyQt6.QtGui import QPainter, QColor, QPen, QBrush, QFont
HAS_QT = True
except ImportError:
HAS_QT = False
QWidget = object
class ChartType(Enum):
LINE = "line"
BAR = "bar"
PIE = "pie"
class ChartWidget(QWidget if HAS_QT else object):
"""Simple chart widget."""
def __init__(
self,
chart_type: ChartType = ChartType.LINE,
title: str = "Chart",
parent=None
):
if not HAS_QT:
return
super().__init__(parent)
self.chart_type = chart_type
self.title = title
self._data: List[Dict[str, Any]] = []
self._max_points = 50
self._y_max = 100
self.setMinimumHeight(150)
self.setStyleSheet("background-color: #2d2d2d; border-radius: 8px;")
def add_point(self, x: Any, y: float, label: Optional[str] = None):
"""Add a data point."""
self._data.append({'x': x, 'y': y, 'label': label})
# Limit points
if len(self._data) > self._max_points:
self._data = self._data[-self._max_points:]
# Update scale
if y > self._y_max:
self._y_max = y * 1.1
if HAS_QT:
self.update()
def set_data(self, data: List[Dict[str, Any]]):
"""Set all data points."""
self._data = data
if data:
self._y_max = max(d['y'] for d in data) * 1.1
if HAS_QT:
self.update()
def clear(self):
"""Clear all data."""
self._data = []
if HAS_QT:
self.update()
def paintEvent(self, event):
if not HAS_QT or not self._data:
return
painter = QPainter(self)
painter.setRenderHint(QPainter.RenderHint.Antialiasing)
width = self.width()
height = self.height()
padding = 40
chart_width = width - padding * 2
chart_height = height - padding * 2
# Background
painter.fillRect(self.rect(), QColor(45, 45, 45))
# Draw title
painter.setPen(QColor(255, 255, 255))
painter.setFont(QFont("Arial", 10, QFont.Weight.Bold))
painter.drawText(10, 20, self.title)
if len(self._data) < 2:
return
# Draw chart based on type
if self.chart_type == ChartType.LINE:
self._draw_line_chart(painter, padding, chart_width, chart_height)
elif self.chart_type == ChartType.BAR:
self._draw_bar_chart(painter, padding, chart_width, chart_height)
def _draw_line_chart(self, painter, padding, width, height):
"""Draw a line chart."""
if len(self._data) < 2:
return
pen = QPen(QColor(76, 175, 80))
pen.setWidth(2)
painter.setPen(pen)
x_step = width / max(len(self._data) - 1, 1)
# Draw line
points = []
for i, point in enumerate(self._data):
x = padding + i * x_step
y = padding + height - (point['y'] / self._y_max * height)
points.append((x, y))
for i in range(len(points) - 1):
painter.drawLine(int(points[i][0]), int(points[i][1]),
int(points[i+1][0]), int(points[i+1][1]))
# Draw points
painter.setBrush(QColor(76, 175, 80))
for x, y in points:
painter.drawEllipse(int(x - 3), int(y - 3), 6, 6)
def _draw_bar_chart(self, painter, padding, width, height):
"""Draw a bar chart."""
bar_width = width / len(self._data) * 0.8
gap = width / len(self._data) * 0.2
painter.setBrush(QColor(76, 175, 80))
painter.setPen(Qt.PenStyle.NoPen)
for i, point in enumerate(self._data):
bar_height = point['y'] / self._y_max * height
x = padding + i * (bar_width + gap) + gap / 2
y = padding + height - bar_height
painter.drawRect(int(x), int(y), int(bar_width), int(bar_height))

View File

@ -1,194 +0,0 @@
"""
Base widget components for the dashboard.
"""
from typing import Any, Optional, Callable
try:
from PyQt6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QLabel, QPushButton,
QFrame, QSizePolicy
)
from PyQt6.QtCore import Qt, pyqtSignal
from PyQt6.QtGui import QFont
HAS_QT = True
except ImportError:
HAS_QT = False
QWidget = object
pyqtSignal = lambda *a, **k: None
class WidgetHeader(QFrame if HAS_QT else object):
"""Header component for widgets with title and controls."""
collapsed_changed = pyqtSignal(bool) if HAS_QT else None
close_requested = pyqtSignal() if HAS_QT else None
def __init__(self, title: str = "Widget", icon: str = "📦", parent=None):
if not HAS_QT:
return
super().__init__(parent)
self._collapsed = False
self._setup_ui(title, icon)
def _setup_ui(self, title: str, icon: str):
self.setFrameShape(QFrame.Shape.StyledPanel)
self.setStyleSheet("""
WidgetHeader {
background-color: #2d2d2d;
border-radius: 8px 8px 0 0;
padding: 8px;
}
""")
layout = QHBoxLayout(self)
layout.setContentsMargins(10, 5, 10, 5)
layout.setSpacing(10)
# Icon
self.icon_label = QLabel(icon)
self.icon_label.setStyleSheet("font-size: 16px;")
layout.addWidget(self.icon_label)
# Title
self.title_label = QLabel(title)
font = QFont()
font.setBold(True)
self.title_label.setFont(font)
self.title_label.setStyleSheet("color: #ffffff;")
layout.addWidget(self.title_label)
layout.addStretch()
# Collapse button
self.collapse_btn = QPushButton("")
self.collapse_btn.setFixedSize(24, 24)
self.collapse_btn.setStyleSheet("""
QPushButton {
background-color: #3d3d3d;
color: #ffffff;
border: none;
border-radius: 4px;
}
QPushButton:hover {
background-color: #4d4d4d;
}
""")
self.collapse_btn.clicked.connect(self._toggle_collapse)
layout.addWidget(self.collapse_btn)
# Close button
self.close_btn = QPushButton("×")
self.close_btn.setFixedSize(24, 24)
self.close_btn.setStyleSheet("""
QPushButton {
background-color: #3d3d3d;
color: #ff6b6b;
border: none;
border-radius: 4px;
font-size: 16px;
font-weight: bold;
}
QPushButton:hover {
background-color: #ff6b6b;
color: #ffffff;
}
""")
self.close_btn.clicked.connect(lambda: self.close_requested.emit())
layout.addWidget(self.close_btn)
def _toggle_collapse(self):
self._collapsed = not self._collapsed
self.collapse_btn.setText("" if self._collapsed else "")
self.collapsed_changed.emit(self._collapsed)
def set_collapsed(self, collapsed: bool):
self._collapsed = collapsed
self.collapse_btn.setText("" if collapsed else "")
class WidgetContent(QFrame if HAS_QT else object):
"""Content container for widgets."""
def __init__(self, parent=None):
if not HAS_QT:
return
super().__init__(parent)
self.setFrameShape(QFrame.Shape.StyledPanel)
self.setStyleSheet("""
WidgetContent {
background-color: #1e1e1e;
border-radius: 0 0 8px 8px;
padding: 10px;
}
""")
self._layout = QVBoxLayout(self)
self._layout.setContentsMargins(10, 10, 10, 10)
self._layout.setSpacing(10)
def add_widget(self, widget):
if HAS_QT:
self._layout.addWidget(widget)
def layout(self):
return self._layout if HAS_QT else None
class DashboardWidget(QFrame if HAS_QT else object):
"""Complete dashboard widget with header and content."""
def __init__(
self,
title: str = "Widget",
icon: str = "📦",
size: tuple = (300, 200),
parent=None
):
if not HAS_QT:
return
super().__init__(parent)
self._size = size
self._setup_ui(title, icon)
def _setup_ui(self, title: str, icon: str):
self.setStyleSheet("""
DashboardWidget {
background-color: transparent;
border: 1px solid #3d3d3d;
border-radius: 8px;
}
""")
layout = QVBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
# Header
self.header = WidgetHeader(title, icon, self)
self.header.collapsed_changed.connect(self._on_collapse_changed)
layout.addWidget(self.header)
# Content
self.content = WidgetContent(self)
layout.addWidget(self.content)
self.setFixedSize(*self._size)
def _on_collapse_changed(self, collapsed: bool):
self.content.setVisible(not collapsed)
if collapsed:
self.setFixedHeight(self.header.height())
else:
self.setFixedHeight(self._size[1])
def set_content_widget(self, widget: QWidget):
"""Set the main content widget."""
if HAS_QT:
# Clear existing
while self.content.layout().count():
item = self.content.layout().takeAt(0)
if item.widget():
item.widget().deleteLater()
self.content.layout().addWidget(widget)

View File

@ -1,121 +0,0 @@
"""
Metrics card widget for displaying statistics.
"""
from typing import List, Optional
from dataclasses import dataclass
try:
from PyQt6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QLabel, QGridLayout
)
from PyQt6.QtCore import Qt
from PyQt6.QtGui import QFont
HAS_QT = True
except ImportError:
HAS_QT = False
QWidget = object
@dataclass
class StatItem:
"""Single statistic item."""
label: str
value: str
change: Optional[str] = None
positive: bool = True
class MetricsCard(QWidget if HAS_QT else object):
"""Card displaying multiple metrics."""
def __init__(self, title: str = "Metrics", parent=None):
if not HAS_QT:
return
super().__init__(parent)
self._stats: List[StatItem] = []
self._setup_ui(title)
def _setup_ui(self, title: str):
self.setStyleSheet("""
MetricsCard {
background-color: #2d2d2d;
border-radius: 8px;
padding: 15px;
}
QLabel {
color: #ffffff;
}
.metric-label {
color: #888888;
font-size: 12px;
}
.metric-value {
font-size: 18px;
font-weight: bold;
}
.metric-positive {
color: #4caf50;
}
.metric-negative {
color: #f44336;
}
""")
layout = QVBoxLayout(self)
layout.setContentsMargins(15, 15, 15, 15)
layout.setSpacing(15)
# Title
self.title_label = QLabel(title)
font = QFont()
font.setBold(True)
font.setPointSize(14)
self.title_label.setFont(font)
layout.addWidget(self.title_label)
# Stats grid
self.stats_layout = QGridLayout()
self.stats_layout.setSpacing(10)
layout.addLayout(self.stats_layout)
layout.addStretch()
def set_stats(self, stats: List[StatItem]):
"""Update displayed statistics."""
if not HAS_QT:
return
self._stats = stats
# Clear existing
while self.stats_layout.count():
item = self.stats_layout.takeAt(0)
if item.widget():
item.widget().deleteLater()
# Add stats
for i, stat in enumerate(stats):
row = i // 2
col = (i % 2) * 2
# Label
label = QLabel(stat.label)
label.setStyleSheet("color: #888888; font-size: 11px;")
self.stats_layout.addWidget(label, row * 2, col)
# Value with optional change
value_text = stat.value
if stat.change:
value_text += f" <span style='color: {'#4caf50' if stat.positive else '#f44336'}'>{stat.change}</span>"
value_label = QLabel(value_text)
value_label.setStyleSheet("font-size: 16px; font-weight: bold; color: #ffffff;")
self.stats_layout.addWidget(value_label, row * 2 + 1, col)
def update_stat(self, index: int, value: str, change: Optional[str] = None):
"""Update a single statistic."""
if 0 <= index < len(self._stats):
self._stats[index].value = value
self._stats[index].change = change
self.set_stats(self._stats)