EU-Utility/core/clipboard.py

258 lines
7.8 KiB
Python

"""
EU-Utility - Clipboard Manager
Cross-platform clipboard access with history.
Part of core - plugins access via PluginAPI.
"""
import json
import threading
from pathlib import Path
from typing import List, Optional
from collections import deque
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class ClipboardEntry:
"""A single clipboard entry."""
text: str
timestamp: str
source: str = "unknown"
class ClipboardSecurityError(Exception):
"""Raised when clipboard security policy is violated."""
pass
class ClipboardManager:
"""
Core clipboard service with history tracking.
Uses pyperclip for cross-platform compatibility.
"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self, max_history: int = 100, history_file: Path = None):
if self._initialized:
return
self._initialized = True
self._max_history = max_history
self._history: deque = deque(maxlen=max_history)
self._history_file = history_file or Path("data/clipboard_history.json")
# Security limits
self._max_text_length = 10000 # 10KB per entry
self._max_total_storage = 1024 * 1024 # 1MB total
self._monitoring = False
self._last_clipboard = ""
self._load_history()
def _load_history(self):
"""Load clipboard history from file."""
if self._history_file.exists():
try:
with open(self._history_file, 'r') as f:
data = json.load(f)
for entry in data:
self._history.append(ClipboardEntry(**entry))
except Exception as e:
print(f"[Clipboard] Error loading history: {e}")
def _save_history(self):
"""Save clipboard history to file with secure permissions."""
try:
self._history_file.parent.mkdir(parents=True, exist_ok=True)
# Limit data before saving
data = []
total_size = 0
for entry in reversed(self._history): # Newest first
entry_dict = {
'text': entry.text[:self._max_text_length],
'timestamp': entry.timestamp,
'source': entry.source[:50]
}
entry_size = len(str(entry_dict))
if total_size + entry_size > self._max_total_storage:
break
data.append(entry_dict)
total_size += entry_size
# Write with restricted permissions (owner only)
import os
temp_path = self._history_file.with_suffix('.tmp')
with open(temp_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2)
# Set secure permissions (owner read/write only)
os.chmod(temp_path, 0o600)
temp_path.replace(self._history_file)
except Exception as e:
print(f"[Clipboard] Error saving history: {e}")
def _validate_text(self, text: str) -> tuple[bool, str]:
"""Validate text before adding to clipboard/history.
Returns:
Tuple of (is_valid, error_message)
"""
if not isinstance(text, str):
return False, "Text must be a string"
# Check for null bytes
if '\x00' in text:
return False, "Text contains null bytes"
# Check length limit
if len(text) > self._max_text_length:
return False, f"Text exceeds maximum length of {self._max_text_length} characters"
# Check total storage limit
current_size = sum(len(entry.text) for entry in self._history)
if current_size + len(text) > self._max_total_storage:
# Remove oldest entries to make room
while self._history and current_size + len(text) > self._max_total_storage:
removed = self._history.popleft()
current_size -= len(removed.text)
return True, ""
def _sanitize_text(self, text: str) -> str:
"""Sanitize text for safe storage.
Args:
text: Text to sanitize
Returns:
Sanitized text
"""
# Remove control characters except newlines and tabs
sanitized = ''.join(
char for char in text
if char == '\n' or char == '\t' or ord(char) >= 32
)
return sanitized
def copy(self, text: str, source: str = "plugin") -> bool:
"""Copy text to clipboard.
Args:
text: Text to copy
source: Source identifier for history
Returns:
True if successful
"""
try:
# Validate input
is_valid, error_msg = self._validate_text(text)
if not is_valid:
print(f"[Clipboard] Security validation failed: {error_msg}")
return False
# Sanitize text
text = self._sanitize_text(text)
# Validate source
source = self._sanitize_text(str(source))[:50] # Limit source length
import pyperclip
pyperclip.copy(text)
# Add to history
entry = ClipboardEntry(
text=text,
timestamp=datetime.now().isoformat(),
source=source
)
self._history.append(entry)
self._save_history()
return True
except Exception as e:
print(f"[Clipboard] Copy error: {e}")
return False
def paste(self) -> str:
"""Paste text from clipboard.
Returns:
Clipboard content or empty string (sanitized)
"""
try:
import pyperclip
text = pyperclip.paste() or ""
# Sanitize pasted content
text = self._sanitize_text(text)
# Enforce max length on paste
if len(text) > self._max_text_length:
text = text[:self._max_text_length]
return text
except Exception as e:
print(f"[Clipboard] Paste error: {e}")
return ""
def get_history(self, limit: int = None) -> List[ClipboardEntry]:
"""Get clipboard history.
Args:
limit: Maximum entries to return (None for all)
Returns:
List of clipboard entries (newest first)
"""
history = list(self._history)
if limit:
history = history[-limit:]
return list(reversed(history))
def clear_history(self):
"""Clear clipboard history."""
self._history.clear()
self._save_history()
def is_available(self) -> bool:
"""Check if clipboard is available."""
try:
import pyperclip
pyperclip.paste()
return True
except:
return False
def get_clipboard_manager() -> ClipboardManager:
"""Get global ClipboardManager instance."""
return ClipboardManager()
# Convenience functions
def copy_to_clipboard(text: str) -> bool:
"""Quick copy to clipboard."""
return get_clipboard_manager().copy(text)
def paste_from_clipboard() -> str:
"""Quick paste from clipboard."""
return get_clipboard_manager().paste()