""" EU-Utility - Clipboard Manager Cross-platform clipboard access with history. Part of core - plugins access via PluginAPI. """ import json import threading from pathlib import Path from typing import List, Optional from collections import deque from dataclasses import dataclass, asdict from datetime import datetime @dataclass class ClipboardEntry: """A single clipboard entry.""" text: str timestamp: str source: str = "unknown" class ClipboardSecurityError(Exception): """Raised when clipboard security policy is violated.""" pass class ClipboardManager: """ Core clipboard service with history tracking. Uses pyperclip for cross-platform compatibility. """ _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self, max_history: int = 100, history_file: Path = None): if self._initialized: return self._initialized = True self._max_history = max_history self._history: deque = deque(maxlen=max_history) self._history_file = history_file or Path("data/clipboard_history.json") # Security limits self._max_text_length = 10000 # 10KB per entry self._max_total_storage = 1024 * 1024 # 1MB total self._monitoring = False self._last_clipboard = "" self._load_history() def _load_history(self): """Load clipboard history from file.""" if self._history_file.exists(): try: with open(self._history_file, 'r') as f: data = json.load(f) for entry in data: self._history.append(ClipboardEntry(**entry)) except Exception as e: print(f"[Clipboard] Error loading history: {e}") def _save_history(self): """Save clipboard history to file with secure permissions.""" try: self._history_file.parent.mkdir(parents=True, exist_ok=True) # Limit data before saving data = [] total_size = 0 for entry in reversed(self._history): # Newest first entry_dict = { 'text': entry.text[:self._max_text_length], 'timestamp': entry.timestamp, 'source': entry.source[:50] } entry_size = len(str(entry_dict)) if total_size + entry_size > self._max_total_storage: break data.append(entry_dict) total_size += entry_size # Write with restricted permissions (owner only) import os temp_path = self._history_file.with_suffix('.tmp') with open(temp_path, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2) # Set secure permissions (owner read/write only) os.chmod(temp_path, 0o600) temp_path.replace(self._history_file) except Exception as e: print(f"[Clipboard] Error saving history: {e}") def _validate_text(self, text: str) -> tuple[bool, str]: """Validate text before adding to clipboard/history. Returns: Tuple of (is_valid, error_message) """ if not isinstance(text, str): return False, "Text must be a string" # Check for null bytes if '\x00' in text: return False, "Text contains null bytes" # Check length limit if len(text) > self._max_text_length: return False, f"Text exceeds maximum length of {self._max_text_length} characters" # Check total storage limit current_size = sum(len(entry.text) for entry in self._history) if current_size + len(text) > self._max_total_storage: # Remove oldest entries to make room while self._history and current_size + len(text) > self._max_total_storage: removed = self._history.popleft() current_size -= len(removed.text) return True, "" def _sanitize_text(self, text: str) -> str: """Sanitize text for safe storage. Args: text: Text to sanitize Returns: Sanitized text """ # Remove control characters except newlines and tabs sanitized = ''.join( char for char in text if char == '\n' or char == '\t' or ord(char) >= 32 ) return sanitized def copy(self, text: str, source: str = "plugin") -> bool: """Copy text to clipboard. Args: text: Text to copy source: Source identifier for history Returns: True if successful """ try: # Validate input is_valid, error_msg = self._validate_text(text) if not is_valid: print(f"[Clipboard] Security validation failed: {error_msg}") return False # Sanitize text text = self._sanitize_text(text) # Validate source source = self._sanitize_text(str(source))[:50] # Limit source length import pyperclip pyperclip.copy(text) # Add to history entry = ClipboardEntry( text=text, timestamp=datetime.now().isoformat(), source=source ) self._history.append(entry) self._save_history() return True except Exception as e: print(f"[Clipboard] Copy error: {e}") return False def paste(self) -> str: """Paste text from clipboard. Returns: Clipboard content or empty string (sanitized) """ try: import pyperclip text = pyperclip.paste() or "" # Sanitize pasted content text = self._sanitize_text(text) # Enforce max length on paste if len(text) > self._max_text_length: text = text[:self._max_text_length] return text except Exception as e: print(f"[Clipboard] Paste error: {e}") return "" def get_history(self, limit: int = None) -> List[ClipboardEntry]: """Get clipboard history. Args: limit: Maximum entries to return (None for all) Returns: List of clipboard entries (newest first) """ history = list(self._history) if limit: history = history[-limit:] return list(reversed(history)) def clear_history(self): """Clear clipboard history.""" self._history.clear() self._save_history() def is_available(self) -> bool: """Check if clipboard is available.""" try: import pyperclip pyperclip.paste() return True except: return False def get_clipboard_manager() -> ClipboardManager: """Get global ClipboardManager instance.""" return ClipboardManager() # Convenience functions def copy_to_clipboard(text: str) -> bool: """Quick copy to clipboard.""" return get_clipboard_manager().copy(text) def paste_from_clipboard() -> str: """Quick paste from clipboard.""" return get_clipboard_manager().paste()