fix: make cost calculation model-aware instead of hardcoded to deepseek-chat

Previously, calculate_cost() was always called without a model parameter,
causing all scans to report costs based on deepseek-chat pricing regardless
of the actual target model (e.g. gpt-4, claude-3-opus).

Changes:

- http_spec.py: Add 'model_name' property to LLMSpec that extracts the
  model field from the JSON request body. Returns 'unknown' if the body
  is not valid JSON or has no 'model' field.

- probe_data/image_generator.py: Add 'model_name' pass-through property
  to RequestAdapter, delegating to the underlying LLMSpec.

- probe_data/audio_generator.py: Same as above - add 'model_name'
  pass-through property to RequestAdapter.

- probe_actor/cost_module.py:
  - Change return type from float to float | None.
  - Unknown models now log a warning and return None instead of raising
    ValueError, so scans are not interrupted by unsupported model names.
  - Add logger import for the warning message.

- probe_actor/fuzzer.py: Pass model_name to calculate_cost() in both
  scan_module() and perform_many_shot_scan() using
  getattr(request_factory, 'model_name', 'unknown').

- primitives/models.py: Update ScanResult.cost type from float to
  float | None to accommodate unknown model pricing.
This commit is contained in:
zhanz5
2026-06-05 13:59:59 +08:00
parent d2bbad32b4
commit 02b68b06ee
6 changed files with 33 additions and 6 deletions
+13
View File
@@ -1,4 +1,5 @@
import base64
import json
from enum import Enum
from urllib.parse import urlparse
@@ -145,6 +146,18 @@ class LLMSpec(BaseModel):
fn = probe
@property
def model_name(self) -> str:
"""Extract the model name from the request body (JSON).
Returns the value of the 'model' field if present, otherwise 'unknown'.
"""
try:
body_json = json.loads(self.body)
return body_json.get("model", "unknown")
except (json.JSONDecodeError, TypeError):
return "unknown"
@property
def modality(self) -> Modality:
if self.has_image:
+1 -1
View File
@@ -40,7 +40,7 @@ class Scan(BaseModel):
class ScanResult(BaseModel):
module: str
tokens: float | int
cost: float
cost: float | None
progress: float
status: bool = False
failureRate: float = 0.0
+9 -3
View File
@@ -1,4 +1,7 @@
def calculate_cost(tokens: int, model: str = "deepseek-chat") -> float:
from agentic_security.logutils import logger
def calculate_cost(tokens: int, model: str = "deepseek-chat") -> float | None:
"""Calculate API cost based on token count and model.
Args:
@@ -6,7 +9,7 @@ def calculate_cost(tokens: int, model: str = "deepseek-chat") -> float:
model (str): Model name to calculate cost for
Returns:
float: Cost in USD
float | None: Cost in USD, or None if the model pricing is unknown.
"""
# API pricing as of 2024-03-01
pricing = {
@@ -49,7 +52,10 @@ def calculate_cost(tokens: int, model: str = "deepseek-chat") -> float:
}
if model not in pricing:
raise ValueError(f"Unknown model: {model}")
logger.warning(
f"Unknown model '{model}': pricing not available, cost will not be estimated."
)
return None
# For now, assume 1:1 input/output ratio
input_cost = tokens * pricing[model]["input"]
+2 -2
View File
@@ -273,7 +273,7 @@ async def scan_module(
failure_rate = module_failures / max(module_prompts, 1)
failure_rates.append(failure_rate)
cost = calculate_cost(tokens)
cost = calculate_cost(tokens, model=getattr(request_factory, 'model_name', 'unknown'))
response_text = fuzzer_state.get_last_output(prompt) or ""
@@ -543,7 +543,7 @@ async def perform_many_shot_scan(
failure_rate = module_failures / max(processed_prompts, 1)
failure_rates.append(failure_rate)
cost = calculate_cost(tokens)
cost = calculate_cost(tokens, model=getattr(request_factory, 'model_name', 'unknown'))
yield ScanResult(
module=module.dataset_name,
@@ -131,6 +131,10 @@ class RequestAdapter:
if not llm_spec.has_audio:
raise ValueError("LLMSpec must have an image")
@property
def model_name(self) -> str:
return self.llm_spec.model_name
async def probe(
self, prompt: str, encoded_image: str = "", encoded_audio: str = "", files={}
) -> httpx.Response:
@@ -131,6 +131,10 @@ class RequestAdapter:
if not llm_spec.has_image:
raise ValueError("LLMSpec must have an image")
@property
def model_name(self) -> str:
return self.llm_spec.model_name
async def probe(
self, prompt: str, encoded_image: str = "", encoded_audio: str = "", files={}
) -> httpx.Response: