# Description: Database initialization and connection management
# Implements SQLite setup with Data Principle support
# Standards: Python 3.11+, type hints, async where possible

import sqlite3
import os
from contextlib import closing
from pathlib import Path
from decimal import Decimal
from datetime import datetime
from typing import Optional, Any, List
import json
import logging

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class DatabaseManager:
    """
    Manages a SQLite database connection and schema initialization.

    Implements the Data Principle by providing data persistence with
    foreign key enforcement and explicit transaction control. A single
    connection is created lazily on first use and reused until ``close()``
    is called (there is no connection pool).

    The manager can be used as a context manager: on exit it commits when
    the block succeeded, rolls back when it raised, and always closes the
    connection.
    """

    # Additive column migrations applied by _run_migrations(), in order:
    # (table, column, column definition). These are internal constants and
    # are never built from user input.
    _COLUMN_MIGRATIONS = (
        ("projects", "description", "TEXT"),
        ("sessions", "status", "TEXT DEFAULT 'running'"),
    )

    def __init__(self, db_path: Optional[str] = None):
        """
        Initialize database manager.

        The parent directory of the database file is created if missing.
        No connection is opened here; see ``get_connection()``.

        Args:
            db_path: Path to SQLite database. Defaults to
                ``<parent of this file's directory>/data/lemontropia.db``.
        """
        if db_path is None:
            # Get project root (parent of core/)
            core_dir = Path(__file__).parent
            project_root = core_dir.parent
            db_path = project_root / "data" / "lemontropia.db"

        self.db_path = Path(db_path)
        self._connection: Optional[sqlite3.Connection] = None

        # Ensure data directory exists
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

        logger.info(f"DatabaseManager initialized: {self.db_path}")

    def initialize(self) -> bool:
        """
        Initialize database with schema.

        Reads ``schema.sql`` from the directory containing this module,
        executes it, then runs the column migrations.

        Returns:
            True if successful, False otherwise (the error is logged).
        """
        try:
            conn = self.get_connection()

            # Read the schema as UTF-8 explicitly so the result does not
            # depend on the platform's locale encoding.
            schema_path = Path(__file__).parent / "schema.sql"
            schema = schema_path.read_text(encoding="utf-8")

            # Execute schema (multiple statements)
            conn.executescript(schema)
            conn.commit()

            # Run migrations for existing databases
            self._run_migrations()

            logger.info("Database schema initialized successfully")
            return True

        except Exception as e:
            logger.error(f"Failed to initialize database: {e}")
            return False

    def _run_migrations(self) -> None:
        """
        Run migrations to update existing databases.

        Each entry of ``_COLUMN_MIGRATIONS`` is applied independently: a
        failure in one is logged and does not prevent the others from
        running. This method never raises.
        """
        try:
            conn = self.get_connection()

            for table, column, definition in self._COLUMN_MIGRATIONS:
                try:
                    added = self._add_column_if_missing(
                        conn, table, column, definition
                    )
                except sqlite3.OperationalError as e:
                    # A real failure (locked database, bad definition, ...):
                    # report it instead of assuming "column already exists".
                    logger.warning(
                        f"Migration skipped for {table}.{column}: {e}"
                    )
                    continue

                if added:
                    logger.info(
                        f"Migration: Added {column} column to {table} table"
                    )

        except Exception as e:
            logger.error(f"Migration failed: {e}")

    @staticmethod
    def _add_column_if_missing(
        conn: sqlite3.Connection, table: str, column: str, definition: str
    ) -> bool:
        """
        Add ``column`` to ``table`` unless it is already present.

        Args:
            conn: Open SQLite connection.
            table: Table name (trusted internal identifier).
            column: Column name (trusted internal identifier).
            definition: Column type/constraints, e.g. ``"TEXT"``.

        Returns:
            True if the column was added, False if it already existed or
            the table does not exist.

        Raises:
            sqlite3.OperationalError: If the ALTER TABLE statement fails.
        """
        # Index 1 of each table_info row is the column name. SQLite column
        # names are case-insensitive, so compare lowercased.
        existing = {
            row[1].lower()
            for row in conn.execute(f"PRAGMA table_info({table})")
        }
        if not existing:
            # table_info yields no rows for a table that does not exist.
            logger.warning(
                f"Migration skipped: table {table} does not exist"
            )
            return False
        if column.lower() in existing:
            return False

        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {definition}")
        conn.commit()
        return True

    def get_connection(self) -> sqlite3.Connection:
        """
        Get or create database connection.

        The connection is fully configured before it is cached, so a
        failure during setup never leaves a half-configured connection
        behind; the next call simply retries.

        Returns:
            SQLite connection using ``sqlite3.Row`` as row factory, with
            foreign keys enabled. Declared-type detection is disabled
            (``detect_types=0``).

        Raises:
            sqlite3.Error: If the database cannot be opened or configured.
        """
        if self._connection is None:
            conn = sqlite3.connect(
                self.db_path,
                detect_types=0  # Disable auto type detection to avoid TIMESTAMP parsing issues
            )
            try:
                conn.row_factory = sqlite3.Row

                # Enable foreign keys
                conn.execute("PRAGMA foreign_keys = ON")

                # Performance optimizations (Rule #3: 60+ FPS)
                conn.execute("PRAGMA journal_mode = WAL")
                conn.execute("PRAGMA synchronous = NORMAL")
                conn.execute("PRAGMA cache_size = -64000")  # 64MB cache
            except Exception:
                # Do not leak or cache a connection that failed setup.
                conn.close()
                raise

            self._connection = conn
            logger.debug("Database connection established")

        return self._connection

    def close(self) -> None:
        """Close database connection (no-op if none is open)."""
        if self._connection is not None:
            self._connection.close()
            self._connection = None
            logger.debug("Database connection closed")

    def execute(self, query: str, parameters: tuple = ()) -> sqlite3.Cursor:
        """
        Execute a query with parameters.

        Args:
            query: SQL query string
            parameters: Query parameters (prevents SQL injection)

        Returns:
            Cursor object
        """
        conn = self.get_connection()
        return conn.execute(query, parameters)

    def executemany(self, query: str, parameters: list) -> sqlite3.Cursor:
        """
        Execute query with multiple parameter sets.

        Args:
            query: SQL query string
            parameters: List of parameter tuples

        Returns:
            Cursor object
        """
        conn = self.get_connection()
        return conn.executemany(query, parameters)

    def fetchall(self, query: str, parameters: tuple = ()) -> List[sqlite3.Row]:
        """
        Execute query and fetch all results.

        Args:
            query: SQL query string
            parameters: Query parameters

        Returns:
            List of row objects
        """
        cursor = self.execute(query, parameters)
        return cursor.fetchall()

    def fetchone(self, query: str, parameters: tuple = ()) -> Optional[sqlite3.Row]:
        """
        Execute query and fetch first result.

        Args:
            query: SQL query string
            parameters: Query parameters

        Returns:
            Single row or None
        """
        cursor = self.execute(query, parameters)
        return cursor.fetchone()

    def commit(self) -> None:
        """Commit current transaction (no-op if no connection is open)."""
        if self._connection is not None:
            self._connection.commit()

    def rollback(self) -> None:
        """Rollback current transaction (no-op if no connection is open)."""
        if self._connection is not None:
            self._connection.rollback()

    def get_schema_version(self) -> int:
        """
        Get current schema version.

        Returns:
            Highest value in ``schema_version.version``, or 0 when the
            table is missing or empty.
        """
        try:
            cursor = self.execute(
                "SELECT MAX(version) FROM schema_version"
            )
            result = cursor.fetchone()
            # MAX() over an empty table yields a single NULL row.
            return result[0] if result and result[0] else 0
        except sqlite3.OperationalError:
            # Table doesn't exist
            return 0

    def backup(self, backup_path: Optional[str] = None) -> bool:
        """
        Create database backup.

        Args:
            backup_path: Path for backup file. Defaults to a timestamped
                file in a ``backups`` directory next to the database.

        Returns:
            True if successful, False otherwise (the error is logged).
        """
        try:
            if backup_path is None:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                backup_dir = self.db_path.parent / "backups"
                backup_dir.mkdir(parents=True, exist_ok=True)
                backup_path = backup_dir / f"lemontropia_backup_{timestamp}.db"

            # Use SQLite backup API. closing() guarantees the destination
            # connection is released even when the copy fails.
            source = self.get_connection()
            with closing(sqlite3.connect(backup_path)) as dest:
                with dest:
                    source.backup(dest)

            logger.info(f"Database backed up to: {backup_path}")
            return True

        except Exception as e:
            logger.error(f"Backup failed: {e}")
            return False

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Context manager exit with automatic commit/rollback.

        Commits when the block finished normally, rolls back otherwise.
        The connection is closed in all cases, including when the commit
        or rollback itself raises. Exceptions are never suppressed.
        """
        try:
            if exc_type is None:
                self.commit()
            else:
                self.rollback()
        finally:
            self.close()


# ============================================================================
# DECIMAL HANDLING (Rule #4: Precision)
# ============================================================================

def adapt_decimal(d: Decimal) -> str:
    """Convert Decimal to string for SQLite storage."""
    return str(d)


def convert_decimal(s: bytes) -> Decimal:
    """Convert SQLite string back to Decimal."""
    return Decimal(s.decode('utf-8'))


# Register Decimal adapter/converter
# NOTE(review): converters only run on connections opened with
# detect_types=PARSE_DECLTYPES/PARSE_COLNAMES. DatabaseManager opens its
# connection with detect_types=0, so values read through it come back as
# stored (str) and must be wrapped in Decimal() by the caller.
sqlite3.register_adapter(Decimal, adapt_decimal)
sqlite3.register_converter("DECIMAL", convert_decimal)


# ============================================================================
# MODULE EXPORTS
# ============================================================================

__all__ = ['DatabaseManager', 'adapt_decimal', 'convert_decimal']