From 45b9872d736d06efad41f7624c186a135bc7cdd9 Mon Sep 17 00:00:00 2001 From: Zishan Date: Thu, 20 Feb 2025 13:43:46 +0100 Subject: [PATCH] change the way of sending streaming request and specify zurich, switzerland in prompt --- proxy/routes/anthropic.py | 86 ++++++++++++------- ...est_anthropic_header_with_invariant_key.py | 6 +- .../test_anthropic_with_tool_call.py | 4 +- .../test_anthropic_without_tool_call.py | 4 +- 4 files changed, 61 insertions(+), 39 deletions(-) diff --git a/proxy/routes/anthropic.py b/proxy/routes/anthropic.py index a5882b7..3bf7ee3 100644 --- a/proxy/routes/anthropic.py +++ b/proxy/routes/anthropic.py @@ -87,7 +87,7 @@ async def anthropic_proxy( except httpx.HTTPStatusError as e: raise HTTPException( status_code=response.status_code, - detail=f"Failed to fetch response: {response.text}, got error{e}", + detail=f"Failed to fetch response from Anthropic: {response.text}, got error{e}", ) await handle_non_streaming_response( response, dataset_name, request_body_json, invariant_authorization @@ -123,7 +123,18 @@ async def handle_non_streaming_response( invariant_authorization: str, ): """Handles non-streaming Anthropic responses""" - json_response = response.json() + try: + json_response = response.json() + except json.JSONDecodeError as e: + raise HTTPException( + status_code=response.status_code, + detail=f"Invalid JSON response received from Anthropic: {response.text}, got error{e}", + ) from e + if response.status_code != 200: + raise HTTPException( + status_code=response.status_code, + detail=json_response.get("error", "Unknown error from Anthropic"), + ) # Only push the trace to explorer if the last message is an end turn message if json_response.get("stop_reason") in END_REASONS: await push_to_explorer( @@ -141,50 +152,63 @@ async def handle_streaming_response( ) -> StreamingResponse: formatted_invariant_response = [] + + response = await client.send(anthropic_request, stream=True) + + if response.status_code != 200: + error_content = await response.aread() + try: + error_json = json.loads(error_content) + error_detail = error_json.get("error", "Unknown error from Anthropic") + except json.JSONDecodeError: + error_detail = {"error": "Failed to decode error response from Anthropic"} + raise HTTPException(status_code=response.status_code, detail=error_detail) async def event_generator() -> Any: - async with client.stream( - "POST", - anthropic_request.url, - headers=anthropic_request.headers, - content=anthropic_request.content, - ) as response: - if response.status_code != 200: - yield json.dumps( - {"error": f"Failed to fetch response: {response.status_code}"} - ).encode() - return - async for chunk in response.aiter_bytes(): - yield chunk + # async with client.stream( + # "POST", + # anthropic_request.url, + # headers=anthropic_request.headers, + # content=anthropic_request.content, + # ) as response: + # if response.status_code != 200: + # yield json.dumps( + # {"error": f"Failed to fetch response: {response.status_code}"} + # ).encode() + # return + async for chunk in response.aiter_bytes(): + chunk_decode = chunk.decode().strip() + if not chunk_decode: + continue - process_chunk_text( - chunk, - formatted_invariant_response - ) + yield chunk - if formatted_invariant_response and formatted_invariant_response[-1].get("stop_reason") in END_REASONS: - await push_to_explorer( - dataset_name, - formatted_invariant_response[-1], - json.loads(anthropic_request.content), - invariant_authorization, - ) + process_chunk_text( + chunk_decode, + formatted_invariant_response + ) + + if formatted_invariant_response and formatted_invariant_response[-1].get("stop_reason") in END_REASONS: + await push_to_explorer( + dataset_name, + formatted_invariant_response[-1], + json.loads(anthropic_request.content), + invariant_authorization, + ) generator = event_generator() return StreamingResponse(generator, media_type="text/event-stream") -def process_chunk_text(chunk, formatted_invariant_response): +def process_chunk_text(chunk_decode, formatted_invariant_response): """ Process the chunk of text and update the formatted_invariant_response Example of chunk list can be find in: ../../resources/streaming_chunk_text/anthropic.txt """ - text_decode = chunk.decode().strip() - for text_block in text_decode.split("\n\n"): - # might be empty block - + for text_block in chunk_decode.split("\n\n"): + # might be empty block if len(text_block.split("\ndata:"))>1: text_data = text_block.split("\ndata:")[1] text_json = json.loads(text_data) diff --git a/tests/anthropic/test_anthropic_header_with_invariant_key.py b/tests/anthropic/test_anthropic_header_with_invariant_key.py index 20040d8..88f3728 100644 --- a/tests/anthropic/test_anthropic_header_with_invariant_key.py +++ b/tests/anthropic/test_anthropic_header_with_invariant_key.py @@ -19,9 +19,7 @@ async def test_header( dataset_name = "claude_header_test" + str( datetime.datetime.now().strftime("%Y%m%d%H%M%S") ) - print("before patch:",anthropic_api_key) with patch.dict(os.environ, {"ANTHROPIC_API_KEY": anthropic_api_key + "|invariant-auth: "}): - print("after patch:",os.environ.get("ANTHROPIC_API_KEY")) client = anthropic.Anthropic( http_client=Client(), base_url = f"{proxy_url}/api/v1/proxy/{dataset_name}/anthropic", @@ -32,7 +30,7 @@ async def test_header( messages=[ { "role": "user", - "content": "Give me an introduction to Zurich within 200 words." + "content": "Give me an introduction to Zurich, Switzerland within 200 words." } ] ) @@ -55,7 +53,7 @@ async def test_header( assert trace["messages"] == [ { "role": "user", - "content": "Give me an introduction to Zurich within 200 words." + "content": "Give me an introduction to Zurich, Switzerland within 200 words." }, { "role": "assistant", diff --git a/tests/anthropic/test_anthropic_with_tool_call.py b/tests/anthropic/test_anthropic_with_tool_call.py index 30ecf37..3f94bec 100644 --- a/tests/anthropic/test_anthropic_with_tool_call.py +++ b/tests/anthropic/test_anthropic_with_tool_call.py @@ -164,7 +164,7 @@ async def test_response_with_toolcall( weather_agent = WeatherAgent(proxy_url) queries = [ - "What's the weather like in Zurich city?", + "What's the weather like in Zurich, Switzerland?", "Tell me the weather for New York", ] cities = ["zurich", "new york"] @@ -220,7 +220,7 @@ async def test_streaming_response_with_toolcall( weather_agent = WeatherAgent(proxy_url) queries = [ - "What's the weather like in Zurich city?", + "What's the weather like in Zurich, Switzerland?", "Tell me the weather for New York", ] cities = ["zurich", "new york"] diff --git a/tests/anthropic/test_anthropic_without_tool_call.py b/tests/anthropic/test_anthropic_without_tool_call.py index a2d05f3..2748158 100644 --- a/tests/anthropic/test_anthropic_without_tool_call.py +++ b/tests/anthropic/test_anthropic_without_tool_call.py @@ -25,7 +25,7 @@ async def test_response_without_toolcall( cities = ["zurich", "new york", "london"] queries = [ - "Can you introduce Zurich city within 200 words?", + "Can you introduce Zurich, Switzerland within 200 words?", "Tell me the history of New York within 100 words?", "How's the weather in London next week?" ] @@ -86,7 +86,7 @@ async def test_streaming_response_without_toolcall( cities = ["zurich", "new york", "london"] queries = [ - "Can you introduce Zurich city within 200 words?", + "Can you introduce Zurich, Switzerland within 200 words?", "Tell me the history of New York within 100 words?", "How's the weather in London next week?" ]