530 lines
20 KiB
Python
530 lines
20 KiB
Python
"""Fan control logic including curves and automatic control."""
|
|
import json
|
|
import logging
|
|
import asyncio
|
|
from typing import List, Dict, Optional, Any
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime, timedelta
|
|
from enum import Enum
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.database import (
|
|
Server, FanCurve, SensorData, FanData, SystemLog,
|
|
get_db, SessionLocal
|
|
)
|
|
from backend.ipmi_client import IPMIClient, TemperatureReading
|
|
from backend.config import settings
|
|
|
|
# Module-level logger, named after this module, used by every class below.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ControlState(Enum):
    """Modes a server's fan control can be in.

    Each member's value is the lowercase string form of its name; that
    string is what gets reported when a state is serialized via ``.value``.
    """

    # Fans follow the configured curve.
    AUTO = "auto"
    # Fan speed chosen by hand (not referenced elsewhere in this module).
    MANUAL = "manual"
    # Emergency state (not referenced elsewhere in this module).
    PANIC = "panic"
    # No control is being performed.
    OFF = "off"
|
|
|
|
|
|
@dataclass
class FanCurvePoint:
    """One (temperature, fan speed) pair on a fan curve."""

    # Temperature this point applies to (logged elsewhere in the module as °C).
    temp: float
    # Fan speed to use at that temperature (logged elsewhere as a percentage).
    speed: int
|
|
|
|
|
|
class FanCurveManager:
    """Stateless helpers for parsing, serializing and evaluating fan curves."""

    @staticmethod
    def _default_curve() -> List["FanCurvePoint"]:
        """Return a fresh copy of the built-in fallback curve."""
        return [
            FanCurvePoint(30, 10),
            FanCurvePoint(40, 20),
            FanCurvePoint(50, 35),
            FanCurvePoint(60, 50),
            FanCurvePoint(70, 70),
            FanCurvePoint(80, 100),
        ]

    @staticmethod
    def parse_curve(curve_data: str) -> List["FanCurvePoint"]:
        """Parse a fan curve from a JSON string.

        The expected payload is a JSON array of objects, each with a numeric
        ``temp`` and ``speed`` key.

        Args:
            curve_data: JSON text describing the curve.

        Returns:
            The parsed points in payload order. If the payload is not valid
            JSON, does not have the expected shape, or contains non-numeric
            values, the error is logged and the built-in default curve is
            returned instead, so callers always get a usable curve.
        """
        try:
            raw_points = json.loads(curve_data)
            points = []
            for entry in raw_points:
                temp, speed = entry["temp"], entry["speed"]
                for number in (temp, speed):
                    # bool is a subclass of int; it is never a valid reading.
                    if isinstance(number, bool) or not isinstance(number, (int, float)):
                        raise TypeError(f"non-numeric curve value: {number!r}")
                points.append(FanCurvePoint(temp, speed))
            return points
        except (ValueError, KeyError, TypeError) as e:
            # ValueError covers json.JSONDecodeError; TypeError covers a None
            # payload and JSON that is not a list of objects.
            logger.error(f"Failed to parse fan curve: {e}")
            return FanCurveManager._default_curve()

    @staticmethod
    def serialize_curve(points: List["FanCurvePoint"]) -> str:
        """Serialize fan curve points to a JSON array of temp/speed objects."""
        return json.dumps([{"temp": p.temp, "speed": p.speed} for p in points])

    @staticmethod
    def calculate_speed(curve: List["FanCurvePoint"], temperature: float) -> int:
        """Calculate the fan speed for a temperature by linear interpolation.

        Args:
            curve: Curve points in any order; they are sorted by temperature
                before use.
            temperature: Temperature to evaluate the curve at.

        Returns:
            An integer speed. An empty curve yields 50. Temperatures at or
            below the first point, or at or above the last point, use that
            point's speed. Values in between are interpolated linearly
            between the surrounding points and rounded.
        """
        if not curve:
            return 50  # Default to 50% if no curve

        ordered = sorted(curve, key=lambda p: p.temp)
        coldest, hottest = ordered[0], ordered[-1]

        # Outside the curve's range the speed is clamped to the end points.
        # Rounding here keeps the int contract even for fractional speeds.
        if temperature <= coldest.temp:
            return int(round(coldest.speed))
        if temperature >= hottest.temp:
            return int(round(hottest.speed))

        for low, high in zip(ordered, ordered[1:]):
            if low.temp <= temperature <= high.temp:
                if high.temp == low.temp:
                    # Two points at the same temperature: avoid dividing by zero.
                    return int(round(low.speed))
                ratio = (temperature - low.temp) / (high.temp - low.temp)
                return int(round(low.speed + ratio * (high.speed - low.speed)))

        # Only reachable when no comparison above succeeds (e.g. NaN input);
        # fall back to the speed of the hottest point.
        return int(round(hottest.speed))
|
|
|
|
|
|
class FanController:
    """Main fan controller for managing server fans.

    Keeps one asyncio task per server in ``_tasks`` (keyed by server id).
    Each task repeatedly reads temperatures over IPMI and, when the server
    has automatic control enabled, sets all fans to the speed given by the
    server's fan curve.
    """

    def __init__(self):
        self.curve_manager = FanCurveManager()
        self.running = False
        # server id -> control-loop task for that server
        self._tasks: Dict[int, asyncio.Task] = {}
        # server id -> time (naive UTC) of the last successful temperature read
        self._last_sensor_data: Dict[int, datetime] = {}

    @staticmethod
    def _default_curve() -> List["FanCurvePoint"]:
        """Return the curve used when a server has no stored curve data."""
        return [
            FanCurvePoint(30, 10),
            FanCurvePoint(40, 20),
            FanCurvePoint(50, 35),
            FanCurvePoint(60, 50),
            FanCurvePoint(70, 70),
            FanCurvePoint(80, 100),
        ]

    async def start(self):
        """Start the service and a control task for every eligible server.

        Eligible servers are those that are active and have auto control
        enabled in the database.
        """
        self.running = True
        logger.info("Fan controller started")

        db = SessionLocal()
        try:
            # "== True" is intentional: these are SQLAlchemy column expressions.
            servers = db.query(Server).filter(
                Server.auto_control_enabled == True,  # noqa: E712
                Server.is_active == True,  # noqa: E712
            ).all()

            for server in servers:
                await self.start_server_control(server.id)
        finally:
            db.close()

    async def stop(self):
        """Cancel every control task and wait for them to finish."""
        self.running = False
        tasks = list(self._tasks.values())
        self._tasks.clear()
        for task in tasks:
            task.cancel()
        if tasks:
            # Let the cancelled tasks unwind (running their ``finally``
            # blocks) before reporting the controller as stopped.
            # return_exceptions keeps their CancelledError from propagating.
            await asyncio.gather(*tasks, return_exceptions=True)
        logger.info("Fan controller stopped")

    async def start_server_control(self, server_id: int):
        """Start automatic control for a server, replacing any existing task."""
        existing = self._tasks.get(server_id)
        if existing is not None:
            existing.cancel()

        self._tasks[server_id] = asyncio.create_task(self._control_loop(server_id))
        logger.info(f"Started fan control for server {server_id}")

    async def stop_server_control(self, server_id: int):
        """Stop automatic control for a server; a no-op if none is running."""
        task = self._tasks.pop(server_id, None)
        if task is not None:
            task.cancel()
            logger.info(f"Stopped fan control for server {server_id}")

    async def _control_loop(self, server_id: int):
        """Run control iterations for one server until stopped or cancelled."""
        while self.running:
            try:
                await self._control_iteration(server_id)
                await asyncio.sleep(5)  # 5 second interval
            except asyncio.CancelledError:
                # Propagate so the task ends up in the cancelled state.
                raise
            except Exception as e:
                logger.error(f"Control loop error for server {server_id}: {e}")
                await asyncio.sleep(10)  # back off before retrying

    async def _control_iteration(self, server_id: int):
        """Run a single control iteration for a server.

        Loads the server row, connects over IPMI, reads temperatures, records
        the time of the read and, if auto control is enabled, applies the fan
        curve. Timeouts and other errors are logged and swallowed so that one
        bad iteration does not end the loop; the session is always closed.
        """
        db = SessionLocal()
        try:
            server = db.query(Server).filter(Server.id == server_id).first()
            if not server or not server.is_active:
                return

            # Imported at call time rather than at module import time.
            from backend.auth import decrypt_password

            client = IPMIClient(
                host=server.ipmi_host,
                username=server.ipmi_username,
                password=decrypt_password(server.ipmi_encrypted_password),
                port=server.ipmi_port,
                vendor=server.vendor,
            )

            # The IPMI client calls are run in worker threads with a timeout
            # so they neither block nor stall the event loop.
            connected = await asyncio.wait_for(
                asyncio.to_thread(client.test_connection),
                timeout=10.0,
            )
            if not connected:
                logger.warning(f"Cannot connect to server {server.name}")
                return

            temps = await asyncio.wait_for(
                asyncio.to_thread(client.get_temperatures),
                timeout=15.0,
            )

            # Update last sensor data time
            self._last_sensor_data[server_id] = datetime.utcnow()
            server.last_seen = datetime.utcnow()

            # Calculate and set fan speed if auto control is enabled
            if server.auto_control_enabled:
                await self._apply_fan_curve(db, server, client, temps)

            db.commit()

        except asyncio.TimeoutError:
            logger.warning(f"Control iteration timeout for server {server_id}")
        except Exception as e:
            logger.error(f"Control iteration error for server {server_id}: {e}")
        finally:
            db.close()

    async def _apply_fan_curve(self, db: Session, server: Server,
                               client: IPMIClient, temps: List[TemperatureReading]):
        """Set all fans to the speed the server's curve gives for its hottest reading.

        CPU readings (location starting with ``"cpu"``) take priority; if
        there are none, the hottest of all readings is used. Readings without
        a value are ignored, and nothing is done when no usable reading
        remains.

        Args:
            db: Session the ``server`` row belongs to; the caller commits.
            server: Server row; ``manual_control_enabled`` is set to True
                when manual control gets enabled here.
            client: IPMI client for the server.
            temps: Temperature readings from the current iteration.
        """
        # A sensor that reported no value must not break max() below.
        readings = [t for t in temps or [] if t.value is not None]
        if not readings:
            return

        # Get active fan curve
        curve_data = server.fan_curve_data
        if curve_data:
            curve = self.curve_manager.parse_curve(curve_data)
        else:
            curve = self._default_curve()

        # Find the highest CPU temperature (tolerating a missing location)
        cpu_readings = [t for t in readings if (t.location or "").startswith("cpu")]
        max_temp = max(t.value for t in (cpu_readings or readings))

        # Calculate target speed
        target_speed = self.curve_manager.calculate_speed(curve, max_temp)

        # Enable manual control if not already
        if not server.manual_control_enabled:
            enabled = await asyncio.wait_for(
                asyncio.to_thread(client.enable_manual_fan_control),
                timeout=10.0,
            )
            if enabled:
                server.manual_control_enabled = True
                logger.info(f"Enabled manual fan control for {server.name}")

        # Set fan speed
        applied = await asyncio.wait_for(
            asyncio.to_thread(client.set_all_fans_speed, target_speed),
            timeout=10.0,
        )
        if applied:
            logger.info(f"Set {server.name} fans to {target_speed}% (temp: {max_temp}°C)")

    def get_controller_status(self, server_id: int) -> Dict[str, Any]:
        """Get current controller status for a server.

        Returns:
            A dict with ``is_running`` (True only while the server's control
            task exists and has not finished), ``last_sensor_data`` (ISO
            timestamp of the last temperature read, or None) and ``state``
            (``"auto"`` when running, otherwise ``"off"``).
        """
        task = self._tasks.get(server_id)
        # A task that has already finished is not controlling anything.
        is_running = task is not None and not task.done()
        last_seen = self._last_sensor_data.get(server_id)

        return {
            "is_running": is_running,
            "last_sensor_data": last_seen.isoformat() if last_seen else None,
            "state": ControlState.AUTO.value if is_running else ControlState.OFF.value,
        }
|
|
|
|
|
|
class SensorCollector:
    """High-performance background sensor data collector.

    - Collects from all servers in parallel using thread pool
    - Times out slow operations to prevent hanging
    - Cleans up old database records periodically
    - Updates cache for fast web UI access
    """

    def __init__(self, max_workers: int = 4):
        """Create a stopped collector.

        Args:
            max_workers: Size of the thread pool used for blocking IPMI calls.
        """
        self.running = False
        self._task: Optional[asyncio.Task] = None
        self._collection_interval = 30  # seconds - IPMI is slow, need more time
        self._cleanup_interval = 3600  # 1 hour
        self._cache = None
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._last_cleanup = datetime.utcnow()
        self._first_collection_done = False

    async def start(self):
        """Start the sensor collector."""
        self.running = True
        self._task = asyncio.create_task(self._collection_loop())
        logger.info("Sensor collector started")

    async def stop(self):
        """Stop the collection task and shut the thread pool down."""
        self.running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        # Do not wait for in-flight IPMI calls; they may take a long time.
        self._executor.shutdown(wait=False)
        logger.info("Sensor collector stopped")

    async def _collection_loop(self):
        """Collect once immediately, then keep collecting every interval."""
        # Initial collection immediately on startup
        try:
            logger.info("Performing initial sensor collection...")
            await self._collect_all_servers()
            self._first_collection_done = True
            logger.info("Initial sensor collection complete")
        except Exception as e:
            logger.error(f"Initial collection error: {e}")

        while self.running:
            try:
                start_time = datetime.utcnow()
                await self._collect_all_servers()

                # Periodic database cleanup
                since_cleanup = (datetime.utcnow() - self._last_cleanup).total_seconds()
                if since_cleanup > self._cleanup_interval:
                    await self._cleanup_old_data()

                # Sleep only for what is left of the interval so collections
                # start roughly every ``_collection_interval`` seconds.
                elapsed = (datetime.utcnow() - start_time).total_seconds()
                sleep_time = max(0, self._collection_interval - elapsed)

                # Only warn if significantly over (collections can be slow)
                if elapsed > self._collection_interval * 1.5:
                    logger.warning(f"Collection took {elapsed:.1f}s, longer than interval {self._collection_interval}s")

                await asyncio.sleep(sleep_time)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Sensor collection error: {e}")
                await asyncio.sleep(self._collection_interval)

    async def _collect_all_servers(self):
        """Collect from all active servers concurrently and store the results.

        Temperature and fan readings of every server that answered are
        written with one bulk insert per table, and ``last_seen`` of those
        servers is updated, in a single commit.
        """
        db = SessionLocal()
        try:
            # "== True" is intentional: this is a SQLAlchemy column expression.
            servers = db.query(Server).filter(Server.is_active == True).all()  # noqa: E712
            if not servers:
                return

            # Run all collections concurrently with timeout protection.
            # With return_exceptions=True a failure becomes a result entry
            # instead of aborting the whole batch.
            results = await asyncio.gather(
                *(self._collect_server_with_timeout(server) for server in servers),
                return_exceptions=True,
            )

            # Process results and batch store in database
            all_sensor_data = []
            all_fan_data = []
            collected = 0

            for server, result in zip(servers, results):
                # BaseException also covers a cancelled collection.
                if isinstance(result, BaseException):
                    logger.debug(f"Server {server.name} collection failed: {result}")
                    continue
                if not result:
                    # None: unreachable server, timeout or handled error.
                    continue

                collected += 1
                temps, fans = result
                now = datetime.utcnow()

                all_sensor_data.extend(
                    {
                        'server_id': server.id,
                        'sensor_name': temp.name,
                        'sensor_type': 'temperature',
                        'value': temp.value,
                        'unit': '°C',
                        'timestamp': now,
                    }
                    for temp in temps
                )
                all_fan_data.extend(
                    {
                        'server_id': server.id,
                        'fan_number': fan.fan_number,
                        'fan_id': getattr(fan, 'fan_id', str(fan.fan_number)),
                        'speed_rpm': fan.speed_rpm,
                        'speed_percent': fan.speed_percent,
                        'timestamp': now,
                    }
                    for fan in fans
                )

                server.last_seen = now

            # Batch insert for better performance
            if all_sensor_data:
                db.bulk_insert_mappings(SensorData, all_sensor_data)
            if all_fan_data:
                db.bulk_insert_mappings(FanData, all_fan_data)

            db.commit()
            logger.debug(f"Collected data from {collected}/{len(servers)} servers")

        finally:
            db.close()

    async def _collect_server_with_timeout(self, server: Server) -> Optional[tuple]:
        """Collect from one server, giving up after 30 seconds.

        Returns:
            The ``(temps, fans)`` tuple from ``_collect_server``, or None on
            timeout or when that call itself returns None.
        """
        try:
            return await asyncio.wait_for(
                self._collect_server(server),
                timeout=30.0,  # Max 30 seconds per server (IPMI can be slow)
            )
        except asyncio.TimeoutError:
            logger.warning(f"Collection timeout for {server.name}")
            return None

    async def _collect_server(self, server: Server) -> Optional[tuple]:
        """Collect sensor data from a single server and refresh its cache entry.

        Reads temperatures, fan speeds and power consumption through the
        thread pool, stores a summary in ``sensor_cache`` under the server id
        and returns the raw readings.

        Returns:
            ``(temps, fans)`` on success; None when the connection test
            fails or any error occurs (errors are logged, not raised).
        """
        try:
            import re

            # Imported at call time rather than at module import time.
            from backend.auth import decrypt_password
            from backend.main import sensor_cache

            # Blocking IPMI operations are pushed to the thread pool.
            loop = asyncio.get_running_loop()

            client = IPMIClient(
                host=server.ipmi_host,
                username=server.ipmi_username,
                password=decrypt_password(server.ipmi_encrypted_password),
                port=server.ipmi_port,
                vendor=server.vendor,
            )

            # Test connection
            connected = await loop.run_in_executor(self._executor, client.test_connection)
            if not connected:
                return None

            # Get sensor data in parallel using thread pool
            temps, fans, power = await asyncio.gather(
                loop.run_in_executor(self._executor, client.get_temperatures),
                loop.run_in_executor(self._executor, client.get_fan_speeds),
                loop.run_in_executor(self._executor, client.get_power_consumption),
            )

            # Calculate summary metrics
            max_temp = max((t.value for t in temps if t.value is not None), default=0)
            # Average only over fans that reported a percentage, so that
            # fans without a reading do not drag the average down.
            fan_percents = [f.speed_percent for f in fans if f.speed_percent is not None]
            avg_fan = sum(fan_percents) / len(fan_percents) if fan_percents else 0

            # Extract current power consumption: the first number found in
            # the first entry whose key mentions both "current" and "power".
            current_power = None
            if power and isinstance(power, dict):
                for key, value in power.items():
                    lowered = key.lower()
                    if 'current' in lowered and 'power' in lowered:
                        match = re.search(r'(\d+(?:\.\d+)?)', str(value))
                        if match:
                            current_power = float(match.group(1))
                            break

            # Prepare cache data - format must match response schemas
            cache_data = {
                "max_temp": max_temp,
                "avg_fan_speed": round(avg_fan, 1),
                "power_consumption": current_power,
                "timestamp": datetime.utcnow().isoformat(),
                "temps": [
                    {
                        "name": t.name,
                        "value": t.value,
                        "location": t.location,
                        "status": getattr(t, 'status', 'ok'),
                    }
                    for t in temps
                ],
                "fans": [
                    {
                        "fan_id": getattr(f, 'fan_id', f'0x0{f.fan_number-1}'),
                        "fan_number": f.fan_number,
                        "speed_percent": f.speed_percent,
                        "speed_rpm": f.speed_rpm,
                    }
                    for f in fans
                ],
                "power_raw": power if isinstance(power, dict) else None,
            }

            # Store in cache
            await sensor_cache.set(server.id, cache_data)

            logger.info(f"Collected and cached sensors for {server.name}: temp={max_temp:.1f}°C, fan={avg_fan:.1f}%")
            return temps, fans

        except Exception as e:
            logger.warning(f"Failed to collect sensors for {server.name}: {e}")
            return None

    async def _cleanup_old_data(self):
        """Delete sensor and fan rows older than 24 hours.

        Failures are logged and swallowed. ``_last_cleanup`` only advances
        after a successful commit, so a failed cleanup is retried on the
        next collection cycle.
        """
        try:
            db = SessionLocal()
            try:
                # Keep only last 24 hours of detailed sensor data
                cutoff = datetime.utcnow() - timedelta(hours=24)

                # synchronize_session=False: no loaded objects need updating.
                deleted_sensors = db.query(SensorData).filter(
                    SensorData.timestamp < cutoff
                ).delete(synchronize_session=False)

                deleted_fans = db.query(FanData).filter(
                    FanData.timestamp < cutoff
                ).delete(synchronize_session=False)

                db.commit()

                if deleted_sensors > 0 or deleted_fans > 0:
                    logger.info(f"Cleaned up {deleted_sensors} sensor records and {deleted_fans} fan records")

                self._last_cleanup = datetime.utcnow()
            finally:
                db.close()
        except Exception as e:
            logger.error(f"Database cleanup failed: {e}")
|
|
|
|
|
|
# Module-level singletons shared by the whole application; they are started
# and stopped through initialize_fan_controller() / shutdown_fan_controller().
fan_controller = FanController()
sensor_collector = SensorCollector(max_workers=4)
|
|
|
|
|
|
async def initialize_fan_controller():
    """Start the background services: sensor collector first, then fan controller."""
    # Order matters: the collector is started before the controller.
    for service in (sensor_collector, fan_controller):
        await service.start()
|
|
|
|
|
|
async def shutdown_fan_controller():
    """Stop the background services: fan controller first, then sensor collector."""
    # Reverse of the start-up order.
    for service in (fan_controller, sensor_collector):
        await service.stop()
|