Tai Truong
fix readme
d202ada
from typing import Any
from openai.lib.streaming import AssistantEventHandler
from langflow.base.astra_assistants.util import get_patched_openai_client
from langflow.custom.custom_component.component_with_cache import ComponentWithCache
from langflow.inputs import MultilineInput
from langflow.schema import dotdict
from langflow.schema.message import Message
from langflow.template import Output
class AssistantsRun(ComponentWithCache):
display_name = "Run Assistant"
description = "Executes an Assistant Run against a thread"
icon = "AstraDB"
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.client = get_patched_openai_client(self._shared_component_cache)
self.thread_id = None
def update_build_config(
self,
build_config: dotdict,
field_value: Any,
field_name: str | None = None,
) -> None:
if field_name == "thread_id":
if field_value is None:
thread = self.client.beta.threads.create()
self.thread_id = thread.id
build_config["thread_id"] = field_value
inputs = [
MultilineInput(
name="assistant_id",
display_name="Assistant ID",
info=(
"The ID of the assistant to run. \n\n"
"Can be retrieved using the List Assistants component or created with the Create Assistant component."
),
),
MultilineInput(
name="user_message",
display_name="User Message",
info="User message to pass to the run.",
),
MultilineInput(
name="thread_id",
display_name="Thread ID",
required=False,
info="Thread ID to use with the run. If not provided, a new thread will be created.",
),
MultilineInput(
name="env_set",
display_name="Environment Set",
info="Dummy input to allow chaining with Dotenv Component.",
),
]
outputs = [Output(display_name="Assistant Response", name="assistant_response", method="process_inputs")]
def process_inputs(self) -> Message:
text = ""
if self.thread_id is None:
thread = self.client.beta.threads.create()
self.thread_id = thread.id
# add the user message
self.client.beta.threads.messages.create(thread_id=self.thread_id, role="user", content=self.user_message)
class EventHandler(AssistantEventHandler):
def __init__(self) -> None:
super().__init__()
def on_exception(self, exception: Exception) -> None:
raise exception
event_handler = EventHandler()
with self.client.beta.threads.runs.create_and_stream(
thread_id=self.thread_id,
assistant_id=self.assistant_id,
event_handler=event_handler,
) as stream:
for part in stream.text_deltas:
text += part
return Message(text=text)