import gradio as gr
from openai import OpenAI, APIError
import os
import tenacity

ACCESS_TOKEN = os.getenv("HF_TOKEN")
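
# Fail fast with a clear message if the token is missing. (This guard is an
# addition; without it, OpenAI() raises a less specific error for a None key.)
if ACCESS_TOKEN is None:
    raise RuntimeError("HF_TOKEN environment variable is not set")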

# The Hugging Face serverless Inference API exposes an OpenAI-compatible /v1/
# route, so the stock OpenAI client works once base_url is overridden.
client = OpenAI(
    base_url="https://api-inference.huggingface.co/v1/",
    api_key=ACCESS_TOKEN,
)

# Retry transient failures with exponential backoff. Note that retries only
# fire for exceptions that escape respond(), and the body below catches
# everything, so this mainly guards the message-building code; a stop is set
# so a persistent failure cannot retry forever.
@tenacity.retry(
    wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
    stop=tenacity.stop_after_attempt(3),
)
async def respond(
    message,
    history,
    system_message,
    max_tokens,
    temperature,
    top_p,
):
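    """Build an OpenAI-style message list from the chat history and return
    the model's full reply (accumulated from the stream), or an error string."""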
    try:
        messages = [{"role": "system", "content": system_message}]

        for val in history:
            if val[0]:
                messages.append({"role": "user", "content": val[0]})
            if val[1]:
                messages.append({"role": "assistant", "content": val[1]})

        messages.append({"role": "user", "content": message})

        response = ""
        # Stream the completion so tokens can be accumulated as they arrive
        stream = client.chat.completions.create(
            model="NousResearch/Hermes-3-Llama-3.1-8B",
            max_tokens=max_tokens,
            stream=True,
            temperature=temperature,
            top_p=top_p,
            messages=messages,
        )
        for chunk in stream:  # Iterate over the streamed response chunks
            # The delta of the final chunk may carry no content, so guard
            # against None before appending.
            token = chunk.choices[0].delta.content
            if token:
                response += token
        return response
    except APIError as e:
        error_details = e.body or {}  # body can be None, so default to an empty dict
        error_type = error_details.get("type")
        error_code = error_details.get("code")
        error_param = error_details.get("param")
        error_message = error_details.get("message")

        if error_type:
            error_str = f"{error_type}: {error_message} (code: {error_code}, param: {error_param})"
        else:
            error_str = error_message or "An API error occurred during streaming"

        print(f"Error: {error_str}")
        return error_str
    except Exception as e:
        print(f"Error: {e}")
        return "Error occurred. Please try again."


# Async click handler; Gradio awaits coroutine callbacks natively.
async def generate_response(message, history, system_message, max_tokens, temperature, top_p):
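    """Get a reply for `message` and append the new exchange to the history."""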
    new_history = history + [[message, ""]]
    response = await respond(message, history, system_message, max_tokens, temperature, top_p)
    new_history[-1][1] = response
    return response, new_history


def launch_app():
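    """Build the Gradio Blocks UI and launch it."""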
    try:
        demo = gr.Blocks()
        with demo:
            gr.Markdown("# Chatbot")
            message = gr.Textbox(label="Message")
            history = gr.State([])  # chat history as a list of [user, assistant] pairs
            system_message = gr.Textbox(label="System message")
            max_tokens = gr.Slider(minimum=1, maximum=2048, value=2048, step=1, label="Max new tokens")
            temperature = gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature")
            top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-P")
            response = gr.Text(label="Response")

            # Gradio supports async callbacks, so generate_response is wired in directly
            gr.Button("Generate Response").click(
                generate_response,
                inputs=[message, history, system_message, max_tokens, temperature, top_p],
                outputs=[response, history],
                show_progress=False,
            )
        demo.launch(show_error=True)
    except Exception as e:
        print(f"Error launching app: {e}")

if __name__ == "__main__":
    launch_app()
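
# A minimal smoke test of respond() outside the UI (assumes HF_TOKEN is set):
#
#     import asyncio
#     reply = asyncio.run(respond("Hello!", [], "You are helpful.", 64, 0.7, 0.95))
#     print(reply)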