"""Gradio front end for an OpenAI chat with voice input and spoken output.

Users authenticate with a name/password pair taken from the environment
(``LOGNAME`` / ``PASSWORD``, comma separated, positionally matched).  Token,
audio and speech usage is appended to per-user text files under ``dataDir``.
"""

import os
import re
from re import L
import tempfile
import gradio as gr
# import openai
from numpy._core.defchararray import endswith, isdecimal
from openai import OpenAI
from dotenv import load_dotenv
from pathlib import Path
from time import sleep
import audioread
import queue
import threading
from glob import glob
import copy

load_dotenv(override=True)
key = os.getenv('OPENAI_API_KEY')
users = os.getenv('LOGNAME')
unames = users.split(',')
pwds = os.getenv('PASSWORD')
pwdList = pwds.split(',')

site = os.getenv('SITE')
if site == 'local':
    dp = Path('./data')
    dp.mkdir(exist_ok=True)
    dataDir = './data/'
else:
    dp = Path('/data')
    dp.mkdir(exist_ok=True)
    dataDir = '/data/'

speak_file = dataDir + "speek.wav"

client = OpenAI(api_key=key)

# digits = ['zero: ','one: ','two: ','three: ','four: ','five: ','six: ','seven: ','eight: ','nine: ']

# Abbreviations are expanded before text-to-speech so they are read as words.
abbrevs = {'St. ': 'Saint ', 'Mr. ': 'mister ', 'Mrs. ': 'mussus ', 'Ms. ': 'mizz '}

# A '.' with a digit on both sides is read aloud as "point".
_DECIMAL_POINT = re.compile(r'(?<=\d)\.(?=\d)')
# Leading markdown/bullet characters removed from every line before speaking.
_LINE_PREFIX_CHARS = '- *@#$%^&_=+-'
# Lines longer than this are split into sentences for speech generation.
_MAX_CHUNK_LEN = 50
# File access is retried this many times, sleeping between attempts.
_FILE_ATTEMPTS = 3
_FILE_RETRY_DELAY = 3


def _credentials_ok(user, pwd):
    """Return True when *user* is known and *pwd* is that user's own password."""
    if user not in unames:
        return False
    idx = unames.index(user)
    return idx < len(pwdList) and pwd == pwdList[idx]


def _read_stat_lines(path, do_reset):
    """Read the lines of a usage file, retrying on access errors.

    Returns ``[]`` when the file does not exist or was removed because
    *do_reset* is true, the list of lines otherwise, and ``None`` when every
    attempt to access the file failed.
    """
    if not os.path.exists(path):
        return []
    for _ in range(_FILE_ATTEMPTS):
        try:
            with open(path) as f:
                lines = f.readlines()
            if do_reset:
                os.remove(path)
                return []
            return lines
        except OSError:
            sleep(_FILE_RETRY_DELAY)
    return None


def _parse_token_line(line):
    """Parse ``user:in/out-model`` into ``(in, out, is_mini)``; None if malformed."""
    try:
        _user, counts = line.split(':')
        counts, model_tag = counts.split('-')
        tokens_in, tokens_out = counts.split('/')
        return int(tokens_in), int(tokens_out), 'mini' in model_tag
    except ValueError:
        return None


def _sum_counter_lines(lines):
    """Sum the integer after ':' on each ``label:value`` line, skipping bad lines."""
    total = 0
    for line in lines:
        _label, _sep, value = line.partition(':')
        try:
            total += int(value)
        except ValueError:
            continue
    return total


def genUsageStats(do_reset=False):
    """Collect per-user and total usage from the log files in ``dataDir``.

    Args:
        do_reset: when true the log files are deleted instead of summed, so
            every reported figure is zero.

    Returns:
        A list of rows ``[user, mini in/out, 4o in/out, audio, speech]`` with a
        final ``'totals'`` row, or an error string when a file could not be
        accessed after several attempts.
    """
    result = []
    ttotal4o_in = 0
    ttotal4o_out = 0
    ttotal4mini_in = 0
    ttotal4mini_out = 0
    totalAudio = 0
    totalSpeech = 0
    for user in unames:
        tokens4o_in = 0
        tokens4o_out = 0
        tokens4mini_in = 0
        tokens4mini_out = 0

        lines = _read_stat_lines(dataDir + user + '_log.txt', do_reset)
        if lines is None:
            return f'File access failed reading stats for user: {user}'
        # The file is fully read before anything is added, so a retry can
        # never count the same line twice.
        for line in lines:
            parsed = _parse_token_line(line)
            if parsed is None:
                continue
            incount, outcount, is_mini = parsed
            if is_mini:
                tokens4mini_in += incount
                tokens4mini_out += outcount
            else:
                tokens4o_in += incount
                tokens4o_out += outcount
        ttotal4mini_in += tokens4mini_in
        ttotal4mini_out += tokens4mini_out
        ttotal4o_in += tokens4o_in
        ttotal4o_out += tokens4o_out

        lines = _read_stat_lines(dataDir + user + '_audio.txt', do_reset)
        if lines is None:
            return f'File access failed reading audio stats for user: {user}'
        userAudio = _sum_counter_lines(lines)
        totalAudio += userAudio

        lines = _read_stat_lines(dataDir + user + '_speech.txt', do_reset)
        if lines is None:
            return f'File access failed reading speech stats for user: {user}'
        userSpeech = _sum_counter_lines(lines)
        totalSpeech += userSpeech

        result.append([user, f'{tokens4mini_in}/{tokens4mini_out}',
                       f'{tokens4o_in}/{tokens4o_out}',
                       f'audio:{userAudio}', f'speech:{userSpeech}'])
    result.append(['totals', f'{ttotal4mini_in}/{ttotal4mini_out}',
                   f'{ttotal4o_in}/{ttotal4o_out}',
                   f'audio:{totalAudio}', f'speech:{totalSpeech}'])
    return result


def new_conversation(user):
    """Delete the user's speech files and return cleared UI values.

    Returns values for prompt window, history, output window and speech queue.
    """
    clean_up(user)
    return [None, [], None, []]


def updatePassword(txt):
    """Return the normalized password for the state and a mask for the textbox."""
    # NOTE(review): a second blur without retyping would pass the mask text in
    # here and overwrite the stored password -- confirm whether that matters.
    return [txt.lower().strip(), "*********"]


# def setModel(val):
#     return val


def chat(prompt, user_window, pwd_window, past, response, gptModel):
    """Handle one submitted prompt: admin commands or a chat completion.

    Args:
        prompt: text entered by the user.
        user_window: user name as typed (normalized here).
        pwd_window: password held in the session state.
        past: list of chat messages sent so far; extended in place.
        response: current content of the dialog window.
        gptModel: model name currently selected for the session.

    Returns:
        ``[history, dialog text, prompt window value, model]``.
    """
    user_window = user_window.lower().strip()
    isBoss = user_window == unames[0] and pwd_window == pwdList[0]
    if isBoss:
        # Administrative commands are only honoured for the first configured user.
        if prompt == 'stats':
            response = genUsageStats()
            return [past, response, None, gptModel]
        if prompt == 'reset':
            response = genUsageStats(True)
            return [past, response, None, gptModel]
        if prompt.startswith('gpt4'):
            gptModel = 'gpt-4o'
            prompt = prompt[5:]
        if prompt.startswith("clean"):
            user = prompt[6:]
            response = f'cleaned all .wav files for {user}'
            final_clean_up(user)
            return [past, response, None, gptModel]
        if prompt.startswith('files'):
            (log_cnt, wav_cnt, other_cnt, others) = list_permanent_files()
            response = f'{log_cnt} log files\n{wav_cnt} .wav files\n{other_cnt} Other files:\n{others}'
            return [past, response, None, gptModel]

    if not _credentials_ok(user_window, pwd_window):
        return [[], "User name and/or password are incorrect", prompt, gptModel]

    # The dialog window may hold None after a restart.
    response = response or ""
    past.append({"role": "user", "content": prompt})
    completion = client.chat.completions.create(model=gptModel, messages=past)
    reply = completion.choices[0].message.content
    tokens_in = completion.usage.prompt_tokens
    tokens_out = completion.usage.completion_tokens
    tokens = completion.usage.total_tokens
    response += "\n\nYOU: " + prompt + "\nGPT: " + reply
    if isBoss:
        response += f"\n{gptModel}: tokens in/out = {tokens_in}/{tokens_out}"
    if tokens > 40000:
        response += "\n\nTHIS DIALOG IS GETTING TOO LONG. PLEASE RESTART CONVERSATION SOON."
    past.append({"role": "assistant", "content": reply})

    dataFile = new_func(user_window)
    m = '4omini' if 'mini' in gptModel else '4o'
    accessOk = False
    for _ in range(_FILE_ATTEMPTS):
        try:
            with open(dataFile, 'a') as f:
                f.write(f'{user_window}:{tokens_in}/{tokens_out}-{m}\n')
            accessOk = True
            break
        except OSError:
            sleep(_FILE_RETRY_DELAY)
    if not accessOk:
        response += f"\nDATA LOG FAILED, path = {dataFile}"
    return [past, response, None, gptModel]


def new_func(user):
    """Return the path of the token log file for *user*."""
    dataFile = dataDir + user + '_log.txt'
    return dataFile


def transcribe(user, pwd, fpath):
    """Transcribe the recording at *fpath* and log its duration for *user*.

    Returns the transcript text, ``'Bad credentials'`` when the user/password
    pair does not match, or an empty string when there is no recording.
    """
    user = user.lower().strip()
    pwd = pwd.lower().strip()
    # The password must belong to this user, not merely to some user.
    if not _credentials_ok(user, pwd):
        return 'Bad credentials'
    if not fpath:
        return ''
    with audioread.audio_open(fpath) as audio:
        duration = int(audio.duration)
    if duration > 0:
        with open(dataDir + user + '_audio.txt', 'a') as f:
            f.write(f'audio:{str(duration)}\n')
    with open(fpath, 'rb') as audio_file:
        transcript = client.audio.transcriptions.create(
            model='whisper-1', file=audio_file, response_format='text')
    return str(transcript)


def pause_message():
    """Return the hint shown in the prompt window while recording is paused."""
    return "Audio input is paused. Resume or Stop as desired"


# def gen_output_audio(txt):
#     if len(txt) < 10:
#         txt = "This dialog is too short to mess with!"
#     response = client.audio.speech.create(model="tts-1", voice="fable", input=txt)
#     with open(speak_file, 'wb') as fp:
#         fp.write(response.content)
#     return speak_file


def set_speak_button(txt):
    """Show the "Speak Dialog" button only when the dialog has more than 2 characters."""
    return gr.Button(visible=len(txt) > 2)


def update_user(txt):
    """Return the normalized user name for both the state and the textbox."""
    user = txt.strip().lower()
    return [user, user]


def speech_worker(chunks=None, q=None):
    """Generate one speech file per chunk, writing chunk *i* to path ``q[i]``.

    Args:
        chunks: texts to synthesize.
        q: output file paths, paired positionally with *chunks*.
    """
    for chunk, fpath in zip(chunks or [], q or []):
        response = client.audio.speech.create(model="tts-1", voice="fable",
                                              input=chunk, speed=0.85,
                                              response_format='wav')
        with open(fpath, 'wb') as fp:
            fp.write(response.content)


def gen_speech_file_names(user, cnt):
    """Return *cnt* speech file paths for *user*, numbered from 0."""
    return [dataDir + f'{user}_speech{i}.wav' for i in range(cnt)]


def _remove_files(paths):
    """Delete each path, ignoring files that cannot be removed."""
    for fpath in paths:
        try:
            os.remove(fpath)
        except OSError:
            continue


def final_clean_up(user):
    """Delete speech .wav files for *user*, or for everyone when user is 'all'."""
    if user.strip().lower() == 'all':
        flist = glob(dataDir + '*_speech*.wav')
    else:
        flist = glob(dataDir + f'{user}_speech*.wav')
    _remove_files(flist)


def list_permanent_files():
    """Count the files in ``dataDir`` by kind.

    Returns:
        Tuple of strings: number of .txt files, number of .wav files, number
        of other files, and the ``str()`` of the list of other file names.
    """
    others = []
    log_cnt = 0
    wav_cnt = 0
    for fpath in os.listdir(dataDir):
        if fpath.endswith('.txt'):
            log_cnt += 1
        elif fpath.endswith('.wav'):
            wav_cnt += 1
        else:
            others.append(fpath)
    return (str(log_cnt), str(wav_cnt), str(len(others)), str(others))


def clean_up(user):
    """Delete the speech .wav files belonging to *user*."""
    _remove_files(glob(dataDir + f'{user}_speech*.wav'))


def _split_leading_number(line):
    """Split a leading item number off *line* for speaking.

    Returns ``(spoken_number, rest)``; ``spoken_number`` is ``''`` when the
    line does not start with a number of at least two characters followed by
    a space.  Dots inside the number are spoken as "point".
    """
    loc = line.find(' ')  # find, not index: one-word lines have no space
    if loc > 1:
        val = line[:loc]
        if val.replace('.', '0').isdecimal():
            if val.endswith('.'):
                number = val[:-1].replace('.', ' point ') + '.'
            else:
                number = val.replace('.', ' point ')
            # The comma makes the voice pause after the number.
            return number, ', ' + line[loc:]
    return '', line


def _build_speech_chunks(txt):
    """Turn dialog text into the list of text chunks sent to text-to-speech."""
    for abbr, spoken in abbrevs.items():
        txt = txt.replace(abbr, spoken)
    chunks = []
    for raw in txt.replace('**', '').splitlines():
        line = raw.lstrip(_LINE_PREFIX_CHARS)
        if not line.strip():
            continue
        number, rest = _split_leading_number(line)
        rest = _DECIMAL_POINT.sub(' point ', rest)
        if len(rest) > _MAX_CHUNK_LEN:
            pieces = rest.split('.')
            pieces[0] = number + pieces[0]
        else:
            pieces = [number + rest]
        # Blank pieces and stray quote marks produce no useful speech.
        chunks.extend(p for p in pieces if p.strip() and p != '"')
    return chunks


def initial_audio_output(txt, user):
    """Start speaking the dialog: synthesize the first chunk, queue the rest.

    The first chunk is generated synchronously and returned for playback; the
    remaining chunks are generated by a background thread.

    Returns:
        ``[first audio file path or None, list of remaining file paths]``.
    """
    user = (user or '').strip().lower()
    if user not in unames:
        return [None, []]
    clean_up(user)
    txt = txt or ''
    if len(txt.strip()) < 5:
        return [None, []]

    chunklist = _build_speech_chunks(txt)
    if not chunklist:
        return [None, []]

    total_speech = sum(len(chunk) for chunk in chunklist)
    with open(dataDir + user + '_speech.txt', 'a') as f:
        f.write(f'speech:{str(total_speech)}\n')

    fname_list = gen_speech_file_names(user, len(chunklist))
    if len(chunklist) > 1:
        threading.Thread(target=speech_worker, daemon=True,
                         args=(chunklist[1:], fname_list[1:])).start()
    response = client.audio.speech.create(model="tts-1", voice="fable",
                                          input=chunklist[0], speed=0.85,
                                          response_format='wav')
    with open(fname_list[0], 'wb') as fp:
        fp.write(response.content)
    return [fname_list[0], fname_list[1:]]


def gen_output_audio(q, user):
    """Return the next queued speech file, or clean up when the queue is empty.

    Returns:
        ``[audio file path or None, remaining queue]``.
    """
    try:
        fname = q.pop(0)
    except (IndexError, AttributeError, TypeError):
        final_clean_up(user)
        return [None, []]
    return [fname, q]


with gr.Blocks() as demo:
    history = gr.State([])
    password = gr.State("")
    user = gr.State("unknown")
    model = gr.State("gpt-4o-mini")
    q = gr.State([])
    qsave = gr.State([])

    gr.Markdown('# GPT Chat')
    gr.Markdown('Enter user name & password then enter prompt and click submit button. Restart conversation if topic changes. ' +
                'You can enter prompts by voice. Tap "Record", speak, then tap "Stop". ' +
                'Tap "Reset Voice Entry" to enter more voice. Tap "Speak Dialog" to hear dialog. ' +
                'Note: first voice response may take a longer time.')
    with gr.Row():
        user_window = gr.Textbox(label="User Name")
        user_window.blur(fn=update_user, inputs=user_window, outputs=[user, user_window])
        pwd_window = gr.Textbox(label="Password")
        pwd_window.blur(updatePassword, pwd_window, [password, pwd_window])
    with gr.Row():
        audio_widget = gr.Audio(type='filepath', format='wav',
                                waveform_options=gr.WaveformOptions(
                                    show_recording_waveform=True),
                                sources=['microphone'], scale=3,
                                label="Prompt/Question Voice Entry", max_length=120)
        reset_button = gr.ClearButton(value="Reset Voice Entry", scale=1)
    # new_func1()
    with gr.Row():
        clear_button = gr.Button(value="Restart Conversation")
        # gpt_chooser=gr.Radio(choices=[("GPT-3.5","gpt-3.5-turbo"),("GPT-4o","gpt-4o-mini")],
        #                      value="gpt-3.5-turbo", label="GPT Model", interactive=True)
        submit_button = gr.Button(value="Submit Prompt/Question")
        speak_output = gr.Button(value="Speak Dialog", visible=False)
    prompt_window = gr.Textbox(label="Prompt or Question")
    output_window = gr.Textbox(label="Dialog")
    submit_button.click(chat,
                        inputs=[prompt_window, user_window, password, history, output_window, model],
                        outputs=[history, output_window, prompt_window, model])
    # new_conversation returns four values; the fourth resets the speech queue.
    clear_button.click(fn=new_conversation, inputs=user_window,
                       outputs=[prompt_window, history, output_window, q])
    audio_widget.stop_recording(fn=transcribe, inputs=[user_window, password, audio_widget],
                                outputs=[prompt_window])
    audio_widget.pause_recording(fn=pause_message, outputs=[prompt_window])
    reset_button.add(audio_widget)
    audio_out = gr.Audio(autoplay=True, visible=False)
    # When one file finishes playing, fetch the next one from the queue.
    audio_out.stop(fn=gen_output_audio, inputs=[q, user], outputs=[audio_out, q])
    speak_output.click(fn=initial_audio_output, inputs=[output_window, user_window],
                       outputs=[audio_out, q])
    output_window.change(fn=set_speak_button, inputs=output_window, outputs=speak_output)
    # demo.unload(final_clean_up(user))

demo.launch(share=True)