|
|
|
|
|
|
|
|
|
import json |
|
from typing import Optional |
|
|
|
import httpx |
|
|
|
import litellm |
|
from litellm.llms.custom_httpx.http_handler import ( |
|
AsyncHTTPHandler, |
|
HTTPHandler, |
|
get_async_httpx_client, |
|
) |
|
from litellm.types.utils import EmbeddingResponse |
|
|
|
from ..common_utils import OpenAILikeBase, OpenAILikeError |
|
|
|
|
|
class OpenAILikeEmbeddingHandler(OpenAILikeBase):
    """Embedding handler for OpenAI-compatible ("OpenAI-like") providers.

    Builds the request payload, POSTs it to the provider's ``/embeddings``
    endpoint (sync or async), and maps the JSON response into litellm's
    ``EmbeddingResponse``. HTTP-level failures are normalized into
    ``OpenAILikeError`` with an appropriate status code.
    """

    def __init__(self, **kwargs):
        # No per-instance state is needed; **kwargs is accepted only to stay
        # signature-compatible with sibling handlers.
        pass

    async def aembedding(
        self,
        input: list,
        data: dict,
        model_response: EmbeddingResponse,
        timeout: float,
        api_key: str,
        api_base: str,
        logging_obj,
        headers: dict,
        client=None,
    ) -> EmbeddingResponse:
        """Async embedding call.

        Args:
            input: Original input list (used only for logging callbacks).
            data: Fully-built JSON payload to POST.
            model_response: Response object slot — not read here; kept for
                interface compatibility with other handlers.
            timeout: Request timeout in seconds (used when a client is created).
            api_key: API key (used only for logging callbacks; auth is already
                encoded in ``headers``).
            api_base: Full URL of the embeddings endpoint.
            logging_obj: litellm logging object; ``post_call`` is invoked on
                both success and failure.
            headers: HTTP headers to send.
            client: Optional pre-built ``AsyncHTTPHandler``; anything else
                (including None) causes a fresh async client to be created.

        Returns:
            ``EmbeddingResponse`` built from the provider's JSON body.

        Raises:
            OpenAILikeError: on HTTP error status (provider status code),
                timeout (408), or any other transport failure (500).
        """
        try:
            # Reuse the caller's async client only if it is actually an
            # AsyncHTTPHandler; otherwise build one with the requested timeout.
            if client is None or not isinstance(client, AsyncHTTPHandler):
                async_client = get_async_httpx_client(
                    llm_provider=litellm.LlmProviders.OPENAI,
                    params={"timeout": timeout},
                )
            else:
                async_client = client
            try:
                response = await async_client.post(
                    api_base,
                    headers=headers,
                    data=json.dumps(data),
                )
                response.raise_for_status()
                response_json = response.json()
            except httpx.HTTPStatusError as e:
                raise OpenAILikeError(
                    status_code=e.response.status_code,
                    message=e.response.text if e.response else str(e),
                )
            except httpx.TimeoutException:
                raise OpenAILikeError(
                    status_code=408, message="Timeout error occurred."
                )
            except Exception as e:
                raise OpenAILikeError(status_code=500, message=str(e))

            logging_obj.post_call(
                input=input,
                api_key=api_key,
                additional_args={"complete_input_dict": data},
                original_response=response_json,
            )
            return EmbeddingResponse(**response_json)
        except Exception as e:
            # Record the failure with the logging callbacks, then re-raise
            # unchanged so callers see the original exception (typically an
            # OpenAILikeError raised above).
            logging_obj.post_call(
                input=input,
                api_key=api_key,
                original_response=str(e),
            )
            raise e

    def embedding(
        self,
        model: str,
        input: list,
        timeout: float,
        logging_obj,
        api_key: Optional[str],
        api_base: Optional[str],
        optional_params: dict,
        model_response: Optional[EmbeddingResponse] = None,
        client=None,
        aembedding=None,
        custom_endpoint: Optional[bool] = None,
        headers: Optional[dict] = None,
    ) -> EmbeddingResponse:
        """Sync entry point for embedding requests.

        Validates/derives the endpoint URL and headers, builds the payload,
        and either delegates to :meth:`aembedding` (returning its coroutine
        when ``aembedding is True``) or performs a blocking POST.

        Args:
            model: Model name sent in the request body.
            input: List of inputs to embed.
            timeout: Request timeout in seconds.
            logging_obj: litellm logging object (``pre_call``/``post_call``).
            api_key: Provider API key; may be None for keyless endpoints.
            api_base: Base URL (or full URL when ``custom_endpoint`` is set).
            optional_params: Extra body parameters merged into the payload.
            model_response: Unused here; forwarded to the async path for
                interface compatibility.
            client: Optional pre-built sync ``HTTPHandler``; async handlers
                are ignored on the sync path.
            aembedding: When True, return the coroutine from
                :meth:`aembedding` instead of executing synchronously.
            custom_endpoint: When True, ``api_base`` is used as-is rather than
                having the embeddings route appended.
            headers: Optional extra HTTP headers.

        Returns:
            ``EmbeddingResponse`` (or, when ``aembedding is True``, the
            coroutine that resolves to one).

        Raises:
            OpenAILikeError: on HTTP error status, timeout (408), or any
                other transport failure (500).
        """
        api_base, headers = self._validate_environment(
            api_base=api_base,
            api_key=api_key,
            endpoint_type="embeddings",
            headers=headers,
            custom_endpoint=custom_endpoint,
        )
        data = {"model": model, "input": input, **optional_params}

        logging_obj.pre_call(
            input=input,
            api_key=api_key,
            additional_args={"complete_input_dict": data, "api_base": api_base},
        )

        if aembedding is True:
            # Returns a coroutine; the caller is expected to await it.
            return self.aembedding(
                data=data,
                input=input,
                logging_obj=logging_obj,
                model_response=model_response,
                api_base=api_base,
                api_key=api_key,
                timeout=timeout,
                client=client,
                headers=headers,
            )

        # Only a sync HTTPHandler can be reused here; an AsyncHTTPHandler (or
        # no client at all) means we build a fresh sync client. NOTE(review):
        # storing it on self mirrors the original behavior — confirm nothing
        # depends on handler instances being stateless before changing this.
        if client is None or isinstance(client, AsyncHTTPHandler):
            self.client = HTTPHandler(timeout=timeout)
        else:
            self.client = client

        try:
            response = self.client.post(
                api_base,
                headers=headers,
                data=json.dumps(data),
            )
            response.raise_for_status()
            response_json = response.json()
        except httpx.HTTPStatusError as e:
            raise OpenAILikeError(
                status_code=e.response.status_code,
                message=e.response.text,
            )
        except httpx.TimeoutException:
            raise OpenAILikeError(status_code=408, message="Timeout error occurred.")
        except Exception as e:
            raise OpenAILikeError(status_code=500, message=str(e))

        logging_obj.post_call(
            input=input,
            api_key=api_key,
            additional_args={"complete_input_dict": data},
            original_response=response_json,
        )

        # Use the directly-imported EmbeddingResponse for consistency with
        # the async path (same class as litellm.EmbeddingResponse).
        return EmbeddingResponse(**response_json)
|
|