"""
EU-Utility - HTTP Client with Caching

Thread-safe HTTP client with disk-based caching, automatic retries,
rate limiting, and Cache-Control header support.
"""
|
# Standard-library dependencies.
import os
import json
import hashlib
import time
import threading
from pathlib import Path
from typing import Dict, Any, Optional, Union, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta

# 'requests' is optional at import time: the flag lets this module be imported
# even when the library is missing; HTTPClient.__init__ raises RuntimeError if
# the client is actually constructed without 'requests' installed.
try:
    import requests
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry
    REQUESTS_AVAILABLE = True
except ImportError:
    REQUESTS_AVAILABLE = False
    print("Warning: 'requests' library not installed. HTTP client will not function.")
|
|
|
|
|
@dataclass
class CacheEntry:
    """Represents a cached HTTP response.

    Timestamps are epoch seconds (time.time()). 'content' holds the raw
    response body; it is base64-encoded when the entry is serialized to
    JSON on disk and decoded back to bytes on load.
    """
    url: str                             # original request URL
    status_code: int                     # HTTP status of the cached response
    headers: Dict[str, str]              # response headers at cache time
    content: bytes                       # raw response body
    cached_at: float                     # epoch seconds when stored
    expires_at: float                    # epoch seconds after which entry is stale
    cache_control: Optional[str] = None  # raw Cache-Control header, if present
    etag: Optional[str] = None           # ETag header (for future revalidation)
    last_modified: Optional[str] = None  # Last-Modified header (for revalidation)
|
|
|
|
|
class HTTPClientError(Exception):
    """Raised when HTTP client encounters an error.

    Base class for this module's exception hierarchy.
    """
    pass
|
|
|
|
|
class URLSecurityError(HTTPClientError):
    """Raised when URL fails security validation (see HTTPClient._validate_url)."""
    pass
|
|
|
|
|
class HTTPClient:
    """
    Thread-safe singleton HTTP client with caching support.

    Features:
    - Disk-based response caching
    - TTL support (time-to-live)
    - Auto-retry with exponential backoff
    - Rate limiting between requests
    - Cache-Control header respect
    - Thread-safe operations
    """

    # Singleton machinery: _instance holds the sole instance; _lock guards
    # its double-checked creation in __new__.
    _instance = None
    _lock = threading.Lock()
|
|
|
    def __new__(cls, *args, **kwargs):
        """Create (at most once) and return the singleton instance.

        Uses double-checked locking: the unlocked check keeps the common
        path cheap, and the re-check under the lock prevents two racing
        threads from each creating an instance.
        """
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    # Consumed by __init__ so that re-constructing the
                    # singleton is a no-op after the first time.
                    cls._instance._initialized = False
        return cls._instance
|
|
|
    def __init__(
        self,
        cache_dir: str = "cache/http",
        default_cache_ttl: int = 3600,
        rate_limit_delay: float = 0.0,
        max_retries: int = 3,
        backoff_factor: float = 0.5,
        respect_cache_control: bool = True,
        default_headers: Optional[Dict[str, str]] = None
    ):
        """
        Initialize the HTTP client.

        Because the class is a singleton, only the first construction
        applies these parameters; later constructions return immediately.

        Args:
            cache_dir: Directory for disk cache
            default_cache_ttl: Default TTL in seconds
            rate_limit_delay: Minimum delay between requests in seconds
            max_retries: Maximum number of retries on failure
            backoff_factor: Backoff factor for retries
            respect_cache_control: Whether to respect Cache-Control headers
            default_headers: Default headers for all requests

        Raises:
            RuntimeError: If the 'requests' library is not installed.
        """
        # Singleton guard: __new__ sets _initialized = False exactly once.
        if self._initialized:
            return

        if not REQUESTS_AVAILABLE:
            raise RuntimeError("requests library is required for HTTP client")

        # Settings
        self.cache_dir = Path(cache_dir)
        self.default_cache_ttl = default_cache_ttl
        self.rate_limit_delay = rate_limit_delay
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.respect_cache_control = respect_cache_control
        self.default_headers = default_headers or {
            'User-Agent': 'EU-Utility/1.0'
        }

        # Thread safety: RLock because cache helpers may nest under the same
        # lock; the plain Lock serializes rate-limit bookkeeping.
        self._cache_lock = threading.RLock()
        self._request_lock = threading.Lock()
        self._last_request_time = 0

        # Initialize cache directory
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Initialize session with retries handled at the transport-adapter
        # level (urllib3 Retry), so _make_request stays simple.
        self.session = requests.Session()
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=backoff_factor,
            status_forcelist=[429, 500, 502, 503, 504],
            # NOTE(review): POST is retried on 429/5xx as well; that is unsafe
            # for non-idempotent endpoints -- confirm this is intentional.
            allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        self._initialized = True
        print(f"[HTTP] Client initialized (cache: {self.cache_dir})")
|
|
|
    def _validate_url(self, url: str) -> str:
        """Validate URL for security.

        Checks, in order: non-empty, http/https scheme, absence of a few
        dangerous substrings, and (for literal IP hosts only) that the
        address is not private/loopback/reserved/link-local (basic SSRF
        guard).

        Args:
            url: URL to validate

        Returns:
            Validated URL (returned unchanged)

        Raises:
            URLSecurityError: If URL fails validation
        """
        if not url:
            raise URLSecurityError("URL cannot be empty")

        # Parse URL
        from urllib.parse import urlparse
        parsed = urlparse(url)

        # Check scheme (only http and https allowed)
        allowed_schemes = {'http', 'https'}
        if parsed.scheme not in allowed_schemes:
            raise URLSecurityError(f"URL scheme '{parsed.scheme}' not allowed. Use http:// or https://")

        # Check for common attack patterns.
        # NOTE(review): these are substring checks over the WHOLE url, so a
        # legitimate query string containing '..' or '@' is rejected too --
        # confirm that trade-off is acceptable.
        dangerous_patterns = [
            '..',    # Path traversal
            '@',     # Credential injection
            '\\',    # Windows path separator
            '\x00',  # Null byte
        ]

        for pattern in dangerous_patterns:
            if pattern in url:
                raise URLSecurityError(f"URL contains dangerous pattern: {repr(pattern)}")

        # Block private/reserved IP ranges (SSRF protection).
        # NOTE(review): only literal IP hosts are checked; hostnames are not
        # resolved here, so DNS-based SSRF is outside this guard's scope.
        hostname = parsed.hostname
        if hostname:
            import ipaddress
            try:
                # Check if it's an IP address
                ip = ipaddress.ip_address(hostname)
                # Block private, loopback, reserved, link-local
                if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
                    raise URLSecurityError(f"URL resolves to restricted IP address: {hostname}")
            except ValueError:
                # Not an IP address, it's a hostname - that's fine
                pass

        return url
|
"""Generate a cache key from URL and parameters."""
|
|
key_string = url
|
|
if params:
|
|
# Sort params for consistent hashing
|
|
param_str = json.dumps(params, sort_keys=True)
|
|
key_string += "|" + param_str
|
|
return hashlib.sha256(key_string.encode()).hexdigest()
|
|
|
|
def _get_cache_path(self, cache_key: str) -> Path:
|
|
"""Get the file path for a cache entry."""
|
|
# Split into subdirectories for better file system performance
|
|
return self.cache_dir / cache_key[:2] / cache_key[2:4] / f"{cache_key}.json"
|
|
|
|
def _parse_cache_control(self, header: str) -> Optional[int]:
|
|
"""Parse Cache-Control header to extract max-age."""
|
|
if not header:
|
|
return None
|
|
|
|
try:
|
|
for directive in header.split(','):
|
|
directive = directive.strip().lower()
|
|
if directive.startswith('max-age='):
|
|
return int(directive.split('=')[1])
|
|
elif directive == 'no-cache' or directive == 'no-store':
|
|
return 0
|
|
except (ValueError, IndexError):
|
|
pass
|
|
return None
|
|
|
|
    def _load_cache_entry(self, cache_key: str) -> Optional[CacheEntry]:
        """Load a cache entry from disk.

        Returns None when the file does not exist or cannot be read or
        parsed; read errors are logged and degrade to a cache miss.
        """
        cache_path = self._get_cache_path(cache_key)

        if not cache_path.exists():
            return None

        try:
            with self._cache_lock:
                with open(cache_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)

            # Content is stored base64-encoded (JSON cannot hold raw bytes);
            # decode it back to bytes here.
            content = data.get('content', '')
            if isinstance(content, str):
                import base64
                content = base64.b64decode(content)

            return CacheEntry(
                url=data['url'],
                status_code=data['status_code'],
                headers=data['headers'],
                content=content,
                cached_at=data['cached_at'],
                expires_at=data['expires_at'],
                cache_control=data.get('cache_control'),
                etag=data.get('etag'),
                last_modified=data.get('last_modified')
            )
        except Exception as e:
            # Corrupt or unreadable entries are treated as cache misses.
            print(f"[HTTP] Cache load error: {e}")
            return None
|
|
|
    def _save_cache_entry(self, cache_key: str, entry: CacheEntry):
        """Save a cache entry to disk.

        Write failures are logged and swallowed: caching is best-effort
        and must never break the request that produced the response.
        """
        cache_path = self._get_cache_path(cache_key)
        cache_path.parent.mkdir(parents=True, exist_ok=True)

        try:
            with self._cache_lock:
                # Encode content as base64 for JSON serialization
                import base64
                content_b64 = base64.b64encode(entry.content).decode('utf-8')

                data = {
                    'url': entry.url,
                    'status_code': entry.status_code,
                    'headers': entry.headers,
                    'content': content_b64,
                    'cached_at': entry.cached_at,
                    'expires_at': entry.expires_at,
                    'cache_control': entry.cache_control,
                    'etag': entry.etag,
                    'last_modified': entry.last_modified
                }

                with open(cache_path, 'w', encoding='utf-8') as f:
                    json.dump(data, f)
        except Exception as e:
            print(f"[HTTP] Cache save error: {e}")
|
|
|
def _is_cache_valid(self, entry: CacheEntry) -> bool:
|
|
"""Check if a cache entry is still valid."""
|
|
return time.time() < entry.expires_at
|
|
|
|
    def _apply_rate_limit(self):
        """Apply rate limiting between requests.

        Sleeps just long enough that consecutive requests are at least
        rate_limit_delay seconds apart; a delay of 0 disables the feature.
        The lock serializes concurrent callers so the elapsed-time check,
        the sleep, and the timestamp update act as one unit.
        """
        if self.rate_limit_delay <= 0:
            return

        with self._request_lock:
            elapsed = time.time() - self._last_request_time
            if elapsed < self.rate_limit_delay:
                sleep_time = self.rate_limit_delay - elapsed
                time.sleep(sleep_time)
            self._last_request_time = time.time()
|
|
    def _make_request(
        self,
        method: str,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        **kwargs
    ) -> requests.Response:
        """Make an HTTP request with rate limiting and retries.

        Retries happen inside the session's transport adapter (configured
        in __init__); this method only adds rate limiting and header
        merging on top.

        Raises:
            requests.exceptions.HTTPError: For 4xx/5xx responses (via
                raise_for_status); other RequestException subclasses
                propagate for transport-level failures.
        """
        self._apply_rate_limit()

        # Merge headers: per-request headers override the client defaults.
        request_headers = self.default_headers.copy()
        if headers:
            request_headers.update(headers)

        # Make request
        response = self.session.request(
            method=method,
            url=url,
            headers=request_headers,
            **kwargs
        )
        response.raise_for_status()
        return response
|
|
|
def get(
|
|
self,
|
|
url: str,
|
|
cache_ttl: Optional[int] = None,
|
|
headers: Optional[Dict[str, str]] = None,
|
|
params: Optional[Dict[str, Any]] = None,
|
|
use_cache: bool = True,
|
|
**kwargs
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Perform a GET request with caching.
|
|
|
|
Args:
|
|
url: The URL to fetch
|
|
cache_ttl: Cache TTL in seconds (None = use default)
|
|
headers: Additional headers
|
|
params: URL parameters
|
|
use_cache: Whether to use cache
|
|
**kwargs: Additional arguments for requests
|
|
|
|
Returns:
|
|
Dict with 'status_code', 'headers', 'content', 'text', 'json', 'from_cache'
|
|
"""
|
|
if not REQUESTS_AVAILABLE:
|
|
raise RuntimeError("requests library not available")
|
|
|
|
# Validate URL for security
|
|
self._validate_url(url)
|
|
|
|
cache_key = self._generate_cache_key(url, params)
|
|
ttl = cache_ttl if cache_ttl is not None else self.default_cache_ttl
|
|
|
|
# Check cache if enabled
|
|
if use_cache and ttl > 0:
|
|
cached = self._load_cache_entry(cache_key)
|
|
if cached and self._is_cache_valid(cached):
|
|
return {
|
|
'status_code': cached.status_code,
|
|
'headers': cached.headers,
|
|
'content': cached.content,
|
|
'text': cached.content.decode('utf-8', errors='replace'),
|
|
'json': None,
|
|
'from_cache': True
|
|
}
|
|
|
|
# Make request
|
|
try:
|
|
response = self._make_request('GET', url, headers=headers, params=params, **kwargs)
|
|
|
|
content = response.content
|
|
|
|
# Parse JSON if possible
|
|
json_data = None
|
|
try:
|
|
json_data = response.json()
|
|
except ValueError:
|
|
pass
|
|
|
|
# Cache the response
|
|
if use_cache and ttl > 0:
|
|
# Check Cache-Control header
|
|
cache_control = response.headers.get('Cache-Control', '')
|
|
cache_ttl_override = self._parse_cache_control(cache_control)
|
|
|
|
if cache_ttl_override is not None:
|
|
if self.respect_cache_control:
|
|
if cache_ttl_override == 0:
|
|
# Don't cache
|
|
pass
|
|
else:
|
|
ttl = cache_ttl_override
|
|
|
|
if ttl > 0:
|
|
entry = CacheEntry(
|
|
url=url,
|
|
status_code=response.status_code,
|
|
headers=dict(response.headers),
|
|
content=content,
|
|
cached_at=time.time(),
|
|
expires_at=time.time() + ttl,
|
|
cache_control=cache_control,
|
|
etag=response.headers.get('ETag'),
|
|
last_modified=response.headers.get('Last-Modified')
|
|
)
|
|
self._save_cache_entry(cache_key, entry)
|
|
|
|
return {
|
|
'status_code': response.status_code,
|
|
'headers': dict(response.headers),
|
|
'content': content,
|
|
'text': content.decode('utf-8', errors='replace'),
|
|
'json': json_data,
|
|
'from_cache': False
|
|
}
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
# Try to return stale cache on failure
|
|
if use_cache:
|
|
cached = self._load_cache_entry(cache_key)
|
|
if cached:
|
|
print(f"[HTTP] Returning stale cache for {url}")
|
|
return {
|
|
'status_code': cached.status_code,
|
|
'headers': cached.headers,
|
|
'content': cached.content,
|
|
'text': cached.content.decode('utf-8', errors='replace'),
|
|
'json': None,
|
|
'from_cache': True,
|
|
'stale': True
|
|
}
|
|
raise
|
|
|
|
    def post(
        self,
        url: str,
        data: Optional[Union[Dict, str, bytes]] = None,
        # NOTE: this parameter shadows the stdlib 'json' module within the method.
        json: Optional[Dict] = None,
        headers: Optional[Dict[str, str]] = None,
        use_cache: bool = False,
        cache_ttl: Optional[int] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Perform a POST request.

        Unlike get(), this method does not consult the response's
        Cache-Control header and has no stale-cache fallback on failure.

        Args:
            url: The URL to post to
            data: Form data, bytes, or string
            json: JSON data (will be serialized)
            headers: Additional headers
            use_cache: Whether to cache the response (off by default)
            cache_ttl: Cache TTL in seconds
            **kwargs: Additional arguments for requests

        Returns:
            Dict with 'status_code', 'headers', 'content', 'text', 'json', 'from_cache'
        """
        if not REQUESTS_AVAILABLE:
            raise RuntimeError("requests library not available")

        # Validate URL for security
        self._validate_url(url)

        # Key on the body (form data or JSON payload) so different payloads
        # to the same URL cache separately.
        cache_key = self._generate_cache_key(url, data or json)
        ttl = cache_ttl if cache_ttl is not None else self.default_cache_ttl

        # Check cache if enabled (rare for POST but possible)
        if use_cache and ttl > 0:
            cached = self._load_cache_entry(cache_key)
            if cached and self._is_cache_valid(cached):
                return {
                    'status_code': cached.status_code,
                    'headers': cached.headers,
                    'content': cached.content,
                    'text': cached.content.decode('utf-8', errors='replace'),
                    # NOTE(review): cache hits return json=None even for JSON
                    # bodies -- callers must re-parse 'text' themselves.
                    'json': None,
                    'from_cache': True
                }

        # Make request
        response = self._make_request('POST', url, headers=headers, data=data, json=json, **kwargs)

        content = response.content

        # Parse JSON if possible
        json_data = None
        try:
            json_data = response.json()
        except ValueError:
            pass

        # Cache the response if enabled
        if use_cache and ttl > 0:
            entry = CacheEntry(
                url=url,
                status_code=response.status_code,
                headers=dict(response.headers),
                content=content,
                cached_at=time.time(),
                expires_at=time.time() + ttl
            )
            self._save_cache_entry(cache_key, entry)

        return {
            'status_code': response.status_code,
            'headers': dict(response.headers),
            'content': content,
            'text': content.decode('utf-8', errors='replace'),
            'json': json_data,
            'from_cache': False
        }
|
|
|
def clear_cache(self):
|
|
"""Clear all cached responses."""
|
|
with self._cache_lock:
|
|
if self.cache_dir.exists():
|
|
import shutil
|
|
shutil.rmtree(self.cache_dir)
|
|
self.cache_dir.mkdir(parents=True, exist_ok=True)
|
|
print("[HTTP] Cache cleared")
|
|
|
|
def invalidate_cache(self, url_pattern: str):
|
|
"""
|
|
Invalidate cache entries matching a URL pattern.
|
|
|
|
Args:
|
|
url_pattern: Substring to match in URLs
|
|
"""
|
|
with self._cache_lock:
|
|
count = 0
|
|
for cache_file in self.cache_dir.rglob("*.json"):
|
|
try:
|
|
with open(cache_file, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
if url_pattern in data.get('url', ''):
|
|
cache_file.unlink()
|
|
count += 1
|
|
except Exception:
|
|
pass
|
|
print(f"[HTTP] Invalidated {count} cache entries matching '{url_pattern}'")
|
|
|
|
def get_cache_stats(self) -> Dict[str, Any]:
|
|
"""Get cache statistics."""
|
|
with self._cache_lock:
|
|
total_entries = 0
|
|
total_size = 0
|
|
valid_entries = 0
|
|
|
|
for cache_file in self.cache_dir.rglob("*.json"):
|
|
try:
|
|
total_entries += 1
|
|
total_size += cache_file.stat().st_size
|
|
|
|
with open(cache_file, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
if time.time() < data.get('expires_at', 0):
|
|
valid_entries += 1
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
'total_entries': total_entries,
|
|
'valid_entries': valid_entries,
|
|
'expired_entries': total_entries - valid_entries,
|
|
'total_size_bytes': total_size,
|
|
'cache_dir': str(self.cache_dir)
|
|
}
|
|
|
|
|
|
# Module-level handle for the singleton client. HTTPClient.__new__ is itself
# a locked singleton, so this mainly caches the reference for fast access.
_http_client = None


def get_http_client(
    cache_dir: str = "cache/http",
    default_cache_ttl: int = 3600,
    rate_limit_delay: float = 0.0,
    max_retries: int = 3,
    backoff_factor: float = 0.5,
    respect_cache_control: bool = True,
    default_headers: Optional[Dict[str, str]] = None
) -> HTTPClient:
    """
    Get or create the global HTTP client instance.

    First call initializes the client with the provided parameters.
    Subsequent calls return the existing instance (parameters ignored).

    NOTE(review): this check-then-create is not lock-guarded; two racing
    threads may both call HTTPClient(), but its locked __new__ ensures they
    still receive the same object.
    """
    global _http_client
    if _http_client is None:
        _http_client = HTTPClient(
            cache_dir=cache_dir,
            default_cache_ttl=default_cache_ttl,
            rate_limit_delay=rate_limit_delay,
            max_retries=max_retries,
            backoff_factor=backoff_factor,
            respect_cache_control=respect_cache_control,
            default_headers=default_headers
        )
    return _http_client