import os import gradio as gr # import openai from numpy._core.defchararray import endswith, isdecimal from openai import OpenAI from dotenv import load_dotenv from pathlib import Path from time import sleep import audioread import queue import threading from glob import glob import copy import base64 import json from PIL import Image from io import BytesIO load_dotenv(override=True) key = os.getenv('OPENAI_API_KEY') users = os.getenv('LOGNAME') unames = users.split(',') pwds = os.getenv('PASSWORD') pwdList = pwds.split(',') site = os.getenv('SITE') if site == 'local': dp = Path('./data') dp.mkdir(exist_ok=True) dataDir = './data/' else: dp = Path('/data') dp.mkdir(exist_ok=True) dataDir = '/data/' speak_file = dataDir + "speek.wav" client = OpenAI(api_key = key) #digits = ['zero: ','one: ','two: ','three: ','four: ','five: ','six: ','seven: ','eight: ','nine: '] abbrevs = {'St. ' : 'Saint ', 'Mr. ': 'mister ', 'Mrs. ':'mussus ', 'Mr. ':'mister ', 'Ms. ':'mizz '} def genUsageStats(do_reset=False): result = [] ttotal4o_in = 0 ttotal4o_out = 0 ttotal4mini_in = 0 ttotal4mini_out = 0 totalAudio = 0 totalSpeech = 0 for user in unames: tokens4o_in = 0 tokens4o_out = 0 tokens4mini_in = 0 tokens4mini_out = 0 fp = dataDir + user + '_log.txt' if os.path.exists(fp): accessOk = False for i in range(3): try: with open(fp) as f: dataList = f.readlines() if do_reset: os.remove(fp) else: for line in dataList: (u, t) = line.split(':') (t, m) = t.split('-') (tin, tout) = t.split('/') incount = int(tin) outcount = int(tout) if 'mini' in m: tokens4mini_in += incount tokens4mini_out += outcount ttotal4mini_in += incount ttotal4mini_out += outcount else: tokens4o_in += incount tokens4o_out += outcount ttotal4o_in += incount ttotal4o_out += outcount accessOk = True break except: sleep(3) if not accessOk: return f'File access failed reading stats for user: {user}' userAudio = 0 fp = dataDir + user + '_audio.txt' if os.path.exists(fp): accessOk = False for i in range(3): try: with open(fp) as f: dataList = f.readlines() if do_reset: os.remove(fp) else: for line in dataList: (dud, len) = line.split(':') userAudio += int(len) totalAudio += int(userAudio) accessOk = True break except: sleep(3) if not accessOk: return f'File access failed reading audio stats for user: {user}' userSpeech = 0 fp = dataDir + user + '_speech.txt' if os.path.exists(fp): accessOk = False for i in range(3): try: with open(fp) as f: dataList = f.readlines() if do_reset: os.remove(fp) else: for line in dataList: (dud, len) = line.split(':') userSpeech += int(len) totalSpeech += int(userSpeech) accessOk = True break except: sleep(3) if not accessOk: return f'File access failed reading speech stats for user: {user}' result.append([user, f'{tokens4mini_in}/{tokens4mini_out}', f'{tokens4o_in}/{tokens4o_out}', f'audio:{userAudio}',f'speech:{userSpeech}']) result.append(['totals', f'{ttotal4mini_in}/{ttotal4mini_out}', f'{ttotal4o_in}/{ttotal4o_out}', f'audio:{totalAudio}',f'speech:{totalSpeech}']) return result def new_conversation(user): clean_up(user) return [None, [], None, None] def updatePassword(txt): password = txt.lower().strip() return [password, "*********"] # def setModel(val): # return val def chat(prompt, user_window, pwd_window, past, response, gptModel): user_window = user_window.lower().strip() isBoss = False if user_window == unames[0] and pwd_window == pwdList[0]: isBoss = True if prompt == 'stats': response = genUsageStats() list_permanent_files() return [past, response, None, gptModel] if prompt == 'reset': response = genUsageStats(True) return [past, response, None, gptModel] if prompt.startswith('gpt4'): gptModel = 'gpt-4o' prompt = prompt[5:] if prompt.startswith("clean"): user = prompt[6:] response = f'cleaned all .wav files for {user}' final_clean_up(user) return [past, response, None, gptModel] if prompt.startswith('files'): (log_cnt, wav_cnt, other_cnt, others) = list_permanent_files() response = f'{log_cnt} log files\n{wav_cnt} .wav files\n{other_cnt} Other files:\n{others}' return [past, response, None, gptModel] if user_window in unames and pwd_window == pwdList[unames.index(user_window)]: past.append({"role":"user", "content":prompt}) completion = client.chat.completions.create(model=gptModel, messages=past) reply = completion.choices[0].message.content tokens_in = completion.usage.prompt_tokens tokens_out = completion.usage.completion_tokens tokens = completion.usage.total_tokens response += "\n\nYOU: " + prompt + "\nGPT: " + reply if isBoss: response += f"\n{gptModel}: tokens in/out = {tokens_in}/{tokens_out}" if tokens > 40000: response += "\n\nTHIS DIALOG IS GETTING TOO LONG. PLEASE RESTART CONVERSATION SOON." past.append({"role":"assistant", "content": reply}) accessOk = False for i in range(3): try: dataFile = new_func(user_window) with open(dataFile, 'a') as f: m = '4o' if 'mini' in gptModel: m = '4omini' f.write(f'{user_window}:{tokens_in}/{tokens_out}-{m}\n') accessOk = True break except Exception as e: sleep(3) if not accessOk: response += f"\nDATA LOG FAILED, path = {dataFile}" return [past, response , None, gptModel] else: return [[], "User name and/or password are incorrect", prompt, gptModel] def new_func(user): dataFile = dataDir + user + '_log.txt' return dataFile def transcribe(user, pwd, fpath): user = user.lower().strip() pwd = pwd.lower().strip() if not (user in unames and pwd in pwdList): return 'Bad credentials' with audioread.audio_open(fpath) as audio: duration = int(audio.duration) if duration > 0: with open(dataDir + user + '_audio.txt','a') as f: f.write(f'audio:{str(duration)}\n') with open(fpath,'rb') as audio_file: transcript = client.audio.transcriptions.create( model='whisper-1', file = audio_file ,response_format = 'text' ) reply = transcript return str(reply) def pause_message(): return "Audio input is paused. Resume or Stop as desired" # def gen_output_audio(txt): # if len(txt) < 10: # txt = "This dialog is too short to mess with!" # response = client.audio.speech.create(model="tts-1", voice="fable", input=txt) # with open(speak_file, 'wb') as fp: # fp.write(response.content) # return speak_file def set_speak_button(txt): vis = False if len(txt) > 2: vis = True return gr.Button(visible=vis) def update_user(user_win): user_win = user_win.lower().strip() user = 'unknown' for s in unames: if user_win == s: user = s break return [user, user] def speech_worker(chunks=[],q=[]): for chunk in chunks: fpath = q.pop(0) response = client.audio.speech.create(model="tts-1", voice="fable", input=chunk, speed=0.85, response_format='wav') with open(fpath, 'wb') as fp: fp.write(response.content) def gen_speech_file_names(user, cnt): rv = [] for i in range(0, cnt): rv.append(dataDir + f'{user}_speech{i}.wav') return rv def final_clean_up(user): if user.strip().lower() == 'all': flist = glob(dataDir + '*_speech*.wav') else: flist = glob(dataDir + f'{user}_speech*.wav') for fpath in flist: try: os.remove(fpath) except: continue def delete_image(user): fpath = dataDir + user + '.png' if os.path.exists(fpath): os.remove(fpath) def list_permanent_files(): flist = os.listdir(dataDir) others = [] log_cnt = 0 wav_cnt = 0 other_cnt = 0 for fpath in flist: if fpath.endswith('.txt'): log_cnt += 1 elif fpath.endswith('.wav'): wav_cnt += 1 else: others.append(fpath) other_cnt = len(others) return (str(log_cnt), str(wav_cnt), str(other_cnt), str(others)) def make_image(prompt, user, pwd): user = user.lower().strip() msg = 'Error: unable to create image.' fpath = None if user in unames and pwd == pwdList[unames.index(user)]: if len(prompt.strip()) == 0: return [None, 'You must provide a prompt'] try: response = client.images.generate(model='dall-e-2', prompt=prompt,size='512x512', quality='standard', response_format='b64_json') image_data = response.data[0].b64_json image = Image.open(BytesIO(base64.b64decode(image_data))) fpath = dataDir + user + '.png' image.save(fpath) msg = 'Image created!' except: return [None, msg] else: msg = 'Incorrect user name or password' return [None, msg] return [fpath, msg] with gr.Blocks() as demo: history = gr.State([]) password = gr.State("") user = gr.State("unknown") model = gr.State("gpt-4o-mini") q = gr.State([]) qsave = gr.State([]) def clean_up(user): flist = glob(dataDir + f'{user}_speech*.wav') for fpath in flist: try: os.remove(fpath) except: continue def initial_audio_output(txt, user): global digits global abbrevs if not user in unames: return [gr.Audio(sources=None), []] clean_up(user) q = [] if len(txt.strip()) < 5: return ['None', q] for s,x in abbrevs.items(): txt = txt.replace(s, x) words_in = txt.replace('**', '').splitlines(False) words_out = [] for s in words_in: s = s.lstrip('- *@#$%^&_=+-') if len(s) > 0: loc = s.index(' ') if loc > 1: val = s[0:loc] isnum = val.replace('.','0').isdecimal() if isnum: if val.endswith('.'): val = val[:-1].replace('.',' point ') + '., ' else: val = val.replace('.', ' point ') + ', ' s = 'num'+ val + s[loc:] words_out.append(s) chunklist = [] for chunk in words_out: if chunk.strip() == '': continue isnumbered = chunk.startswith('num') number = '' loc = 0 if isnumbered: chunk = chunk[3:] loc = chunk.index(',') number = chunk[0:loc] chunk = chunk[loc:] locs = [] for i in range(1,len(chunk)-1): (a, b, c) = chunk[i-1:i+2] if a.isdecimal() and b == '.' and c.isdecimal(): locs.append(i) for i in locs: chunk = chunk[:i] + ' point ' + chunk[i+1:] if len(chunk) > 50: finechunks = chunk.split('.') for fchunk in finechunks: if isnumbered: fchunk = number + fchunk isnumbered = False if len(fchunk) > 0: if fchunk != '"': chunklist.append(fchunk) else: line = number + chunk if line != '"': chunklist.append(line) total_speech = 0 for chunk in chunklist: total_speech += len(chunk) with open(dataDir + user + '_speech.txt','a') as f: f.write(f'speech:{str(total_speech)}\n') chunk = chunklist[0] if chunk.strip() == '': return gr.Audio(sources=None) fname_list = gen_speech_file_names(user, len(chunklist)) q = fname_list.copy() qsave = fname_list.copy() fname = q.pop(0) if len(chunklist) > 0: threading.Thread(target=speech_worker, daemon=True, args=(chunklist[1:],fname_list[1:])).start() response = client.audio.speech.create(model="tts-1", voice="fable", input=chunk, speed=0.85, response_format='wav') with open(fname, 'wb') as fp: fp.write(response.content) return [fname, q] def gen_output_audio(q, user): try: fname = q.pop(0) except: final_clean_up(user) return [None, gr.Audio(sources=None)] return [fname, q] gr.Markdown('# GPT Chat') gr.Markdown('Enter user name & password then enter prompt and click submit button. Restart conversation if topic changes. ' + 'You can enter prompts by voice. Tap "Record", speak, then tap "Stop". ' + 'Tap "Reset Voice Entry" to enter more voice. Tap "Speak Dialog" to hear dialog. ' + 'Note: first voice response may take a longer time.') with gr.Row(): user_window = gr.Textbox(label = "User Name") user_window.blur(fn=update_user, inputs=user_window, outputs=[user, user_window]) pwd_window = gr.Textbox(label = "Password") pwd_window.blur(updatePassword, inputs = pwd_window, outputs = [password, pwd_window]) with gr.Row(): audio_widget = gr.Audio(type='filepath', format='wav',waveform_options=gr.WaveformOptions( show_recording_waveform=True), sources=['microphone'], scale = 3, label="Prompt/Question Voice Entry", max_length=120) reset_button = gr.ClearButton(value="Reset Voice Entry", scale=1) #new_func1() with gr.Row(): clear_button = gr.Button(value="Restart Conversation") # gpt_chooser=gr.Radio(choices=[("GPT-3.5","gpt-3.5-turbo"),("GPT-4o","gpt-4o-mini")], # value="gpt-3.5-turbo", label="GPT Model", interactive=True) button_do_image = gr.Button(value='Make Image') submit_button = gr.Button(value="Submit Prompt/Question") speak_output = gr.Button(value="Speak Dialog", visible=False) prompt_window = gr.Textbox(label = "Prompt or Question") output_window = gr.Textbox(label = "Dialog") image_window = gr.Image() submit_button.click(chat, inputs=[prompt_window, user_window, password, history, output_window, model], outputs=[history, output_window, prompt_window, model]) clear_button.click(fn=new_conversation, inputs=user_window, outputs=[prompt_window, history, output_window, image_window]) audio_widget.stop_recording(fn=transcribe, inputs=[user_window, password, audio_widget], outputs=[prompt_window]) audio_widget.pause_recording(fn=pause_message, outputs=[prompt_window]) reset_button.add(audio_widget) audio_out = gr.Audio(autoplay=True, visible=False) audio_out.stop(fn=gen_output_audio, inputs=[q, user_window], outputs = [audio_out, q]) speak_output.click(fn=initial_audio_output, inputs=[output_window, user_window], outputs=[audio_out, q]) output_window.change(fn=set_speak_button, inputs=output_window,outputs=speak_output) button_do_image.click(fn=make_image, inputs=[prompt_window,user_window, password],outputs=[image_window, output_window]) image_window.change(fn=delete_image, inputs=[user]) # demo.unload(final_clean_up(user)) demo.launch(share=True)