#!/usr/bin/env python3 """ EU-Utility Webhook Payload Validator ===================================== Validates webhook payloads for various integrations: - Discord webhooks - Home Assistant webhooks - Generic HTTP webhooks Usage: python webhook_validator.py --help python webhook_validator.py discord payload.json python webhook_validator.py homeassistant payload.json python webhook_validator.py test """ import json import sys import argparse from typing import Dict, Any, List, Tuple, Optional from dataclasses import dataclass from pathlib import Path @dataclass class ValidationResult: """Validation result container.""" valid: bool errors: List[str] warnings: List[str] info: List[str] class WebhookValidator: """Base webhook validator.""" def validate(self, payload: Dict) -> ValidationResult: """Validate a payload. Must be implemented by subclasses.""" raise NotImplementedError def validate_json(self, data: str) -> Tuple[bool, Optional[Dict], str]: """Validate JSON format.""" try: payload = json.loads(data) return True, payload, "Valid JSON" except json.JSONDecodeError as e: return False, None, f"Invalid JSON: {e}" class DiscordWebhookValidator(WebhookValidator): """Validator for Discord webhook payloads.""" MAX_CONTENT_LENGTH = 2000 MAX_EMBEDS = 10 MAX_EMBED_TITLE = 256 MAX_EMBED_DESCRIPTION = 4096 MAX_FIELDS = 25 MAX_FIELD_NAME = 256 MAX_FIELD_VALUE = 1024 MAX_FOOTER_TEXT = 2048 MAX_AUTHOR_NAME = 256 COLORS = { "DEFAULT": 0, "AQUA": 1752220, "GREEN": 3066993, "BLUE": 3447003, "PURPLE": 10181046, "GOLD": 15844367, "ORANGE": 15105570, "RED": 15158332, "GREY": 9807270, "DARKER_GREY": 8359053, "NAVY": 3426654, "DARK_AQUA": 1146986, "DARK_GREEN": 2067276, "DARK_BLUE": 2123412, "DARK_PURPLE": 7419530, "DARK_GOLD": 12745742, "DARK_ORANGE": 11027200, "DARK_RED": 10038562, "DARK_GREY": 9936031, "LIGHT_GREY": 12370112, "DARK_NAVY": 2899536, } def validate(self, payload: Dict) -> ValidationResult: """Validate Discord webhook payload.""" errors = [] warnings = [] info = [] # Check required fields if "content" not in payload and "embeds" not in payload: errors.append("Payload must contain either 'content' or 'embeds'") return ValidationResult(False, errors, warnings, info) # Validate content if "content" in payload: content = payload["content"] if content: if len(content) > self.MAX_CONTENT_LENGTH: errors.append(f"Content exceeds {self.MAX_CONTENT_LENGTH} characters") # Check for @everyone or @here if "@everyone" in content or "@here" in content: warnings.append("Content contains @everyone or @here") info.append(f"Content length: {len(content)}") # Validate embeds if "embeds" in payload: embeds = payload["embeds"] if not isinstance(embeds, list): errors.append("Embeds must be an array") else: if len(embeds) > self.MAX_EMBEDS: errors.append(f"Too many embeds (max {self.MAX_EMBEDS})") for i, embed in enumerate(embeds): embed_errors, embed_warnings = self._validate_embed(embed, i) errors.extend(embed_errors) warnings.extend(embed_warnings) # Validate username if "username" in payload: if len(payload["username"]) > 80: errors.append("Username exceeds 80 characters") # Validate avatar_url if "avatar_url" in payload: url = payload["avatar_url"] if not url.startswith(("http://", "https://")): errors.append("Avatar URL must be HTTP(S)") # Validate allowed_mentions if "allowed_mentions" in payload: self._validate_allowed_mentions(payload["allowed_mentions"], errors, warnings) # Validate components (buttons) if "components" in payload: warnings.append("Components (buttons) require application-owned webhook") valid = len(errors) == 0 return ValidationResult(valid, errors, warnings, info) def _validate_embed(self, embed: Dict, index: int) -> Tuple[List[str], List[str]]: """Validate a single embed.""" errors = [] warnings = [] prefix = f"Embed[{index}]: " # Check title if "title" in embed: if len(embed["title"]) > self.MAX_EMBED_TITLE: errors.append(f"{prefix}Title exceeds {self.MAX_EMBED_TITLE} characters") # Check description if "description" in embed: if len(embed["description"]) > self.MAX_EMBED_DESCRIPTION: errors.append(f"{prefix}Description exceeds {self.MAX_EMBED_DESCRIPTION} characters") # Check color if "color" in embed: color = embed["color"] if not isinstance(color, int): warnings.append(f"{prefix}Color should be an integer") elif color < 0 or color > 16777215: errors.append(f"{prefix}Color must be between 0 and 16777215") # Check fields if "fields" in embed: fields = embed["fields"] if not isinstance(fields, list): errors.append(f"{prefix}Fields must be an array") elif len(fields) > self.MAX_FIELDS: errors.append(f"{prefix}Too many fields (max {self.MAX_FIELDS})") else: for i, field in enumerate(fields): if "name" not in field or "value" not in field: errors.append(f"{prefix}Field[{i}] missing name or value") else: if len(field["name"]) > self.MAX_FIELD_NAME: errors.append(f"{prefix}Field[{i}] name exceeds {self.MAX_FIELD_NAME} characters") if len(field["value"]) > self.MAX_FIELD_VALUE: errors.append(f"{prefix}Field[{i}] value exceeds {self.MAX_FIELD_VALUE} characters") # Check footer if "footer" in embed: footer = embed["footer"] if "text" in footer and len(footer["text"]) > self.MAX_FOOTER_TEXT: errors.append(f"{prefix}Footer text exceeds {self.MAX_FOOTER_TEXT} characters") # Check author if "author" in embed: author = embed["author"] if "name" in author and len(author["name"]) > self.MAX_AUTHOR_NAME: errors.append(f"{prefix}Author name exceeds {self.MAX_AUTHOR_NAME} characters") # Check timestamp if "timestamp" in embed: warnings.append(f"{prefix}Timestamp: {embed['timestamp']}") return errors, warnings def _validate_allowed_mentions(self, allowed: Dict, errors: List, warnings: List): """Validate allowed_mentions object.""" valid_keys = {"parse", "roles", "users", "replied_user"} for key in allowed.keys(): if key not in valid_keys: errors.append(f"Invalid allowed_mentions key: {key}") if "parse" in allowed: valid_parse = {"everyone", "users", "roles"} for item in allowed["parse"]: if item not in valid_parse: errors.append(f"Invalid parse value: {item}") class HomeAssistantWebhookValidator(WebhookValidator): """Validator for Home Assistant webhook payloads.""" def validate(self, payload: Dict) -> ValidationResult: """Validate Home Assistant webhook payload.""" errors = [] warnings = [] info = [] # HA webhooks are flexible, but we can check for common patterns # Check for event_type (common in HA webhooks) if "event_type" in payload: info.append(f"Event type: {payload['event_type']}") # Check for event_data if "event_data" in payload: if not isinstance(payload["event_data"], dict): warnings.append("event_data should be an object") else: info.append(f"Event data keys: {list(payload['event_data'].keys())}") # Check for state (sensor updates) if "state" in payload: info.append(f"State: {payload['state']}") # Check for attributes if "attributes" in payload: if not isinstance(payload["attributes"], dict): warnings.append("attributes should be an object") valid = len(errors) == 0 return ValidationResult(valid, errors, warnings, info) class GenericWebhookValidator(WebhookValidator): """Generic webhook validator.""" MAX_PAYLOAD_SIZE = 1024 * 1024 # 1MB def validate(self, payload: Dict) -> ValidationResult: """Validate generic webhook payload.""" errors = [] warnings = [] info = [] # Check payload size payload_size = len(json.dumps(payload)) if payload_size > self.MAX_PAYLOAD_SIZE: errors.append(f"Payload exceeds {self.MAX_PAYLOAD_SIZE} bytes") info.append(f"Payload size: {payload_size} bytes") info.append(f"Keys: {list(payload.keys())}") # Check for nested objects for key, value in payload.items(): if isinstance(value, dict): info.append(f"'{key}' is nested object with keys: {list(value.keys())}") elif isinstance(value, list): info.append(f"'{key}' is array with {len(value)} items") valid = len(errors) == 0 return ValidationResult(valid, errors, warnings, info) def print_result(result: ValidationResult, verbose: bool = False): """Print validation result.""" if result.valid: print("✅ Validation PASSED") else: print("❌ Validation FAILED") if result.errors: print("\nErrors:") for error in result.errors: print(f" ❌ {error}") if result.warnings: print("\nWarnings:") for warning in result.warnings: print(f" ⚠️ {warning}") if verbose and result.info: print("\nInfo:") for info in result.info: print(f" ℹ️ {info}") return 0 if result.valid else 1 def main(): """Main entry point.""" parser = argparse.ArgumentParser( description="EU-Utility Webhook Payload Validator" ) parser.add_argument( "type", choices=["discord", "homeassistant", "generic", "test"], help="Webhook type to validate" ) parser.add_argument( "payload", nargs="?", help="JSON payload string or file path" ) parser.add_argument( "-v", "--verbose", action="store_true", help="Show verbose output" ) parser.add_argument( "--pretty", action="store_true", help="Pretty print the payload" ) args = parser.parse_args() # Get validator validators = { "discord": DiscordWebhookValidator(), "homeassistant": HomeAssistantWebhookValidator(), "generic": GenericWebhookValidator(), } if args.type == "test": # Run test payloads print("Running test payloads...") print("=" * 50) test_cases = [ ("discord", '{"content": "Hello World"}'), ("discord", '{"embeds": [{"title": "Test", "description": "A test embed"}]}'), ("discord", '{"content": "' + "x" * 2001 + '"}'), # Too long ("homeassistant", '{"event_type": "eu_utility_loot", "event_data": {"value": 50}}'), ("generic", '{"custom": "data", "nested": {"key": "value"}}'), ] for webhook_type, payload_json in test_cases: print(f"\n{webhook_type.upper()}: {payload_json[:50]}...") validator = validators[webhook_type] is_valid, payload, msg = validator.validate_json(payload_json) if is_valid: result = validator.validate(payload) print_result(result, args.verbose) else: print(f"❌ JSON Error: {msg}") return 0 # Validate specific payload if not args.payload: print("Error: payload required (or use 'test' command)") return 1 validator = validators[args.type] # Load payload payload_str = args.payload if Path(payload_str).exists(): with open(payload_str, 'r') as f: payload_str = f.read() # Validate JSON is_valid, payload, msg = validator.validate_json(payload_str) if not is_valid: print(f"❌ {msg}") return 1 if args.pretty: print("Payload:") print(json.dumps(payload, indent=2)) print() # Validate structure result = validator.validate(payload) return print_result(result, args.verbose) if __name__ == "__main__": sys.exit(main())