GradioTest / app.py
dlflannery's picture
Update app.py
839c46d verified
raw
history blame
16.1 kB
import os
import gradio as gr
# import openai
from numpy._core.defchararray import endswith, isdecimal
from openai import OpenAI
from dotenv import load_dotenv
from pathlib import Path
from time import sleep
import audioread
import queue
import threading
from glob import glob
import copy
load_dotenv(override=True)
key = os.getenv('OPENAI_API_KEY')
users = os.getenv('LOGNAME')
unames = users.split(',')
pwds = os.getenv('PASSWORD')
pwdList = pwds.split(',')
site = os.getenv('SITE')
if site == 'local':
dp = Path('./data')
dp.mkdir(exist_ok=True)
dataDir = './data/'
else:
dp = Path('/data')
dp.mkdir(exist_ok=True)
dataDir = '/data/'
speak_file = dataDir + "speek.wav"
client = OpenAI(api_key = key)
#digits = ['zero: ','one: ','two: ','three: ','four: ','five: ','six: ','seven: ','eight: ','nine: ']
abbrevs = {'St. ' : 'Saint ', 'Mr. ': 'mister ', 'Mrs. ':'mussus ', 'Mr. ':'mister ', 'Ms. ':'mizz '}
def genUsageStats(do_reset=False):
result = []
ttotal4o_in = 0
ttotal4o_out = 0
ttotal4mini_in = 0
ttotal4mini_out = 0
totalAudio = 0
totalSpeech = 0
for user in unames:
tokens4o_in = 0
tokens4o_out = 0
tokens4mini_in = 0
tokens4mini_out = 0
fp = dataDir + user + '_log.txt'
if os.path.exists(fp):
accessOk = False
for i in range(3):
try:
with open(fp) as f:
dataList = f.readlines()
if do_reset:
os.remove(fp)
else:
for line in dataList:
(u, t) = line.split(':')
(t, m) = t.split('-')
(tin, tout) = t.split('/')
incount = int(tin)
outcount = int(tout)
if 'mini' in m:
tokens4mini_in += incount
tokens4mini_out += outcount
ttotal4mini_in += incount
ttotal4mini_out += outcount
else:
tokens4o_in += incount
tokens4o_out += outcount
ttotal4o_in += incount
ttotal4o_out += outcount
accessOk = True
break
except:
sleep(3)
if not accessOk:
return f'File access failed reading stats for user: {user}'
userAudio = 0
fp = dataDir + user + '_audio.txt'
if os.path.exists(fp):
accessOk = False
for i in range(3):
try:
with open(fp) as f:
dataList = f.readlines()
if do_reset:
os.remove(fp)
else:
for line in dataList:
(dud, len) = line.split(':')
userAudio += int(len)
totalAudio += int(userAudio)
accessOk = True
break
except:
sleep(3)
if not accessOk:
return f'File access failed reading audio stats for user: {user}'
userSpeech = 0
fp = dataDir + user + '_speech.txt'
if os.path.exists(fp):
accessOk = False
for i in range(3):
try:
with open(fp) as f:
dataList = f.readlines()
if do_reset:
os.remove(fp)
else:
for line in dataList:
(dud, len) = line.split(':')
userSpeech += int(len)
totalSpeech += int(userSpeech)
accessOk = True
break
except:
sleep(3)
if not accessOk:
return f'File access failed reading speech stats for user: {user}'
result.append([user, f'{tokens4mini_in}/{tokens4mini_out}', f'{tokens4o_in}/{tokens4o_out}', f'audio:{userAudio}',f'speech:{userSpeech}'])
result.append(['totals', f'{ttotal4mini_in}/{ttotal4mini_out}', f'{ttotal4o_in}/{ttotal4o_out}', f'audio:{totalAudio}',f'speech:{totalSpeech}'])
return result
def new_conversation(user):
clean_up(user)
return [None, [], None, []]
def updatePassword(txt):
password = txt.lower().strip()
return [password, "*********"]
# def setModel(val):
# return val
def chat(prompt, user_window, pwd_window, past, response, gptModel):
user_window = user_window.lower().strip()
isBoss = False
if user_window == unames[0] and pwd_window == pwdList[0]:
isBoss = True
if prompt == 'stats':
response = genUsageStats()
list_permanent_files()
return [past, response, None, gptModel]
if prompt == 'reset':
response = genUsageStats(True)
return [past, response, None, gptModel]
if prompt.startswith('gpt4'):
gptModel = 'gpt-4o'
prompt = prompt[5:]
if prompt.startswith("clean"):
user = prompt[6:]
response = f'cleaned all .wav files for {user}'
final_clean_up(user)
return [past, response, None, gptModel]
if prompt.startswith('files'):
(log_cnt, wav_cnt, other_cnt, others) = list_permanent_files()
response = f'{log_cnt} log files\n{wav_cnt} .wav files\n{other_cnt} Other files:\n{others}'
return [past, response, None, gptModel]
if user_window in unames and pwd_window == pwdList[unames.index(user_window)]:
past.append({"role":"user", "content":prompt})
completion = client.chat.completions.create(model=gptModel,
messages=past)
reply = completion.choices[0].message.content
tokens_in = completion.usage.prompt_tokens
tokens_out = completion.usage.completion_tokens
tokens = completion.usage.total_tokens
response += "\n\nYOU: " + prompt + "\nGPT: " + reply
if isBoss:
response += f"\n{gptModel}: tokens in/out = {tokens_in}/{tokens_out}"
if tokens > 40000:
response += "\n\nTHIS DIALOG IS GETTING TOO LONG. PLEASE RESTART CONVERSATION SOON."
past.append({"role":"assistant", "content": reply})
accessOk = False
for i in range(3):
try:
dataFile = new_func(user_window)
with open(dataFile, 'a') as f:
m = '4o'
if 'mini' in gptModel:
m = '4omini'
f.write(f'{user_window}:{tokens_in}/{tokens_out}-{m}\n')
accessOk = True
break
except Exception as e:
sleep(3)
if not accessOk:
response += f"\nDATA LOG FAILED, path = {dataFile}"
return [past, response , None, gptModel]
else:
return [[], "User name and/or password are incorrect", prompt, gptModel]
def new_func(user):
dataFile = dataDir + user + '_log.txt'
return dataFile
def transcribe(user, pwd, fpath):
user = user.lower().strip()
pwd = pwd.lower().strip()
if not (user in unames and pwd in pwdList):
return 'Bad credentials'
with audioread.audio_open(fpath) as audio:
duration = int(audio.duration)
if duration > 0:
with open(dataDir + user + '_audio.txt','a') as f:
f.write(f'audio:{str(duration)}\n')
with open(fpath,'rb') as audio_file:
transcript = client.audio.transcriptions.create(
model='whisper-1', file = audio_file ,response_format = 'text' )
reply = transcript
return str(reply)
def pause_message():
return "Audio input is paused. Resume or Stop as desired"
# def gen_output_audio(txt):
# if len(txt) < 10:
# txt = "This dialog is too short to mess with!"
# response = client.audio.speech.create(model="tts-1", voice="fable", input=txt)
# with open(speak_file, 'wb') as fp:
# fp.write(response.content)
# return speak_file
def set_speak_button(txt):
vis = False
if len(txt) > 2:
vis = True
return gr.Button(visible=vis)
def update_user(user_win):
user_win = user_win.lower().strip()
user = 'unknown'
for s in unames:
if user_win == s:
user = s
break
return [user, user]
def speech_worker(chunks=[],q=[]):
for chunk in chunks:
fpath = q.pop(0)
response = client.audio.speech.create(model="tts-1", voice="fable", input=chunk, speed=0.85, response_format='wav')
with open(fpath, 'wb') as fp:
fp.write(response.content)
def gen_speech_file_names(user, cnt):
rv = []
for i in range(0, cnt):
rv.append(dataDir + f'{user}_speech{i}.wav')
return rv
def final_clean_up(user):
if user.strip().lower() == 'all':
flist = glob(dataDir + '*_speech*.wav')
else:
flist = glob(dataDir + f'{user}_speech*.wav')
for fpath in flist:
try:
os.remove(fpath)
except:
continue
def list_permanent_files():
flist = os.listdir(dataDir)
others = []
log_cnt = 0
wav_cnt = 0
other_cnt = 0
for fpath in flist:
if fpath.endswith('.txt'):
log_cnt += 1
elif fpath.endswith('.wav'):
wav_cnt += 1
else:
others.append(fpath)
other_cnt = len(others)
return (str(log_cnt), str(wav_cnt), str(other_cnt), str(others))
with gr.Blocks() as demo:
history = gr.State([])
password = gr.State("")
user = gr.State("unknown")
model = gr.State("gpt-4o-mini")
q = gr.State([])
qsave = gr.State([])
def clean_up(user):
flist = glob(dataDir + f'{user}_speech*.wav')
for fpath in flist:
try:
os.remove(fpath)
except:
continue
def initial_audio_output(txt, user):
global digits
global abbrevs
if not user in unames:
return [gr.Audio(sources=None), []]
clean_up(user)
q = []
if len(txt.strip()) < 5:
return ['None', q]
for s,x in abbrevs.items():
txt = txt.replace(s, x)
words_in = txt.replace('**', '').splitlines(False)
words_out = []
for s in words_in:
s = s.lstrip('- *@#$%^&_=+-')
if len(s) > 0:
loc = s.index(' ')
if loc > 1:
val = s[0:loc]
isnum = val.replace('.','0').isdecimal()
if isnum:
if val.endswith('.'):
val = val[:-1].replace('.',' point ') + '., '
else:
val = val.replace('.', ' point ') + ', '
s = 'num'+ val + s[loc:]
words_out.append(s)
chunklist = []
for chunk in words_out:
if chunk.strip() == '':
continue
isnumbered = chunk.startswith('num')
number = ''
loc = 0
if isnumbered:
chunk = chunk[3:]
loc = chunk.index(',')
number = chunk[0:loc]
chunk = chunk[loc:]
locs = []
for i in range(1,len(chunk)-1):
(a, b, c) = chunk[i-1:i+2]
if a.isdecimal() and b == '.' and c.isdecimal():
locs.append(i)
for i in locs:
chunk = chunk[:i] + ' point ' + chunk[i+1:]
if len(chunk) > 50:
finechunks = chunk.split('.')
for fchunk in finechunks:
if isnumbered:
fchunk = number + fchunk
isnumbered = False
if len(fchunk) > 0:
if fchunk != '"':
chunklist.append(fchunk)
else:
line = number + chunk
if line != '"':
chunklist.append(line)
total_speech = 0
for chunk in chunklist:
total_speech += len(chunk)
with open(dataDir + user + '_speech.txt','a') as f:
f.write(f'speech:{str(total_speech)}\n')
chunk = chunklist[0]
if chunk.strip() == '':
return gr.Audio(sources=None)
fname_list = gen_speech_file_names(user, len(chunklist))
q = fname_list.copy()
qsave = fname_list.copy()
fname = q.pop(0)
if len(chunklist) > 0:
threading.Thread(target=speech_worker, daemon=True, args=(chunklist[1:],fname_list[1:])).start()
response = client.audio.speech.create(model="tts-1", voice="fable", input=chunk, speed=0.85, response_format='wav')
with open(fname, 'wb') as fp:
fp.write(response.content)
return [fname, q]
def gen_output_audio(q, user):
try:
fname = q.pop(0)
except:
final_clean_up(user)
return [None, gr.Audio(sources=None)]
return [fname, q]
gr.Markdown('# GPT Chat')
gr.Markdown('Enter user name & password then enter prompt and click submit button. Restart conversation if topic changes. ' +
'You can enter prompts by voice. Tap "Record", speak, then tap "Stop". ' +
'Tap "Reset Voice Entry" to enter more voice. Tap "Speak Dialog" to hear dialog. ' +
'Note: first voice response may take a longer time.')
with gr.Row():
user_window = gr.Textbox(label = "User Name")
user_window.blur(fn=update_user, inputs=user_window, outputs=[user, user_window])
pwd_window = gr.Textbox(label = "Password")
pwd_window.blur(updatePassword, inputs = pwd_window, outputs = [password, pwd_window])
with gr.Row():
audio_widget = gr.Audio(type='filepath', format='wav',waveform_options=gr.WaveformOptions(
show_recording_waveform=True), sources=['microphone'], scale = 3, label="Prompt/Question Voice Entry", max_length=120)
reset_button = gr.ClearButton(value="Reset Voice Entry", scale=1) #new_func1()
with gr.Row():
clear_button = gr.Button(value="Restart Conversation")
# gpt_chooser=gr.Radio(choices=[("GPT-3.5","gpt-3.5-turbo"),("GPT-4o","gpt-4o-mini")],
# value="gpt-3.5-turbo", label="GPT Model", interactive=True)
submit_button = gr.Button(value="Submit Prompt/Question")
speak_output = gr.Button(value="Speak Dialog", visible=False)
prompt_window = gr.Textbox(label = "Prompt or Question")
output_window = gr.Textbox(label = "Dialog")
submit_button.click(chat, inputs=[prompt_window, user_window, password, history, output_window, model],
outputs=[history, output_window, prompt_window, model])
clear_button.click(fn=new_conversation, inputs=user_window, outputs=[prompt_window, history, output_window])
audio_widget.stop_recording(fn=transcribe, inputs=[user_window, password, audio_widget],
outputs=[prompt_window])
audio_widget.pause_recording(fn=pause_message, outputs=[prompt_window])
reset_button.add(audio_widget)
audio_out = gr.Audio(autoplay=True, visible=False)
audio_out.stop(fn=gen_output_audio, inputs=[q, user_window], outputs = [audio_out, q])
speak_output.click(fn=initial_audio_output, inputs=[output_window, user_window], outputs=[audio_out, q])
output_window.change(fn=set_speak_button, inputs=output_window,outputs=speak_output)
# demo.unload(final_clean_up(user))
demo.launch(share=True)