248 lines
9.1 KiB
Python
248 lines
9.1 KiB
Python
"""SSH client for connecting to servers to get lm-sensors data."""
|
|
import asyncio
import json
import logging
import re
from dataclasses import dataclass
from typing import Dict, List, Optional, Any

import asyncssh
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class SensorData:
    """Container for one group of lm-sensors readings.

    All four fields are required and are given in declaration order
    (``name``, ``adapter``, ``values``, ``unit``).
    """

    # Sensor name (presumably the chip/sensor name reported by lm-sensors).
    name: str
    # Adapter description associated with the sensor.
    adapter: str
    # Reading values keyed by string label.
    values: Dict[str, float]
    # Unit string for the readings in ``values``.
    unit: str
|
|
|
|
|
|
@dataclass
class CPUTemp:
    """Temperature readings for a single CPU chip.

    All three fields are required; ``package_temp`` may be ``None``.
    """

    # Identifier of the CPU sensor chip the readings belong to.
    cpu_name: str
    # Per-core temperatures keyed by the core's label.
    core_temps: Dict[str, float]
    # Package-level temperature, or None when not available.
    package_temp: Optional[float]
|
|
|
|
|
|
class SSHClient:
    """SSH client for server sensor monitoring.

    Wraps a single ``asyncssh`` connection. The connection is opened lazily:
    every query method connects on first use when no connection exists yet.
    Failures are logged and reported through return values (``False``,
    ``None`` or an error tuple) rather than raised.
    """

    # First signed integer or decimal number in a reading such as "+45.0°C".
    _NUMBER_RE = re.compile(r'[-+]?\d+(?:\.\d+)?')

    # Substrings (lower-case) marking a package-level temperature label.
    _PACKAGE_TAGS = ('tdie', 'tctl', 'package')

    def __init__(self, host: str, username: str, password: Optional[str] = None,
                 port: int = 22, key_file: Optional[str] = None):
        """Store connection parameters; no network activity happens here.

        Args:
            host: Host name or address of the server.
            username: Login user name.
            password: Optional password for password authentication.
            port: SSH port.
            key_file: Optional path to a private key file.
        """
        self.host = host
        self.username = username
        self.password = password
        self.port = port
        self.key_file = key_file
        self._conn = None

    async def connect(self) -> bool:
        """Connect to the server via SSH.

        Returns:
            True when the connection was established, False otherwise
            (the error is logged).
        """
        conn_options = {}
        if self.password:
            conn_options['password'] = self.password
        if self.key_file:
            conn_options['client_keys'] = [self.key_file]

        try:
            self._conn = await asyncssh.connect(
                self.host,
                port=self.port,
                username=self.username,
                # SECURITY: known_hosts=None disables host key verification,
                # so unknown hosts are accepted (use with caution).
                known_hosts=None,
                **conn_options
            )
        except Exception as e:
            logger.error("SSH connection failed to %s: %s", self.host, e)
            return False

        logger.info("SSH connected to %s", self.host)
        return True

    async def _ensure_connected(self) -> bool:
        """Return True if a connection exists or could be opened."""
        if self._conn:
            return True
        return await self.connect()

    async def disconnect(self):
        """Disconnect from the server.

        The stored connection is cleared before closing so the client never
        keeps a reference to a half-closed connection, even if closing fails.
        """
        if not self._conn:
            return
        conn, self._conn = self._conn, None
        conn.close()
        await conn.wait_closed()

    async def test_connection(self) -> bool:
        """Test the SSH connection by running a trivial command.

        Returns:
            True when ``echo "test"`` ran with exit status 0, else False.
        """
        if not await self._ensure_connected():
            return False

        try:
            result = await self._conn.run('echo "test"', check=True)
            return result.exit_status == 0
        except Exception as e:
            logger.error("SSH test failed: %s", e)
            return False

    async def get_lmsensors_data(self) -> Optional[Dict[str, Any]]:
        """Get sensor data from lm-sensors.

        Tries ``sensors -j`` first and falls back to parsing the plain
        ``sensors`` output when the JSON variant fails or is not valid JSON.

        Returns:
            The decoded JSON document, or the flat mapping produced by
            ``_parse_sensors_text``; None when the data cannot be obtained.
        """
        if not await self._ensure_connected():
            return None

        try:
            # Check if sensors command exists
            result = await self._conn.run('which sensors', check=False)
            if result.exit_status != 0:
                logger.warning("lm-sensors not installed on remote server")
                return None

            # Get sensor data in JSON format
            result = await self._conn.run('sensors -j', check=False)
            if result.exit_status == 0:
                try:
                    return json.loads(result.stdout)
                except ValueError:
                    # json.JSONDecodeError is a ValueError; the text output
                    # may still be usable, so do not give up yet.
                    logger.warning(
                        "Invalid JSON from 'sensors -j'; falling back to text output"
                    )

            # Try without JSON format
            result = await self._conn.run('sensors', check=False)
            if result.exit_status == 0:
                return self._parse_sensors_text(result.stdout)
            return None
        except Exception as e:
            logger.error("Failed to get lm-sensors data: %s", e)
            return None

    def _parse_sensors_text(self, output: str) -> Dict[str, Any]:
        """Parse plain text sensors output.

        Args:
            output: Text as printed by the ``sensors`` command.

        Returns:
            Mapping of chip name to a dict holding the chip's ``'Adapter'``
            string (when present) and one float per reading label. Only the
            first number of each reading is used, so limits printed in
            parentheses are ignored; readings without a number are skipped.
        """
        data: Dict[str, Any] = {}
        current_chip: Optional[str] = None
        # Adapter seen before any chip header; attached to the next chip.
        pending_adapter: Optional[str] = None

        for raw_line in output.splitlines():
            line = raw_line.strip()
            # Wrapped limit lines such as "(crit = +100.0°C)" contain no
            # colon and must not be mistaken for a chip header.
            if not line or line.startswith('('):
                continue

            # Adapter line: in sensors output it follows the chip header,
            # so it describes the chip currently being read.
            if line.startswith('Adapter:'):
                adapter = line[len('Adapter:'):].strip()
                if current_chip is None:
                    pending_adapter = adapter
                elif adapter:
                    data[current_chip]['Adapter'] = adapter
                continue

            label, sep, value_str = line.partition(':')

            # Chip header
            if not sep:
                current_chip = line
                chip = data.setdefault(current_chip, {})
                if pending_adapter:
                    chip['Adapter'] = pending_adapter
                    pending_adapter = None
                continue

            # Sensor value; ignored until a chip header has been seen.
            if current_chip is None:
                continue
            match = self._NUMBER_RE.search(value_str)
            if match:
                data[current_chip][label.strip()] = float(match.group())

        return data

    @staticmethod
    def _extract_temperature(value: Any) -> Optional[float]:
        """Return the temperature held by one sensor entry, if any.

        Accepts a plain number (flat text-parsed structure) or a nested dict
        from ``sensors -j`` such as ``{"temp2_input": 31, "temp2_max": 79}``,
        in which case the first numeric ``*input*`` field is used.
        """
        if isinstance(value, (int, float)):
            return float(value)
        if isinstance(value, dict):
            for sub_key, sub_value in value.items():
                if 'input' in sub_key.lower() and isinstance(sub_value, (int, float)):
                    return float(sub_value)
        return None

    async def get_cpu_temperatures(self) -> List["CPUTemp"]:
        """Get CPU temperatures from lm-sensors.

        Only chips whose name contains ``coretemp`` or ``k10temp`` are
        considered. Labels containing ``core`` become core temperatures;
        labels containing ``tdie``, ``tctl`` or ``package`` set the package
        temperature (the last such label wins).

        Returns:
            One ``CPUTemp`` per matching chip that yielded at least one
            reading; an empty list when no sensor data is available.
        """
        sensors_data = await self.get_lmsensors_data()
        if not sensors_data:
            return []

        cpu_temps = []
        for chip_name, chip_data in sensors_data.items():
            chip_lower = chip_name.lower()
            # Look for coretemp or k10temp (AMD) chips
            if 'coretemp' not in chip_lower and 'k10temp' not in chip_lower:
                continue

            core_temps = {}
            package_temp = None
            for key, value in chip_data.items():
                # Skip metadata fields
                if key == 'Adapter':
                    continue

                temp_value = self._extract_temperature(value)
                if temp_value is None:
                    continue

                key_lower = key.lower()
                if 'core' in key_lower:
                    core_temps[key] = temp_value
                elif any(tag in key_lower for tag in self._PACKAGE_TAGS):
                    package_temp = temp_value

            # Compare against None so a legitimate 0.0 reading is kept.
            if core_temps or package_temp is not None:
                cpu_temps.append(CPUTemp(
                    cpu_name=chip_name,
                    core_temps=core_temps,
                    package_temp=package_temp
                ))

        return cpu_temps

    async def get_system_info(self) -> Optional[Dict[str, str]]:
        """Get basic system information.

        Returns:
            Dict with any of the keys ``'cpu'``, ``'memory'``, ``'os'`` and
            ``'uptime'`` (a key is omitted when its command fails or its
            output cannot be interpreted); None when not connected or when
            running a command raises.
        """
        if not await self._ensure_connected():
            return None

        try:
            info = {}

            # CPU info
            result = await self._conn.run('cat /proc/cpuinfo | grep "model name" | head -1', check=False)
            if result.exit_status == 0:
                # Split on the first colon only so model names that contain
                # a colon themselves are not truncated.
                _, sep, model = result.stdout.partition(':')
                info['cpu'] = model.strip() if sep else 'Unknown'

            # Memory info
            result = await self._conn.run('free -h | grep Mem', check=False)
            if result.exit_status == 0:
                parts = result.stdout.split()
                if len(parts) >= 2:
                    info['memory'] = parts[1]

            # OS info
            result = await self._conn.run('cat /etc/os-release | grep PRETTY_NAME', check=False)
            if result.exit_status == 0:
                # A line without '=' is skipped instead of discarding all
                # information collected so far.
                _, sep, pretty_name = result.stdout.partition('=')
                if sep:
                    info['os'] = pretty_name.strip().strip('"')

            # Uptime
            result = await self._conn.run('uptime -p', check=False)
            if result.exit_status == 0:
                info['uptime'] = result.stdout.strip()

            return info
        except Exception as e:
            logger.error("Failed to get system info: %s", e)
            return None

    async def execute_command(self, command: str) -> tuple[int, str, str]:
        """Execute a custom command on the server.

        Args:
            command: Command line passed unchanged to the remote side.

        Returns:
            ``(exit_status, stdout, stderr)``. On connection failure the
            tuple is ``(-1, "", "Not connected")``; when running the command
            raises, it is ``(-1, "", str(error))``.
        """
        if not await self._ensure_connected():
            return -1, "", "Not connected"

        try:
            result = await self._conn.run(command, check=False)
            return result.exit_status, result.stdout, result.stderr
        except Exception as e:
            logger.error("Command execution failed: %s", e)
            return -1, "", str(e)
|