""" ImportExport Plugin - Data Export and Import Functionality Provides comprehensive export/import capabilities for EU-Utility data, settings, and configurations in multiple formats. """ import os import json import csv import zipfile import shutil from pathlib import Path from typing import Optional, Dict, Any, List, Callable, Union, BinaryIO from dataclasses import dataclass, asdict from datetime import datetime from enum import Enum import xml.etree.ElementTree as ET from core.base_plugin import BasePlugin class ExportFormat(Enum): """Supported export formats.""" JSON = "json" CSV = "csv" XML = "xml" YAML = "yaml" ZIP = "zip" class ImportMode(Enum): """Import behavior modes.""" MERGE = "merge" # Merge with existing data REPLACE = "replace" # Replace existing data SKIP = "skip" # Skip existing data @dataclass class ExportProfile: """Defines what data to export.""" name: str include_settings: bool = True include_plugins: bool = True include_history: bool = False include_stats: bool = False include_clipboard: bool = False include_custom_data: List[str] = None def __post_init__(self): if self.include_custom_data is None: self.include_custom_data = [] @dataclass class ExportResult: """Result of an export operation.""" success: bool filepath: str format: str items_exported: int errors: List[str] = None def __post_init__(self): if self.errors is None: self.errors = [] @dataclass class ImportResult: """Result of an import operation.""" success: bool items_imported: int items_skipped: int items_failed: int errors: List[str] = None warnings: List[str] = None def __post_init__(self): if self.errors is None: self.errors = [] if self.warnings is None: self.warnings = [] class ImportExportPlugin(BasePlugin): """ Data export and import functionality. Features: - Export settings, plugins, history, and stats - Multiple export formats (JSON, CSV, XML, YAML, ZIP) - Import with merge/replace/skip modes - Export profiles for common scenarios - Data validation and sanitization - Progress callbacks """ name = "import_export" description = "Export and import data in multiple formats" version = "1.0.0" author = "EU-Utility" DEFAULT_CONFIG = { "export_dir": "data/exports", "import_dir": "data/imports", "temp_dir": "data/temp", "max_export_size_mb": 100, "default_format": "json", "backup_before_import": True, } # Predefined export profiles PROFILES = { "full": ExportProfile( name="full", include_settings=True, include_plugins=True, include_history=True, include_stats=True, include_clipboard=True, ), "settings_only": ExportProfile( name="settings_only", include_settings=True, include_plugins=False, include_history=False, include_stats=False, include_clipboard=False, ), "plugins_only": ExportProfile( name="plugins_only", include_settings=False, include_plugins=True, include_history=False, include_stats=False, include_clipboard=False, ), "minimal": ExportProfile( name="minimal", include_settings=True, include_plugins=False, include_history=False, include_stats=False, include_clipboard=False, ), } def __init__(self): super().__init__() self._config = self.DEFAULT_CONFIG.copy() self._listeners: List[Callable] = [] self._data_dir = Path("data") def on_start(self) -> None: """Start the import/export service.""" print(f"[{self.name}] Starting import/export service...") # Ensure directories exist Path(self._config["export_dir"]).mkdir(parents=True, exist_ok=True) Path(self._config["import_dir"]).mkdir(parents=True, exist_ok=True) Path(self._config["temp_dir"]).mkdir(parents=True, exist_ok=True) def on_stop(self) -> None: """Stop the import/export service.""" print(f"[{self.name}] Stopping import/export service...") # Export Methods def export_data(self, profile: Union[str, ExportProfile] = "full", format: ExportFormat = ExportFormat.JSON, filepath: Optional[str] = None, progress_callback: Optional[Callable[[int, int], None]] = None) -> ExportResult: """ Export data according to a profile. Args: profile: Export profile name or custom ExportProfile format: Export format filepath: Output file path (auto-generated if None) progress_callback: Called with (current, total) during export Returns: ExportResult with operation details """ # Resolve profile if isinstance(profile, str): profile = self.PROFILES.get(profile, self.PROFILES["full"]) # Generate filepath if not provided if filepath is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"export_{profile.name}_{timestamp}.{format.value}" filepath = str(Path(self._config["export_dir"]) / filename) print(f"[{self.name}] Exporting with profile '{profile.name}' to {filepath}") try: # Collect data data = self._collect_data(profile, progress_callback) # Export based on format if format == ExportFormat.JSON: self._export_json(data, filepath) elif format == ExportFormat.CSV: self._export_csv(data, filepath, profile) elif format == ExportFormat.XML: self._export_xml(data, filepath) elif format == ExportFormat.YAML: self._export_yaml(data, filepath) elif format == ExportFormat.ZIP: self._export_zip(data, filepath, profile) items_count = sum(len(v) if isinstance(v, list) else 1 for v in data.values()) result = ExportResult( success=True, filepath=filepath, format=format.value, items_exported=items_count, ) print(f"[{self.name}] ✓ Exported {items_count} items") self._notify_listeners("export_complete", result) return result except Exception as e: result = ExportResult( success=False, filepath=filepath, format=format.value, items_exported=0, errors=[str(e)], ) print(f"[{self.name}] Export failed: {e}") self._notify_listeners("export_failed", result) return result def export_settings(self, filepath: Optional[str] = None) -> ExportResult: """Quick export of settings only.""" return self.export_data("settings_only", ExportFormat.JSON, filepath) def export_plugins(self, filepath: Optional[str] = None) -> ExportResult: """Quick export of plugin configurations.""" return self.export_data("plugins_only", ExportFormat.JSON, filepath) def create_backup(self, name: Optional[str] = None) -> ExportResult: """ Create a full backup. Args: name: Backup name (default: timestamp) Returns: ExportResult """ if name is None: name = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" filepath = str(Path(self._config["export_dir"]) / f"{name}.zip") return self.export_data("full", ExportFormat.ZIP, filepath) # Import Methods def import_data(self, filepath: str, mode: ImportMode = ImportMode.MERGE, progress_callback: Optional[Callable[[int, int], None]] = None) -> ImportResult: """ Import data from a file. Args: filepath: Path to import file mode: Import behavior mode progress_callback: Called with (current, total) during import Returns: ImportResult with operation details """ filepath = Path(filepath) if not filepath.exists(): return ImportResult( success=False, items_imported=0, items_skipped=0, items_failed=0, errors=[f"File not found: {filepath}"], ) print(f"[{self.name}] Importing from {filepath} (mode: {mode.value})") # Create backup before import if configured if self._config["backup_before_import"]: backup_result = self.create_backup("pre_import_backup") if not backup_result.success: print(f"[{self.name}] Warning: Failed to create pre-import backup") try: # Detect format and load data data = self._load_import_file(filepath) if data is None: return ImportResult( success=False, items_imported=0, items_skipped=0, items_failed=0, errors=["Failed to parse import file"], ) # Import data sections result = ImportResult(success=True, items_imported=0, items_skipped=0, items_failed=0) # Import settings if "settings" in data: self._import_settings(data["settings"], mode, result) # Import plugin configs if "plugins" in data: self._import_plugins(data["plugins"], mode, result) # Import history if "history" in data: self._import_history(data["history"], mode, result) # Import stats if "stats" in data: self._import_stats(data["stats"], mode, result) # Import clipboard history if "clipboard" in data: self._import_clipboard(data["clipboard"], mode, result) print(f"[{self.name}] ✓ Import complete: {result.items_imported} imported, " f"{result.items_skipped} skipped, {result.items_failed} failed") self._notify_listeners("import_complete", result) return result except Exception as e: result = ImportResult( success=False, items_imported=0, items_skipped=0, items_failed=0, errors=[str(e)], ) print(f"[{self.name}] Import failed: {e}") self._notify_listeners("import_failed", result) return result def restore_backup(self, backup_path: str, mode: ImportMode = ImportMode.REPLACE) -> ImportResult: """ Restore from a backup file. Args: backup_path: Path to backup file mode: Import mode (default: REPLACE for full restore) Returns: ImportResult """ return self.import_data(backup_path, mode) def list_backups(self) -> List[Dict[str, Any]]: """List available backup files.""" export_dir = Path(self._config["export_dir"]) backups = [] for file_path in export_dir.glob("backup_*.zip"): stat = file_path.stat() backups.append({ "name": file_path.stem, "path": str(file_path), "size_bytes": stat.st_size, "created": datetime.fromtimestamp(stat.st_mtime).isoformat(), }) backups.sort(key=lambda x: x["created"], reverse=True) return backups # Data Collection def _collect_data(self, profile: ExportProfile, progress_callback: Optional[Callable] = None) -> Dict[str, Any]: """Collect data based on export profile.""" data = { "export_info": { "version": "1.0", "timestamp": datetime.now().isoformat(), "profile": profile.name, } } items_to_collect = [] if profile.include_settings: items_to_collect.append("settings") if profile.include_plugins: items_to_collect.append("plugins") if profile.include_history: items_to_collect.append("history") if profile.include_stats: items_to_collect.append("stats") if profile.include_clipboard: items_to_collect.append("clipboard") total = len(items_to_collect) for i, item in enumerate(items_to_collect): if progress_callback: progress_callback(i, total) if item == "settings": data["settings"] = self._collect_settings() elif item == "plugins": data["plugins"] = self._collect_plugin_configs() elif item == "history": data["history"] = self._collect_history() elif item == "stats": data["stats"] = self._collect_stats() elif item == "clipboard": data["clipboard"] = self._collect_clipboard_history() # Collect custom data paths for custom_path in profile.include_custom_data: path = Path(custom_path) if path.exists(): key = f"custom_{path.name}" if path.is_file(): with open(path) as f: data[key] = json.load(f) elif path.is_dir(): data[key] = {} for file in path.rglob("*"): if file.is_file(): rel_path = str(file.relative_to(path)) try: with open(file) as f: data[key][rel_path] = json.load(f) except: data[key][rel_path] = None if progress_callback: progress_callback(total, total) return data def _collect_settings(self) -> Dict[str, Any]: """Collect application settings.""" settings = {} # Collect all JSON config files for config_file in self._data_dir.rglob("*.json"): if "temp" in str(config_file): continue try: with open(config_file) as f: key = str(config_file.relative_to(self._data_dir)) settings[key] = json.load(f) except: pass return settings def _collect_plugin_configs(self) -> Dict[str, Any]: """Collect plugin configurations.""" plugins = {} # Look for plugin config files plugin_config_dir = self._data_dir / "plugin_configs" if plugin_config_dir.exists(): for config_file in plugin_config_dir.rglob("*.json"): try: with open(config_file) as f: plugin_name = config_file.stem plugins[plugin_name] = json.load(f) except: pass return plugins def _collect_history(self) -> List[Dict[str, Any]]: """Collect application history.""" history_file = self._data_dir / "history.json" if history_file.exists(): try: with open(history_file) as f: return json.load(f) except: pass return [] def _collect_stats(self) -> Dict[str, Any]: """Collect statistics data.""" stats_dir = self._data_dir / "stats" stats = {} if stats_dir.exists(): for stats_file in stats_dir.rglob("*.json"): try: with open(stats_file) as f: key = str(stats_file.relative_to(stats_dir)) stats[key] = json.load(f) except: pass return stats def _collect_clipboard_history(self) -> List[Dict[str, Any]]: """Collect clipboard history.""" # This would integrate with the clipboard service # For now, return empty list return [] # Export Formatters def _export_json(self, data: Dict[str, Any], filepath: str) -> None: """Export as JSON.""" with open(filepath, 'w') as f: json.dump(data, f, indent=2, default=str) def _export_csv(self, data: Dict[str, Any], filepath: str, profile: ExportProfile) -> None: """Export as CSV (flattened for tabular data).""" # For CSV, we export a flattened representation # This is a simplified implementation rows = [] if "settings" in data: for key, value in data["settings"].items(): rows.append({ "category": "settings", "key": key, "value": json.dumps(value), }) if "plugins" in data: for plugin, config in data["plugins"].items(): rows.append({ "category": "plugins", "key": plugin, "value": json.dumps(config), }) if rows: with open(filepath, 'w', newline='') as f: writer = csv.DictWriter(f, fieldnames=["category", "key", "value"]) writer.writeheader() writer.writerows(rows) def _export_xml(self, data: Dict[str, Any], filepath: str) -> None: """Export as XML.""" root = ET.Element("export") root.set("version", "1.0") root.set("timestamp", datetime.now().isoformat()) def add_dict_to_element(parent: ET.Element, d: Dict[str, Any]): for key, value in d.items(): child = ET.SubElement(parent, str(key).replace(" ", "_")) if isinstance(value, dict): add_dict_to_element(child, value) elif isinstance(value, list): for item in value: item_elem = ET.SubElement(child, "item") if isinstance(item, dict): add_dict_to_element(item_elem, item) else: item_elem.text = str(item) else: child.text = str(value) add_dict_to_element(root, data) tree = ET.ElementTree(root) tree.write(filepath, encoding='utf-8', xml_declaration=True) def _export_yaml(self, data: Dict[str, Any], filepath: str) -> None: """Export as YAML.""" try: import yaml with open(filepath, 'w') as f: yaml.dump(data, f, default_flow_style=False) except ImportError: # Fallback to JSON if PyYAML not available filepath = filepath.replace('.yaml', '.json') self._export_json(data, filepath) def _export_zip(self, data: Dict[str, Any], filepath: str, profile: ExportProfile) -> None: """Export as ZIP archive with multiple files.""" temp_dir = Path(self._config["temp_dir"]) / f"export_{datetime.now().strftime('%Y%m%d_%H%M%S')}" temp_dir.mkdir(parents=True, exist_ok=True) try: # Export each section as separate JSON file if "settings" in data: with open(temp_dir / "settings.json", 'w') as f: json.dump(data["settings"], f, indent=2) if "plugins" in data: with open(temp_dir / "plugins.json", 'w') as f: json.dump(data["plugins"], f, indent=2) if "history" in data: with open(temp_dir / "history.json", 'w') as f: json.dump(data["history"], f, indent=2) if "stats" in data: with open(temp_dir / "stats.json", 'w') as f: json.dump(data["stats"], f, indent=2) if "clipboard" in data: with open(temp_dir / "clipboard.json", 'w') as f: json.dump(data["clipboard"], f, indent=2) # Create manifest manifest = { "version": "1.0", "timestamp": datetime.now().isoformat(), "profile": profile.name, "contents": list(data.keys()), } with open(temp_dir / "manifest.json", 'w') as f: json.dump(manifest, f, indent=2) # Create ZIP with zipfile.ZipFile(filepath, 'w', zipfile.ZIP_DEFLATED) as zf: for file_path in temp_dir.rglob("*"): if file_path.is_file(): zf.write(file_path, file_path.relative_to(temp_dir)) finally: # Cleanup temp directory shutil.rmtree(temp_dir, ignore_errors=True) # Import Helpers def _load_import_file(self, filepath: Path) -> Optional[Dict[str, Any]]: """Load and parse an import file.""" suffix = filepath.suffix.lower() if suffix == '.zip': return self._load_zip_import(filepath) elif suffix == '.json': with open(filepath) as f: return json.load(f) elif suffix in ['.yaml', '.yml']: try: import yaml with open(filepath) as f: return yaml.safe_load(f) except ImportError: raise ValueError("PyYAML required for YAML import") elif suffix == '.xml': return self._load_xml_import(filepath) elif suffix == '.csv': return self._load_csv_import(filepath) return None def _load_zip_import(self, filepath: Path) -> Optional[Dict[str, Any]]: """Load data from ZIP archive.""" data = {} with zipfile.ZipFile(filepath, 'r') as zf: # Read manifest if present if "manifest.json" in zf.namelist(): with zf.open("manifest.json") as f: manifest = json.load(f) data["export_info"] = manifest # Read each section for name in zf.namelist(): if name.endswith('.json') and name != "manifest.json": section = name.replace('.json', '') with zf.open(name) as f: data[section] = json.load(f) return data def _load_xml_import(self, filepath: Path) -> Dict[str, Any]: """Load data from XML file.""" tree = ET.parse(filepath) root = tree.getroot() def element_to_dict(element: ET.Element) -> Any: children = list(element) if not children: return element.text result = {} for child in children: if child.tag == "item": if "items" not in result: result["items"] = [] result["items"].append(element_to_dict(child)) else: result[child.tag] = element_to_dict(child) return result return element_to_dict(root) def _load_csv_import(self, filepath: Path) -> Dict[str, Any]: """Load data from CSV file.""" data = {"settings": {}, "plugins": {}} with open(filepath, newline='') as f: reader = csv.DictReader(f) for row in reader: category = row.get("category", "settings") key = row["key"] value = json.loads(row["value"]) if category == "settings": data["settings"][key] = value elif category == "plugins": data["plugins"][key] = value return data def _import_settings(self, settings: Dict[str, Any], mode: ImportMode, result: ImportResult) -> None: """Import settings data.""" for key, value in settings.items(): try: # Determine file path file_path = self._data_dir / key # Check if exists if file_path.exists() and mode == ImportMode.SKIP: result.items_skipped += 1 continue # Merge or replace if mode == ImportMode.MERGE and file_path.exists(): with open(file_path) as f: existing = json.load(f) if isinstance(existing, dict) and isinstance(value, dict): existing.update(value) value = existing # Write file file_path.parent.mkdir(parents=True, exist_ok=True) with open(file_path, 'w') as f: json.dump(value, f, indent=2) result.items_imported += 1 except Exception as e: result.items_failed += 1 result.errors.append(f"Failed to import setting '{key}': {e}") def _import_plugins(self, plugins: Dict[str, Any], mode: ImportMode, result: ImportResult) -> None: """Import plugin configurations.""" plugin_config_dir = self._data_dir / "plugin_configs" plugin_config_dir.mkdir(parents=True, exist_ok=True) for plugin_name, config in plugins.items(): try: file_path = plugin_config_dir / f"{plugin_name}.json" if file_path.exists() and mode == ImportMode.SKIP: result.items_skipped += 1 continue if mode == ImportMode.MERGE and file_path.exists(): with open(file_path) as f: existing = json.load(f) if isinstance(existing, dict) and isinstance(config, dict): existing.update(config) config = existing with open(file_path, 'w') as f: json.dump(config, f, indent=2) result.items_imported += 1 except Exception as e: result.items_failed += 1 result.errors.append(f"Failed to import plugin '{plugin_name}': {e}") def _import_history(self, history: List[Dict], mode: ImportMode, result: ImportResult) -> None: """Import history data.""" try: history_file = self._data_dir / "history.json" if mode == ImportMode.MERGE and history_file.exists(): with open(history_file) as f: existing = json.load(f) # Merge and deduplicate by timestamp timestamps = {h.get("timestamp") for h in existing} for h in history: if h.get("timestamp") not in timestamps: existing.append(h) history = existing with open(history_file, 'w') as f: json.dump(history, f, indent=2) result.items_imported += len(history) except Exception as e: result.items_failed += len(history) result.errors.append(f"Failed to import history: {e}") def _import_stats(self, stats: Dict[str, Any], mode: ImportMode, result: ImportResult) -> None: """Import statistics data.""" try: stats_dir = self._data_dir / "stats" stats_dir.mkdir(parents=True, exist_ok=True) for key, value in stats.items(): file_path = stats_dir / key file_path.parent.mkdir(parents=True, exist_ok=True) with open(file_path, 'w') as f: json.dump(value, f, indent=2) result.items_imported += 1 except Exception as e: result.errors.append(f"Failed to import stats: {e}") def _import_clipboard(self, clipboard: List[Dict], mode: ImportMode, result: ImportResult) -> None: """Import clipboard history.""" # This would integrate with the clipboard service result.warnings.append("Clipboard import not yet implemented") # Event Listeners def add_listener(self, callback: Callable[[str, Any], None]) -> None: """Add an event listener. Events: 'export_complete', 'export_failed', 'import_complete', 'import_failed'.""" self._listeners.append(callback) def remove_listener(self, callback: Callable) -> None: """Remove an event listener.""" if callback in self._listeners: self._listeners.remove(callback) def _notify_listeners(self, event: str, data: Any) -> None: """Notify event listeners.""" for listener in self._listeners: try: listener(event, data) except Exception as e: print(f"[{self.name}] Listener error: {e}") # Public API def get_export_profiles(self) -> Dict[str, ExportProfile]: """Get available export profiles.""" return self.PROFILES.copy() def create_custom_profile(self, name: str, **kwargs) -> ExportProfile: """Create a custom export profile.""" return ExportProfile(name=name, **kwargs) def validate_import_file(self, filepath: str) -> Dict[str, Any]: """ Validate an import file without importing. Returns: Validation result with file info and any errors """ result = { "valid": False, "format": None, "size_bytes": 0, "contents": [], "errors": [], } path = Path(filepath) if not path.exists(): result["errors"].append("File not found") return result result["size_bytes"] = path.stat().st_size result["format"] = path.suffix.lower() try: data = self._load_import_file(path) if data: result["valid"] = True result["contents"] = list(data.keys()) if "export_info" in data: result["export_info"] = data["export_info"] else: result["errors"].append("Failed to parse file") except Exception as e: result["errors"].append(str(e)) return result