# Source: ipmi-fan-control/fan_controller.py
# Repository viewer header: 737 lines, 26 KiB, Python

"""
IPMI Controller - Advanced Fan Control for Dell Servers
Features: Fan groups, multiple curves, HTTP sensors, panic mode
"""
import subprocess
import re
import time
import json
import logging
import threading
import requests
from dataclasses import dataclass, asdict
from typing import List, Dict, Optional, Tuple
from datetime import datetime
from pathlib import Path
# Setup logging: every record at INFO or above goes both to a stream handler
# and to a log file under /tmp (opened when this module is imported).
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('/tmp/ipmi-controller.log'),
    ],
)
# Module-level logger shared by every class in this file.
logger = logging.getLogger(__name__)
@dataclass
class TemperatureReading:
    """One temperature sample produced by a sensor source."""

    # Sensor name as reported by the source.
    name: str
    # Location label assigned by the parsers in this file
    # (e.g. "cpu", "cpu1", "inlet", "pcie", "other").
    location: str
    # Numeric reading; IPMI values are parsed from "... degrees C" text.
    value: float
    # Status text from the source (the HTTP parser always stores "ok").
    status: str
    # Which backend produced the reading.
    source: str = "ipmi"  # ipmi, http, ssh
@dataclass
class FanReading:
    """One fan sample: its identifiers plus the measured speed, if any."""

    # Hex string used to address the fan in raw IPMI commands (e.g. "0x00").
    fan_id: str
    # Number parsed from the sensor name; 0 when no number was found.
    fan_number: int
    # Measured speed in RPM, or None when the reading had no RPM value.
    speed_rpm: Optional[int]
    # Speed as a percentage, or None when unknown.
    speed_percent: Optional[int]
    name: Optional[str] = None  # Custom name
    group: Optional[str] = None  # Fan group
@dataclass
class FanCurve:
    """A named temperature-to-speed curve and the fans it targets."""

    # Display name of the curve.
    name: str
    # Curve control points, e.g. [{"temp": 30, "speed": 15}, ...]
    points: List[Dict[str, float]]
    # Which sensor to use
    sensor_source: str = "cpu"
    # "all", group name, or fan_id
    applies_to: str = "all"
class HTTPSensorClient:
    """Client for fetching sensor data from HTTP endpoint (lm-sensors over HTTP).

    The response body is parsed as ``sensors -u`` style text; every
    ``temp*_input`` line becomes one TemperatureReading with ``source="http"``.
    """

    def __init__(self, url: str, timeout: int = 10):
        """Store the endpoint URL and the timeout passed to ``requests.get``."""
        self.url = url
        self.timeout = timeout
        # Readings from the most recent successful fetch (None until then).
        self.last_reading = None
        # Number of fetches that failed in a row; reset on every success.
        self.consecutive_failures = 0

    def fetch_sensors(self) -> List[TemperatureReading]:
        """Fetch sensor data from HTTP endpoint.

        Returns:
            The parsed readings, or an empty list when the request or the
            parsing failed (the failure is logged and counted).
        """
        try:
            response = requests.get(self.url, timeout=self.timeout)
            response.raise_for_status()
            # Parse lm-sensors style output
            temps = self._parse_sensors_output(response.text)
        except Exception as e:
            # Best effort: one bad poll must not take down the control loop.
            logger.error(f"Failed to fetch HTTP sensors from {self.url}: {e}")
            self.consecutive_failures += 1
            return []
        self.consecutive_failures = 0
        self.last_reading = temps
        return temps

    def _parse_sensors_output(self, output: str) -> List[TemperatureReading]:
        """Parse lm-sensors -u style output.

        An unindented line ending in ':' starts a new section whose label is
        attached to the readings below it. Lines containing ``_input:`` and
        ``temp`` are readings of the form ``name: value``.
        """
        temps = []
        current_chip = ""
        for raw_line in output.splitlines():
            line = raw_line.strip()
            if not line:
                continue
            # New chip section. Indentation must be checked on the raw line:
            # after strip() a leading-space test can never be true.
            if line.endswith(":") and not raw_line[:1].isspace():
                current_chip = line.rstrip(":")
                continue
            # Temperature reading
            if "_input:" not in line or "temp" not in line.lower():
                continue
            parts = line.split(":")
            if len(parts) != 2:
                continue
            name = parts[0].strip()
            try:
                value = float(parts[1].strip())
            except ValueError:
                logger.debug(f"Skipping unparseable sensor line: {line!r}")
                continue
            temps.append(TemperatureReading(
                name=f"{current_chip}/{name}",
                location=self._classify_sensor_name(name, current_chip),
                value=value,
                status="ok",
                source="http"
            ))
        return temps

    def _classify_sensor_name(self, name: str, chip: str) -> str:
        """Classify sensor location from name, falling back to the section label.

        The reading name is tried first, so names that already classified keep
        their result. Only when the name gives no hint (e.g. ``temp2_input``)
        is the section label (e.g. ``Core 0``) consulted.
        """
        for text in (name, chip):
            location = self._classify_label(text)
            if location != "other":
                return location
        return "other"

    @staticmethod
    def _classify_label(text: str) -> str:
        """Map one label to "cpu1", "cpu2", "cpu", "pcie" or "other" by keyword."""
        text_lower = text.lower()
        if "core" in text_lower:
            # Digit heuristic: labels containing 0/1 map to cpu1, 2/3 to cpu2.
            if "0" in text or "1" in text:
                return "cpu1"
            if "2" in text or "3" in text:
                return "cpu2"
            return "cpu"
        if any(key in text_lower for key in ("package", "tdie", "tctl")):
            return "cpu"
        if any(key in text_lower for key in ("pcie", "nvme", "gpu")):
            return "pcie"
        return "other"

    def is_healthy(self) -> bool:
        """Return True while fewer than three fetches in a row have failed."""
        return self.consecutive_failures < 3
class IPMIFanController:
    """IPMI fan controller with advanced features.

    Wraps the ``ipmitool`` command line (lanplus interface) to switch between
    automatic and manual fan control, set fan speeds, and read temperature and
    fan sensors. Every command updates ``consecutive_failures``, which drives
    ``is_healthy()``.
    """

    # A reading such as "45 degrees C"; the sign is captured so sub-zero
    # readings are not turned into positive values.
    _TEMP_RE = re.compile(r'(-?\d+(?:\.\d+)?)\s+degrees\s+C', re.IGNORECASE)
    _FAN_NUMBER_RE = re.compile(r'fan\s*(\d+)', re.IGNORECASE)
    _RPM_RE = re.compile(r'(\d+)\s*RPM', re.IGNORECASE)

    def __init__(self, host: str, username: str, password: str, port: int = 623):
        """Store connection settings; no IPMI command is run here."""
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        # True after manual fan control was enabled successfully.
        self.manual_mode = False
        self.last_successful_read = None
        # Commands failed in a row; reset by any successful command.
        self.consecutive_failures = 0
        self.max_failures = 5

    def _run_ipmi(self, args: List[str], timeout: int = 15) -> Tuple[bool, str]:
        """Run IPMI command with error handling.

        Args:
            args: ipmitool sub-command and its arguments.
            timeout: Seconds to wait before giving up on the command.

        Returns:
            ``(True, stdout)`` on exit code 0, otherwise ``(False, message)``
            where message is stderr, "Timeout", or the exception text.
        """
        # NOTE(review): "-P" puts the password on the command line, where other
        # local users can see it in the process list. ipmitool also offers "-E"
        # (password from the IPMI_PASSWORD environment variable) - consider
        # switching once that has been verified against the target BMCs.
        cmd = [
            "ipmitool", "-I", "lanplus",
            "-H", self.host,
            "-U", self.username,
            "-P", self.password,
            "-p", str(self.port)
        ] + args
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout
            )
        except subprocess.TimeoutExpired:
            self.consecutive_failures += 1
            logger.error(f"IPMI command timed out after {timeout}s")
            return False, "Timeout"
        except Exception as e:
            self.consecutive_failures += 1
            logger.error(f"IPMI command error: {e}")
            return False, str(e)
        if result.returncode == 0:
            self.consecutive_failures = 0
            return True, result.stdout
        self.consecutive_failures += 1
        logger.warning(f"IPMI command failed: {result.stderr}")
        return False, result.stderr

    def test_connection(self) -> bool:
        """Test IPMI connection by running ``mc info``."""
        success, _ = self._run_ipmi(["mc", "info"], timeout=10)
        return success

    def enable_manual_fan_control(self) -> bool:
        """Enable manual fan control mode; returns whether the command succeeded."""
        success, _ = self._run_ipmi(["raw", "0x30", "0x30", "0x01", "0x00"])
        if success:
            self.manual_mode = True
            logger.info("Manual fan control enabled")
        return success

    def disable_manual_fan_control(self) -> bool:
        """Return to automatic fan control; returns whether the command succeeded."""
        success, _ = self._run_ipmi(["raw", "0x30", "0x30", "0x01", "0x01"])
        if success:
            self.manual_mode = False
            logger.info("Automatic fan control restored")
        return success

    @staticmethod
    def _normalize_speed(speed_percent) -> int:
        """Round a speed to an integer and clamp it to the 0-100 range."""
        return max(0, min(100, int(round(speed_percent))))

    def set_fan_speed(self, speed_percent: int, fan_id: str = "0xff") -> bool:
        """Set fan speed (0-100%). fan_id 0xff = all fans.

        Float speeds (for example from curve points or limits stored as floats)
        are rounded first; the hex format code used below rejects floats.
        """
        speed_percent = self._normalize_speed(speed_percent)
        hex_speed = f"0x{speed_percent:02x}"
        success, _ = self._run_ipmi([
            "raw", "0x30", "0x30", "0x02", fan_id, hex_speed
        ])
        if success:
            logger.info(f"Fan {fan_id} speed set to {speed_percent}%")
        return success

    @classmethod
    def _parse_temperature(cls, reading: str) -> Optional[float]:
        """Extract the numeric value from a "<n> degrees C" reading, else None."""
        match = cls._TEMP_RE.search(reading)
        return float(match.group(1)) if match else None

    def get_temperatures(self) -> List[TemperatureReading]:
        """Get temperature readings from all sensors.

        Parses ``sdr type temperature`` rows split on '|': column 0 is the
        sensor name, column 2 the status and column 4 the reading. Rows with
        fewer than five columns or without a "degrees C" value are skipped.
        Returns an empty list when the command fails.
        """
        success, output = self._run_ipmi(["sdr", "type", "temperature"])
        if not success:
            return []
        temps = []
        for line in output.splitlines():
            parts = [p.strip() for p in line.split("|")]
            if len(parts) < 5:
                continue
            name, status = parts[0], parts[2]
            value = self._parse_temperature(parts[4])
            if value is None:
                continue
            temps.append(TemperatureReading(
                name=name,
                location=self._classify_temp_location(name),
                value=value,
                status=status,
                source="ipmi"
            ))
        return temps

    def get_fan_speeds(self) -> List[FanReading]:
        """Get current fan speeds.

        Parses ``sdr elist full`` rows that mention both "fan" and "rpm".
        The fan number is taken from the sensor name; ``fan_id`` is that
        number minus one in hex ("0x00" when no number was found). Returns an
        empty list when the command fails.
        """
        success, output = self._run_ipmi(["sdr", "elist", "full"])
        if not success:
            return []
        fans = []
        for line in output.splitlines():
            lowered = line.lower()
            if "fan" not in lowered or "rpm" not in lowered:
                continue
            parts = [p.strip() for p in line.split("|")]
            if len(parts) < 5:
                continue
            name, reading = parts[0], parts[4]
            number_match = self._FAN_NUMBER_RE.search(name)
            fan_number = int(number_match.group(1)) if number_match else 0
            fan_id = f"0x{fan_number-1:02x}" if fan_number > 0 else "0x00"
            rpm_match = self._RPM_RE.search(reading)
            rpm = int(rpm_match.group(1)) if rpm_match else None
            fans.append(FanReading(
                fan_id=fan_id,
                fan_number=fan_number,
                speed_rpm=rpm,
                speed_percent=None
            ))
        return fans

    def _classify_temp_location(self, name: str) -> str:
        """Classify temperature sensor location from keywords in its name."""
        name_lower = name.lower()
        if "cpu" in name_lower or "proc" in name_lower:
            if "1" in name or "one" in name_lower:
                return "cpu1"
            if "2" in name or "two" in name_lower:
                return "cpu2"
            return "cpu"
        if "inlet" in name_lower or "ambient" in name_lower:
            return "inlet"
        if "exhaust" in name_lower:
            return "exhaust"
        if "memory" in name_lower or "dimm" in name_lower:
            return "memory"
        return "other"

    def is_healthy(self) -> bool:
        """Check if controller is working properly (failures below ``max_failures``)."""
        return self.consecutive_failures < self.max_failures
class IPMIControllerService:
    """Main service for IPMI Controller with all advanced features.

    Holds the JSON-backed configuration, an IPMIFanController, an optional
    HTTPSensorClient and a daemon thread that polls temperatures and applies
    the configured fan curves.
    """

    # Config keys whose change requires rebuilding the matching client.
    _IPMI_KEYS = ('ipmi_host', 'ipmi_username', 'ipmi_password', 'ipmi_port')
    _HTTP_KEYS = ('http_sensor_enabled', 'http_sensor_url', 'http_sensor_timeout')

    def __init__(self, config_path: str = "/etc/ipmi-controller/config.json"):
        """Build the default config and overlay the file at ``config_path`` if present."""
        self.config_path = config_path
        self.controller: Optional[IPMIFanController] = None
        self.http_client: Optional[HTTPSensorClient] = None
        self.running = False
        self.thread: Optional[threading.Thread] = None
        # Last speeds successfully sent: either {'all': speed} or fan_id -> speed.
        self.current_speeds: Dict[str, int] = {}
        # Last speeds requested, using the same key scheme as current_speeds.
        self.target_speeds: Dict[str, int] = {}
        self.last_temps: List[TemperatureReading] = []
        self.last_fans: List[FanReading] = []
        self.lock = threading.Lock()
        self.in_identify_mode = False
        # Default config
        self.config = {
            # IPMI Settings
            "ipmi_host": "",
            "ipmi_username": "",
            "ipmi_password": "",
            "ipmi_port": 623,
            # HTTP Sensor Settings
            "http_sensor_enabled": False,
            "http_sensor_url": "",
            "http_sensor_timeout": 10,
            # Fan Control Settings
            "enabled": False,
            "poll_interval": 10,
            "fan_update_interval": 10,
            "min_speed": 10,
            "max_speed": 100,
            "panic_temp": 85,
            "panic_speed": 100,
            "panic_on_no_data": True,
            "no_data_timeout": 60,
            # Sensor Selection
            "primary_sensor": "cpu",  # cpu, cpu1, cpu2, inlet, exhaust, pcie, etc.
            "sensor_preference": "ipmi",  # ipmi, http, auto
            # Fan Configuration
            "fans": {},  # fan_id -> {"name": "Custom Name", "group": "group1"}
            "fan_groups": {},  # group_name -> {"fans": ["0x00", "0x01"], "curve": "Default"}
            # Fan Curves
            "fan_curves": {
                "Default": {
                    "points": [
                        {"temp": 30, "speed": 15},
                        {"temp": 40, "speed": 25},
                        {"temp": 50, "speed": 40},
                        {"temp": 60, "speed": 60},
                        {"temp": 70, "speed": 80},
                        {"temp": 80, "speed": 100},
                    ],
                    "sensor_source": "cpu",
                    "applies_to": "all"
                }
            },
            # UI Settings
            "theme": "dark",  # dark, light, auto
        }
        self._load_config()
        # Monotonic timestamp of the last poll that returned data. Only elapsed
        # time is needed, so a monotonic clock avoids wall-clock jumps.
        self._last_data_time = time.monotonic()

    def _load_config(self):
        """Load configuration from file, merging it over the defaults."""
        try:
            config_file = Path(self.config_path)
            if config_file.exists():
                with open(config_file, encoding="utf-8") as f:
                    loaded = json.load(f)
                self._deep_update(self.config, loaded)
                logger.info(f"Loaded config from {self.config_path}")
        except Exception as e:
            # A broken config file must not prevent start-up; defaults stay active.
            logger.error(f"Failed to load config: {e}")

    def _deep_update(self, d: dict, u: dict):
        """Deep update dictionary ``d`` in place with the values from ``u``."""
        for k, v in u.items():
            if isinstance(v, dict) and isinstance(d.get(k), dict):
                self._deep_update(d[k], v)
            else:
                d[k] = v

    def _save_config(self):
        """Save configuration to file, creating the parent directory if needed."""
        try:
            config_file = Path(self.config_path)
            config_file.parent.mkdir(parents=True, exist_ok=True)
            with open(config_file, 'w', encoding="utf-8") as f:
                json.dump(self.config, f, indent=2)
            logger.info(f"Saved config to {self.config_path}")
        except Exception as e:
            logger.error(f"Failed to save config: {e}")

    def update_config(self, **kwargs):
        """Update configuration values, persist them and rebuild affected clients."""
        self._deep_update(self.config, kwargs)
        self._save_config()
        # Reinitialize if needed
        if any(k in kwargs for k in self._IPMI_KEYS):
            self._init_controller()
            # A new controller knows nothing about speeds sent through the old one.
            self._invalidate_speed_cache()
            # The control loop only enables manual mode at start-up or after a
            # reconnect, so a replaced controller has to be switched here.
            if self.controller and self.running and self.config.get('enabled', False):
                self.controller.enable_manual_fan_control()
        if any(k in kwargs for k in self._HTTP_KEYS):
            self._init_http_client()

    def _init_controller(self) -> bool:
        """Initialize the IPMI controller; returns True when the connection test passes."""
        if not all([self.config.get('ipmi_host'), self.config.get('ipmi_username')]):
            return False
        self.controller = IPMIFanController(
            host=self.config['ipmi_host'],
            username=self.config['ipmi_username'],
            password=self.config.get('ipmi_password', ''),
            port=self.config.get('ipmi_port', 623)
        )
        if self.controller.test_connection():
            logger.info(f"Connected to IPMI at {self.config['ipmi_host']}")
            return True
        logger.error("Failed to connect to IPMI")
        self.controller = None
        return False

    def _init_http_client(self) -> bool:
        """Initialize HTTP sensor client.

        When the sensor is disabled or no URL is configured, any existing
        client is dropped so that it is no longer polled.
        """
        url = self.config.get('http_sensor_url')
        if not self.config.get('http_sensor_enabled') or not url:
            self.http_client = None
            return False
        self.http_client = HTTPSensorClient(
            url=url,
            timeout=self.config.get('http_sensor_timeout', 10)
        )
        logger.info(f"HTTP sensor client initialized for {url}")
        return True

    def start(self) -> bool:
        """Start the controller service; returns False when IPMI cannot be reached."""
        if self.running:
            return True
        if not self._init_controller():
            logger.error("Cannot start - IPMI connection failed")
            return False
        if self.config.get('http_sensor_enabled'):
            self._init_http_client()
        self.running = True
        self.thread = threading.Thread(target=self._control_loop, daemon=True)
        self.thread.start()
        logger.info("IPMI Controller service started")
        return True

    def stop(self):
        """Stop the controller service and hand fan control back to automatic mode."""
        self.running = False
        if self.thread:
            self.thread.join(timeout=5)
        if self.controller:
            self.controller.disable_manual_fan_control()
        logger.info("IPMI Controller service stopped")

    def _poll_interval(self) -> int:
        """Return the configured poll interval as an integer of at least 1.

        The value is used as a modulus in the control loop, so zero, negative
        or non-numeric settings are replaced by a safe value.
        """
        try:
            interval = int(self.config.get('poll_interval', 10))
        except (TypeError, ValueError):
            interval = 10
        return max(1, interval)

    def _control_loop(self):
        """Main control loop: runs once per second until ``running`` is cleared."""
        if self.controller:
            self.controller.enable_manual_fan_control()
        poll_counter = 0
        while self.running:
            try:
                if not self.config.get('enabled', False):
                    time.sleep(1)
                    continue
                # Ensure controller is healthy
                if not self.controller or not self.controller.is_healthy():
                    logger.warning("IPMI unhealthy, reconnecting...")
                    if not self._init_controller():
                        time.sleep(30)
                        continue
                    self.controller.enable_manual_fan_control()
                    # Cached speeds may not match the fans after a reconnect.
                    self._invalidate_speed_cache()
                # Poll temperatures at configured interval
                if poll_counter % self._poll_interval() == 0:
                    temps = self._get_temperatures()
                    fans = self.controller.get_fan_speeds() if self.controller else []
                    with self.lock:
                        self.last_temps = temps
                        self.last_fans = fans
                    if temps:
                        self._last_data_time = time.monotonic()
                    # Apply fan curves
                    if not self.in_identify_mode:
                        self._apply_fan_curves(temps)
                poll_counter += 1
                time.sleep(1)
            except Exception as e:
                # Top-level boundary of the worker thread: log and keep running.
                logger.error(f"Control loop error: {e}")
                time.sleep(10)

    def _get_temperatures(self) -> List[TemperatureReading]:
        """Get temperatures from all sources according to ``sensor_preference``."""
        temps = []
        preference = self.config.get('sensor_preference', 'ipmi')
        # Try IPMI
        if self.controller and preference in ['ipmi', 'auto']:
            temps = self.controller.get_temperatures()
        # Try HTTP sensor
        if self.http_client and preference in ['http', 'auto']:
            http_temps = self.http_client.fetch_sensors()
            if http_temps:
                if preference == 'http' or not temps:
                    temps = http_temps
                else:
                    # Merge, preferring HTTP for PCIe sensors
                    known_names = {t.name for t in temps}
                    for ht in http_temps:
                        if ht.location == 'pcie' or ht.name not in known_names:
                            temps.append(ht)
        return temps

    def _clamp_to_limits(self, speed):
        """Clamp a speed to the configured ``min_speed``/``max_speed`` range."""
        return max(self.config.get('min_speed', 10),
                   min(self.config.get('max_speed', 100), speed))

    def _apply_fan_curves(self, temps: List[TemperatureReading]):
        """Apply fan curves based on temperatures.

        With no readings, panic speed is applied once ``no_data_timeout``
        seconds have passed without data (if ``panic_on_no_data`` is set).
        Otherwise the hottest reading of the primary sensor drives either the
        per-fan curves or, when no fans are configured, the Default curve.
        """
        if not temps:
            # Check for panic mode on no data
            if self.config.get('panic_on_no_data', True):
                time_since_data = time.monotonic() - self._last_data_time
                if time_since_data > self.config.get('no_data_timeout', 60):
                    self._set_all_fans(self.config.get('panic_speed', 100), "PANIC: No data")
            return
        # Get primary sensor
        primary_sensor = self.config.get('primary_sensor', 'cpu')
        sensor_temps = [t for t in temps if t.location == primary_sensor]
        if not sensor_temps:
            sensor_temps = [t for t in temps if t.location.startswith(primary_sensor)]
        if not sensor_temps:
            sensor_temps = temps  # Fallback to any temp
        max_temp = max(t.value for t in sensor_temps)
        # Check panic temperature
        if max_temp >= self.config.get('panic_temp', 85):
            self._set_all_fans(self.config.get('panic_speed', 100), f"PANIC: Temp {max_temp}°C")
            return
        # Get fan curves
        curves = self.config.get('fan_curves', {})
        default_curve = curves.get('Default', {'points': [{'temp': 30, 'speed': 15}, {'temp': 80, 'speed': 100}]})
        # Apply curves to fans
        fans = self.config.get('fans', {})
        groups = self.config.get('fan_groups', {})
        # Calculate target speeds per group/individual
        fan_speeds = {}
        for fan_id, fan_info in fans.items():
            group = fan_info.get('group')
            curve_name = fan_info.get('curve', 'Default')
            if group and group in groups:
                curve_name = groups[group].get('curve', 'Default')
            curve = curves.get(curve_name, default_curve)
            # A curve without points falls back to the calculator's default.
            speed = self._calculate_curve_speed(max_temp, curve.get('points', []))
            # Apply limits
            fan_speeds[fan_id] = self._clamp_to_limits(speed)
        # If no individual fan configs, apply to all
        if not fan_speeds:
            speed = self._calculate_curve_speed(max_temp, default_curve.get('points', []))
            self._set_all_fans(self._clamp_to_limits(speed), f"Temp {max_temp}°C")
        else:
            # Set individual fan speeds
            for fan_id, speed in fan_speeds.items():
                self._set_fan_speed(fan_id, speed, f"Temp {max_temp}°C")

    def _calculate_curve_speed(self, temp: float, points: List[Dict]) -> int:
        """Calculate fan speed from curve points.

        Points are sorted by temperature; below the first or above the last
        point the endpoint speed is used, in between the speed is linearly
        interpolated. The result is always rounded to an int. Returns 50 when
        there are no points.
        """
        if not points:
            return 50
        sorted_points = sorted(points, key=lambda p: p['temp'])
        first, last = sorted_points[0], sorted_points[-1]
        if temp <= first['temp']:
            return int(round(first['speed']))
        if temp >= last['temp']:
            return int(round(last['speed']))
        for p1, p2 in zip(sorted_points, sorted_points[1:]):
            if p1['temp'] <= temp <= p2['temp']:
                if p2['temp'] == p1['temp']:
                    return int(round(p1['speed']))
                ratio = (temp - p1['temp']) / (p2['temp'] - p1['temp'])
                speed = p1['speed'] + ratio * (p2['speed'] - p1['speed'])
                return int(round(speed))
        return int(round(last['speed']))

    def _invalidate_speed_cache(self):
        """Forget which speeds were sent so the next update is always transmitted."""
        self.current_speeds = {}

    def _set_all_fans(self, speed: int, reason: str):
        """Set all fans to a speed.

        The command is skipped only when the cache shows that exactly this
        all-fans speed is in effect; any per-fan entry means individual fans
        may differ, so the command is sent again.
        """
        self.target_speeds = {'all': speed}
        if self.controller and self.current_speeds != {'all': speed}:
            if self.controller.set_fan_speed(speed, "0xff"):
                self.current_speeds = {'all': speed}
                logger.info(f"All fans set to {speed}% ({reason})")

    def _set_fan_speed(self, fan_id: str, speed: int, reason: str):
        """Set specific fan speed, skipping the command when it is already cached."""
        self.target_speeds.pop('all', None)
        self.target_speeds[fan_id] = speed
        if self.controller and speed != self.current_speeds.get(fan_id):
            if self.controller.set_fan_speed(speed, fan_id):
                # Once one fan is set individually the all-fans entry is stale.
                self.current_speeds.pop('all', None)
                self.current_speeds[fan_id] = speed
                logger.info(f"Fan {fan_id} set to {speed}% ({reason})")

    def identify_fan(self, fan_id: str):
        """Identify a fan by setting it to 100% and others to 0%."""
        if not self.controller:
            return False
        self.in_identify_mode = True
        # These commands bypass the cache, so it no longer reflects the fans.
        self._invalidate_speed_cache()
        # Set all fans to 0%
        self.controller.set_fan_speed(0, "0xff")
        time.sleep(0.5)
        # Set target fan to 100%
        self.controller.set_fan_speed(100, fan_id)
        return True

    def stop_identify(self):
        """Stop identify mode and resume normal control."""
        self.in_identify_mode = False
        # Force the next curve update to be sent even if the speed is unchanged.
        self._invalidate_speed_cache()

    def set_manual_speed(self, speed: int, fan_id: str = "0xff") -> bool:
        """Set manual fan speed; this also turns off curve-based control."""
        if not self.controller:
            return False
        self.config['enabled'] = False
        self._save_config()
        self._invalidate_speed_cache()
        speed = max(0, min(100, speed))
        return self.controller.set_fan_speed(speed, fan_id)

    def set_auto_mode(self, enabled: bool):
        """Enable or disable automatic control.

        Enabling switches the controller to manual fan control so this service
        can drive the fans; disabling restores automatic fan control.
        """
        self.config['enabled'] = enabled
        self._save_config()
        # Fan speeds can change outside this service while the mode differs.
        self._invalidate_speed_cache()
        if not self.controller:
            return
        if enabled:
            self.controller.enable_manual_fan_control()
        else:
            self.controller.disable_manual_fan_control()

    def get_status(self) -> Dict:
        """Get current controller status as a dict of plain values.

        Speed dicts are copied so callers never hold the live caches.
        """
        with self.lock:
            status = {
                "running": self.running,
                "enabled": self.config.get('enabled', False),
                "connected": self.controller is not None and self.controller.is_healthy(),
                "manual_mode": self.controller.manual_mode if self.controller else False,
                "in_identify_mode": self.in_identify_mode,
                "current_speeds": dict(self.current_speeds),
                "target_speeds": dict(self.target_speeds),
                "temperatures": [asdict(t) for t in self.last_temps],
                "fans": [asdict(f) for f in self.last_fans],
                "config": self._get_safe_config()
            }
        return status

    def _get_safe_config(self) -> Dict:
        """Get config without sensitive data."""
        # A JSON round trip gives a deep copy; self.config is left untouched.
        safe = json.loads(json.dumps(self.config))
        # Remove passwords
        safe.pop('ipmi_password', None)
        safe.pop('http_sensor_password', None)
        return safe
# Global service instances, keyed by the config path they were created with.
_service_instances: Dict[str, IPMIControllerService] = {}


def get_service(config_path: str = "/etc/ipmi-controller/config.json") -> IPMIControllerService:
    """Get or create the service instance.

    The first call for a given ``config_path`` builds an
    IPMIControllerService and stores it; later calls with the same path
    return that same object.
    """
    try:
        return _service_instances[config_path]
    except KeyError:
        service = IPMIControllerService(config_path)
        _service_instances[config_path] = service
        return service
if __name__ == "__main__":
    # CLI test: connect to one host and print its temperature and fan readings.
    import sys

    if len(sys.argv) < 4:
        # The port is optional; the IPMIFanController default (623) applies.
        print("Usage: fan_controller.py <host> <username> <password> [port]")
        sys.exit(1)
    host, user, pwd = sys.argv[1:4]
    try:
        port = int(sys.argv[4]) if len(sys.argv) > 4 else 623
    except ValueError:
        # Report a bad port as a usage error instead of a traceback.
        print(f"Invalid port: {sys.argv[4]!r}")
        sys.exit(1)
    ctrl = IPMIFanController(host, user, pwd, port)
    print(f"Testing {host}...")
    if ctrl.test_connection():
        print("✓ Connected")
        print("\nTemps:", [(t.name, t.value) for t in ctrl.get_temperatures()])
        print("\nFans:", [(f.fan_number, f.speed_rpm) for f in ctrl.get_fan_speeds()])
    else:
        print("✗ Failed")