405 lines
13 KiB
Python
405 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
EU-Utility Webhook Payload Validator
|
||
=====================================
|
||
|
||
Validates webhook payloads for various integrations:
|
||
- Discord webhooks
|
||
- Home Assistant webhooks
|
||
- Generic HTTP webhooks
|
||
|
||
Usage:
|
||
python webhook_validator.py --help
|
||
python webhook_validator.py discord payload.json
|
||
python webhook_validator.py homeassistant payload.json
|
||
python webhook_validator.py test
|
||
"""
|
||
|
||
import json
|
||
import sys
|
||
import argparse
|
||
from typing import Dict, Any, List, Tuple, Optional
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
|
||
|
||
@dataclass
|
||
class ValidationResult:
|
||
"""Validation result container."""
|
||
valid: bool
|
||
errors: List[str]
|
||
warnings: List[str]
|
||
info: List[str]
|
||
|
||
|
||
class WebhookValidator:
|
||
"""Base webhook validator."""
|
||
|
||
def validate(self, payload: Dict) -> ValidationResult:
|
||
"""Validate a payload. Must be implemented by subclasses."""
|
||
raise NotImplementedError
|
||
|
||
def validate_json(self, data: str) -> Tuple[bool, Optional[Dict], str]:
|
||
"""Validate JSON format."""
|
||
try:
|
||
payload = json.loads(data)
|
||
return True, payload, "Valid JSON"
|
||
except json.JSONDecodeError as e:
|
||
return False, None, f"Invalid JSON: {e}"
|
||
|
||
|
||
class DiscordWebhookValidator(WebhookValidator):
|
||
"""Validator for Discord webhook payloads."""
|
||
|
||
MAX_CONTENT_LENGTH = 2000
|
||
MAX_EMBEDS = 10
|
||
MAX_EMBED_TITLE = 256
|
||
MAX_EMBED_DESCRIPTION = 4096
|
||
MAX_FIELDS = 25
|
||
MAX_FIELD_NAME = 256
|
||
MAX_FIELD_VALUE = 1024
|
||
MAX_FOOTER_TEXT = 2048
|
||
MAX_AUTHOR_NAME = 256
|
||
|
||
COLORS = {
|
||
"DEFAULT": 0,
|
||
"AQUA": 1752220,
|
||
"GREEN": 3066993,
|
||
"BLUE": 3447003,
|
||
"PURPLE": 10181046,
|
||
"GOLD": 15844367,
|
||
"ORANGE": 15105570,
|
||
"RED": 15158332,
|
||
"GREY": 9807270,
|
||
"DARKER_GREY": 8359053,
|
||
"NAVY": 3426654,
|
||
"DARK_AQUA": 1146986,
|
||
"DARK_GREEN": 2067276,
|
||
"DARK_BLUE": 2123412,
|
||
"DARK_PURPLE": 7419530,
|
||
"DARK_GOLD": 12745742,
|
||
"DARK_ORANGE": 11027200,
|
||
"DARK_RED": 10038562,
|
||
"DARK_GREY": 9936031,
|
||
"LIGHT_GREY": 12370112,
|
||
"DARK_NAVY": 2899536,
|
||
}
|
||
|
||
def validate(self, payload: Dict) -> ValidationResult:
|
||
"""Validate Discord webhook payload."""
|
||
errors = []
|
||
warnings = []
|
||
info = []
|
||
|
||
# Check required fields
|
||
if "content" not in payload and "embeds" not in payload:
|
||
errors.append("Payload must contain either 'content' or 'embeds'")
|
||
return ValidationResult(False, errors, warnings, info)
|
||
|
||
# Validate content
|
||
if "content" in payload:
|
||
content = payload["content"]
|
||
if content:
|
||
if len(content) > self.MAX_CONTENT_LENGTH:
|
||
errors.append(f"Content exceeds {self.MAX_CONTENT_LENGTH} characters")
|
||
|
||
# Check for @everyone or @here
|
||
if "@everyone" in content or "@here" in content:
|
||
warnings.append("Content contains @everyone or @here")
|
||
|
||
info.append(f"Content length: {len(content)}")
|
||
|
||
# Validate embeds
|
||
if "embeds" in payload:
|
||
embeds = payload["embeds"]
|
||
|
||
if not isinstance(embeds, list):
|
||
errors.append("Embeds must be an array")
|
||
else:
|
||
if len(embeds) > self.MAX_EMBEDS:
|
||
errors.append(f"Too many embeds (max {self.MAX_EMBEDS})")
|
||
|
||
for i, embed in enumerate(embeds):
|
||
embed_errors, embed_warnings = self._validate_embed(embed, i)
|
||
errors.extend(embed_errors)
|
||
warnings.extend(embed_warnings)
|
||
|
||
# Validate username
|
||
if "username" in payload:
|
||
if len(payload["username"]) > 80:
|
||
errors.append("Username exceeds 80 characters")
|
||
|
||
# Validate avatar_url
|
||
if "avatar_url" in payload:
|
||
url = payload["avatar_url"]
|
||
if not url.startswith(("http://", "https://")):
|
||
errors.append("Avatar URL must be HTTP(S)")
|
||
|
||
# Validate allowed_mentions
|
||
if "allowed_mentions" in payload:
|
||
self._validate_allowed_mentions(payload["allowed_mentions"], errors, warnings)
|
||
|
||
# Validate components (buttons)
|
||
if "components" in payload:
|
||
warnings.append("Components (buttons) require application-owned webhook")
|
||
|
||
valid = len(errors) == 0
|
||
return ValidationResult(valid, errors, warnings, info)
|
||
|
||
def _validate_embed(self, embed: Dict, index: int) -> Tuple[List[str], List[str]]:
|
||
"""Validate a single embed."""
|
||
errors = []
|
||
warnings = []
|
||
prefix = f"Embed[{index}]: "
|
||
|
||
# Check title
|
||
if "title" in embed:
|
||
if len(embed["title"]) > self.MAX_EMBED_TITLE:
|
||
errors.append(f"{prefix}Title exceeds {self.MAX_EMBED_TITLE} characters")
|
||
|
||
# Check description
|
||
if "description" in embed:
|
||
if len(embed["description"]) > self.MAX_EMBED_DESCRIPTION:
|
||
errors.append(f"{prefix}Description exceeds {self.MAX_EMBED_DESCRIPTION} characters")
|
||
|
||
# Check color
|
||
if "color" in embed:
|
||
color = embed["color"]
|
||
if not isinstance(color, int):
|
||
warnings.append(f"{prefix}Color should be an integer")
|
||
elif color < 0 or color > 16777215:
|
||
errors.append(f"{prefix}Color must be between 0 and 16777215")
|
||
|
||
# Check fields
|
||
if "fields" in embed:
|
||
fields = embed["fields"]
|
||
if not isinstance(fields, list):
|
||
errors.append(f"{prefix}Fields must be an array")
|
||
elif len(fields) > self.MAX_FIELDS:
|
||
errors.append(f"{prefix}Too many fields (max {self.MAX_FIELDS})")
|
||
else:
|
||
for i, field in enumerate(fields):
|
||
if "name" not in field or "value" not in field:
|
||
errors.append(f"{prefix}Field[{i}] missing name or value")
|
||
else:
|
||
if len(field["name"]) > self.MAX_FIELD_NAME:
|
||
errors.append(f"{prefix}Field[{i}] name exceeds {self.MAX_FIELD_NAME} characters")
|
||
if len(field["value"]) > self.MAX_FIELD_VALUE:
|
||
errors.append(f"{prefix}Field[{i}] value exceeds {self.MAX_FIELD_VALUE} characters")
|
||
|
||
# Check footer
|
||
if "footer" in embed:
|
||
footer = embed["footer"]
|
||
if "text" in footer and len(footer["text"]) > self.MAX_FOOTER_TEXT:
|
||
errors.append(f"{prefix}Footer text exceeds {self.MAX_FOOTER_TEXT} characters")
|
||
|
||
# Check author
|
||
if "author" in embed:
|
||
author = embed["author"]
|
||
if "name" in author and len(author["name"]) > self.MAX_AUTHOR_NAME:
|
||
errors.append(f"{prefix}Author name exceeds {self.MAX_AUTHOR_NAME} characters")
|
||
|
||
# Check timestamp
|
||
if "timestamp" in embed:
|
||
warnings.append(f"{prefix}Timestamp: {embed['timestamp']}")
|
||
|
||
return errors, warnings
|
||
|
||
def _validate_allowed_mentions(self, allowed: Dict, errors: List, warnings: List):
|
||
"""Validate allowed_mentions object."""
|
||
valid_keys = {"parse", "roles", "users", "replied_user"}
|
||
|
||
for key in allowed.keys():
|
||
if key not in valid_keys:
|
||
errors.append(f"Invalid allowed_mentions key: {key}")
|
||
|
||
if "parse" in allowed:
|
||
valid_parse = {"everyone", "users", "roles"}
|
||
for item in allowed["parse"]:
|
||
if item not in valid_parse:
|
||
errors.append(f"Invalid parse value: {item}")
|
||
|
||
|
||
class HomeAssistantWebhookValidator(WebhookValidator):
|
||
"""Validator for Home Assistant webhook payloads."""
|
||
|
||
def validate(self, payload: Dict) -> ValidationResult:
|
||
"""Validate Home Assistant webhook payload."""
|
||
errors = []
|
||
warnings = []
|
||
info = []
|
||
|
||
# HA webhooks are flexible, but we can check for common patterns
|
||
|
||
# Check for event_type (common in HA webhooks)
|
||
if "event_type" in payload:
|
||
info.append(f"Event type: {payload['event_type']}")
|
||
|
||
# Check for event_data
|
||
if "event_data" in payload:
|
||
if not isinstance(payload["event_data"], dict):
|
||
warnings.append("event_data should be an object")
|
||
else:
|
||
info.append(f"Event data keys: {list(payload['event_data'].keys())}")
|
||
|
||
# Check for state (sensor updates)
|
||
if "state" in payload:
|
||
info.append(f"State: {payload['state']}")
|
||
|
||
# Check for attributes
|
||
if "attributes" in payload:
|
||
if not isinstance(payload["attributes"], dict):
|
||
warnings.append("attributes should be an object")
|
||
|
||
valid = len(errors) == 0
|
||
return ValidationResult(valid, errors, warnings, info)
|
||
|
||
|
||
class GenericWebhookValidator(WebhookValidator):
|
||
"""Generic webhook validator."""
|
||
|
||
MAX_PAYLOAD_SIZE = 1024 * 1024 # 1MB
|
||
|
||
def validate(self, payload: Dict) -> ValidationResult:
|
||
"""Validate generic webhook payload."""
|
||
errors = []
|
||
warnings = []
|
||
info = []
|
||
|
||
# Check payload size
|
||
payload_size = len(json.dumps(payload))
|
||
if payload_size > self.MAX_PAYLOAD_SIZE:
|
||
errors.append(f"Payload exceeds {self.MAX_PAYLOAD_SIZE} bytes")
|
||
|
||
info.append(f"Payload size: {payload_size} bytes")
|
||
info.append(f"Keys: {list(payload.keys())}")
|
||
|
||
# Check for nested objects
|
||
for key, value in payload.items():
|
||
if isinstance(value, dict):
|
||
info.append(f"'{key}' is nested object with keys: {list(value.keys())}")
|
||
elif isinstance(value, list):
|
||
info.append(f"'{key}' is array with {len(value)} items")
|
||
|
||
valid = len(errors) == 0
|
||
return ValidationResult(valid, errors, warnings, info)
|
||
|
||
|
||
def print_result(result: ValidationResult, verbose: bool = False):
|
||
"""Print validation result."""
|
||
if result.valid:
|
||
print("✅ Validation PASSED")
|
||
else:
|
||
print("❌ Validation FAILED")
|
||
|
||
if result.errors:
|
||
print("\nErrors:")
|
||
for error in result.errors:
|
||
print(f" ❌ {error}")
|
||
|
||
if result.warnings:
|
||
print("\nWarnings:")
|
||
for warning in result.warnings:
|
||
print(f" ⚠️ {warning}")
|
||
|
||
if verbose and result.info:
|
||
print("\nInfo:")
|
||
for info in result.info:
|
||
print(f" ℹ️ {info}")
|
||
|
||
return 0 if result.valid else 1
|
||
|
||
|
||
def main():
|
||
"""Main entry point."""
|
||
parser = argparse.ArgumentParser(
|
||
description="EU-Utility Webhook Payload Validator"
|
||
)
|
||
parser.add_argument(
|
||
"type",
|
||
choices=["discord", "homeassistant", "generic", "test"],
|
||
help="Webhook type to validate"
|
||
)
|
||
parser.add_argument(
|
||
"payload",
|
||
nargs="?",
|
||
help="JSON payload string or file path"
|
||
)
|
||
parser.add_argument(
|
||
"-v", "--verbose",
|
||
action="store_true",
|
||
help="Show verbose output"
|
||
)
|
||
parser.add_argument(
|
||
"--pretty",
|
||
action="store_true",
|
||
help="Pretty print the payload"
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
# Get validator
|
||
validators = {
|
||
"discord": DiscordWebhookValidator(),
|
||
"homeassistant": HomeAssistantWebhookValidator(),
|
||
"generic": GenericWebhookValidator(),
|
||
}
|
||
|
||
if args.type == "test":
|
||
# Run test payloads
|
||
print("Running test payloads...")
|
||
print("=" * 50)
|
||
|
||
test_cases = [
|
||
("discord", '{"content": "Hello World"}'),
|
||
("discord", '{"embeds": [{"title": "Test", "description": "A test embed"}]}'),
|
||
("discord", '{"content": "' + "x" * 2001 + '"}'), # Too long
|
||
("homeassistant", '{"event_type": "eu_utility_loot", "event_data": {"value": 50}}'),
|
||
("generic", '{"custom": "data", "nested": {"key": "value"}}'),
|
||
]
|
||
|
||
for webhook_type, payload_json in test_cases:
|
||
print(f"\n{webhook_type.upper()}: {payload_json[:50]}...")
|
||
validator = validators[webhook_type]
|
||
is_valid, payload, msg = validator.validate_json(payload_json)
|
||
|
||
if is_valid:
|
||
result = validator.validate(payload)
|
||
print_result(result, args.verbose)
|
||
else:
|
||
print(f"❌ JSON Error: {msg}")
|
||
|
||
return 0
|
||
|
||
# Validate specific payload
|
||
if not args.payload:
|
||
print("Error: payload required (or use 'test' command)")
|
||
return 1
|
||
|
||
validator = validators[args.type]
|
||
|
||
# Load payload
|
||
payload_str = args.payload
|
||
if Path(payload_str).exists():
|
||
with open(payload_str, 'r') as f:
|
||
payload_str = f.read()
|
||
|
||
# Validate JSON
|
||
is_valid, payload, msg = validator.validate_json(payload_str)
|
||
|
||
if not is_valid:
|
||
print(f"❌ {msg}")
|
||
return 1
|
||
|
||
if args.pretty:
|
||
print("Payload:")
|
||
print(json.dumps(payload, indent=2))
|
||
print()
|
||
|
||
# Validate structure
|
||
result = validator.validate(payload)
|
||
return print_result(result, args.verbose)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main()) |