from httpx import AsyncClient
import os
import requests
import gradio as gr
from fastapi import Depends, FastAPI, HTTPException, Request
from app.db import User, create_db_and_tables
from app.schemas import UserCreate, UserRead, UserUpdate
from app.users import auth_backend, current_active_user, fastapi_users
from dotenv import load_dotenv
from tenacity import retry, stop_after_attempt, wait_exponential
import examples as chatbot_examples
# Get the current environment from the environment variable
current_environment = os.getenv("APP_ENV", "dev")
# Load the appropriate .env file based on the current environment
if current_environment == "dev":
load_dotenv(".env.dev")
elif current_environment == "test":
load_dotenv(".env.test")
elif current_environment == "prod":
load_dotenv(".env.prod")
else:
raise ValueError("Invalid environment specified")
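# Import openai only after the environment is loaded so the legacy SDK can pick up OPENAI_API_KEY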
import openai
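# Log in against this app's own /auth/jwt/login endpoint; returns (True, access_token) on success
# or (False, error_detail) on failure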
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30))
def api_login(email, password):
port = os.getenv("APP_PORT")
scheme = os.getenv("APP_SCHEME")
host = os.getenv("APP_HOST")
url = f"{scheme}://{host}:{port}/auth/jwt/login"
payload = {"username": email, "password": password}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = requests.post(url, data=payload, headers=headers)
if response.status_code == 200:
response_json = response.json()
api_key = response_json["access_token"]
return True, api_key
else:
response_json = response.json()
detail = response_json["detail"]
return False, detail
def get_api_key(email, password):
successful, message = api_login(email, password)
if successful:
return os.getenv("APP_API_BASE"), message
else:
        raise gr.Error(message)
# Define a function to get the AI's reply using the OpenAI API
def get_ai_reply(
message,
model="gpt-3.5-turbo",
system_message=None,
temperature=0,
    message_history=None,
):
# Initialize the messages list
messages = []
# Add the system message to the messages list
if system_message is not None:
messages += [{"role": "system", "content": system_message}]
# Add the message history to the messages list
if message_history is not None:
messages += message_history
# Add the user's message to the messages list
messages += [{"role": "user", "content": message}]
# Make an API call to the OpenAI ChatCompletion endpoint with the model and messages
completion = openai.ChatCompletion.create(
model=model, messages=messages, temperature=temperature
)
# Extract and return the AI's response from the API response
return completion.choices[0].message.content.strip()
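# Generate an image with the legacy (pre-1.0 openai SDK) Image endpoint and return the URL of the first result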
def get_ai_image(prompt, size="512x512"):
response = openai.Image.create(prompt=prompt, n=1, size=size)
image_1_url = response.data[0]["url"]
return image_1_url
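# Transcribe an audio file with Whisper via the legacy openai.Audio API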
def get_ai_transcript(path_to_audio, language=None):
    with open(path_to_audio, "rb") as audio_file:
        transcript = openai.Audio.transcribe("whisper-1", audio_file, language=language)
    return transcript.text
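# Gradio wrapper around get_ai_transcript that surfaces failures as gr.Error messages in the UI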
def generate_transcription(path_to_audio_file):
try:
transcript = get_ai_transcript(path_to_audio_file)
return transcript
except Exception as e:
        raise gr.Error(str(e))
def generate_image(prompt):
try:
image_url = get_ai_image(prompt)
return image_url
except Exception as e:
        raise gr.Error(str(e))
# Define a function to handle the chat interaction with the AI model
def chat(message, chatbot_messages, model, temperature, system_message):
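    # Convert Gradio's list of (user, assistant) message pairs into the OpenAI role/content format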
history_openai_format = []
for human, assistant in chatbot_messages:
history_openai_format.append({"role": "user", "content": human})
history_openai_format.append({"role": "assistant", "content": assistant})
# Try to get the AI's reply using the get_ai_reply function
try:
ai_reply = get_ai_reply(
message,
model=model,
system_message=system_message,
message_history=history_openai_format,
temperature=temperature,
)
except Exception as e:
# If an error occurs, raise a Gradio error
        raise gr.Error(str(e))
    # Return the AI's reply; gr.ChatInterface appends it to the chat history
return ai_reply
# Define a function to launch the chatbot interface using Gradio
def get_chatbot_app(additional_examples=[]):
# Load chatbot examples and merge with any additional examples provided
examples = chatbot_examples.load_examples(additional=additional_examples)
# Define a function to get the names of the examples
def get_examples():
return [example["name"] for example in examples]
# Define a function to choose an example based on the index
def choose_example(index):
        if index is not None:
system_message = examples[index]["system_message"].strip()
user_message = examples[index]["message"].strip()
return system_message, user_message, [], []
else:
return "", "", [], []
# Create the Gradio interface using the Blocks layout
with gr.Blocks() as app:
with gr.Tab("Conversation"):
with gr.Row():
# Create a textbox for the system message (prompt)
system_message = gr.TextArea(
label="System Message (Prompt)",
value="You are a helpful assistant.",
lines=20,
max_lines=400,
)
# Create a chatbot interface for the conversation
chatbot = gr.ChatInterface(
chat,
additional_inputs=[
gr.Dropdown(
["gpt-3.5-turbo", "gpt-3.5-turbo-16k"],
label="Model",
value="gpt-3.5-turbo",
),
gr.Slider(
label="Temperature",
minimum=0,
maximum=2,
step=0.1,
value=0,
),
system_message,
],
)
with gr.Tab("Image Generation"):
image_prompt = gr.Textbox(
label="Prompt", placeholder="A cute puppy wearing sunglasses."
)
image_btn = gr.Button(value="Generate")
image = gr.Image(label="Result", interactive=False, type="filepath")
image_btn.click(generate_image, inputs=[image_prompt], outputs=[image])
with gr.Tab("Speech-to-text"):
audio_file = gr.Audio(label="Audio", source="microphone", type="filepath")
transcribe = gr.Button(value="Transcribe")
audio_transcript = gr.Textbox(label="Transcription", interactive=False)
transcribe.click(
generate_transcription, inputs=[audio_file], outputs=[audio_transcript]
)
with gr.Tab("Get API Key"):
email_box = gr.Textbox(label="Email Address", placeholder="Student Email")
password_box = gr.Textbox(
label="Password", type="password", placeholder="Student ID"
)
btn = gr.Button(value="Generate")
api_host_box = gr.Textbox(label="OpenAI API Base", interactive=False)
api_key_box = gr.Textbox(label="OpenAI API Key", interactive=False)
btn.click(
get_api_key,
inputs=[email_box, password_box],
outputs=[api_host_box, api_key_box],
)
# Return the app
return app
app = FastAPI()
app.include_router(
fastapi_users.get_auth_router(auth_backend), prefix="/auth/jwt", tags=["auth"]
)
app.include_router(
fastapi_users.get_register_router(UserRead, UserCreate),
prefix="/auth",
tags=["auth"],
)
app.include_router(
fastapi_users.get_users_router(UserRead, UserUpdate),
prefix="/users",
tags=["users"],
)
@app.get("/authenticated-route")
async def authenticated_route(user: User = Depends(current_active_user)):
return {"message": f"Hello {user.email}!"}
# @app.post must be the outermost decorator so the registered handler is the retry-wrapped function
@app.post("/v1/embeddings")
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30))
async def openai_api_embeddings_passthrough(
request: Request,
user: User = Depends(fastapi_users.current_user()),
):
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
# Get the request data and headers
request_data = await request.json()
request_headers = request.headers
openai_api_key = os.getenv("OPENAI_API_KEY")
# Forward the request to the OpenAI API
async with AsyncClient() as client:
response = await client.post(
"https://api.openai.com/v1/embeddings",
json=request_data,
headers={
"Content-Type": request_headers.get("Content-Type"),
"Authorization": f"Bearer {openai_api_key}",
},
timeout=120.0,
)
# Return the OpenAI API response
return response.json()
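# Same passthrough for the legacy "engines"-style embeddings path that some older OpenAI clients still call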
@app.post("/v1/engines/text-embedding-ada-002/embeddings")
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30))
async def openai_api_engines_embeddings_passthrough(
request: Request,
user: User = Depends(fastapi_users.current_user()),
):
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
# Get the request data and headers
request_data = await request.json()
request_headers = request.headers
openai_api_key = os.getenv("OPENAI_API_KEY")
# Forward the request to the OpenAI API
async with AsyncClient() as client:
response = await client.post(
"https://api.openai.com/v1/engines/text-embedding-ada-002/embeddings",
json=request_data,
headers={
"Content-Type": request_headers.get("Content-Type"),
"Authorization": f"Bearer {openai_api_key}",
},
timeout=120.0,
)
# Return the OpenAI API response
return response.json()
@app.post("/v1/completions")
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30))
async def openai_api_completions_passthrough(
request: Request,
user: User = Depends(fastapi_users.current_user()),
):
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
# Get the request data and headers
request_data = await request.json()
request_headers = request.headers
openai_api_key = os.getenv("OPENAI_API_KEY")
# Forward the request to the OpenAI API
async with AsyncClient() as client:
response = await client.post(
"https://api.openai.com/v1/completions",
json=request_data,
headers={
"Content-Type": request_headers.get("Content-Type"),
"Authorization": f"Bearer {openai_api_key}",
},
timeout=120.0,
)
# Return the OpenAI API response
return response.json()
@app.post("/v1/chat/completions")
@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30))
async def openai_api_chat_completions_passthrough(
request: Request,
user: User = Depends(fastapi_users.current_user()),
):
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
# Get the request data and headers
request_data = await request.json()
request_headers = request.headers
openai_api_key = os.getenv("OPENAI_API_KEY")
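    # Downgrade any gpt-4 request to gpt-3.5-turbo before forwarding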
if "gpt-4" in request_data["model"]:
request_data["model"] = "gpt-3.5-turbo"
# Forward the request to the OpenAI API
async with AsyncClient() as client:
response = await client.post(
"https://api.openai.com/v1/chat/completions",
json=request_data,
headers={
"Content-Type": request_headers.get("Content-Type"),
"Authorization": f"Bearer {openai_api_key}",
},
timeout=120.0,
)
# Return the OpenAI API response
return response.json()
# @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30))
# @app.post("/v1/images/generations")
# async def openai_api_chat_completions_passthrough(
# request: Request,
# user: User = Depends(fastapi_users.current_user()),
# ):
# if not user:
# raise HTTPException(status_code=401, detail="Unauthorized")
# # Get the request data and headers
# request_data = await request.json()
# request_headers = request.headers
# openai_api_key = os.getenv("OPENAI_API_KEY")
# # Forward the request to the OpenAI API
# async with AsyncClient() as client:
# response = await client.post(
# "https://api.openai.com/v1/images/generations",
# json=request_data,
# headers={
# "Content-Type": request_headers.get("Content-Type"),
# "Authorization": f"Bearer {openai_api_key}",
# },
# timeout=120.0,
# )
# # Return the OpenAI API response
# return response.json()
# @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=2, max=30))
# @app.post("/v1/audio/speech")
# async def openai_api_audio_speech_passthrough(
# request: Request,
# user: User = Depends(fastapi_users.current_user()),
# ):
# if not user:
# raise HTTPException(status_code=401, detail="Unauthorized")
# # Get the request data and headers
# request_data = await request.json()
# request_headers = request.headers
# openai_api_key = os.getenv("OPENAI_API_KEY")
# # Forward the request to the OpenAI API
# async with AsyncClient() as client:
# response = await client.post(
# "https://api.openai.com/v1/audio/speech",
# json=request_data,
# headers={
# "Content-Type": request_headers.get("Content-Type"),
# "Authorization": f"Bearer {openai_api_key}",
# },
# timeout=120.0,
# )
# # Return the OpenAI API response
# return response.json()
@app.on_event("startup")
async def on_startup():
# Not needed if you setup a migration system like Alembic
await create_db_and_tables()
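# Build the Gradio UI and mount it at the application root, protected by the same login used for the API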
gradio_gui = get_chatbot_app()
# Gradio's auth callable must return a bool; api_login returns (success, detail), so unwrap the success flag
gradio_gui.auth = lambda username, password: api_login(username, password)[0]
gradio_gui.auth_message = "Welcome to the 4341 OpenAI Service"
app = gr.mount_gradio_app(app, gradio_gui, path="/")
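# Typical local run (module path is an assumption; adjust to where this file actually lives):
#   uvicorn main:app --host 0.0.0.0 --port 8000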