# Description: Database initialization and connection management # Implements SQLite setup with Data Principle support # Standards: Python 3.11+, type hints, async where possible import sqlite3 import os from pathlib import Path from decimal import Decimal from datetime import datetime from typing import Optional, Any import json import logging # Configure logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) class DatabaseManager: """ Manages SQLite database connections and schema initialization. Implements the Data Principle by providing robust data persistence with foreign key support, transactions, and connection pooling. """ def __init__(self, db_path: Optional[str] = None): """ Initialize database manager. Args: db_path: Path to SQLite database. Defaults to ./data/lemontropia.db """ if db_path is None: # Get project root (parent of core/) core_dir = Path(__file__).parent project_root = core_dir.parent db_path = project_root / "data" / "lemontropia.db" self.db_path = Path(db_path) self._connection: Optional[sqlite3.Connection] = None # Ensure data directory exists self.db_path.parent.mkdir(parents=True, exist_ok=True) logger.info(f"DatabaseManager initialized: {self.db_path}") def initialize(self) -> bool: """ Initialize database with schema. Returns: True if successful, False otherwise """ try: conn = self.get_connection() # Read and execute schema schema_path = Path(__file__).parent / "schema.sql" with open(schema_path, 'r') as f: schema = f.read() # Execute schema (multiple statements) conn.executescript(schema) conn.commit() # Run migrations for existing databases self._run_migrations() logger.info("Database schema initialized successfully") return True except Exception as e: logger.error(f"Failed to initialize database: {e}") return False def _run_migrations(self): """Run migrations to update existing databases.""" try: conn = self.get_connection() # Migration: Add description column to projects table try: conn.execute("ALTER TABLE projects ADD COLUMN description TEXT") conn.commit() logger.info("Migration: Added description column to projects table") except sqlite3.OperationalError: # Column already exists pass except Exception as e: logger.error(f"Migration failed: {e}") def get_connection(self) -> sqlite3.Connection: """ Get or create database connection. Returns: SQLite connection with row factory and type detection """ if self._connection is None: self._connection = sqlite3.connect( self.db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES ) self._connection.row_factory = sqlite3.Row # Enable foreign keys self._connection.execute("PRAGMA foreign_keys = ON") # Performance optimizations (Rule #3: 60+ FPS) self._connection.execute("PRAGMA journal_mode = WAL") self._connection.execute("PRAGMA synchronous = NORMAL") self._connection.execute("PRAGMA cache_size = -64000") # 64MB cache logger.debug("Database connection established") return self._connection def close(self) -> None: """Close database connection.""" if self._connection: self._connection.close() self._connection = None logger.debug("Database connection closed") def execute(self, query: str, parameters: tuple = ()) -> sqlite3.Cursor: """ Execute a query with parameters. Args: query: SQL query string parameters: Query parameters (prevents SQL injection) Returns: Cursor object """ conn = self.get_connection() return conn.execute(query, parameters) def executemany(self, query: str, parameters: list) -> sqlite3.Cursor: """ Execute query with multiple parameter sets. Args: query: SQL query string parameters: List of parameter tuples Returns: Cursor object """ conn = self.get_connection() return conn.executemany(query, parameters) def fetchall(self, query: str, parameters: tuple = ()) -> List[sqlite3.Row]: """ Execute query and fetch all results. Args: query: SQL query string parameters: Query parameters Returns: List of row objects """ cursor = self.execute(query, parameters) return cursor.fetchall() def fetchone(self, query: str, parameters: tuple = ()) -> Optional[sqlite3.Row]: """ Execute query and fetch first result. Args: query: SQL query string parameters: Query parameters Returns: Single row or None """ cursor = self.execute(query, parameters) return cursor.fetchone() def commit(self) -> None: """Commit current transaction.""" if self._connection: self._connection.commit() def rollback(self) -> None: """Rollback current transaction.""" if self._connection: self._connection.rollback() def get_schema_version(self) -> int: """ Get current schema version. Returns: Schema version number (0 if not initialized) """ try: cursor = self.execute( "SELECT MAX(version) FROM schema_version" ) result = cursor.fetchone() return result[0] if result and result[0] else 0 except sqlite3.OperationalError: # Table doesn't exist return 0 def backup(self, backup_path: Optional[str] = None) -> bool: """ Create database backup. Args: backup_path: Path for backup file. Defaults to timestamped backup Returns: True if successful """ try: if backup_path is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_dir = self.db_path.parent / "backups" backup_dir.mkdir(exist_ok=True) backup_path = backup_dir / f"lemontropia_backup_{timestamp}.db" # Use SQLite backup API source = self.get_connection() dest = sqlite3.connect(backup_path) with dest: source.backup(dest) dest.close() logger.info(f"Database backed up to: {backup_path}") return True except Exception as e: logger.error(f"Backup failed: {e}") return False def __enter__(self): """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit with automatic commit/rollback.""" if exc_type is None: self.commit() else: self.rollback() self.close() # ============================================================================ # DECIMAL HANDLING (Rule #4: Precision) # ============================================================================ def adapt_decimal(d: Decimal) -> str: """Convert Decimal to string for SQLite storage.""" return str(d) def convert_decimal(s: bytes) -> Decimal: """Convert SQLite string back to Decimal.""" return Decimal(s.decode('utf-8')) # Register Decimal adapter/converter sqlite3.register_adapter(Decimal, adapt_decimal) sqlite3.register_converter("DECIMAL", convert_decimal) # ============================================================================ # MODULE EXPORTS # ============================================================================ __all__ = ['DatabaseManager', 'adapt_decimal', 'convert_decimal']