import os
import io
import json
import time
import queue
import logging
import streamlit as st
from dotenv import load_dotenv
from PIL import Image
from streamlit import session_state as ss

# Optional: you can also use the OpenAI SDK directly for the Assistants API
# (we import it lazily inside main() for the streaming turns).
# Here we drive the assistant through LangChain's wrapper:
from langchain.agents.openai_assistant import OpenAIAssistantRunnable
from langchain_core.agents import AgentFinish  # `invoke` returns an AgentFinish subclass when the run completes.

#############################################
# 1) ENV & BASIC LOGGING
#############################################
load_dotenv()

logging.basicConfig(format="[%(asctime)s] %(levelname)+8s: %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ASSISTANT_ID = os.getenv("ASSISTANT_ID_SOLUTION_SPECIFIER_A") 


#############################################
# 2) CREATE YOUR ASSISTANT RUNNABLE
#############################################
if not OPENAI_API_KEY or not ASSISTANT_ID:
    raise ValueError("Missing OPENAI_API_KEY or ASSISTANT_ID in environment.")

assistant_runnable = OpenAIAssistantRunnable(
    assistant_id=ASSISTANT_ID,
    api_key=OPENAI_API_KEY,
    as_agent=True
)

# We’ll store a queue for function calls (tools) we want to handle:
if "tool_requests" not in ss:
    ss["tool_requests"] = queue.Queue()

#############################################
# 3) OPTIONAL: EXAMPLE CUSTOM FUNCTION (TOOL)
#############################################
def hello_world(name: str) -> str:
    """Example function to show how to handle 'requires_action' or function calls."""
    time.sleep(3)
    return f"Hello, {name}! This greeting took 3s."

#############################################
# 4) STREAMING HANDLER
#############################################
def data_streamer(stream_events):
    """
    Generator that processes streaming events from the Assistants API.
    Yields either text, images, or triggers a function call queue item.
    """
    st.toast("Thinking...", icon="🤔")
    content_produced = False

    # Walk the event stream and dispatch on the event type:
    for response in stream_events:
        event_type = response.event

        if event_type == "thread.message.delta":
            # The model is streaming partial text or possibly an image
            content = response.data.delta.content[0]  # Typically a list of 1 item
            content_type = content.type
            if content_type == "text":
                text_value = content.text.value
                content_produced = True
                yield text_value  # yield text tokens
            elif content_type == "image_file":
                # The Assistant can output images
                file_id = content.image_file.file_id
                # Retrieving the image bytes isn't exposed through LangChain's
                # wrapper, so a real API call is skipped here; see the sketch below.
                st.warning("Image streaming not fully implemented in this snippet.")
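                # A minimal sketch of fetching and yielding the image with the
                # raw OpenAI SDK, assuming an openai.OpenAI client named
                # `openai_client` were in scope here (it is not, in this demo):
                #     image_bytes = openai_client.files.content(file_id).read()
                #     yield Image.open(io.BytesIO(image_bytes))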
        
        elif event_type == "thread.run.requires_action":
            # The Assistant wants to call a function
            logger.info("Run requires action (function call) – queueing it.")
            ss["tool_requests"].put(response)
            # If no text was produced yet, yield a placeholder
            if not content_produced:
                yield "[Assistant is requesting a function call]"
            # Return so we can handle the function call
            return

        elif event_type == "thread.run.failed":
            st.error("Run has failed.")
            return

    st.toast("Done.", icon="✅")


def display_stream(stream_iterator, new_chat_context=True):
    """
    Wraps the `data_streamer` generator and writes to Streamlit in real-time.
    If `new_chat_context=True`, we put the response in a dedicated assistant chat bubble.
    """
    if new_chat_context:
        with st.chat_message("assistant"):
            response = st.write_stream(data_streamer(stream_iterator))
    else:
        # If we are continuing inside the same bubble (like after a function call),
        # we skip creating a new chat bubble.
        response = st.write_stream(data_streamer(stream_iterator))
    return response


#############################################
# 5) ACTUAL APP
#############################################
def main():
    st.set_page_config(page_title="Streamlit + Assistants Demo", layout="centered")
    st.title("Enhanced Assistant Demo")

    # Initialize messages
    if "messages" not in ss:
        ss.messages = []

    # Display previous messages
    for msg in ss.messages:
        with st.chat_message(msg["role"]):
            st.write(msg["content"])

    # -- (A) FILE UPLOAD DEMO --
    # If you want the user to upload a CSV and pass it to the assistant, do so here.
    uploaded_file = st.file_uploader("Upload a CSV for the assistant to analyze (optional)", type=["csv"])
    if uploaded_file:
        st.write("We won't fully implement code interpreter logic here, but you could pass it in as a tool resource.")
        # For example, you might store it in the code interpreter or do a vector search, etc.
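        # A minimal sketch of attaching the upload, assuming an openai.OpenAI
        # client and an Assistant with the code_interpreter tool enabled
        # (illustrative only; not wired into the rest of this demo):
        #     file_obj = openai_client.files.create(
        #         file=uploaded_file, purpose="assistants"
        #     )
        #     thread = openai_client.beta.threads.create(
        #         tool_resources={"code_interpreter": {"file_ids": [file_obj.id]}}
        #     )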

    # -- (B) Chat Input --
    user_input = st.chat_input("Ask me anything or request a function call...")

    if user_input:
        # Show user's message
        with st.chat_message("user"):
            st.write(user_input)
        ss.messages.append({"role": "user", "content": user_input})

        # (C) Run the assistant. A brand-new conversation has no thread yet;
        #     we keep the thread_id in session_state so later turns can
        #     continue the same thread.
        if "thread_id" not in ss:
            ss["thread_id"] = None

        # If we have no thread_id yet, this is a fresh conversation
        if ss["thread_id"] is None:
            resp = assistant_runnable.invoke({"content": user_input})
            ss["thread_id"] = resp.thread_id

            # Note: LangChain's `invoke` returns the final message rather than
            # a streaming generator, so this first turn is non-streaming. (It
            # also assumes the run finishes without a tool call; if the
            # assistant requested one, `invoke` would return agent actions
            # instead of an AgentFinish.) Later turns drop down to the raw
            # Assistants API for streaming; see the `else` branch below.

            # Show the final message in a new chat bubble:
            final_text = resp.return_values["output"]
            with st.chat_message("assistant"):
                st.write(final_text)
            ss.messages.append({"role": "assistant", "content": final_text})

        else:
            # We have an existing thread, so continue the conversation with
            # streaming. LangChain's wrapper doesn't expose the raw streaming
            # events we need, so we drop down to the OpenAI SDK and address
            # the same thread directly:
            from openai import OpenAI
            openai_client = OpenAI(api_key=OPENAI_API_KEY)

            # Add the user's message to the thread *before* starting the run;
            # a run only sees messages that are already on the thread:
            openai_client.beta.threads.messages.create(
                thread_id=ss["thread_id"],
                role="user",
                content=user_input
            )
            # Now stream the assistant's response via 'threads.runs.stream':
            with openai_client.beta.threads.runs.stream(
                thread_id=ss["thread_id"],
                assistant_id=ASSISTANT_ID,
            ) as stream:
                response = display_stream(stream, new_chat_context=True)

                # If the run paused for a function call, handle it and resume
                # streaming the rest of the answer in the same bubble:
                while not ss["tool_requests"].empty():
                    with st.chat_message("assistant"):
                        tool_request = ss["tool_requests"].get()
                        tool_outputs, thread_id, run_id = handle_requires_action(tool_request)
                        with openai_client.beta.threads.runs.submit_tool_outputs_stream(
                            thread_id=thread_id,
                            run_id=run_id,
                            tool_outputs=tool_outputs
                        ) as tool_stream:
                            response = display_stream(tool_stream, new_chat_context=False)

            # Persist the streamed reply so it survives Streamlit reruns:
            if response:
                ss.messages.append({"role": "assistant", "content": response})

    st.write("---")
    st.info("This is a demo of combining streaming, function calls, and file upload.")


def handle_requires_action(tool_request):
    """
    This function is triggered when the assistant tries to call a function mid-run.
    We parse the arguments, call the function, and return the outputs so the run can continue.
    """
    st.toast("Assistant is requesting a function call...", icon="🔧")
    data = tool_request.data
    tool_outputs = []

    # The list of tools the assistant wants to call
    if not hasattr(data.required_action.submit_tool_outputs, "tool_calls"):
        st.error("No tool calls found in the request.")
        return [], data.thread_id, data.id

    for tc in data.required_action.submit_tool_outputs.tool_calls:
        func_name = tc.function.name
        func_args = json.loads(tc.function.arguments or "{}")

        if func_name == "hello_world":
            name_str = func_args.get("name", "Anonymous")
            result = hello_world(name_str)
            # Return the output to the assistant
            tool_outputs.append({
                "tool_call_id": tc.id,
                "output": result
            })
        else:
            # Unrecognized function
            error_msg = f"Function '{func_name}' not recognized."
            tool_outputs.append({
                "tool_call_id": tc.id,
                "output": json.dumps({"error": error_msg})
            })

    return tool_outputs, data.thread_id, data.id


if __name__ == "__main__":
    main()