"""
Web API for IPMI Fan Controller v2 - With Auth & SSH Support
"""
import asyncio
import json
import logging
import hashlib
import secrets
import re
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional, List, Dict
from datetime import datetime, timedelta
from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field, validator
# Import the fan controller
import sys
sys.path.insert(0, str(Path(__file__).parent))
from fan_controller import get_service, FanControlService, IPMIFanController
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Security
security = HTTPBearer(auto_error=False)
# Data directories
DATA_DIR = Path("/app/data") if Path("/app/data").exists() else Path(__file__).parent / "data"
DATA_DIR.mkdir(exist_ok=True)
CONFIG_FILE = DATA_DIR / "config.json"
USERS_FILE = DATA_DIR / "users.json"
SSH_KEYS_DIR = DATA_DIR / "ssh_keys"
SSH_KEYS_DIR.mkdir(exist_ok=True)
# Pydantic models
class UserLogin(BaseModel):
username: str
password: str
class ChangePassword(BaseModel):
current_password: str
new_password: str
@validator('new_password')
def password_strength(cls, v):
if len(v) < 6:
raise ValueError('Password must be at least 6 characters')
return v
class IPMIConfig(BaseModel):
host: str
username: str
password: Optional[str] = None # Only required on initial setup
port: int = 623
class SSHConfig(BaseModel):
enabled: bool = False
host: Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None
use_key: bool = False
key_filename: Optional[str] = None
class FanSettings(BaseModel):
enabled: Optional[bool] = None
interval: Optional[int] = Field(None, ge=5, le=300)
min_speed: Optional[int] = Field(None, ge=0, le=100)
max_speed: Optional[int] = Field(None, ge=0, le=100)
panic_temp: Optional[float] = Field(None, ge=50, le=100)
panic_speed: Optional[int] = Field(None, ge=0, le=100)
class FanCurvePoint(BaseModel):
temp: float = Field(..., ge=0, le=100)
speed: int = Field(..., ge=0, le=100)
class FanCurveUpdate(BaseModel):
points: List[FanCurvePoint]
class ManualSpeedRequest(BaseModel):
speed: int = Field(..., ge=0, le=100)
class SetupRequest(BaseModel):
admin_username: str = Field(..., min_length=3)
admin_password: str = Field(..., min_length=6)
ipmi_host: str
ipmi_username: str
ipmi_password: str
ipmi_port: int = 623
# User management
class UserManager:
def __init__(self):
self.users_file = USERS_FILE
self._users = {}
self._sessions = {} # token -> (username, expiry)
self._load()
def _load(self):
if self.users_file.exists():
try:
with open(self.users_file) as f:
data = json.load(f)
self._users = data.get('users', {})
except Exception as e:
logger.error(f"Failed to load users: {e}")
self._users = {}
def _save(self):
with open(self.users_file, 'w') as f:
json.dump({'users': self._users}, f)
def _hash_password(self, password: str) -> str:
return hashlib.sha256(password.encode()).hexdigest()
def verify_user(self, username: str, password: str) -> bool:
if username not in self._users:
return False
return self._users[username] == self._hash_password(password)
def create_user(self, username: str, password: str) -> bool:
if username in self._users:
return False
self._users[username] = self._hash_password(password)
self._save()
return True
def change_password(self, username: str, current: str, new: str) -> bool:
if not self.verify_user(username, current):
return False
self._users[username] = self._hash_password(new)
self._save()
return True
def create_token(self, username: str) -> str:
token = secrets.token_urlsafe(32)
expiry = datetime.utcnow() + timedelta(days=7)
self._sessions[token] = (username, expiry)
return token
def verify_token(self, token: str) -> Optional[str]:
if token not in self._sessions:
return None
username, expiry = self._sessions[token]
if datetime.utcnow() > expiry:
del self._sessions[token]
return None
return username
def is_setup_complete(self) -> bool:
return len(self._users) > 0
user_manager = UserManager()
# Auth dependency
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
if not credentials:
raise HTTPException(status_code=401, detail="Not authenticated")
username = user_manager.verify_token(credentials.credentials)
if not username:
raise HTTPException(status_code=401, detail="Invalid or expired token")
return username
# HTML Templates
LOGIN_HTML = '''
Login - IPMI Fan Controller
'''
SETUP_HTML = '''
Setup - IPMI Fan Controller
🌬️ Fan Controller Setup
Configure your server connection
'''
DASHBOARD_HTML = '''
IPMI Fan Controller v2
🌬️ IPMI Fan Controller v2
SSH Configuration (for lm-sensors)
Fan Curve: Temperature → Speed
System Logs
Ready...
'''
# FastAPI app
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler."""
# Ensure data directory exists
DATA_DIR.mkdir(parents=True, exist_ok=True)
yield
app = FastAPI(
title="IPMI Fan Controller v2",
description="Fan control for Dell servers with auth and SSH support",
version="2.1.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Routes
@app.get("/favicon.ico")
async def favicon():
"""Return a simple favicon to prevent 404 errors."""
# Return a transparent 1x1 PNG
transparent_png = bytes([
0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, # PNG signature
0x00, 0x00, 0x00, 0x0D, 0x49, 0x48, 0x44, 0x52, # IHDR chunk
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, # 1x1 pixel
0x08, 0x06, 0x00, 0x00, 0x00, 0x1F, 0x15, 0xC4, 0x89,
0x00, 0x00, 0x00, 0x0D, 0x49, 0x44, 0x41, 0x54, # IDAT chunk (transparent)
0x08, 0xD7, 0x63, 0x60, 0x60, 0x60, 0x00, 0x00, 0x00,
0x02, 0x00, 0x01, 0xE2, 0x21, 0xBC, 0x33,
0x00, 0x00, 0x00, 0x00, 0x49, 0x45, 0x4E, 0x44, # IEND chunk
0xAE, 0x42, 0x60, 0x82
])
from fastapi.responses import Response
return Response(content=transparent_png, media_type="image/png")
@app.get("/")
async def root(request: Request):
"""Main dashboard - always returns dashboard HTML, JS handles auth."""
# Check if setup is needed
if not user_manager.is_setup_complete():
return HTMLResponse(content=SETUP_HTML)
# Always return dashboard - JavaScript will check token and redirect if needed
return HTMLResponse(content=DASHBOARD_HTML)
@app.get("/login")
async def login_page():
"""Login page."""
if not user_manager.is_setup_complete():
return HTMLResponse(content=SETUP_HTML)
return HTMLResponse(content=LOGIN_HTML)
# Auth API
@app.post("/api/auth/login")
async def api_login(credentials: UserLogin):
"""Login and get token."""
if not user_manager.verify_user(credentials.username, credentials.password):
return {"success": False, "error": "Invalid username or password"}
token = user_manager.create_token(credentials.username)
return {"success": True, "token": token}
@app.post("/api/auth/change-password")
async def api_change_password(data: ChangePassword, username: str = Depends(get_current_user)):
"""Change current user's password."""
if not user_manager.change_password(username, data.current_password, data.new_password):
return {"success": False, "error": "Current password is incorrect"}
return {"success": True}
@app.post("/api/setup")
async def api_setup(data: SetupRequest):
"""Initial setup - create admin and configure IPMI."""
if user_manager.is_setup_complete():
return {"success": False, "error": "Setup already completed"}
# Create admin user
if not user_manager.create_user(data.admin_username, data.admin_password):
return {"success": False, "error": "Failed to create user"}
# Configure IPMI
service = get_service(str(CONFIG_FILE))
service.update_config(
ipmi_host=data.ipmi_host,
ipmi_username=data.ipmi_username,
ipmi_password=data.ipmi_password,
ipmi_port=data.ipmi_port
)
# Create token
token = user_manager.create_token(data.admin_username)
return {"success": True, "token": token}
# Status API
@app.get("/api/status")
async def api_status(username: str = Depends(get_current_user)):
"""Get current controller status."""
service = get_service(str(CONFIG_FILE))
status = service.get_status()
return status
@app.post("/api/test")
async def api_test(username: str = Depends(get_current_user)):
"""Test IPMI connection."""
service = get_service(str(CONFIG_FILE))
if not service.controller:
if not service._init_controller():
return {"success": False, "error": "Failed to initialize controller - check config"}
success = service.controller.test_connection()
return {"success": success, "error": None if success else "Connection failed"}
@app.post("/api/control/auto")
async def api_control_auto(data: dict, username: str = Depends(get_current_user)):
"""Enable/disable automatic control."""
service = get_service(str(CONFIG_FILE))
enabled = data.get('enabled', False)
if enabled and not service.config.get('ipmi_host'):
return {"success": False, "error": "IPMI host not configured"}
service.set_auto_mode(enabled)
if enabled and not service.running:
if not service.start():
return {"success": False, "error": "Failed to start service"}
return {"success": True}
@app.post("/api/control/manual")
async def api_control_manual(req: ManualSpeedRequest, username: str = Depends(get_current_user)):
"""Set manual fan speed."""
service = get_service(str(CONFIG_FILE))
if not service.controller:
if not service._init_controller():
return {"success": False, "error": "Failed to connect to server"}
if service.set_manual_speed(req.speed):
return {"success": True}
return {"success": False, "error": "Failed to set fan speed"}
# Config API
@app.post("/api/config/ipmi")
async def api_config_ipmi(data: IPMIConfig, username: str = Depends(get_current_user)):
"""Update IPMI configuration."""
service = get_service(str(CONFIG_FILE))
updates = {
"ipmi_host": data.host,
"ipmi_username": data.username,
"ipmi_port": data.port
}
if data.password:
updates["ipmi_password"] = data.password
service.update_config(**updates)
return {"success": True}
@app.post("/api/config/ssh")
async def api_config_ssh(data: dict, username: str = Depends(get_current_user)):
"""Update SSH configuration."""
service = get_service(str(CONFIG_FILE))
updates = {
"ssh_enabled": data.get('enabled', False),
"ssh_host": data.get('host'),
"ssh_username": data.get('username'),
"ssh_use_key": data.get('use_key', False),
"ssh_port": data.get('port', 22)
}
if data.get('password'):
updates["ssh_password"] = data['password']
# Handle SSH key
if data.get('use_key') and data.get('key_data'):
key_filename = f"ssh_key_{username}"
key_path = SSH_KEYS_DIR / key_filename
try:
with open(key_path, 'w') as f:
f.write(data['key_data'])
key_path.chmod(0o600)
updates["ssh_key_file"] = str(key_path)
except Exception as e:
return {"success": False, "error": f"Failed to save SSH key: {e}"}
service.update_config(**updates)
return {"success": True}
@app.post("/api/config/settings")
async def api_config_settings(data: FanSettings, username: str = Depends(get_current_user)):
"""Update fan control settings."""
service = get_service(str(CONFIG_FILE))
updates = {}
if data.enabled is not None:
updates['enabled'] = data.enabled
if data.interval is not None:
updates['interval'] = data.interval
if data.min_speed is not None:
updates['min_speed'] = data.min_speed
if data.max_speed is not None:
updates['max_speed'] = data.max_speed
if data.panic_temp is not None:
updates['panic_temp'] = data.panic_temp
if data.panic_speed is not None:
updates['panic_speed'] = data.panic_speed
service.update_config(**updates)
return {"success": True}
@app.post("/api/config/curve")
async def api_config_curve(curve: FanCurveUpdate, username: str = Depends(get_current_user)):
"""Update fan curve."""
service = get_service(str(CONFIG_FILE))
points = [{"temp": p.temp, "speed": p.speed} for p in curve.points]
service.update_config(fan_curve=points)
return {"success": True}
@app.post("/api/shutdown")
async def api_shutdown(username: str = Depends(get_current_user)):
"""Return fans to automatic control and stop service."""
service = get_service(str(CONFIG_FILE))
service.stop()
return {"success": True, "message": "Service stopped, fans returned to automatic control"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)