GradioTest / app.py
dlflannery's picture
Update app.py
0ac5c98 verified
raw
history blame
17.4 kB
import os
import gradio as gr
# import openai
from numpy._core.defchararray import endswith, isdecimal
from openai import OpenAI
from dotenv import load_dotenv
from pathlib import Path
from time import sleep
import audioread
import queue
import threading
from glob import glob
import copy
import base64
import json
from PIL import Image
from io import BytesIO
# Load settings from a .env file; override=True lets it replace variables
# that are already present in the process environment.
load_dotenv(override=True)
key = os.getenv('OPENAI_API_KEY')

# LOGNAME and PASSWORD are comma-separated lists. Elsewhere in this file the
# password for unames[i] is looked up as pwdList[i], and unames[0] is treated
# as the administrator account.
users = os.getenv('LOGNAME')
pwds = os.getenv('PASSWORD')
if not users or not pwds:
    # Fail with a clear message instead of an AttributeError on None.split().
    raise RuntimeError('LOGNAME and PASSWORD environment variables must be set')
unames = users.split(',')
pwdList = pwds.split(',')

# SITE == 'local' keeps data beside the app; anything else uses /data.
site = os.getenv('SITE')
dataDir = './data/' if site == 'local' else '/data/'
dp = Path(dataDir)
dp.mkdir(exist_ok=True)

speak_file = dataDir + "speek.wav"
client = OpenAI(api_key = key)
#digits = ['zero: ','one: ','two: ','three: ','four: ','five: ','six: ','seven: ','eight: ','nine: ']
# Abbreviations replaced with phonetic spellings before text is sent to
# text-to-speech.
abbrevs = {'St. ' : 'Saint ', 'Mr. ': 'mister ', 'Mrs. ':'mussus ', 'Ms. ':'mizz '}
def _read_usage_lines(fp, do_reset, attempts=3, delay=3):
    """Read all lines of *fp*, retrying on I/O errors.

    When *do_reset* is true the file is deleted after being read and an empty
    list is returned, so the caller counts nothing for it.  Returns None when
    every attempt failed.
    """
    for attempt in range(attempts):
        try:
            with open(fp) as f:
                lines = f.readlines()
            if do_reset:
                os.remove(fp)
                return []
            return lines
        except (OSError, UnicodeError):
            # Only wait when another attempt is still coming.
            if attempt < attempts - 1:
                sleep(delay)
    return None


def _parse_token_line(line):
    """Parse a 'user:in/out-model' log line into (tokens_in, tokens_out, is_mini)."""
    (_user, rest) = line.split(':')
    (counts, model_tag) = rest.split('-')
    (tin, tout) = counts.split('/')
    return int(tin), int(tout), 'mini' in model_tag


def _sum_labelled_counts(lines):
    """Sum the integer after the colon in each 'label:count' line."""
    total = 0
    for line in lines:
        (_label, count) = line.split(':')
        total += int(count)
    return total


def genUsageStats(do_reset=False):
    """Collect per-user usage figures from the log files in dataDir.

    For every name in ``unames`` this reads ``<user>_log.txt`` (chat tokens),
    ``<user>_audio.txt`` and ``<user>_speech.txt`` when they exist.

    Args:
        do_reset: When true, each existing log file is deleted instead of
            being counted, so all reported figures are zero.

    Returns:
        A list of rows ``[user, 'in/out' (mini), 'in/out' (4o), 'audio:N',
        'speech:N']`` followed by a final ``'totals'`` row, or an error
        string naming the user whose file could not be read or parsed.
    """
    result = []
    ttotal4o_in = 0
    ttotal4o_out = 0
    ttotal4mini_in = 0
    ttotal4mini_out = 0
    totalAudio = 0
    totalSpeech = 0
    for user in unames:
        tokens4o_in = 0
        tokens4o_out = 0
        tokens4mini_in = 0
        tokens4mini_out = 0
        fp = dataDir + user + '_log.txt'
        if os.path.exists(fp):
            lines = _read_usage_lines(fp, do_reset)
            try:
                # lines is None after repeated I/O failure; a malformed line
                # raises ValueError.  Both are reported the same way, but a
                # parse error is no longer retried because it cannot succeed.
                if lines is None:
                    raise ValueError(fp)
                parsed = [_parse_token_line(line) for line in lines]
            except ValueError:
                return f'File access failed reading stats for user: {user}'
            for (incount, outcount, is_mini) in parsed:
                if is_mini:
                    tokens4mini_in += incount
                    tokens4mini_out += outcount
                else:
                    tokens4o_in += incount
                    tokens4o_out += outcount
            ttotal4mini_in += tokens4mini_in
            ttotal4mini_out += tokens4mini_out
            ttotal4o_in += tokens4o_in
            ttotal4o_out += tokens4o_out

        userAudio = 0
        fp = dataDir + user + '_audio.txt'
        if os.path.exists(fp):
            lines = _read_usage_lines(fp, do_reset)
            try:
                if lines is None:
                    raise ValueError(fp)
                userAudio = _sum_labelled_counts(lines)
            except ValueError:
                return f'File access failed reading audio stats for user: {user}'
            # Add the user's figure to the grand total exactly once.
            totalAudio += userAudio

        userSpeech = 0
        fp = dataDir + user + '_speech.txt'
        if os.path.exists(fp):
            lines = _read_usage_lines(fp, do_reset)
            try:
                if lines is None:
                    raise ValueError(fp)
                userSpeech = _sum_labelled_counts(lines)
            except ValueError:
                return f'File access failed reading speech stats for user: {user}'
            totalSpeech += userSpeech

        result.append([user, f'{tokens4mini_in}/{tokens4mini_out}', f'{tokens4o_in}/{tokens4o_out}', f'audio:{userAudio}',f'speech:{userSpeech}'])
    result.append(['totals', f'{ttotal4mini_in}/{ttotal4mini_out}', f'{ttotal4o_in}/{ttotal4o_out}', f'audio:{totalAudio}',f'speech:{totalSpeech}'])
    return result
def new_conversation(user):
    """Start over: remove the user's speech files and return cleared UI values.

    The four returned values are, in order, an empty prompt, an empty message
    history, an empty dialog and an empty image.
    """
    clean_up(user)
    blank_prompt, blank_history, blank_dialog, blank_image = None, [], None, None
    return [blank_prompt, blank_history, blank_dialog, blank_image]
def updatePassword(txt):
    """Return the lower-cased, stripped password plus a fixed mask string.

    The first element is the normalized password; the second is the text
    shown in place of what was typed.
    """
    mask = "*********"
    return [txt.lower().strip(), mask]
# def setModel(val):
# return val
def chat(prompt, user_window, pwd_window, past, response, gptModel):
    """Handle one submitted prompt: admin commands or a chat completion.

    Args:
        prompt: Text entered by the user.
        user_window: User name as typed; lower-cased and stripped here.
        pwd_window: Password to compare against the stored password list.
        past: Message history (list of role/content dicts) sent to the model.
        response: Dialog text accumulated so far; may be empty or None.
        gptModel: Name of the chat model to use.

    Returns:
        ``[history, dialog_text, new_prompt_value, model_name]``.  On bad
        credentials the history is reset to ``[]`` and the prompt is kept.
    """
    user_window = user_window.lower().strip()
    # A cleared dialog box can arrive as None; treat it as empty text so the
    # concatenation below cannot fail.
    response = response or ''

    # The first configured account is the administrator.
    isBoss = (bool(unames) and bool(pwdList)
              and user_window == unames[0] and pwd_window == pwdList[0])
    if isBoss:
        # NOTE(review): the flattened source does not show nesting; these
        # commands are presumed to be administrator-only.
        if prompt == 'stats':
            return [past, genUsageStats(), None, gptModel]
        if prompt == 'reset':
            return [past, genUsageStats(True), None, gptModel]
        if prompt.startswith('gpt4'):
            # 'gpt4 <text>' switches the model and drops the 5-char prefix.
            gptModel = 'gpt-4o'
            prompt = prompt[5:]
        if prompt.startswith("clean"):
            user = prompt[6:]
            final_clean_up(user)
            return [past, f'cleaned all .wav files for {user}', None, gptModel]
        if prompt.startswith('files'):
            (log_cnt, wav_cnt, other_cnt, others) = list_permanent_files()
            response = f'{log_cnt} log files\n{wav_cnt} .wav files\n{other_cnt} Other files:\n{others}'
            return [past, response, None, gptModel]

    denied = [[], "User name and/or password are incorrect", prompt, gptModel]
    if user_window not in unames:
        return denied
    slot = unames.index(user_window)
    if slot >= len(pwdList) or pwd_window != pwdList[slot]:
        return denied

    user_msg = {"role":"user", "content":prompt}
    # Send a copy so a failed API call does not leave an unanswered user
    # message in the stored history.
    completion = client.chat.completions.create(model=gptModel,
                                                messages=past + [user_msg])
    reply = completion.choices[0].message.content
    tokens_in = completion.usage.prompt_tokens
    tokens_out = completion.usage.completion_tokens
    tokens = completion.usage.total_tokens
    past.append(user_msg)
    past.append({"role":"assistant", "content": reply})

    response += "\n\nYOU: " + prompt + "\nGPT: " + reply
    if isBoss:
        response += f"\n{gptModel}: tokens in/out = {tokens_in}/{tokens_out}"
    if tokens > 40000:
        response += "\n\nTHIS DIALOG IS GETTING TOO LONG. PLEASE RESTART CONVERSATION SOON."

    # Append the token usage to the user's log, retrying on I/O errors.
    dataFile = new_func(user_window)
    m = '4omini' if 'mini' in gptModel else '4o'
    accessOk = False
    for attempt in range(3):
        try:
            with open(dataFile, 'a') as f:
                f.write(f'{user_window}:{tokens_in}/{tokens_out}-{m}\n')
            accessOk = True
            break
        except OSError:
            if attempt < 2:
                sleep(3)
    if not accessOk:
        response += f"\nDATA LOG FAILED, path = {dataFile}"
    return [past, response , None, gptModel]
def new_func(user):
    """Return the path of the token-usage log file for *user* inside dataDir."""
    return f'{dataDir}{user}_log.txt'
def transcribe(user, pwd, fpath):
    """Transcribe the recorded audio file at *fpath* with the whisper-1 model.

    Args:
        user: User name as typed; lower-cased and stripped before checking.
        pwd: Password; lower-cased and stripped before checking.
        fpath: Path of the recorded audio file.

    Returns:
        The transcript as a string, or ``'Bad credentials'`` when the user is
        unknown or the password does not belong to that user.
    """
    user = user.lower().strip()
    pwd = pwd.lower().strip()
    # The password must be the one stored at the same position as the user;
    # accepting any entry of pwdList would let one user's password unlock
    # another user's account.
    if user not in unames:
        return 'Bad credentials'
    slot = unames.index(user)
    if slot >= len(pwdList) or pwd != pwdList[slot]:
        return 'Bad credentials'

    with audioread.audio_open(fpath) as audio:
        duration = int(audio.duration)
    if duration > 0:
        # Record the clip length in the user's audio usage log.
        with open(dataDir + user + '_audio.txt','a') as f:
            f.write(f'audio:{duration}\n')
    with open(fpath,'rb') as audio_file:
        transcript = client.audio.transcriptions.create(
            model='whisper-1', file = audio_file ,response_format = 'text' )
    return str(transcript)
def pause_message():
    """Return the fixed notice shown while audio recording is paused."""
    notice = "Audio input is paused. Resume or Stop as desired"
    return notice
# def gen_output_audio(txt):
# if len(txt) < 10:
# txt = "This dialog is too short to mess with!"
# response = client.audio.speech.create(model="tts-1", voice="fable", input=txt)
# with open(speak_file, 'wb') as fp:
# fp.write(response.content)
# return speak_file
def set_speak_button(txt):
    """Return a Button update that is visible only for text longer than 2 chars.

    A missing value (None) is treated like empty text instead of raising on
    ``len(None)``.
    """
    vis = txt is not None and len(txt) > 2
    return gr.Button(visible=vis)
def update_user(user_win):
    """Normalize the typed user name and match it against the known users.

    Returns the matched name twice (one value per output), or ``'unknown'``
    twice when the lower-cased, stripped input is not in ``unames``.
    """
    candidate = user_win.lower().strip()
    matched = candidate if candidate in unames else 'unknown'
    return [matched, matched]
def speech_worker(chunks=None, q=None):
    """Synthesize each text chunk to the matching .wav path.

    Args:
        chunks: Text pieces to convert to speech, in order.
        q: Output file paths, one per chunk, in the same order.  The list is
            read but no longer modified.

    Chunks beyond the number of supplied paths are not synthesized.
    """
    # None defaults avoid sharing one mutable list between calls.
    chunks = [] if chunks is None else chunks
    paths = [] if q is None else q
    for chunk, fpath in zip(chunks, paths):
        response = client.audio.speech.create(model="tts-1", voice="fable", input=chunk, speed=0.85, response_format='wav')
        with open(fpath, 'wb') as fp:
            fp.write(response.content)
def gen_speech_file_names(user, cnt):
    """Return *cnt* speech file paths in dataDir, numbered from 0 for *user*."""
    return [dataDir + f'{user}_speech{idx}.wav' for idx in range(cnt)]
def final_clean_up(user):
    """Delete speech .wav files in dataDir for one user, or for everyone.

    Args:
        user: A user name, or ``'all'`` (any case) to remove every user's
            speech files.  Surrounding whitespace is ignored.

    Files that cannot be removed are skipped; deletion is best effort.
    """
    name = user.strip()
    if name.lower() == 'all':
        pattern = dataDir + '*_speech*.wav'
    else:
        pattern = dataDir + f'{name}_speech*.wav'
    for fpath in glob(pattern):
        try:
            os.remove(fpath)
        except OSError:
            # Already gone or locked: carry on with the remaining files.
            continue
def delete_image(user):
    """Remove the user's generated image file from dataDir if it exists."""
    image_path = dataDir + user + '.png'
    if not os.path.exists(image_path):
        return
    os.remove(image_path)
def list_permanent_files():
    """Summarize the contents of dataDir by file extension.

    Returns a 4-tuple of strings: the number of ``.txt`` files, the number of
    ``.wav`` files, the number of remaining entries, and the ``str()`` of the
    list of those remaining entry names.
    """
    names = os.listdir(dataDir)
    txt_total = sum(1 for name in names if name.endswith('.txt'))
    wav_total = sum(1 for name in names if name.endswith('.wav'))
    leftovers = [name for name in names if not name.endswith(('.txt', '.wav'))]
    return (str(txt_total), str(wav_total), str(len(leftovers)), str(leftovers))
def make_image(prompt, user, pwd):
    """Generate a 512x512 image from *prompt* and save it as the user's PNG.

    Args:
        prompt: Description of the image to create.
        user: User name as typed; lower-cased and stripped before checking.
        pwd: Password that must match the one stored for *user*.

    Returns:
        ``[path_to_png, message]`` on success, otherwise ``[None, message]``
        where the message explains the failure.
    """
    user = user.lower().strip()
    if user not in unames or pwd != pwdList[unames.index(user)]:
        return [None, 'Incorrect user name or password']
    if len(prompt.strip()) == 0:
        return [None, 'You must provide a prompt']
    try:
        response = client.images.generate(model='dall-e-2', prompt=prompt,size='512x512',
                                          quality='standard', response_format='b64_json')
        image_data = response.data[0].b64_json
        image = Image.open(BytesIO(base64.b64decode(image_data)))
        fpath = dataDir + user + '.png'
        image.save(fpath)
    except Exception:
        # Any API, decoding or file error is reported to the user; unlike a
        # bare except this lets KeyboardInterrupt/SystemExit propagate.
        return [None, 'Error: unable to create image.']
    return [fpath, 'Image created!']
with gr.Blocks() as demo:
    # Values Gradio keeps between events.
    history = gr.State([])            # chat messages passed to chat()
    password = gr.State("")           # normalized password from pwd_window
    user = gr.State("unknown")        # matched user name, or 'unknown'
    model = gr.State("gpt-4o-mini")   # chat model name
    q = gr.State([])                  # speech files still waiting to be played
    qsave = gr.State([])              # not used by any handler in this block

    def clean_up(user):
        """Delete the speech .wav files of *user* in dataDir (best effort)."""
        for fpath in glob(dataDir + f'{user}_speech*.wav'):
            try:
                os.remove(fpath)
            except OSError:
                continue

    def _spoken_number(token):
        """Return *token* rewritten for speech if it is a number, else None.

        A token counts as a number when it consists of decimal digits and
        dots and contains at least one digit.  Inner dots become ' point ';
        a trailing dot (as in '3.') is kept.
        """
        if not any(ch.isdecimal() for ch in token):
            return None
        if not token.replace('.', '0').isdecimal():
            return None
        if token.endswith('.'):
            return token[:-1].replace('.', ' point ') + '.'
        return token.replace('.', ' point ')

    def _expand_decimal_points(text):
        """Replace every '.' that sits between two digits with ' point '."""
        spots = [i for i in range(1, len(text) - 1)
                 if text[i] == '.'
                 and text[i - 1].isdecimal() and text[i + 1].isdecimal()]
        # Work right to left so earlier indices stay valid after each insert.
        for i in reversed(spots):
            text = text[:i] + ' point ' + text[i + 1:]
        return text

    def _speech_chunks(txt):
        """Split dialog text into the pieces that are synthesized one by one."""
        for short, spoken in abbrevs.items():
            txt = txt.replace(short, spoken)
        chunks = []
        for raw in txt.replace('**', '').splitlines(False):
            # Drop leading list/markdown decoration characters.
            line = raw.lstrip('- *@#$%^&_=+-')
            if line.strip() == '':
                continue
            number = ''
            loc = line.find(' ')   # -1 when the line is a single word
            if loc > 1:
                spoken = _spoken_number(line[:loc])
                if spoken is not None:
                    # Keep the list number apart so its dot is never used as
                    # a sentence break below.
                    number = spoken
                    line = ', ' + line[loc:]
            line = _expand_decimal_points(line)
            # Long lines are split at full stops into smaller pieces.
            pieces = line.split('.') if len(line) > 50 else [line]
            pieces[0] = number + pieces[0]
            # Skip blank pieces and a lone double-quote character.
            chunks.extend(p for p in pieces if p.strip() not in ('', '"'))
        return chunks

    def initial_audio_output(txt, user):
        """Start speaking the dialog text *txt* for *user*.

        The first chunk is synthesized here; the remaining chunks are
        synthesized by speech_worker on a background thread.

        Returns:
            ``[audio_value, pending_files]`` where audio_value is the first
            .wav path (or an empty value when there is nothing to speak) and
            pending_files lists the files still to be played.
        """
        if user not in unames:
            return [gr.Audio(sources=None), []]
        clean_up(user)
        if txt is None or len(txt.strip()) < 5:
            # Too short to speak: clear the player rather than handing it the
            # string 'None' as if it were a file path.
            return [None, []]
        chunklist = _speech_chunks(txt)
        if not chunklist:
            return [gr.Audio(sources=None), []]

        # Log the number of characters sent to text-to-speech.
        total_speech = sum(len(chunk) for chunk in chunklist)
        with open(dataDir + user + '_speech.txt','a') as f:
            f.write(f'speech:{total_speech}\n')

        fname_list = gen_speech_file_names(user, len(chunklist))
        fname = fname_list[0]
        pending = fname_list[1:]
        if pending:
            # The worker gets its own copy of the path list.
            threading.Thread(target=speech_worker, daemon=True,
                             args=(chunklist[1:], list(pending))).start()
        response = client.audio.speech.create(model="tts-1", voice="fable", input=chunklist[0], speed=0.85, response_format='wav')
        with open(fname, 'wb') as fp:
            fp.write(response.content)
        return [fname, pending]

    def gen_output_audio(q, user):
        """Return the next queued speech file, or clean up when none is left.

        Returns ``[next_file, remaining_queue]``; when the queue is empty the
        user's speech files are deleted and ``[None, []]`` is returned.
        """
        if not q:
            final_clean_up(user)
            return [None, []]
        fname = q.pop(0)
        return [fname, q]

    gr.Markdown('# GPT Chat')
    gr.Markdown('Enter user name & password then enter prompt and click submit button. Restart conversation if topic changes. ' +
                'You can enter prompts by voice. Tap "Record", speak, then tap "Stop". ' +
                'Tap "Reset Voice Entry" to enter more voice. Tap "Speak Dialog" to hear dialog. ' +
                'Note: first voice response may take a longer time.')
    # NOTE(review): row membership below is reconstructed; the flattened
    # source does not show which widgets sit inside each gr.Row.
    with gr.Row():
        user_window = gr.Textbox(label = "User Name")
        user_window.blur(fn=update_user, inputs=user_window, outputs=[user, user_window])
        pwd_window = gr.Textbox(label = "Password")
        pwd_window.blur(updatePassword, inputs = pwd_window, outputs = [password, pwd_window])
    with gr.Row():
        audio_widget = gr.Audio(type='filepath', format='wav',waveform_options=gr.WaveformOptions(
            show_recording_waveform=True), sources=['microphone'], scale = 3, label="Prompt/Question Voice Entry", max_length=120)
        reset_button = gr.ClearButton(value="Reset Voice Entry", scale=1)
    with gr.Row():
        clear_button = gr.Button(value="Restart Conversation")
        button_do_image = gr.Button(value='Make Image')
        submit_button = gr.Button(value="Submit Prompt/Question")
        speak_output = gr.Button(value="Speak Dialog", visible=False)
    prompt_window = gr.Textbox(label = "Prompt or Question")
    output_window = gr.Textbox(label = "Dialog")
    image_window = gr.Image()

    # Event wiring.
    submit_button.click(chat, inputs=[prompt_window, user_window, password, history, output_window, model],
                        outputs=[history, output_window, prompt_window, model])
    clear_button.click(fn=new_conversation, inputs=user_window, outputs=[prompt_window, history, output_window, image_window])
    audio_widget.stop_recording(fn=transcribe, inputs=[user_window, password, audio_widget],
                                outputs=[prompt_window])
    audio_widget.pause_recording(fn=pause_message, outputs=[prompt_window])
    reset_button.add(audio_widget)
    audio_out = gr.Audio(autoplay=True, visible=False)
    # When one clip stops, gen_output_audio supplies the next queued file.
    audio_out.stop(fn=gen_output_audio, inputs=[q, user_window], outputs = [audio_out, q])
    speak_output.click(fn=initial_audio_output, inputs=[output_window, user_window], outputs=[audio_out, q])
    output_window.change(fn=set_speak_button, inputs=output_window,outputs=speak_output)
    button_do_image.click(fn=make_image, inputs=[prompt_window,user_window, password],outputs=[image_window, output_window])
    image_window.change(fn=delete_image, inputs=[user])
    # demo.unload(final_clean_up(user))
demo.launch(share=True)