ipmi-fan-control/backend/main.py

998 lines
33 KiB
Python

"""Main FastAPI application."""
import logging
import os
from contextlib import asynccontextmanager
from typing import List, Optional
from fastapi import FastAPI, Depends, HTTPException, status, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from backend.config import settings
from backend.database import init_db, get_db, is_setup_complete, set_setup_complete, User, Server, FanCurve, SystemLog, SensorData, FanData
from backend.auth import (
verify_password, get_password_hash, create_access_token,
decode_access_token, encrypt_password, decrypt_password
)
from backend.ipmi_client import IPMIClient
from backend.fan_control import fan_controller, FanCurveManager, initialize_fan_controller, shutdown_fan_controller
from backend.schemas import (
UserCreate, UserLogin, UserResponse, Token, TokenData,
ServerCreate, ServerUpdate, ServerResponse, ServerDetailResponse, ServerStatusResponse,
FanCurveCreate, FanCurveResponse, FanCurvePoint,
SensorReading, TemperatureReading, FanReading, ServerSensorsResponse,
FanControlCommand, AutoControlSettings, FanCurveApply,
SystemLogResponse, SetupStatus, SetupComplete, DashboardStats, ServerDashboardData
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Security
security = HTTPBearer(auto_error=False)
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> User:
"""Get current authenticated user."""
if not credentials:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
payload = decode_access_token(credentials.credentials)
if not payload:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"},
)
username: str = payload.get("sub")
if not username:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"},
)
user = db.query(User).filter(User.username == username).first()
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="User is inactive"
)
return user
async def get_current_user_optional(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> Optional[User]:
"""Get current user if authenticated, None otherwise."""
try:
return await get_current_user(credentials, db)
except HTTPException:
return None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler."""
# Startup
logger.info("Starting up IPMI Fan Control...")
init_db()
await initialize_fan_controller()
yield
# Shutdown
logger.info("Shutting down IPMI Fan Control...")
await shutdown_fan_controller()
app = FastAPI(
title=settings.APP_NAME,
description="IPMI Fan Control for Dell T710 and compatible servers",
version="1.0.0",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Setup wizard endpoints
@app.get("/api/setup/status", response_model=SetupStatus)
async def setup_status(db: Session = Depends(get_db)):
"""Check if initial setup is complete."""
return {"setup_complete": is_setup_complete(db)}
@app.post("/api/setup/complete", response_model=UserResponse)
async def complete_setup(setup_data: SetupComplete, db: Session = Depends(get_db)):
"""Complete initial setup by creating admin user."""
if is_setup_complete(db):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Setup already completed"
)
# Create admin user
hashed_password = get_password_hash(setup_data.password)
user = User(
username=setup_data.username,
hashed_password=hashed_password,
is_active=True
)
db.add(user)
# Mark setup as complete
set_setup_complete(db, True)
db.commit()
db.refresh(user)
logger.info(f"Setup completed - admin user '{setup_data.username}' created")
return user
# Authentication endpoints
@app.post("/api/auth/login", response_model=Token)
async def login(credentials: UserLogin, db: Session = Depends(get_db)):
"""Login and get access token."""
user = db.query(User).filter(User.username == credentials.username).first()
if not user or not verify_password(credentials.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="User is inactive"
)
# Update last login
from datetime import datetime
user.last_login = datetime.utcnow()
db.commit()
access_token = create_access_token(data={"sub": user.username})
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/api/auth/me", response_model=UserResponse)
async def get_me(current_user: User = Depends(get_current_user)):
"""Get current user info."""
return current_user
# Server management endpoints
@app.get("/api/servers", response_model=List[ServerResponse])
async def list_servers(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""List all servers."""
return db.query(Server).all()
@app.post("/api/servers", response_model=ServerResponse, status_code=status.HTTP_201_CREATED)
async def create_server(
server_data: ServerCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Add a new server."""
# Encrypt IPMI password
ipmi_encrypted_password = encrypt_password(server_data.ipmi.ipmi_password)
# Encrypt SSH password if provided
ssh_encrypted_password = None
if server_data.ssh.ssh_password:
ssh_encrypted_password = encrypt_password(server_data.ssh.ssh_password)
server = Server(
name=server_data.name,
# IPMI settings
ipmi_host=server_data.ipmi.ipmi_host,
ipmi_port=server_data.ipmi.ipmi_port,
ipmi_username=server_data.ipmi.ipmi_username,
ipmi_encrypted_password=ipmi_encrypted_password,
# SSH settings
ssh_host=server_data.ssh.ssh_host or server_data.ipmi.ipmi_host,
ssh_port=server_data.ssh.ssh_port,
ssh_username=server_data.ssh.ssh_username,
ssh_encrypted_password=ssh_encrypted_password,
ssh_key_file=server_data.ssh.ssh_key_file,
use_ssh=server_data.ssh.use_ssh,
vendor=server_data.vendor
)
db.add(server)
db.commit()
db.refresh(server)
logger.info(f"Server '{server.name}' added by {current_user.username}")
return server
@app.get("/api/servers/{server_id}", response_model=ServerDetailResponse)
async def get_server(
server_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get server details."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
return server
@app.put("/api/servers/{server_id}", response_model=ServerResponse)
async def update_server(
server_id: int,
server_data: ServerUpdate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Update server configuration."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
update_data = server_data.dict(exclude_unset=True)
# Handle IPMI password update separately
if "ipmi_password" in update_data:
server.ipmi_encrypted_password = encrypt_password(update_data.pop("ipmi_password"))
# Handle SSH password update separately
if "ssh_password" in update_data:
if update_data["ssh_password"]:
server.ssh_encrypted_password = encrypt_password(update_data.pop("ssh_password"))
else:
update_data.pop("ssh_password")
# Update other fields
for field, value in update_data.items():
if hasattr(server, field):
setattr(server, field, value)
db.commit()
db.refresh(server)
logger.info(f"Server '{server.name}' updated by {current_user.username}")
return server
@app.delete("/api/servers/{server_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_server(
server_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Delete a server."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
# Stop fan control if running
await fan_controller.stop_server_control(server_id)
db.delete(server)
db.commit()
logger.info(f"Server '{server.name}' deleted by {current_user.username}")
return None
@app.get("/api/servers/{server_id}/status", response_model=ServerStatusResponse)
async def get_server_status(
server_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get server connection and controller status."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
# Test connection
try:
client = IPMIClient(
host=server.ipmi_host,
username=server.ipmi_username,
password=decrypt_password(server.ipmi_encrypted_password),
port=server.ipmi_port,
vendor=server.vendor
)
is_connected = client.test_connection()
except Exception as e:
logger.warning(f"Failed to connect to server {server.name}: {e}")
is_connected = False
return {
"server": server,
"is_connected": is_connected,
"controller_status": fan_controller.get_controller_status(server_id)
}
# Sensor data endpoints
@app.get("/api/servers/{server_id}/sensors", response_model=ServerSensorsResponse)
async def get_server_sensors(
server_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get current sensor readings from server."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
try:
client = IPMIClient(
host=server.ipmi_host,
username=server.ipmi_username,
password=decrypt_password(server.ipmi_encrypted_password),
port=server.ipmi_port,
vendor=server.vendor
)
temps = client.get_temperatures()
fans = client.get_fan_speeds()
all_sensors = client.get_all_sensors()
from datetime import datetime
return {
"server_id": server_id,
"temperatures": [t.__dict__ for t in temps],
"fans": [f.__dict__ for f in fans],
"all_sensors": [s.__dict__ for s in all_sensors],
"timestamp": datetime.utcnow()
}
except Exception as e:
logger.error(f"Failed to get sensors from {server.name}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get sensor data: {str(e)}")
@app.get("/api/servers/{server_id}/power", response_model=dict)
async def get_server_power(
server_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get power consumption data."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
try:
client = IPMIClient(
host=server.ipmi_host,
username=server.ipmi_username,
password=decrypt_password(server.ipmi_encrypted_password),
port=server.ipmi_port,
vendor=server.vendor
)
power_data = client.get_power_consumption()
if power_data is None:
raise HTTPException(status_code=500, detail="Power data not available")
return power_data
except Exception as e:
logger.error(f"Failed to get power data from {server.name}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get power data: {str(e)}")
# Fan control endpoints
@app.post("/api/servers/{server_id}/fans/manual/enable")
async def enable_manual_fan_control(
server_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Enable manual fan control."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
try:
client = IPMIClient(
host=server.ipmi_host,
username=server.ipmi_username,
password=decrypt_password(server.ipmi_encrypted_password),
port=server.ipmi_port,
vendor=server.vendor
)
if client.enable_manual_fan_control():
server.manual_control_enabled = True
server.auto_control_enabled = False # Disable auto when manual enabled
db.commit()
# Log event
log = SystemLog(
server_id=server_id,
event_type="fan_change",
message="Manual fan control enabled",
details=f"Enabled by {current_user.username}"
)
db.add(log)
db.commit()
return {"success": True, "message": "Manual fan control enabled"}
else:
raise HTTPException(status_code=500, detail="Failed to enable manual fan control")
except Exception as e:
logger.error(f"Failed to enable manual control on {server.name}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/servers/{server_id}/fans/manual/disable")
async def disable_manual_fan_control(
server_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Disable manual fan control (return to automatic)."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
try:
client = IPMIClient(
host=server.ipmi_host,
username=server.ipmi_username,
password=decrypt_password(server.ipmi_encrypted_password),
port=server.ipmi_port,
vendor=server.vendor
)
if client.disable_manual_fan_control():
server.manual_control_enabled = False
server.auto_control_enabled = False
db.commit()
# Stop auto control task if running
await fan_controller.stop_server_control(server_id)
log = SystemLog(
server_id=server_id,
event_type="fan_change",
message="Manual fan control disabled (automatic mode)",
details=f"Disabled by {current_user.username}"
)
db.add(log)
db.commit()
return {"success": True, "message": "Fan control returned to automatic"}
else:
raise HTTPException(status_code=500, detail="Failed to disable manual fan control")
except Exception as e:
logger.error(f"Failed to disable manual control on {server.name}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/servers/{server_id}/fans/set")
async def set_fan_speed(
server_id: int,
command: FanControlCommand,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Set fan speed manually."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
if not server.manual_control_enabled:
raise HTTPException(
status_code=400,
detail="Manual fan control is not enabled. Enable it first."
)
try:
client = IPMIClient(
host=server.ipmi_host,
username=server.ipmi_username,
password=decrypt_password(server.ipmi_encrypted_password),
port=server.ipmi_port,
vendor=server.vendor
)
if client.set_fan_speed(command.fan_id, command.speed_percent):
fan_desc = f"Fan {command.fan_id}" if command.fan_id != "0xff" else "All fans"
log = SystemLog(
server_id=server_id,
event_type="fan_change",
message=f"{fan_desc} speed set to {command.speed_percent}%",
details=f"Set by {current_user.username}"
)
db.add(log)
db.commit()
return {"success": True, "message": f"Fan speed set to {command.speed_percent}%"}
else:
raise HTTPException(status_code=500, detail="Failed to set fan speed")
except Exception as e:
logger.error(f"Failed to set fan speed on {server.name}: {e}")
raise HTTPException(status_code=500, detail=str(e))
# Fan curve endpoints
@app.get("/api/servers/{server_id}/fan-curves", response_model=List[FanCurveResponse])
async def list_fan_curves(
server_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""List fan curves for a server."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
return db.query(FanCurve).filter(FanCurve.server_id == server_id).all()
@app.post("/api/servers/{server_id}/fan-curves", response_model=FanCurveResponse)
async def create_fan_curve(
server_id: int,
curve_data: FanCurveCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Create a new fan curve."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
curve = FanCurve(
server_id=server_id,
name=curve_data.name,
curve_data=FanCurveManager.serialize_curve([FanCurvePoint(temp=p.temp, speed=p.speed) for p in curve_data.curve_data]),
sensor_source=curve_data.sensor_source,
is_active=curve_data.is_active
)
db.add(curve)
db.commit()
db.refresh(curve)
return curve
@app.put("/api/servers/{server_id}/fan-curves/{curve_id}", response_model=FanCurveResponse)
async def update_fan_curve(
server_id: int,
curve_id: int,
curve_data: FanCurveCreate,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Update a fan curve."""
curve = db.query(FanCurve).filter(
FanCurve.id == curve_id,
FanCurve.server_id == server_id
).first()
if not curve:
raise HTTPException(status_code=404, detail="Fan curve not found")
curve.name = curve_data.name
curve.curve_data = FanCurveManager.serialize_curve([FanCurvePoint(temp=p.temp, speed=p.speed) for p in curve_data.curve_data])
curve.sensor_source = curve_data.sensor_source
curve.is_active = curve_data.is_active
db.commit()
db.refresh(curve)
return curve
@app.delete("/api/servers/{server_id}/fan-curves/{curve_id}")
async def delete_fan_curve(
server_id: int,
curve_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Delete a fan curve."""
curve = db.query(FanCurve).filter(
FanCurve.id == curve_id,
FanCurve.server_id == server_id
).first()
if not curve:
raise HTTPException(status_code=404, detail="Fan curve not found")
db.delete(curve)
db.commit()
return {"success": True}
# Auto control endpoints
@app.post("/api/servers/{server_id}/auto-control/enable")
async def enable_auto_control(
server_id: int,
settings_data: AutoControlSettings,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Enable automatic fan control with fan curve."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
# Set fan curve if specified
if settings_data.curve_id:
curve = db.query(FanCurve).filter(
FanCurve.id == settings_data.curve_id,
FanCurve.server_id == server_id
).first()
if curve:
server.fan_curve_data = curve.curve_data
server.auto_control_enabled = True
server.manual_control_enabled = True # Auto control requires manual mode
db.commit()
# Start the controller
await fan_controller.start_server_control(server_id)
log = SystemLog(
server_id=server_id,
event_type="fan_change",
message="Automatic fan control enabled",
details=f"Enabled by {current_user.username}"
)
db.add(log)
db.commit()
return {"success": True, "message": "Automatic fan control enabled"}
@app.post("/api/servers/{server_id}/auto-control/disable")
async def disable_auto_control(
server_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Disable automatic fan control."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
server.auto_control_enabled = False
db.commit()
# Stop the controller
await fan_controller.stop_server_control(server_id)
log = SystemLog(
server_id=server_id,
event_type="fan_change",
message="Automatic fan control disabled",
details=f"Disabled by {current_user.username}"
)
db.add(log)
db.commit()
return {"success": True, "message": "Automatic fan control disabled"}
# Dashboard endpoints
@app.get("/api/dashboard/stats", response_model=DashboardStats)
async def get_dashboard_stats(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get dashboard statistics."""
servers = db.query(Server).all()
total_servers = len(servers)
active_servers = sum(1 for s in servers if s.is_active)
manual_servers = sum(1 for s in servers if s.manual_control_enabled)
auto_servers = sum(1 for s in servers if s.auto_control_enabled)
# Count servers in panic mode (this is a simplified check)
panic_servers = 0
for server in servers:
status = fan_controller.get_controller_status(server.id)
if status.get("state") == "panic":
panic_servers += 1
# Get recent logs
recent_logs = db.query(SystemLog).order_by(SystemLog.timestamp.desc()).limit(10).all()
return {
"total_servers": total_servers,
"active_servers": active_servers,
"manual_control_servers": manual_servers,
"auto_control_servers": auto_servers,
"panic_mode_servers": panic_servers,
"recent_logs": recent_logs
}
@app.get("/api/dashboard/servers/{server_id}", response_model=ServerDashboardData)
async def get_server_dashboard(
server_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get detailed dashboard data for a specific server."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
# Get current sensor data
temps = []
fans = []
power_data = None
try:
client = IPMIClient(
host=server.ipmi_host,
username=server.ipmi_username,
password=decrypt_password(server.ipmi_encrypted_password),
port=server.ipmi_port,
vendor=server.vendor
)
if client.test_connection():
temps_readings = client.get_temperatures()
temps = [t.__dict__ for t in temps_readings]
fans_readings = client.get_fan_speeds()
fans = [f.__dict__ for f in fans_readings]
power_data = client.get_power_consumption()
except Exception as e:
logger.warning(f"Could not fetch live data for {server.name}: {e}")
# Get recent historical data
recent_sensor_data = db.query(SensorData).filter(
SensorData.server_id == server_id
).order_by(SensorData.timestamp.desc()).limit(50).all()
recent_fan_data = db.query(FanData).filter(
FanData.server_id == server_id
).order_by(FanData.timestamp.desc()).limit(50).all()
return {
"server": server,
"current_temperatures": temps,
"current_fans": fans,
"recent_sensor_data": recent_sensor_data,
"recent_fan_data": recent_fan_data,
"power_consumption": power_data
}
# System logs endpoint
@app.get("/api/logs", response_model=List[SystemLogResponse])
async def get_system_logs(
server_id: Optional[int] = None,
event_type: Optional[str] = None,
limit: int = 100,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get system logs with optional filtering."""
query = db.query(SystemLog)
if server_id:
query = query.filter(SystemLog.server_id == server_id)
if event_type:
query = query.filter(SystemLog.event_type == event_type)
logs = query.order_by(SystemLog.timestamp.desc()).limit(limit).all()
return logs
# SSH endpoints
@app.get("/api/servers/{server_id}/ssh/sensors")
async def get_ssh_sensors(
server_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get sensor data via SSH (lm-sensors)."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
if not server.use_ssh:
raise HTTPException(status_code=400, detail="SSH not enabled for this server")
try:
from backend.ssh_client import SSHClient
ssh_client = SSHClient(
host=server.ssh_host or server.ipmi_host,
username=server.ssh_username or server.ipmi_username,
password=decrypt_password(server.ssh_encrypted_password) if server.ssh_encrypted_password else None,
port=server.ssh_port
)
if not await ssh_client.connect():
raise HTTPException(status_code=500, detail="Failed to connect via SSH")
try:
cpu_temps = await ssh_client.get_cpu_temperatures()
raw_data = await ssh_client.get_lmsensors_data()
return {
"cpu_temps": [
{
"cpu_name": ct.cpu_name,
"core_temps": ct.core_temps,
"package_temp": ct.package_temp
}
for ct in cpu_temps
],
"raw_data": raw_data
}
finally:
await ssh_client.disconnect()
except Exception as e:
logger.error(f"Failed to get SSH sensor data: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get SSH sensor data: {str(e)}")
@app.get("/api/servers/{server_id}/ssh/system-info")
async def get_ssh_system_info(
server_id: int,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Get system info via SSH."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
if not server.use_ssh:
raise HTTPException(status_code=400, detail="SSH not enabled for this server")
try:
from backend.ssh_client import SSHClient
ssh_client = SSHClient(
host=server.ssh_host or server.ipmi_host,
username=server.ssh_username or server.ipmi_username,
password=decrypt_password(server.ssh_encrypted_password) if server.ssh_encrypted_password else None,
port=server.ssh_port
)
if not await ssh_client.connect():
raise HTTPException(status_code=500, detail="Failed to connect via SSH")
try:
info = await ssh_client.get_system_info()
return info or {}
finally:
await ssh_client.disconnect()
except Exception as e:
logger.error(f"Failed to get SSH system info: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get system info: {str(e)}")
@app.post("/api/servers/{server_id}/ssh/execute")
async def execute_ssh_command(
server_id: int,
command_data: dict,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Execute a command via SSH."""
server = db.query(Server).filter(Server.id == server_id).first()
if not server:
raise HTTPException(status_code=404, detail="Server not found")
if not server.use_ssh:
raise HTTPException(status_code=400, detail="SSH not enabled for this server")
command = command_data.get("command", "").strip()
if not command:
raise HTTPException(status_code=400, detail="Command is required")
# Log the command execution
log = SystemLog(
server_id=server_id,
event_type="info",
message="SSH command executed",
details=f"Command: {command[:50]}..." if len(command) > 50 else f"Command: {command}"
)
db.add(log)
db.commit()
try:
from backend.ssh_client import SSHClient
ssh_client = SSHClient(
host=server.ssh_host or server.ipmi_host,
username=server.ssh_username or server.ipmi_username,
password=decrypt_password(server.ssh_encrypted_password) if server.ssh_encrypted_password else None,
port=server.ssh_port
)
if not await ssh_client.connect():
raise HTTPException(status_code=500, detail="Failed to connect via SSH")
try:
exit_code, stdout, stderr = await ssh_client.execute_command(command)
return {
"exit_code": exit_code,
"stdout": stdout,
"stderr": stderr
}
finally:
await ssh_client.disconnect()
except Exception as e:
logger.error(f"Failed to execute SSH command: {e}")
raise HTTPException(status_code=500, detail=f"Command execution failed: {str(e)}")
# Health check endpoint (no auth required)
@app.get("/api/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy"}
# Static files - serve frontend
frontend_path = os.path.join(os.path.dirname(__file__), "..", "frontend", "dist")
if os.path.exists(frontend_path):
app.mount("/assets", StaticFiles(directory=os.path.join(frontend_path, "assets")), name="assets")
@app.get("/", include_in_schema=False)
async def serve_index():
return FileResponse(os.path.join(frontend_path, "index.html"))
@app.get("/{path:path}", include_in_schema=False)
async def serve_spa(path: str):
# For SPA routing, return index.html for all non-API routes
if not path.startswith("api/") and not path.startswith("assets/"):
file_path = os.path.join(frontend_path, path)
if os.path.exists(file_path) and os.path.isfile(file_path):
return FileResponse(file_path)
return FileResponse(os.path.join(frontend_path, "index.html"))
raise HTTPException(status_code=404)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)