from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import List, Optional

import litellm
from litellm._logging import print_verbose
from litellm.utils import get_optional_params

from ..llms.vllm.completion import handler as vllm_handler
def batch_completion(
    model: str,
    messages: List = [],
    functions: Optional[List] = None,
    function_call: Optional[str] = None,
    temperature: Optional[float] = None,
    top_p: Optional[float] = None,
    n: Optional[int] = None,
    stream: Optional[bool] = None,
    stop=None,
    max_tokens: Optional[int] = None,
    presence_penalty: Optional[float] = None,
    frequency_penalty: Optional[float] = None,
    logit_bias: Optional[dict] = None,
    user: Optional[str] = None,
    deployment_id=None,
    request_timeout: Optional[int] = None,
    timeout: Optional[int] = 600,
    max_workers: Optional[int] = 100,
    **kwargs,
):
    """
    Batch the litellm.completion function for a given model.

    Args:
        model (str): The model to use for generating completions.
        messages (List, optional): List of message lists, one per completion request. Defaults to [].
        functions (List, optional): List of functions to pass to the model. Defaults to None.
        function_call (str, optional): The function call to pass to the model. Defaults to None.
        temperature (float, optional): The temperature parameter for generating completions. Defaults to None.
        top_p (float, optional): The top-p parameter for generating completions. Defaults to None.
        n (int, optional): The number of completions to generate per request. Defaults to None.
        stream (bool, optional): Whether to stream completions. Defaults to None.
        stop (optional): The stop sequence(s) for generating completions. Defaults to None.
        max_tokens (int, optional): The maximum number of tokens to generate. Defaults to None.
        presence_penalty (float, optional): The presence penalty for generating completions. Defaults to None.
        frequency_penalty (float, optional): The frequency penalty for generating completions. Defaults to None.
        logit_bias (dict, optional): The logit bias for generating completions. Defaults to None.
        user (str, optional): The user string for generating completions. Defaults to None.
        deployment_id (optional): The deployment ID for generating completions. Defaults to None.
        request_timeout (int, optional): The request timeout for generating completions. Defaults to None.
        timeout (int, optional): Per-request timeout in seconds. Defaults to 600.
        max_workers (int, optional): The maximum number of threads to use for parallel processing. Defaults to 100.

    Returns:
        list: A list of completion results (or exceptions, for requests that failed).
    """
    args = locals()

    batch_messages = messages
    completions = []
    custom_llm_provider = None
    # Split off an explicit provider prefix, e.g. "vllm/<model>" -> provider "vllm", model "<model>".
    if model.split("/", 1)[0] in litellm.provider_list:
        custom_llm_provider = model.split("/", 1)[0]
        model = model.split("/", 1)[1]
    if custom_llm_provider == "vllm":
        optional_params = get_optional_params(
            functions=functions,
            function_call=function_call,
            temperature=temperature,
            top_p=top_p,
            n=n,
            stream=stream or False,
            stop=stop,
            max_tokens=max_tokens,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            logit_bias=logit_bias,
            user=user,
            model=model,
            custom_llm_provider=custom_llm_provider,
        )
        # vLLM handles the whole batch natively in a single call.
        results = vllm_handler.batch_completions(
            model=model,
            messages=batch_messages,
            custom_prompt_dict=litellm.custom_prompt_dict,
            optional_params=optional_params,
        )
    else:

        def chunks(lst, n):
            """Yield successive n-sized chunks from lst, e.g. chunks([1, 2, 3], 2) -> [1, 2], [3]."""
            for i in range(0, len(lst), n):
                yield lst[i : i + n]

        # Fan the requests out over a thread pool, 100 message lists at a time.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for sub_batch in chunks(batch_messages, 100):
                for message_list in sub_batch:
                    kwargs_modified = args.copy()
                    kwargs_modified.pop("max_workers")
                    kwargs_modified["messages"] = message_list
                    original_kwargs = {}
                    if "kwargs" in kwargs_modified:
                        original_kwargs = kwargs_modified.pop("kwargs")
                    future = executor.submit(
                        litellm.completion, **kwargs_modified, **original_kwargs
                    )
                    completions.append(future)

        # Collect results in submission order; a failed request contributes its
        # exception instead of a response, so one bad request doesn't abort the batch.
        results = []
        for future in completions:
            try:
                results.append(future.result())
            except Exception as exc:
                results.append(exc)

    return results
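
# Illustrative usage (a minimal sketch; the model name and messages below are
# assumptions for the example, not values taken from this module):
#
#     responses = batch_completion(
#         model="gpt-3.5-turbo",
#         messages=[
#             [{"role": "user", "content": "Good morning"}],
#             [{"role": "user", "content": "What is 2 + 2?"}],
#         ],
#         max_tokens=20,
#     )
#     # `responses` has one completion (or exception) per message list, in input order.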
def batch_completion_models(*args, **kwargs):
    """
    Send a request to multiple language models concurrently and return the response
    as soon as one of the models responds.

    Args:
        *args: Variable-length positional arguments passed to the completion function.
        **kwargs: Additional keyword arguments:
            - models (str or list of str): The language models to send requests to.
            - deployments (list of dict): Alternatively, a list of deployment configs to fan out over.
            - Other keyword arguments to be passed to the completion function.

    Returns:
        str or None: The response from one of the language models, or None if no response is received.

    Note:
        This function utilizes a ThreadPoolExecutor to parallelize requests to multiple models.
        It sends requests concurrently and returns the first successful response it finds.
    """
    if "model" in kwargs:
        kwargs.pop("model")
    if "models" in kwargs:
        models = kwargs.pop("models")
        futures = {}
        with ThreadPoolExecutor(max_workers=len(models)) as executor:
            for model in models:
                futures[model] = executor.submit(
                    litellm.completion, *args, model=model, **kwargs
                )

            # Check responses in the order the models were listed and return the
            # first non-None result.
            for model, future in sorted(
                futures.items(), key=lambda x: models.index(x[0])
            ):
                result = future.result()
                if result is not None:
                    return result
    elif "deployments" in kwargs:
        deployments = kwargs.pop("deployments")
        kwargs.pop("model_list", None)
        nested_kwargs = kwargs.pop("kwargs", {})
        futures = {}
        with ThreadPoolExecutor(max_workers=len(deployments)) as executor:
            for deployment in deployments:
                # Fill in shared kwargs without overriding per-deployment values
                # (e.g. model name, api base).
                for key in kwargs.keys():
                    if key not in deployment:
                        deployment[key] = kwargs[key]
                completion_kwargs = {**deployment, **nested_kwargs}
                futures[deployment["model"]] = executor.submit(
                    litellm.completion, **completion_kwargs
                )

            # Return the first deployment that completes successfully; drop futures
            # that raised and keep waiting on the rest.
            while futures:
                print_verbose("\n\n waiting for next result\n\n")
                done, _ = wait(futures.values(), return_when=FIRST_COMPLETED)
                print_verbose(f"done list\n{done}")
                for future in done:
                    try:
                        result = future.result()
                        return result
                    except Exception:
                        print_verbose(
                            "\n\ngot an exception, ignoring, removing from futures"
                        )
                        print_verbose(futures)
                        new_futures = {}
                        for key, value in futures.items():
                            if future == value:
                                print_verbose(f"removing key {key}")
                                continue
                            else:
                                new_futures[key] = value
                        futures = new_futures
                        print_verbose(f"new futures {futures}")
                        continue

                print_verbose("\n\ndone looping through futures\n\n")
                print_verbose(futures)

    return None
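
# Illustrative usage (a minimal sketch; the model names and prompt are assumptions
# for the example, not values taken from this module):
#
#     response = batch_completion_models(
#         models=["gpt-3.5-turbo", "claude-3-haiku-20240307"],
#         messages=[{"role": "user", "content": "Hey, how's it going?"}],
#     )
#     # `response` is the first non-None completion returned by any of the models,
#     # or None if none of them produced one.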
def batch_completion_models_all_responses(*args, **kwargs):
    """
    Send a request to multiple language models concurrently and return a list of responses
    from all models that respond.

    Args:
        *args: Variable-length positional arguments passed to the completion function.
        **kwargs: Additional keyword arguments:
            - models (str or list of str): The language models to send requests to.
            - Other keyword arguments to be passed to the completion function.

    Returns:
        list: A list of responses from the language models that responded.

    Note:
        This function utilizes a ThreadPoolExecutor to parallelize requests to multiple models.
        It sends requests concurrently and collects responses from all models that respond.
    """
    if "model" in kwargs:
        kwargs.pop("model")
    if "models" in kwargs:
        models = kwargs.pop("models")
    else:
        raise Exception("'models' param not in kwargs")

    responses = []

    with ThreadPoolExecutor(max_workers=len(models)) as executor:
        # Submit every request first so the calls actually run in parallel,
        # then collect the results.
        futures = [
            executor.submit(litellm.completion, *args, model=model, **kwargs)
            for model in models
        ]
        for future in futures:
            result = future.result()
            if result is not None:
                responses.append(result)

    return responses
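
# Illustrative usage (a minimal sketch; the model names and prompt are assumptions
# for the example, not values taken from this module):
#
#     responses = batch_completion_models_all_responses(
#         models=["gpt-3.5-turbo", "command-r"],
#         messages=[{"role": "user", "content": "Write a haiku about threads."}],
#     )
#     # `responses` holds one completion per model that returned a result.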