import gradio as gr import time import base64 from openai import OpenAI import os from io import BytesIO from PIL import Image # 配置 BASE_URL = "https://api.stepfun.com/v1" # 从环境变量获取API密钥 STEP_API_KEY = os.environ.get("STEP_API_KEY", "") # 可选模型 MODELS = ["step-3", "step-r1-v-mini"] def image_to_base64(image): """将PIL图像转换为base64字符串""" if image is None: return None if isinstance(image, Image.Image): buffered = BytesIO() image.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode('utf-8') return img_str return None def call_step_api_stream(message, history, temperature, max_tokens, image=None): """调用Step API进行流式对话""" print(f"[DEBUG] Starting API call - Message: {message}, Has Image: {image is not None}") if not message and not image: print("[DEBUG] No message or image provided") yield history return if not STEP_API_KEY: print("[DEBUG] API key not configured") history.append([message or "[Image]", "❌ API key not configured. Please add STEP_API_KEY in Settings."]) yield history return print(f"[DEBUG] API Key exists: {bool(STEP_API_KEY)}") # 构造消息历史 messages = [] # 添加历史对话 for h in history: if h[0]: # 用户消息 messages.append({"role": "user", "content": h[0]}) if h[1]: # 助手回复 messages.append({"role": "assistant", "content": h[1]}) # 构造当前消息 if image is not None: # 有图片的情况 try: base64_image = image_to_base64(image) if base64_image is None: history.append([message or "[Image]", "❌ Failed to process image"]) yield history return current_content = [ {"type": "image_url", "image_url": {"url": f"data:image/jpg;base64,{base64_image}", "detail": "high"}} ] if message: current_content.append({"type": "text", "text": message}) messages.append({"role": "user", "content": current_content}) display_message = f"[Image] {message}" if message else "[Image]" except Exception as e: history.append([message or "[Image]", f"❌ Image processing error: {str(e)}"]) yield history return else: # 纯文本 messages.append({"role": "user", "content": message}) display_message = message # 添加到历史记录 history.append([display_message, ""]) yield history # 立即显示用户消息 print(f"[DEBUG] Messages to send: {messages}") # 创建客户端 try: client = OpenAI(api_key=STEP_API_KEY, base_url=BASE_URL) print("[DEBUG] Client created successfully") except Exception as e: print(f"[DEBUG] Client initialization failed: {e}") history[-1][1] = f"❌ Client initialization failed: {str(e)}" yield history return # 调用API try: print("[DEBUG] Calling API...") response = client.chat.completions.create( model="step-3", messages=messages, temperature=temperature, max_tokens=max_tokens, stream=True ) print("[DEBUG] API call successful, processing stream...") # 处理流式响应 full_response = "" chunk_count = 0 for chunk in response: chunk_count += 1 if chunk.choices and len(chunk.choices) > 0: delta = chunk.choices[0].delta if hasattr(delta, 'content') and delta.content: full_response += delta.content history[-1][1] = full_response print(f"[DEBUG] Chunk {chunk_count}: {delta.content[:50] if delta.content else 'None'}...") yield history if not full_response: print("[DEBUG] No response content received") history[-1][1] = "⚠️ No response received from API" yield history else: print(f"[DEBUG] Final response length: {len(full_response)} chars") except Exception as e: print(f"[DEBUG] API request failed: {e}") import traceback traceback.print_exc() history[-1][1] = f"❌ API request failed: {str(e)}" yield history def user_input(message, history, image): """处理用户输入""" if message or image: return "", history, None return message, history, image def clear_history(): """Clear conversation history""" return [], None, "" # 创建Gradio界面 with gr.Blocks(title="Step-3", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🤖 Step-3 Hello, I am Step-3! """) with gr.Row(): with gr.Column(scale=3): # 对话界面 chatbot = gr.Chatbot( height=500, show_label=False, elem_id="chatbot", bubble_full_width=False ) with gr.Row(): with gr.Column(scale=8): msg = gr.Textbox( label="Input message", placeholder="Type your question here...", lines=1, max_lines=5, show_label=False, elem_id="msg", container=False ) with gr.Column(scale=1, min_width=100): submit_btn = gr.Button("Send", variant="primary") with gr.Column(scale=1, min_width=100): clear_btn = gr.Button("Clear") # 图片上传 with gr.Row(): image_input = gr.Image( label="Upload Image (Optional)", type="pil", height=150, scale=1 ) with gr.Column(scale=1): # 设置面板 gr.Markdown("### ⚙️ Settings") temperature_slider = gr.Slider( minimum=0, maximum=1, value=0.7, step=0.1, label="Temperature", interactive=True ) max_tokens_slider = gr.Slider( minimum=100, maximum=4000, value=2000, step=100, label="Max Output Length", interactive=True ) # 事件处理 msg.submit( user_input, [msg, chatbot, image_input], [msg, chatbot, image_input], queue=False ).then( call_step_api_stream, [msg, chatbot, temperature_slider, max_tokens_slider, image_input], chatbot ) submit_btn.click( user_input, [msg, chatbot, image_input], [msg, chatbot, image_input], queue=False ).then( call_step_api_stream, [msg, chatbot, temperature_slider, max_tokens_slider, image_input], chatbot ) clear_btn.click( clear_history, None, [chatbot, image_input, msg], queue=False ) # 页脚 gr.Markdown(""" ---
""") # 启动应用 if __name__ == "__main__": print(f"[DEBUG] Starting app with API key: {'Set' if STEP_API_KEY else 'Not set'}") print(f"[DEBUG] Base URL: {BASE_URL}") demo.queue(max_size=10) demo.launch( share=False, debug=True )