fix(test-orchestrator): prevent SSRF and path traversal vulnerabilities

- Implement URL scheme validation for LLM endpoint connections.
- Mitigate Server-Side Request Forgery (SSRF) by ensuring only 'http' and 'https' schemes are allowed.
- Sanitize report output paths to prevent directory traversal attacks.
- Ensure test reports are saved only within the current working directory.
- Update RAG pipeline documentation to use Mermaid for improved flow diagram rendering.
This commit is contained in:
shiva108
2026-01-23 16:53:28 +01:00
parent 4964487bbe
commit 529d06f00a
2 changed files with 21 additions and 7 deletions

View File

@@ -930,7 +930,7 @@ log_entry = {
#### Example Secure Ingestion Flow
```text
```mermaid
Document Upload
Malware Scan → REJECT if threats found

View File

@@ -21,6 +21,7 @@ from pathlib import Path
from typing import Dict, List, Tuple, Any
from datetime import datetime
import requests
from urllib.parse import urlparse
# Configuration
SCRIPT_DIR = Path(__file__).parent.absolute()
@@ -45,6 +46,11 @@ class TestOrchestrator:
"""Main test orchestration class"""
def __init__(self, llm_endpoint: str = "http://localhost:11434", verbose: bool = True):
# Validate endpoint to prevent SSRF
parsed = urlparse(llm_endpoint)
if parsed.scheme not in ['http', 'https']:
raise ValueError(f"Invalid LLM endpoint scheme: '{parsed.scheme}'. Only 'http' and 'https' are allowed.")
self.llm_endpoint = llm_endpoint
self.verbose = verbose
self.results = {
@@ -372,22 +378,30 @@ Respond in JSON format."""
"""Generate test report"""
self.log(f"\n=== Generating Report ({format}) ===")
# Sanitize output path to prevent traversal
# Ensure we only write to the current directory by using the basename
safe_filename = os.path.basename(output_file)
if safe_filename != output_file:
self.log(f"Sanitizing output path from '{output_file}' to '{safe_filename}' for security", "WARNING")
output_path = Path.cwd() / safe_filename
if format == "json":
with open(output_file, 'w') as f:
with open(output_path, 'w') as f:
json.dump(self.results, f, indent=2)
self.log(f"JSON report saved to: {output_file}")
self.log(f"JSON report saved to: {output_path}")
elif format == "html":
html_content = self._generate_html_report()
with open(output_file, 'w') as f:
with open(output_path, 'w') as f:
f.write(html_content)
self.log(f"HTML report saved to: {output_file}")
self.log(f"HTML report saved to: {output_path}")
elif format == "summary":
summary = self._generate_summary()
with open(output_file, 'w') as f:
with open(output_path, 'w') as f:
f.write(summary)
self.log(f"Summary report saved to: {output_file}")
self.log(f"Summary report saved to: {output_path}")
def _generate_html_report(self) -> str:
"""Generate HTML report"""