import os
import io
import json
import time
import queue
import logging
import streamlit as st
from dotenv import load_dotenv
from PIL import Image
from streamlit import session_state as ss
# Optional: for direct Assistants API usage:
# from openai import OpenAI
# But we'll also show a LangChain approach:
from langchain.agents.openai_assistant import OpenAIAssistantRunnable
from langchain_core.agents import AgentFinish # If you want to handle final states, etc.
#############################################
# 1) ENV & BASIC LOGGING
#############################################
load_dotenv()
logging.basicConfig(format="[%(asctime)s] %(levelname)+8s: %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ASSISTANT_ID = os.getenv("ASSISTANT_ID_SOLUTION_SPECIFIER_A")
#############################################
# 2) CREATE YOUR ASSISTANT RUNNABLE
#############################################
if not OPENAI_API_KEY or not ASSISTANT_ID:
raise ValueError("Missing OPENAI_API_KEY or ASSISTANT_ID in environment.")
assistant_runnable = OpenAIAssistantRunnable(
assistant_id=ASSISTANT_ID,
api_key=OPENAI_API_KEY,
as_agent=True
)
# We’ll store a queue for function calls (tools) we want to handle:
if "tool_requests" not in ss:
ss["tool_requests"] = queue.Queue()
#############################################
# 3) OPTIONAL: EXAMPLE CUSTOM FUNCTION (TOOL)
#############################################
def hello_world(name: str) -> str:
"""Example function to show how to handle 'requires_action' or function calls."""
time.sleep(3)
return f"Hello, {name}! This greeting took 3s."
#############################################
# 4) STREAMING HANDLER
#############################################
def data_streamer(stream_events):
"""
Generator that processes streaming events from the Assistants API.
Yields either text, images, or triggers a function call queue item.
"""
st.toast("Thinking...", icon="🤔")
content_produced = False
# We'll mimic the logic in that Medium article:
for response in stream_events:
event_type = response.event
if event_type == "thread.message.delta":
# The model is streaming partial text or possibly an image
content = response.data.delta.content[0] # Typically a list of 1 item
content_type = content.type
if content_type == "text":
text_value = content.text.value
content_produced = True
yield text_value # yield text tokens
elif content_type == "image_file":
# The Assistant can output images
file_id = content.image_file.file_id
                # You can retrieve the file bytes with the raw OpenAI client, e.g.
                # image_bytes = client.files.content(file_id).read(), but LangChain's
                # runnable doesn't expose that convenience method, so we skip the real
                # API call here. See fetch_image_file() below for a sketch.
                st.warning("Image streaming not fully implemented in this snippet.")
                # yield Image.open(io.BytesIO(image_bytes)) once you have the bytes
elif event_type == "thread.run.requires_action":
# The Assistant wants to call a function
logger.info("Run requires action (function call) – queueing it.")
ss["tool_requests"].put(response)
# If no text was produced yet, yield a placeholder
if not content_produced:
yield "[Assistant is requesting a function call]"
# Return so we can handle the function call
return
elif event_type == "thread.run.failed":
st.error("Run has failed.")
return
st.toast("Done.", icon="✅")
def display_stream(stream_iterator, new_chat_context=True):
"""
Wraps the `data_streamer` generator and writes to Streamlit in real-time.
If `new_chat_context=True`, we put the response in a dedicated assistant chat bubble.
"""
if new_chat_context:
with st.chat_message("assistant"):
response = st.write_stream(data_streamer(stream_iterator))
else:
# If we are continuing inside the same bubble (like after a function call),
# we skip creating a new chat bubble.
response = st.write_stream(data_streamer(stream_iterator))
return response
#############################################
# 5) ACTUAL APP
#############################################
def main():
st.set_page_config(page_title="Streamlit + Assistants Demo", layout="centered")
st.title("Enhanced Assistant Demo")
# Initialize messages
if "messages" not in ss:
ss.messages = []
# Display previous messages
for msg in ss.messages:
with st.chat_message(msg["role"]):
st.write(msg["content"])
# -- (A) FILE UPLOAD DEMO --
# If you want the user to upload a CSV and pass it to the assistant, do so here.
uploaded_file = st.file_uploader("Upload a CSV for the assistant to analyze (optional)", type=["csv"])
if uploaded_file:
st.write("We won't fully implement code interpreter logic here, but you could pass it in as a tool resource.")
# For example, you might store it in the code interpreter or do a vector search, etc.
# -- (B) Chat Input --
user_input = st.chat_input("Ask me anything or request a function call...")
if user_input:
# Show user's message
with st.chat_message("user"):
st.write(user_input)
ss.messages.append({"role": "user", "content": user_input})
# (C) Actually run the assistant in "streaming mode"
# For a brand-new conversation, omit thread_id. Otherwise, pass an existing one.
# We'll store one globally in session_state for continuity.
if "thread_id" not in ss:
ss["thread_id"] = None
# If we have no thread_id yet, this is a fresh conversation
if ss["thread_id"] is None:
resp = assistant_runnable.invoke({"content": user_input})
ss["thread_id"] = resp.thread_id
# For a single-turn request (non-streaming):
# resp_text = resp.return_values["output"]
# st.write(resp_text)
            # The tricky part: LangChain's `invoke` returns the final message rather
            # than a streaming generator, so true streaming means calling the
            # underlying Assistants API directly (that's what the else branch below does).
            # For this first turn, we simply show the final message in a new chat bubble:
final_text = resp.return_values["output"]
with st.chat_message("assistant"):
st.write(final_text)
ss.messages.append({"role": "assistant", "content": final_text})
else:
            # We have an existing thread, so continue the conversation with streaming.
            # LangChain's runnable doesn't expose the raw streaming events, so we drop
            # down to the OpenAI Python SDK here (an alternative would be a custom
            # loop around an AgentExecutor).
from openai import OpenAI
openai_client = OpenAI(api_key=OPENAI_API_KEY)
            # The user's message has to be added to the thread *before* the run starts;
            # a thread is locked while a run is active, and a run only sees messages
            # that already exist when it is created.
            openai_client.beta.threads.messages.create(
                thread_id=ss["thread_id"],
                role="user",
                content=user_input
            )
            # Now do a 'threads.runs.stream' call; the assistant responds in the stream:
            with openai_client.beta.threads.runs.stream(
                thread_id=ss["thread_id"],
                assistant_id=ASSISTANT_ID,
            ) as stream:
                assistant_text = display_stream(stream, new_chat_context=True)
            if assistant_text:
                ss.messages.append({"role": "assistant", "content": assistant_text})
# If there's a function call required:
while not ss["tool_requests"].empty():
with st.chat_message("assistant"):
tool_request = ss["tool_requests"].get()
tool_outputs, thread_id, run_id = handle_requires_action(tool_request)
with openai_client.beta.threads.runs.submit_tool_outputs_stream(
thread_id=thread_id,
run_id=run_id,
tool_outputs=tool_outputs
) as tool_stream:
display_stream(tool_stream, new_chat_context=False)
st.write("---")
st.info("This is a demo of combining streaming, function calls, and file upload.")
def handle_requires_action(tool_request):
"""
This function is triggered when the assistant tries to call a function mid-run.
We parse the arguments, call the function, and return the outputs so the run can continue.
"""
st.toast("Assistant is requesting a function call...", icon="🔧")
data = tool_request.data
tool_outputs = []
# The list of tools the assistant wants to call
if not hasattr(data.required_action.submit_tool_outputs, "tool_calls"):
st.error("No tool calls found in the request.")
return [], data.thread_id, data.id
for tc in data.required_action.submit_tool_outputs.tool_calls:
func_name = tc.function.name
func_args = json.loads(tc.function.arguments or "{}")
if func_name == "hello_world":
name_str = func_args.get("name", "Anonymous")
result = hello_world(name_str)
# Return the output to the assistant
tool_outputs.append({
"tool_call_id": tc.id,
"output": result
})
else:
# Unrecognized function
error_msg = f"Function '{func_name}' not recognized."
tool_outputs.append({
"tool_call_id": tc.id,
"output": json.dumps({"error": error_msg})
})
return tool_outputs, data.thread_id, data.id
if __name__ == "__main__":
    main()