import os import gradio as gr # import openai from numpy._core.defchararray import endswith, isdecimal from openai import OpenAI from dotenv import load_dotenv from pathlib import Path from time import sleep import audioread import queue import threading from glob import glob import copy import base64 import json from PIL import Image from io import BytesIO load_dotenv(override=True) key = os.getenv('OPENAI_API_KEY') users = os.getenv('LOGNAME') unames = users.split(',') pwds = os.getenv('PASSWORD') pwdList = pwds.split(',') site = os.getenv('SITE') if site == 'local': dp = Path('./data') dp.mkdir(exist_ok=True) dataDir = './data/' else: dp = Path('/data') dp.mkdir(exist_ok=True) dataDir = '/data/' speak_file = dataDir + "speek.wav" client = OpenAI(api_key = key) #digits = ['zero: ','one: ','two: ','three: ','four: ','five: ','six: ','seven: ','eight: ','nine: '] abbrevs = {'St. ' : 'Saint ', 'Mr. ': 'mister ', 'Mrs. ':'mussus ', 'Mr. ':'mister ', 'Ms. ':'mizz '} def genUsageStats(do_reset=False): result = [] ttotal4o_in = 0 ttotal4o_out = 0 ttotal4mini_in = 0 ttotal4mini_out = 0 totalAudio = 0 totalSpeech = 0 totalImages = 0 for user in unames: tokens4o_in = 0 tokens4o_out = 0 tokens4mini_in = 0 tokens4mini_out = 0 fp = dataDir + user + '_log.txt' if os.path.exists(fp): accessOk = False for i in range(3): try: with open(fp) as f: dataList = f.readlines() if do_reset: os.remove(fp) else: for line in dataList: (u, t) = line.split(':') (t, m) = t.split('-') (tin, tout) = t.split('/') incount = int(tin) outcount = int(tout) if 'mini' in m: tokens4mini_in += incount tokens4mini_out += outcount ttotal4mini_in += incount ttotal4mini_out += outcount else: tokens4o_in += incount tokens4o_out += outcount ttotal4o_in += incount ttotal4o_out += outcount accessOk = True break except: sleep(3) if not accessOk: return f'File access failed reading stats for user: {user}' userAudio = 0 fp = dataDir + user + '_audio.txt' if os.path.exists(fp): accessOk = False for i in range(3): try: with open(fp) as f: dataList = f.readlines() if do_reset: os.remove(fp) else: for line in dataList: (dud, len) = line.split(':') userAudio += int(len) totalAudio += int(userAudio) accessOk = True break except: sleep(3) if not accessOk: return f'File access failed reading audio stats for user: {user}' userSpeech = 0 fp = dataDir + user + '_speech.txt' if os.path.exists(fp): accessOk = False for i in range(3): try: with open(fp) as f: dataList = f.readlines() if do_reset: os.remove(fp) else: for line in dataList: (dud, len) = line.split(':') userSpeech += int(len) totalSpeech += int(userSpeech) accessOk = True break except: sleep(3) if not accessOk: return f'File access failed reading speech stats for user: {user}' user_images = 0 fp = image_count_path(user) if os.path.exists(fp): accessOk = False for i in range(3): try: with open(fp) as f: dataList = f.readlines() if do_reset: os.remove(fp) else: for line in dataList: cnt = line.strip() user_images += int(cnt) totalImages += int(user_images) accessOk = True break except: sleep(3) if not accessOk: return f'File access failed reading image gen stats for user: {user}' result.append([user, f'{tokens4mini_in}/{tokens4mini_out}', f'{tokens4o_in}/{tokens4o_out}', f'audio:{userAudio}',f'speech:{userSpeech}', f'images:{user_images}']) result.append(['totals', f'{ttotal4mini_in}/{ttotal4mini_out}', f'{ttotal4o_in}/{ttotal4o_out}', f'audio:{totalAudio}',f'speech:{totalSpeech}', f'images:{totalImages}']) return result def new_conversation(user): clean_up(user) return [None, [], None, None] def updatePassword(txt): password = txt.lower().strip() return [password, "*********"] # def setModel(val): # return val def chat(prompt, user_window, pwd_window, past, response, gptModel): user_window = user_window.lower().strip() isBoss = False if user_window == unames[0] and pwd_window == pwdList[0]: isBoss = True if prompt == 'stats': response = genUsageStats() list_permanent_files() return [past, response, None, gptModel] if prompt == 'reset': response = genUsageStats(True) return [past, response, None, gptModel] if prompt.startswith('gpt4'): gptModel = 'gpt-4o' prompt = prompt[5:] if prompt.startswith("clean"): user = prompt[6:] response = f'cleaned all .wav files for {user}' final_clean_up(user) return [past, response, None, gptModel] if prompt.startswith('files'): (log_cnt, wav_cnt, other_cnt, others) = list_permanent_files() response = f'{log_cnt} log files\n{wav_cnt} .wav files\n{other_cnt} Other files:\n{others}' return [past, response, None, gptModel] if user_window in unames and pwd_window == pwdList[unames.index(user_window)]: past.append({"role":"user", "content":prompt}) completion =, messages=past) reply = completion.choices[0].message.content tokens_in = completion.usage.prompt_tokens tokens_out = completion.usage.completion_tokens tokens = completion.usage.total_tokens response += "\n\nYOU: " + prompt + "\nGPT: " + reply if isBoss: response += f"\n{gptModel}: tokens in/out = {tokens_in}/{tokens_out}" if tokens > 40000: response += "\n\nTHIS DIALOG IS GETTING TOO LONG. PLEASE RESTART CONVERSATION SOON." past.append({"role":"assistant", "content": reply}) accessOk = False for i in range(3): try: dataFile = new_func(user_window) with open(dataFile, 'a') as f: m = '4o' if 'mini' in gptModel: m = '4omini' f.write(f'{user_window}:{tokens_in}/{tokens_out}-{m}\n') accessOk = True break except Exception as e: sleep(3) if not accessOk: response += f"\nDATA LOG FAILED, path = {dataFile}" return [past, response , None, gptModel] else: return [[], "User name and/or password are incorrect", prompt, gptModel] def new_func(user): dataFile = dataDir + user + '_log.txt' return dataFile def image_count_path(user): fpath = dataDir + user + '_image_count.txt' return fpath def transcribe(user, pwd, fpath): user = user.lower().strip() pwd = pwd.lower().strip() if not (user in unames and pwd in pwdList): return 'Bad credentials' with audioread.audio_open(fpath) as audio: duration = int(audio.duration) if duration > 0: with open(dataDir + user + '_audio.txt','a') as f: f.write(f'audio:{str(duration)}\n') with open(fpath,'rb') as audio_file: transcript = model='whisper-1', file = audio_file ,response_format = 'text' ) reply = transcript return str(reply) def pause_message(): return "Audio input is paused. Resume or Stop as desired" # def gen_output_audio(txt): # if len(txt) < 10: # txt = "This dialog is too short to mess with!" # response ="tts-1", voice="fable", input=txt) # with open(speak_file, 'wb') as fp: # fp.write(response.content) # return speak_file def set_speak_button(txt): vis = False if len(txt) > 2: vis = True return gr.Button(visible=vis) def update_user(user_win): user_win = user_win.lower().strip() user = 'unknown' for s in unames: if user_win == s: user = s break return [user, user] def speech_worker(chunks=[],q=[]): for chunk in chunks: fpath = q.pop(0) response ="tts-1", voice="fable", input=chunk, speed=0.85, response_format='wav') with open(fpath, 'wb') as fp: fp.write(response.content) def gen_speech_file_names(user, cnt): rv = [] for i in range(0, cnt): rv.append(dataDir + f'{user}_speech{i}.wav') return rv def final_clean_up(user): if user.strip().lower() == 'all': flist = glob(dataDir + '*_speech*.wav') else: flist = glob(dataDir + f'{user}_speech*.wav') for fpath in flist: try: os.remove(fpath) except: continue def delete_image(user): fpath = dataDir + user + '.png' if os.path.exists(fpath): os.remove(fpath) def list_permanent_files(): flist = os.listdir(dataDir) others = [] log_cnt = 0 wav_cnt = 0 other_cnt = 0 for fpath in flist: if fpath.endswith('.txt'): log_cnt += 1 elif fpath.endswith('.wav'): wav_cnt += 1 else: others.append(fpath) other_cnt = len(others) return (str(log_cnt), str(wav_cnt), str(other_cnt), str(others)) def make_image(prompt, user, pwd): user = user.lower().strip() msg = 'Error: unable to create image.' fpath = None if user in unames and pwd == pwdList[unames.index(user)]: if len(prompt.strip()) == 0: return [None, 'You must provide a prompt describing image you desire'] try: response = client.images.generate(model='dall-e-2', prompt=prompt,size='512x512', quality='standard', response_format='b64_json') image_data =[0].b64_json image = fpath = dataDir + user + '.png' with open(image_count_path(user), 'at') as fp: fp.write('1\n') msg = 'Image created!' except: return [None, msg] else: msg = 'Incorrect user name or password' return [None, msg] return [fpath, msg] def show_help(): return ''' 1. Login with user name and password (not case-sensitive) 2. Type prompts (questions, instructions) into prompt window (OR) you can speak prompts by tapping the audio "Record" button, saying your prompt, then tapping the "Stop" button. Your prompt will appear in the Prompt window, and you can edit it there if needed. 3. Chat: 1.1 tap the "Submit Prompt/Question" button. The response will appear in the Dialog window. 1.2 To speak the response, tap the "Speak Dialog" button. 1.3 Enter follow-up questions in the Prompt window either by typing or speaking. Tap the voice entry "Reset Voice Entry" button to enable additional voice entry. Then tap "Submit Prompt/Question". 1.4 If topic changes or when done chatting, tap the "Restart Conversation" button. 4. Make Image: 1.1 Enter description of desired image in prompt window via either typing or voice entry 1.2 Tap the "Make Image" button. This can take a few seconds. 1.3 There is a download button on the image display if your system supports file downloads. 1.4 When done viewing image, tap the "Restart Conversation" button Hints: 1. Better chat and image results are obtained by including detailed descriptions and instructions in the prompt. 2. Always tap "Restart Conversation" before requesting an image or changing chat topics. 3. Audio input and output functions depend on the hardware capability of your device.''' with gr.Blocks() as demo: history = gr.State([]) password = gr.State("") user = gr.State("unknown") model = gr.State("gpt-4o-mini") q = gr.State([]) qsave = gr.State([]) def clean_up(user): flist = glob(dataDir + f'{user}_speech*.wav') for fpath in flist: try: os.remove(fpath) except: continue def initial_audio_output(txt, user): global digits global abbrevs if not user in unames: return [gr.Audio(sources=None), []] clean_up(user) q = [] if len(txt.strip()) < 5: return ['None', q] for s,x in abbrevs.items(): txt = txt.replace(s, x) words_in = txt.replace('**', '').splitlines(False) words_out = [] for s in words_in: s = s.lstrip('- *@#$%^&_=+-') if len(s) > 0: loc = s.index(' ') if loc > 1: val = s[0:loc] isnum = val.replace('.','0').isdecimal() if isnum: if val.endswith('.'): val = val[:-1].replace('.',' point ') + '., ' else: val = val.replace('.', ' point ') + ', ' s = 'num'+ val + s[loc:] words_out.append(s) chunklist = [] for chunk in words_out: if chunk.strip() == '': continue isnumbered = chunk.startswith('num') number = '' loc = 0 if isnumbered: chunk = chunk[3:] loc = chunk.index(',') number = chunk[0:loc] chunk = chunk[loc:] locs = [] for i in range(1,len(chunk)-1): (a, b, c) = chunk[i-1:i+2] if a.isdecimal() and b == '.' and c.isdecimal(): locs.append(i) for i in locs: chunk = chunk[:i] + ' point ' + chunk[i+1:] if len(chunk) > 50: finechunks = chunk.split('.') for fchunk in finechunks: if isnumbered: fchunk = number + fchunk isnumbered = False if len(fchunk) > 0: if fchunk != '"': chunklist.append(fchunk) else: line = number + chunk if line != '"': chunklist.append(line) total_speech = 0 for chunk in chunklist: total_speech += len(chunk) with open(dataDir + user + '_speech.txt','a') as f: f.write(f'speech:{str(total_speech)}\n') chunk = chunklist[0] if chunk.strip() == '': return gr.Audio(sources=None) fname_list = gen_speech_file_names(user, len(chunklist)) q = fname_list.copy() qsave = fname_list.copy() fname = q.pop(0) if len(chunklist) > 0: threading.Thread(target=speech_worker, daemon=True, args=(chunklist[1:],fname_list[1:])).start() response ="tts-1", voice="fable", input=chunk, speed=0.85, response_format='wav') with open(fname, 'wb') as fp: fp.write(response.content) return [fname, q] def gen_output_audio(q, user): try: fname = q.pop(0) except: final_clean_up(user) return [None, gr.Audio(sources=None)] return [fname, q] gr.Markdown('# GPT Chat') gr.Markdown('Enter user name & password. Tap "Help & Hints" button for more instructions.') with gr.Row(): user_window = gr.Textbox(label = "User Name") user_window.blur(fn=update_user, inputs=user_window, outputs=[user, user_window]) pwd_window = gr.Textbox(label = "Password") pwd_window.blur(updatePassword, inputs = pwd_window, outputs = [password, pwd_window]) help_button = gr.Button(value='Help & Hints') with gr.Row(): audio_widget = gr.Audio(type='filepath', format='wav',waveform_options=gr.WaveformOptions( show_recording_waveform=True), sources=['microphone'], scale = 3, label="Prompt/Question Voice Entry", max_length=120) reset_button = gr.ClearButton(value="Reset Voice Entry", scale=1) #new_func1() with gr.Row(): clear_button = gr.Button(value="Restart Conversation") # gpt_chooser=gr.Radio(choices=[("GPT-3.5","gpt-3.5-turbo"),("GPT-4o","gpt-4o-mini")], # value="gpt-3.5-turbo", label="GPT Model", interactive=True) button_do_image = gr.Button(value='Make Image') submit_button = gr.Button(value="Submit Prompt/Question") speak_output = gr.Button(value="Speak Dialog", visible=False) prompt_window = gr.Textbox(label = "Prompt or Question") output_window = gr.Textbox(label = "Dialog") image_window = gr.Image(), inputs=[prompt_window, user_window, password, history, output_window, model], outputs=[history, output_window, prompt_window, model]), inputs=user_window, outputs=[prompt_window, history, output_window, image_window]) audio_widget.stop_recording(fn=transcribe, inputs=[user_window, password, audio_widget], outputs=[prompt_window]) audio_widget.pause_recording(fn=pause_message, outputs=[prompt_window]) reset_button.add(audio_widget) audio_out = gr.Audio(autoplay=True, visible=False) audio_out.stop(fn=gen_output_audio, inputs=[q, user_window], outputs = [audio_out, q]), inputs=[output_window, user_window], outputs=[audio_out, q]) output_window.change(fn=set_speak_button, inputs=output_window,outputs=speak_output), inputs=[prompt_window,user_window, password],outputs=[image_window, output_window]) image_window.change(fn=delete_image, inputs=[user]), outputs=output_window) # demo.unload(final_clean_up(user)) demo.launch(share=True)