import gradio as gr import asyncio import json import html import os import uuid import sqlite3 import datetime import difflib from tiktoken import get_encoding from openai import AzureOpenAI # Clear proxy environment variables to avoid interference os.environ.pop("HTTP_PROXY", None) os.environ.pop("HTTPS_PROXY", None) # ConversationMemory class class ConversationMemory: def __init__(self, db_path="conversation.db"): self.conn = sqlite3.connect(db_path) self.create_table() self.tokenizer = get_encoding("cl100k_base") def create_table(self): self.conn.execute(""" CREATE TABLE IF NOT EXISTS conversation_chunks ( chunk_id TEXT PRIMARY KEY, text TEXT, role TEXT, timestamp DATETIME, intent TEXT, token_count INTEGER, embedding BLOB ) """) self.conn.commit() def add_chunk(self, text, role, intent="general"): chunk_id = str(uuid.uuid4()) tokens = self.tokenizer.encode(text) token_count = len(tokens) timestamp = datetime.datetime.now().isoformat() self.conn.execute(""" INSERT INTO conversation_chunks (chunk_id, text, role, timestamp, intent, token_count) VALUES (?, ?, ?, ?, ?, ?) """, (chunk_id, text, role, timestamp, intent, token_count)) self.conn.commit() return chunk_id def get_chunk(self, chunk_id): cursor = self.conn.execute("SELECT * FROM conversation_chunks WHERE chunk_id = ?", (chunk_id,)) row = cursor.fetchone() if row: return { "chunk_id": row[0], "text": row[1], "role": row[2], "timestamp": row[3], "intent": row[4], "token_count": row[5] } return None def update_chunk(self, chunk_id, text): tokens = self.tokenizer.encode(text) token_count = len(tokens) self.conn.execute(""" UPDATE conversation_chunks SET text = ?, token_count = ? WHERE chunk_id = ? """, (text, token_count, chunk_id)) self.conn.commit() def get_recent_chunks(self, limit=10): cursor = self.conn.execute("SELECT * FROM conversation_chunks ORDER BY timestamp DESC LIMIT ?", (limit,)) return [{"chunk_id": row[0], "text": row[1], "role": row[2], "timestamp": row[3], "intent": row[4], "token_count": row[5]} for row in cursor] # TextEditor class class TextEditor: def __init__(self, memory): self.memory = memory self.clipboard = "" def cut(self, chunk_id, start, end): chunk = self.memory.get_chunk(chunk_id) if chunk: self.clipboard = chunk['text'][start:end] chunk['text'] = chunk['text'][:start] + chunk['text'][end:] self.memory.update_chunk(chunk_id, chunk['text']) return chunk['text'] def copy(self, chunk_id, start, end): chunk = self.memory.get_chunk(chunk_id) if chunk: self.clipboard = chunk['text'][start:end] return self.clipboard def paste(self, chunk_id, position): chunk = self.memory.get_chunk(chunk_id) if chunk: chunk['text'] = chunk['text'][:position] + self.clipboard + chunk['text'][position:] self.memory.update_chunk(chunk_id, chunk['text']) return chunk['text'] def add_prefix(self, chunk_id, prefix): chunk = self.memory.get_chunk(chunk_id) if chunk: chunk['text'] = prefix + chunk['text'] self.memory.update_chunk(chunk_id, chunk['text']) return chunk['text'] def add_suffix(self, chunk_id, suffix): chunk = self.memory.get_chunk(chunk_id) if chunk: chunk['text'] = chunk['text'] + suffix self.memory.update_chunk(chunk_id, chunk['text']) return chunk['text'] def diff(self, chunk_id, original_text): chunk = self.memory.get_chunk(chunk_id) if chunk: differ = difflib.Differ() diff = list(differ.compare(original_text.splitlines(), chunk['text'].splitlines())) return '\n'.join(diff) return "" # OpenAIApi class class OpenAIApi: def __init__(self, preprompt="", endpoint="https://T-App-GPT4o.openai.azure.com/openai/v1/", model="gpt-4o", api_key=None): self.client = AzureOpenAI( azure_endpoint=endpoint, api_key=api_key or os.getenv("AZURE_OPENAI_API_KEY"), api_version="2025-01-01-preview" ) self.model = model self.preprompt = preprompt self.memory = ConversationMemory() self.editor = TextEditor(self.memory) self.functions = [ { "type": "function", "name": "cut_text", "description": "Cut text from a conversation chunk.", "parameters": { "type": "object", "properties": { "chunk_id": {"type": "string", "description": "ID of the conversation chunk"}, "start": {"type": "integer", "description": "Start index"}, "end": {"type": "integer", "description": "End index"} }, "required": ["chunk_id", "start", "end"] } }, { "type": "function", "name": "copy_text", "description": "Copy text from a conversation chunk to clipboard.", "parameters": { "type": "object", "properties": { "chunk_id": {"type": "string", "description": "ID of the conversation chunk"}, "start": {"type": "integer", "description": "Start index"}, "end": {"type": "integer", "description": "End index"} }, "required": ["chunk_id", "start", "end"] } }, { "type": "function", "name": "paste_text", "description": "Paste clipboard content into a conversation chunk.", "parameters": { "type": "object", "properties": { "chunk_id": {"type": "string", "description": "ID of the conversation chunk"}, "position": {"type": "integer", "description": "Position to paste"} }, "required": ["chunk_id", "position"] } }, { "type": "function", "name": "add_prefix", "description": "Add a prefix to a conversation chunk.", "parameters": { "type": "object", "properties": { "chunk_id": {"type": "string", "description": "ID of the conversation chunk"}, "prefix": {"type": "string", "description": "Prefix to add"} }, "required": ["chunk_id", "prefix"] } }, { "type": "function", "name": "add_suffix", "description": "Add a suffix to a conversation chunk.", "parameters": { "type": "object", "properties": { "chunk_id": {"type": "string", "description": "ID of the conversation chunk"}, "suffix": {"type": "string", "description": "Suffix to add"} }, "required": ["chunk_id", "suffix"] } } ] async def fetch_response(self, raw_prompt, continue_response=False): sanitized_prompt = html.escape(raw_prompt.strip()) chunk_id = self.memory.add_chunk(sanitized_prompt, "user") messages = [] if self.preprompt: messages.append({"role": "system", "content": self.preprompt}) context = self.memory.get_recent_chunks(limit=5) messages.extend({"role": c["role"], "content": c["text"]} for c in context) messages.append({"role": "user", "content": sanitized_prompt}) try: response = await self.client.chat.completions.create( model=self.model, messages=messages, temperature=0.5, max_tokens=4000, top_p=1.0, frequency_penalty=0, presence_penalty=0, tools=self.functions, stream=True ) full_response = "" tool_calls = [] async for chunk in response: if chunk.choices and chunk.choices[0].delta.content: full_response += chunk.choices[0].delta.content if chunk.choices and chunk.choices[0].delta.tool_calls: tool_calls.extend(chunk.choices[0].delta.tool_calls) response_chunk_id = self.memory.add_chunk(full_response, "assistant") for tool_call in tool_calls: if tool_call and hasattr(tool_call, 'function'): func_name = tool_call.function.name args = json.loads(tool_call.function.arguments) if func_name == "cut_text": result = self.editor.cut(args["chunk_id"], args["start"], args["end"]) self.memory.add_chunk(f"Cut result: {result}", "system") elif func_name == "copy_text": result = self.editor.copy(args["chunk_id"], args["start"], args["end"]) self.memory.add_chunk(f"Copy result: {result}", "system") elif func_name == "paste_text": result = self.editor.paste(args["chunk_id"], args["position"]) self.memory.add_chunk(f"Paste result: {result}", "system") elif func_name == "add_prefix": result = self.editor.add_prefix(args["chunk_id"], args["prefix"]) self.memory.add_chunk(f"Prefix result: {result}", "system") elif func_name == "add_suffix": result = self.editor.add_suffix(args["chunk_id"], args["suffix"]) self.memory.add_chunk(f"Suffix result: {result}", "system") continue_flag = len(self.memory.tokenizer.encode(full_response)) >= 4000 return {"content": full_response, "continue": continue_flag, "chunk_id": response_chunk_id} except Exception as e: error_msg = f"API Error: {str(e)}" self.memory.add_chunk(error_msg, "system") return {"error": error_msg} # Gradio UI async def chat_submit(user_input, chat_history, preprompt): api = OpenAIApi(preprompt=preprompt, api_key=os.getenv("AZURE_OPENAI_API_KEY")) response = await api.fetch_response(user_input) if "error" in response: chat_history.append({"role": "assistant", "content": f"Error: {response['error']}"}) else: chat_history.append({"role": "user", "content": user_input}) chat_history.append({"role": "assistant", "content": response["content"]}) return chat_history, preprompt def get_history(): memory = ConversationMemory() return memory.get_recent_chunks(limit=10) def select_chunk(evt: gr.SelectData): return evt.value["chunk_id"], evt.value["text"] async def edit_cut(chunk_id, start, end): api = OpenAIApi(api_key=os.getenv("AZURE_OPENAI_API_KEY")) result = api.editor.cut(chunk_id, int(start), int(end)) return result, api.editor.diff(chunk_id, result) async def edit_copy(chunk_id, start, end): api = OpenAIApi(api_key=os.getenv("AZURE_OPENAI_API_KEY")) result = api.editor.copy(chunk_id, int(start), int(end)) return result, "" async def edit_paste(chunk_id, position): api = OpenAIApi(api_key=os.getenv("AZURE_OPENAI_API_KEY")) result = api.editor.paste(chunk_id, int(position)) return result, api.editor.diff(chunk_id, result) async def edit_prefix(chunk_id, prefix): api = OpenAIApi(api_key=os.getenv("AZURE_OPENAI_API_KEY")) result = api.editor.add_prefix(chunk_id, prefix) return result, api.editor.diff(chunk_id, result) async def edit_suffix(chunk_id, suffix): api = OpenAIApi(api_key=os.getenv("AZURE_OPENAI_API_KEY")) result = api.editor.add_suffix(chunk_id, suffix) return result, api.editor.diff(chunk_id, result) def create_ui(): with gr.Blocks(title="Azure OpenAI Chat & Text Editor") as demo: gr.Markdown("# Azure OpenAI Chat with Text Editing") with gr.Tab("Chat"): chatbot = gr.Chatbot(label="Conversation", type="messages") user_input = gr.Textbox(label="Your Message", placeholder="Type your message or editing command...") preprompt = gr.Textbox(label="System Prompt", value="You are a helpful assistant with text editing capabilities.") submit_btn = gr.Button("Send") submit_btn.click( fn=chat_submit, inputs=[user_input, chatbot, preprompt], outputs=[chatbot, preprompt] ) with gr.Tab("Conversation History"): history = gr.Dataframe( label="Recent Chunks", headers=["chunk_id", "text", "role", "timestamp", "intent", "token_count"], datatype=["str", "str", "str", "str", "str", "number"] ) history_btn = gr.Button("Refresh History") history_btn.click(fn=get_history, outputs=history) with gr.Tab("Text Editor"): chunk_id = gr.Textbox(label="Selected Chunk ID") chunk_text = gr.Textbox(label="Chunk Text", interactive=False) history.select(fn=select_chunk, outputs=[chunk_id, chunk_text]) with gr.Row(): start = gr.Number(label="Start Index", precision=0) end = gr.Number(label="End Index", precision=0) position = gr.Number(label="Paste Position", precision=0) with gr.Row(): prefix = gr.Textbox(label="Prefix") suffix = gr.Textbox(label="Suffix") with gr.Row(): cut_btn = gr.Button("Cut") copy_btn = gr.Button("Copy") paste_btn = gr.Button("Paste") prefix_btn = gr.Button("Add Prefix") suffix_btn = gr.Button("Add Suffix") diff_output = gr.Textbox(label="Diff Output", interactive=False) cut_btn.click(fn=edit_cut, inputs=[chunk_id, start, end], outputs=[chunk_text, diff_output]) copy_btn.click(fn=edit_copy, inputs=[chunk_id, start, end], outputs=[chunk_text, diff_output]) paste_btn.click(fn=edit_paste, inputs=[chunk_id, position], outputs=[chunk_text, diff_output]) prefix_btn.click(fn=edit_prefix, inputs=[chunk_id, prefix], outputs=[chunk_text, diff_output]) suffix_btn.click(fn=edit_suffix, inputs=[chunk_id, suffix], outputs=[chunk_text, diff_output]) gr.Markdown(f"Current Time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S %Z')}") return demo if __name__ == "__main__": demo = create_ui() demo.launch(server_name="0.0.0.0", server_port=7860)