"""Fan control logic including curves and automatic control."""
import asyncio
import json
import logging
import re
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Dict, Optional, Any

from sqlalchemy.orm import Session

from backend.database import (
    Server, FanCurve, SensorData, FanData, SystemLog, get_db, SessionLocal
)
from backend.ipmi_client import IPMIClient, TemperatureReading
from backend.config import settings

logger = logging.getLogger(__name__)

# Matches the first decimal number inside a power reading string.
_POWER_VALUE_RE = re.compile(r'(\d+(?:\.\d+)?)')


class ControlState(Enum):
    """Fan control state."""
    AUTO = "auto"
    MANUAL = "manual"
    PANIC = "panic"
    OFF = "off"


@dataclass
class FanCurvePoint:
    """Single point on a fan curve."""
    temp: float
    speed: int


class FanCurveManager:
    """Manages fan curve calculations."""

    @staticmethod
    def default_curve() -> List[FanCurvePoint]:
        """Return a fresh copy of the built-in fallback fan curve."""
        return [
            FanCurvePoint(30, 10),
            FanCurvePoint(40, 20),
            FanCurvePoint(50, 35),
            FanCurvePoint(60, 50),
            FanCurvePoint(70, 70),
            FanCurvePoint(80, 100),
        ]

    @staticmethod
    def parse_curve(curve_data: str) -> List[FanCurvePoint]:
        """Parse fan curve from JSON string.

        The JSON is expected to be a list of objects with ``temp`` and
        ``speed`` keys. If the payload cannot be decoded or does not have
        that shape, the error is logged and the default curve is returned.
        """
        try:
            data = json.loads(curve_data)
            return [FanCurvePoint(p["temp"], p["speed"]) for p in data]
        except (json.JSONDecodeError, KeyError, TypeError) as e:
            # TypeError covers a None payload, a non-iterable top-level
            # value, and points that are not mappings.
            logger.error(f"Failed to parse fan curve: {e}")
            # Return default curve
            return FanCurveManager.default_curve()

    @staticmethod
    def serialize_curve(points: List[FanCurvePoint]) -> str:
        """Serialize fan curve to JSON string."""
        return json.dumps([{"temp": p.temp, "speed": p.speed} for p in points])

    @staticmethod
    def calculate_speed(curve: List[FanCurvePoint], temperature: float) -> int:
        """Calculate fan speed for a given temperature using linear interpolation.

        Returns 50 when the curve is empty. Temperatures at or below the
        coldest point get that point's speed, temperatures at or above the
        hottest point get that point's speed, and anything in between is
        interpolated linearly between the two surrounding points and rounded.
        """
        if not curve:
            return 50  # Default to 50% if no curve

        # Sort by temperature so the input order of points does not matter
        sorted_curve = sorted(curve, key=lambda p: p.temp)
        lowest, highest = sorted_curve[0], sorted_curve[-1]

        # Below minimum temp
        if temperature <= lowest.temp:
            return lowest.speed

        # Above maximum temp
        if temperature >= highest.temp:
            return highest.speed

        # Find surrounding points
        for p1, p2 in zip(sorted_curve, sorted_curve[1:]):
            if p1.temp <= temperature <= p2.temp:
                # Duplicate temperatures would divide by zero below
                if p2.temp == p1.temp:
                    return p1.speed
                ratio = (temperature - p1.temp) / (p2.temp - p1.temp)
                speed = p1.speed + ratio * (p2.speed - p1.speed)
                return int(round(speed))

        return highest.speed


class FanController:
    """Main fan controller for managing server fans."""

    def __init__(self):
        self.curve_manager = FanCurveManager()
        self.running = False
        # One control-loop task per server id
        self._tasks: Dict[int, asyncio.Task] = {}
        # Time of the last successful temperature read per server id
        self._last_sensor_data: Dict[int, datetime] = {}

    async def start(self):
        """Start the fan controller service."""
        self.running = True
        logger.info("Fan controller started")

        # Load all servers with auto-control enabled
        db = SessionLocal()
        try:
            servers = db.query(Server).filter(
                Server.auto_control_enabled == True,
                Server.is_active == True
            ).all()

            for server in servers:
                await self.start_server_control(server.id)
        finally:
            db.close()

    async def stop(self):
        """Stop all fan control tasks and wait for them to finish."""
        self.running = False
        tasks = list(self._tasks.values())
        self._tasks.clear()
        for task in tasks:
            task.cancel()
        if tasks:
            # Wait for the cancelled loops to unwind so their ``finally``
            # blocks (database session close) run before shutdown continues.
            await asyncio.gather(*tasks, return_exceptions=True)
        logger.info("Fan controller stopped")

    async def start_server_control(self, server_id: int):
        """Start automatic control for a server.

        Any control task already registered for the server is cancelled
        and replaced.
        """
        if server_id in self._tasks:
            self._tasks[server_id].cancel()

        task = asyncio.create_task(self._control_loop(server_id))
        self._tasks[server_id] = task
        logger.info(f"Started fan control for server {server_id}")

    async def stop_server_control(self, server_id: int):
        """Stop automatic control for a server."""
        if server_id in self._tasks:
            self._tasks[server_id].cancel()
            del self._tasks[server_id]
            logger.info(f"Stopped fan control for server {server_id}")

    async def _control_loop(self, server_id: int):
        """Main control loop for a server.

        Runs one control iteration every 5 seconds while the controller is
        running; after an unexpected error it waits 10 seconds instead.
        """
        while self.running:
            try:
                await self._control_iteration(server_id)
                await asyncio.sleep(5)  # 5 second interval
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Control loop error for server {server_id}: {e}")
                await asyncio.sleep(10)

    async def _control_iteration(self, server_id: int):
        """Single control iteration for a server.

        Loads the server row, reads temperatures over IPMI (each blocking
        call runs in a worker thread under a timeout) and, when auto control
        is enabled, applies the fan curve. Timeouts and other errors are
        logged rather than raised.
        """
        db = SessionLocal()
        try:
            server = db.query(Server).filter(Server.id == server_id).first()
            if not server or not server.is_active:
                return

            from backend.auth import decrypt_password

            # Create IPMI client
            client = IPMIClient(
                host=server.ipmi_host,
                username=server.ipmi_username,
                password=decrypt_password(server.ipmi_encrypted_password),
                port=server.ipmi_port,
                vendor=server.vendor
            )

            # Test connection with timeout
            if not await asyncio.wait_for(
                asyncio.to_thread(client.test_connection),
                timeout=10.0
            ):
                logger.warning(f"Cannot connect to server {server.name}")
                return

            # Get sensor data with timeout
            temps = await asyncio.wait_for(
                asyncio.to_thread(client.get_temperatures),
                timeout=15.0
            )

            # Update last sensor data time
            self._last_sensor_data[server_id] = datetime.utcnow()
            server.last_seen = datetime.utcnow()

            # Calculate and set fan speed if auto control is enabled
            if server.auto_control_enabled:
                await self._apply_fan_curve(db, server, client, temps)

            db.commit()

        except asyncio.TimeoutError:
            logger.warning(f"Control iteration timeout for server {server_id}")
        except Exception as e:
            logger.error(f"Control iteration error for server {server_id}: {e}")
        finally:
            db.close()

    async def _apply_fan_curve(self, db: Session, server: Server,
                               client: IPMIClient, temps: List[TemperatureReading]):
        """Apply fan curve based on temperatures.

        Uses the hottest reading whose location starts with "cpu", or the
        hottest reading overall when there is none. Readings without a value
        are ignored; if none have a value, nothing is sent to the server.
        """
        # Skip readings without a value so max() cannot fail on None
        readings = [t for t in (temps or []) if t.value is not None]
        if not readings:
            return

        # Get active fan curve
        curve_data = server.fan_curve_data
        if curve_data:
            curve = self.curve_manager.parse_curve(curve_data)
        else:
            curve = self.curve_manager.default_curve()

        # Find the highest CPU temperature
        cpu_temps = [t for t in readings if (t.location or "").startswith("cpu")]
        max_temp = max(t.value for t in (cpu_temps or readings))

        # Calculate target speed
        target_speed = self.curve_manager.calculate_speed(curve, max_temp)

        # Enable manual control if not already
        if not server.manual_control_enabled:
            if await asyncio.wait_for(
                asyncio.to_thread(client.enable_manual_fan_control),
                timeout=10.0
            ):
                server.manual_control_enabled = True
                logger.info(f"Enabled manual fan control for {server.name}")

        # Set fan speed
        if await asyncio.wait_for(
            asyncio.to_thread(client.set_all_fans_speed, target_speed),
            timeout=10.0
        ):
            logger.info(f"Set {server.name} fans to {target_speed}% (temp: {max_temp}°C)")

    def get_controller_status(self, server_id: int) -> Dict[str, Any]:
        """Get current controller status for a server.

        A server counts as running only while its control task is registered
        and has not finished.
        """
        task = self._tasks.get(server_id)
        is_running = task is not None and not task.done()
        last_seen = self._last_sensor_data.get(server_id)

        return {
            "is_running": is_running,
            "last_sensor_data": last_seen.isoformat() if last_seen else None,
            "state": ControlState.AUTO.value if is_running else ControlState.OFF.value
        }


class SensorCollector:
    """High-performance background sensor data collector.

    - Collects from all servers in parallel using thread pool
    - Times out slow operations to prevent hanging
    - Cleans up old database records periodically
    - Updates cache for fast web UI access
    """

    def __init__(self, max_workers: int = 4):
        self.running = False
        self._task: Optional[asyncio.Task] = None
        self._collection_interval = 30  # seconds - IPMI is slow, need more time
        self._cleanup_interval = 3600  # 1 hour
        self._cache = None
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._last_cleanup = datetime.utcnow()
        self._first_collection_done = False

    async def start(self):
        """Start the sensor collector."""
        self.running = True
        self._task = asyncio.create_task(self._collection_loop())
        logger.info("Sensor collector started")

    async def stop(self):
        """Stop the sensor collector."""
        self.running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        self._executor.shutdown(wait=False)
        logger.info("Sensor collector stopped")

    async def _collection_loop(self):
        """Main collection loop.

        Performs one collection at startup, then keeps collecting on a fixed
        interval, running the database cleanup once the cleanup interval has
        elapsed.
        """
        # Initial collection immediately on startup
        # NOTE(review): the loop below collects again right away, so startup
        # performs two back-to-back collections - confirm this is intended.
        try:
            logger.info("Performing initial sensor collection...")
            await self._collect_all_servers()
            self._first_collection_done = True
            logger.info("Initial sensor collection complete")
        except Exception as e:
            logger.error(f"Initial collection error: {e}")

        while self.running:
            try:
                start_time = datetime.utcnow()
                await self._collect_all_servers()

                # Periodic database cleanup
                if (datetime.utcnow() - self._last_cleanup).total_seconds() > self._cleanup_interval:
                    await self._cleanup_old_data()

                # Calculate sleep time to maintain interval
                elapsed = (datetime.utcnow() - start_time).total_seconds()
                sleep_time = max(0, self._collection_interval - elapsed)

                # Only warn if significantly over (collections can be slow)
                if elapsed > self._collection_interval * 1.5:
                    logger.warning(f"Collection took {elapsed:.1f}s, longer than interval {self._collection_interval}s")

                await asyncio.sleep(sleep_time)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Sensor collection error: {e}")
                await asyncio.sleep(self._collection_interval)

    async def _collect_all_servers(self):
        """Collect sensor data from all active servers in parallel.

        Readings from every server that answered are written with one bulk
        insert per table, and each answering server's ``last_seen`` is
        updated.
        """
        db = SessionLocal()
        try:
            servers = db.query(Server).filter(Server.is_active == True).all()

            if not servers:
                return

            # Create one collection coroutine per server
            tasks = [self._collect_server_with_timeout(server) for server in servers]

            # Run all collections concurrently with timeout protection
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Process results and batch store in database
            all_sensor_data = []
            all_fan_data = []
            collected = 0

            for server, result in zip(servers, results):
                # BaseException also covers a CancelledError returned by gather
                if isinstance(result, BaseException):
                    logger.debug(f"Server {server.name} collection failed: {result}")
                    continue

                # None means the server timed out or could not be read
                if not result:
                    continue

                collected += 1
                temps, fans = result
                now = datetime.utcnow()

                # Prepare batch inserts
                for temp in temps:
                    all_sensor_data.append({
                        'server_id': server.id,
                        'sensor_name': temp.name,
                        'sensor_type': 'temperature',
                        'value': temp.value,
                        'unit': '°C',
                        'timestamp': now
                    })

                for fan in fans:
                    all_fan_data.append({
                        'server_id': server.id,
                        'fan_number': fan.fan_number,
                        'fan_id': getattr(fan, 'fan_id', str(fan.fan_number)),
                        'speed_rpm': fan.speed_rpm,
                        'speed_percent': fan.speed_percent,
                        'timestamp': now
                    })

                server.last_seen = now

            # Batch insert for better performance
            if all_sensor_data:
                db.bulk_insert_mappings(SensorData, all_sensor_data)
            if all_fan_data:
                db.bulk_insert_mappings(FanData, all_fan_data)

            db.commit()

            logger.debug(f"Collected data from {collected}/{len(servers)} servers")

        finally:
            db.close()

    async def _collect_server_with_timeout(self, server: Server) -> Optional[tuple]:
        """Collect sensor data from a single server with timeout protection.

        Returns ``None`` when the collection does not finish in 30 seconds.
        """
        try:
            return await asyncio.wait_for(
                self._collect_server(server),
                timeout=30.0  # Max 30 seconds per server (IPMI can be slow)
            )
        except asyncio.TimeoutError:
            logger.warning(f"Collection timeout for {server.name}")
            return None

    async def _collect_server(self, server: Server) -> Optional[tuple]:
        """Collect sensor data from a single server.

        Reads temperatures, fan speeds and power consumption through the
        thread pool, stores a summary in the sensor cache and returns
        ``(temps, fans)``. Returns ``None`` when the server cannot be
        reached or any step fails.
        """
        try:
            from backend.auth import decrypt_password
            from backend.main import sensor_cache

            # Run blocking IPMI operations in thread pool
            loop = asyncio.get_running_loop()

            client = IPMIClient(
                host=server.ipmi_host,
                username=server.ipmi_username,
                password=decrypt_password(server.ipmi_encrypted_password),
                port=server.ipmi_port,
                vendor=server.vendor
            )

            # Test connection
            connected = await loop.run_in_executor(self._executor, client.test_connection)
            if not connected:
                return None

            # Get sensor data in parallel using thread pool
            temps_future = loop.run_in_executor(self._executor, client.get_temperatures)
            fans_future = loop.run_in_executor(self._executor, client.get_fan_speeds)
            power_future = loop.run_in_executor(self._executor, client.get_power_consumption)

            temps, fans, power = await asyncio.gather(
                temps_future, fans_future, power_future
            )

            # Calculate summary metrics
            max_temp = max((t.value for t in temps if t.value is not None), default=0)
            # Average only over fans that reported a speed; dividing by
            # len(fans) would drag the average down for every None reading.
            fan_speeds = [f.speed_percent for f in fans if f.speed_percent is not None]
            avg_fan = sum(fan_speeds) / len(fan_speeds) if fan_speeds else 0

            # Extract current power consumption
            current_power = None
            if power and isinstance(power, dict):
                for key, value in power.items():
                    lowered = key.lower()
                    if 'current' in lowered and 'power' in lowered:
                        match = _POWER_VALUE_RE.search(str(value))
                        if match:
                            current_power = float(match.group(1))
                            break

            # Prepare cache data - format must match response schemas
            cache_data = {
                "max_temp": max_temp,
                "avg_fan_speed": round(avg_fan, 1),
                "power_consumption": current_power,
                "timestamp": datetime.utcnow().isoformat(),
                "temps": [{"name": t.name, "value": t.value, "location": t.location,
                           "status": getattr(t, 'status', 'ok')} for t in temps],
                "fans": [{"fan_id": getattr(f, 'fan_id', f'0x0{f.fan_number-1}'),
                          "fan_number": f.fan_number,
                          "speed_percent": f.speed_percent,
                          "speed_rpm": f.speed_rpm} for f in fans],
                "power_raw": power if isinstance(power, dict) else None
            }

            # Store in cache
            await sensor_cache.set(server.id, cache_data)
            logger.info(f"Collected and cached sensors for {server.name}: temp={max_temp:.1f}°C, fan={avg_fan:.1f}%")

            return temps, fans

        except Exception as e:
            logger.warning(f"Failed to collect sensors for {server.name}: {e}")
            return None

    async def _cleanup_old_data(self):
        """Clean up old sensor data to prevent database bloat.

        Deletes sensor and fan rows older than 24 hours. The last-cleanup
        time is only advanced when the delete commits, so a failed cleanup
        is attempted again on the next collection cycle.
        """
        try:
            db = SessionLocal()
            try:
                # Keep only last 24 hours of detailed sensor data
                cutoff = datetime.utcnow() - timedelta(hours=24)

                # Delete old sensor data
                deleted_sensors = db.query(SensorData).filter(
                    SensorData.timestamp < cutoff
                ).delete(synchronize_session=False)

                # Delete old fan data
                deleted_fans = db.query(FanData).filter(
                    FanData.timestamp < cutoff
                ).delete(synchronize_session=False)

                db.commit()

                if deleted_sensors > 0 or deleted_fans > 0:
                    logger.info(f"Cleaned up {deleted_sensors} sensor records and {deleted_fans} fan records")

                self._last_cleanup = datetime.utcnow()
            finally:
                db.close()
        except Exception as e:
            logger.error(f"Database cleanup failed: {e}")


# Global controller instance
fan_controller = FanController()
sensor_collector = SensorCollector(max_workers=4)


async def initialize_fan_controller():
    """Initialize and start the fan controller and sensor collector."""
    await sensor_collector.start()
    await fan_controller.start()


async def shutdown_fan_controller():
    """Shutdown the fan controller and sensor collector."""
    await fan_controller.stop()
    await sensor_collector.stop()