ipmi-fan-control/backend/auth.py

99 lines
2.9 KiB
Python

"""Authentication and security utilities."""
from datetime import datetime, timedelta
from typing import Optional, Union
import secrets
import hashlib
import base64
import logging
from jose import JWTError, jwt
from passlib.context import CryptContext
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from config import settings
logger = logging.getLogger(__name__)
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Encryption key for server passwords (generated from secret key)
def get_encryption_key() -> bytes:
"""Generate encryption key from secret key."""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=b"ipmi_fan_control_salt", # Fixed salt - in production use random salt stored in DB
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(settings.SECRET_KEY.encode()))
return key
def get_fernet() -> Fernet:
"""Get Fernet cipher instance."""
return Fernet(get_encryption_key())
def encrypt_password(password: str) -> str:
"""Encrypt a password for storage."""
try:
f = get_fernet()
encrypted = f.encrypt(password.encode())
return encrypted.decode()
except Exception as e:
logger.error(f"Failed to encrypt password: {e}")
raise
def decrypt_password(encrypted_password: str) -> str:
"""Decrypt a stored password."""
try:
f = get_fernet()
decrypted = f.decrypt(encrypted_password.encode())
return decrypted.decode()
except Exception as e:
logger.error(f"Failed to decrypt password: {e}")
raise
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash a password."""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create JWT access token."""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def decode_access_token(token: str) -> Optional[dict]:
"""Decode and verify JWT access token."""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError as e:
logger.warning(f"JWT decode error: {e}")
return None
def generate_setup_token() -> str:
"""Generate a one-time setup token for initial configuration."""
return secrets.token_urlsafe(32)