Change how streaming requests are sent and specify "Zurich, Switzerland" in the test prompts

This commit is contained in:
Zishan
2025-02-20 13:43:46 +01:00
parent c5ce8e305b
commit 45b9872d73
4 changed files with 61 additions and 39 deletions
+55 -31
View File
@@ -87,7 +87,7 @@ async def anthropic_proxy(
except httpx.HTTPStatusError as e:
raise HTTPException(
status_code=response.status_code,
detail=f"Failed to fetch response: {response.text}, got error{e}",
detail=f"Failed to fetch response from Anthropic: {response.text}, got error{e}",
)
await handle_non_streaming_response(
response, dataset_name, request_body_json, invariant_authorization
@@ -123,7 +123,18 @@ async def handle_non_streaming_response(
invariant_authorization: str,
):
"""Handles non-streaming Anthropic responses"""
json_response = response.json()
try:
json_response = response.json()
except json.JSONDecodeError as e:
raise HTTPException(
status_code=response.status_code,
detail=f"Invalid JSON response received from Anthropic: {response.text}, got error{e}",
) from e
if response.status_code != 200:
raise HTTPException(
status_code=response.status_code,
detail=json_response.get("error", "Unknown error from Anthropic"),
)
# Only push the trace to explorer if the last message is an end turn message
if json_response.get("stop_reason") in END_REASONS:
await push_to_explorer(
@@ -141,50 +152,63 @@ async def handle_streaming_response(
) -> StreamingResponse:
formatted_invariant_response = []
response = await client.send(anthropic_request, stream=True)
if response.status_code != 200:
error_content = await response.aread()
try:
error_json = json.loads(error_content)
error_detail = error_json.get("error", "Unknown error from Anthropic")
except json.JSONDecodeError:
error_detail = {"error": "Failed to decode error response from Anthropic"}
raise HTTPException(status_code=response.status_code, detail=error_detail)
async def event_generator() -> Any:
async with client.stream(
"POST",
anthropic_request.url,
headers=anthropic_request.headers,
content=anthropic_request.content,
) as response:
if response.status_code != 200:
yield json.dumps(
{"error": f"Failed to fetch response: {response.status_code}"}
).encode()
return
async for chunk in response.aiter_bytes():
yield chunk
# async with client.stream(
# "POST",
# anthropic_request.url,
# headers=anthropic_request.headers,
# content=anthropic_request.content,
# ) as response:
# if response.status_code != 200:
# yield json.dumps(
# {"error": f"Failed to fetch response: {response.status_code}"}
# ).encode()
# return
async for chunk in response.aiter_bytes():
chunk_decode = chunk.decode().strip()
if not chunk_decode:
continue
process_chunk_text(
chunk,
formatted_invariant_response
)
yield chunk
if formatted_invariant_response and formatted_invariant_response[-1].get("stop_reason") in END_REASONS:
await push_to_explorer(
dataset_name,
formatted_invariant_response[-1],
json.loads(anthropic_request.content),
invariant_authorization,
)
process_chunk_text(
chunk_decode,
formatted_invariant_response
)
if formatted_invariant_response and formatted_invariant_response[-1].get("stop_reason") in END_REASONS:
await push_to_explorer(
dataset_name,
formatted_invariant_response[-1],
json.loads(anthropic_request.content),
invariant_authorization,
)
generator = event_generator()
return StreamingResponse(generator, media_type="text/event-stream")
def process_chunk_text(chunk, formatted_invariant_response):
def process_chunk_text(chunk_decode, formatted_invariant_response):
"""
Process the chunk of text and update the formatted_invariant_response
An example of the chunk list can be found in:
../../resources/streaming_chunk_text/anthropic.txt
"""
text_decode = chunk.decode().strip()
for text_block in text_decode.split("\n\n"):
# might be empty block
for text_block in chunk_decode.split("\n\n"):
# might be empty block
if len(text_block.split("\ndata:"))>1:
text_data = text_block.split("\ndata:")[1]
text_json = json.loads(text_data)
@@ -19,9 +19,7 @@ async def test_header(
dataset_name = "claude_header_test" + str(
datetime.datetime.now().strftime("%Y%m%d%H%M%S")
)
print("before patch:",anthropic_api_key)
with patch.dict(os.environ, {"ANTHROPIC_API_KEY": anthropic_api_key + "|invariant-auth: <not needed for test>"}):
print("after patch:",os.environ.get("ANTHROPIC_API_KEY"))
client = anthropic.Anthropic(
http_client=Client(),
base_url = f"{proxy_url}/api/v1/proxy/{dataset_name}/anthropic",
@@ -32,7 +30,7 @@ async def test_header(
messages=[
{
"role": "user",
"content": "Give me an introduction to Zurich within 200 words."
"content": "Give me an introduction to Zurich, Switzerland within 200 words."
}
]
)
@@ -55,7 +53,7 @@ async def test_header(
assert trace["messages"] == [
{
"role": "user",
"content": "Give me an introduction to Zurich within 200 words."
"content": "Give me an introduction to Zurich, Switzerland within 200 words."
},
{
"role": "assistant",
@@ -164,7 +164,7 @@ async def test_response_with_toolcall(
weather_agent = WeatherAgent(proxy_url)
queries = [
"What's the weather like in Zurich city?",
"What's the weather like in Zurich, Switzerland?",
"Tell me the weather for New York",
]
cities = ["zurich", "new york"]
@@ -220,7 +220,7 @@ async def test_streaming_response_with_toolcall(
weather_agent = WeatherAgent(proxy_url)
queries = [
"What's the weather like in Zurich city?",
"What's the weather like in Zurich, Switzerland?",
"Tell me the weather for New York",
]
cities = ["zurich", "new york"]
@@ -25,7 +25,7 @@ async def test_response_without_toolcall(
cities = ["zurich", "new york", "london"]
queries = [
"Can you introduce Zurich city within 200 words?",
"Can you introduce Zurich, Switzerland within 200 words?",
"Tell me the history of New York within 100 words?",
"How's the weather in London next week?"
]
@@ -86,7 +86,7 @@ async def test_streaming_response_without_toolcall(
cities = ["zurich", "new york", "london"]
queries = [
"Can you introduce Zurich city within 200 words?",
"Can you introduce Zurich, Switzerland within 200 words?",
"Tell me the history of New York within 100 words?",
"How's the weather in London next week?"
]