import os
import io
import json
import time
import queue
import logging

import streamlit as st
from dotenv import load_dotenv
from PIL import Image
from streamlit import session_state as ss

# Optional: for direct Assistants API usage:
# from openai import OpenAI
# But we'll also show a LangChain approach:
from langchain.agents.openai_assistant import OpenAIAssistantRunnable
from langchain_core.agents import AgentFinish  # If you want to handle final states, etc.

#############################################
# 1) ENV & BASIC LOGGING
#############################################
load_dotenv()

logging.basicConfig(format="[%(asctime)s] %(levelname)+8s: %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ASSISTANT_ID = os.getenv("ASSISTANT_ID_SOLUTION_SPECIFIER_A")
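# The .env file load_dotenv() reads might look roughly like this (placeholder
# values; adjust the second key if your assistant ID lives under another name):
#
#   OPENAI_API_KEY=sk-...
#   ASSISTANT_ID_SOLUTION_SPECIFIER_A=asst_...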

#############################################
# 2) CREATE YOUR ASSISTANT RUNNABLE
#############################################
if not OPENAI_API_KEY or not ASSISTANT_ID:
    raise ValueError("Missing OPENAI_API_KEY or ASSISTANT_ID in environment.")

assistant_runnable = OpenAIAssistantRunnable(
    assistant_id=ASSISTANT_ID,
    api_key=OPENAI_API_KEY,
    as_agent=True,
)

# We'll store a queue for function calls (tools) we want to handle:
if "tool_requests" not in ss:
    ss["tool_requests"] = queue.Queue()

#############################################
# 3) OPTIONAL: EXAMPLE CUSTOM FUNCTION (TOOL)
#############################################
def hello_world(name: str) -> str:
    """Example function to show how to handle 'requires_action' or function calls."""
    time.sleep(3)
    return f"Hello, {name}! This greeting took 3s."

#############################################
# 4) STREAMING HANDLER
#############################################
def data_streamer(stream_events):
    """
    Generator that processes streaming events from the Assistants API.
    Yields either text, images, or triggers a function call queue item.
    """
    st.toast("Thinking...", icon="🤔")
    content_produced = False
    # Walk the raw event stream and handle each event type:
    for response in stream_events:
        event_type = response.event
        if event_type == "thread.message.delta":
            # The model is streaming partial text or possibly an image
            content = response.data.delta.content[0]  # Typically a list of 1 item
            content_type = content.type
            if content_type == "text":
                text_value = content.text.value
                content_produced = True
                yield text_value  # yield text tokens
            elif content_type == "image_file":
                # The Assistant can output images
                file_id = content.image_file.file_id
                # You can retrieve the file from the OpenAI Assistants API, e.g.
                # image_bytes = client.files.content(file_id).read()
                # but LangChain's runnable doesn't expose that convenience method,
                # so we skip the real API call for brevity:
                st.warning("Image streaming not fully implemented in this snippet.")
                # yield an "Image" object if you have it
                # yield Image.open(...)
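                # A minimal sketch of what real image handling could look like,
                # assuming you also hold a raw OpenAI client (the names below are
                # illustrative and this path is not exercised by the demo):
                #
                #   from openai import OpenAI
                #   raw_client = OpenAI(api_key=OPENAI_API_KEY)
                #   image_bytes = raw_client.files.content(file_id).read()
                #   yield Image.open(io.BytesIO(image_bytes))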
        elif event_type == "thread.run.requires_action":
            # The Assistant wants to call a function
            logger.info("Run requires action (function call) – queueing it.")
            ss["tool_requests"].put(response)
            # If no text was produced yet, yield a placeholder
            if not content_produced:
                yield "[Assistant is requesting a function call]"
            # Return so we can handle the function call
            return
        elif event_type == "thread.run.failed":
            st.error("Run has failed.")
            return
    st.toast("Done.", icon="✅")


def display_stream(stream_iterator, new_chat_context=True):
    """
    Wraps the `data_streamer` generator and writes to Streamlit in real time.
    If `new_chat_context=True`, we put the response in a dedicated assistant chat bubble.
    """
    if new_chat_context:
        with st.chat_message("assistant"):
            response = st.write_stream(data_streamer(stream_iterator))
    else:
        # If we are continuing inside the same bubble (like after a function call),
        # we skip creating a new chat bubble.
        response = st.write_stream(data_streamer(stream_iterator))
    return response

#############################################
# 5) ACTUAL APP
#############################################
def main():
    st.set_page_config(page_title="Streamlit + Assistants Demo", layout="centered")
    st.title("Enhanced Assistant Demo")

    # Initialize messages
    if "messages" not in ss:
        ss.messages = []

    # Display previous messages
    for msg in ss.messages:
        with st.chat_message(msg["role"]):
            st.write(msg["content"])

    # -- (A) FILE UPLOAD DEMO --
    # If you want the user to upload a CSV and pass it to the assistant, do so here.
    uploaded_file = st.file_uploader("Upload a CSV for the assistant to analyze (optional)", type=["csv"])
    if uploaded_file:
        st.write("We won't fully implement code interpreter logic here, but you could pass it in as a tool resource.")
        # For example, you might store it in the code interpreter or do a vector search, etc.
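        # A hedged sketch of wiring the upload into the Assistants API (assumes a
        # raw OpenAI client, an assistant with the code_interpreter tool enabled,
        # and an existing ss["thread_id"]; none of this runs in this demo):
        #
        #   from openai import OpenAI
        #   raw_client = OpenAI(api_key=OPENAI_API_KEY)
        #   file_obj = raw_client.files.create(file=uploaded_file, purpose="assistants")
        #   raw_client.beta.threads.messages.create(
        #       thread_id=ss["thread_id"],
        #       role="user",
        #       content="Please analyze the attached CSV.",
        #       attachments=[{"file_id": file_obj.id, "tools": [{"type": "code_interpreter"}]}],
        #   )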

    # -- (B) Chat Input --
    user_input = st.chat_input("Ask me anything or request a function call...")
    if user_input:
        # Show user's message
        with st.chat_message("user"):
            st.write(user_input)
        ss.messages.append({"role": "user", "content": user_input})

        # (C) Actually run the assistant in "streaming mode"
        # For a brand-new conversation, omit thread_id. Otherwise, pass an existing one.
        # We'll store one globally in session_state for continuity.
        if "thread_id" not in ss:
            ss["thread_id"] = None

        # If we have no thread_id yet, this is a fresh conversation
        if ss["thread_id"] is None:
            resp = assistant_runnable.invoke({"content": user_input})
            ss["thread_id"] = resp.thread_id
            # For a single-turn request (non-streaming):
            # resp_text = resp.return_values["output"]
            # st.write(resp_text)
            # The catch: LangChain's `invoke` returns the final message rather than
            # a streaming generator. To stream this first turn as well, you would
            # call the underlying Assistants API directly (see the commented sketch
            # at the end of this branch).
            # For demonstration, we just show the final message in a new chat bubble:
            final_text = resp.return_values["output"]
            with st.chat_message("assistant"):
                st.write(final_text)
            ss.messages.append({"role": "assistant", "content": final_text})
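            # A rough sketch of streaming the first turn with the raw SDK instead
            # (names are illustrative; this path is not exercised by the demo):
            #
            #   from openai import OpenAI
            #   raw_client = OpenAI(api_key=OPENAI_API_KEY)
            #   thread = raw_client.beta.threads.create(
            #       messages=[{"role": "user", "content": user_input}]
            #   )
            #   ss["thread_id"] = thread.id
            #   with raw_client.beta.threads.runs.stream(
            #       thread_id=thread.id, assistant_id=ASSISTANT_ID
            #   ) as stream:
            #       display_stream(stream, new_chat_context=True)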
        else:
            # We already have a thread. Continue the conversation with streaming by
            # talking to the Assistants API directly via the raw Python SDK, since
            # LangChain's runnable doesn't expose the streaming events we need.
            from openai import OpenAI
            openai_client = OpenAI(api_key=OPENAI_API_KEY)

            # Add the user's message to the thread before starting the run:
            openai_client.beta.threads.messages.create(
                thread_id=ss["thread_id"],
                role="user",
                content=user_input
            )

            # Now stream the assistant's response for a new run on that thread:
            with openai_client.beta.threads.runs.stream(
                thread_id=ss["thread_id"],
                assistant_id=ASSISTANT_ID,
            ) as stream:
                display_stream(stream, new_chat_context=True)

            # If there's a function call required:
            while not ss["tool_requests"].empty():
                with st.chat_message("assistant"):
                    tool_request = ss["tool_requests"].get()
                    tool_outputs, thread_id, run_id = handle_requires_action(tool_request)
                    with openai_client.beta.threads.runs.submit_tool_outputs_stream(
                        thread_id=thread_id,
                        run_id=run_id,
                        tool_outputs=tool_outputs
                    ) as tool_stream:
                        display_stream(tool_stream, new_chat_context=False)

    st.write("---")
    st.info("This is a demo of combining streaming, function calls, and file upload.")

def handle_requires_action(tool_request):
    """
    This function is triggered when the assistant tries to call a function mid-run.
    We parse the arguments, call the function, and return the outputs so the run can continue.
    """
    st.toast("Assistant is requesting a function call...", icon="🔧")
    data = tool_request.data
    tool_outputs = []

    # The list of tools the assistant wants to call
    if not hasattr(data.required_action.submit_tool_outputs, "tool_calls"):
        st.error("No tool calls found in the request.")
        return [], data.thread_id, data.id

    for tc in data.required_action.submit_tool_outputs.tool_calls:
        func_name = tc.function.name
        func_args = json.loads(tc.function.arguments or "{}")

        if func_name == "hello_world":
            name_str = func_args.get("name", "Anonymous")
            result = hello_world(name_str)
            # Return the output to the assistant
            tool_outputs.append({
                "tool_call_id": tc.id,
                "output": result
            })
        else:
            # Unrecognized function
            error_msg = f"Function '{func_name}' not recognized."
            tool_outputs.append({
                "tool_call_id": tc.id,
                "output": json.dumps({"error": error_msg})
            })

    return tool_outputs, data.thread_id, data.id

if __name__ == "__main__":
    main()