Spaces:
Running
Running
| import os | |
| import gradio as gr | |
| # import openai | |
| from numpy._core.defchararray import endswith, isdecimal | |
| from openai import OpenAI | |
| from dotenv import load_dotenv | |
| from pathlib import Path | |
| from time import sleep | |
| import audioread | |
| import queue | |
| import threading | |
| from glob import glob | |
| import copy | |
| import base64 | |
| import json | |
| from PIL import Image | |
| from io import BytesIO | |
# --- Runtime configuration ---------------------------------------------------
# Settings come from the environment; values in a local .env file override
# variables that are already set (override=True).
load_dotenv(override=True)

key = os.getenv('OPENAI_API_KEY')

# LOGNAME and PASSWORD are comma-separated, parallel lists:
# pwdList[i] is the password for unames[i]; chat() treats unames[0] as the
# administrator account.
users = os.getenv('LOGNAME')
pwds = os.getenv('PASSWORD')
if users is None or pwds is None:
    # Fail with an explicit message instead of an AttributeError on .split().
    raise RuntimeError(
        'LOGNAME and PASSWORD environment variables must be set '
        '(comma-separated lists)')
unames = users.split(',')
pwdList = pwds.split(',')

# SITE == 'local' keeps data next to the app; anything else uses /data.
site = os.getenv('SITE')
dataDir = './data/' if site == 'local' else '/data/'
dp = Path(dataDir)
dp.mkdir(exist_ok=True)

speak_file = dataDir + "speek.wav"
client = OpenAI(api_key=key)

#digits = ['zero: ','one: ','two: ','three: ','four: ','five: ','six: ','seven: ','eight: ','nine: ']
# Abbreviation -> replacement text, applied to dialog text before it is sent
# to text-to-speech.
abbrevs = {'St. ': 'Saint ', 'Mr. ': 'mister ', 'Mrs. ': 'mussus ', 'Ms. ': 'mizz '}
def _read_stat_lines(fp, do_reset, attempts=3, delay=3):
    """Read all lines of stats file *fp*, retrying on I/O errors.

    Returns the list of lines; an empty list when the file does not exist or
    when *do_reset* is true (the file is deleted instead of being parsed);
    or ``None`` when every attempt failed.
    """
    for attempt in range(attempts):
        try:
            with open(fp) as f:
                lines = f.readlines()
            if do_reset:
                os.remove(fp)
                return []
            return lines
        except FileNotFoundError:
            # Nothing recorded (or already removed): same as an empty file.
            return []
        except (OSError, ValueError):
            # ValueError covers UnicodeDecodeError raised while reading.
            if attempt + 1 < attempts:
                sleep(delay)
    return None


def _sum_token_lines(lines):
    """Sum token counts from ``user:in/out-model`` lines.

    Returns ``(mini_in, mini_out, other_in, other_out)``; blank lines are
    skipped. Raises ValueError on a malformed line.
    """
    mini_in = mini_out = other_in = other_out = 0
    for line in lines:
        if not line.strip():
            continue
        _user, usage = line.split(':')
        counts, model = usage.split('-')
        tin, tout = counts.split('/')
        incount, outcount = int(tin), int(tout)
        if 'mini' in model:
            mini_in += incount
            mini_out += outcount
        else:
            other_in += incount
            other_out += outcount
    return mini_in, mini_out, other_in, other_out


def _sum_labelled_lines(lines):
    """Sum the integer after the ':' of each ``label:value`` line."""
    total = 0
    for line in lines:
        if not line.strip():
            continue
        _label, value = line.split(':')
        total += int(value)
    return total


def _sum_plain_lines(lines):
    """Sum lines that each hold a single integer; blank lines are skipped."""
    return sum(int(line) for line in lines if line.strip())


def _collect_stat(fp, do_reset, parser):
    """Load *fp* and reduce it with *parser*; ``None`` on read or parse failure."""
    lines = _read_stat_lines(fp, do_reset)
    if lines is None:
        return None
    try:
        # Parsing happens outside the retry loop, on a complete snapshot of
        # the file, so a retry can never add the same lines twice.
        return parser(lines)
    except ValueError:
        return None


def genUsageStats(do_reset=False):
    """Build the per-user usage table from the log files in ``dataDir``.

    Args:
        do_reset: when true, every existing stats file is deleted instead of
            being summed, so all reported figures are zero.

    Returns:
        A list of rows ``[user, mini in/out, 4o in/out, audio, speech,
        images]`` with a final ``'totals'`` row, or an error string naming
        the user whose file could not be read or parsed.
    """
    result = []
    ttotal4o_in = 0
    ttotal4o_out = 0
    ttotal4mini_in = 0
    ttotal4mini_out = 0
    totalAudio = 0
    totalSpeech = 0
    totalImages = 0
    for user in unames:
        tokens = _collect_stat(dataDir + user + '_log.txt', do_reset, _sum_token_lines)
        if tokens is None:
            return f'File access failed reading stats for user: {user}'
        tokens4mini_in, tokens4mini_out, tokens4o_in, tokens4o_out = tokens

        userAudio = _collect_stat(dataDir + user + '_audio.txt', do_reset, _sum_labelled_lines)
        if userAudio is None:
            return f'File access failed reading audio stats for user: {user}'

        userSpeech = _collect_stat(dataDir + user + '_speech.txt', do_reset, _sum_labelled_lines)
        if userSpeech is None:
            return f'File access failed reading speech stats for user: {user}'

        user_images = _collect_stat(image_count_path(user), do_reset, _sum_plain_lines)
        if user_images is None:
            return f'File access failed reading image gen stats for user: {user}'

        # Each user's figures enter the grand totals exactly once.
        ttotal4mini_in += tokens4mini_in
        ttotal4mini_out += tokens4mini_out
        ttotal4o_in += tokens4o_in
        ttotal4o_out += tokens4o_out
        totalAudio += userAudio
        totalSpeech += userSpeech
        totalImages += user_images
        result.append([user, f'{tokens4mini_in}/{tokens4mini_out}', f'{tokens4o_in}/{tokens4o_out}', f'audio:{userAudio}', f'speech:{userSpeech}', f'images:{user_images}'])
    result.append(['totals', f'{ttotal4mini_in}/{ttotal4mini_out}', f'{ttotal4o_in}/{ttotal4o_out}', f'audio:{totalAudio}', f'speech:{totalSpeech}', f'images:{totalImages}'])
    return result
def new_conversation(user):
    """Start a fresh conversation for *user*.

    Removes the user's generated speech files via ``clean_up`` and returns
    the cleared values for the prompt, history, dialog and image outputs.
    """
    clean_up(user)
    cleared_prompt, cleared_history, cleared_dialog, cleared_image = None, [], None, None
    return [cleared_prompt, cleared_history, cleared_dialog, cleared_image]
def updatePassword(txt):
    """Normalise the typed password and mask what is shown on screen.

    Returns ``[password, mask]``: the lower-cased, stripped password and a
    fixed string of nine asterisks to display in its place.
    """
    mask = "*" * 9
    return [txt.lower().strip(), mask]
| # def setModel(val): | |
| # return val | |
def chat(prompt, user_window, pwd_window, past, response, gptModel):
    """Handle a submitted prompt: admin commands or one chat completion.

    Args:
        prompt: text from the prompt window.
        user_window: user name as typed (lower-cased and stripped here).
        pwd_window: password to compare against the configured list.
        past: list of chat messages sent so far (role/content dicts).
        response: current text of the dialog window.
        gptModel: model name to use for the completion.

    Returns:
        ``[history, dialog_text, prompt_text, model]``. On bad credentials
        the history is emptied and the prompt is handed back unchanged.
    """
    user_window = user_window.lower().strip()
    # NOTE(review): new_conversation() resets the dialog box to None, so it
    # presumably can arrive here as None; normalise before concatenating.
    response = response or ''

    isBoss = user_window == unames[0] and pwd_window == pwdList[0]
    if isBoss:
        # Administrator-only commands typed into the prompt window.
        if prompt == 'stats':
            return [past, genUsageStats(), None, gptModel]
        if prompt == 'reset':
            return [past, genUsageStats(True), None, gptModel]
        if prompt.startswith('gpt4'):
            # "gpt4 <prompt>": switch model, then treat the rest as the prompt.
            gptModel = 'gpt-4o'
            prompt = prompt[5:]
        if prompt.startswith("clean"):
            user = prompt[6:]
            final_clean_up(user)
            return [past, f'cleaned all .wav files for {user}', None, gptModel]
        if prompt.startswith('files'):
            (log_cnt, wav_cnt, other_cnt, others) = list_permanent_files()
            response = f'{log_cnt} log files\n{wav_cnt} .wav files\n{other_cnt} Other files:\n{others}'
            return [past, response, None, gptModel]

    if not (user_window in unames and pwd_window == pwdList[unames.index(user_window)]):
        return [[], "User name and/or password are incorrect", prompt, gptModel]

    # Build a new message list so a failed API call leaves the stored
    # history without a dangling, unanswered user message.
    messages = past + [{"role": "user", "content": prompt}]
    completion = client.chat.completions.create(model=gptModel, messages=messages)
    reply = completion.choices[0].message.content or ''
    tokens_in = completion.usage.prompt_tokens
    tokens_out = completion.usage.completion_tokens
    tokens = completion.usage.total_tokens

    response += "\n\nYOU: " + prompt + "\nGPT: " + reply
    if isBoss:
        response += f"\n{gptModel}: tokens in/out = {tokens_in}/{tokens_out}"
    if tokens > 40000:
        response += "\n\nTHIS DIALOG IS GETTING TOO LONG. PLEASE RESTART CONVERSATION SOON."
    past = messages + [{"role": "assistant", "content": reply}]

    # Append the token usage to the user's log, retrying on file errors.
    dataFile = new_func(user_window)
    m = '4omini' if 'mini' in gptModel else '4o'
    accessOk = False
    for _ in range(3):
        try:
            with open(dataFile, 'a') as f:
                f.write(f'{user_window}:{tokens_in}/{tokens_out}-{m}\n')
            accessOk = True
            break
        except (OSError, ValueError):
            sleep(3)
    if not accessOk:
        response += f"\nDATA LOG FAILED, path = {dataFile}"
    return [past, response, None, gptModel]
def new_func(user):
    """Return the path of *user*'s token-usage log inside ``dataDir``."""
    return ''.join((dataDir, user, '_log.txt'))
def image_count_path(user):
    """Return the path of *user*'s image-generation counter file in ``dataDir``."""
    return ''.join((dataDir, user, '_image_count.txt'))
def transcribe(user, pwd, fpath):
    """Transcribe the recorded audio file at *fpath* to text.

    Args:
        user: user name as typed (lower-cased and stripped here).
        pwd: password as typed (lower-cased and stripped here).
        fpath: path of the recorded audio file.

    Returns:
        The transcript text, or ``'Bad credentials'`` when the user name and
        password do not match.
    """
    user = user.lower().strip()
    pwd = pwd.lower().strip()
    # The password must be the one configured for THIS user, as in chat()
    # and make_image(); `pwd in pwdList` accepted any user's password.
    if not (user in unames and pwd == pwdList[unames.index(user)]):
        return 'Bad credentials'

    with audioread.audio_open(fpath) as audio:
        duration = int(audio.duration)
    if duration > 0:
        # Record the whole seconds of audio for the usage report.
        with open(dataDir + user + '_audio.txt', 'a') as f:
            f.write(f'audio:{duration}\n')

    with open(fpath, 'rb') as audio_file:
        transcript = client.audio.transcriptions.create(
            model='whisper-1', file=audio_file, response_format='text')
    return str(transcript)
def pause_message():
    """Return the notice shown in the prompt window while recording is paused."""
    notice = "Audio input is paused. Resume or Stop as desired"
    return notice
| # def gen_output_audio(txt): | |
| # if len(txt) < 10: | |
| # txt = "This dialog is too short to mess with!" | |
| # response = client.audio.speech.create(model="tts-1", voice="fable", input=txt) | |
| # with open(speak_file, 'wb') as fp: | |
| # fp.write(response.content) | |
| # return speak_file | |
def set_speak_button(txt):
    """Show the "Speak Dialog" button only when the dialog has some text.

    Returns a ``gr.Button`` update that is visible when *txt* is longer than
    two characters.
    """
    # NOTE(review): new_conversation() sets the dialog box to None, so txt
    # presumably can be None here; treat that as "no text".
    has_text = txt is not None and len(txt) > 2
    return gr.Button(visible=has_text)
def update_user(user_win):
    """Resolve the typed user name against the configured users.

    Returns the same value twice (for the user state and the user textbox):
    the lower-cased, stripped name when it is a known user, otherwise
    ``'unknown'``.
    """
    candidate = user_win.lower().strip()
    resolved = candidate if candidate in unames else 'unknown'
    return [resolved, resolved]
def speech_worker(chunks=None, q=None):
    """Synthesize each text chunk to its paired output file.

    Args:
        chunks: text pieces to convert to speech, in order.
        q: output .wav paths, one per chunk, in the same order. The list is
            read but not modified.
    """
    # None defaults avoid sharing one mutable list between calls.
    for chunk, fpath in zip(chunks or (), q or ()):
        response = client.audio.speech.create(model="tts-1", voice="fable", input=chunk, speed=0.85, response_format='wav')
        with open(fpath, 'wb') as fp:
            fp.write(response.content)
def gen_speech_file_names(user, cnt):
    """Return *cnt* speech file paths for *user*, numbered from 0, in ``dataDir``."""
    return [dataDir + f'{user}_speech{idx}.wav' for idx in range(cnt)]
def final_clean_up(user):
    """Delete generated speech .wav files for *user*, or for everyone.

    Args:
        user: a user name, or ``'all'`` (any case, surrounding spaces
            ignored) to remove every user's speech files.

    Removal is best effort: a file that cannot be deleted is skipped.
    """
    owner = '*' if user.strip().lower() == 'all' else user
    for fpath in glob(dataDir + f'{owner}_speech*.wav'):
        try:
            os.remove(fpath)
        except OSError:
            # Best effort only (file already gone, in use, ...).
            continue
def delete_image(user):
    """Remove *user*'s generated image file from ``dataDir`` if it exists."""
    fpath = dataDir + user + '.png'
    try:
        os.remove(fpath)
    except FileNotFoundError:
        # Already gone; removing directly avoids an exists()/remove() race.
        pass
def list_permanent_files():
    """Summarise the contents of ``dataDir`` by file type.

    Returns a tuple of four strings: the number of ``.txt`` files, the
    number of ``.wav`` files, the number of other entries, and the list of
    those other entry names.
    """
    names = os.listdir(dataDir)
    txt_names = [name for name in names if name.endswith('.txt')]
    wav_names = [name for name in names if name.endswith('.wav')]
    leftovers = [name for name in names if not name.endswith(('.txt', '.wav'))]
    return (str(len(txt_names)), str(len(wav_names)), str(len(leftovers)), str(leftovers))
def make_image(prompt, user, pwd):
    """Generate an image for *prompt* and save it as the user's .png file.

    Args:
        prompt: description of the desired image.
        user: user name as typed (lower-cased and stripped here).
        pwd: password to compare against the one configured for *user*.

    Returns:
        ``[image_path, message]``; ``image_path`` is ``None`` whenever no
        image was produced (bad credentials, empty prompt, or any failure
        while generating or saving).
    """
    user = user.lower().strip()
    if not (user in unames and pwd == pwdList[unames.index(user)]):
        return [None, 'Incorrect user name or password']
    if len(prompt.strip()) == 0:
        return [None, 'You must provide a prompt describing image you desire']

    try:
        response = client.images.generate(model='dall-e-2', prompt=prompt, size='512x512',
                                          quality='standard', response_format='b64_json')
        image_data = response.data[0].b64_json
        image = Image.open(BytesIO(base64.b64decode(image_data)))
        fpath = dataDir + user + '.png'
        image.save(fpath)
        # One line per generated image; summed by the usage report.
        with open(image_count_path(user), 'at') as fp:
            fp.write('1\n')
    except Exception:
        # Any API, decoding or file error is reported to the user the same
        # way; KeyboardInterrupt/SystemExit are no longer swallowed.
        return [None, 'Error: unable to create image.']
    return [fpath, 'Image created!']
def show_help():
    """Return the usage instructions displayed in the dialog window."""
    # Sub-steps are numbered after their parent item (3.x, 4.x).
    return '''
1. Login with user name and password (not case-sensitive)
2. Type prompts (questions, instructions) into prompt window (OR) you can speak prompts by
tapping the audio "Record" button, saying your prompt, then tapping the "Stop" button.
Your prompt will appear in the Prompt window, and you can edit it there if needed.
3. Chat:
3.1 tap the "Submit Prompt/Question" button. The response will appear in the Dialog window.
3.2 To speak the response, tap the "Speak Dialog" button.
3.3 Enter follow-up questions in the Prompt window either by typing or speaking. Tap the voice
entry "Reset Voice Entry" button to enable additional voice entry. Then tap "Submit Prompt/Question".
3.4 If topic changes or when done chatting, tap the "Restart Conversation" button.
4. Make Image:
4.1 Enter description of desired image in prompt window via either typing or voice entry
4.2 Tap the "Make Image" button. This can take a few seconds.
4.3 There is a download button on the image display if your system supports file downloads.
4.4 When done viewing image, tap the "Restart Conversation" button
Hints:
1. Better chat and image results are obtained by including detailed descriptions and instructions
in the prompt.
2. Always tap "Restart Conversation" before requesting an image or changing chat topics.
3. Audio input and output functions depend on the hardware capability of your device.'''
def clean_up(user):
    """Delete *user*'s generated speech .wav files; failures are skipped."""
    for fpath in glob(dataDir + f'{user}_speech*.wav'):
        try:
            os.remove(fpath)
        except OSError:
            continue


def _split_leading_number(line):
    """Separate a leading numeric token (e.g. "1." or "3.5") from *line*.

    Returns ``(spoken_number, rest)``. For a numbered line the number has
    its inner dots spelled as " point " and *rest* starts with ", " so the
    speech pauses after the number. Otherwise returns ``('', line)``.
    """
    loc = line.find(' ')  # find(): a one-word line simply has no number
    if loc > 1:
        val = line[:loc]
        if val.replace('.', '0').isdecimal():
            if val.endswith('.'):
                number = val[:-1].replace('.', ' point ') + '.'
            else:
                number = val.replace('.', ' point ')
            return number, ', ' + line[loc:]
    return '', line


def _speak_decimal_points(text):
    """Replace every '.' that sits between two digits with ' point '."""
    # Walk right-to-left so positions still to be visited are not shifted
    # by the longer replacement text.
    for i in range(len(text) - 2, 0, -1):
        if text[i] == '.' and text[i - 1].isdecimal() and text[i + 1].isdecimal():
            text = text[:i] + ' point ' + text[i + 1:]
    return text


def _speech_chunks(txt):
    """Split dialog text into the pieces sent one by one to text-to-speech."""
    for abbrev, spoken in abbrevs.items():
        txt = txt.replace(abbrev, spoken)
    chunklist = []
    for raw in txt.replace('**', '').splitlines(False):
        # Drop list bullets / markdown decoration at the start of the line.
        line = raw.lstrip('- *@#$%^&_=+-')
        if line.strip() == '':
            continue
        number, body = _split_leading_number(line)
        body = _speak_decimal_points(body)
        if len(body) > 50:
            # Long lines are split at sentence ends; the number stays with
            # the first piece.
            pieces = body.split('.')
            pieces[0] = number + pieces[0]
            chunklist.extend(p for p in pieces if len(p) > 0 and p != '"')
        else:
            whole = number + body
            if whole != '"':
                chunklist.append(whole)
    return chunklist


def initial_audio_output(txt, user):
    """Start speaking the dialog text *txt* for *user*.

    The first chunk is synthesized immediately; the remaining chunks are
    synthesized by ``speech_worker`` on a background thread.

    Returns:
        ``[first_audio_path, remaining_paths]`` for the audio player and the
        queue state, or a "no audio" pair when there is nothing to speak or
        the user is unknown.
    """
    if user not in unames:
        return [gr.Audio(sources=None), []]
    clean_up(user)
    if not txt or len(txt.strip()) < 5:
        # Too short to speak: clear the player (None, not the string 'None').
        return [None, []]

    chunklist = _speech_chunks(txt)
    if not chunklist or chunklist[0].strip() == '':
        return [gr.Audio(sources=None), []]

    # Log the number of characters sent to speech for the usage report.
    total_speech = sum(len(chunk) for chunk in chunklist)
    with open(dataDir + user + '_speech.txt', 'a') as f:
        f.write(f'speech:{total_speech}\n')

    fname_list = gen_speech_file_names(user, len(chunklist))
    fname = fname_list[0]
    q = fname_list[1:]
    if len(chunklist) > 1:
        threading.Thread(target=speech_worker, daemon=True,
                         args=(chunklist[1:], fname_list[1:])).start()
    response = client.audio.speech.create(model="tts-1", voice="fable", input=chunklist[0], speed=0.85, response_format='wav')
    with open(fname, 'wb') as fp:
        fp.write(response.content)
    return [fname, q]


def gen_output_audio(q, user):
    """Hand the next queued speech file to the audio player.

    Returns ``[next_path, remaining_queue]``; when the queue is exhausted
    the user's speech files are removed and ``[None, []]`` is returned.
    """
    if not q:
        final_clean_up(user)
        return [None, []]
    fname = q.pop(0)
    return [fname, q]


with gr.Blocks() as demo:
    # Per-session state.
    history = gr.State([])
    password = gr.State("")
    user = gr.State("unknown")
    model = gr.State("gpt-4o-mini")
    q = gr.State([])
    qsave = gr.State([])

    gr.Markdown('# GPT Chat')
    gr.Markdown('Enter user name & password. Tap "Help & Hints" button for more instructions.')
    # NOTE(review): row grouping below is reconstructed; confirm the layout.
    with gr.Row():
        user_window = gr.Textbox(label = "User Name")
        user_window.blur(fn=update_user, inputs=user_window, outputs=[user, user_window])
        pwd_window = gr.Textbox(label = "Password")
        pwd_window.blur(updatePassword, inputs = pwd_window, outputs = [password, pwd_window])
        help_button = gr.Button(value='Help & Hints')
    with gr.Row():
        audio_widget = gr.Audio(type='filepath', format='wav',waveform_options=gr.WaveformOptions(
            show_recording_waveform=True), sources=['microphone'], scale = 3, label="Prompt/Question Voice Entry", max_length=120)
        reset_button = gr.ClearButton(value="Reset Voice Entry", scale=1)
    with gr.Row():
        clear_button = gr.Button(value="Restart Conversation")
        # gpt_chooser=gr.Radio(choices=[("GPT-3.5","gpt-3.5-turbo"),("GPT-4o","gpt-4o-mini")],
        # value="gpt-3.5-turbo", label="GPT Model", interactive=True)
        button_do_image = gr.Button(value='Make Image')
        submit_button = gr.Button(value="Submit Prompt/Question")
        speak_output = gr.Button(value="Speak Dialog", visible=False)
    prompt_window = gr.Textbox(label = "Prompt or Question")
    output_window = gr.Textbox(label = "Dialog")
    image_window = gr.Image()

    # Event wiring.
    submit_button.click(chat, inputs=[prompt_window, user_window, password, history, output_window, model],
                        outputs=[history, output_window, prompt_window, model])
    clear_button.click(fn=new_conversation, inputs=user_window, outputs=[prompt_window, history, output_window, image_window])
    audio_widget.stop_recording(fn=transcribe, inputs=[user_window, password, audio_widget],
                                outputs=[prompt_window])
    audio_widget.pause_recording(fn=pause_message, outputs=[prompt_window])
    reset_button.add(audio_widget)
    audio_out = gr.Audio(autoplay=True, visible=False)
    # When one speech file finishes playing, load the next one in the queue.
    audio_out.stop(fn=gen_output_audio, inputs=[q, user_window], outputs = [audio_out, q])
    speak_output.click(fn=initial_audio_output, inputs=[output_window, user_window], outputs=[audio_out, q])
    output_window.change(fn=set_speak_button, inputs=output_window,outputs=speak_output)
    button_do_image.click(fn=make_image, inputs=[prompt_window,user_window, password],outputs=[image_window, output_window])
    image_window.change(fn=delete_image, inputs=[user])
    help_button.click(fn=show_help, outputs=output_window)

# demo.unload(final_clean_up(user))
demo.launch(share=True)