801 lines
30 KiB
Python
801 lines
30 KiB
Python
"""
|
|
IPMI Controller - Advanced Fan Control for Dell Servers
|
|
Features: Fan groups, multiple curves, HTTP sensors, panic mode
|
|
"""
|
|
import subprocess
|
|
import re
|
|
import time
|
|
import json
|
|
import logging
|
|
import threading
|
|
import requests
|
|
from dataclasses import dataclass, asdict
|
|
from typing import List, Dict, Optional, Tuple
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Setup logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.StreamHandler(),
|
|
logging.FileHandler('/tmp/ipmi-controller.log')
|
|
]
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class TemperatureReading:
|
|
name: str
|
|
location: str
|
|
value: float
|
|
status: str
|
|
source: str = "ipmi" # ipmi, http, ssh
|
|
|
|
|
|
@dataclass
|
|
class FanReading:
|
|
fan_id: str
|
|
fan_number: int
|
|
speed_rpm: Optional[int]
|
|
speed_percent: Optional[int]
|
|
name: Optional[str] = None # Custom name
|
|
group: Optional[str] = None # Fan group
|
|
|
|
|
|
@dataclass
|
|
class FanCurve:
|
|
name: str
|
|
points: List[Dict[str, float]] # [{"temp": 30, "speed": 15}, ...]
|
|
sensor_source: str = "cpu" # Which sensor to use
|
|
applies_to: str = "all" # "all", group name, or fan_id
|
|
|
|
|
|
class HTTPSensorClient:
|
|
"""Client for fetching sensor data from HTTP endpoint (lm-sensors over HTTP)."""
|
|
|
|
def __init__(self, url: str, timeout: int = 10):
|
|
self.url = url
|
|
self.timeout = timeout
|
|
self.last_reading = None
|
|
self.consecutive_failures = 0
|
|
|
|
def fetch_sensors(self) -> List[TemperatureReading]:
|
|
"""Fetch sensor data from HTTP endpoint."""
|
|
try:
|
|
response = requests.get(self.url, timeout=self.timeout)
|
|
response.raise_for_status()
|
|
|
|
# Parse lm-sensors style output
|
|
temps = self._parse_sensors_output(response.text)
|
|
self.consecutive_failures = 0
|
|
return temps
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to fetch HTTP sensors from {self.url}: {e}")
|
|
self.consecutive_failures += 1
|
|
return []
|
|
|
|
def _parse_sensors_output(self, output: str) -> List[TemperatureReading]:
|
|
"""Parse lm-sensors -u style output."""
|
|
temps = []
|
|
current_chip = ""
|
|
|
|
for line in output.splitlines():
|
|
line = line.strip()
|
|
|
|
# New chip section - chip names typically don't have spaces or colons at start
|
|
if line and not line.startswith("_") and ":" not in line and not line[0].isdigit():
|
|
if "Adapter:" not in line and "ERROR" not in line.upper():
|
|
current_chip = line
|
|
continue
|
|
|
|
# Temperature reading
|
|
if "_input:" in line and "temp" in line.lower():
|
|
parts = line.split(":")
|
|
if len(parts) >= 2:
|
|
name = parts[0].strip()
|
|
try:
|
|
value = float(parts[1].strip().split()[0]) # Handle "34.000" or "34.000 (high ="
|
|
location = self._classify_sensor_name(name, current_chip)
|
|
temps.append(TemperatureReading(
|
|
name=f"{current_chip}/{name}",
|
|
location=location,
|
|
value=value,
|
|
status="ok",
|
|
source="http"
|
|
))
|
|
except (ValueError, IndexError):
|
|
pass
|
|
|
|
return temps
|
|
|
|
def _classify_sensor_name(self, name: str, chip: str) -> str:
|
|
"""Classify sensor location from name with detailed categories."""
|
|
import re
|
|
name_lower = name.lower()
|
|
chip_lower = chip.lower()
|
|
|
|
# Check chip name first for CPU identification
|
|
if "coretemp" in chip_lower:
|
|
# Extract CPU number from chip (coretemp-isa-0000 = cpu1, coretemp-isa-0001 = cpu2)
|
|
if "0001" in chip or "isa-0001" in chip_lower:
|
|
return "cpu2"
|
|
return "cpu1"
|
|
|
|
# Check sensor name for core temps
|
|
if "core" in name_lower:
|
|
# Try to determine which CPU based on core number
|
|
core_match = re.search(r'core\s*(\d+)', name_lower)
|
|
if core_match:
|
|
core_num = int(core_match.group(1))
|
|
if core_num >= 6:
|
|
return "cpu2"
|
|
return "cpu1"
|
|
return "cpu"
|
|
elif "package" in name_lower:
|
|
return "cpu"
|
|
elif "tdie" in name_lower or "tctl" in name_lower:
|
|
return "cpu"
|
|
elif "nvme" in name_lower or "composite" in name_lower:
|
|
return "nvme"
|
|
elif "raid" in name_lower or "megaraid" in name_lower:
|
|
return "raid"
|
|
elif "pcie" in name_lower:
|
|
return "pcie"
|
|
elif "inlet" in name_lower or "ambient" in name_lower or "room" in name_lower:
|
|
return "ambient"
|
|
elif "exhaust" in name_lower or "outlet" in name_lower:
|
|
return "exhaust"
|
|
elif "inlet" in name_lower:
|
|
return "inlet"
|
|
elif "loc1" in name_lower or "loc2" in name_lower or "chipset" in name_lower:
|
|
return "chipset"
|
|
return "other"
|
|
|
|
def is_healthy(self) -> bool:
|
|
return self.consecutive_failures < 3
|
|
|
|
|
|
class IPMIFanController:
|
|
"""IPMI fan controller with advanced features."""
|
|
|
|
def __init__(self, host: str, username: str, password: str, port: int = 623):
|
|
self.host = host
|
|
self.username = username
|
|
self.password = password
|
|
self.port = port
|
|
self.manual_mode = False
|
|
self.last_successful_read = None
|
|
self.consecutive_failures = 0
|
|
self.max_failures = 5
|
|
|
|
def _run_ipmi(self, args: List[str], timeout: int = 15) -> Tuple[bool, str]:
|
|
"""Run IPMI command with error handling."""
|
|
cmd = [
|
|
"ipmitool", "-I", "lanplus",
|
|
"-H", self.host,
|
|
"-U", self.username,
|
|
"-P", self.password,
|
|
"-p", str(self.port)
|
|
] + args
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=timeout
|
|
)
|
|
if result.returncode == 0:
|
|
self.consecutive_failures = 0
|
|
return True, result.stdout
|
|
else:
|
|
self.consecutive_failures += 1
|
|
logger.warning(f"IPMI command failed: {result.stderr}")
|
|
return False, result.stderr
|
|
except subprocess.TimeoutExpired:
|
|
self.consecutive_failures += 1
|
|
logger.error(f"IPMI command timed out after {timeout}s")
|
|
return False, "Timeout"
|
|
except Exception as e:
|
|
self.consecutive_failures += 1
|
|
logger.error(f"IPMI command error: {e}")
|
|
return False, str(e)
|
|
|
|
def test_connection(self) -> bool:
|
|
"""Test IPMI connection."""
|
|
success, _ = self._run_ipmi(["mc", "info"], timeout=10)
|
|
return success
|
|
|
|
def enable_manual_fan_control(self) -> bool:
|
|
"""Enable manual fan control mode."""
|
|
success, _ = self._run_ipmi(["raw", "0x30", "0x30", "0x01", "0x00"])
|
|
if success:
|
|
self.manual_mode = True
|
|
logger.info("Manual fan control enabled")
|
|
return success
|
|
|
|
def disable_manual_fan_control(self) -> bool:
|
|
"""Return to automatic fan control."""
|
|
success, _ = self._run_ipmi(["raw", "0x30", "0x30", "0x01", "0x01"])
|
|
if success:
|
|
self.manual_mode = False
|
|
logger.info("Automatic fan control restored")
|
|
return success
|
|
|
|
def set_fan_speed(self, speed_percent: int, fan_id: str = "0xff") -> bool:
|
|
"""Set fan speed (0-100%). fan_id 0xff = all fans."""
|
|
speed_percent = max(0, min(100, speed_percent))
|
|
hex_speed = f"0x{speed_percent:02x}"
|
|
success, _ = self._run_ipmi([
|
|
"raw", "0x30", "0x30", "0x02", fan_id, hex_speed
|
|
])
|
|
|
|
if success:
|
|
logger.info(f"Fan {fan_id} speed set to {speed_percent}%")
|
|
return success
|
|
|
|
def get_temperatures(self) -> List[TemperatureReading]:
|
|
"""Get temperature readings from all sensors."""
|
|
success, output = self._run_ipmi(["sdr", "type", "temperature"])
|
|
if not success:
|
|
return []
|
|
|
|
temps = []
|
|
for line in output.splitlines():
|
|
parts = [p.strip() for p in line.split("|")]
|
|
if len(parts) >= 5:
|
|
name = parts[0]
|
|
status = parts[2] if len(parts) > 2 else "unknown"
|
|
reading = parts[4]
|
|
|
|
match = re.search(r'(\d+(?:\.\d+)?)\s+degrees\s+C', reading, re.IGNORECASE)
|
|
if match:
|
|
value = float(match.group(1))
|
|
location = self._classify_temp_location(name)
|
|
temps.append(TemperatureReading(
|
|
name=name,
|
|
location=location,
|
|
value=value,
|
|
status=status,
|
|
source="ipmi"
|
|
))
|
|
return temps
|
|
|
|
def get_fan_speeds(self) -> List[FanReading]:
|
|
"""Get current fan speeds."""
|
|
success, output = self._run_ipmi(["sdr", "elist", "full"])
|
|
if not success:
|
|
return []
|
|
|
|
fans = []
|
|
for line in output.splitlines():
|
|
if "fan" in line.lower() and "rpm" in line.lower():
|
|
parts = [p.strip() for p in line.split("|")]
|
|
if len(parts) >= 5:
|
|
name = parts[0]
|
|
reading = parts[4]
|
|
|
|
match = re.search(r'fan\s*(\d+)', name, re.IGNORECASE)
|
|
fan_number = int(match.group(1)) if match else 0
|
|
fan_id = f"0x{fan_number-1:02x}" if fan_number > 0 else "0x00"
|
|
|
|
rpm_match = re.search(r'(\d+)\s*RPM', reading, re.IGNORECASE)
|
|
rpm = int(rpm_match.group(1)) if rpm_match else None
|
|
|
|
fans.append(FanReading(
|
|
fan_id=fan_id,
|
|
fan_number=fan_number,
|
|
speed_rpm=rpm,
|
|
speed_percent=None
|
|
))
|
|
return fans
|
|
|
|
def _classify_temp_location(self, name: str) -> str:
|
|
"""Classify temperature sensor location."""
|
|
name_lower = name.lower()
|
|
if "cpu" in name_lower or "proc" in name_lower:
|
|
if "1" in name or "one" in name_lower:
|
|
return "cpu1"
|
|
elif "2" in name or "two" in name_lower:
|
|
return "cpu2"
|
|
return "cpu"
|
|
elif "inlet" in name_lower or "ambient" in name_lower:
|
|
return "inlet"
|
|
elif "exhaust" in name_lower:
|
|
return "exhaust"
|
|
elif "memory" in name_lower or "dimm" in name_lower:
|
|
return "memory"
|
|
return "other"
|
|
|
|
def is_healthy(self) -> bool:
|
|
"""Check if controller is working properly."""
|
|
return self.consecutive_failures < self.max_failures
|
|
|
|
|
|
class IPMIControllerService:
|
|
"""Main service for IPMI Controller with all advanced features."""
|
|
|
|
def __init__(self, config_path: str = "/etc/ipmi-controller/config.json"):
|
|
self.config_path = config_path
|
|
self.controller: Optional[IPMIFanController] = None
|
|
self.http_client: Optional[HTTPSensorClient] = None
|
|
self.running = False
|
|
self.thread: Optional[threading.Thread] = None
|
|
self.current_speeds: Dict[str, int] = {} # fan_id -> speed
|
|
self.target_speeds: Dict[str, int] = {}
|
|
self.last_temps: List[TemperatureReading] = []
|
|
self.last_fans: List[FanReading] = []
|
|
self.lock = threading.Lock()
|
|
self.in_identify_mode = False
|
|
|
|
# Default config
|
|
self.config = {
|
|
# IPMI Settings
|
|
"ipmi_host": "",
|
|
"ipmi_username": "",
|
|
"ipmi_password": "",
|
|
"ipmi_port": 623,
|
|
|
|
# HTTP Sensor Settings
|
|
"http_sensor_enabled": False,
|
|
"http_sensor_url": "",
|
|
"http_sensor_timeout": 10,
|
|
|
|
# Fan Control Settings
|
|
"enabled": False,
|
|
"poll_interval": 10,
|
|
"fan_update_interval": 10,
|
|
"min_speed": 10,
|
|
"max_speed": 100,
|
|
"panic_temp": 85,
|
|
"panic_speed": 100,
|
|
"panic_on_no_data": True,
|
|
"no_data_timeout": 60,
|
|
|
|
# Sensor Selection
|
|
"primary_sensor": "cpu", # cpu, cpu1, cpu2, inlet, exhaust, pcie, etc.
|
|
"sensor_preference": "auto", # ipmi, http, auto
|
|
|
|
# Fan Configuration
|
|
"fans": {}, # fan_id -> {"name": "Custom Name", "group": "group1"}
|
|
"fan_groups": {}, # group_name -> {"fans": ["0x00", "0x01"], "curve": "Default"}
|
|
|
|
# Fan Curves
|
|
"active_curve": "Balanced",
|
|
"fan_curves": {
|
|
"Balanced": {
|
|
"points": [
|
|
{"temp": 30, "speed": 10},
|
|
{"temp": 35, "speed": 12},
|
|
{"temp": 40, "speed": 15},
|
|
{"temp": 45, "speed": 20},
|
|
{"temp": 50, "speed": 30},
|
|
{"temp": 55, "speed": 40},
|
|
{"temp": 60, "speed": 55},
|
|
{"temp": 65, "speed": 70},
|
|
{"temp": 70, "speed": 85},
|
|
{"temp": 75, "speed": 95},
|
|
{"temp": 80, "speed": 100}
|
|
],
|
|
"sensor_source": "cpu",
|
|
"applies_to": "all"
|
|
},
|
|
"Silent": {
|
|
"points": [
|
|
{"temp": 30, "speed": 5},
|
|
{"temp": 40, "speed": 10},
|
|
{"temp": 50, "speed": 15},
|
|
{"temp": 55, "speed": 25},
|
|
{"temp": 60, "speed": 35},
|
|
{"temp": 65, "speed": 50},
|
|
{"temp": 70, "speed": 70},
|
|
{"temp": 75, "speed": 85},
|
|
{"temp": 80, "speed": 100}
|
|
],
|
|
"sensor_source": "cpu",
|
|
"applies_to": "all"
|
|
},
|
|
"Performance": {
|
|
"points": [
|
|
{"temp": 30, "speed": 20},
|
|
{"temp": 35, "speed": 25},
|
|
{"temp": 40, "speed": 35},
|
|
{"temp": 45, "speed": 45},
|
|
{"temp": 50, "speed": 55},
|
|
{"temp": 55, "speed": 70},
|
|
{"temp": 60, "speed": 85},
|
|
{"temp": 65, "speed": 95},
|
|
{"temp": 70, "speed": 100}
|
|
],
|
|
"sensor_source": "cpu",
|
|
"applies_to": "all"
|
|
}
|
|
},
|
|
|
|
# UI Settings
|
|
"theme": "dark", # dark, light, auto
|
|
}
|
|
|
|
self._load_config()
|
|
self._last_data_time = datetime.utcnow()
|
|
|
|
def _load_config(self):
|
|
"""Load configuration from file."""
|
|
try:
|
|
config_file = Path(self.config_path)
|
|
if config_file.exists():
|
|
with open(config_file) as f:
|
|
loaded = json.load(f)
|
|
self._deep_update(self.config, loaded)
|
|
logger.info(f"Loaded config from {self.config_path}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to load config: {e}")
|
|
|
|
def _deep_update(self, d: dict, u: dict):
|
|
"""Deep update dictionary."""
|
|
for k, v in u.items():
|
|
if isinstance(v, dict) and k in d and isinstance(d[k], dict):
|
|
self._deep_update(d[k], v)
|
|
else:
|
|
d[k] = v
|
|
|
|
def _save_config(self):
|
|
"""Save configuration to file."""
|
|
try:
|
|
config_file = Path(self.config_path)
|
|
config_file.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(config_file, 'w') as f:
|
|
json.dump(self.config, f, indent=2)
|
|
logger.info(f"Saved config to {self.config_path}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to save config: {e}")
|
|
|
|
def update_config(self, **kwargs):
|
|
"""Update configuration values."""
|
|
self._deep_update(self.config, kwargs)
|
|
self._save_config()
|
|
|
|
# Reinitialize if needed
|
|
if any(k in kwargs for k in ['ipmi_host', 'ipmi_username', 'ipmi_password', 'ipmi_port']):
|
|
self._init_controller()
|
|
if any(k in kwargs for k in ['http_sensor_enabled', 'http_sensor_url']):
|
|
self._init_http_client()
|
|
|
|
def _init_controller(self) -> bool:
|
|
"""Initialize the IPMI controller."""
|
|
if not all([self.config.get('ipmi_host'), self.config.get('ipmi_username')]):
|
|
return False
|
|
|
|
self.controller = IPMIFanController(
|
|
host=self.config['ipmi_host'],
|
|
username=self.config['ipmi_username'],
|
|
password=self.config.get('ipmi_password', ''),
|
|
port=self.config.get('ipmi_port', 623)
|
|
)
|
|
|
|
if self.controller.test_connection():
|
|
logger.info(f"Connected to IPMI at {self.config['ipmi_host']}")
|
|
return True
|
|
else:
|
|
logger.error(f"Failed to connect to IPMI")
|
|
self.controller = None
|
|
return False
|
|
|
|
def _init_http_client(self) -> bool:
|
|
"""Initialize HTTP sensor client."""
|
|
if not self.config.get('http_sensor_enabled'):
|
|
return False
|
|
|
|
url = self.config.get('http_sensor_url')
|
|
if not url:
|
|
return False
|
|
|
|
self.http_client = HTTPSensorClient(
|
|
url=url,
|
|
timeout=self.config.get('http_sensor_timeout', 10)
|
|
)
|
|
logger.info(f"HTTP sensor client initialized for {url}")
|
|
return True
|
|
|
|
def start(self) -> bool:
|
|
"""Start the controller service."""
|
|
if self.running:
|
|
return True
|
|
|
|
if not self._init_controller():
|
|
logger.error("Cannot start - IPMI connection failed")
|
|
return False
|
|
|
|
if self.config.get('http_sensor_enabled'):
|
|
self._init_http_client()
|
|
|
|
self.running = True
|
|
self.thread = threading.Thread(target=self._control_loop, daemon=True)
|
|
self.thread.start()
|
|
logger.info("IPMI Controller service started")
|
|
return True
|
|
|
|
def stop(self):
|
|
"""Stop the controller service."""
|
|
self.running = False
|
|
if self.thread:
|
|
self.thread.join(timeout=5)
|
|
|
|
if self.controller:
|
|
self.controller.disable_manual_fan_control()
|
|
|
|
logger.info("IPMI Controller service stopped")
|
|
|
|
def _control_loop(self):
|
|
"""Main control loop."""
|
|
if self.controller:
|
|
self.controller.enable_manual_fan_control()
|
|
|
|
poll_counter = 0
|
|
|
|
while self.running:
|
|
try:
|
|
if not self.config.get('enabled', False):
|
|
time.sleep(1)
|
|
continue
|
|
|
|
# Ensure controller is healthy
|
|
if not self.controller or not self.controller.is_healthy():
|
|
logger.warning("IPMI unhealthy, reconnecting...")
|
|
if not self._init_controller():
|
|
time.sleep(30)
|
|
continue
|
|
self.controller.enable_manual_fan_control()
|
|
|
|
# Poll temperatures at configured interval
|
|
poll_interval = self.config.get('poll_interval', 10)
|
|
if poll_counter % poll_interval == 0:
|
|
temps = self._get_temperatures()
|
|
fans = self.controller.get_fan_speeds() if self.controller else []
|
|
|
|
with self.lock:
|
|
self.last_temps = temps
|
|
self.last_fans = fans
|
|
|
|
if temps:
|
|
self._last_data_time = datetime.utcnow()
|
|
|
|
# Apply fan curves
|
|
if not self.in_identify_mode:
|
|
self._apply_fan_curves(temps)
|
|
|
|
poll_counter += 1
|
|
time.sleep(1)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Control loop error: {e}")
|
|
time.sleep(10)
|
|
|
|
def _get_temperatures(self) -> List[TemperatureReading]:
|
|
"""Get temperatures from all sources."""
|
|
temps = []
|
|
preference = self.config.get('sensor_preference', 'ipmi')
|
|
|
|
# Try IPMI
|
|
if self.controller and preference in ['ipmi', 'auto']:
|
|
temps = self.controller.get_temperatures()
|
|
|
|
# Try HTTP sensor
|
|
if self.http_client and preference in ['http', 'auto']:
|
|
http_temps = self.http_client.fetch_sensors()
|
|
if http_temps:
|
|
if preference == 'http' or not temps:
|
|
temps = http_temps
|
|
else:
|
|
# Merge, preferring HTTP for PCIe sensors
|
|
temp_dict = {t.name: t for t in temps}
|
|
for ht in http_temps:
|
|
if ht.location == 'pcie' or ht.name not in temp_dict:
|
|
temps.append(ht)
|
|
|
|
return temps
|
|
|
|
def _apply_fan_curves(self, temps: List[TemperatureReading]):
|
|
"""Apply fan curves based on temperatures."""
|
|
if not temps:
|
|
# Check for panic mode on no data
|
|
if self.config.get('panic_on_no_data', True):
|
|
time_since_data = (datetime.utcnow() - self._last_data_time).total_seconds()
|
|
if time_since_data > self.config.get('no_data_timeout', 60):
|
|
self._set_all_fans(self.config.get('panic_speed', 100), "PANIC: No data")
|
|
return
|
|
|
|
# Get primary sensor
|
|
primary_sensor = self.config.get('primary_sensor', 'cpu')
|
|
sensor_temps = [t for t in temps if t.location == primary_sensor]
|
|
if not sensor_temps:
|
|
sensor_temps = [t for t in temps if t.location.startswith(primary_sensor)]
|
|
if not sensor_temps:
|
|
sensor_temps = temps # Fallback to any temp
|
|
|
|
max_temp = max(t.value for t in sensor_temps)
|
|
|
|
# Check panic temperature
|
|
if max_temp >= self.config.get('panic_temp', 85):
|
|
self._set_all_fans(self.config.get('panic_speed', 100), f"PANIC: Temp {max_temp}°C")
|
|
return
|
|
|
|
# Get fan curves
|
|
curves = self.config.get('fan_curves', {})
|
|
active_curve_name = self.config.get('active_curve', 'Balanced')
|
|
default_curve = curves.get(active_curve_name, curves.get('Balanced', {'points': [{'temp': 30, 'speed': 15}, {'temp': 80, 'speed': 100}]}))
|
|
|
|
# Apply curves to fans
|
|
fans = self.config.get('fans', {})
|
|
groups = self.config.get('fan_groups', {})
|
|
|
|
# Calculate target speeds per group/individual
|
|
fan_speeds = {}
|
|
|
|
for fan_id, fan_info in fans.items():
|
|
group = fan_info.get('group')
|
|
curve_name = fan_info.get('curve', 'Default')
|
|
|
|
if group and group in groups:
|
|
curve_name = groups[group].get('curve', 'Default')
|
|
|
|
curve = curves.get(curve_name, default_curve)
|
|
speed = self._calculate_curve_speed(max_temp, curve['points'])
|
|
|
|
# Apply limits
|
|
speed = max(self.config.get('min_speed', 10),
|
|
min(self.config.get('max_speed', 100), speed))
|
|
|
|
fan_speeds[fan_id] = speed
|
|
|
|
# If no individual fan configs, apply to all
|
|
if not fan_speeds:
|
|
speed = self._calculate_curve_speed(max_temp, default_curve['points'])
|
|
speed = max(self.config.get('min_speed', 10),
|
|
min(self.config.get('max_speed', 100), speed))
|
|
self._set_all_fans(speed, f"Temp {max_temp}°C")
|
|
else:
|
|
# Set individual fan speeds
|
|
for fan_id, speed in fan_speeds.items():
|
|
self._set_fan_speed(fan_id, speed, f"Temp {max_temp}°C")
|
|
|
|
def _calculate_curve_speed(self, temp: float, points: List[Dict]) -> int:
|
|
"""Calculate fan speed from curve points."""
|
|
if not points:
|
|
return 50
|
|
|
|
sorted_points = sorted(points, key=lambda p: p['temp'])
|
|
|
|
if temp <= sorted_points[0]['temp']:
|
|
return sorted_points[0]['speed']
|
|
if temp >= sorted_points[-1]['temp']:
|
|
return sorted_points[-1]['speed']
|
|
|
|
for i in range(len(sorted_points) - 1):
|
|
p1, p2 = sorted_points[i], sorted_points[i + 1]
|
|
if p1['temp'] <= temp <= p2['temp']:
|
|
if p2['temp'] == p1['temp']:
|
|
return p1['speed']
|
|
ratio = (temp - p1['temp']) / (p2['temp'] - p1['temp'])
|
|
speed = p1['speed'] + ratio * (p2['speed'] - p1['speed'])
|
|
return int(round(speed))
|
|
|
|
return sorted_points[-1]['speed']
|
|
|
|
def _set_all_fans(self, speed: int, reason: str):
|
|
"""Set all fans to a speed."""
|
|
if self.controller and speed != self.current_speeds.get('all'):
|
|
if self.controller.set_fan_speed(speed, "0xff"):
|
|
self.current_speeds['all'] = speed
|
|
logger.info(f"All fans set to {speed}% ({reason})")
|
|
|
|
def _set_fan_speed(self, fan_id: str, speed: int, reason: str):
|
|
"""Set specific fan speed."""
|
|
if self.controller and speed != self.current_speeds.get(fan_id):
|
|
if self.controller.set_fan_speed(speed, fan_id):
|
|
self.current_speeds[fan_id] = speed
|
|
logger.info(f"Fan {fan_id} set to {speed}% ({reason})")
|
|
|
|
def identify_fan(self, fan_id: str):
|
|
"""Identify a fan by setting it to 100% and others to 0%."""
|
|
if not self.controller:
|
|
return False
|
|
|
|
self.in_identify_mode = True
|
|
|
|
# Set all fans to 0%
|
|
self.controller.set_fan_speed(0, "0xff")
|
|
time.sleep(0.5)
|
|
|
|
# Set target fan to 100%
|
|
self.controller.set_fan_speed(100, fan_id)
|
|
|
|
return True
|
|
|
|
def stop_identify(self):
|
|
"""Stop identify mode and resume normal control."""
|
|
self.in_identify_mode = False
|
|
|
|
def set_manual_speed(self, speed: int, fan_id: str = "0xff") -> bool:
|
|
"""Set manual fan speed."""
|
|
if not self.controller:
|
|
return False
|
|
|
|
self.config['enabled'] = False
|
|
self._save_config()
|
|
speed = max(0, min(100, speed))
|
|
|
|
return self.controller.set_fan_speed(speed, fan_id)
|
|
|
|
def set_auto_mode(self, enabled: bool):
|
|
"""Enable or disable automatic control."""
|
|
self.config['enabled'] = enabled
|
|
self._save_config()
|
|
|
|
if enabled and self.controller:
|
|
self.controller.enable_manual_fan_control()
|
|
elif not enabled and self.controller:
|
|
self.controller.disable_manual_fan_control()
|
|
|
|
def get_status(self) -> Dict:
|
|
"""Get current controller status."""
|
|
with self.lock:
|
|
status = {
|
|
"running": self.running,
|
|
"enabled": self.config.get('enabled', False),
|
|
"connected": self.controller is not None and self.controller.is_healthy(),
|
|
"manual_mode": self.controller.manual_mode if self.controller else False,
|
|
"in_identify_mode": self.in_identify_mode,
|
|
"current_speeds": self.current_speeds,
|
|
"target_speeds": self.target_speeds,
|
|
"temperatures": [asdict(t) for t in self.last_temps],
|
|
"fans": [asdict(f) for f in self.last_fans],
|
|
"config": self._get_safe_config()
|
|
}
|
|
return status
|
|
|
|
def _get_safe_config(self) -> Dict:
|
|
"""Get config without sensitive data."""
|
|
safe = json.loads(json.dumps(self.config))
|
|
# Remove passwords
|
|
safe.pop('ipmi_password', None)
|
|
safe.pop('http_sensor_password', None)
|
|
return safe
|
|
|
|
|
|
# Global service instances
|
|
_service_instances: Dict[str, IPMIControllerService] = {}
|
|
|
|
|
|
def get_service(config_path: str = "/etc/ipmi-controller/config.json") -> IPMIControllerService:
|
|
"""Get or create the service instance."""
|
|
if config_path not in _service_instances:
|
|
_service_instances[config_path] = IPMIControllerService(config_path)
|
|
return _service_instances[config_path]
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# CLI test
|
|
import sys
|
|
|
|
if len(sys.argv) < 4:
|
|
print("Usage: fan_controller.py <host> <username> <password>")
|
|
sys.exit(1)
|
|
|
|
host, user, pwd = sys.argv[1:4]
|
|
port = int(sys.argv[4]) if len(sys.argv) > 4 else 623
|
|
|
|
ctrl = IPMIFanController(host, user, pwd, port)
|
|
|
|
print(f"Testing {host}...")
|
|
if ctrl.test_connection():
|
|
print("✓ Connected")
|
|
print("\nTemps:", [(t.name, t.value) for t in ctrl.get_temperatures()])
|
|
print("\nFans:", [(f.fan_number, f.speed_rpm) for f in ctrl.get_fan_speeds()])
|
|
else:
|
|
print("✗ Failed")
|