File size: 7,317 Bytes
e3278e4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# What is this?
## Adapter for the Anthropic `/v1/messages` format: translates Anthropic requests into OpenAI calls and OpenAI responses back into Anthropic format
import traceback
from typing import Any, Optional
import litellm
from litellm import ChatCompletionRequest, verbose_logger
from litellm.integrations.custom_logger import CustomLogger
from litellm.types.llms.anthropic import AnthropicMessagesRequest, AnthropicResponse
from litellm.types.utils import AdapterCompletionStreamWrapper, ModelResponse
class AnthropicAdapter(CustomLogger):
    """Adapter between Anthropic `/v1/messages` payloads and OpenAI-style calls.

    Every translation is delegated to
    ``litellm.AnthropicExperimentalPassThroughConfig``; this class only wires
    the three directions (request, response, streaming response) together.
    """

    def __init__(self) -> None:
        super().__init__()

    def translate_completion_input_params(
        self, kwargs
    ) -> Optional[ChatCompletionRequest]:
        """
        - translate params, where needed
        - pass rest, as is

        Args:
            kwargs: keyword arguments of the incoming request; they are
                expanded into ``AnthropicMessagesRequest``.

        Returns:
            The result of ``translate_anthropic_to_openai`` for that request.
        """
        request_body = AnthropicMessagesRequest(**kwargs)  # type: ignore

        translated_body = litellm.AnthropicExperimentalPassThroughConfig().translate_anthropic_to_openai(
            anthropic_message_request=request_body
        )

        return translated_body

    def translate_completion_output_params(
        self, response: ModelResponse
    ) -> Optional[AnthropicResponse]:
        """Translate a non-streaming response via
        ``translate_openai_response_to_anthropic`` and return its result."""
        return litellm.AnthropicExperimentalPassThroughConfig().translate_openai_response_to_anthropic(
            response=response
        )

    def translate_completion_output_params_streaming(
        self, completion_stream: Any
    ) -> Optional[AdapterCompletionStreamWrapper]:
        """Wrap ``completion_stream`` in an ``AnthropicStreamWrapper``.

        The annotation uses ``Optional[...]`` (as the rest of this file does)
        instead of ``X | None``: the latter is evaluated when the ``def`` runs
        and raises ``TypeError`` on interpreters older than Python 3.10.
        """
        return AnthropicStreamWrapper(completion_stream=completion_stream)
# Module-level adapter instance, created once when this module is imported.
anthropic_adapter = AnthropicAdapter()
class AnthropicStreamWrapper(AdapterCompletionStreamWrapper):
    """
    - first chunk return 'message_start'
    - content block must be started and stopped
    - finish_reason must map exactly to anthropic reason, else anthropic client won't be able to parse it.

    Events are produced one per ``__next__`` / ``__anext__`` call, in this order:

    1. ``message_start``
    2. ``content_block_start``
    3. one translated event per upstream chunk
    4. ``content_block_stop`` - emitted in place of the first ``message_delta``;
       from then on every translated chunk is held back by one step
    5. the last held chunk, once the upstream stream is exhausted
    6. ``message_stop``

    The sync and async entry points share the same state machine through the
    private helpers below; they differ only in how the upstream stream is read
    and in which exception signals the end of iteration.

    Reads ``self.completion_stream``, which is presumably stored by the base
    class constructor - TODO confirm against ``AdapterCompletionStreamWrapper``.
    """

    # Progress flags of the event state machine (shadowed per instance on write).
    sent_first_chunk: bool = False
    sent_content_block_start: bool = False
    sent_content_block_finish: bool = False
    sent_last_message: bool = False
    # Translated chunk waiting to be emitted on a later call.
    holding_chunk: Optional[Any] = None

    def _preamble_event(self) -> Optional[dict]:
        """Return the next not-yet-sent opening event, or None when both were sent."""
        if self.sent_first_chunk is False:
            self.sent_first_chunk = True
            # NOTE(review): id, model and usage are fixed literals here; they
            # are not derived from the wrapped stream.
            return {
                "type": "message_start",
                "message": {
                    "id": "msg_1nZdL29xx5MUA1yADyHTEsnR8uuvGzszyY",
                    "type": "message",
                    "role": "assistant",
                    "content": [],
                    "model": "claude-3-5-sonnet-20240620",
                    "stop_reason": None,
                    "stop_sequence": None,
                    "usage": {"input_tokens": 25, "output_tokens": 1},
                },
            }
        if self.sent_content_block_start is False:
            self.sent_content_block_start = True
            return {
                "type": "content_block_start",
                "index": 0,
                "content_block": {"type": "text", "text": ""},
            }
        return None

    def _translate_chunk(self, chunk: Any) -> Any:
        """Translate one upstream chunk and return the event to emit for it.

        Raises:
            Exception: if ``chunk`` is ``None`` or the string ``"None"``.
        """
        if chunk == "None" or chunk is None:
            raise Exception(
                "Anthropic Adapter - received an empty chunk from the completion stream"
            )

        processed_chunk = litellm.AnthropicExperimentalPassThroughConfig().translate_streaming_openai_response_to_anthropic(
            response=chunk
        )
        if (
            processed_chunk["type"] == "message_delta"
            and self.sent_content_block_finish is False
        ):
            # The content block has to be closed before the first
            # message_delta goes out: hold the delta, emit the stop event now.
            self.holding_chunk = processed_chunk
            self.sent_content_block_finish = True
            return {
                "type": "content_block_stop",
                "index": 0,
            }
        if self.holding_chunk is not None:
            # Emit the previously held chunk and hold the new one instead.
            return_chunk = self.holding_chunk
            self.holding_chunk = processed_chunk
            return return_chunk
        return processed_chunk

    def _flush_held_chunk(self) -> Any:
        """Return the held chunk and clear the holding slot."""
        return_chunk = self.holding_chunk
        self.holding_chunk = None
        return return_chunk

    def _closing_event(self) -> Optional[dict]:
        """Return ``message_stop`` the first time it is called, None afterwards."""
        if self.sent_last_message is False:
            self.sent_last_message = True
            return {"type": "message_stop"}
        return None

    def _log_error(self, err: Exception) -> None:
        """Log ``err`` with the traceback of the exception being handled."""
        verbose_logger.error(
            "Anthropic Adapter - {}\n{}".format(err, traceback.format_exc())
        )

    def __next__(self):
        """Return the next Anthropic-format event.

        Raises:
            StopIteration: after ``message_stop`` has been emitted, or after an
                error while reading/translating a chunk. The error is logged;
                previously ``None`` was returned to the consumer as if it were
                an event.
        """
        try:
            event = self._preamble_event()
            if event is not None:
                return event

            # The loop body always returns or raises, so each call consumes at
            # most one upstream chunk.
            for chunk in self.completion_stream:
                return self._translate_chunk(chunk)

            if self.holding_chunk is not None:
                return self._flush_held_chunk()
        except StopIteration:
            # A StopIteration escaping from the code above is treated as the
            # end of the upstream stream.
            pass
        except Exception as e:
            self._log_error(e)
            raise StopIteration from e

        event = self._closing_event()
        if event is None:
            raise StopIteration
        return event

    async def __anext__(self):
        """Async counterpart of ``__next__``.

        Raises:
            StopAsyncIteration: after ``message_stop`` has been emitted.
            Exception: errors while reading/translating a chunk are logged and
                re-raised unchanged.
        """
        try:
            event = self._preamble_event()
            if event is not None:
                return event

            # The loop body always returns or raises, so each call consumes at
            # most one upstream chunk.
            async for chunk in self.completion_stream:
                return self._translate_chunk(chunk)

            if self.holding_chunk is not None:
                return self._flush_held_chunk()
        except StopIteration:
            # Must not leave the coroutine as StopIteration; treat it as the
            # end of the upstream stream.
            pass
        except Exception as e:
            self._log_error(e)
            raise

        event = self._closing_event()
        if event is None:
            raise StopAsyncIteration
        return event
|