# TestLLM/litellm/llms/base_llm/base_model_iterator.py
import json
from abc import abstractmethod
from typing import Optional, Union

from litellm.types.utils import GenericStreamingChunk, ModelResponseStream


class BaseModelResponseIterator:
    def __init__(
        self, streaming_response, sync_stream: bool, json_mode: Optional[bool] = False
    ):
        self.streaming_response = streaming_response
        self.response_iterator = self.streaming_response
        self.json_mode = json_mode

    def chunk_parser(
        self, chunk: dict
    ) -> Union[GenericStreamingChunk, ModelResponseStream]:
        """Default no-op parser; provider-specific subclasses override this
        to map a raw provider payload into a litellm chunk type."""
        return GenericStreamingChunk(
            text="",
            is_finished=False,
            finish_reason="",
            usage=None,
            index=0,
            tool_use=None,
        )
    # Sync iterator
    def __iter__(self):
        return self

    def _handle_string_chunk(
        self, str_line: str
    ) -> Union[GenericStreamingChunk, ModelResponseStream]:
        # chunk is a str at this point
        if "[DONE]" in str_line:
            # SSE terminator: emit a final "stop" chunk
            return GenericStreamingChunk(
                text="",
                is_finished=True,
                finish_reason="stop",
                usage=None,
                index=0,
                tool_use=None,
            )
        elif str_line.startswith("data:"):
            data_json = json.loads(str_line[5:])
            return self.chunk_parser(chunk=data_json)
        else:
            # Non-data line (e.g. an SSE comment or keep-alive): emit an empty chunk
            return GenericStreamingChunk(
                text="",
                is_finished=False,
                finish_reason="",
                usage=None,
                index=0,
                tool_use=None,
            )
    def __next__(self):
        try:
            chunk = self.response_iterator.__next__()
        except StopIteration:
            raise
        except ValueError as e:
            raise RuntimeError(f"Error receiving chunk from stream: {e}") from e

        try:
            str_line = chunk
            if isinstance(chunk, bytes):  # Handle binary data
                str_line = chunk.decode("utf-8")  # Convert bytes to string
                # Drop anything before the SSE "data:" marker
                index = str_line.find("data:")
                if index != -1:
                    str_line = str_line[index:]

            # chunk is a str at this point
            return self._handle_string_chunk(str_line=str_line)
        except StopIteration:
            raise
        except ValueError as e:
            raise RuntimeError(
                f"Error parsing chunk: {e},\nReceived chunk: {chunk}"
            ) from e
    # Async iterator
    def __aiter__(self):
        self.async_response_iterator = self.streaming_response.__aiter__()
        return self

    async def __anext__(self):
        try:
            chunk = await self.async_response_iterator.__anext__()
        except StopAsyncIteration:
            raise
        except ValueError as e:
            raise RuntimeError(f"Error receiving chunk from stream: {e}") from e

        try:
            str_line = chunk
            if isinstance(chunk, bytes):  # Handle binary data
                str_line = chunk.decode("utf-8")  # Convert bytes to string
                # Drop anything before the SSE "data:" marker
                index = str_line.find("data:")
                if index != -1:
                    str_line = str_line[index:]

            # chunk is a str at this point
            return self._handle_string_chunk(str_line=str_line)
        except StopAsyncIteration:
            raise
        except ValueError as e:
            raise RuntimeError(
                f"Error parsing chunk: {e},\nReceived chunk: {chunk}"
            ) from e
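
# A minimal sketch (not part of the original module) of how a provider-specific
# iterator might subclass BaseModelResponseIterator: only chunk_parser is
# overridden. "MyProviderIterator" and the payload keys "text"/"done" are
# hypothetical names for illustration, not any real provider's wire format.
#
# class MyProviderIterator(BaseModelResponseIterator):
#     def chunk_parser(
#         self, chunk: dict
#     ) -> Union[GenericStreamingChunk, ModelResponseStream]:
#         return GenericStreamingChunk(
#             text=chunk.get("text", ""),
#             is_finished=chunk.get("done", False),
#             finish_reason="stop" if chunk.get("done") else "",
#             usage=None,
#             index=0,
#             tool_use=None,
#         )
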
class FakeStreamResponseIterator:
    def __init__(self, model_response, json_mode: Optional[bool] = False):
        self.model_response = model_response
        self.json_mode = json_mode
        self.is_done = False

    # Sync iterator
    def __iter__(self):
        return self

    # NOTE: @abstractmethod is not enforced here (no ABC base class);
    # subclasses are still expected to implement chunk_parser.
    @abstractmethod
    def chunk_parser(self, chunk: dict) -> GenericStreamingChunk:
        pass
    def __next__(self):
        if self.is_done:
            raise StopIteration
        self.is_done = True
        return self.chunk_parser(self.model_response)

    # Async iterator
    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.is_done:
            raise StopAsyncIteration
        self.is_done = True
        return self.chunk_parser(self.model_response)
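
# A minimal sketch (not part of the original module) of driving the fake-stream
# path: FakeStreamResponseIterator yields exactly one parsed chunk and then
# stops. "EchoFakeStream" and its single "text" key are hypothetical.
#
# class EchoFakeStream(FakeStreamResponseIterator):
#     def chunk_parser(self, chunk: dict) -> GenericStreamingChunk:
#         return GenericStreamingChunk(
#             text=chunk.get("text", ""),
#             is_finished=True,
#             finish_reason="stop",
#             usage=None,
#             index=0,
#             tool_use=None,
#         )
#
# for part in EchoFakeStream(model_response={"text": "hello"}):
#     print(part)  # one chunk, then StopIteration ends the loop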