import asyncio
from functools import partial
from typing import Any

from litellm import acompletion as litellm_acompletion

from openhands.core.exceptions import UserCancelledError
from openhands.core.logger import openhands_logger as logger
from openhands.llm.llm import (
    LLM,
    LLM_RETRY_EXCEPTIONS,
    REASONING_EFFORT_SUPPORTED_MODELS,
)
from openhands.utils.shutdown_listener import should_continue


class AsyncLLM(LLM):
    """Asynchronous LLM class."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
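
        # Bind the configured model, credentials, and sampling parameters once,
        # so callers of the wrapped function only need to supply the messages
        # (plus any per-call overrides).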
        self._async_completion = partial(
            self._call_acompletion,
            model=self.config.model,
            api_key=self.config.api_key.get_secret_value()
            if self.config.api_key
            else None,
            base_url=self.config.base_url,
            api_version=self.config.api_version,
            custom_llm_provider=self.config.custom_llm_provider,
            max_tokens=self.config.max_output_tokens,
            timeout=self.config.timeout,
            temperature=self.config.temperature,
            top_p=self.config.top_p,
            drop_params=self.config.drop_params,
        )

        async_completion_unwrapped = self._async_completion
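
        # Retry transient LiteLLM failures with exponential backoff, using the
        # retry settings from the config.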
        @self.retry_decorator(
            num_retries=self.config.num_retries,
            retry_exceptions=LLM_RETRY_EXCEPTIONS,
            retry_min_wait=self.config.retry_min_wait,
            retry_max_wait=self.config.retry_max_wait,
            retry_multiplier=self.config.retry_multiplier,
        )
        async def async_completion_wrapper(*args, **kwargs):
            """Wrapper for the litellm acompletion function that adds logging and cost tracking."""
            messages: list[dict[str, Any]] | dict[str, Any] = []

            # some callers might send the model and messages directly
            # litellm allows positional args, like completion(model, messages, **kwargs)
            # see llm.py for more details
            if len(args) > 1:
                messages = args[1]
                kwargs['messages'] = messages

                # drop the first two positional args: messages was moved to
                # kwargs above, and model is already bound by the partial
                args = args[2:]
            elif 'messages' in kwargs:
                messages = kwargs['messages']

            # Set reasoning effort for models that support it
            if self.config.model.lower() in REASONING_EFFORT_SUPPORTED_MODELS:
                kwargs['reasoning_effort'] = self.config.reasoning_effort

            # ensure we work with a list of messages
            messages = messages if isinstance(messages, list) else [messages]

            # if we have no messages, something went very wrong
            if not messages:
                raise ValueError(
                    'The messages list is empty. At least one message is required.'
                )

            self.log_prompt(messages)
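
            # Poll for a user-requested cancellation (or app shutdown) while
            # the completion request is in flight.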
            async def check_stopped():
                while should_continue():
                    if (
                        hasattr(self.config, 'on_cancel_requested_fn')
                        and self.config.on_cancel_requested_fn is not None
                        and await self.config.on_cancel_requested_fn()
                    ):
                        return
                    await asyncio.sleep(0.1)
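
            # Run the cancellation watcher concurrently with the completion call.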
            stop_check_task = asyncio.create_task(check_stopped())

            try:
                # Directly call and await litellm_acompletion
                resp = await async_completion_unwrapped(*args, **kwargs)

                message_back = resp['choices'][0]['message']['content']
                self.log_response(message_back)

                # log costs and tokens used
                self._post_completion(resp)

                # We do not support streaming in this method, thus return resp
                return resp
            except UserCancelledError:
                logger.debug('LLM request cancelled by user.')
                raise
            except Exception as e:
                logger.error(f'Completion Error occurred:\n{e}')
                raise
            finally:
                await asyncio.sleep(0.1)
                stop_check_task.cancel()
                try:
                    await stop_check_task
                except asyncio.CancelledError:
                    pass
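
        # Replace the raw partial with the wrapped version, so every call gets
        # retries, logging, cost tracking, and cancellation checks.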
        self._async_completion = async_completion_wrapper  # type: ignore

    async def _call_acompletion(self, *args, **kwargs):
        """Wrapper for the litellm acompletion function."""
        # Split out as its own method so the LiteLLM call can be patched in tests.
        return await litellm_acompletion(*args, **kwargs)

    @property
    def async_completion(self):
        """The wrapped async completion function, with retries, logging, and cost tracking applied."""
        return self._async_completion
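

# A minimal usage sketch (assumes openhands.core.config.LLMConfig accepts
# `model` and `api_key`; the values below are illustrative, not from this file):
#
#     from openhands.core.config import LLMConfig
#
#     async def main() -> None:
#         llm = AsyncLLM(config=LLMConfig(model='gpt-4o', api_key='sk-...'))
#         resp = await llm.async_completion(
#             messages=[{'role': 'user', 'content': 'Say hello.'}]
#         )
#         print(resp['choices'][0]['message']['content'])
#
#     asyncio.run(main())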