import os
import io
import json
import time
import queue
import logging

import streamlit as st
from dotenv import load_dotenv
from PIL import Image
from streamlit import session_state as ss

# Optional: for direct Assistants API usage:
# from openai import OpenAI
# But we'll also show a LangChain approach:
from langchain.agents.openai_assistant import OpenAIAssistantRunnable
from langchain_core.agents import AgentFinish  # If you want to handle final states, etc.

#############################################
# 1) ENV & BASIC LOGGING
#############################################
load_dotenv()

logging.basicConfig(format="[%(asctime)s] %(levelname)+8s: %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ASSISTANT_ID = os.getenv("ASSISTANT_ID_SOLUTION_SPECIFIER_A")

#############################################
# 2) CREATE YOUR ASSISTANT RUNNABLE
#############################################
if not OPENAI_API_KEY or not ASSISTANT_ID:
    raise ValueError("Missing OPENAI_API_KEY or ASSISTANT_ID in environment.")

assistant_runnable = OpenAIAssistantRunnable(
    assistant_id=ASSISTANT_ID,
    api_key=OPENAI_API_KEY,
    as_agent=True,
)

# We'll store a queue for function calls (tools) we want to handle:
if "tool_requests" not in ss:
    ss["tool_requests"] = queue.Queue()

#############################################
# 3) OPTIONAL: EXAMPLE CUSTOM FUNCTION (TOOL)
#############################################
def hello_world(name: str) -> str:
    """Example function to show how to handle 'requires_action' or function calls."""
    time.sleep(3)
    return f"Hello, {name}! This greeting took 3s."

#############################################
# 4) STREAMING HANDLER
#############################################
def data_streamer(stream_events):
    """
    Generator that processes streaming events from the Assistants API.
    Yields either text, images, or triggers a function call queue item.
    """
    st.toast("Thinking...", icon="🤔")
    content_produced = False

    # We'll mimic the logic from that Medium article:
    for response in stream_events:
        event_type = response.event

        if event_type == "thread.message.delta":
            # The model is streaming partial text or possibly an image
            content = response.data.delta.content[0]  # Typically a list of 1 item
            content_type = content.type

            if content_type == "text":
                text_value = content.text.value
                content_produced = True
                yield text_value  # yield text tokens

            elif content_type == "image_file":
                # The Assistant can output images
                file_id = content.image_file.file_id
                # You can retrieve the file from the OpenAI Assistants API, e.g.
                #   image_bytes = client.files.content(file_id).read()
                # but LangChain's current approach doesn't expose that convenience
                # method. We'll skip a real API call for brevity:
                st.warning("Image streaming not fully implemented in this snippet.")
                # yield an "Image" object if you have it:
                # yield Image.open(...)

        elif event_type == "thread.run.requires_action":
            # The Assistant wants to call a function
            logger.info("Run requires action (function call) – queueing it.")
            ss["tool_requests"].put(response)
            # If no text was produced yet, yield a placeholder
            if not content_produced:
                yield "[Assistant is requesting a function call]"
            # Return so we can handle the function call
            return

        elif event_type == "thread.run.failed":
            st.error("Run has failed.")
            return

    st.toast("Done.", icon="✅")
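
# Hedged sketch: one way the image_file branch above could be completed,
# assuming you're willing to use the raw OpenAI SDK next to the LangChain
# runnable. `client.files.content(file_id)` returns the bytes of an
# assistant-generated file, which PIL can decode; `st.write_stream` renders
# a yielded PIL.Image, so inside `data_streamer` you could replace the
# warning with `yield fetch_image(file_id)`. This helper is an illustration,
# not part of the original flow:
def fetch_image(file_id: str) -> Image.Image:
    """Download an assistant-generated image file and decode it with PIL."""
    from openai import OpenAI  # local import, mirroring the lazy import in main()
    client = OpenAI(api_key=OPENAI_API_KEY)
    image_bytes = client.files.content(file_id).read()
    return Image.open(io.BytesIO(image_bytes))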
def display_stream(stream_iterator, new_chat_context=True):
    """
    Wraps the `data_streamer` generator and writes to Streamlit in real time.
    If `new_chat_context=True`, we put the response in a dedicated assistant chat bubble.
    """
    if new_chat_context:
        with st.chat_message("assistant"):
            response = st.write_stream(data_streamer(stream_iterator))
    else:
        # If we are continuing inside the same bubble (like after a function call),
        # we skip creating a new chat bubble.
        response = st.write_stream(data_streamer(stream_iterator))
    return response
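
# Hedged sketch: for the `requires_action` path below to ever fire, the
# Assistant itself must have `hello_world` registered as a function tool.
# That normally happens once, in the OpenAI dashboard or via a one-off
# script like this (not called by the app itself). The schema shown is an
# assumption about how you'd describe `hello_world`, not the original
# author's configuration:
def register_hello_world_tool():
    """One-off helper that attaches the hello_world function schema to the Assistant."""
    from openai import OpenAI
    client = OpenAI(api_key=OPENAI_API_KEY)
    client.beta.assistants.update(
        assistant_id=ASSISTANT_ID,
        tools=[{
            "type": "function",
            "function": {
                "name": "hello_world",
                "description": "Greet a person by name (takes ~3s).",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string", "description": "Who to greet."}
                    },
                    "required": ["name"],
                },
            },
        }],
    )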
#############################################
# 5) ACTUAL APP
#############################################
def main():
    st.set_page_config(page_title="Streamlit + Assistants Demo", layout="centered")
    st.title("Enhanced Assistant Demo")

    # Initialize messages
    if "messages" not in ss:
        ss.messages = []

    # Display previous messages
    for msg in ss.messages:
        with st.chat_message(msg["role"]):
            st.write(msg["content"])

    # -- (A) FILE UPLOAD DEMO --
    # If you want the user to upload a CSV and pass it to the assistant, do so here.
    uploaded_file = st.file_uploader(
        "Upload a CSV for the assistant to analyze (optional)", type=["csv"]
    )
    if uploaded_file:
        st.write(
            "We won't fully implement code interpreter logic here, "
            "but you could pass it in as a tool resource."
        )
        # For example, you might store it in the code interpreter or do a vector search, etc.

    # -- (B) Chat Input --
    user_input = st.chat_input("Ask me anything or request a function call...")
    if user_input:
        # Show the user's message
        with st.chat_message("user"):
            st.write(user_input)
        ss.messages.append({"role": "user", "content": user_input})

        # (C) Actually run the assistant in "streaming mode".
        # For a brand-new conversation, omit thread_id. Otherwise, pass an existing one.
        # We'll store one globally in session_state for continuity.
        if "thread_id" not in ss:
            ss["thread_id"] = None

        # If we have no thread_id yet, this is a fresh conversation
        if ss["thread_id"] is None:
            resp = assistant_runnable.invoke({"content": user_input})
            ss["thread_id"] = resp.thread_id

            # For a single-turn request (non-streaming):
            #   resp_text = resp.return_values["output"]
            #   st.write(resp_text)
            # But we want streaming. The tricky part: LangChain's `invoke` returns
            # the final message rather than a streaming generator, so to stream we
            # have to call the underlying Assistants API directly (see the else
            # branch). For demonstration, we store the final message in a new
            # chat bubble:
            final_text = resp.return_values["output"]
            with st.chat_message("assistant"):
                st.write(final_text)
            ss.messages.append({"role": "assistant", "content": final_text})
        else:
            # We have an existing thread, so we continue the conversation with
            # streaming. LangChain doesn't expose direct access to the thread,
            # so we "cheat" and drop down to the raw OpenAI SDK for this part.
            from openai import OpenAI
            openai_client = OpenAI(api_key=OPENAI_API_KEY)

            # Add the user's message to the thread *before* starting the run,
            # so that the new run can see it:
            openai_client.beta.threads.messages.create(
                thread_id=ss["thread_id"],
                role="user",
                content=user_input,
            )

            # Now stream the assistant's response:
            with openai_client.beta.threads.runs.stream(
                thread_id=ss["thread_id"],
                assistant_id=ASSISTANT_ID,
            ) as stream:
                response = display_stream(stream, new_chat_context=True)

            # Persist the streamed text so it survives the next rerun:
            if response:
                ss.messages.append({"role": "assistant", "content": response})

            # If a function call is required, answer it and resume the stream:
            while not ss["tool_requests"].empty():
                with st.chat_message("assistant"):
                    tool_request = ss["tool_requests"].get()
                    tool_outputs, thread_id, run_id = handle_requires_action(tool_request)
                    with openai_client.beta.threads.runs.submit_tool_outputs_stream(
                        thread_id=thread_id,
                        run_id=run_id,
                        tool_outputs=tool_outputs,
                    ) as tool_stream:
                        display_stream(tool_stream, new_chat_context=False)

    st.write("---")
    st.info("This is a demo of combining streaming, function calls, and file upload.")


def handle_requires_action(tool_request):
    """
    Triggered when the assistant tries to call a function mid-run.
    We parse the arguments, call the function, and return the outputs
    so the run can continue.
    """
    st.toast("Assistant is requesting a function call...", icon="🔧")
    data = tool_request.data
    tool_outputs = []

    # The list of tools the assistant wants to call:
    if not hasattr(data.required_action.submit_tool_outputs, "tool_calls"):
        st.error("No tool calls found in the request.")
        return [], data.thread_id, data.id

    for tc in data.required_action.submit_tool_outputs.tool_calls:
        func_name = tc.function.name
        func_args = json.loads(tc.function.arguments or "{}")

        if func_name == "hello_world":
            name_str = func_args.get("name", "Anonymous")
            result = hello_world(name_str)
            # Return the output to the assistant
            tool_outputs.append({
                "tool_call_id": tc.id,
                "output": result,
            })
        else:
            # Unrecognized function
            error_msg = f"Function '{func_name}' not recognized."
            tool_outputs.append({
                "tool_call_id": tc.id,
                "output": json.dumps({"error": error_msg}),
            })

    return tool_outputs, data.thread_id, data.id


if __name__ == "__main__":
    main()
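
# To run this demo, assuming a .env file alongside the script with:
#
#   OPENAI_API_KEY=sk-...
#   ASSISTANT_ID_SOLUTION_SPECIFIER_A=asst_...
#
# install the imported packages and launch via Streamlit:
#
#   $ pip install streamlit python-dotenv langchain langchain-core openai pillow
#   $ streamlit run app.py
#
# The filename `app.py` is a placeholder for wherever you saved this script.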