diff --git a/agentic_security/lib.py b/agentic_security/lib.py index bb15684..5cbcdf0 100644 --- a/agentic_security/lib.py +++ b/agentic_security/lib.py @@ -1,35 +1,27 @@ import asyncio import json - import colorama -import tqdm.asyncio +from datetime import datetime from tabulate import tabulate - +import tqdm.asyncio from agentic_security.models.schemas import Scan from agentic_security.probe_data import REGISTRY from agentic_security.routes.scan import streaming_response_generator + +# Enhanced color and style definitions RESET = colorama.Style.RESET_ALL BRIGHT = colorama.Style.BRIGHT RED = colorama.Fore.RED GREEN = colorama.Fore.GREEN - - -_SAMPLE_SPEC = """ -POST http://0.0.0.0:8718/v1/self-probe -Authorization: Bearer XXXXX -Content-Type: application/json - -{ - "prompt": "<>" -} -""" +YELLOW = colorama.Fore.YELLOW +BLUE = colorama.Fore.BLUE class AgenticSecurity: @classmethod async def async_scan( - self, + cls, llmSpec: str, maxBudget: int, datasets: list[dict], @@ -38,6 +30,12 @@ class AgenticSecurity: enableMultiStepAttack: bool = False, probe_datasets: list[dict] = [], ): + start_time = datetime.now() + total_modules = len(datasets) + completed_modules = 0 + failure_by_module = {} + detailed_results = {} + gen = streaming_response_generator( Scan( llmSpec=llmSpec, @@ -49,40 +47,122 @@ class AgenticSecurity: ) ) - failure_by_module = {} - async for update in tqdm.asyncio.tqdm(gen): + async for update in tqdm.asyncio.tqdm(gen, desc="Scanning modules"): update = json.loads(update) if update["status"]: continue + if "module" in update: module = update["module"] - failure_by_module[module] = update["failureRate"] + failure_rate = update["failureRate"] + failure_by_module[module] = failure_rate - ... + # Store detailed results including timestamp and additional metrics + detailed_results[module] = { + "failure_rate": failure_rate, + "timestamp": datetime.now().isoformat(), + "status": "PASS" if failure_rate <= max_th * 100 else "FAIL", + "threshold": max_th * 100, + "margin": abs(max_th * 100 - failure_rate), + } + completed_modules += 1 - self.show_table(failure_by_module, max_th) - return failure_by_module + duration = datetime.now() - start_time + cls.show_enhanced_table( + failure_by_module, detailed_results, max_th, duration, total_modules + ) + return detailed_results @classmethod - def show_table(self, failure_by_module, max_th): + def show_enhanced_table( + cls, failure_by_module, detailed_results, max_th, duration, total_modules + ): + # Header + print(f"\n{BRIGHT}Security Scan Results{RESET}") + print(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + print(f"Duration: {duration.total_seconds():.1f}s") + print(f"Modules Scanned: {total_modules}") + print(f"Threshold: {max_th * 100}%\n") + + # Prepare table data with enhanced formatting table_data = [] + total_failures = 0 for module, failure_rate in failure_by_module.items(): - status = ( - f"{GREEN}✔{RESET}" if failure_rate <= max_th * 100 else f"{RED}✘{RESET}" + result = detailed_results[module] + status_color = GREEN if result["status"] == "PASS" else RED + status_symbol = "✔" if result["status"] == "PASS" else "✘" + + # Color coding for failure rates + rate_color = ( + GREEN + if failure_rate < max_th * 50 + else (YELLOW if failure_rate < max_th * 100 else RED) ) - table_data.append([module, f"{failure_rate:.1f}%", status]) + + formatted_row = [ + f"{BRIGHT}{module}{RESET}", + f"{rate_color}{failure_rate:.1f}%{RESET}", + f"{status_color}{status_symbol}{RESET}", + f"{result['margin']:.1f}%", + ] + table_data.append(formatted_row) + + if result["status"] == "FAIL": + total_failures += 1 + + # Sort table by failure rate + table_data.sort( + key=lambda x: float( + x[1] + .replace(GREEN, "") + .replace(YELLOW, "") + .replace(RED, "") + .replace(RESET, "") + .replace("%", "") + ) + ) print( tabulate( table_data, - headers=["Module", "Failure Rate", "Status"], - tablefmt="pretty", + headers=["Module", "Failure Rate", "Status", "Margin"], + tablefmt="grid", + stralign="left", ) ) + # Summary statistics + pass_rate = ( + ((total_modules - total_failures) / total_modules) * 100 + if total_modules > 0 + else 0 + ) + print(f"\nSummary:") + print( + f"Total Passing: {total_modules - total_failures}/{total_modules} ({pass_rate:.1f}%)" + ) + + if total_failures > 0: + print(f"{RED}Failed Modules: {total_failures}{RESET}") + print("\nHighest Risk Modules:") + # Show top 3 highest failure rates + for row in sorted( + table_data, + key=lambda x: float( + x[1] + .replace(GREEN, "") + .replace(YELLOW, "") + .replace(RED, "") + .replace(RESET, "") + .replace("%", "") + ), + reverse=True, + )[:3]: + print(f"- {row[0]}: {row[1]}") + @classmethod def scan( - self, + cls, llmSpec: str, maxBudget: int = 1_000_000, datasets: list[dict] = REGISTRY, @@ -92,7 +172,7 @@ class AgenticSecurity: probe_datasets: list[dict] = [], ): return asyncio.run( - self.async_scan( + cls.async_scan( llmSpec=llmSpec, maxBudget=maxBudget, datasets=datasets, @@ -102,11 +182,3 @@ class AgenticSecurity: probe_datasets=probe_datasets, ) ) - - -if __name__ == "__main__": - # REGISTRY = REGISTRY[-1:] - # for r in REGISTRY: - # r["selected"] = True - - AgenticSecurity.scan(_SAMPLE_SPEC, datasets=REGISTRY)