"""
IPMI Fan Controller v2 - Simpler, More Robust
For Dell T710 and compatible servers

The module has two layers:

* ``IPMIFanController`` wraps the ``ipmitool`` command line: it reads
  temperature/fan sensors and sends the Dell raw commands that switch the
  BMC between automatic and manual fan control and set a fan duty cycle.
* ``FanControlService`` runs a background thread that periodically reads
  the temperatures, maps them through a configurable fan curve and applies
  the resulting speed.
"""

import json
import logging
import os
import re
import subprocess
import threading
import time
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional, Tuple

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('/tmp/ipmi-fan-controller.log')
    ]
)
logger = logging.getLogger(__name__)

# Patterns used while parsing `ipmitool sdr` output; compiled once because
# they are applied to every sensor line on every polling cycle.
_TEMPERATURE_RE = re.compile(r'(-?\d+(?:\.\d+)?)\s+degrees\s+C', re.IGNORECASE)
_FAN_NUMBER_RE = re.compile(r'fan\s*(\d+)', re.IGNORECASE)
_RPM_RE = re.compile(r'(\d+)\s*RPM', re.IGNORECASE)


@dataclass
class FanCurvePoint:
    """One point of a fan curve: at `temp` (deg C) run the fans at `speed` (%)."""
    temp: float
    speed: int


@dataclass
class TemperatureReading:
    """A single temperature sensor reading parsed from ipmitool output."""
    name: str
    location: str
    value: float
    status: str


@dataclass
class FanReading:
    """A single fan sensor reading parsed from ipmitool output."""
    fan_id: str
    fan_number: int
    speed_rpm: Optional[int]
    speed_percent: Optional[int]


class IPMIFanController:
    """Simplified IPMI fan controller with robust error handling."""

    # Default fan curve (temp C -> speed %)
    DEFAULT_CURVE = [
        FanCurvePoint(30, 15),
        FanCurvePoint(40, 25),
        FanCurvePoint(50, 40),
        FanCurvePoint(60, 60),
        FanCurvePoint(70, 80),
        FanCurvePoint(80, 100),
    ]

    def __init__(self, host: str, username: str, password: str, port: int = 623):
        """Store connection parameters; no network I/O happens here."""
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        self.manual_mode = False
        self.last_successful_read = None
        self.consecutive_failures = 0
        self.max_failures = 5

    def _build_command(self, args: List[str]) -> List[str]:
        """Return the full ipmitool argv for `args`.

        The password is deliberately NOT part of argv (which is visible to
        every local user via the process list); `-E` makes ipmitool read it
        from the environment instead.
        """
        return [
            "ipmitool", "-I", "lanplus",
            "-H", self.host,
            "-U", self.username,
            "-E",
            "-p", str(self.port)
        ] + list(args)

    def _run_ipmi(self, args: List[str], timeout: int = 15) -> Tuple[bool, str]:
        """Run IPMI command with error handling.

        Returns:
            (True, stdout) on exit code 0, otherwise (False, error text).
            Every failure increments `consecutive_failures`; a success
            resets it to zero.
        """
        cmd = self._build_command(args)
        env = os.environ.copy()
        # Set both names: ipmitool consults IPMITOOL_PASSWORD as well, and an
        # inherited value there must not override our password.
        env["IPMI_PASSWORD"] = self.password
        env["IPMITOOL_PASSWORD"] = self.password

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
                env=env
            )
        except subprocess.TimeoutExpired:
            self.consecutive_failures += 1
            logger.error(f"IPMI command timed out after {timeout}s")
            return False, "Timeout"
        except Exception as e:
            # Best-effort boundary: e.g. ipmitool missing from PATH.
            self.consecutive_failures += 1
            logger.error(f"IPMI command error: {e}")
            return False, str(e)

        if result.returncode == 0:
            self.consecutive_failures = 0
            return True, result.stdout

        self.consecutive_failures += 1
        logger.warning(f"IPMI command failed: {result.stderr}")
        return False, result.stderr

    def test_connection(self) -> bool:
        """Test if we can connect to the server."""
        success, _ = self._run_ipmi(["mc", "info"], timeout=10)
        return success

    def enable_manual_fan_control(self) -> bool:
        """Enable manual fan control mode."""
        # Dell: raw 0x30 0x30 0x01 0x00
        success, _ = self._run_ipmi(["raw", "0x30", "0x30", "0x01", "0x00"])
        if success:
            self.manual_mode = True
            logger.info("Manual fan control enabled")
        return success

    def disable_manual_fan_control(self) -> bool:
        """Return to automatic fan control."""
        # Dell: raw 0x30 0x30 0x01 0x01
        success, _ = self._run_ipmi(["raw", "0x30", "0x30", "0x01", "0x01"])
        if success:
            self.manual_mode = False
            logger.info("Automatic fan control restored")
        return success

    def set_fan_speed(self, speed_percent: int, fan_id: str = "0xff") -> bool:
        """Set fan speed (0-100%). fan_id 0xff = all fans.

        Out-of-range values are clamped; non-integer values are truncated
        so that hex formatting cannot fail.
        """
        speed_percent = max(0, min(100, int(speed_percent)))
        hex_speed = f"0x{speed_percent:02x}"
        success, _ = self._run_ipmi([
            "raw", "0x30", "0x30", "0x02", fan_id, hex_speed
        ])
        if success:
            logger.info(f"Fan speed set to {speed_percent}%")
        return success

    def _parse_temperatures(self, output: str) -> List[TemperatureReading]:
        """Parse `ipmitool sdr type temperature` output into readings.

        Lines that do not have five `|`-separated columns or whose reading
        column is not "<number> degrees C" are skipped.
        """
        temps = []
        for line in output.splitlines():
            # Parse: Sensor Name | 01h | ok | 3.1 | 45 degrees C
            parts = [p.strip() for p in line.split("|")]
            if len(parts) < 5:
                continue
            match = _TEMPERATURE_RE.search(parts[4])
            if not match:
                continue
            name = parts[0]
            temps.append(TemperatureReading(
                name=name,
                location=self._classify_temp_location(name),
                value=float(match.group(1)),
                status=parts[2]
            ))
        return temps

    def get_temperatures(self) -> List[TemperatureReading]:
        """Get temperature readings from all sensors (empty list on failure)."""
        success, output = self._run_ipmi(["sdr", "type", "temperature"])
        if not success:
            return []
        return self._parse_temperatures(output)

    def _parse_fans(self, output: str) -> List[FanReading]:
        """Parse `ipmitool sdr elist full` output into fan readings.

        Only lines mentioning both "fan" and "rpm" are considered.
        """
        fans = []
        for line in output.splitlines():
            lowered = line.lower()
            if "fan" not in lowered or "rpm" not in lowered:
                continue
            parts = [p.strip() for p in line.split("|")]
            if len(parts) < 5:
                continue

            # Extract fan number; sensors without a number map to 0 / 0x00.
            number_match = _FAN_NUMBER_RE.search(parts[0])
            fan_number = int(number_match.group(1)) if number_match else 0
            fan_id = f"0x{fan_number - 1:02x}" if fan_number > 0 else "0x00"

            # Extract RPM
            rpm_match = _RPM_RE.search(parts[4])
            rpm = int(rpm_match.group(1)) if rpm_match else None

            fans.append(FanReading(
                fan_id=fan_id,
                fan_number=fan_number,
                speed_rpm=rpm,
                speed_percent=None
            ))
        return fans

    def get_fan_speeds(self) -> List[FanReading]:
        """Get current fan speeds (empty list on failure)."""
        success, output = self._run_ipmi(["sdr", "elist", "full"])
        if not success:
            return []
        return self._parse_fans(output)

    def _classify_temp_location(self, name: str) -> str:
        """Classify temperature sensor location from its sensor name.

        Returns one of "cpu1", "cpu2", "cpu", "inlet", "exhaust", "memory"
        or "other". The CPU index is detected by a plain substring test, so
        any "1" in a CPU sensor name selects "cpu1".
        """
        name_lower = name.lower()

        if "cpu" in name_lower or "proc" in name_lower:
            if "1" in name or "one" in name_lower:
                return "cpu1"
            elif "2" in name or "two" in name_lower:
                return "cpu2"
            return "cpu"
        elif "inlet" in name_lower or "ambient" in name_lower:
            return "inlet"
        elif "exhaust" in name_lower:
            return "exhaust"
        elif "memory" in name_lower or "dimm" in name_lower:
            return "memory"
        return "other"

    def calculate_fan_speed(self, temps: List[TemperatureReading],
                            curve: Optional[List[FanCurvePoint]] = None) -> int:
        """Calculate target fan speed based on temperatures.

        The hottest CPU sensor drives the curve; when no CPU sensor is
        present the hottest sensor of any kind is used. The speed is
        linearly interpolated between curve points and clamped to the
        first/last point outside the curve's range.

        Args:
            temps: Current readings. An empty list yields 50.
            curve: Curve points; None or an empty list selects
                DEFAULT_CURVE.
        """
        if not temps:
            return 50  # Default if no temps

        if not curve:
            # Also covers an empty list, which would otherwise crash below.
            curve = self.DEFAULT_CURVE

        # Find max CPU temperature
        cpu_temps = [t for t in temps if t.location.startswith("cpu")]
        if cpu_temps:
            max_temp = max(t.value for t in cpu_temps)
        else:
            max_temp = max(t.value for t in temps)

        # Apply fan curve with linear interpolation
        sorted_curve = sorted(curve, key=lambda p: p.temp)

        if max_temp <= sorted_curve[0].temp:
            return sorted_curve[0].speed
        if max_temp >= sorted_curve[-1].temp:
            return sorted_curve[-1].speed

        for p1, p2 in zip(sorted_curve, sorted_curve[1:]):
            if p1.temp <= max_temp <= p2.temp:
                if p2.temp == p1.temp:
                    return p1.speed
                ratio = (max_temp - p1.temp) / (p2.temp - p1.temp)
                speed = p1.speed + ratio * (p2.speed - p1.speed)
                return int(round(speed))

        return sorted_curve[-1].speed

    def is_healthy(self) -> bool:
        """Check if controller is working properly."""
        return self.consecutive_failures < self.max_failures


class FanControlService:
    """Background service for automatic fan control."""

    def __init__(self, config_path: str = "/etc/ipmi-fan-controller/config.json"):
        """Set defaults and overlay them with the JSON file at `config_path`."""
        self.config_path = config_path
        self.controller: Optional[IPMIFanController] = None
        self.running = False
        self.thread: Optional[threading.Thread] = None
        self.current_speed = 0
        self.target_speed = 0
        self.last_temps: List[TemperatureReading] = []
        self.last_fans: List[FanReading] = []
        self.lock = threading.Lock()
        # Set by stop() so the control loop's sleeps end immediately.
        self._stop_event = threading.Event()

        # Default config
        self.config = {
            "host": "",
            "username": "",
            "password": "",
            "port": 623,
            "enabled": False,
            "interval": 10,  # seconds
            "min_speed": 10,
            "max_speed": 100,
            "fan_curve": [
                {"temp": 30, "speed": 15},
                {"temp": 40, "speed": 25},
                {"temp": 50, "speed": 40},
                {"temp": 60, "speed": 60},
                {"temp": 70, "speed": 80},
                {"temp": 80, "speed": 100},
            ],
            "panic_temp": 85,
            "panic_speed": 100
        }

        self._load_config()

    def _load_config(self):
        """Load configuration from file; errors are logged, defaults kept."""
        try:
            if Path(self.config_path).exists():
                with open(self.config_path, 'r') as f:
                    loaded = json.load(f)
                if not isinstance(loaded, dict):
                    raise ValueError("config root must be a JSON object")
                self.config.update(loaded)
                logger.info(f"Loaded config from {self.config_path}")
        except Exception as e:
            logger.error(f"Failed to load config: {e}")

    def _save_config(self):
        """Save configuration to file; errors are logged, not raised."""
        try:
            Path(self.config_path).parent.mkdir(parents=True, exist_ok=True)
            with open(self.config_path, 'w') as f:
                json.dump(self.config, f, indent=2)
            logger.info(f"Saved config to {self.config_path}")
        except Exception as e:
            logger.error(f"Failed to save config: {e}")

    def update_config(self, **kwargs):
        """Update configuration values and persist them."""
        self.config.update(kwargs)
        self._save_config()

        # Reinitialize controller if connection params changed
        if any(k in kwargs for k in ['host', 'username', 'password', 'port']):
            self._init_controller()

    def _init_controller(self):
        """Initialize the IPMI controller.

        Returns True when credentials are configured and `mc info`
        succeeds; otherwise False (and `self.controller` is reset to None
        on a failed connection test).
        """
        if not all([self.config.get('host'),
                    self.config.get('username'),
                    self.config.get('password')]):
            logger.warning("Missing IPMI credentials")
            return False

        self.controller = IPMIFanController(
            host=self.config['host'],
            username=self.config['username'],
            password=self.config['password'],
            port=self.config.get('port', 623)
        )

        if self.controller.test_connection():
            logger.info(f"Connected to IPMI at {self.config['host']}")
            return True

        logger.error(f"Failed to connect to IPMI at {self.config['host']}")
        self.controller = None
        return False

    def start(self):
        """Start the fan control service.

        Returns True when the service is running after the call (including
        when it was already running), False when the IPMI connection could
        not be established.
        """
        if self.running:
            return True

        if not self._init_controller():
            logger.error("Cannot start service - IPMI connection failed")
            return False

        self._stop_event.clear()
        self.running = True
        self.thread = threading.Thread(target=self._control_loop, daemon=True)
        self.thread.start()
        logger.info("Fan control service started")
        return True

    def stop(self):
        """Stop the fan control service and hand the fans back to the BMC."""
        self.running = False
        # Wake the loop out of any pending sleep so join() returns promptly.
        self._stop_event.set()
        if self.thread:
            self.thread.join(timeout=5)

        # Return to automatic control
        if self.controller:
            self.controller.disable_manual_fan_control()

        logger.info("Fan control service stopped")

    def _sleep(self, seconds: float) -> None:
        """Sleep up to `seconds`, returning early when stop() is called."""
        self._stop_event.wait(seconds)

    def _interval(self) -> float:
        """Return the polling interval in seconds (>= 1, default 10)."""
        try:
            seconds = float(self.config.get('interval', 10))
        except (TypeError, ValueError):
            return 10.0
        # A zero/negative interval would turn the loop into a busy spin.
        return max(1.0, seconds)

    def _configured_curve(self) -> Optional[List[FanCurvePoint]]:
        """Build the fan curve from config; None selects the default curve."""
        try:
            return [FanCurvePoint(p['temp'], p['speed'])
                    for p in self.config.get('fan_curve', [])]
        except (KeyError, TypeError) as e:
            logger.warning(f"Invalid fan_curve in config ({e}), using default curve")
            return None

    def _select_target_speed(self, temps: List[TemperatureReading],
                             controller: IPMIFanController) -> Tuple[int, float, bool]:
        """Decide the target speed for the given (non-empty) readings.

        Returns:
            (target_speed, hottest_temp, panic). `hottest_temp` is the
            hottest CPU sensor, or the hottest sensor of any kind when no
            CPU sensor is present. In panic mode the configured
            `panic_speed` is returned as-is; otherwise the curve result is
            clamped to [min_speed, max_speed].
        """
        cpu_values = [t.value for t in temps if t.location.startswith('cpu')]
        hottest = max(cpu_values) if cpu_values else max(t.value for t in temps)

        if hottest >= self.config.get('panic_temp', 85):
            return self.config.get('panic_speed', 100), hottest, True

        # Calculate target speed from curve
        target = controller.calculate_fan_speed(temps, self._configured_curve())

        # Apply limits
        target = max(self.config.get('min_speed', 10),
                     min(self.config.get('max_speed', 100), target))
        return target, hottest, False

    def _should_apply(self, target: int, panic: bool) -> bool:
        """Return True when `target` should be sent to the BMC.

        Normal changes need a difference of >= 5% to avoid constant fan
        hunting; in panic mode any difference is applied.
        """
        delta = abs(target - self.current_speed)
        return delta >= 5 or (panic and delta > 0)

    def _control_loop(self):
        """Main control loop running in background thread."""
        # Enable manual control on startup
        if self.controller:
            self.controller.enable_manual_fan_control()

        while self.running:
            try:
                if not self.config.get('enabled', False):
                    self._sleep(1)
                    continue

                # Work on a local reference: update_config() may replace or
                # clear self.controller from another thread mid-iteration.
                controller = self.controller
                if controller is None or not controller.is_healthy():
                    logger.warning("Controller unhealthy, attempting reconnect...")
                    if not self._init_controller():
                        self._sleep(30)
                        continue
                    controller = self.controller
                    if controller is None:
                        continue
                    controller.enable_manual_fan_control()

                # Get sensor data
                temps = controller.get_temperatures()
                fans = controller.get_fan_speeds()

                with self.lock:
                    self.last_temps = temps
                    self.last_fans = fans

                if not temps:
                    logger.warning("No temperature readings received")
                    self._sleep(self._interval())
                    continue

                # Check for panic temperature / evaluate the fan curve
                self.target_speed, max_temp, panic = self._select_target_speed(
                    temps, controller)
                if panic:
                    logger.warning(f"PANIC MODE: CPU temp {max_temp}°C, setting fans to {self.target_speed}%")

                # Apply fan speed if changed significantly (>= 5%)
                if self._should_apply(self.target_speed, panic):
                    if controller.set_fan_speed(self.target_speed):
                        self.current_speed = self.target_speed
                        logger.info(f"Fan speed adjusted to {self.target_speed}% (CPU temp: {max_temp:.1f}°C)")

                self._sleep(self._interval())

            except Exception as e:
                # Top-level boundary of the worker thread: log and keep going.
                logger.error(f"Control loop error: {e}")
                self._sleep(10)

    def get_status(self) -> Dict:
        """Get current status as a JSON-serializable dict (password omitted)."""
        with self.lock:
            return {
                "running": self.running,
                "enabled": self.config.get('enabled', False),
                "connected": self.controller is not None and self.controller.is_healthy(),
                "manual_mode": self.controller.manual_mode if self.controller else False,
                "current_speed": self.current_speed,
                "target_speed": self.target_speed,
                "temperatures": [asdict(t) for t in self.last_temps],
                "fans": [asdict(f) for f in self.last_fans],
                "config": {
                    k: v for k, v in self.config.items()
                    if k != 'password'  # Don't expose password
                }
            }

    def set_manual_speed(self, speed: int) -> bool:
        """Set manual fan speed.

        Disables curve-based control (in memory only) and makes sure the
        BMC is in manual mode before sending the speed, since the speed
        command is only meaningful while manual mode is active.
        """
        controller = self.controller
        if not controller:
            return False

        self.config['enabled'] = False
        speed = max(0, min(100, int(speed)))

        if not controller.manual_mode and not controller.enable_manual_fan_control():
            return False

        if controller.set_fan_speed(speed):
            self.current_speed = speed
            return True
        return False

    def set_auto_mode(self, enabled: bool):
        """Enable or disable automatic (curve-based) control.

        Enabling puts the BMC into manual mode so this service can drive
        the fans; disabling hands the fans back to the BMC.
        """
        self.config['enabled'] = enabled
        self._save_config()

        if enabled and self.controller:
            self.controller.enable_manual_fan_control()
        elif not enabled and self.controller:
            self.controller.disable_manual_fan_control()


# Global service instance
_service: Optional[FanControlService] = None


def get_service(config_path: str = "/etc/ipmi-fan-controller/config.json") -> FanControlService:
    """Get or create the global service instance.

    `config_path` is only used by the call that creates the instance.
    """
    global _service
    if _service is None:
        _service = FanControlService(config_path)
    return _service


def _cli_main(argv: List[str]) -> int:
    """Simple CLI test: print sensors, then briefly drive the fans.

    Returns the process exit code (1 on usage error or failed connection).
    """
    if len(argv) < 4:
        print("Usage: python fan_controller.py <host> <username> <password> [port]")
        return 1

    host, username, password = argv[1:4]
    port = int(argv[4]) if len(argv) > 4 else 623

    controller = IPMIFanController(host, username, password, port)

    print(f"Testing connection to {host}...")
    if not controller.test_connection():
        print("✗ Connection failed")
        return 1
    print("✓ Connected successfully")

    print("\nTemperatures:")
    for temp in controller.get_temperatures():
        print(f"  {temp.name}: {temp.value}°C ({temp.location})")

    print("\nFan speeds:")
    for fan in controller.get_fan_speeds():
        print(f"  Fan {fan.fan_number}: {fan.speed_rpm} RPM")

    print("\nEnabling manual control...")
    if controller.enable_manual_fan_control():
        print("✓ Manual control enabled")
        try:
            for percent in (30, 50):
                print(f"\nSetting fans to {percent}%...")
                if controller.set_fan_speed(percent):
                    print(f"✓ Speed set to {percent}%")
                    time.sleep(3)
        finally:
            # Never leave the server pinned at a fixed speed, even on Ctrl-C.
            print("\nReturning to automatic control...")
            controller.disable_manual_fan_control()
        print("✓ Done")
    return 0


if __name__ == "__main__":
    import sys

    sys.exit(_cli_main(sys.argv))