# ipmi-fan-control/backend/ipmi_client.py
#
# 377 lines
# 13 KiB
# Python

"""IPMI client for communicating with servers."""
import subprocess
import re
import json
import logging
from typing import List, Dict, Optional, Tuple, Any
from dataclasses import dataclass
from datetime import datetime
from backend.config import settings
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class SensorReading:
    """Sensor reading data.

    One parsed sensor entry: a numeric value together with its unit and
    the status text reported for the sensor.
    """
    name: str  # sensor name (first "|"-separated column of the sensor line)
    sensor_type: str  # category label such as "temperature", "fan" or "power_supply"
    value: float  # numeric value extracted from the reading
    unit: str  # unit label for value; may be an empty string
    status: str  # status column as reported, e.g. "ok"
@dataclass
class FanReading:
    """Fan speed reading.

    Describes one fan as identified by both its IPMI ID and its physical
    number, together with the speed that was read for it.
    """
    fan_id: str  # IPMI fan ID as a hex string, e.g. "0x00"
    fan_number: int  # fan number parsed from the sensor name (0 if none was found)
    speed_rpm: Optional[int]  # speed in RPM, or None when no RPM value could be parsed
    speed_percent: Optional[int]  # speed as a percentage; get_fan_speeds leaves this None
@dataclass
class TemperatureReading:
    """Temperature reading.

    One temperature sensor value with a coarse location label derived
    from the sensor name.
    """
    name: str  # sensor name as reported
    location: str  # location label such as "cpu1", "inlet", "exhaust" or "other"
    value: float  # temperature parsed from an "N degrees C" reading
    status: str  # status column as reported, e.g. "ok"
class IPMIClient:
    """IPMI client for server communication.

    Class-level lookup tables translate between IPMI fan IDs and physical
    fan numbers, and between speed percentages and their hex byte strings.
    """

    # IPMI fan ID -> physical fan number
    FAN_MAPPING = {
        "0x00": 1,
        "0x01": 2,
        "0x02": 3,
        "0x03": 4,
        "0x04": 5,
        "0x05": 6,
        "0x06": 7,
        "0x07": 8,
    }

    # Percentage (0-100) -> two-digit hex string, and the inverse lookup.
    PERCENT_TO_HEX = {pct: f"0x{pct:02x}" for pct in range(101)}
    HEX_TO_PERCENT = {code: pct for pct, code in PERCENT_TO_HEX.items()}
def __init__(self, host: str, username: str, password: str, port: int = 623, vendor: str = "dell"):
    """Store the connection details and build the ipmitool command prefix.

    Args:
        host: Address passed to ipmitool with ``-H``.
        username: User name passed with ``-U``.
        password: Password passed with ``-P``.
        port: Port passed with ``-p`` (default 623).
        vendor: Vendor label; stored lower-cased.
    """
    self.host = host
    self.username = username
    self.password = password
    self.port = port
    self.vendor = vendor.lower()

    # NOTE(review): the password travels on the command line (-P), where
    # other local users may be able to read it from the process list.
    connection_flags = (
        ("-I", "lanplus"),
        ("-H", host),
        ("-U", username),
        ("-P", password),
        ("-p", str(port)),
    )
    # Every command issued by this client starts with this prefix.
    self.base_cmd = [settings.IPMITOOL_PATH]
    for flag, value in connection_flags:
        self.base_cmd.extend((flag, value))
def _run_command(self, args: List[str], timeout: int = 30) -> Tuple[bool, str, str]:
    """Run an ipmitool command and report its outcome.

    Args:
        args: Arguments appended to the client's base command.
        timeout: Maximum run time in seconds.

    Returns:
        A ``(success, stdout, stderr)`` tuple. ``success`` is True only
        when the process exits with return code 0. On timeout or any
        other exception the tuple is ``(False, "", <message>)``.
    """
    full_command = self.base_cmd + args
    try:
        completed = subprocess.run(
            full_command,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        if completed.returncode != 0:
            logger.error(f"IPMI command failed: {completed.stderr}")
            return False, completed.stdout, completed.stderr
        return True, completed.stdout, completed.stderr
    except subprocess.TimeoutExpired:
        logger.error(f"IPMI command timed out after {timeout}s")
        return False, "", "Command timed out"
    except Exception as exc:
        # Best-effort boundary: callers get a failure tuple, never an exception.
        logger.error(f"IPMI command error: {exc}")
        return False, "", str(exc)
def test_connection(self) -> bool:
    """Check whether the server answers an ``mc info`` request.

    Returns:
        True if the command succeeded within 10 seconds, False otherwise.
    """
    reachable, _stdout, _stderr = self._run_command(["mc", "info"], timeout=10)
    return reachable
def get_mc_info(self) -> Dict[str, Any]:
    """Get Management Controller info as a dictionary.

    Returns:
        A mapping built from every ``key: value`` line of the ``mc info``
        output (split on the first colon, both sides stripped). Lines
        without a colon are ignored. If the command fails, the result is
        ``{"error": <stderr>}``.
    """
    ok, output, error_text = self._run_command(["mc", "info"])
    if not ok:
        return {"error": error_text}

    fields = (row.partition(":") for row in output.splitlines() if ":" in row)
    return {label.strip(): text.strip() for label, _sep, text in fields}
def enable_manual_fan_control(self) -> bool:
    """Enable manual fan control.

    Returns:
        True if the raw IPMI command succeeded, False otherwise.
    """
    # Dell command: raw 0x30 0x30 0x01 0x00
    raw_request = ["raw", "0x30", "0x30", "0x01", "0x00"]
    succeeded, _stdout, _stderr = self._run_command(raw_request)
    return succeeded
def disable_manual_fan_control(self) -> bool:
    """Disable manual fan control (return to automatic).

    Returns:
        True if the raw IPMI command succeeded, False otherwise.
    """
    # Dell command: raw 0x30 0x30 0x01 0x01
    raw_request = ["raw", "0x30", "0x30", "0x01", "0x01"]
    succeeded, _stdout, _stderr = self._run_command(raw_request)
    return succeeded
def set_fan_speed(self, fan_id: str, speed_percent: int) -> bool:
    """Set the speed of one fan, or of all fans.

    Args:
        fan_id: '0xff' for all fans, or '0x00', '0x01', etc. for a
            specific fan.
        speed_percent: Requested speed. It is rounded to the nearest
            integer and clamped to the range 0-100.

    Returns:
        True if the raw IPMI command succeeded, False otherwise.
    """
    # Round first, then clamp: a non-integer request (e.g. 33.6) would
    # otherwise miss the lookup table and silently become 50%.
    percent = max(0, min(100, int(round(speed_percent))))
    hex_speed = self.PERCENT_TO_HEX[percent]

    success, _, _ = self._run_command([
        "raw", "0x30", "0x30", "0x02", fan_id, hex_speed
    ])
    return success
def set_all_fans_speed(self, speed_percent: int) -> bool:
    """Set every fan to the same speed.

    Args:
        speed_percent: Speed forwarded unchanged to ``set_fan_speed``.

    Returns:
        Whatever ``set_fan_speed`` returns for the all-fans ID.
    """
    all_fans_id = "0xff"  # ID that set_fan_speed documents as "all fans"
    return self.set_fan_speed(all_fans_id, speed_percent)
def get_third_party_pcie_response(self) -> Optional[bool]:
    """Get 3rd party PCIe card response state.

    Returns:
        True if the response is enabled, False if it is not, and None
        when the command fails or returns fewer than three bytes.
    """
    success, stdout, _ = self._run_command([
        "raw", "0x30", "0xce", "0x01", "0x16", "0x05", "0x00", "0x00", "0x00"
    ])
    if not success:
        return None

    tokens = stdout.split()
    if len(tokens) < 3:
        return None

    # The state is carried by the final three bytes:
    #   00 00 00 = Enabled, 01 00 00 = Disabled
    # Reading them from the end keeps a bare three-byte answer working and
    # also copes with replies that carry header bytes in front.
    # NOTE(review): replies of the form "16 05 00 00 00 05 00 01 00 00"
    # are expected on Dell hardware - confirm against the target system.
    return tokens[-3] == "00"
def enable_third_party_pcie_response(self) -> bool:
    """Enable 3rd party PCIe card response.

    Returns:
        True if the raw IPMI command succeeded, False otherwise.
    """
    raw_request = [
        "raw", "0x30", "0xce", "0x00", "0x16", "0x05", "0x00", "0x00", "0x00",
        "0x05", "0x00", "0x00", "0x00", "0x00",
    ]
    succeeded, _stdout, _stderr = self._run_command(raw_request)
    return succeeded
def disable_third_party_pcie_response(self) -> bool:
    """Disable 3rd party PCIe card response.

    Returns:
        True if the raw IPMI command succeeded, False otherwise.
    """
    raw_request = [
        "raw", "0x30", "0xce", "0x00", "0x16", "0x05", "0x00", "0x00", "0x00",
        "0x05", "0x00", "0x01", "0x00", "0x00",
    ]
    succeeded, _stdout, _stderr = self._run_command(raw_request)
    return succeeded
def get_temperatures(self) -> List[TemperatureReading]:
    """Get temperature sensor readings.

    Runs ``sdr type temperature`` and parses lines of the form::

        Sensor Name | 01h | ok | 3.1 | 45 degrees C

    Lines with fewer than five columns, or whose fifth column holds no
    "<number> degrees C" reading, are skipped.

    Returns:
        One TemperatureReading per parsed line, or an empty list when the
        command fails.
    """
    success, stdout, _ = self._run_command(["sdr", "type", "temperature"])
    if not success:
        return []

    # The optional leading minus keeps the sign of sub-zero readings,
    # which would otherwise be reported as positive values.
    reading_pattern = re.compile(r'(-?\d+(?:\.\d+)?)\s+degrees\s+C', re.IGNORECASE)

    temps = []
    for line in stdout.splitlines():
        parts = [p.strip() for p in line.split("|")]
        if len(parts) < 5:
            continue
        match = reading_pattern.search(parts[4])
        if match is None:
            continue
        name = parts[0]
        temps.append(TemperatureReading(
            name=name,
            location=self._determine_temp_location(name),
            value=float(match.group(1)),
            status=parts[2],
        ))
    return temps
def _determine_temp_location(self, name: str) -> str:
    """Determine temperature sensor location from its name.

    Args:
        name: Sensor name as reported.

    Returns:
        One of "cpu1", "cpu2", "cpu", "inlet", "exhaust", "chipset",
        "memory", "psu" or "other". Matching is case-insensitive.
    """
    name_lower = name.lower()

    if "cpu" in name_lower or "proc" in name_lower:
        # Prefer the index written directly after the CPU keyword, so a
        # name like "CPU2 VRM1" is not mistaken for CPU 1.
        indexed = re.search(r'(?:cpu|proc(?:essor)?)[\s_#-]*(\d+)', name_lower)
        if indexed:
            return {1: "cpu1", 2: "cpu2"}.get(int(indexed.group(1)), "cpu")
        # Whole-word match only: "zone" must not count as "one".
        if "1" in name or re.search(r'\bone\b', name_lower):
            return "cpu1"
        if "2" in name or re.search(r'\btwo\b', name_lower):
            return "cpu2"
        return "cpu"

    # Checked in order; the first group with a matching keyword wins.
    keyword_locations = (
        (("inlet", "ambient"), "inlet"),
        (("exhaust",), "exhaust"),
        (("chipset", "pch"), "chipset"),
        (("memory", "dimm"), "memory"),
        (("psu", "power supply"), "psu"),
    )
    for keywords, location in keyword_locations:
        if any(keyword in name_lower for keyword in keywords):
            return location
    return "other"
def get_fan_speeds(self) -> List[FanReading]:
    """Get fan speed readings.

    Runs ``sdr elist full`` and keeps lines that mention both "fan" and
    "rpm" (case-insensitive) and have at least five "|"-separated
    columns, e.g.::

        Fan1 RPM | 30h | ok | 29.1 | 4200 RPM

    Returns:
        One FanReading per matching line (``speed_percent`` is always
        None), or an empty list when the command fails.
    """
    ok, output, _ = self._run_command(["sdr", "elist", "full"])
    if not ok:
        return []

    readings = []
    for row in output.splitlines():
        lowered = row.lower()
        if "fan" not in lowered or "rpm" not in lowered:
            continue
        columns = [column.strip() for column in row.split("|")]
        if len(columns) < 5:
            continue
        sensor_name, raw_reading = columns[0], columns[4]

        # Fan number comes from the sensor name; 0 means "not found".
        number_match = re.search(r'fan\s*(\d+)', sensor_name, re.IGNORECASE)
        fan_number = int(number_match.group(1)) if number_match else 0

        # Reverse lookup in FAN_MAPPING (first matching ID wins).
        fan_id = next(
            (ipmi_id for ipmi_id, physical in self.FAN_MAPPING.items()
             if physical == fan_number),
            None,
        )
        if not fan_id:
            # Unmapped fan: derive the ID from the number, defaulting to 0x00.
            fan_id = f"0x{fan_number-1:02x}" if fan_number > 0 else "0x00"

        rpm_match = re.search(r'(\d+)\s*RPM', raw_reading, re.IGNORECASE)
        readings.append(FanReading(
            fan_id=fan_id,
            fan_number=fan_number,
            speed_rpm=int(rpm_match.group(1)) if rpm_match else None,
            speed_percent=None,  # Calculate from RPM if max known
        ))
    return readings
def get_all_sensors(self) -> List[SensorReading]:
    """Get all sensor readings.

    Runs ``sdr elist full`` and converts every line with at least five
    "|"-separated columns whose reading yields a numeric value.

    Returns:
        One SensorReading per parsed line, or an empty list when the
        command fails.
    """
    ok, output, _ = self._run_command(["sdr", "elist", "full"])
    if not ok:
        return []

    collected = []
    for row in output.splitlines():
        columns = [column.strip() for column in row.split("|")]
        if len(columns) < 5:
            continue
        sensor_name, sensor_status, raw_reading = columns[0], columns[2], columns[4]
        category = self._determine_sensor_type(sensor_name)
        value, unit = self._parse_sensor_value(raw_reading)
        if value is None:
            # Readings without any number are not reported.
            continue
        collected.append(SensorReading(
            name=sensor_name,
            sensor_type=category,
            value=value,
            unit=unit,
            status=sensor_status,
        ))
    return collected
def _determine_sensor_type(self, name: str) -> str:
    """Determine sensor type from its name.

    Args:
        name: Sensor name as reported.

    Returns:
        "temperature", "fan", "voltage", "power", "current" or "other".
        Categories are tested in that order; matching is case-insensitive.
    """
    name_lower = name.lower()

    # A bare "v" anywhere in the name is far too broad (it matches names
    # such as "Drive 0" or "System Level"), so voltage is recognised only
    # from explicit markers.
    voltage_hint = (
        r'\bv\b'                                          # standalone "V"
        r'|\d\s*v(?:sb)?(?![a-z])'                        # "12V", "3.3 V", "5VSB"
        r'|\bv(?:core|bat|cc|dd|tt|sb|pp|ref|in|out)'     # rail names like VCORE
    )

    if "temp" in name_lower or "degrees" in name_lower:
        return "temperature"
    if "fan" in name_lower or "rpm" in name_lower:
        return "fan"
    if "volt" in name_lower or re.search(voltage_hint, name_lower):
        return "voltage"
    if "power" in name_lower or "watt" in name_lower or "psu" in name_lower:
        return "power"
    if "current" in name_lower or "amp" in name_lower:
        return "current"
    return "other"
def _parse_sensor_value(self, reading: str) -> Tuple[Optional[float], str]:
    """Parse sensor value and unit from a reading string.

    Args:
        reading: Reading text such as "45 degrees C" or "4200 RPM".

    Returns:
        ``(value, unit)`` where unit is "°C", "RPM", "V", "W", "A", or ""
        for a bare number. ``(None, "")`` when the text has no number.
    """
    # A number with an optional minus sign. The sign is accepted only when
    # it does not follow a word character or a dot, so "1-2" still reads
    # as 1 while "-5 degrees C" keeps its sign.
    number = r'((?:(?<![\w.])-)?\d+(?:\.\d+)?)'

    # Tried in order; the first pattern that matches decides the unit.
    unit_patterns = (
        (number + r'\s+degrees\s+C', "°C"),       # "45 degrees C"
        (r'(\d+)\s*RPM', "RPM"),                  # "4200 RPM"
        (number + r'\s*(?:Volts?|V)\b', "V"),     # "12.05 Volts" or "3.3 V"
        (number + r'\s*(?:Watts?|W)\b', "W"),     # "250 Watts" or "250 W"
        (number + r'\s*(?:Amps?|A)\b', "A"),      # "5.5 Amps" or "5.5 A"
    )
    for pattern, unit in unit_patterns:
        match = re.search(pattern, reading, re.IGNORECASE)
        if match:
            return float(match.group(1)), unit

    # Generic number without a recognised unit.
    match = re.search(number, reading)
    if match:
        return float(match.group(1)), ""
    return None, ""
def get_power_consumption(self) -> Optional[Dict[str, Any]]:
    """Get power consumption data (Dell OEM command).

    Returns:
        A mapping built from every ``key: value`` line of the
        ``delloem powermonitor`` output (split on the first colon, both
        sides stripped), or None when the command fails.
    """
    ok, output, _ = self._run_command(["delloem", "powermonitor"])
    if not ok:
        return None

    fields = (row.partition(":") for row in output.splitlines() if ":" in row)
    return {label.strip(): text.strip() for label, _sep, text in fields}
def get_power_supply_status(self) -> List[SensorReading]:
    """Get power supply sensor data.

    Runs ``sdr type "Power Supply"`` and reports each line that has at
    least three "|"-separated columns.

    Returns:
        One SensorReading per line with ``sensor_type="power_supply"``,
        ``unit="status"`` and a value of 1.0 when the status column is
        "ok" (case-insensitive), else 0.0. Empty list when the command
        fails.
    """
    ok, output, _ = self._run_command(["sdr", "type", "Power Supply"])
    if not ok:
        return []

    supplies = []
    for row in output.splitlines():
        columns = [column.strip() for column in row.split("|")]
        if len(columns) < 3:
            continue
        supply_name, supply_status = columns[0], columns[2]
        healthy = supply_status.lower() == "ok"
        supplies.append(SensorReading(
            name=supply_name,
            sensor_type="power_supply",
            value=1.0 if healthy else 0.0,
            unit="status",
            status=supply_status,
        ))
    return supplies