"""
IPMI Fan Controller v2 - Simpler, More Robust
For Dell T710 and compatible servers
"""
|
|
import json
import logging
import os
import re
import subprocess
import threading
import time
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional, Tuple

import paramiko
|
|
|
|
# Setup logging
# Every record is written both to the console and to a log file under /tmp.
_log_handlers = [
    logging.StreamHandler(),
    logging.FileHandler('/tmp/ipmi-fan-controller.log'),
]
logging.basicConfig(
    handlers=_log_handlers,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by every class in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class FanCurvePoint:
    """One point of a fan curve: a temperature paired with a fan speed."""

    # Temperature at which this point applies (the default curve uses degrees C).
    temp: float
    # Fan speed for that temperature (the default curve uses percent).
    speed: int
|
|
|
|
|
|
@dataclass
class TemperatureReading:
    """A single temperature sample reported by IPMI or lm-sensors."""

    # Sensor name as reported by the data source.
    name: str
    # Coarse location tag assigned by the classifiers in this module
    # (e.g. "cpu1", "cpu2", "cpu", "inlet", "exhaust", "memory", "other").
    location: str
    # Measured temperature.
    value: float
    # Sensor status string (e.g. "ok").
    status: str
|
|
|
|
|
|
@dataclass
class FanReading:
    """A single fan sample reported by IPMI."""

    # Hex fan identifier string (e.g. "0x00") as used in raw IPMI commands.
    fan_id: str
    # Fan number parsed from the sensor name; 0 when none could be parsed.
    fan_number: int
    # Rotational speed in RPM, or None when it could not be parsed.
    speed_rpm: Optional[int]
    # Speed as a percentage, or None when unknown.
    speed_percent: Optional[int]
|
|
|
|
|
|
class IPMIFanController:
    """Simplified IPMI fan controller with robust error handling.

    Every operation shells out to ``ipmitool`` over the ``lanplus``
    interface.  Failures are counted in ``consecutive_failures`` so callers
    can use :meth:`is_healthy` to decide when to reconnect.
    """

    # Default fan curve (temp C -> speed %)
    DEFAULT_CURVE = [
        FanCurvePoint(30, 15),
        FanCurvePoint(40, 25),
        FanCurvePoint(50, 40),
        FanCurvePoint(60, 60),
        FanCurvePoint(70, 80),
        FanCurvePoint(80, 100),
    ]

    # Matches readings such as "45 degrees C"; the sign is optional so that
    # sub-zero readings are not silently dropped.
    _TEMP_PATTERN = re.compile(r'(-?\d+(?:\.\d+)?)\s+degrees\s+C', re.IGNORECASE)
    _FAN_NUMBER_PATTERN = re.compile(r'fan\s*(\d+)', re.IGNORECASE)
    _RPM_PATTERN = re.compile(r'(\d+)\s*RPM', re.IGNORECASE)

    def __init__(self, host: str, username: str, password: str, port: int = 623):
        """Store connection parameters; no network traffic happens here."""
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        self.manual_mode = False
        self.last_successful_read = None
        self.consecutive_failures = 0
        self.max_failures = 5

    def _run_ipmi(self, args: List[str], timeout: int = 15) -> Tuple[bool, str]:
        """Run an ipmitool command with error handling.

        Args:
            args: ipmitool sub-command and its arguments.
            timeout: Seconds to wait before the command is abandoned.

        Returns:
            ``(True, stdout)`` on exit status 0, otherwise ``(False, message)``
            where message is stderr, ``"Timeout"`` or the exception text.
            Each failure increments ``consecutive_failures``; a success
            resets it to 0.
        """
        cmd = [
            "ipmitool", "-I", "lanplus",
            "-H", self.host,
            "-U", self.username,
            # -E makes ipmitool read the password from the environment, so
            # it does not show up in the process list the way "-P <pw>" does.
            "-E",
            "-p", str(self.port),
            *args,
        ]

        env = os.environ.copy()
        secret = self.password or ""
        # ipmitool prefers IPMITOOL_PASSWORD over IPMI_PASSWORD; set both so a
        # value inherited from the parent environment cannot take precedence.
        env["IPMI_PASSWORD"] = secret
        env["IPMITOOL_PASSWORD"] = secret

        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
                env=env,
            )
        except subprocess.TimeoutExpired:
            self.consecutive_failures += 1
            logger.error(f"IPMI command timed out after {timeout}s")
            return False, "Timeout"
        except Exception as e:
            # Top-level boundary: e.g. ipmitool not installed, bad argument types.
            self.consecutive_failures += 1
            logger.error(f"IPMI command error: {e}")
            return False, str(e)

        if result.returncode != 0:
            self.consecutive_failures += 1
            logger.warning(f"IPMI command failed: {result.stderr}")
            return False, result.stderr

        self.consecutive_failures = 0
        return True, result.stdout

    def test_connection(self) -> bool:
        """Test the IPMI connection by running ``mc info``."""
        success, _ = self._run_ipmi(["mc", "info"], timeout=10)
        return success

    def enable_manual_fan_control(self) -> bool:
        """Enable manual fan control mode; returns True on success."""
        # Dell: raw 0x30 0x30 0x01 0x00
        success, _ = self._run_ipmi(["raw", "0x30", "0x30", "0x01", "0x00"])
        if success:
            self.manual_mode = True
            logger.info("Manual fan control enabled")
        return success

    def disable_manual_fan_control(self) -> bool:
        """Return to automatic fan control; returns True on success."""
        # Dell: raw 0x30 0x30 0x01 0x01
        success, _ = self._run_ipmi(["raw", "0x30", "0x30", "0x01", "0x01"])
        if success:
            self.manual_mode = False
            logger.info("Automatic fan control restored")
        return success

    def set_fan_speed(self, speed_percent: int, fan_id: str = "0xff") -> bool:
        """Set fan speed (0-100%). fan_id 0xff = all fans.

        The value is rounded to an integer and clamped to 0-100, so float
        percentages (e.g. limits loaded from JSON) are accepted instead of
        crashing the hex formatting.  Returns True when the command succeeded,
        False on failure or when the value is not a finite number.
        """
        try:
            percent = int(round(float(speed_percent)))
        except (TypeError, ValueError, OverflowError):
            logger.error(f"Invalid fan speed: {speed_percent!r}")
            return False
        percent = max(0, min(100, percent))

        hex_speed = f"0x{percent:02x}"
        success, _ = self._run_ipmi([
            "raw", "0x30", "0x30", "0x02", fan_id, hex_speed
        ])

        if success:
            logger.info(f"Fan speed set to {percent}%")
        return success

    def get_temperatures(self) -> List[TemperatureReading]:
        """Get temperature readings from all sensors.

        Returns an empty list when the IPMI command fails.  Lines whose
        reading column does not contain "<number> degrees C" are skipped.
        """
        success, output = self._run_ipmi(["sdr", "type", "temperature"])
        if not success:
            return []

        temps = []
        for line in output.splitlines():
            # Parse: Sensor Name | 01h | ok | 3.1 | 45 degrees C
            parts = [p.strip() for p in line.split("|")]
            if len(parts) < 5:
                continue

            name, status, reading = parts[0], parts[2], parts[4]
            match = self._TEMP_PATTERN.search(reading)
            if not match:
                continue

            temps.append(TemperatureReading(
                name=name,
                location=self._classify_temp_location(name),
                value=float(match.group(1)),
                status=status,
            ))
        return temps

    def get_fan_speeds(self) -> List[FanReading]:
        """Get current fan speeds.

        Returns an empty list when the IPMI command fails.  Only lines that
        mention both "fan" and "rpm" are considered.  ``speed_percent`` is
        always None because only RPM is parsed from the output.
        """
        success, output = self._run_ipmi(["sdr", "elist", "full"])
        if not success:
            return []

        fans = []
        for line in output.splitlines():
            lowered = line.lower()
            if "fan" not in lowered or "rpm" not in lowered:
                continue

            parts = [p.strip() for p in line.split("|")]
            if len(parts) < 5:
                continue
            name, reading = parts[0], parts[4]

            # Extract fan number (0 when the name carries none)
            number_match = self._FAN_NUMBER_PATTERN.search(name)
            fan_number = int(number_match.group(1)) if number_match else 0
            # Fan IDs are zero-based while the sensor names are one-based.
            fan_id = f"0x{fan_number - 1:02x}" if fan_number > 0 else "0x00"

            # Extract RPM
            rpm_match = self._RPM_PATTERN.search(reading)
            rpm = int(rpm_match.group(1)) if rpm_match else None

            fans.append(FanReading(
                fan_id=fan_id,
                fan_number=fan_number,
                speed_rpm=rpm,
                speed_percent=None,
            ))
        return fans

    def _classify_temp_location(self, name: str) -> str:
        """Classify a temperature sensor by keywords in its name.

        Returns one of "cpu1", "cpu2", "cpu", "inlet", "exhaust", "memory"
        or "other".
        """
        name_lower = name.lower()
        if "cpu" in name_lower or "proc" in name_lower:
            if "1" in name or "one" in name_lower:
                return "cpu1"
            if "2" in name or "two" in name_lower:
                return "cpu2"
            return "cpu"
        if "inlet" in name_lower or "ambient" in name_lower:
            return "inlet"
        if "exhaust" in name_lower:
            return "exhaust"
        if "memory" in name_lower or "dimm" in name_lower:
            return "memory"
        return "other"

    def calculate_fan_speed(self, temps: List[TemperatureReading],
                            curve: Optional[List[FanCurvePoint]] = None) -> int:
        """Calculate target fan speed based on temperatures.

        Args:
            temps: Current readings.  The hottest reading whose location
                starts with "cpu" drives the curve; if there is none, the
                hottest reading overall is used.
            curve: Fan curve points in any order.  ``None`` or an empty list
                selects ``DEFAULT_CURVE`` (an empty list used to raise
                ``IndexError``).

        Returns:
            The speed for the driving temperature, linearly interpolated
            between neighbouring curve points and held flat outside the
            curve's range.  50 when ``temps`` is empty.
        """
        if not temps:
            return 50  # Default if no temps

        points = sorted(curve or self.DEFAULT_CURVE, key=lambda p: p.temp)

        # Find max CPU temperature, falling back to every sensor
        cpu_values = [t.value for t in temps if t.location.startswith("cpu")]
        max_temp = max(cpu_values) if cpu_values else max(t.value for t in temps)

        # Below / above the curve: clamp to the end points
        if max_temp <= points[0].temp:
            return points[0].speed
        if max_temp >= points[-1].temp:
            return points[-1].speed

        # Apply fan curve with linear interpolation
        for low, high in zip(points, points[1:]):
            if low.temp <= max_temp <= high.temp:
                if high.temp == low.temp:
                    return low.speed
                ratio = (max_temp - low.temp) / (high.temp - low.temp)
                return int(round(low.speed + ratio * (high.speed - low.speed)))

        return points[-1].speed

    def is_healthy(self) -> bool:
        """Check if controller is working properly.

        True while fewer than ``max_failures`` commands have failed in a row.
        """
        return self.consecutive_failures < self.max_failures
|
|
|
|
|
|
class SSHSensorClient:
    """SSH client for lm-sensors data collection.

    Runs ``sensors -u`` on the remote host and converts its temperature
    inputs into ``TemperatureReading`` objects.
    """

    # Number of consecutive failures at which the client counts as unhealthy.
    MAX_FAILURES = 3

    def __init__(self, host: str, username: str, password: Optional[str] = None,
                 key_file: Optional[str] = None, port: int = 22):
        """Store connection parameters; the connection is opened lazily."""
        self.host = host
        self.username = username
        self.password = password
        self.key_file = key_file
        self.port = port
        self.client: "Optional[paramiko.SSHClient]" = None
        self.consecutive_failures = 0

    def connect(self) -> bool:
        """Connect to SSH server.

        Uses the key file when it is configured and exists, otherwise the
        password.  Returns False (and leaves ``self.client`` as None) when no
        authentication method is available or the connection fails; only a
        failed connection attempt increments ``consecutive_failures``.
        """
        # Drop any previous connection so it is not leaked when reconnecting.
        self.disconnect()

        client = None
        try:
            client = paramiko.SSHClient()
            # NOTE(review): AutoAddPolicy trusts unknown host keys without
            # verification; consider loading known_hosts and rejecting
            # unknown hosts if the network is not trusted.
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

            connect_kwargs = {
                "hostname": self.host,
                "port": self.port,
                "username": self.username,
                "timeout": 10
            }

            if self.key_file and Path(self.key_file).exists():
                connect_kwargs["key_filename"] = self.key_file
            elif self.password:
                connect_kwargs["password"] = self.password
            else:
                logger.error("No authentication method available for SSH")
                client.close()
                return False

            client.connect(**connect_kwargs)
        except Exception as e:
            logger.error(f"SSH connection failed: {e}")
            self.consecutive_failures += 1
            # Never keep a half-initialised client around: a later call
            # would try to run commands on it instead of reconnecting.
            if client is not None:
                client.close()
            return False

        self.client = client
        logger.info(f"SSH connected to {self.host}")
        return True

    def disconnect(self):
        """Close SSH connection (no-op when not connected)."""
        if self.client:
            self.client.close()
            self.client = None

    def get_lm_sensors_data(self) -> "List[TemperatureReading]":
        """Get temperature data from lm-sensors.

        Connects on demand.  Returns an empty list when the connection or the
        command fails; a command failure also drops the connection so the
        next call reconnects.
        """
        if not self.client and not self.connect():
            return []

        try:
            stdin, stdout, stderr = self.client.exec_command("sensors -u", timeout=15)
            output = stdout.read().decode()
            error = stderr.read().decode()

            if error:
                logger.warning(f"sensors command stderr: {error}")

            temps = self._parse_sensors_output(output)
            self.consecutive_failures = 0
            return temps

        except Exception as e:
            logger.error(f"Failed to get sensors data: {e}")
            self.consecutive_failures += 1
            self.disconnect()  # Force reconnect on next attempt
            return []

    def _parse_sensors_output(self, output: str) -> "List[TemperatureReading]":
        """Parse lm-sensors -u output.

        The parser expects blocks separated by blank lines: a chip line,
        optional "Adapter: ..." line, then feature labels ending in ":"
        ("Core 0:") each followed by "tempN_input: <value>" lines.  Readings
        are named "<label>/<tempN_input>" and classified from the label and
        the chip name, so CPU cores are recognised even though the input
        key itself is just "tempN_input".
        """
        temps = []
        chip = ""
        label = ""
        at_block_start = True  # next plain line is a chip name

        for raw_line in output.splitlines():
            line = raw_line.strip()

            if not line:
                # Blank line ends the current chip block.
                at_block_start = True
                label = ""
                continue

            # Feature label, e.g. "Core 0:" or "Package id 0:"
            if line.endswith(":"):
                label = line[:-1]
                at_block_start = False
                continue

            # Temperature reading, e.g. "temp2_input: 43.000"
            if "_input:" in line and "temp" in line.lower():
                at_block_start = False
                key, _, raw_value = line.partition(":")
                key = key.strip()
                try:
                    value = float(raw_value.strip())
                except ValueError:
                    # Not a plain number (e.g. extra colons); skip the line.
                    continue
                source = label or chip
                temps.append(TemperatureReading(
                    name=f"{source}/{key}",
                    location=self._classify_sensor_name(label or key, chip),
                    value=value,
                    status="ok"
                ))
                continue

            if at_block_start:
                # New chip section, e.g. "coretemp-isa-0000"
                chip = line
                label = ""
                at_block_start = False

        return temps

    def _classify_sensor_name(self, name: str, chip: str) -> str:
        """Classify sensor location from the sensor name and its chip.

        Both strings are searched.  "core <n>" maps n = 0/1 to "cpu1" and
        n = 2/3 to "cpu2"; any other core, package, Tdie or Tctl sensor is
        "cpu"; everything else is "other".
        """
        text = f"{chip} {name}".lower()

        if "core" in text:
            core_match = re.search(r"\bcore\s*(\d+)", text)
            if core_match:
                core_number = int(core_match.group(1))
                if core_number in (0, 1):
                    return "cpu1"
                if core_number in (2, 3):
                    return "cpu2"
            return "cpu"
        if "package" in text:
            return "cpu"
        if "tdie" in text or "tctl" in text:
            return "cpu"
        return "other"

    def is_healthy(self) -> bool:
        """True while fewer than ``MAX_FAILURES`` operations failed in a row."""
        return self.consecutive_failures < self.MAX_FAILURES
|
|
|
|
|
|
class FanControlService:
    """Background service for automatic fan control.

    A daemon thread polls temperatures, maps them through the configured
    fan curve and pushes the resulting speed to the BMC.  Configuration is
    a flat dict persisted as JSON at ``config_path``.
    """

    # Minimum difference (percentage points) before a new speed is sent.
    SPEED_HYSTERESIS = 5
    # Consecutive polls without any temperature reading that are tolerated
    # before the fans are driven to the panic speed as a failsafe.
    MAX_MISSED_READINGS = 3

    def __init__(self, config_path: str = "/etc/ipmi-fan-controller/config.json"):
        """Set defaults and overlay the configuration file if it exists."""
        self.config_path = config_path
        self.controller: "Optional[IPMIFanController]" = None
        self.ssh_client: "Optional[SSHSensorClient]" = None
        self.running = False
        self.thread: Optional[threading.Thread] = None
        self.current_speed = 0
        self.target_speed = 0
        self.last_temps: "List[TemperatureReading]" = []
        self.last_fans: "List[FanReading]" = []
        self.lock = threading.Lock()

        # Wakes the control loop immediately when the service is stopped.
        self._stop_event = threading.Event()
        # True when the BMC's actual speed is unknown (startup, manual mode
        # just re-enabled), so the next computed speed must be sent even if
        # it is within the hysteresis band of ``current_speed``.
        self._force_apply = True
        self._missed_readings = 0

        # Default config with new structure
        self.config = {
            # IPMI Settings
            "ipmi_host": "",
            "ipmi_username": "",
            "ipmi_password": "",
            "ipmi_port": 623,

            # SSH Settings
            "ssh_enabled": False,
            "ssh_host": None,
            "ssh_username": None,
            "ssh_password": None,
            "ssh_use_key": False,
            "ssh_key_file": None,
            "ssh_port": 22,

            # Fan Control Settings
            "enabled": False,
            "interval": 10,
            "min_speed": 10,
            "max_speed": 100,
            "fan_curve": [
                {"temp": 30, "speed": 15},
                {"temp": 40, "speed": 25},
                {"temp": 50, "speed": 40},
                {"temp": 60, "speed": 60},
                {"temp": 70, "speed": 80},
                {"temp": 80, "speed": 100},
            ],
            "panic_temp": 85,
            "panic_speed": 100
        }

        self._load_config()

    def _load_config(self):
        """Load configuration from file, keeping defaults for missing keys.

        Errors are logged and leave the current configuration in place.
        """
        try:
            config_file = Path(self.config_path)
            if config_file.exists():
                with open(config_file) as f:
                    loaded = json.load(f)
                self.config.update(loaded)
                logger.info(f"Loaded config from {self.config_path}")
        except Exception as e:
            logger.error(f"Failed to load config: {e}")

    def _save_config(self):
        """Save configuration to file, creating parent directories.

        Errors are logged, not raised.
        """
        try:
            config_file = Path(self.config_path)
            config_file.parent.mkdir(parents=True, exist_ok=True)
            with open(config_file, 'w') as f:
                json.dump(self.config, f, indent=2)
            logger.info(f"Saved config to {self.config_path}")
        except Exception as e:
            logger.error(f"Failed to save config: {e}")

    def update_config(self, **kwargs):
        """Update configuration values, persist them and reconnect if needed."""
        self.config.update(kwargs)
        self._save_config()

        # Reinitialize controllers if connection params changed
        ipmi_keys = ('ipmi_host', 'ipmi_username', 'ipmi_password', 'ipmi_port')
        ssh_keys = ('ssh_host', 'ssh_username', 'ssh_password', 'ssh_key_file', 'ssh_port')
        ipmi_changed = any(k in kwargs for k in ipmi_keys)
        ssh_changed = any(k in kwargs for k in ssh_keys)

        if ipmi_changed:
            self._init_ipmi_controller()
        if ssh_changed or (kwargs.get('ssh_enabled') and not self.ssh_client):
            self._init_ssh_client()

    def _init_ipmi_controller(self) -> bool:
        """Initialize the IPMI controller and verify the connection.

        Returns False (with ``self.controller`` unchanged) when host or
        username is missing, and False (with ``self.controller`` set to None)
        when the connection test fails.
        """
        if not all([self.config.get('ipmi_host'), self.config.get('ipmi_username')]):
            logger.warning("Missing IPMI credentials")
            return False

        controller = IPMIFanController(
            host=self.config['ipmi_host'],
            username=self.config['ipmi_username'],
            password=self.config.get('ipmi_password', ''),
            port=self.config.get('ipmi_port', 623)
        )

        if controller.test_connection():
            self.controller = controller
            logger.info(f"Connected to IPMI at {self.config['ipmi_host']}")
            return True

        logger.error(f"Failed to connect to IPMI at {self.config['ipmi_host']}")
        self.controller = None
        return False

    def _init_ssh_client(self) -> bool:
        """Initialize SSH client for lm-sensors.

        Falls back to the IPMI host, username and password when no
        SSH-specific values are configured.  The connection itself is opened
        lazily on first use.
        """
        if not self.config.get('ssh_enabled'):
            return False

        host = self.config.get('ssh_host') or self.config.get('ipmi_host')
        username = self.config.get('ssh_username') or self.config.get('ipmi_username')

        if not all([host, username]):
            logger.warning("Missing SSH credentials")
            return False

        # Close the previous connection instead of leaking it.
        previous = self.ssh_client
        if previous:
            previous.disconnect()

        self.ssh_client = SSHSensorClient(
            host=host,
            username=username,
            password=self.config.get('ssh_password') or self.config.get('ipmi_password'),
            key_file=self.config.get('ssh_key_file'),
            port=self.config.get('ssh_port', 22)
        )

        return True

    def start(self) -> bool:
        """Start the fan control service.

        Returns True when the service is running afterwards, False when the
        IPMI connection fails or a previous control thread has not exited.
        """
        if self.running:
            return True

        # A previous loop may still be finishing an IPMI call after stop();
        # never run two control loops against the same BMC.
        if self.thread and self.thread.is_alive():
            self.thread.join(timeout=5)
            if self.thread.is_alive():
                logger.error("Cannot start service - previous control loop still running")
                return False

        if not self._init_ipmi_controller():
            logger.error("Cannot start service - IPMI connection failed")
            return False

        if self.config.get('ssh_enabled'):
            self._init_ssh_client()

        self._stop_event.clear()
        self._force_apply = True
        self._missed_readings = 0
        self.running = True
        self.thread = threading.Thread(target=self._control_loop, daemon=True)
        self.thread.start()
        logger.info("Fan control service started")
        return True

    def stop(self):
        """Stop the fan control service and hand the fans back to the BMC."""
        self.running = False
        self._stop_event.set()  # interrupt the loop's wait immediately
        if self.thread:
            self.thread.join(timeout=5)

        # Return to automatic control
        if self.controller:
            self.controller.disable_manual_fan_control()

        if self.ssh_client:
            self.ssh_client.disconnect()

        logger.info("Fan control service stopped")

    def _control_loop(self):
        """Main control loop running in background thread."""
        while self.running:
            try:
                delay = self._control_step()
            except Exception as e:
                # Keep the loop alive no matter what a single cycle does.
                logger.error(f"Control loop error: {e}")
                delay = 10
            # Event.wait (unlike time.sleep) returns as soon as stop() is called.
            self._stop_event.wait(delay)

    def _control_step(self) -> float:
        """Run one control cycle and return the seconds to wait afterwards."""
        if not self.config.get('enabled', False):
            return 1

        # Ensure controllers are healthy
        controller = self.controller
        if not controller or not controller.is_healthy():
            logger.warning("IPMI controller unhealthy, attempting reconnect...")
            if not self._init_ipmi_controller():
                return 30
            controller = self.controller

        # Take over the fans only while automatic control is enabled; the
        # BMC's current speed is unknown afterwards, so force the next update.
        if not controller.manual_mode and controller.enable_manual_fan_control():
            self._force_apply = True

        # Get temperature data
        temps = self._get_temperatures()
        fans = controller.get_fan_speeds()

        with self.lock:
            self.last_temps = temps
            self.last_fans = fans

        interval = self._poll_interval()

        if not temps:
            logger.warning("No temperature readings received")
            self._handle_missing_temperatures(controller)
            return interval
        self._missed_readings = 0

        # Check for panic temperature
        max_temp = self._control_temperature(temps)
        panic = max_temp >= self.config.get('panic_temp', 85)
        if panic:
            target = self.config.get('panic_speed', 100)
        else:
            # Calculate target speed from curve
            target = controller.calculate_fan_speed(temps, self._configured_curve())

        # Apply limits
        target = max(self.config.get('min_speed', 10),
                     min(self.config.get('max_speed', 100), target))
        self.target_speed = target

        if panic:
            logger.warning(f"PANIC MODE: CPU temp {max_temp}°C, setting fans to {self.target_speed}%")

        # Apply fan speed if changed significantly (>= 5%); panic and
        # unknown-state updates bypass the hysteresis.
        force = self._force_apply or (panic and target != self.current_speed)
        if self._apply_speed(controller, target, force=force):
            logger.info(f"Fan speed adjusted to {self.target_speed}% (CPU temp: {max_temp:.1f}°C)")

        return interval

    def _poll_interval(self) -> float:
        """Return the configured polling interval, or 10 s if it is unusable."""
        try:
            interval = float(self.config.get('interval', 10))
        except (TypeError, ValueError):
            return 10.0
        return interval if interval > 0 else 10.0

    @staticmethod
    def _control_temperature(temps) -> float:
        """Return the temperature that drives panic detection.

        Hottest "cpu*" reading, or the hottest reading overall when no sensor
        is classified as CPU (the same selection ``calculate_fan_speed``
        uses).  0 for an empty list.
        """
        cpu_values = [t.value for t in temps if t.location.startswith('cpu')]
        return max(cpu_values or [t.value for t in temps], default=0)

    def _configured_curve(self) -> "Optional[List[FanCurvePoint]]":
        """Build the fan curve from config; None selects the default curve."""
        try:
            points = [FanCurvePoint(p['temp'], p['speed'])
                      for p in self.config.get('fan_curve') or []]
        except (KeyError, TypeError) as e:
            logger.error(f"Invalid fan_curve in config, using default curve: {e}")
            return None
        return points or None

    def _apply_speed(self, controller: "IPMIFanController", speed, force: bool = False) -> bool:
        """Send ``speed`` to the BMC; returns True when it was applied.

        Without ``force`` the command is skipped while the change is smaller
        than ``SPEED_HYSTERESIS``.
        """
        if not force and abs(speed - self.current_speed) < self.SPEED_HYSTERESIS:
            return False
        if not controller.set_fan_speed(speed):
            return False
        self.current_speed = speed
        self._force_apply = False
        return True

    def _handle_missing_temperatures(self, controller: "IPMIFanController"):
        """Failsafe: after repeated empty polls drive fans to the panic speed.

        In manual mode the BMC no longer regulates the fans, so running
        blind at the last (possibly low) speed is unsafe.
        """
        self._missed_readings += 1
        if self._missed_readings < self.MAX_MISSED_READINGS:
            return

        failsafe = self.config.get('panic_speed', 100)
        self.target_speed = failsafe
        if self._apply_speed(controller, failsafe,
                             force=self._force_apply or failsafe != self.current_speed):
            logger.error(f"No temperature data for {self._missed_readings} polls, "
                         f"failsafe fan speed {failsafe}% applied")

    def _get_temperatures(self) -> "List[TemperatureReading]":
        """Get temperatures from IPMI and/or SSH lm-sensors.

        IPMI is queried first.  SSH readings replace the IPMI result when SSH
        is enabled and either IPMI returned nothing or ``prefer_ssh_temps``
        is set - but only if SSH actually returned readings.
        """
        temps = []

        # Try IPMI first
        controller = self.controller
        if controller:
            temps = controller.get_temperatures()

        # Try SSH lm-sensors if enabled and IPMI failed or has no data
        ssh_client = self.ssh_client
        if self.config.get('ssh_enabled') and ssh_client:
            if not temps or self.config.get('prefer_ssh_temps', False):
                ssh_temps = ssh_client.get_lm_sensors_data()
                if ssh_temps:
                    temps = ssh_temps

        return temps

    def get_status(self) -> Dict:
        """Get current status as a JSON-serialisable dict (no passwords)."""
        controller = self.controller
        with self.lock:
            status = {
                "running": self.running,
                "enabled": self.config.get('enabled', False),
                "connected": controller is not None and controller.is_healthy(),
                "manual_mode": controller.manual_mode if controller else False,
                "current_speed": self.current_speed,
                "target_speed": self.target_speed,
                "temperatures": [asdict(t) for t in self.last_temps],
                "fans": [asdict(f) for f in self.last_fans],
                "config": {
                    # IPMI
                    "ipmi_host": self.config.get('ipmi_host'),
                    "ipmi_port": self.config.get('ipmi_port'),
                    "ipmi_username": self.config.get('ipmi_username'),
                    # SSH
                    "ssh_enabled": self.config.get('ssh_enabled'),
                    "ssh_host": self.config.get('ssh_host'),
                    "ssh_port": self.config.get('ssh_port'),
                    "ssh_username": self.config.get('ssh_username'),
                    "ssh_use_key": self.config.get('ssh_use_key'),
                    # Settings
                    "min_speed": self.config.get('min_speed'),
                    "max_speed": self.config.get('max_speed'),
                    "panic_temp": self.config.get('panic_temp'),
                    "interval": self.config.get('interval'),
                    "fan_curve": self.config.get('fan_curve')
                }
            }
        return status

    def set_manual_speed(self, speed: int) -> bool:
        """Set manual fan speed (clamped to 0-100) and pause automatic control.

        Returns False when there is no controller, manual mode cannot be
        enabled, or the speed command fails.  The ``enabled`` flag is cleared
        in memory only; it is not written to the config file here.
        """
        controller = self.controller
        if not controller:
            return False

        self.config['enabled'] = False
        speed = max(0, min(100, speed))

        # The BMC ignores speed commands while it is in automatic mode
        # (e.g. after set_auto_mode(False)), so take manual control first.
        if not controller.manual_mode and not controller.enable_manual_fan_control():
            logger.error("Cannot set manual speed - failed to enable manual fan control")
            return False

        if controller.set_fan_speed(speed):
            self.current_speed = speed
            return True
        return False

    def set_auto_mode(self, enabled: bool):
        """Enable or disable automatic control and persist the choice.

        Enabling takes manual control of the fans so the loop can drive them;
        disabling hands the fans back to the BMC.
        """
        self.config['enabled'] = enabled
        self._save_config()

        controller = self.controller
        if not controller:
            return

        if enabled:
            if controller.enable_manual_fan_control():
                # The BMC may have changed the speed while it was in charge.
                self._force_apply = True
        else:
            controller.disable_manual_fan_control()
|
|
|
|
|
|
# Global service instances
# One FanControlService per configuration path, created on first request.
_service_instances: Dict[str, FanControlService] = {}


def get_service(config_path: str = "/etc/ipmi-fan-controller/config.json") -> FanControlService:
    """Get or create the service instance for a config path.

    Repeated calls with the same ``config_path`` string return the same
    object; the first call for a path constructs the service.
    """
    service = _service_instances.get(config_path)
    if service is None:
        service = FanControlService(config_path)
        _service_instances[config_path] = service
    return service
|
|
|
|
|
|
def _cli_main(argv) -> int:
    """Simple CLI test: read sensors, then briefly exercise manual fan control.

    Args:
        argv: Full argument vector: program name, host, username, password
            and an optional port (default 623).

    Returns:
        Process exit status: 0 on success, 1 on a usage error or when the
        IPMI connection fails.
    """
    usage = "Usage: python fan_controller.py <host> <username> <password> [port]"
    if len(argv) < 4:
        print(usage)
        return 1

    host, username, password = argv[1], argv[2], argv[3]
    try:
        port = int(argv[4]) if len(argv) > 4 else 623
    except ValueError:
        # A non-numeric port used to end in a traceback.
        print(f"Invalid port: {argv[4]}")
        print(usage)
        return 1

    controller = IPMIFanController(host, username, password, port)

    print(f"Testing connection to {host}...")
    if not controller.test_connection():
        print("✗ Connection failed")
        return 1
    print("✓ Connected successfully")

    print("\nTemperatures:")
    for temp in controller.get_temperatures():
        print(f"  {temp.name}: {temp.value}°C ({temp.location})")

    print("\nFan speeds:")
    for fan in controller.get_fan_speeds():
        print(f"  Fan {fan.fan_number}: {fan.speed_rpm} RPM")

    print("\nEnabling manual control...")
    if controller.enable_manual_fan_control():
        print("✓ Manual control enabled")
        try:
            for percent in (30, 50):
                print(f"\nSetting fans to {percent}%...")
                if controller.set_fan_speed(percent):
                    print(f"✓ Speed set to {percent}%")
                    time.sleep(3)
        finally:
            # Always hand the fans back to the BMC, even on Ctrl-C or an
            # unexpected error, so the server is never left in manual mode.
            print("\nReturning to automatic control...")
            controller.disable_manual_fan_control()
        print("✓ Done")

    return 0


if __name__ == "__main__":
    import sys

    sys.exit(_cli_main(sys.argv))
|