# EU-Utility/core/http_client.py
"""
EU-Utility - HTTP Client with Caching
Thread-safe HTTP client with disk-based caching, automatic retries,
rate limiting, and Cache-Control header support.
"""
import os
import json
import hashlib
import time
import threading
from pathlib import Path
from typing import Dict, Any, Optional, Union, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
try:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
REQUESTS_AVAILABLE = True
except ImportError:
REQUESTS_AVAILABLE = False
print("Warning: 'requests' library not installed. HTTP client will not function.")
@dataclass
class CacheEntry:
    """Represents a cached HTTP response persisted to disk.

    Timestamps are epoch seconds (``time.time()``); ``content`` is the raw
    response body. The conditional-request fields (``etag``,
    ``last_modified``) are stored verbatim from the response headers.
    """
    url: str                              # original request URL
    status_code: int                      # HTTP status of the cached response
    headers: Dict[str, str]               # response headers (plain dict copy)
    content: bytes                        # raw response body
    cached_at: float                      # epoch seconds when the entry was written
    expires_at: float                     # epoch seconds after which the entry is stale
    cache_control: Optional[str] = None   # raw Cache-Control header value, if any
    etag: Optional[str] = None            # ETag header (for future revalidation)
    last_modified: Optional[str] = None   # Last-Modified header (for future revalidation)
class HTTPClientError(Exception):
    """Base exception for errors raised by the HTTP client."""
    pass
class URLSecurityError(HTTPClientError):
    """Raised when a URL fails security validation (scheme, SSRF, patterns)."""
    pass
class HTTPClient:
    """
    Thread-safe singleton HTTP client with caching support.

    Features:
    - Disk-based response caching
    - TTL support (time-to-live)
    - Auto-retry with exponential backoff
    - Rate limiting between requests
    - Cache-Control header respect
    - Thread-safe operations
    """
    # Singleton state: _instance holds the one client; _lock guards its
    # first construction.
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        # Double-checked locking: the unlocked fast path avoids lock
        # contention once the singleton exists; the re-check under the lock
        # prevents two threads from both constructing an instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    # __init__ uses this flag to run its body only once.
                    cls._instance._initialized = False
        return cls._instance
    def __init__(
        self,
        cache_dir: str = "cache/http",
        default_cache_ttl: int = 3600,
        rate_limit_delay: float = 0.0,
        max_retries: int = 3,
        backoff_factor: float = 0.5,
        respect_cache_control: bool = True,
        default_headers: Optional[Dict[str, str]] = None
    ):
        """
        Initialize the HTTP client.

        Because the class is a singleton, this body runs at most once per
        process: later constructions return early via ``_initialized`` and
        the first caller's configuration wins.

        Args:
            cache_dir: Directory for disk cache
            default_cache_ttl: Default TTL in seconds
            rate_limit_delay: Minimum delay between requests in seconds
            max_retries: Maximum number of retries on failure
            backoff_factor: Backoff factor for retries
            respect_cache_control: Whether to respect Cache-Control headers
            default_headers: Default headers for all requests

        Raises:
            RuntimeError: If the 'requests' library is not installed.
        """
        if self._initialized:
            return
        if not REQUESTS_AVAILABLE:
            raise RuntimeError("requests library is required for HTTP client")
        # Settings
        self.cache_dir = Path(cache_dir)
        self.default_cache_ttl = default_cache_ttl
        self.rate_limit_delay = rate_limit_delay
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.respect_cache_control = respect_cache_control
        self.default_headers = default_headers or {
            'User-Agent': 'EU-Utility/1.0'
        }
        # Thread safety
        self._cache_lock = threading.RLock()   # guards all disk-cache access
        self._request_lock = threading.Lock()  # serializes rate-limit bookkeeping
        self._last_request_time = 0            # epoch seconds of the last request
        # Initialize cache directory
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        # Initialize session with retries; retried statuses cover rate
        # limiting (429) and transient server errors (5xx).
        self.session = requests.Session()
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=backoff_factor,
            status_forcelist=[429, 500, 502, 503, 504],
            # NOTE(review): POST is in the retry allow-list -- unsafe if
            # target endpoints are not idempotent; confirm this is intended.
            allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        self._initialized = True
        print(f"[HTTP] Client initialized (cache: {self.cache_dir})")
def _validate_url(self, url: str) -> str:
"""Validate URL for security.
Args:
url: URL to validate
Returns:
Validated URL
Raises:
URLSecurityError: If URL fails validation
"""
if not url:
raise URLSecurityError("URL cannot be empty")
# Parse URL
from urllib.parse import urlparse
parsed = urlparse(url)
# Check scheme (only http and https allowed)
allowed_schemes = {'http', 'https'}
if parsed.scheme not in allowed_schemes:
raise URLSecurityError(f"URL scheme '{parsed.scheme}' not allowed. Use http:// or https://")
# Check for common attack patterns
dangerous_patterns = [
'..', # Path traversal
'@', # Credential injection
'\\', # Windows path separator
'\x00', # Null byte
]
for pattern in dangerous_patterns:
if pattern in url:
raise URLSecurityError(f"URL contains dangerous pattern: {repr(pattern)}")
# Block private/reserved IP ranges (SSRF protection)
hostname = parsed.hostname
if hostname:
import ipaddress
try:
# Check if it's an IP address
ip = ipaddress.ip_address(hostname)
# Block private, loopback, reserved, link-local
if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
raise URLSecurityError(f"URL resolves to restricted IP address: {hostname}")
except ValueError:
# Not an IP address, it's a hostname - that's fine
pass
return url
def _generate_cache_key(self, url: str, params: Optional[Dict] = None) -> str:
"""Generate a cache key from URL and parameters."""
key_string = url
if params:
# Sort params for consistent hashing
param_str = json.dumps(params, sort_keys=True)
key_string += "|" + param_str
return hashlib.sha256(key_string.encode()).hexdigest()
def _get_cache_path(self, cache_key: str) -> Path:
"""Get the file path for a cache entry."""
# Split into subdirectories for better file system performance
return self.cache_dir / cache_key[:2] / cache_key[2:4] / f"{cache_key}.json"
def _parse_cache_control(self, header: str) -> Optional[int]:
"""Parse Cache-Control header to extract max-age."""
if not header:
return None
try:
for directive in header.split(','):
directive = directive.strip().lower()
if directive.startswith('max-age='):
return int(directive.split('=')[1])
elif directive == 'no-cache' or directive == 'no-store':
return 0
except (ValueError, IndexError):
pass
return None
def _load_cache_entry(self, cache_key: str) -> Optional[CacheEntry]:
"""Load a cache entry from disk."""
cache_path = self._get_cache_path(cache_key)
if not cache_path.exists():
return None
try:
with self._cache_lock:
with open(cache_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Convert content from base64 if stored that way
content = data.get('content', '')
if isinstance(content, str):
import base64
content = base64.b64decode(content)
return CacheEntry(
url=data['url'],
status_code=data['status_code'],
headers=data['headers'],
content=content,
cached_at=data['cached_at'],
expires_at=data['expires_at'],
cache_control=data.get('cache_control'),
etag=data.get('etag'),
last_modified=data.get('last_modified')
)
except Exception as e:
print(f"[HTTP] Cache load error: {e}")
return None
def _save_cache_entry(self, cache_key: str, entry: CacheEntry):
"""Save a cache entry to disk."""
cache_path = self._get_cache_path(cache_key)
cache_path.parent.mkdir(parents=True, exist_ok=True)
try:
with self._cache_lock:
# Encode content as base64 for JSON serialization
import base64
content_b64 = base64.b64encode(entry.content).decode('utf-8')
data = {
'url': entry.url,
'status_code': entry.status_code,
'headers': entry.headers,
'content': content_b64,
'cached_at': entry.cached_at,
'expires_at': entry.expires_at,
'cache_control': entry.cache_control,
'etag': entry.etag,
'last_modified': entry.last_modified
}
with open(cache_path, 'w', encoding='utf-8') as f:
json.dump(data, f)
except Exception as e:
print(f"[HTTP] Cache save error: {e}")
def _is_cache_valid(self, entry: CacheEntry) -> bool:
"""Check if a cache entry is still valid."""
return time.time() < entry.expires_at
def _apply_rate_limit(self):
"""Apply rate limiting between requests."""
if self.rate_limit_delay <= 0:
return
with self._request_lock:
elapsed = time.time() - self._last_request_time
if elapsed < self.rate_limit_delay:
sleep_time = self.rate_limit_delay - elapsed
time.sleep(sleep_time)
self._last_request_time = time.time()
def _make_request(
self,
method: str,
url: str,
headers: Optional[Dict[str, str]] = None,
**kwargs
) -> requests.Response:
"""Make an HTTP request with rate limiting and retries."""
self._apply_rate_limit()
# Merge headers
request_headers = self.default_headers.copy()
if headers:
request_headers.update(headers)
# Make request
response = self.session.request(
method=method,
url=url,
headers=request_headers,
**kwargs
)
response.raise_for_status()
return response
def get(
self,
url: str,
cache_ttl: Optional[int] = None,
headers: Optional[Dict[str, str]] = None,
params: Optional[Dict[str, Any]] = None,
use_cache: bool = True,
**kwargs
) -> Dict[str, Any]:
"""
Perform a GET request with caching.
Args:
url: The URL to fetch
cache_ttl: Cache TTL in seconds (None = use default)
headers: Additional headers
params: URL parameters
use_cache: Whether to use cache
**kwargs: Additional arguments for requests
Returns:
Dict with 'status_code', 'headers', 'content', 'text', 'json', 'from_cache'
"""
if not REQUESTS_AVAILABLE:
raise RuntimeError("requests library not available")
# Validate URL for security
self._validate_url(url)
cache_key = self._generate_cache_key(url, params)
ttl = cache_ttl if cache_ttl is not None else self.default_cache_ttl
# Check cache if enabled
if use_cache and ttl > 0:
cached = self._load_cache_entry(cache_key)
if cached and self._is_cache_valid(cached):
return {
'status_code': cached.status_code,
'headers': cached.headers,
'content': cached.content,
'text': cached.content.decode('utf-8', errors='replace'),
'json': None,
'from_cache': True
}
# Make request
try:
response = self._make_request('GET', url, headers=headers, params=params, **kwargs)
content = response.content
# Parse JSON if possible
json_data = None
try:
json_data = response.json()
except ValueError:
pass
# Cache the response
if use_cache and ttl > 0:
# Check Cache-Control header
cache_control = response.headers.get('Cache-Control', '')
cache_ttl_override = self._parse_cache_control(cache_control)
if cache_ttl_override is not None:
if self.respect_cache_control:
if cache_ttl_override == 0:
# Don't cache
pass
else:
ttl = cache_ttl_override
if ttl > 0:
entry = CacheEntry(
url=url,
status_code=response.status_code,
headers=dict(response.headers),
content=content,
cached_at=time.time(),
expires_at=time.time() + ttl,
cache_control=cache_control,
etag=response.headers.get('ETag'),
last_modified=response.headers.get('Last-Modified')
)
self._save_cache_entry(cache_key, entry)
return {
'status_code': response.status_code,
'headers': dict(response.headers),
'content': content,
'text': content.decode('utf-8', errors='replace'),
'json': json_data,
'from_cache': False
}
except requests.exceptions.RequestException as e:
# Try to return stale cache on failure
if use_cache:
cached = self._load_cache_entry(cache_key)
if cached:
print(f"[HTTP] Returning stale cache for {url}")
return {
'status_code': cached.status_code,
'headers': cached.headers,
'content': cached.content,
'text': cached.content.decode('utf-8', errors='replace'),
'json': None,
'from_cache': True,
'stale': True
}
raise
def post(
self,
url: str,
data: Optional[Union[Dict, str, bytes]] = None,
json: Optional[Dict] = None,
headers: Optional[Dict[str, str]] = None,
use_cache: bool = False,
cache_ttl: Optional[int] = None,
**kwargs
) -> Dict[str, Any]:
"""
Perform a POST request.
Args:
url: The URL to post to
data: Form data, bytes, or string
json: JSON data (will be serialized)
headers: Additional headers
use_cache: Whether to cache the response
cache_ttl: Cache TTL in seconds
**kwargs: Additional arguments for requests
Returns:
Dict with 'status_code', 'headers', 'content', 'text', 'json', 'from_cache'
"""
if not REQUESTS_AVAILABLE:
raise RuntimeError("requests library not available")
# Validate URL for security
self._validate_url(url)
cache_key = self._generate_cache_key(url, data or json)
ttl = cache_ttl if cache_ttl is not None else self.default_cache_ttl
# Check cache if enabled (rare for POST but possible)
if use_cache and ttl > 0:
cached = self._load_cache_entry(cache_key)
if cached and self._is_cache_valid(cached):
return {
'status_code': cached.status_code,
'headers': cached.headers,
'content': cached.content,
'text': cached.content.decode('utf-8', errors='replace'),
'json': None,
'from_cache': True
}
# Make request
response = self._make_request('POST', url, headers=headers, data=data, json=json, **kwargs)
content = response.content
# Parse JSON if possible
json_data = None
try:
json_data = response.json()
except ValueError:
pass
# Cache the response if enabled
if use_cache and ttl > 0:
entry = CacheEntry(
url=url,
status_code=response.status_code,
headers=dict(response.headers),
content=content,
cached_at=time.time(),
expires_at=time.time() + ttl
)
self._save_cache_entry(cache_key, entry)
return {
'status_code': response.status_code,
'headers': dict(response.headers),
'content': content,
'text': content.decode('utf-8', errors='replace'),
'json': json_data,
'from_cache': False
}
def clear_cache(self):
"""Clear all cached responses."""
with self._cache_lock:
if self.cache_dir.exists():
import shutil
shutil.rmtree(self.cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
print("[HTTP] Cache cleared")
def invalidate_cache(self, url_pattern: str):
"""
Invalidate cache entries matching a URL pattern.
Args:
url_pattern: Substring to match in URLs
"""
with self._cache_lock:
count = 0
for cache_file in self.cache_dir.rglob("*.json"):
try:
with open(cache_file, 'r', encoding='utf-8') as f:
data = json.load(f)
if url_pattern in data.get('url', ''):
cache_file.unlink()
count += 1
except Exception:
pass
print(f"[HTTP] Invalidated {count} cache entries matching '{url_pattern}'")
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics."""
with self._cache_lock:
total_entries = 0
total_size = 0
valid_entries = 0
for cache_file in self.cache_dir.rglob("*.json"):
try:
total_entries += 1
total_size += cache_file.stat().st_size
with open(cache_file, 'r', encoding='utf-8') as f:
data = json.load(f)
if time.time() < data.get('expires_at', 0):
valid_entries += 1
except Exception:
pass
return {
'total_entries': total_entries,
'valid_entries': valid_entries,
'expired_entries': total_entries - valid_entries,
'total_size_bytes': total_size,
'cache_dir': str(self.cache_dir)
}
# Module-level singleton handle, lazily created by get_http_client().
_http_client = None


def get_http_client(
    cache_dir: str = "cache/http",
    default_cache_ttl: int = 3600,
    rate_limit_delay: float = 0.0,
    max_retries: int = 3,
    backoff_factor: float = 0.5,
    respect_cache_control: bool = True,
    default_headers: Optional[Dict[str, str]] = None
) -> "HTTPClient":
    """
    Get or create the global HTTP client instance.

    First call initializes the client with the provided parameters.
    Subsequent calls return the existing instance (parameters ignored).
    """
    global _http_client
    if _http_client is not None:
        return _http_client
    _http_client = HTTPClient(
        cache_dir=cache_dir,
        default_cache_ttl=default_cache_ttl,
        rate_limit_delay=rate_limit_delay,
        max_retries=max_retries,
        backoff_factor=backoff_factor,
        respect_cache_control=respect_cache_control,
        default_headers=default_headers,
    )
    return _http_client