Spaces:
Paused
Paused
| import os | |
| import re | |
| import time | |
| import requests | |
| import logging | |
| import folium | |
| import gradio as gr | |
| import tempfile | |
| import torch | |
| from datetime import datetime | |
| import numpy as np | |
| from gtts import gTTS | |
| from googlemaps import Client as GoogleMapsClient | |
| from diffusers import StableDiffusion3Pipeline | |
| import concurrent.futures | |
| from langchain_openai import OpenAIEmbeddings, ChatOpenAI | |
| from langchain_pinecone import PineconeVectorStore | |
| from langchain.prompts import PromptTemplate | |
| from langchain.chains import RetrievalQA | |
| from langchain.chains.conversation.memory import ConversationBufferWindowMemory | |
| from langchain.agents import Tool, initialize_agent | |
| from huggingface_hub import login | |
| # Check if the token is already set in the environment variables | |
| hf_token = os.getenv("HF_TOKEN") | |
| if hf_token is None: | |
| # If the token is not set, prompt for it (this should be done securely) | |
| print("Please set your Hugging Face token in the environment variables.") | |
| else: | |
| # Login using the token | |
| login(token=hf_token) | |
| # Your application logic goes here | |
| print("Logged in successfully to Hugging Face Hub!") | |
| # Set up logging | |
| logging.basicConfig(level=logging.DEBUG) | |
| # Initialize OpenAI embeddings | |
| embeddings = OpenAIEmbeddings(api_key=os.environ['OPENAI_API_KEY']) | |
| # Initialize Pinecone | |
| from pinecone import Pinecone | |
| pc = Pinecone(api_key=os.environ['PINECONE_API_KEY']) | |
| index_name = "omaha-details" | |
| vectorstore = PineconeVectorStore(index_name=index_name, embedding=embeddings) | |
| retriever = vectorstore.as_retriever(search_kwargs={'k': 5}) | |
| # Initialize ChatOpenAI model | |
| chat_model = ChatOpenAI(api_key=os.environ['OPENAI_API_KEY'], | |
| temperature=0, model='gpt-4o') | |
| conversational_memory = ConversationBufferWindowMemory( | |
| memory_key='chat_history', | |
| k=10, | |
| return_messages=True | |
| ) | |
| def get_current_time_and_date(): | |
| now = datetime.now() | |
| return now.strftime("%Y-%m-%d %H:%M:%S") | |
| # Example usage | |
| current_time_and_date = get_current_time_and_date() | |
| def fetch_local_events(): | |
| api_key = os.environ['SERP_API'] | |
| url = f'https://serpapi.com/search.json?engine=google_events&q=Events+in+Omaha&hl=en&gl=us&api_key={api_key}' | |
| response = requests.get(url) | |
| if response.status_code == 200: | |
| events_results = response.json().get("events_results", []) | |
| events_html = """ | |
| <h2 style="font-family: 'Georgia', serif; color: #4CAF50; background-color: #f8f8f8; padding: 10px; border-radius: 10px;">Local Events</h2> | |
| <style> | |
| .event-item { | |
| font-family: 'Verdana', sans-serif; | |
| color: #333; | |
| background-color: #f0f8ff; | |
| margin-bottom: 15px; | |
| padding: 10px; | |
| border: 1px solid #ddd; | |
| border-radius: 5px; | |
| transition: box-shadow 0.3s ease, background-color 0.3s ease; | |
| font-weight: bold; | |
| } | |
| .event-item:hover { | |
| box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1); | |
| background-color: #e6f7ff; | |
| } | |
| .event-item a { | |
| color: #1E90FF; | |
| text-decoration: none; | |
| font-weight: bold; | |
| } | |
| .event-item a:hover { | |
| text-decoration: underline; | |
| } | |
| .event-preview { | |
| position: absolute; | |
| display: none; | |
| border: 1px solid #ccc; | |
| border-radius: 5px; | |
| box-shadow: 0 2px 4px rgba(0, 0, 0, 0.2); | |
| background-color: white; | |
| z-index: 1000; | |
| max-width: 300px; | |
| padding: 10px; | |
| font-family: 'Verdana', sans-serif; | |
| color: #333; | |
| } | |
| </style> | |
| <script> | |
| function showPreview(event, previewContent) { | |
| var previewBox = document.getElementById('event-preview'); | |
| previewBox.innerHTML = previewContent; | |
| previewBox.style.left = event.pageX + 'px'; | |
| previewBox.style.top = event.pageY + 'px'; | |
| previewBox.style.display = 'block'; | |
| } | |
| function hidePreview() { | |
| var previewBox = document.getElementById('event-preview'); | |
| previewBox.style.display = 'none'; | |
| } | |
| </script> | |
| <div id="event-preview" class="event-preview"></div> | |
| """ | |
| for index, event in enumerate(events_results): | |
| title = event.get("title", "No title") | |
| date = event.get("date", "No date") | |
| location = event.get("address", "No location") | |
| link = event.get("link", "#") | |
| events_html += f""" | |
| <div class="event-item" onmouseover="showPreview(event, 'Date: {date}<br>Location: {location}')" onmouseout="hidePreview()"> | |
| <a href='{link}' target='_blank'>{index + 1}. {title}</a> | |
| <p>Date: {date}<br>Location: {location}</p> | |
| </div> | |
| """ | |
| return events_html | |
| else: | |
| return "<p>Failed to fetch local events</p>" | |
| def fetch_local_weather(): | |
| try: | |
| api_key = os.environ['WEATHER_API'] | |
| url = f'https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/omaha?unitGroup=metric&include=events%2Calerts%2Chours%2Cdays%2Ccurrent&key={api_key}' | |
| response = requests.get(url) | |
| response.raise_for_status() | |
| jsonData = response.json() | |
| current_conditions = jsonData.get("currentConditions", {}) | |
| temp_celsius = current_conditions.get("temp", "N/A") | |
| if temp_celsius != "N/A": | |
| temp_fahrenheit = (temp_celsius * 9/5) + 32 | |
| else: | |
| temp_fahrenheit = "N/A" | |
| condition = current_conditions.get("conditions", "N/A") | |
| humidity = current_conditions.get("humidity", "N/A") | |
| weather_html = f""" | |
| <div class="weather-theme"> | |
| <h2 style="font-family: 'Georgia', serif; color: #4CAF50; background-color: #f8f8f8; padding: 10px; border-radius: 10px;">Local Weather</h2> | |
| <div class="weather-content"> | |
| <div class="weather-icon"> | |
| <img src="https://www.weatherbit.io/static/img/icons/{get_weather_icon(condition)}.png" alt="{condition}" style="width: 100px; height: 100px;"> | |
| </div> | |
| <div class="weather-details"> | |
| <p style="font-family: 'Verdana', sans-serif; color: #333; font-size: 1.2em;">Temperature: {temp_fahrenheit}°F</p> | |
| <p style="font-family: 'Verdana', sans-serif; color: #333; font-size: 1.2em;">Condition: {condition}</p> | |
| <p style="font-family: 'Verdana', sans-serif; color: #333; font-size: 1.2em;">Humidity: {humidity}%</p> | |
| </div> | |
| </div> | |
| </div> | |
| <style> | |
| .weather-theme {{ | |
| animation: backgroundAnimation 10s infinite alternate; | |
| border: 1px solid #ddd; | |
| border-radius: 10px; | |
| padding: 10px; | |
| margin-bottom: 15px; | |
| background: linear-gradient(45deg, #ffcc33, #ff6666, #ffcc33, #ff6666); | |
| background-size: 400% 400%; | |
| box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1); | |
| transition: box-shadow 0.3s ease, background-color 0.3s ease; | |
| }} | |
| .weather-theme:hover {{ | |
| box-shadow: 0 8px 16px rgba(0, 0, 0, 0.2); | |
| background-position: 100% 100%; | |
| }} | |
| @keyframes backgroundAnimation {{ | |
| 0% {{ background-position: 0% 50%; }} | |
| 100% {{ background-position: 100% 50%; }} | |
| }} | |
| .weather-content {{ | |
| display: flex; | |
| align-items: center; | |
| }} | |
| .weather-icon {{ | |
| flex: 1; | |
| }} | |
| .weather-details {{ | |
| flex: 3; | |
| }} | |
| </style> | |
| """ | |
| return weather_html | |
| except requests.exceptions.RequestException as e: | |
| return f"<p>Failed to fetch local weather: {e}</p>" | |
| def get_weather_icon(condition): | |
| condition_map = { | |
| "Clear": "c01d", | |
| "Partly Cloudy": "c02d", | |
| "Cloudy": "c03d", | |
| "Overcast": "c04d", | |
| "Mist": "a01d", | |
| "Patchy rain possible": "r01d", | |
| "Light rain": "r02d", | |
| "Moderate rain": "r03d", | |
| "Heavy rain": "r04d", | |
| "Snow": "s01d", | |
| "Thunderstorm": "t01d", | |
| "Fog": "a05d", | |
| } | |
| return condition_map.get(condition, "c04d") | |
| # Update prompt templates to include fetched details | |
| current_time_and_date = get_current_time_and_date() | |
| # Define prompt templates | |
| template1 = f"""You are an expert concierge who is helpful and a renowned guide for Omaha, Nebraska. Based on weather being a sunny bright day and the today's date is {current_time_and_date}, use the following pieces of context, | |
| memory, and message history, along with your knowledge of perennial events in Omaha, Nebraska, to answer the question at the end. If you don't know the answer, just say "Homie, I need to get more data for this," and don't try to make up an answer. | |
| Use fifteen sentences maximum. Keep the answer as detailed as possible. Always include the address, time, date, and | |
| event type and description. Always say "It was my pleasure!" at the end of the answer. | |
| {{context}} | |
| Question: {{question}} | |
| Helpful Answer:""" | |
| template2 = f"""You are an expert concierge who is helpful and a renowned guide for Omaha, Nebraska. Based on today's weather being a sunny bright day and today's date is {current_time_and_date}, take the location or address but don't show the location or address on the output prompts. Use the following pieces of context, | |
| memory, and message history, along with your knowledge of perennial events in Omaha, Nebraska, to answer the question at the end. If you don't know the answer, just say "Homie, I need to get more data for this," and don't try to make up an answer. | |
| Keep the answer short and sweet and crisp. Always say "It was my pleasure!" at the end of the answer. | |
| {{context}} | |
| Question: {{question}} | |
| Helpful Answer:""" | |
| QA_CHAIN_PROMPT_1 = PromptTemplate(input_variables=["context", "question"], template=template1) | |
| QA_CHAIN_PROMPT_2 = PromptTemplate(input_variables=["context", "question"], template=template2) | |
| # Define the retrieval QA chain | |
| def build_qa_chain(prompt_template): | |
| qa_chain = RetrievalQA.from_chain_type( | |
| llm=chat_model, | |
| chain_type="stuff", | |
| retriever=retriever, | |
| chain_type_kwargs={"prompt": prompt_template} | |
| ) | |
| tools = [ | |
| Tool( | |
| name='Knowledge Base', | |
| func=qa_chain, | |
| description='Use this tool when answering general knowledge queries to get more information about the topic' | |
| ) | |
| ] | |
| return qa_chain, tools | |
| # Define the agent initializer | |
| def initialize_agent_with_prompt(prompt_template): | |
| qa_chain, tools = build_qa_chain(prompt_template) | |
| agent = initialize_agent( | |
| agent='chat-conversational-react-description', | |
| tools=tools, | |
| llm=chat_model, | |
| verbose=False, | |
| max_iteration=5, | |
| early_stopping_method='generate', | |
| memory=conversational_memory | |
| ) | |
| return agent | |
| # Define the function to generate answers | |
| def generate_answer(message, choice): | |
| logging.debug(f"generate_answer called with prompt_choice: {choice}") | |
| if choice == "Details": | |
| agent = initialize_agent_with_prompt(QA_CHAIN_PROMPT_1) | |
| elif choice == "Conversational": | |
| agent = initialize_agent_with_prompt(QA_CHAIN_PROMPT_2) | |
| else: | |
| logging.error(f"Invalid prompt_choice: {choice}. Defaulting to 'Conversational'") | |
| agent = initialize_agent_with_prompt(QA_CHAIN_PROMPT_2) | |
| response = agent(message) | |
| # Extract addresses for mapping regardless of the choice | |
| addresses = extract_addresses(response['output']) | |
| return response['output'], addresses | |
| def bot(history, choice): | |
| if not history: | |
| return history | |
| response, addresses = generate_answer(history[-1][0], choice) | |
| history[-1][1] = "" | |
| # Generate audio for the entire response in a separate thread | |
| with concurrent.futures.ThreadPoolExecutor() as executor: | |
| audio_future = executor.submit(generate_audio_elevenlabs, response) | |
| for character in response: | |
| history[-1][1] += character | |
| time.sleep(0.05) # Adjust the speed of text appearance | |
| yield history, None | |
| audio_path = audio_future.result() | |
| yield history, audio_path | |
| def add_message(history, message): | |
| history.append((message, None)) | |
| return history, gr.Textbox(value="", interactive=True, placeholder="Enter message or upload file...", show_label=False) | |
| def print_like_dislike(x: gr.LikeData): | |
| print(x.index, x.value, x.liked) | |
| def extract_addresses(response): | |
| if not isinstance(response, str): | |
| response = str(response) | |
| address_patterns = [ | |
| r'([A-Z].*,\sOmaha,\sNE\s\d{5})', | |
| r'(\d{4}\s.*,\sOmaha,\sNE\s\d{5})', | |
| r'([A-Z].*,\sNE\s\d{5})', | |
| r'([A-Z].*,.*\sSt,\sOmaha,\sNE\s\d{5})', | |
| r'([A-Z].*,.*\sStreets,\sOmaha,\sNE\s\d{5})', | |
| r'(\d{2}.*\sStreets)', | |
| r'([A-Z].*\s\d{2},\sOmaha,\sNE\s\d{5})' | |
| ] | |
| addresses = [] | |
| for pattern in address_patterns: | |
| addresses.extend(re.findall(pattern, response)) | |
| return addresses | |
| all_addresses = [] | |
| def generate_map(location_names): | |
| global all_addresses | |
| all_addresses.extend(location_names) | |
| api_key = os.environ['GOOGLEMAPS_API_KEY'] | |
| gmaps = GoogleMapsClient(key=api_key) | |
| m = folium.Map(location=[41.2565, -95.9345], zoom_start=12) | |
| for location_name in all_addresses: | |
| geocode_result = gmaps.geocode(location_name) | |
| if geocode_result: | |
| location = geocode_result[0]['geometry']['location'] | |
| folium.Marker( | |
| [location['lat'], location['lng']], | |
| tooltip=f"{geocode_result[0]['formatted_address']}" | |
| ).add_to(m) | |
| map_html = m._repr_html_() | |
| return map_html | |
| def fetch_local_news(): | |
| api_key = os.environ['SERP_API'] | |
| url = f'https://serpapi.com/search.json?engine=google_news&q=ohama headline&api_key={api_key}' | |
| response = requests.get(url) | |
| if response.status_code == 200: | |
| results = response.json().get("news_results", []) | |
| news_html = """ | |
| <h2 style="font-family: 'Georgia', serif; color: #4CAF50; background-color: #f8f8f8; padding: 10px; border-radius: 10px;">Omaha Today Headlines</h2> | |
| <style> | |
| .news-item { | |
| font-family: 'Verdana', sans-serif; | |
| color: #333; | |
| background-color: #f0f8ff; | |
| margin-bottom: 15px; | |
| padding: 10px; | |
| border: 1px solid #ddd; | |
| border-radius: 5px; | |
| transition: box-shadow 0.3s ease, background-color 0.3s ease; | |
| font-weight: bold; | |
| } | |
| .news-item:hover { | |
| box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1); | |
| background-color: #e6f7ff; | |
| } | |
| .news-item a { | |
| color: #1E90FF; | |
| text-decoration: none; | |
| font-weight: bold; | |
| } | |
| .news-item a:hover { | |
| text-decoration: underline; | |
| } | |
| .news-preview { | |
| position: absolute; | |
| display: none; | |
| border: 1px solid #ccc; | |
| border-radius: 5px; | |
| box-shadow: 0 2px 4px rgba(0, 0, 0, 0.2); | |
| background-color: white; | |
| z-index: 1000; | |
| max-width: 300px; | |
| padding: 10px; | |
| font-family: 'Verdana', sans-serif; | |
| color: #333; | |
| } | |
| </style> | |
| <script> | |
| function showPreview(event, previewContent) { | |
| var previewBox = document.getElementById('news-preview'); | |
| previewBox.innerHTML = previewContent; | |
| previewBox.style.left = event.pageX + 'px'; | |
| previewBox.style.top = event.pageY + 'px'; | |
| previewBox.style.display = 'block'; | |
| } | |
| function hidePreview() { | |
| var previewBox = document.getElementById('news-preview'); | |
| previewBox.style.display = 'none'; | |
| } | |
| </script> | |
| <div id="news-preview" class="news-preview"></div> | |
| """ | |
| for index, result in enumerate(results[:7]): | |
| title = result.get("title", "No title") | |
| link = result.get("link", "#") | |
| snippet = result.get("snippet", "") | |
| news_html += f""" | |
| <div class="news-item" onmouseover="showPreview(event, '{snippet}')" onmouseout="hidePreview()"> | |
| <a href='{link}' target='_blank'>{index + 1}. {title}</a> | |
| <p>{snippet}</p> | |
| </div> | |
| """ | |
| return news_html | |
| else: | |
| return "<p>Failed to fetch local news</p>" | |
| # Voice Control | |
| import numpy as np | |
| import torch | |
| from transformers import pipeline, AutoModelForSpeechSeq2Seq, AutoProcessor | |
| model_id = 'openai/whisper-large-v3' | |
| device = "cuda:0" if torch.cuda.is_available() else "cpu" | |
| torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32 | |
| model = AutoModelForSpeechSeq2Seq.from_pretrained(model_id, torch_dtype=torch_dtype, | |
| #low_cpu_mem_usage=True, | |
| use_safetensors=True).to(device) | |
| processor = AutoProcessor.from_pretrained(model_id) | |
| # Optimized ASR pipeline | |
| pipe_asr = pipeline("automatic-speech-recognition", model=model, tokenizer=processor.tokenizer, feature_extractor=processor.feature_extractor, max_new_tokens=128, chunk_length_s=15, batch_size=16, torch_dtype=torch_dtype, device=device, return_timestamps=True) | |
| base_audio_drive = "/data/audio" | |
| import numpy as np | |
| def transcribe_function(stream, new_chunk): | |
| try: | |
| sr, y = new_chunk[0], new_chunk[1] | |
| except TypeError: | |
| print(f"Error chunk structure: {type(new_chunk)}, content: {new_chunk}") | |
| return stream, "", None | |
| y = y.astype(np.float32) / np.max(np.abs(y)) | |
| if stream is not None: | |
| stream = np.concatenate([stream, y]) | |
| else: | |
| stream = y | |
| result = pipe_asr({"array": stream, "sampling_rate": sr}, return_timestamps=False) | |
| full_text = result.get("text", "") | |
| return stream, full_text, result | |
| def update_map_with_response(history): | |
| if not history: | |
| return "" | |
| response = history[-1][1] | |
| addresses = extract_addresses(response) | |
| return generate_map(addresses) | |
| def clear_textbox(): | |
| return "" | |
| def show_map_if_details(history,choice): | |
| if choice in ["Details", "Conversational"]: | |
| return gr.update(visible=True), update_map_with_response(history) | |
| else: | |
| return gr.update(visible(False), "") | |
| def generate_audio_elevenlabs(text): | |
| XI_API_KEY = os.environ['ELEVENLABS_API'] | |
| VOICE_ID = 'SHZHI20rSPDR3iE8SvZ0' # Replace with your voice ID | |
| tts_url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}/stream" | |
| headers = { | |
| "Accept": "application/json", | |
| "xi-api-key": XI_API_KEY | |
| } | |
| data = { | |
| "text": str(text), | |
| "model_id": "eleven_multilingual_v2", | |
| "voice_settings": { | |
| "stability": 0.7, | |
| "similarity_boost": 0.5, | |
| "style": 0.50, # Adjust style for more romantic tone | |
| "use_speaker_boost": False | |
| } | |
| } | |
| response = requests.post(tts_url, headers=headers, json=data, stream=True) | |
| if response.ok: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as f: | |
| for chunk in response.iter_content(chunk_size=1024): | |
| f.write(chunk) | |
| temp_audio_path = f.name | |
| logging.debug(f"Audio saved to {temp_audio_path}") | |
| return temp_audio_path | |
| else: | |
| logging.error(f"Error generating audio: {response.text}") | |
| return None | |
| # Stable Diffusion setup | |
| pipe = StableDiffusion3Pipeline.from_pretrained("stabilityai/stable-diffusion-3-medium-diffusers", torch_dtype=torch.float16) | |
| pipe = pipe.to("cuda") | |
| def generate_image(prompt): | |
| image = pipe( | |
| prompt, | |
| negative_prompt="", | |
| num_inference_steps=28, | |
| guidance_scale=3.0, | |
| ).images[0] | |
| return image | |
| # Hardcoded prompt for image generation | |
| hardcoded_prompt = "Useing The top events like 'Summer Art Festival' and current time - 4:07 PM ,Date - 06/17/2024 ,Weather-Sunny Bright Day.Create Highly Visually Compelling High Resolution and High Quality Photographics Advatizement for 'Toyota'" | |
| with gr.Blocks(theme='rawrsor1/Everforest') as demo: | |
| with gr.Row(): | |
| chatbot = gr.Chatbot([], elem_id="chatbot", bubble_full_width=False) | |
| with gr.Column(): | |
| weather_output = gr.HTML(value=fetch_local_weather()) | |
| with gr.Column(): | |
| news_output = gr.HTML(value=fetch_local_news()) | |
| def setup_ui(): | |
| state = gr.State() | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("<h1>Choose the prompt</h1>", elem_id="prompt-markdown") | |
| choice = gr.Radio(label="Choose a prompt", choices=["Details", "Conversational"], value="Details") | |
| with gr.Column(): # Larger scale for the right column | |
| gr.Markdown("<h1>Enter the query / Voice Output</h1>", elem_id="query-markdown") | |
| chat_input = gr.Textbox(show_copy_button=True, interactive=True, show_label=False, label="Transcription") | |
| chat_msg = chat_input.submit(add_message, [chatbot, chat_input], [chatbot, chat_input]) | |
| bot_msg = chat_msg.then(bot, [chatbot, choice], [chatbot, gr.Audio(interactive=False, autoplay=True)]) | |
| bot_msg.then(lambda: gr.Textbox(value="", interactive=True, placeholder="Enter message or upload file...", show_label=False), None, [chat_input]) | |
| chatbot.like(print_like_dislike, None, None) | |
| clear_button = gr.Button("Clear") | |
| clear_button.click(fn=clear_textbox, inputs=None, outputs=chat_input) | |
| with gr.Column(): # Smaller scale for the left column | |
| gr.Markdown("<h1>Stream your Voice</h1>", elem_id="voice-markdown") | |
| audio_input = gr.Audio(sources=["microphone"], streaming=True, type='numpy') | |
| audio_input.stream(transcribe_function, inputs=[state, audio_input], outputs=[state, chat_input], api_name="SAMLOne_real_time") | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("<h1>Locate the Events</h1>", elem_id="location-markdown") | |
| location_output = gr.HTML() | |
| bot_msg.then(show_map_if_details, [chatbot, choice], [location_output, location_output]) | |
| # with gr.Column(): | |
| # gr.Markdown("<h1>Listen to the audio</h1>", elem_id="audio-markdown") | |
| # audio_output = gr.Audio() | |
| # bot_msg_audio = bot_msg.then(lambda history: generate_audio_elevenlabs(history[-1][1]), chatbot, audio_output) | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("<h1>Local Events</h1>", elem_id="events-markdown") | |
| news_output = gr.HTML(value=fetch_local_events()) | |
| with gr.Column(): | |
| gr.Markdown("<h1>Generated Image</h1>", elem_id="image-markdown") | |
| image_output = gr.Image(value=generate_image(hardcoded_prompt)) | |
| setup_ui() | |
| demo.queue() | |
| demo.launch(share=True) |