""" EU-Utility - Plugin Store Fetch and install community plugins from GitHub. """ import json import zipfile import shutil from pathlib import Path from PyQt6.QtCore import QObject, QThread, pyqtSignal from core.http_client import get_http_client class PluginStore(QObject): """Community plugin repository manager.""" # Repository configuration REPO_URL = "https://raw.githubusercontent.com/ImpulsiveFPS/EU-Utility-Plugins/main/" INDEX_URL = REPO_URL + "plugins.json" # Signals plugins_loaded = pyqtSignal(list) plugin_installed = pyqtSignal(str, bool) plugin_removed = pyqtSignal(str, bool) error_occurred = pyqtSignal(str) def __init__(self, plugins_dir="user_plugins"): super().__init__() self.plugins_dir = Path(plugins_dir) self.plugins_dir.mkdir(parents=True, exist_ok=True) self.available_plugins = [] self.installed_plugins = [] self._load_installed() def fetch_plugins(self): """Fetch available plugins from repository.""" self.fetch_thread = PluginFetchThread(self.INDEX_URL) self.fetch_thread.fetched.connect(self._on_plugins_fetched) self.fetch_thread.error.connect(self._on_fetch_error) self.fetch_thread.start() def _on_plugins_fetched(self, plugins): """Handle fetched plugins.""" self.available_plugins = plugins self.plugins_loaded.emit(plugins) def _on_fetch_error(self, error): """Handle fetch error.""" self.error_occurred.emit(error) def install_plugin(self, plugin_id): """Install a plugin from the store.""" plugin = self._find_plugin(plugin_id) if not plugin: self.plugin_installed.emit(plugin_id, False) return self.install_thread = PluginInstallThread( plugin, self.plugins_dir ) self.install_thread.installed.connect( lambda success: self._on_installed(plugin_id, success) ) self.install_thread.error.connect(self._on_fetch_error) self.install_thread.start() def _on_installed(self, plugin_id, success): """Handle plugin installation.""" if success: self.installed_plugins.append(plugin_id) self._save_installed() self.plugin_installed.emit(plugin_id, success) def remove_plugin(self, plugin_id): """Remove an installed plugin.""" try: plugin_dir = self.plugins_dir / plugin_id if plugin_dir.exists(): shutil.rmtree(plugin_dir) if plugin_id in self.installed_plugins: self.installed_plugins.remove(plugin_id) self._save_installed() self.plugin_removed.emit(plugin_id, True) except Exception as e: self.error_occurred.emit(str(e)) self.plugin_removed.emit(plugin_id, False) def _find_plugin(self, plugin_id): """Find plugin by ID.""" for plugin in self.available_plugins: if plugin.get('id') == plugin_id: return plugin return None def _load_installed(self): """Load list of installed plugins.""" installed_file = self.plugins_dir / ".installed" if installed_file.exists(): try: with open(installed_file, 'r') as f: self.installed_plugins = json.load(f) except: self.installed_plugins = [] def _save_installed(self): """Save list of installed plugins.""" installed_file = self.plugins_dir / ".installed" with open(installed_file, 'w') as f: json.dump(self.installed_plugins, f) def is_installed(self, plugin_id): """Check if a plugin is installed.""" return plugin_id in self.installed_plugins def get_installed_plugins(self): """Get list of installed plugin IDs.""" return self.installed_plugins.copy() class PluginFetchThread(QThread): """Background thread to fetch plugin index.""" fetched = pyqtSignal(list) error = pyqtSignal(str) def __init__(self, url): super().__init__() self.url = url def run(self): """Fetch plugin index.""" try: http_client = get_http_client() response = http_client.get( self.url, cache_ttl=300, # 5 minute cache for plugin list headers={'User-Agent': 'EU-Utility/1.0'} ) if response.get('json'): data = response['json'] self.fetched.emit(data.get('plugins', [])) else: # Try to parse JSON from text data = json.loads(response['text']) self.fetched.emit(data.get('plugins', [])) except Exception as e: self.error.emit(str(e)) class PluginInstallThread(QThread): """Background thread to install a plugin.""" installed = pyqtSignal(bool) error = pyqtSignal(str) progress = pyqtSignal(str) def __init__(self, plugin, install_dir): super().__init__() self.plugin = plugin self.install_dir = Path(install_dir) def run(self): """Install plugin.""" try: self.progress.emit(f"Downloading {self.plugin['name']}...") # Download zip download_url = self.plugin.get('download_url') if not download_url: self.error.emit("No download URL") self.installed.emit(False) return temp_zip = self.install_dir / "temp.zip" http_client = get_http_client() response = http_client.get( download_url, cache_ttl=0, # Don't cache downloads headers={'User-Agent': 'EU-Utility/1.0'} ) with open(temp_zip, 'wb') as f: f.write(response['content']) self.progress.emit("Extracting...") # Extract plugin_dir = self.install_dir / self.plugin['id'] if plugin_dir.exists(): shutil.rmtree(plugin_dir) plugin_dir.mkdir() with zipfile.ZipFile(temp_zip, 'r') as zip_ref: zip_ref.extractall(plugin_dir) # Cleanup temp_zip.unlink() self.progress.emit("Installed!") self.installed.emit(True) except Exception as e: self.error.emit(str(e)) self.installed.emit(False) # Sample plugins.json structure for GitHub repo: SAMPLE_PLUGINS_JSON = { "version": "1.0.0", "plugins": [ { "id": "loot_tracker", "name": "Loot Tracker", "description": "Track and analyze hunting loot", "version": "1.0.0", "author": "ImpulsiveFPS", "category": "hunting", "download_url": "https://github.com/.../loot_tracker.zip", "icon": "🎁", "min_app_version": "1.0.0" }, { "id": "mining_helper", "name": "Mining Helper", "description": "Track mining finds and claims", "version": "1.1.0", "author": "Community", "category": "mining", "download_url": "https://github.com/.../mining_helper.zip", "icon": "⛏️", "min_app_version": "1.0.0" }, { "id": "market_analyzer", "name": "Market Analyzer", "description": "Analyze auction prices and trends", "version": "0.9.0", "author": "EU Community", "category": "trading", "download_url": "https://github.com/.../market_analyzer.zip", "icon": "📈", "min_app_version": "1.0.0" }, ] }