EU-Utility/core/plugin_store.py

254 lines
7.8 KiB
Python

"""
EU-Utility - Plugin Store
Fetch and install community plugins from GitHub.
"""
import json
import zipfile
import shutil
from pathlib import Path
from PyQt6.QtCore import QObject, QThread, pyqtSignal
from core.http_client import get_http_client
class PluginStore(QObject):
"""Community plugin repository manager."""
# Repository configuration
REPO_URL = "https://raw.githubusercontent.com/ImpulsiveFPS/EU-Utility-Plugins/main/"
INDEX_URL = REPO_URL + "plugins.json"
# Signals
plugins_loaded = pyqtSignal(list)
plugin_installed = pyqtSignal(str, bool)
plugin_removed = pyqtSignal(str, bool)
error_occurred = pyqtSignal(str)
def __init__(self, plugins_dir="user_plugins"):
super().__init__()
self.plugins_dir = Path(plugins_dir)
self.plugins_dir.mkdir(parents=True, exist_ok=True)
self.available_plugins = []
self.installed_plugins = []
self._load_installed()
def fetch_plugins(self):
"""Fetch available plugins from repository."""
self.fetch_thread = PluginFetchThread(self.INDEX_URL)
self.fetch_thread.fetched.connect(self._on_plugins_fetched)
self.fetch_thread.error.connect(self._on_fetch_error)
self.fetch_thread.start()
def _on_plugins_fetched(self, plugins):
"""Handle fetched plugins."""
self.available_plugins = plugins
self.plugins_loaded.emit(plugins)
def _on_fetch_error(self, error):
"""Handle fetch error."""
self.error_occurred.emit(error)
def install_plugin(self, plugin_id):
"""Install a plugin from the store."""
plugin = self._find_plugin(plugin_id)
if not plugin:
self.plugin_installed.emit(plugin_id, False)
return
self.install_thread = PluginInstallThread(
plugin,
self.plugins_dir
)
self.install_thread.installed.connect(
lambda success: self._on_installed(plugin_id, success)
)
self.install_thread.error.connect(self._on_fetch_error)
self.install_thread.start()
def _on_installed(self, plugin_id, success):
"""Handle plugin installation."""
if success:
self.installed_plugins.append(plugin_id)
self._save_installed()
self.plugin_installed.emit(plugin_id, success)
def remove_plugin(self, plugin_id):
"""Remove an installed plugin."""
try:
plugin_dir = self.plugins_dir / plugin_id
if plugin_dir.exists():
shutil.rmtree(plugin_dir)
if plugin_id in self.installed_plugins:
self.installed_plugins.remove(plugin_id)
self._save_installed()
self.plugin_removed.emit(plugin_id, True)
except Exception as e:
self.error_occurred.emit(str(e))
self.plugin_removed.emit(plugin_id, False)
def _find_plugin(self, plugin_id):
"""Find plugin by ID."""
for plugin in self.available_plugins:
if plugin.get('id') == plugin_id:
return plugin
return None
def _load_installed(self):
"""Load list of installed plugins."""
installed_file = self.plugins_dir / ".installed"
if installed_file.exists():
try:
with open(installed_file, 'r') as f:
self.installed_plugins = json.load(f)
except:
self.installed_plugins = []
def _save_installed(self):
"""Save list of installed plugins."""
installed_file = self.plugins_dir / ".installed"
with open(installed_file, 'w') as f:
json.dump(self.installed_plugins, f)
def is_installed(self, plugin_id):
"""Check if a plugin is installed."""
return plugin_id in self.installed_plugins
def get_installed_plugins(self):
"""Get list of installed plugin IDs."""
return self.installed_plugins.copy()
class PluginFetchThread(QThread):
"""Background thread to fetch plugin index."""
fetched = pyqtSignal(list)
error = pyqtSignal(str)
def __init__(self, url):
super().__init__()
self.url = url
def run(self):
"""Fetch plugin index."""
try:
http_client = get_http_client()
response = http_client.get(
self.url,
cache_ttl=300, # 5 minute cache for plugin list
headers={'User-Agent': 'EU-Utility/1.0'}
)
if response.get('json'):
data = response['json']
self.fetched.emit(data.get('plugins', []))
else:
# Try to parse JSON from text
data = json.loads(response['text'])
self.fetched.emit(data.get('plugins', []))
except Exception as e:
self.error.emit(str(e))
class PluginInstallThread(QThread):
"""Background thread to install a plugin."""
installed = pyqtSignal(bool)
error = pyqtSignal(str)
progress = pyqtSignal(str)
def __init__(self, plugin, install_dir):
super().__init__()
self.plugin = plugin
self.install_dir = Path(install_dir)
def run(self):
"""Install plugin."""
try:
self.progress.emit(f"Downloading {self.plugin['name']}...")
# Download zip
download_url = self.plugin.get('download_url')
if not download_url:
self.error.emit("No download URL")
self.installed.emit(False)
return
temp_zip = self.install_dir / "temp.zip"
http_client = get_http_client()
response = http_client.get(
download_url,
cache_ttl=0, # Don't cache downloads
headers={'User-Agent': 'EU-Utility/1.0'}
)
with open(temp_zip, 'wb') as f:
f.write(response['content'])
self.progress.emit("Extracting...")
# Extract
plugin_dir = self.install_dir / self.plugin['id']
if plugin_dir.exists():
shutil.rmtree(plugin_dir)
plugin_dir.mkdir()
with zipfile.ZipFile(temp_zip, 'r') as zip_ref:
zip_ref.extractall(plugin_dir)
# Cleanup
temp_zip.unlink()
self.progress.emit("Installed!")
self.installed.emit(True)
except Exception as e:
self.error.emit(str(e))
self.installed.emit(False)
# Sample plugins.json structure for GitHub repo:
SAMPLE_PLUGINS_JSON = {
"version": "1.0.0",
"plugins": [
{
"id": "loot_tracker",
"name": "Loot Tracker",
"description": "Track and analyze hunting loot",
"version": "1.0.0",
"author": "ImpulsiveFPS",
"category": "hunting",
"download_url": "https://github.com/.../loot_tracker.zip",
"icon": "🎁",
"min_app_version": "1.0.0"
},
{
"id": "mining_helper",
"name": "Mining Helper",
"description": "Track mining finds and claims",
"version": "1.1.0",
"author": "Community",
"category": "mining",
"download_url": "https://github.com/.../mining_helper.zip",
"icon": "⛏️",
"min_app_version": "1.0.0"
},
{
"id": "market_analyzer",
"name": "Market Analyzer",
"description": "Analyze auction prices and trends",
"version": "0.9.0",
"author": "EU Community",
"category": "trading",
"download_url": "https://github.com/.../market_analyzer.zip",
"icon": "📈",
"min_app_version": "1.0.0"
},
]
}