diff --git a/proxy/routes/anthropic.py b/proxy/routes/anthropic.py index a9516ad..d6c5671 100644 --- a/proxy/routes/anthropic.py +++ b/proxy/routes/anthropic.py @@ -26,7 +26,7 @@ IGNORED_HEADERS = [ MISSING_INVARIANT_AUTH_HEADER = "Missing invariant-authorization header" MISSING_ANTHROPIC_AUTH_HEADER = "Missing athropic authorization header" -NOT_SUPPORTED_ENDPOINT = "Not supported OpenAI endpoint" +NOT_SUPPORTED_ENDPOINT = "Not supported Anthropic endpoint" FAILED_TO_PUSH_TRACE = "Failed to push trace to the dataset: " END_REASONS = [ "end_turn", @@ -141,7 +141,7 @@ async def handle_streaming_response( invariant_authorization: str ) -> StreamingResponse: - format_invariant_response = [] + formatted_invariant_response = [] async def event_generator() -> Any: async with client.stream( @@ -160,13 +160,13 @@ async def handle_streaming_response( process_chunk_text( chunk, - format_invariant_response + formatted_invariant_response ) - if len(format_invariant_response) > 0 and format_invariant_response[-1].get("stop_reason") in END_REASONS: + if formatted_invariant_response and formatted_invariant_response[-1].get("stop_reason") in END_REASONS: await push_to_explorer( dataset_name, - format_invariant_response[-1], + formatted_invariant_response[-1], json.loads(anthropic_request.content), invariant_authorization, reformat = False @@ -177,28 +177,23 @@ async def handle_streaming_response( return StreamingResponse(generator, media_type="text/event-stream") -def process_chunk_text(chunk, format_invariant_response): +def process_chunk_text(chunk, formatted_invariant_response): """ - Process the chunk of text and update the format_invariant_response - Example of chunk list: - chunk_list = [b'event: message_start\ndata: {"type":"message_start","message":{"id":"msg_012KWB6kiKvzx7r1SKs5nGA1","type":"message","role":"assistant","model":"claude-3-5-sonnet-20241022","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":20,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":5}} }\n\nevent: content_block_start\ndata: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} }\n\n' - ,b'event: content_block_delta\ndata: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" making it an attractive destination for both business"} }\n\n' - , b'event: content_block_delta\ndata: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" and leisure."} }\n\n' - , b'event: content_block_stop\ndata: {"type":"content_block_stop","index":0}\n\n' - , b'event: message_delta\ndata: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":301} }\n\n' - , b'event: message_stop\ndata: {"type":"message_stop" }\n\n'] + Process the chunk of text and update the formatted_invariant_response + Example of chunk list can be find in: + ../../resources/streaming_chunk_text/anthropic.txt """ text_decode = chunk.decode().strip() for text_block in text_decode.split("\n\n"): text_data = text_block.split("\ndata:")[1] text_json = json.loads(text_data) - update_format_invariant_response(text_json, format_invariant_response) + update_formatted_invariant_response(text_json, formatted_invariant_response) -def update_format_invariant_response(text_json, format_invariant_response): +def update_formatted_invariant_response(text_json, formatted_invariant_response): if text_json.get("type") == MESSAGE_START: message = text_json.get("message") - format_invariant_response.append({ + formatted_invariant_response.append({ "id": message.get("id"), "role": message.get("role"), "content": "", @@ -206,10 +201,10 @@ def update_format_invariant_response(text_json, format_invariant_response): "stop_reason": message.get("stop_reason"), "stop_sequence": message.get("stop_sequence"), }) - elif text_json.get("type") == CONTENT_BLOCK_DELTA and len(format_invariant_response) > 0: - format_invariant_response[-1]["content"] += text_json.get("delta").get("text") - elif text_json.get("type") == MESSGAE_DELTA and len(format_invariant_response) > 0: - format_invariant_response[-1]["stop_reason"] = text_json.get("delta").get("stop_reason") + elif text_json.get("type") == CONTENT_BLOCK_DELTA: + formatted_invariant_response[-1]["content"] += text_json.get("delta").get("text") + elif text_json.get("type") == MESSGAE_DELTA: + formatted_invariant_response[-1]["stop_reason"] = text_json.get("delta").get("stop_reason") def anthropic_to_invariant_messages( messages: list[dict], keep_empty_tool_response: bool = False diff --git a/resources/streaming_chunk_text/anthropic.txt b/resources/streaming_chunk_text/anthropic.txt new file mode 100644 index 0000000..c7f5def --- /dev/null +++ b/resources/streaming_chunk_text/anthropic.txt @@ -0,0 +1,6 @@ +anthropic_chunk_list = [b'event: message_start\ndata: {"type":"message_start","message":{"id":"msg_012KWB6kiKvzx7r1SKs5nGA1","type":"message","role":"assistant","model":"claude-3-5-sonnet-20241022","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":20,"cache_creation_input_tokens":0,"cache_read_input_tokens":0,"output_tokens":5}} }\n\nevent: content_block_start\ndata: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""} }\n\n' + ,b'event: content_block_delta\ndata: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" making it an attractive destination for both business"} }\n\n' + , b'event: content_block_delta\ndata: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" and leisure."} }\n\n' + , b'event: content_block_stop\ndata: {"type":"content_block_stop","index":0}\n\n' + , b'event: message_delta\ndata: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":301} }\n\n' + , b'event: message_stop\ndata: {"type":"message_stop" }\n\n'] \ No newline at end of file