|
import asyncio |
|
import json |
|
import time |
|
from typing import Callable, List, Union |
|
|
|
import litellm |
|
from litellm.llms.custom_httpx.http_handler import ( |
|
AsyncHTTPHandler, |
|
HTTPHandler, |
|
_get_httpx_client, |
|
get_async_httpx_client, |
|
) |
|
from litellm.types.llms.openai import AllMessageValues |
|
from litellm.utils import CustomStreamWrapper, ModelResponse |
|
|
|
from ..common_utils import ReplicateError |
|
from .transformation import ReplicateConfig |
|
|
|
# Module-level singleton used by every handler below for request/response
# transformation (URL building, payload shaping, response parsing).
replicate_config = ReplicateConfig()
|
|
|
|
|
|
|
def handle_prediction_response_streaming(
    prediction_url, api_token, print_verbose, headers: dict, http_client: HTTPHandler
):
    """Poll a Replicate prediction URL and yield newly produced output chunks.

    Replicate's prediction API is poll-based: each GET returns the full
    output accumulated so far, so this generator diffs against the
    previously seen output and yields only the new suffix.

    Args:
        prediction_url: URL of the prediction resource to poll.
        api_token: Unused here; authorization is carried in ``headers``.
            Kept for backward compatibility with existing callers.
        print_verbose: Callable used for debug logging.
        headers: HTTP headers (including auth) sent with every poll.
        http_client: Synchronous HTTP client used for polling.

    Yields:
        dict with ``output`` (text produced since the last poll) and
        ``status`` (the prediction's current status string).

    Raises:
        ReplicateError: status 422 if the ``output`` field cannot be
            joined into a string; status 400 if the prediction reports
            ``failed``.
    """
    previous_output = ""
    output_string = ""

    status = ""
    # Terminal states per Replicate's API; poll until one is reached.
    while status not in ["succeeded", "failed", "canceled"]:
        time.sleep(0.5)  # throttle polling of the prediction endpoint
        print_verbose(f"replicate: polling endpoint: {prediction_url}")
        response = http_client.get(prediction_url, headers=headers)
        if response.status_code == 200:
            response_data = response.json()
            status = response_data["status"]
            if "output" in response_data:
                try:
                    output_string = "".join(response_data["output"])
                except Exception:
                    raise ReplicateError(
                        status_code=422,
                        message="Unable to parse response. Got={}".format(
                            response_data["output"]
                        ),
                        headers=response.headers,
                    )
                # Emit only the text that appeared since the last poll.
                new_output = output_string[len(previous_output) :]
                print_verbose(f"New chunk: {new_output}")
                yield {"output": new_output, "status": status}
                previous_output = output_string

            if status == "failed":
                replicate_error = response_data.get("error", "")
                raise ReplicateError(
                    status_code=400,
                    message=f"Error: {replicate_error}",
                    headers=response.headers,
                )
        else:
            # Non-200 polls are logged and retried on the next iteration
            # rather than raised (best-effort polling).
            print_verbose(
                f"Replicate: Failed to fetch prediction status and output.{response.status_code}{response.text}"
            )
|
|
|
|
|
|
|
async def async_handle_prediction_response_streaming(
    prediction_url,
    api_token,
    print_verbose,
    headers: dict,
    http_client: AsyncHTTPHandler,
):
    """Async twin of ``handle_prediction_response_streaming``.

    Polls a Replicate prediction URL and yields newly produced output
    chunks, diffing each poll's cumulative output against the previous
    one so only the new suffix is emitted.

    Args:
        prediction_url: URL of the prediction resource to poll.
        api_token: Unused here; authorization is carried in ``headers``.
            Kept for backward compatibility with existing callers.
        print_verbose: Callable used for debug logging.
        headers: HTTP headers (including auth) sent with every poll.
        http_client: Async HTTP client used for polling.

    Yields:
        dict with ``output`` (text produced since the last poll) and
        ``status`` (the prediction's current status string).

    Raises:
        ReplicateError: status 422 if the ``output`` field cannot be
            joined into a string; status 400 if the prediction reports
            ``failed``.
    """
    previous_output = ""
    output_string = ""

    status = ""
    # Terminal states per Replicate's API; poll until one is reached.
    while status not in ["succeeded", "failed", "canceled"]:
        await asyncio.sleep(0.5)  # throttle polling without blocking the loop
        print_verbose(f"replicate: polling endpoint: {prediction_url}")
        response = await http_client.get(prediction_url, headers=headers)
        if response.status_code == 200:
            response_data = response.json()
            status = response_data["status"]
            if "output" in response_data:
                try:
                    output_string = "".join(response_data["output"])
                except Exception:
                    raise ReplicateError(
                        status_code=422,
                        message="Unable to parse response. Got={}".format(
                            response_data["output"]
                        ),
                        headers=response.headers,
                    )
                # Emit only the text that appeared since the last poll.
                new_output = output_string[len(previous_output) :]
                print_verbose(f"New chunk: {new_output}")
                yield {"output": new_output, "status": status}
                previous_output = output_string

            if status == "failed":
                replicate_error = response_data.get("error", "")
                raise ReplicateError(
                    status_code=400,
                    message=f"Error: {replicate_error}",
                    headers=response.headers,
                )
        else:
            # Non-200 polls are logged and retried on the next iteration
            # rather than raised (best-effort polling).
            print_verbose(
                f"Replicate: Failed to fetch prediction status and output.{response.status_code}{response.text}"
            )
|
|
|
|
|
|
|
def completion(
    model: str,
    messages: list,
    api_base: str,
    model_response: ModelResponse,
    print_verbose: Callable,
    optional_params: dict,
    litellm_params: dict,
    logging_obj,
    api_key,
    encoding,
    custom_prompt_dict=None,
    logger_fn=None,
    acompletion=None,
    headers=None,
) -> Union[ModelResponse, CustomStreamWrapper]:
    """Run a Replicate chat completion (sync entry point).

    Creates a prediction via POST, then either streams its output
    (``optional_params["stream"] is True``) or polls the prediction URL
    until a non-"processing" result is available and transforms it into a
    ``ModelResponse``. Delegates to ``async_completion`` when
    ``acompletion`` is True.

    Args:
        model: Replicate model identifier (may embed a version id).
        messages: Chat messages to send.
        api_base: Base URL for the Replicate API.
        model_response: ModelResponse object to populate.
        print_verbose: Callable used for debug logging.
        optional_params: Provider options; ``stream`` selects streaming.
        litellm_params: litellm-internal parameters.
        logging_obj: litellm logging object.
        api_key: Replicate API token.
        encoding: Tokenizer/encoding used by response transformation.
        custom_prompt_dict: Optional custom prompt templates
            (unused in this code path; kept for interface compatibility).
        logger_fn: Optional logger callback (unused here).
        acompletion: When True, returns the ``async_completion`` coroutine.
        headers: Extra request headers, merged by ``validate_environment``.

    Returns:
        A ``ModelResponse``, a ``CustomStreamWrapper`` when streaming, or
        (when ``acompletion`` is True) the coroutine from
        ``async_completion``.

    Raises:
        ReplicateError: status 500 if polling exhausts all retries.
    """
    # Avoid mutable default arguments; coalesce to fresh containers.
    if custom_prompt_dict is None:
        custom_prompt_dict = {}
    if headers is None:
        headers = {}

    headers = replicate_config.validate_environment(
        api_key=api_key,
        headers=headers,
        model=model,
        messages=messages,
        optional_params=optional_params,
    )

    version_id = replicate_config.model_to_version_id(model)
    input_data = replicate_config.transform_request(
        model=model,
        messages=messages,
        optional_params=optional_params,
        litellm_params=litellm_params,
        headers=headers,
    )

    if acompletion is True:
        # Return the coroutine for the caller's event loop to await.
        return async_completion(
            model_response=model_response,
            model=model,
            encoding=encoding,
            messages=messages,
            optional_params=optional_params,
            litellm_params=litellm_params,
            version_id=version_id,
            input_data=input_data,
            api_key=api_key,
            api_base=api_base,
            logging_obj=logging_obj,
            print_verbose=print_verbose,
            headers=headers,
        )  # type: ignore

    model_response.created = int(time.time())

    prediction_url = replicate_config.get_complete_url(
        api_base=api_base, model=model, optional_params=optional_params
    )

    # Create the prediction; long timeout because Replicate cold-starts
    # can be slow.
    httpx_client = _get_httpx_client(
        params={"timeout": 600.0},
    )
    response = httpx_client.post(
        url=prediction_url,
        headers=headers,
        data=json.dumps(input_data),
    )

    prediction_url = replicate_config.get_prediction_url(response)

    if optional_params.get("stream") is True:
        print_verbose("streaming request")
        _response = handle_prediction_response_streaming(
            prediction_url,
            api_key,
            print_verbose,
            headers=headers,
            http_client=httpx_client,
        )
        return CustomStreamWrapper(_response, model, logging_obj=logging_obj, custom_llm_provider="replicate")
    else:
        for retry in range(litellm.DEFAULT_REPLICATE_POLLING_RETRIES):
            # Linear backoff: base delay plus 2s per prior attempt.
            time.sleep(
                litellm.DEFAULT_REPLICATE_POLLING_DELAY_SECONDS + 2 * retry
            )
            response = httpx_client.get(url=prediction_url, headers=headers)
            if (
                response.status_code == 200
                and response.json().get("status") == "processing"
            ):
                continue
            return litellm.ReplicateConfig().transform_response(
                model=model,
                raw_response=response,
                model_response=model_response,
                logging_obj=logging_obj,
                api_key=api_key,
                request_data=input_data,
                messages=messages,
                optional_params=optional_params,
                litellm_params=litellm_params,
                encoding=encoding,
            )

    raise ReplicateError(
        status_code=500,
        message="No response received from Replicate API after max retries",
        headers=None,
    )
|
|
|
|
|
async def async_completion(
    model_response: ModelResponse,
    model: str,
    messages: List[AllMessageValues],
    encoding,
    optional_params: dict,
    litellm_params: dict,
    version_id,
    input_data,
    api_key,
    api_base,
    logging_obj,
    print_verbose,
    headers: dict,
) -> Union[ModelResponse, CustomStreamWrapper]:
    """Async counterpart of ``completion``.

    Creates a Replicate prediction, then either wraps the async polling
    generator in a ``CustomStreamWrapper`` (streaming) or polls the
    prediction URL until a non-"processing" result arrives and transforms
    it into a ``ModelResponse``.

    Raises:
        ReplicateError: status 500 if polling exhausts all retries.
    """
    create_url = replicate_config.get_complete_url(
        api_base=api_base, model=model, optional_params=optional_params
    )
    # Long timeout: Replicate cold starts can take a while.
    client = get_async_httpx_client(
        llm_provider=litellm.LlmProviders.REPLICATE,
        params={"timeout": 600.0},
    )
    create_response = await client.post(
        url=create_url, headers=headers, data=json.dumps(input_data)
    )
    prediction_url = replicate_config.get_prediction_url(create_response)

    # Streaming path: hand the polling generator to the stream wrapper.
    if optional_params.get("stream") is True:
        chunk_generator = async_handle_prediction_response_streaming(
            prediction_url,
            api_key,
            print_verbose,
            headers=headers,
            http_client=client,
        )
        return CustomStreamWrapper(chunk_generator, model, logging_obj=logging_obj, custom_llm_provider="replicate")

    # Non-streaming path: poll with linearly increasing delay until the
    # prediction is no longer "processing", then transform the result.
    for attempt in range(litellm.DEFAULT_REPLICATE_POLLING_RETRIES):
        delay = litellm.DEFAULT_REPLICATE_POLLING_DELAY_SECONDS + 2 * attempt
        await asyncio.sleep(delay)
        poll_response = await client.get(url=prediction_url, headers=headers)
        still_processing = (
            poll_response.status_code == 200
            and poll_response.json().get("status") == "processing"
        )
        if still_processing:
            continue
        return litellm.ReplicateConfig().transform_response(
            model=model,
            raw_response=poll_response,
            model_response=model_response,
            logging_obj=logging_obj,
            api_key=api_key,
            request_data=input_data,
            messages=messages,
            optional_params=optional_params,
            litellm_params=litellm_params,
            encoding=encoding,
        )

    raise ReplicateError(
        status_code=500,
        message="No response received from Replicate API after max retries",
        headers=None,
    )
|
|