From 24c6eaae2896cefc51934b0a9ed78fd5b6af9d5a Mon Sep 17 00:00:00 2001 From: Adam Wilson Date: Wed, 30 Jul 2025 11:13:20 -0600 Subject: [PATCH] JSON schema script --- tests/logs/json_schema_generator_for_logs.py | 241 +++++++++++++++++++ 1 file changed, 241 insertions(+) create mode 100644 tests/logs/json_schema_generator_for_logs.py diff --git a/tests/logs/json_schema_generator_for_logs.py b/tests/logs/json_schema_generator_for_logs.py new file mode 100644 index 000000000..f042dae04 --- /dev/null +++ b/tests/logs/json_schema_generator_for_logs.py @@ -0,0 +1,241 @@ +import json +import sys +from typing import Any, Dict, List, Union +from collections import defaultdict + +class JSONSchemaGenerator: + def __init__(self): + self.type_counts = defaultdict(int) + + def get_python_type(self, value: Any) -> str: + """Determine the Python type of a value.""" + if value is None: + return "null" + elif isinstance(value, bool): + return "boolean" + elif isinstance(value, int): + return "integer" + elif isinstance(value, float): + return "number" + elif isinstance(value, str): + return "string" + elif isinstance(value, list): + return "array" + elif isinstance(value, dict): + return "object" + else: + return "unknown" + + def analyze_array(self, arr: List[Any], path: str = "") -> Dict[str, Any]: + """Analyze an array and determine the schema for its items.""" + if not arr: + return { + "type": "array", + "items": {"type": "unknown"}, + "minItems": 0, + "maxItems": 0 + } + + # Collect types and schemas of all items + item_schemas = [] + type_frequency = defaultdict(int) + + for item in arr: + item_type = self.get_python_type(item) + type_frequency[item_type] += 1 + + if item_type == "object": + item_schemas.append(self.analyze_object(item, f"{path}[item]")) + elif item_type == "array": + item_schemas.append(self.analyze_array(item, f"{path}[item]")) + else: + item_schemas.append({"type": item_type}) + + # Determine the most common type or create a union type + most_common_type = max(type_frequency.items(), key=lambda x: x[1])[0] + + schema = { + "type": "array", + "minItems": len(arr), + "maxItems": len(arr) + } + + if len(type_frequency) == 1: + # All items are the same type + if most_common_type == "object" and item_schemas: + # Merge object schemas + schema["items"] = self.merge_object_schemas(item_schemas) + elif most_common_type == "array" and item_schemas: + # For arrays of arrays, use the first array's schema as template + schema["items"] = item_schemas[0] + else: + schema["items"] = {"type": most_common_type} + else: + # Mixed types - create anyOf schema + unique_schemas = [] + seen_schemas = set() + + for item_schema in item_schemas: + schema_str = json.dumps(item_schema, sort_keys=True) + if schema_str not in seen_schemas: + seen_schemas.add(schema_str) + unique_schemas.append(item_schema) + + schema["items"] = { + "anyOf": unique_schemas + } + + return schema + + def merge_object_schemas(self, schemas: List[Dict[str, Any]]) -> Dict[str, Any]: + """Merge multiple object schemas into one.""" + if not schemas: + return {"type": "object"} + + merged = { + "type": "object", + "properties": {}, + "required": [] + } + + # Collect all properties + all_properties = set() + property_frequency = defaultdict(int) + property_schemas = defaultdict(list) + + for schema in schemas: + if "properties" in schema: + for prop, prop_schema in schema["properties"].items(): + all_properties.add(prop) + property_frequency[prop] += 1 + property_schemas[prop].append(prop_schema) + + # Build merged properties + total_schemas = len(schemas) + for prop in all_properties: + prop_count = property_frequency[prop] + prop_schema_list = property_schemas[prop] + + # If property appears in all schemas, it's required + if prop_count == total_schemas: + merged["required"].append(prop) + + # Merge property schemas + if len(set(json.dumps(s, sort_keys=True) for s in prop_schema_list)) == 1: + # All schemas are identical + merged["properties"][prop] = prop_schema_list[0] + else: + # Different schemas - create anyOf + unique_schemas = [] + seen = set() + for ps in prop_schema_list: + ps_str = json.dumps(ps, sort_keys=True) + if ps_str not in seen: + seen.add(ps_str) + unique_schemas.append(ps) + + if len(unique_schemas) == 1: + merged["properties"][prop] = unique_schemas[0] + else: + merged["properties"][prop] = {"anyOf": unique_schemas} + + if not merged["required"]: + del merged["required"] + + return merged + + def analyze_object(self, obj: Dict[str, Any], path: str = "") -> Dict[str, Any]: + """Analyze an object and generate its schema.""" + schema = { + "type": "object", + "properties": {}, + "required": list(obj.keys()) + } + + for key, value in obj.items(): + current_path = f"{path}.{key}" if path else key + value_type = self.get_python_type(value) + + if value_type == "object": + schema["properties"][key] = self.analyze_object(value, current_path) + elif value_type == "array": + schema["properties"][key] = self.analyze_array(value, current_path) + else: + prop_schema = {"type": value_type} + + # Add additional constraints based on type + if value_type == "string" and value: + prop_schema["minLength"] = len(value) + prop_schema["maxLength"] = len(value) + elif value_type == "integer": + prop_schema["minimum"] = value + prop_schema["maximum"] = value + elif value_type == "number": + prop_schema["minimum"] = value + prop_schema["maximum"] = value + + schema["properties"][key] = prop_schema + + return schema + + def generate_schema(self, data: Any) -> Dict[str, Any]: + """Generate a JSON schema from the provided data.""" + root_type = self.get_python_type(data) + + if root_type == "object": + return self.analyze_object(data) + elif root_type == "array": + return self.analyze_array(data) + else: + return {"type": root_type} + + def load_and_analyze(self, file_path: str) -> Dict[str, Any]: + """Load a JSON file and generate its schema.""" + try: + with open(file_path, 'r', encoding='utf-8') as file: + data = json.load(file) + + schema = self.generate_schema(data) + + # Add schema metadata + schema["$schema"] = "http://json-schema.org/draft-07/schema#" + schema["title"] = f"Generated schema for {file_path}" + schema["description"] = f"Auto-generated JSON schema based on the structure of {file_path}" + + return schema + + except FileNotFoundError: + raise FileNotFoundError(f"File not found: {file_path}") + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in file {file_path}: {e}") + except Exception as e: + raise Exception(f"Error processing file {file_path}: {e}") + +def main(): + if len(sys.argv) != 2: + print("Usage: python json_schema_generator.py ") + print("Example: python json_schema_generator.py data.json") + sys.exit(1) + + file_path = sys.argv[1] + generator = JSONSchemaGenerator() + + try: + schema = generator.load_and_analyze(file_path) + + # Pretty print the schema + print(json.dumps(schema, indent=2, ensure_ascii=False)) + + # Optionally save to file + output_file = file_path.replace('.json', '_schema.json') + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(schema, f, indent=2, ensure_ascii=False) + + print(f"\nSchema saved to: {output_file}") + + except Exception as e: + print(f"Error: {e}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file