891 lines
31 KiB
Python
891 lines
31 KiB
Python
"""
|
|
ImportExport Plugin - Data Export and Import Functionality
|
|
|
|
Provides comprehensive export/import capabilities for EU-Utility data,
|
|
settings, and configurations in multiple formats.
|
|
"""
|
|
|
|
import csv
import json
import os
import re
import shutil
import xml.etree.ElementTree as ET
import zipfile
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Optional, Dict, Any, List, Callable, Union, BinaryIO

from core.base_plugin import BasePlugin
|
|
|
|
|
|
class ExportFormat(Enum):
    """Supported export formats."""
    JSON = "json"   # single structured JSON document
    CSV = "csv"     # flattened category/key/value table (settings + plugins only)
    XML = "xml"     # nested element tree
    YAML = "yaml"   # needs PyYAML on export; exporter falls back to JSON without it
    ZIP = "zip"     # archive of per-section JSON files plus a manifest
|
|
|
|
|
|
class ImportMode(Enum):
    """Import behavior modes."""
    MERGE = "merge"      # Merge with existing data (shallow dict merge / dedup lists)
    REPLACE = "replace"  # Replace existing data
    SKIP = "skip"        # Skip existing data
|
|
|
|
|
|
@dataclass
class ExportProfile:
    """Defines what data to export.

    The include_* flags mirror the sections the plugin can collect
    (settings, plugins, history, stats, clipboard);
    ``include_custom_data`` lists extra file/directory paths to bundle.
    """
    name: str
    include_settings: bool = True
    include_plugins: bool = True
    include_history: bool = False
    include_stats: bool = False
    include_clipboard: bool = False
    # Fixed annotation: was `List[str] = None`, which misstates the type.
    # None is used as a sentinel and normalized to a fresh list below so the
    # class default is never a shared mutable list.
    include_custom_data: Optional[List[str]] = None

    def __post_init__(self):
        # Normalize the sentinel so consumers can always iterate the list.
        if self.include_custom_data is None:
            self.include_custom_data = []
|
|
|
|
|
|
@dataclass
class ExportResult:
    """Result of an export operation."""
    success: bool            # True when the export file was written
    filepath: str            # where the export was (or would have been) written
    format: str              # ExportFormat value string, e.g. "json"
    items_exported: int      # lists count per-element; other sections count as 1
    # Fixed annotation: was `List[str] = None`; None is normalized to a fresh
    # list in __post_init__ so instances never share a mutable default.
    errors: Optional[List[str]] = None

    def __post_init__(self):
        if self.errors is None:
            self.errors = []
|
|
|
|
|
|
@dataclass
class ImportResult:
    """Result of an import operation."""
    success: bool            # False only when the whole import aborted
    items_imported: int      # items written (new or merged)
    items_skipped: int       # items left untouched under ImportMode.SKIP
    items_failed: int        # items that raised during import
    # Fixed annotations: were `List[str] = None`; None is normalized to fresh
    # lists in __post_init__ so instances never share mutable defaults.
    errors: Optional[List[str]] = None
    warnings: Optional[List[str]] = None

    def __post_init__(self):
        if self.errors is None:
            self.errors = []
        if self.warnings is None:
            self.warnings = []
|
|
|
|
|
|
class ImportExportPlugin(BasePlugin):
    """
    Data export and import functionality.

    Features:
    - Export settings, plugins, history, and stats
    - Multiple export formats (JSON, CSV, XML, YAML, ZIP)
    - Import with merge/replace/skip modes
    - Export profiles for common scenarios
    - Data validation and sanitization
    - Progress callbacks
    """

    # Plugin metadata.
    name = "import_export"
    description = "Export and import data in multiple formats"
    version = "1.0.0"
    author = "EU-Utility"

    # Default runtime configuration; copied per instance in __init__.
    DEFAULT_CONFIG = {
        "export_dir": "data/exports",    # destination for exports and backups
        "import_dir": "data/imports",    # default location for import files
        "temp_dir": "data/temp",         # scratch space for ZIP staging
        "max_export_size_mb": 100,       # NOTE(review): not enforced anywhere in this file
        "default_format": "json",
        "backup_before_import": True,    # create a safety backup before every import
    }

    # Predefined export profiles
    PROFILES = {
        # Everything, including history/stats/clipboard.
        "full": ExportProfile(
            name="full",
            include_settings=True,
            include_plugins=True,
            include_history=True,
            include_stats=True,
            include_clipboard=True,
        ),
        # Application settings only.
        "settings_only": ExportProfile(
            name="settings_only",
            include_settings=True,
            include_plugins=False,
            include_history=False,
            include_stats=False,
            include_clipboard=False,
        ),
        # Plugin configurations only.
        "plugins_only": ExportProfile(
            name="plugins_only",
            include_settings=False,
            include_plugins=True,
            include_history=False,
            include_stats=False,
            include_clipboard=False,
        ),
        # Currently selects the same sections as "settings_only";
        # kept as a distinct named profile.
        "minimal": ExportProfile(
            name="minimal",
            include_settings=True,
            include_plugins=False,
            include_history=False,
            include_stats=False,
            include_clipboard=False,
        ),
    }
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self._config = self.DEFAULT_CONFIG.copy()
|
|
self._listeners: List[Callable] = []
|
|
self._data_dir = Path("data")
|
|
|
|
def on_start(self) -> None:
|
|
"""Start the import/export service."""
|
|
print(f"[{self.name}] Starting import/export service...")
|
|
|
|
# Ensure directories exist
|
|
Path(self._config["export_dir"]).mkdir(parents=True, exist_ok=True)
|
|
Path(self._config["import_dir"]).mkdir(parents=True, exist_ok=True)
|
|
Path(self._config["temp_dir"]).mkdir(parents=True, exist_ok=True)
|
|
|
|
def on_stop(self) -> None:
|
|
"""Stop the import/export service."""
|
|
print(f"[{self.name}] Stopping import/export service...")
|
|
|
|
# Export Methods
|
|
|
|
def export_data(self,
|
|
profile: Union[str, ExportProfile] = "full",
|
|
format: ExportFormat = ExportFormat.JSON,
|
|
filepath: Optional[str] = None,
|
|
progress_callback: Optional[Callable[[int, int], None]] = None) -> ExportResult:
|
|
"""
|
|
Export data according to a profile.
|
|
|
|
Args:
|
|
profile: Export profile name or custom ExportProfile
|
|
format: Export format
|
|
filepath: Output file path (auto-generated if None)
|
|
progress_callback: Called with (current, total) during export
|
|
|
|
Returns:
|
|
ExportResult with operation details
|
|
"""
|
|
# Resolve profile
|
|
if isinstance(profile, str):
|
|
profile = self.PROFILES.get(profile, self.PROFILES["full"])
|
|
|
|
# Generate filepath if not provided
|
|
if filepath is None:
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
filename = f"export_{profile.name}_{timestamp}.{format.value}"
|
|
filepath = str(Path(self._config["export_dir"]) / filename)
|
|
|
|
print(f"[{self.name}] Exporting with profile '{profile.name}' to {filepath}")
|
|
|
|
try:
|
|
# Collect data
|
|
data = self._collect_data(profile, progress_callback)
|
|
|
|
# Export based on format
|
|
if format == ExportFormat.JSON:
|
|
self._export_json(data, filepath)
|
|
elif format == ExportFormat.CSV:
|
|
self._export_csv(data, filepath, profile)
|
|
elif format == ExportFormat.XML:
|
|
self._export_xml(data, filepath)
|
|
elif format == ExportFormat.YAML:
|
|
self._export_yaml(data, filepath)
|
|
elif format == ExportFormat.ZIP:
|
|
self._export_zip(data, filepath, profile)
|
|
|
|
items_count = sum(len(v) if isinstance(v, list) else 1 for v in data.values())
|
|
|
|
result = ExportResult(
|
|
success=True,
|
|
filepath=filepath,
|
|
format=format.value,
|
|
items_exported=items_count,
|
|
)
|
|
|
|
print(f"[{self.name}] ✓ Exported {items_count} items")
|
|
self._notify_listeners("export_complete", result)
|
|
return result
|
|
|
|
except Exception as e:
|
|
result = ExportResult(
|
|
success=False,
|
|
filepath=filepath,
|
|
format=format.value,
|
|
items_exported=0,
|
|
errors=[str(e)],
|
|
)
|
|
print(f"[{self.name}] Export failed: {e}")
|
|
self._notify_listeners("export_failed", result)
|
|
return result
|
|
|
|
def export_settings(self, filepath: Optional[str] = None) -> ExportResult:
|
|
"""Quick export of settings only."""
|
|
return self.export_data("settings_only", ExportFormat.JSON, filepath)
|
|
|
|
def export_plugins(self, filepath: Optional[str] = None) -> ExportResult:
|
|
"""Quick export of plugin configurations."""
|
|
return self.export_data("plugins_only", ExportFormat.JSON, filepath)
|
|
|
|
def create_backup(self, name: Optional[str] = None) -> ExportResult:
|
|
"""
|
|
Create a full backup.
|
|
|
|
Args:
|
|
name: Backup name (default: timestamp)
|
|
|
|
Returns:
|
|
ExportResult
|
|
"""
|
|
if name is None:
|
|
name = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
|
|
|
filepath = str(Path(self._config["export_dir"]) / f"{name}.zip")
|
|
return self.export_data("full", ExportFormat.ZIP, filepath)
|
|
|
|
# Import Methods
|
|
|
|
    def import_data(self,
                    filepath: str,
                    mode: ImportMode = ImportMode.MERGE,
                    progress_callback: Optional[Callable[[int, int], None]] = None) -> ImportResult:
        """
        Import data from a file.

        The file format is detected from the extension
        (.zip/.json/.yaml/.yml/.xml/.csv).

        Args:
            filepath: Path to import file
            mode: Import behavior mode
            progress_callback: Called with (current, total) during import.
                NOTE(review): accepted for API symmetry with export_data,
                but never invoked in this method body.

        Returns:
            ImportResult with operation details
        """
        filepath = Path(filepath)

        if not filepath.exists():
            return ImportResult(
                success=False,
                items_imported=0,
                items_skipped=0,
                items_failed=0,
                errors=[f"File not found: {filepath}"],
            )

        print(f"[{self.name}] Importing from {filepath} (mode: {mode.value})")

        # Create backup before import if configured.  The backup name is
        # fixed, so each import overwrites the previous pre-import backup.
        if self._config["backup_before_import"]:
            backup_result = self.create_backup("pre_import_backup")
            if not backup_result.success:
                # Best-effort: a failed backup is warned about, not fatal.
                print(f"[{self.name}] Warning: Failed to create pre-import backup")

        try:
            # Detect format and load data
            data = self._load_import_file(filepath)

            # None means the extension was unrecognized or parsing yielded nothing.
            if data is None:
                return ImportResult(
                    success=False,
                    items_imported=0,
                    items_skipped=0,
                    items_failed=0,
                    errors=["Failed to parse import file"],
                )

            # Import data sections; each helper accumulates counters and
            # errors onto this shared result object.
            result = ImportResult(success=True, items_imported=0, items_skipped=0, items_failed=0)

            # Import settings
            if "settings" in data:
                self._import_settings(data["settings"], mode, result)

            # Import plugin configs
            if "plugins" in data:
                self._import_plugins(data["plugins"], mode, result)

            # Import history
            if "history" in data:
                self._import_history(data["history"], mode, result)

            # Import stats
            if "stats" in data:
                self._import_stats(data["stats"], mode, result)

            # Import clipboard history
            if "clipboard" in data:
                self._import_clipboard(data["clipboard"], mode, result)

            print(f"[{self.name}] ✓ Import complete: {result.items_imported} imported, "
                  f"{result.items_skipped} skipped, {result.items_failed} failed")

            self._notify_listeners("import_complete", result)
            return result

        except Exception as e:
            # Unexpected failure (bad archive, I/O error, ...): report it
            # instead of raising to the caller.
            result = ImportResult(
                success=False,
                items_imported=0,
                items_skipped=0,
                items_failed=0,
                errors=[str(e)],
            )
            print(f"[{self.name}] Import failed: {e}")
            self._notify_listeners("import_failed", result)
            return result
|
|
|
|
    def restore_backup(self, backup_path: str, mode: ImportMode = ImportMode.REPLACE) -> ImportResult:
        """
        Restore from a backup file.

        Thin wrapper around import_data; defaults to REPLACE because a
        restore is expected to bring back the backed-up state wholesale.

        Args:
            backup_path: Path to backup file
            mode: Import mode (default: REPLACE for full restore)

        Returns:
            ImportResult
        """
        return self.import_data(backup_path, mode)
|
|
|
|
def list_backups(self) -> List[Dict[str, Any]]:
|
|
"""List available backup files."""
|
|
export_dir = Path(self._config["export_dir"])
|
|
backups = []
|
|
|
|
for file_path in export_dir.glob("backup_*.zip"):
|
|
stat = file_path.stat()
|
|
backups.append({
|
|
"name": file_path.stem,
|
|
"path": str(file_path),
|
|
"size_bytes": stat.st_size,
|
|
"created": datetime.fromtimestamp(stat.st_mtime).isoformat(),
|
|
})
|
|
|
|
backups.sort(key=lambda x: x["created"], reverse=True)
|
|
return backups
|
|
|
|
# Data Collection
|
|
|
|
def _collect_data(self, profile: ExportProfile, progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
|
|
"""Collect data based on export profile."""
|
|
data = {
|
|
"export_info": {
|
|
"version": "1.0",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"profile": profile.name,
|
|
}
|
|
}
|
|
|
|
items_to_collect = []
|
|
if profile.include_settings:
|
|
items_to_collect.append("settings")
|
|
if profile.include_plugins:
|
|
items_to_collect.append("plugins")
|
|
if profile.include_history:
|
|
items_to_collect.append("history")
|
|
if profile.include_stats:
|
|
items_to_collect.append("stats")
|
|
if profile.include_clipboard:
|
|
items_to_collect.append("clipboard")
|
|
|
|
total = len(items_to_collect)
|
|
for i, item in enumerate(items_to_collect):
|
|
if progress_callback:
|
|
progress_callback(i, total)
|
|
|
|
if item == "settings":
|
|
data["settings"] = self._collect_settings()
|
|
elif item == "plugins":
|
|
data["plugins"] = self._collect_plugin_configs()
|
|
elif item == "history":
|
|
data["history"] = self._collect_history()
|
|
elif item == "stats":
|
|
data["stats"] = self._collect_stats()
|
|
elif item == "clipboard":
|
|
data["clipboard"] = self._collect_clipboard_history()
|
|
|
|
# Collect custom data paths
|
|
for custom_path in profile.include_custom_data:
|
|
path = Path(custom_path)
|
|
if path.exists():
|
|
key = f"custom_{path.name}"
|
|
if path.is_file():
|
|
with open(path) as f:
|
|
data[key] = json.load(f)
|
|
elif path.is_dir():
|
|
data[key] = {}
|
|
for file in path.rglob("*"):
|
|
if file.is_file():
|
|
rel_path = str(file.relative_to(path))
|
|
try:
|
|
with open(file) as f:
|
|
data[key][rel_path] = json.load(f)
|
|
except:
|
|
data[key][rel_path] = None
|
|
|
|
if progress_callback:
|
|
progress_callback(total, total)
|
|
|
|
return data
|
|
|
|
def _collect_settings(self) -> Dict[str, Any]:
|
|
"""Collect application settings."""
|
|
settings = {}
|
|
|
|
# Collect all JSON config files
|
|
for config_file in self._data_dir.rglob("*.json"):
|
|
if "temp" in str(config_file):
|
|
continue
|
|
try:
|
|
with open(config_file) as f:
|
|
key = str(config_file.relative_to(self._data_dir))
|
|
settings[key] = json.load(f)
|
|
except:
|
|
pass
|
|
|
|
return settings
|
|
|
|
def _collect_plugin_configs(self) -> Dict[str, Any]:
|
|
"""Collect plugin configurations."""
|
|
plugins = {}
|
|
|
|
# Look for plugin config files
|
|
plugin_config_dir = self._data_dir / "plugin_configs"
|
|
if plugin_config_dir.exists():
|
|
for config_file in plugin_config_dir.rglob("*.json"):
|
|
try:
|
|
with open(config_file) as f:
|
|
plugin_name = config_file.stem
|
|
plugins[plugin_name] = json.load(f)
|
|
except:
|
|
pass
|
|
|
|
return plugins
|
|
|
|
def _collect_history(self) -> List[Dict[str, Any]]:
|
|
"""Collect application history."""
|
|
history_file = self._data_dir / "history.json"
|
|
if history_file.exists():
|
|
try:
|
|
with open(history_file) as f:
|
|
return json.load(f)
|
|
except:
|
|
pass
|
|
return []
|
|
|
|
def _collect_stats(self) -> Dict[str, Any]:
|
|
"""Collect statistics data."""
|
|
stats_dir = self._data_dir / "stats"
|
|
stats = {}
|
|
|
|
if stats_dir.exists():
|
|
for stats_file in stats_dir.rglob("*.json"):
|
|
try:
|
|
with open(stats_file) as f:
|
|
key = str(stats_file.relative_to(stats_dir))
|
|
stats[key] = json.load(f)
|
|
except:
|
|
pass
|
|
|
|
return stats
|
|
|
|
def _collect_clipboard_history(self) -> List[Dict[str, Any]]:
|
|
"""Collect clipboard history."""
|
|
# This would integrate with the clipboard service
|
|
# For now, return empty list
|
|
return []
|
|
|
|
# Export Formatters
|
|
|
|
def _export_json(self, data: Dict[str, Any], filepath: str) -> None:
|
|
"""Export as JSON."""
|
|
with open(filepath, 'w') as f:
|
|
json.dump(data, f, indent=2, default=str)
|
|
|
|
def _export_csv(self, data: Dict[str, Any], filepath: str, profile: ExportProfile) -> None:
|
|
"""Export as CSV (flattened for tabular data)."""
|
|
# For CSV, we export a flattened representation
|
|
# This is a simplified implementation
|
|
rows = []
|
|
|
|
if "settings" in data:
|
|
for key, value in data["settings"].items():
|
|
rows.append({
|
|
"category": "settings",
|
|
"key": key,
|
|
"value": json.dumps(value),
|
|
})
|
|
|
|
if "plugins" in data:
|
|
for plugin, config in data["plugins"].items():
|
|
rows.append({
|
|
"category": "plugins",
|
|
"key": plugin,
|
|
"value": json.dumps(config),
|
|
})
|
|
|
|
if rows:
|
|
with open(filepath, 'w', newline='') as f:
|
|
writer = csv.DictWriter(f, fieldnames=["category", "key", "value"])
|
|
writer.writeheader()
|
|
writer.writerows(rows)
|
|
|
|
def _export_xml(self, data: Dict[str, Any], filepath: str) -> None:
|
|
"""Export as XML."""
|
|
root = ET.Element("export")
|
|
root.set("version", "1.0")
|
|
root.set("timestamp", datetime.now().isoformat())
|
|
|
|
def add_dict_to_element(parent: ET.Element, d: Dict[str, Any]):
|
|
for key, value in d.items():
|
|
child = ET.SubElement(parent, str(key).replace(" ", "_"))
|
|
if isinstance(value, dict):
|
|
add_dict_to_element(child, value)
|
|
elif isinstance(value, list):
|
|
for item in value:
|
|
item_elem = ET.SubElement(child, "item")
|
|
if isinstance(item, dict):
|
|
add_dict_to_element(item_elem, item)
|
|
else:
|
|
item_elem.text = str(item)
|
|
else:
|
|
child.text = str(value)
|
|
|
|
add_dict_to_element(root, data)
|
|
|
|
tree = ET.ElementTree(root)
|
|
tree.write(filepath, encoding='utf-8', xml_declaration=True)
|
|
|
|
def _export_yaml(self, data: Dict[str, Any], filepath: str) -> None:
|
|
"""Export as YAML."""
|
|
try:
|
|
import yaml
|
|
with open(filepath, 'w') as f:
|
|
yaml.dump(data, f, default_flow_style=False)
|
|
except ImportError:
|
|
# Fallback to JSON if PyYAML not available
|
|
filepath = filepath.replace('.yaml', '.json')
|
|
self._export_json(data, filepath)
|
|
|
|
def _export_zip(self, data: Dict[str, Any], filepath: str, profile: ExportProfile) -> None:
|
|
"""Export as ZIP archive with multiple files."""
|
|
temp_dir = Path(self._config["temp_dir"]) / f"export_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
|
temp_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
# Export each section as separate JSON file
|
|
if "settings" in data:
|
|
with open(temp_dir / "settings.json", 'w') as f:
|
|
json.dump(data["settings"], f, indent=2)
|
|
|
|
if "plugins" in data:
|
|
with open(temp_dir / "plugins.json", 'w') as f:
|
|
json.dump(data["plugins"], f, indent=2)
|
|
|
|
if "history" in data:
|
|
with open(temp_dir / "history.json", 'w') as f:
|
|
json.dump(data["history"], f, indent=2)
|
|
|
|
if "stats" in data:
|
|
with open(temp_dir / "stats.json", 'w') as f:
|
|
json.dump(data["stats"], f, indent=2)
|
|
|
|
if "clipboard" in data:
|
|
with open(temp_dir / "clipboard.json", 'w') as f:
|
|
json.dump(data["clipboard"], f, indent=2)
|
|
|
|
# Create manifest
|
|
manifest = {
|
|
"version": "1.0",
|
|
"timestamp": datetime.now().isoformat(),
|
|
"profile": profile.name,
|
|
"contents": list(data.keys()),
|
|
}
|
|
with open(temp_dir / "manifest.json", 'w') as f:
|
|
json.dump(manifest, f, indent=2)
|
|
|
|
# Create ZIP
|
|
with zipfile.ZipFile(filepath, 'w', zipfile.ZIP_DEFLATED) as zf:
|
|
for file_path in temp_dir.rglob("*"):
|
|
if file_path.is_file():
|
|
zf.write(file_path, file_path.relative_to(temp_dir))
|
|
|
|
finally:
|
|
# Cleanup temp directory
|
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
# Import Helpers
|
|
|
|
def _load_import_file(self, filepath: Path) -> Optional[Dict[str, Any]]:
|
|
"""Load and parse an import file."""
|
|
suffix = filepath.suffix.lower()
|
|
|
|
if suffix == '.zip':
|
|
return self._load_zip_import(filepath)
|
|
elif suffix == '.json':
|
|
with open(filepath) as f:
|
|
return json.load(f)
|
|
elif suffix in ['.yaml', '.yml']:
|
|
try:
|
|
import yaml
|
|
with open(filepath) as f:
|
|
return yaml.safe_load(f)
|
|
except ImportError:
|
|
raise ValueError("PyYAML required for YAML import")
|
|
elif suffix == '.xml':
|
|
return self._load_xml_import(filepath)
|
|
elif suffix == '.csv':
|
|
return self._load_csv_import(filepath)
|
|
|
|
return None
|
|
|
|
def _load_zip_import(self, filepath: Path) -> Optional[Dict[str, Any]]:
|
|
"""Load data from ZIP archive."""
|
|
data = {}
|
|
|
|
with zipfile.ZipFile(filepath, 'r') as zf:
|
|
# Read manifest if present
|
|
if "manifest.json" in zf.namelist():
|
|
with zf.open("manifest.json") as f:
|
|
manifest = json.load(f)
|
|
data["export_info"] = manifest
|
|
|
|
# Read each section
|
|
for name in zf.namelist():
|
|
if name.endswith('.json') and name != "manifest.json":
|
|
section = name.replace('.json', '')
|
|
with zf.open(name) as f:
|
|
data[section] = json.load(f)
|
|
|
|
return data
|
|
|
|
def _load_xml_import(self, filepath: Path) -> Dict[str, Any]:
|
|
"""Load data from XML file."""
|
|
tree = ET.parse(filepath)
|
|
root = tree.getroot()
|
|
|
|
def element_to_dict(element: ET.Element) -> Any:
|
|
children = list(element)
|
|
if not children:
|
|
return element.text
|
|
|
|
result = {}
|
|
for child in children:
|
|
if child.tag == "item":
|
|
if "items" not in result:
|
|
result["items"] = []
|
|
result["items"].append(element_to_dict(child))
|
|
else:
|
|
result[child.tag] = element_to_dict(child)
|
|
return result
|
|
|
|
return element_to_dict(root)
|
|
|
|
def _load_csv_import(self, filepath: Path) -> Dict[str, Any]:
|
|
"""Load data from CSV file."""
|
|
data = {"settings": {}, "plugins": {}}
|
|
|
|
with open(filepath, newline='') as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
category = row.get("category", "settings")
|
|
key = row["key"]
|
|
value = json.loads(row["value"])
|
|
|
|
if category == "settings":
|
|
data["settings"][key] = value
|
|
elif category == "plugins":
|
|
data["plugins"][key] = value
|
|
|
|
return data
|
|
|
|
def _import_settings(self, settings: Dict[str, Any], mode: ImportMode, result: ImportResult) -> None:
|
|
"""Import settings data."""
|
|
for key, value in settings.items():
|
|
try:
|
|
# Determine file path
|
|
file_path = self._data_dir / key
|
|
|
|
# Check if exists
|
|
if file_path.exists() and mode == ImportMode.SKIP:
|
|
result.items_skipped += 1
|
|
continue
|
|
|
|
# Merge or replace
|
|
if mode == ImportMode.MERGE and file_path.exists():
|
|
with open(file_path) as f:
|
|
existing = json.load(f)
|
|
if isinstance(existing, dict) and isinstance(value, dict):
|
|
existing.update(value)
|
|
value = existing
|
|
|
|
# Write file
|
|
file_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(file_path, 'w') as f:
|
|
json.dump(value, f, indent=2)
|
|
|
|
result.items_imported += 1
|
|
|
|
except Exception as e:
|
|
result.items_failed += 1
|
|
result.errors.append(f"Failed to import setting '{key}': {e}")
|
|
|
|
def _import_plugins(self, plugins: Dict[str, Any], mode: ImportMode, result: ImportResult) -> None:
|
|
"""Import plugin configurations."""
|
|
plugin_config_dir = self._data_dir / "plugin_configs"
|
|
plugin_config_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
for plugin_name, config in plugins.items():
|
|
try:
|
|
file_path = plugin_config_dir / f"{plugin_name}.json"
|
|
|
|
if file_path.exists() and mode == ImportMode.SKIP:
|
|
result.items_skipped += 1
|
|
continue
|
|
|
|
if mode == ImportMode.MERGE and file_path.exists():
|
|
with open(file_path) as f:
|
|
existing = json.load(f)
|
|
if isinstance(existing, dict) and isinstance(config, dict):
|
|
existing.update(config)
|
|
config = existing
|
|
|
|
with open(file_path, 'w') as f:
|
|
json.dump(config, f, indent=2)
|
|
|
|
result.items_imported += 1
|
|
|
|
except Exception as e:
|
|
result.items_failed += 1
|
|
result.errors.append(f"Failed to import plugin '{plugin_name}': {e}")
|
|
|
|
def _import_history(self, history: List[Dict], mode: ImportMode, result: ImportResult) -> None:
|
|
"""Import history data."""
|
|
try:
|
|
history_file = self._data_dir / "history.json"
|
|
|
|
if mode == ImportMode.MERGE and history_file.exists():
|
|
with open(history_file) as f:
|
|
existing = json.load(f)
|
|
# Merge and deduplicate by timestamp
|
|
timestamps = {h.get("timestamp") for h in existing}
|
|
for h in history:
|
|
if h.get("timestamp") not in timestamps:
|
|
existing.append(h)
|
|
history = existing
|
|
|
|
with open(history_file, 'w') as f:
|
|
json.dump(history, f, indent=2)
|
|
|
|
result.items_imported += len(history)
|
|
|
|
except Exception as e:
|
|
result.items_failed += len(history)
|
|
result.errors.append(f"Failed to import history: {e}")
|
|
|
|
def _import_stats(self, stats: Dict[str, Any], mode: ImportMode, result: ImportResult) -> None:
|
|
"""Import statistics data."""
|
|
try:
|
|
stats_dir = self._data_dir / "stats"
|
|
stats_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
for key, value in stats.items():
|
|
file_path = stats_dir / key
|
|
file_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
with open(file_path, 'w') as f:
|
|
json.dump(value, f, indent=2)
|
|
|
|
result.items_imported += 1
|
|
|
|
except Exception as e:
|
|
result.errors.append(f"Failed to import stats: {e}")
|
|
|
|
def _import_clipboard(self, clipboard: List[Dict], mode: ImportMode, result: ImportResult) -> None:
|
|
"""Import clipboard history."""
|
|
# This would integrate with the clipboard service
|
|
result.warnings.append("Clipboard import not yet implemented")
|
|
|
|
# Event Listeners
|
|
|
|
def add_listener(self, callback: Callable[[str, Any], None]) -> None:
|
|
"""Add an event listener. Events: 'export_complete', 'export_failed', 'import_complete', 'import_failed'."""
|
|
self._listeners.append(callback)
|
|
|
|
def remove_listener(self, callback: Callable) -> None:
|
|
"""Remove an event listener."""
|
|
if callback in self._listeners:
|
|
self._listeners.remove(callback)
|
|
|
|
def _notify_listeners(self, event: str, data: Any) -> None:
|
|
"""Notify event listeners."""
|
|
for listener in self._listeners:
|
|
try:
|
|
listener(event, data)
|
|
except Exception as e:
|
|
print(f"[{self.name}] Listener error: {e}")
|
|
|
|
# Public API
|
|
|
|
def get_export_profiles(self) -> Dict[str, ExportProfile]:
|
|
"""Get available export profiles."""
|
|
return self.PROFILES.copy()
|
|
|
|
    def create_custom_profile(self, name: str, **kwargs) -> ExportProfile:
        """Create a custom export profile.

        kwargs are forwarded to ExportProfile, so they must be its
        include_* field names.
        """
        return ExportProfile(name=name, **kwargs)
|
|
|
|
def validate_import_file(self, filepath: str) -> Dict[str, Any]:
|
|
"""
|
|
Validate an import file without importing.
|
|
|
|
Returns:
|
|
Validation result with file info and any errors
|
|
"""
|
|
result = {
|
|
"valid": False,
|
|
"format": None,
|
|
"size_bytes": 0,
|
|
"contents": [],
|
|
"errors": [],
|
|
}
|
|
|
|
path = Path(filepath)
|
|
if not path.exists():
|
|
result["errors"].append("File not found")
|
|
return result
|
|
|
|
result["size_bytes"] = path.stat().st_size
|
|
result["format"] = path.suffix.lower()
|
|
|
|
try:
|
|
data = self._load_import_file(path)
|
|
if data:
|
|
result["valid"] = True
|
|
result["contents"] = list(data.keys())
|
|
if "export_info" in data:
|
|
result["export_info"] = data["export_info"]
|
|
else:
|
|
result["errors"].append("Failed to parse file")
|
|
except Exception as e:
|
|
result["errors"].append(str(e))
|
|
|
|
return result
|