EU-Utility/plugins/auto_updater.py

563 lines
19 KiB
Python

"""
AutoUpdater Plugin - Automatic Update System for EU-Utility
Checks for updates, downloads, and installs new versions automatically.
Uses semantic versioning and supports rollback on failure.
"""
import os
import json
import hashlib
import urllib.request
import urllib.error
import zipfile
import shutil
import threading
import time
from pathlib import Path
from typing import Optional, Dict, Any, Callable, List
from dataclasses import dataclass, asdict
from enum import Enum
from core.base_plugin import BasePlugin
class UpdateStatus(Enum):
"""Status of an update operation."""
IDLE = "idle"
CHECKING = "checking"
AVAILABLE = "available"
DOWNLOADING = "downloading"
VERIFYING = "verifying"
INSTALLING = "installing"
COMPLETED = "completed"
FAILED = "failed"
ROLLING_BACK = "rolling_back"
@dataclass
class VersionInfo:
"""Version information structure."""
major: int
minor: int
patch: int
prerelease: Optional[str] = None
build: Optional[str] = None
@classmethod
def from_string(cls, version_str: str) -> "VersionInfo":
"""Parse version from string (e.g., '1.2.3-beta+build123')."""
# Remove 'v' prefix if present
version_str = version_str.lstrip('v')
# Split prerelease and build metadata
build = None
if '+' in version_str:
version_str, build = version_str.split('+', 1)
prerelease = None
if '-' in version_str:
version_str, prerelease = version_str.split('-', 1)
parts = version_str.split('.')
major = int(parts[0]) if len(parts) > 0 else 0
minor = int(parts[1]) if len(parts) > 1 else 0
patch = int(parts[2]) if len(parts) > 2 else 0
return cls(major, minor, patch, prerelease, build)
def __str__(self) -> str:
version = f"{self.major}.{self.minor}.{self.patch}"
if self.prerelease:
version += f"-{self.prerelease}"
if self.build:
version += f"+{self.build}"
return version
def __lt__(self, other: "VersionInfo") -> bool:
if (self.major, self.minor, self.patch) != (other.major, other.minor, other.patch):
return (self.major, self.minor, self.patch) < (other.major, other.minor, other.patch)
# Prerelease versions are lower than release versions
if self.prerelease and not other.prerelease:
return True
if not self.prerelease and other.prerelease:
return False
if self.prerelease and other.prerelease:
return self.prerelease < other.prerelease
return False
def __le__(self, other: "VersionInfo") -> bool:
return self == other or self < other
def __gt__(self, other: "VersionInfo") -> bool:
return not self <= other
def __ge__(self, other: "VersionInfo") -> bool:
return not self < other
@dataclass
class UpdateInfo:
"""Information about an available update."""
version: str
download_url: str
checksum: str
size_bytes: int
release_notes: str
mandatory: bool = False
release_date: Optional[str] = None
class AutoUpdaterPlugin(BasePlugin):
"""
Automatic update system for EU-Utility.
Features:
- Check for updates from remote repository
- Download and verify updates
- Automatic or manual installation
- Rollback on failure
- Update history tracking
"""
name = "auto_updater"
description = "Automatic update system with rollback support"
version = "1.0.0"
author = "EU-Utility"
# Default configuration
DEFAULT_CONFIG = {
"check_interval_hours": 24,
"auto_check": True,
"auto_install": False, # Manual by default for safety
"update_server": "https://api.eu-utility.app/updates",
"backup_dir": "data/backups",
"temp_dir": "data/temp",
"current_version": "1.0.0",
"channel": "stable", # stable, beta, alpha
}
def __init__(self):
super().__init__()
self._config = self.DEFAULT_CONFIG.copy()
self._status = UpdateStatus.IDLE
self._current_update: Optional[UpdateInfo] = None
self._update_history: List[Dict[str, Any]] = []
self._check_thread: Optional[threading.Thread] = None
self._running = False
self._listeners: List[Callable] = []
self._data_dir = Path("data")
self._data_dir.mkdir(exist_ok=True)
self._load_history()
def on_start(self) -> None:
"""Start the auto-updater service."""
print(f"[{self.name}] Starting auto-updater...")
self._running = True
# Ensure directories exist
Path(self._config["backup_dir"]).mkdir(parents=True, exist_ok=True)
Path(self._config["temp_dir"]).mkdir(parents=True, exist_ok=True)
# Start automatic check thread if enabled
if self._config["auto_check"]:
self._check_thread = threading.Thread(target=self._auto_check_loop, daemon=True)
self._check_thread.start()
print(f"[{self.name}] Auto-check enabled (interval: {self._config['check_interval_hours']}h)")
def on_stop(self) -> None:
"""Stop the auto-updater service."""
print(f"[{self.name}] Stopping auto-updater...")
self._running = False
self._save_history()
# Configuration
def set_config(self, config: Dict[str, Any]) -> None:
"""Update configuration."""
self._config.update(config)
def get_config(self) -> Dict[str, Any]:
"""Get current configuration."""
return self._config.copy()
# Status & Events
def get_status(self) -> str:
"""Get current update status."""
return self._status.value
def add_listener(self, callback: Callable[[UpdateStatus, Optional[UpdateInfo]], None]) -> None:
"""Add a status change listener."""
self._listeners.append(callback)
def remove_listener(self, callback: Callable) -> None:
"""Remove a status change listener."""
if callback in self._listeners:
self._listeners.remove(callback)
def _set_status(self, status: UpdateStatus, info: Optional[UpdateInfo] = None) -> None:
"""Set status and notify listeners."""
self._status = status
self._current_update = info
for listener in self._listeners:
try:
listener(status, info)
except Exception as e:
print(f"[{self.name}] Listener error: {e}")
# Update Operations
def check_for_updates(self) -> Optional[UpdateInfo]:
"""
Check for available updates.
Returns:
UpdateInfo if update available, None otherwise
"""
self._set_status(UpdateStatus.CHECKING)
try:
current = VersionInfo.from_string(self._config["current_version"])
# Build check URL
url = f"{self._config['update_server']}/check"
params = {
"version": str(current),
"channel": self._config["channel"],
"platform": os.name,
}
# Make request
query = "&".join(f"{k}={v}" for k, v in params.items())
full_url = f"{url}?{query}"
req = urllib.request.Request(
full_url,
headers={"User-Agent": f"EU-Utility/{current}"}
)
with urllib.request.urlopen(req, timeout=30) as response:
data = json.loads(response.read().decode('utf-8'))
if data.get("update_available"):
update_info = UpdateInfo(
version=data["version"],
download_url=data["download_url"],
checksum=data["checksum"],
size_bytes=data["size_bytes"],
release_notes=data.get("release_notes", ""),
mandatory=data.get("mandatory", False),
release_date=data.get("release_date"),
)
new_version = VersionInfo.from_string(update_info.version)
if new_version > current:
self._set_status(UpdateStatus.AVAILABLE, update_info)
print(f"[{self.name}] Update available: {current}{new_version}")
return update_info
self._set_status(UpdateStatus.IDLE)
print(f"[{self.name}] No updates available")
return None
except Exception as e:
self._set_status(UpdateStatus.FAILED)
print(f"[{self.name}] Update check failed: {e}")
return None
def download_update(self, update_info: Optional[UpdateInfo] = None) -> Optional[Path]:
"""
Download an update.
Args:
update_info: Update to download (uses current if None)
Returns:
Path to downloaded file, or None on failure
"""
info = update_info or self._current_update
if not info:
print(f"[{self.name}] No update to download")
return None
self._set_status(UpdateStatus.DOWNLOADING, info)
try:
temp_dir = Path(self._config["temp_dir"])
temp_dir.mkdir(parents=True, exist_ok=True)
download_path = temp_dir / f"update_{info.version}.zip"
print(f"[{self.name}] Downloading update {info.version}...")
# Download with progress
req = urllib.request.Request(
info.download_url,
headers={"User-Agent": f"EU-Utility/{self._config['current_version']}"}
)
with urllib.request.urlopen(req, timeout=300) as response:
total_size = int(response.headers.get('Content-Length', 0))
downloaded = 0
chunk_size = 8192
with open(download_path, 'wb') as f:
while True:
chunk = response.read(chunk_size)
if not chunk:
break
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
percent = (downloaded / total_size) * 100
print(f"[{self.name}] Download: {percent:.1f}%")
# Verify checksum
self._set_status(UpdateStatus.VERIFYING, info)
if not self._verify_checksum(download_path, info.checksum):
raise ValueError("Checksum verification failed")
print(f"[{self.name}] Download complete: {download_path}")
return download_path
except Exception as e:
self._set_status(UpdateStatus.FAILED, info)
print(f"[{self.name}] Download failed: {e}")
return None
def install_update(self, download_path: Path, update_info: Optional[UpdateInfo] = None) -> bool:
"""
Install a downloaded update.
Args:
download_path: Path to downloaded update file
update_info: Update information
Returns:
True if installation successful
"""
info = update_info or self._current_update
if not info:
return False
self._set_status(UpdateStatus.INSTALLING, info)
backup_path = None
try:
# Create backup
backup_path = self._create_backup()
print(f"[{self.name}] Backup created: {backup_path}")
# Extract update
temp_extract = Path(self._config["temp_dir"]) / "extract"
temp_extract.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(download_path, 'r') as zip_ref:
zip_ref.extractall(temp_extract)
# Apply update
self._apply_update(temp_extract)
# Update version
old_version = self._config["current_version"]
self._config["current_version"] = info.version
self._save_config()
# Record success
self._record_update(old_version, info.version, True)
# Cleanup
shutil.rmtree(temp_extract, ignore_errors=True)
download_path.unlink(missing_ok=True)
self._set_status(UpdateStatus.COMPLETED, info)
print(f"[{self.name}] Update installed: {old_version}{info.version}")
return True
except Exception as e:
self._set_status(UpdateStatus.FAILED, info)
print(f"[{self.name}] Installation failed: {e}")
# Attempt rollback
if backup_path:
self._rollback(backup_path)
self._record_update(self._config["current_version"], info.version, False, str(e))
return False
def update(self) -> bool:
"""
Full update process: check, download, and install.
Returns:
True if update successful
"""
# Check for updates
update_info = self.check_for_updates()
if not update_info:
return False
# Download
download_path = self.download_update(update_info)
if not download_path:
return False
# Install
return self.install_update(download_path, update_info)
# Private Methods
def _auto_check_loop(self) -> None:
"""Background thread for automatic update checks."""
interval_seconds = self._config["check_interval_hours"] * 3600
while self._running:
try:
self.check_for_updates()
except Exception as e:
print(f"[{self.name}] Auto-check error: {e}")
# Sleep with early exit check
for _ in range(interval_seconds):
if not self._running:
break
time.sleep(1)
def _verify_checksum(self, file_path: Path, expected_checksum: str) -> bool:
"""Verify file checksum (SHA256)."""
sha256 = hashlib.sha256()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(8192), b''):
sha256.update(chunk)
return sha256.hexdigest().lower() == expected_checksum.lower()
def _create_backup(self) -> Path:
"""Create a backup of current installation."""
backup_dir = Path(self._config["backup_dir"])
backup_dir.mkdir(parents=True, exist_ok=True)
timestamp = time.strftime("%Y%m%d_%H%M%S")
backup_path = backup_dir / f"backup_{self._config['current_version']}_{timestamp}"
backup_path.mkdir(exist_ok=True)
# Backup core files
for item in ["core", "plugins", "main.py", "requirements.txt"]:
src = Path(item)
if src.exists():
dst = backup_path / item
if src.is_dir():
shutil.copytree(src, dst, dirs_exist_ok=True)
else:
shutil.copy2(src, dst)
return backup_path
def _apply_update(self, extract_path: Path) -> None:
"""Apply extracted update files."""
# Copy new files
for item in extract_path.iterdir():
dst = Path(item.name)
# Remove old version
if dst.exists():
if dst.is_dir():
shutil.rmtree(dst)
else:
dst.unlink()
# Copy new version
if item.is_dir():
shutil.copytree(item, dst)
else:
shutil.copy2(item, dst)
def _rollback(self, backup_path: Path) -> bool:
"""Rollback to backup version."""
self._set_status(UpdateStatus.ROLLING_BACK)
print(f"[{self.name}] Rolling back...")
try:
for item in backup_path.iterdir():
dst = Path(item.name)
if dst.exists():
if dst.is_dir():
shutil.rmtree(dst)
else:
dst.unlink()
if item.is_dir():
shutil.copytree(item, dst)
else:
shutil.copy2(item, dst)
print(f"[{self.name}] Rollback complete")
return True
except Exception as e:
print(f"[{self.name}] Rollback failed: {e}")
return False
def _record_update(self, old_version: str, new_version: str, success: bool, error: Optional[str] = None) -> None:
"""Record update attempt in history."""
record = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"old_version": old_version,
"new_version": new_version,
"success": success,
"error": error,
}
self._update_history.append(record)
self._save_history()
def _load_history(self) -> None:
"""Load update history from file."""
history_file = self._data_dir / "update_history.json"
if history_file.exists():
try:
with open(history_file) as f:
self._update_history = json.load(f)
except Exception as e:
print(f"[{self.name}] Failed to load history: {e}")
def _save_history(self) -> None:
"""Save update history to file."""
history_file = self._data_dir / "update_history.json"
try:
with open(history_file, 'w') as f:
json.dump(self._update_history, f, indent=2)
except Exception as e:
print(f"[{self.name}] Failed to save history: {e}")
def _save_config(self) -> None:
"""Save configuration to file."""
config_file = self._data_dir / "updater_config.json"
try:
with open(config_file, 'w') as f:
json.dump(self._config, f, indent=2)
except Exception as e:
print(f"[{self.name}] Failed to save config: {e}")
# Public API
def get_update_history(self) -> List[Dict[str, Any]]:
"""Get update history."""
return self._update_history.copy()
def get_current_version(self) -> str:
"""Get current version string."""
return self._config["current_version"]
def set_channel(self, channel: str) -> None:
"""Set update channel (stable, beta, alpha)."""
if channel in ["stable", "beta", "alpha"]:
self._config["channel"] = channel
def force_check(self) -> Optional[UpdateInfo]:
"""Force an immediate update check."""
return self.check_for_updates()