Spaces:
Paused
Paused
import logging | |
# Set up logging | |
logging.basicConfig(level=logging.DEBUG) | |
from langchain_openai import OpenAIEmbeddings | |
import os | |
import re | |
import folium | |
import gradio as gr | |
import time | |
import requests | |
from googlemaps import Client as GoogleMapsClient | |
from gtts import gTTS | |
import tempfile | |
import string | |
# Embedding model used by the vector store; needs OPENAI_API_KEY in the environment.
embeddings = OpenAIEmbeddings(api_key=os.environ['OPENAI_API_KEY'])

from pinecone import Pinecone, ServerlessSpec

# Pinecone client; needs PINECONE_API_KEY in the environment.
pc = Pinecone(api_key=os.environ['PINECONE_API_KEY'])
index_name = "omaha-details"

from langchain_pinecone import PineconeVectorStore

# Vector store bound to the index named above, exposed as a retriever
# that is asked for 5 results per query.
vectorstore = PineconeVectorStore(index_name=index_name, embedding=embeddings)
retriever = vectorstore.as_retriever(search_kwargs={'k': 5})
from langchain_openai import ChatOpenAI | |
from langchain.prompts import PromptTemplate | |
from langchain.chains import RetrievalQA | |
from langchain.chains.conversation.memory import ConversationBufferWindowMemory | |
from langchain.agents import Tool, initialize_agent | |
# Prompt for the "Details" mode: long, address-bearing answers.
template1 = """You are an expert concierge who is helpful and a renowned guide for Omaha, Nebraska. Use the following pieces of context,
memory, and message history, along with your knowledge of perennial events in Omaha, Nebraska, to answer the question at the end.
If you don't know the answer, just say "Homie, I need to get more data for this," and don't try to make up an answer.
Use fifteen sentences maximum. Keep the answer as detailed as possible. Always include the address, time, date, and
event type and description. Always say "It was my pleasure!" at the end of the answer.
{context}
Question: {question}
Helpful Answer:"""

# Prompt for the "Conversational" mode: short answers limited to time, date and event info.
template2 = """You are an expert guide of Omaha, Nebraska's perennial events.
With the context, memory, and message history provided, answer the question in as crisp as possible. Always include the time, date, and
event type and description only apart from that don't give any other details. Always say "It was my pleasure!" at the end of the answer.
If you don't know the answer, simply say, "Homie, I need to get more data for this," without making up an answer.
{context}
Question: {question}
Helpful Answer:"""

# Both templates take the same two variables: the retrieved context and the user question.
QA_CHAIN_PROMPT_1 = PromptTemplate(
    input_variables=["context", "question"],
    template=template1,
)
QA_CHAIN_PROMPT_2 = PromptTemplate(
    input_variables=["context", "question"],
    template=template2,
)
# Chat model shared by the QA chain and the agent (temperature 0).
chat_model = ChatOpenAI(
    api_key=os.environ['OPENAI_API_KEY'],
    temperature=0,
    model='gpt-4o',
)

# Windowed conversation memory (k=10) exposed to the agent under the 'chat_history' key.
conversational_memory = ConversationBufferWindowMemory(
    memory_key='chat_history',
    k=10,
    return_messages=True,
)
# Define the retrieval QA chain | |
def build_qa_chain(prompt_template):
    """Create a "stuff" RetrievalQA chain and wrap it as an agent tool.

    Args:
        prompt_template: Prompt handed to the chain through ``chain_type_kwargs``.

    Returns:
        A ``(qa_chain, tools)`` pair, where ``tools`` is a one-element list
        holding the 'Knowledge Base' tool whose ``func`` is ``qa_chain``.
    """
    qa_chain = RetrievalQA.from_chain_type(
        llm=chat_model,
        chain_type="stuff",
        retriever=retriever,
        chain_type_kwargs={"prompt": prompt_template},
    )
    knowledge_base_tool = Tool(
        name='Knowledge Base',
        func=qa_chain,
        description='use this tool when answering general knowledge queries to get more information about the topic',
    )
    return qa_chain, [knowledge_base_tool]
# Define the agent initializer | |
def initialize_agent_with_prompt(prompt_template):
    """Build a conversational ReAct agent backed by the retrieval QA tool.

    Args:
        prompt_template: Prompt forwarded to ``build_qa_chain`` for the
            underlying RetrievalQA chain.

    Returns:
        The agent executor returned by ``initialize_agent``, wired to the
        shared ``chat_model`` and ``conversational_memory``.
    """
    _, tools = build_qa_chain(prompt_template)
    return initialize_agent(
        agent='chat-conversational-react-description',
        tools=tools,
        llm=chat_model,
        verbose=False,
        # The executor field is ``max_iterations``; the previous spelling
        # ``max_iteration`` did not set the intended 5-step cap.
        max_iterations=5,
        early_stopping_method='generate',
        memory=conversational_memory,
    )
# Define the function to generate answers | |
def generate_answer(message, choice):
    """Answer ``message`` with an agent built for the selected prompt style.

    Args:
        message: The user's question.
        choice: "Details" or "Conversational"; any other value is logged as
            an error and treated as "Details".

    Returns:
        The ``'output'`` entry of the agent's response.
    """
    logging.debug(f"generate_answer called with prompt_choice: {choice}")

    # Start from the default prompt and only switch for the other valid choice.
    selected_prompt = QA_CHAIN_PROMPT_1
    if choice == "Conversational":
        selected_prompt = QA_CHAIN_PROMPT_2
    elif choice != "Details":
        logging.error(f"Invalid prompt_choice: {choice}. Defaulting to 'Details'")

    agent = initialize_agent_with_prompt(selected_prompt)
    result = agent(message)
    return result['output']
def bot(history, choice):
    """Stream the assistant's reply into the last chat turn, one character at a time.

    Args:
        history: Chat history as a list of ``(user, bot)`` pairs; the last
            pair's user message is the one answered.
        choice: Prompt style forwarded to ``generate_answer``.

    Yields:
        ``history`` after each character has been appended to the reply.
        Nothing is yielded when ``history`` is empty.
    """
    if not history:
        return

    user_message = history[-1][0]
    response = generate_answer(user_message, choice)

    # add_message appends a tuple; replace the last turn with a list so the
    # reply slot can be updated in place even if that tuple reaches us as-is.
    history[-1] = [user_message, ""]
    for character in response:
        history[-1][1] += character
        time.sleep(0.05)  # pacing for the typing effect
        yield history
def add_message(history, message):
    """Append the user's message as a pending chat turn and reset the input box.

    Args:
        history: Chat history list; mutated in place.
        message: Text the user submitted.

    Returns:
        ``(history, textbox)`` where ``textbox`` is an empty, interactive
        ``gr.Textbox`` used to replace the input component's state.
    """
    pending_turn = (message, None)  # the bot reply slot is filled in later
    history.append(pending_turn)
    cleared_input = gr.Textbox(
        value="",
        interactive=True,
        placeholder="Enter message or upload file...",
        show_label=False,
    )
    return history, cleared_input
def print_like_dislike(x: gr.LikeData):
    """Print the index, value and liked flag of a chatbot like/dislike event."""
    event_fields = (x.index, x.value, x.liked)
    print(*event_fields)
# Function to extract addresses from the chatbot's response | |
# Address-like patterns, tried in this order. Several of them overlap, so
# one piece of text is often matched by more than one pattern.
_ADDRESS_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r'([A-Z].*,\sOmaha,\sNE\s\d{5})',
        r'(\d{4}\s.*,\sOmaha,\sNE\s\d{5})',
        r'([A-Z].*,\sNE\s\d{5})',
        r'([A-Z].*,.*\sSt,\sOmaha,\sNE\s\d{5})',
        r'([A-Z].*,.*\sStreets,\sOmaha,\sNE\s\d{5})',
        r'(\d{2}.*\sStreets)',
        r'([A-Z].*\s\d{2},\sOmaha,\sNE\s\d{5})',
    )
)


def extract_addresses(response):
    """Extract address-like substrings from a chatbot response.

    Args:
        response: Response text. ``None`` or an empty string yields ``[]``.

    Returns:
        A list of unique matched strings, ordered by first occurrence across
        the patterns in ``_ADDRESS_PATTERNS``.
    """
    if not response:
        return []

    matches = []
    for pattern in _ADDRESS_PATTERNS:
        matches.extend(pattern.findall(response))

    # Overlapping patterns used to return the same address several times;
    # dict.fromkeys drops the repeats while keeping first-seen order.
    return list(dict.fromkeys(matches))
# Store all found addresses | |
all_addresses = []


# Map generation function using Google Maps Geocoding API
def generate_map(location_names):
    """Render a folium map with a marker for every address seen so far.

    New names are added to the module-level ``all_addresses`` list, so the
    map accumulates markers across calls.

    Args:
        location_names: Iterable of address strings to add to the map.

    Returns:
        The map's HTML representation as a string.
    """
    # Record only unseen addresses, so repeats are not geocoded again and
    # the shared list does not grow with duplicates.
    for name in location_names:
        if name not in all_addresses:
            all_addresses.append(name)

    gmaps = GoogleMapsClient(key=os.environ['GOOGLEMAPS_API_KEY'])
    m = folium.Map(location=[41.2565, -95.9345], zoom_start=12)

    for location_name in all_addresses:
        geocode_result = gmaps.geocode(location_name)
        if not geocode_result:
            continue  # address could not be geocoded; no marker for it
        best_match = geocode_result[0]
        location = best_match['geometry']['location']
        folium.Marker(
            [location['lat'], location['lng']],
            tooltip=f"{best_match['formatted_address']}",
        ).add_to(m)

    return m._repr_html_()
# Function to fetch local news | |
def fetch_local_news():
    """Fetch Omaha headlines from SerpAPI's Google News engine as an HTML snippet.

    Requires the ``SERP_API`` environment variable.

    Returns:
        HTML listing up to 10 headlines, or a failure paragraph when the
        request fails, times out, returns a non-200 status or invalid JSON.
    """
    import html  # local import: escape remote text before embedding it in HTML

    failure_html = "<p>Failed to fetch local news</p>"
    params = {
        "engine": "google_news",
        "q": "omaha headline",  # was misspelled "ohama"
        "api_key": os.environ['SERP_API'],
    }
    try:
        # The timeout keeps a stalled request from blocking UI construction.
        response = requests.get("https://serpapi.com/search.json", params=params, timeout=10)
        if response.status_code != 200:
            return failure_html
        results = response.json().get("news_results", [])
    except (requests.exceptions.RequestException, ValueError):
        return failure_html

    parts = ["<h2>Omaha Today Headline </h2>"]
    for index, result in enumerate(results[:10]):
        title = html.escape(str(result.get("title", "No title")))
        link = html.escape(str(result.get("link", "#")), quote=True)
        snippet = html.escape(str(result.get("snippet", "")))
        parts.append(f"<p>{index + 1}. <a href='{link}' target='_blank'>{title}</a><br>{snippet}</p>")
    return "".join(parts)
# Function to fetch local events | |
def fetch_local_events():
    """Fetch Omaha events from SerpAPI's Google Events engine as an HTML snippet.

    Requires the ``SERP_API`` environment variable.

    Returns:
        HTML listing every returned event, or the plain failure message when
        the request fails, times out, returns a non-200 status or invalid JSON.
    """
    import html  # local import: escape remote text before embedding it in HTML

    failure_text = "Failed to fetch local events"
    params = {
        "engine": "google_events",
        "q": "Events in Omaha",
        "hl": "en",
        "gl": "us",
        "api_key": os.environ['SERP_API'],
    }
    try:
        response = requests.get("https://serpapi.com/search.json", params=params, timeout=10)
        if response.status_code != 200:
            return failure_text
        events_results = response.json().get("events_results", [])
    except (requests.exceptions.RequestException, ValueError):
        return failure_text

    parts = ["<h2>Local Events </h2>"]
    for index, event in enumerate(events_results):
        title = html.escape(str(event.get("title", "No title")))
        date = html.escape(str(event.get("date", "No date")))
        location = html.escape(str(event.get("address", "No location")))
        link = html.escape(str(event.get("link", "#")), quote=True)
        # Each entry now closes its <p>; the original left the tag open.
        parts.append(
            f"<p>{index + 1}. {title}<br> Date: {date}<br> Location: {location}<br> "
            f"<a href='{link}' target='_blank'>Link :</a> <br></p>"
        )
    return "".join(parts)
# Function to fetch local weather | |
def fetch_local_weather():
    """Fetch current Omaha conditions from Visual Crossing as an HTML snippet.

    Requires the ``WEATHER_API`` environment variable.

    Returns:
        HTML with temperature, condition and humidity ("N/A" for fields the
        response lacks), or a failure paragraph carrying the request error.
    """
    try:
        api_key = os.environ['WEATHER_API']
        url = f'https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/omaha?unitGroup=metric&include=events%2Calerts%2Chours%2Cdays%2Ccurrent&key={api_key}'
        # This runs while the UI is being built; the timeout keeps a stalled
        # request from hanging startup (it surfaces as a RequestException).
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        current_conditions = response.json().get("currentConditions", {})
        temp = current_conditions.get("temp", "N/A")
        condition = current_conditions.get("conditions", "N/A")
        humidity = current_conditions.get("humidity", "N/A")
        return (
            "<h2>Local Weather</h2>"
            f"<p>Temperature: {temp}°C</p>"
            f"<p>Condition: {condition}</p>"
            f"<p>Humidity: {humidity}%</p>"
        )
    except requests.exceptions.RequestException as e:
        return f"<p>Failed to fetch local weather: {e}</p>"
# Voice Control | |
import numpy as np | |
import torch | |
from transformers import pipeline, AutoModelForSpeechSeq2Seq, AutoProcessor | |
# Whisper checkpoint used for speech recognition.
model_id = 'openai/whisper-large-v3'

# Use the first CUDA device with half precision when available, else CPU with float32.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id,
    torch_dtype=torch_dtype,
    # low_cpu_mem_usage=True,
    use_safetensors=True,
).to(device)
processor = AutoProcessor.from_pretrained(model_id)

# Optimized ASR pipeline
pipe_asr = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    max_new_tokens=128,
    chunk_length_s=15,
    batch_size=16,
    torch_dtype=torch_dtype,
    device=device,
    return_timestamps=True,
)

base_audio_drive = "/data/audio"
import numpy as np | |
def transcribe_function(stream, new_chunk):
    """Append a microphone chunk to the running stream and transcribe all of it.

    Args:
        stream: Audio accumulated so far as a NumPy array, or ``None`` on the
            first chunk.
        new_chunk: ``(sampling_rate, samples)`` pair for the new chunk.

    Returns:
        ``(stream, text, result)``: the updated stream, the transcribed text,
        and the raw pipeline result. If ``new_chunk`` cannot be unpacked, the
        stream is returned unchanged with ``""`` and ``None``.
    """
    try:
        sr, y = new_chunk[0], new_chunk[1]
    except TypeError:
        print(f"Error chunk structure: {type(new_chunk)}, content: {new_chunk}")
        return stream, "", None

    # Convert before taking the magnitude: abs() on the raw integer samples
    # can overflow at the most negative value.
    y = y.astype(np.float32)
    peak = float(np.max(np.abs(y))) if y.size else 0.0
    # A silent (all-zero) or empty chunk has no peak; leave it as-is rather
    # than dividing by zero and feeding NaNs to the model.
    if peak > 0:
        y = y / peak

    stream = y if stream is None else np.concatenate([stream, y])

    result = pipe_asr({"array": stream, "sampling_rate": sr}, return_timestamps=False)
    full_text = result.get("text", "")
    return stream, full_text, result
# Map Retrieval Function for location finder | |
def update_map_with_response(history):
    """Build the map HTML for addresses found in the latest bot reply.

    Args:
        history: Chat history as a list of ``(user, bot)`` pairs.

    Returns:
        The HTML from ``generate_map``, or ``""`` when ``history`` is empty.
    """
    if history:
        latest_reply = history[-1][1]
        return generate_map(extract_addresses(latest_reply))
    return ""
def clear_textbox() -> str:
    """Return the empty string used to reset the chat input box."""
    empty_value = ""
    return empty_value
# Gradio Blocks interface | |
# NOTE(review): the source lost its indentation; the Row/Column nesting below
# is reconstructed from statement order and the inline comments — confirm
# against the deployed layout.
with gr.Blocks(theme='rawrsor1/Everforest') as demo:
    # Top row: chat window, weather panel, news panel.
    with gr.Row():
        with gr.Column():
            chatbot = gr.Chatbot([], elem_id="chatbot", bubble_full_width=False)
        with gr.Column():
            weather_output = gr.HTML(value=fetch_local_weather())
        with gr.Column():
            news_output = gr.HTML(value=fetch_local_news())

    def setup_ui():
        """Create the prompt selector, text/voice inputs and map, and wire their events."""
        asr_state = gr.State()  # carries the accumulated audio stream between chunks

        with gr.Row():
            with gr.Column():
                gr.Markdown("Choose the prompt")
                prompt_choice = gr.Radio(
                    label="Choose a prompt",
                    choices=["Details", "Conversational"],
                    value="Details",
                )

            with gr.Column():  # Larger scale for the right column
                gr.Markdown("Enter the query / Voice Output")
                chat_input = gr.Textbox(
                    show_copy_button=True,
                    interactive=True,
                    show_label=False,
                    label="Transcription",
                )
                # Submit -> append user turn -> stream bot reply -> reset the input box.
                chat_msg = chat_input.submit(add_message, [chatbot, chat_input], [chatbot, chat_input])
                bot_msg = chat_msg.then(bot, [chatbot, prompt_choice], chatbot, api_name="bot_response")
                bot_msg.then(
                    lambda: gr.Textbox(value="", interactive=True, placeholder="Enter message or upload file...", show_label=False),
                    None,
                    [chat_input],
                )
                chatbot.like(print_like_dislike, None, None)
                clear_button = gr.Button("Clear")
                clear_button.click(fn=clear_textbox, inputs=None, outputs=chat_input)

            with gr.Column():  # Smaller scale for the left column
                gr.Markdown("Stream your Voice")
                audio_input = gr.Audio(sources=["microphone"], streaming=True, type='numpy')
                # Each microphone chunk is transcribed and written into the chat input.
                audio_input.stream(
                    transcribe_function,
                    inputs=[asr_state, audio_input],
                    outputs=[asr_state, chat_input],
                    api_name="SAMLOne_real_time",
                )

        with gr.Row():
            with gr.Column():
                gr.Markdown("Locate the Events")
                location_output = gr.HTML()
                # Refresh the map once the bot reply has finished streaming.
                bot_msg.then(update_map_with_response, chatbot, location_output)

    setup_ui()

demo.queue()
demo.launch(share=True)