Spaces:
Running
Running
from typing import Any | |
from openai.lib.streaming import AssistantEventHandler | |
from langflow.base.astra_assistants.util import get_patched_openai_client | |
from langflow.custom.custom_component.component_with_cache import ComponentWithCache | |
from langflow.inputs import MultilineInput | |
from langflow.schema import dotdict | |
from langflow.schema.message import Message | |
from langflow.template import Output | |
class AssistantsRun(ComponentWithCache): | |
display_name = "Run Assistant" | |
description = "Executes an Assistant Run against a thread" | |
icon = "AstraDB" | |
def __init__(self, **kwargs) -> None: | |
super().__init__(**kwargs) | |
self.client = get_patched_openai_client(self._shared_component_cache) | |
self.thread_id = None | |
def update_build_config( | |
self, | |
build_config: dotdict, | |
field_value: Any, | |
field_name: str | None = None, | |
) -> None: | |
if field_name == "thread_id": | |
if field_value is None: | |
thread = self.client.beta.threads.create() | |
self.thread_id = thread.id | |
build_config["thread_id"] = field_value | |
inputs = [ | |
MultilineInput( | |
name="assistant_id", | |
display_name="Assistant ID", | |
info=( | |
"The ID of the assistant to run. \n\n" | |
"Can be retrieved using the List Assistants component or created with the Create Assistant component." | |
), | |
), | |
MultilineInput( | |
name="user_message", | |
display_name="User Message", | |
info="User message to pass to the run.", | |
), | |
MultilineInput( | |
name="thread_id", | |
display_name="Thread ID", | |
required=False, | |
info="Thread ID to use with the run. If not provided, a new thread will be created.", | |
), | |
MultilineInput( | |
name="env_set", | |
display_name="Environment Set", | |
info="Dummy input to allow chaining with Dotenv Component.", | |
), | |
] | |
outputs = [Output(display_name="Assistant Response", name="assistant_response", method="process_inputs")] | |
def process_inputs(self) -> Message: | |
text = "" | |
if self.thread_id is None: | |
thread = self.client.beta.threads.create() | |
self.thread_id = thread.id | |
# add the user message | |
self.client.beta.threads.messages.create(thread_id=self.thread_id, role="user", content=self.user_message) | |
class EventHandler(AssistantEventHandler): | |
def __init__(self) -> None: | |
super().__init__() | |
def on_exception(self, exception: Exception) -> None: | |
raise exception | |
event_handler = EventHandler() | |
with self.client.beta.threads.runs.create_and_stream( | |
thread_id=self.thread_id, | |
assistant_id=self.assistant_id, | |
event_handler=event_handler, | |
) as stream: | |
for part in stream.text_deltas: | |
text += part | |
return Message(text=text) | |