from Live_audio import GeminiHandler

# Standard library
import os
import re
import io
import json
import time
import base64
import asyncio
import pathlib
import collections
from io import BytesIO
from datetime import datetime
from typing import AsyncGenerator, Literal

# Third-party
import gradio as gr
import numpy as np
import websockets
import webrtcvad
import fastrtc
from langdetect import detect
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from pydub import AudioSegment
from PIL import Image
from openai import OpenAI
from gradio.utils import get_space
from fastrtc import (
    AsyncAudioVideoStreamHandler,
    AsyncStreamHandler,
    Stream,
    WebRTC,
    get_cloudflare_turn_credentials_async,
    wait_for_item,
)

# Two Google SDKs are involved: google-generativeai (file upload / transcription,
# bound to the name `genai` below) and google-genai (live-audio config types).
import google.generativeai as genai
from google.genai import types
from google.genai.types import (
    LiveConnectConfig,
    SpeechConfig,
    VoiceConfig,
    PrebuiltVoiceConfig,
    Content,
    Part,
)

load_dotenv()

# Local modules
import RAG_Domain_know_doc
import Old_Document
import User_Specific_Documents
from web_search import search_autism
from RAG import rag_autism
# helper functions
from prompt_template import (
    Prompt_template_translation,
    Prompt_template_LLM_Generation,
    Prompt_template_Reranker,
    Prompt_template_Wisal,
    Prompt_template_Halluciations,
    Prompt_template_paraphrasing,
    Prompt_template_Translate_to_original,
    Prompt_template_relevance,
    Prompt_template_User_document_prompt,
)
from query_utils import process_query_for_rewrite, get_non_autism_response
# API keys and constants (loaded from the environment; see .env)
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
WEAVIATE_URL = os.getenv("WEAVIATE_URL")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY")
DEEPINFRA_API_KEY = os.getenv("DEEPINFRA_API_KEY")
DEEPINFRA_BASE_URL = os.getenv("DEEPINFRA_BASE_URL", "https://api.deepinfra.com/v1/openai")

env = os.getenv("ENVIRONMENT", "production")

openai = OpenAI(
    api_key=DEEPINFRA_API_KEY,
    base_url=DEEPINFRA_BASE_URL,
)
SESSION_ID = "default"
pending_clarifications = {}

def call_llm(model: str, messages: list[dict], temperature: float = 0.0, **kwargs) -> str:
    resp = openai.chat.completions.create(
        model=model,
        messages=messages,
        temperature=temperature,
        **kwargs,
    )
    return resp.choices[0].message.content.strip()

def is_greeting(text: str) -> bool:
    return bool(re.search(r"\b(hi|hello|hey|good (morning|afternoon|evening))\b", text, re.I))
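# process_query drives the conversational flow: it resolves any pending yes/no
# clarification for the session, answers greetings directly, checks whether the
# query is autism-related, and otherwise hands off to process_autism_pipeline.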
def process_query(query: str, first_turn: bool = False, session_id: str = "default"):
    intro = ""
    process_log = []

    # Check if the user is responding to a clarification prompt
    if session_id in pending_clarifications:
        if query.strip().lower() == "yes":
            corrected_query = pending_clarifications.pop(session_id)
            process_log.append(f"User confirmed: {corrected_query}")
            return process_autism_pipeline(corrected_query, corrected_query, process_log, intro)
        else:
            pending_clarifications.pop(session_id)
            redirect = (
                "Hello, I'm Wisal, an AI assistant developed by Compumacy AI and a knowledgeable autism specialist.\n"
                "If you have a question related to autism, please submit a question specifically about autism."
            )
            process_log.append("User rejected clarification.")
            _save_process_log(process_log)
            return redirect

    if first_turn and (not query or query.strip() == ""):
        intro = "Hello! I'm Wisal, an AI assistant developed by Compumacy AI, specializing in Autism Spectrum Disorders. How can I help you today?"
        process_log.append(intro)
        _save_process_log(process_log)
        return intro

    if is_greeting(query):
        greeting = intro + "Hello! I'm Wisal, your AI assistant developed by Compumacy AI. How can I help you today?"
        process_log.append(f"Greeting detected.\n{greeting}")
        _save_process_log(process_log)
        return greeting

    # Correct the query and check autism relevance (3-tuple return)
    corrected_query, is_autism_related, rewritten_query = process_query_for_rewrite(query)
    process_log.append(f"Original Query: {query}")
    process_log.append(f"Corrected Query: {corrected_query}")
    process_log.append(f"Relevance Check: {'RELATED' if is_autism_related else 'NOT RELATED'}")
    if rewritten_query:
        process_log.append(f"Rewritten Query: {rewritten_query}")

    # If not autism-related, ask the user to confirm the rewritten question
    if not is_autism_related:
        clarification = f'Your query was not clearly related to autism. Did you mean: "{rewritten_query}"?'
        pending_clarifications[session_id] = rewritten_query
        process_log.append(f"Clarification Prompted: {clarification}")
        _save_process_log(process_log)
        return clarification

    return process_autism_pipeline(query, corrected_query, process_log, intro)
def process_autism_pipeline(query, corrected_query, process_log, intro):
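    """Run the full answer pipeline for an autism-related query.

    Stages: web search -> LLM generation -> RAG retrieval -> reranking ->
    Wisal answer -> hallucination check (with optional paraphrase-and-retry) ->
    translation back to the user's language when needed.
    """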
    web_search_resp = asyncio.run(search_autism(corrected_query))
    web_answer = web_search_resp.get("answer", "")
    process_log.append(f"Web Search: {web_answer}")

    gen_prompt = Prompt_template_LLM_Generation.format(new_query=corrected_query)
    generated = call_llm(
        model="Qwen/Qwen3-32B",
        messages=[{"role": "user", "content": gen_prompt}],
        reasoning_effort="none",
    )
    process_log.append(f"LLM Generated: {generated}")

    rag_resp = asyncio.run(rag_autism(corrected_query, top_k=3))
    rag_contexts = rag_resp.get("answer", [])
    process_log.append(f"RAG Contexts: {rag_contexts}")

    answers_list = f"[1] {generated}\n[2] {web_answer}\n" + "\n".join(
        f"[{i+3}] {c}" for i, c in enumerate(rag_contexts)
    )
    rerank_prompt = Prompt_template_Reranker.format(new_query=corrected_query, answers_list=answers_list)
    reranked = call_llm(
        model="Qwen/Qwen3-32B",
        messages=[{"role": "user", "content": rerank_prompt}],
        reasoning_effort="none",
    )
    process_log.append(f"Reranked: {reranked}")

    wisal_prompt = Prompt_template_Wisal.format(new_query=corrected_query, document=reranked)
    wisal = call_llm(
        model="Qwen/Qwen3-32B",
        messages=[{"role": "user", "content": wisal_prompt}],
        reasoning_effort="none",
    )
    process_log.append(f"Wisal Answer: {wisal}")
    halluc_prompt = Prompt_template_Halluciations.format(
        new_query=corrected_query,
        answer=wisal,
        document=generated,
    )
    halluc = call_llm(
        model="Qwen/Qwen3-32B",
        messages=[{"role": "user", "content": halluc_prompt}],
        reasoning_effort="none",
    )
    process_log.append(f"Hallucination Score: {halluc}")
    score_match = re.search(r"Score:\s*(\d+)", halluc)
    score = int(score_match.group(1)) if score_match else 3
    if score in (2, 3):
        paraphrased = call_llm(
            model="Qwen/Qwen3-32B",
            messages=[{"role": "user", "content": Prompt_template_paraphrasing.format(document=generated)}],
            reasoning_effort="none",
        )
        wisal = call_llm(
            model="Qwen/Qwen3-32B",
            messages=[{"role": "user", "content": Prompt_template_Wisal.format(new_query=corrected_query, document=paraphrased)}],
            reasoning_effort="none",
        )
        process_log.append(f"Paraphrased Wisal: {wisal}")
    try:
        detected_lang = detect(query)
    except Exception:
        detected_lang = "en"
    is_english_text = bool(re.fullmatch(r"[A-Za-z0-9 .,?;:'\"!()\-]+", query))

    # Translate the answer back only if the query was not in English
    needs_translation = detected_lang != "en" or not is_english_text
    if needs_translation:
        result = call_llm(
            model="Qwen/Qwen3-32B",
            messages=[{
                "role": "user",
                "content": Prompt_template_Translate_to_original.format(query=query, document=wisal),
            }],
            reasoning_effort="none",
        )
        process_log.append(f"Translated Back: {result}")
    else:
        result = wisal
    process_log.append(f"Final Result: {result}")

    rtl_languages = ["ar", "fa", "ur", "he"]  # Arabic, Persian, Urdu, Hebrew
    text_dir = "rtl" if detected_lang in rtl_languages else "ltr"
    # Wrap the result in direction-aware HTML
    wrapped_result = f'<div dir="{text_dir}">{result}</div>'
    _save_process_log(process_log)
    return intro + wrapped_result
def _save_process_log(log_lines, filename="process_output.txt"):
    import datetime

    # Ensure the logs directory exists
    logs_dir = os.path.join(os.path.dirname(__file__), "logs")
    os.makedirs(logs_dir, exist_ok=True)
    # Unique filename per question (timestamped); `filename` is kept for
    # backward compatibility but is not used.
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    log_filename = os.path.join(logs_dir, f"log_{timestamp}.txt")
    try:
        with open(log_filename, "w", encoding="utf-8") as f:
            for line in log_lines:
                f.write(str(line) + "\n\n")
    except Exception:
        pass
# Gradio UI for the main pipeline, RAG_Domain_know_doc, User_Specific_Documents, and Old_Document
def main_pipeline_interface(query):
    return process_query(query, first_turn=True)

def main_pipeline_with_doc_and_history(query, doc_file, doc_type, history):
    response = main_pipeline_with_doc(query, doc_file, doc_type)
    updated_history = history + f"\nUser: {query}\nWisal: {response}\n"
    return response, updated_history

def main_pipeline_with_doc(query, doc_file, doc_type):
    # If no document was provided, fall back to the main pipeline
    if doc_file is None or doc_type == "None":
        return process_query(query, first_turn=True)

    safe_filename = os.path.basename(getattr(doc_file, 'name', str(doc_file)))
    upload_dir = os.path.join(os.path.dirname(__file__), "uploaded_docs")
    os.makedirs(upload_dir, exist_ok=True)
    save_path = os.path.join(upload_dir, safe_filename)

    # doc_file may be file-like (has `.read()`) or path-like (str or NamedString)
    if hasattr(doc_file, 'read'):
        file_bytes = doc_file.read()
    else:
        with open(str(doc_file), 'rb') as f:
            file_bytes = f.read()

    # Save the file content
    with open(save_path, "wb") as f:
        f.write(file_bytes)

    # Route to the correct document handler
    if doc_type == "Knowledge Document":
        status = RAG_Domain_know_doc.ingest_file(save_path)
        answer = RAG_Domain_know_doc.answer_question(query)
        return f"[Knowledge Document Uploaded]\n{status}\n\n{answer}"
    elif doc_type == "User-Specific Document":
        status = User_Specific_Documents.ingest_file(save_path)
        answer = User_Specific_Documents.answer_question(query)
        return f"[User-Specific Document Uploaded]\n{status}\n\n{answer}"
    elif doc_type == "Old Document":
        status = Old_Document.ingest_file(save_path)
        answer = Old_Document.answer_question(query)
        return f"[Old Document Uploaded]\n{status}\n\n{answer}"
    elif doc_type == "New Document":
        status = User_Specific_Documents.ingest_file(save_path)
        answer = User_Specific_Documents.answer_question(query)
        return f"[New Document Uploaded]\n{status}\n\n{answer}"
    else:
        return "Invalid document type."
def pipeline_with_history(message, doc_file, doc_type, history):
    if not message.strip():
        return history, ""
    response = main_pipeline_with_doc(message, doc_file, doc_type)
    history = history + [[message, response]]
    return history, ""
# Function to transcribe audio
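# transcribe_audio is a generator: it uploads the recording via the Gemini
# Files API, polls until processing finishes, yields progress messages, and
# finally yields the transcript (or an "[ERROR] ..." string on failure).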
def transcribe_audio(audio_filepath):
    api_key = GEMINI_API_KEY
    if audio_filepath is None:
        yield "No audio provided. Please record or upload an audio file first."
        return
    if not api_key:
        yield "API Key is missing. Please provide your Google AI API key."
        return
    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel(model_name="models/gemini-2.0-flash")

        print(f"Transcribing audio file: {audio_filepath}")
        yield "Uploading audio file..."

        # Upload the audio file
        audio_file = genai.upload_file(path=audio_filepath)

        # Poll the processing status of the uploaded file
        while audio_file.state.name == "PROCESSING":
            time.sleep(2)  # Wait 2 seconds before checking again
            audio_file = genai.get_file(audio_file.name)

        if audio_file.state.name == "FAILED":
            yield "[ERROR] Audio file processing failed."
            return

        yield "Audio uploaded. Transcribing..."

        # Request transcription from the model
        response = model.generate_content(
            ["Please transcribe this audio recording.", audio_file],
            request_options={"timeout": 120},  # Timeout for the request
        )
        query = response.text if response and response.text else "Transcription failed. The response was empty."
        yield query
    except Exception as e:
        print(f"An error occurred during transcription: {e}")
        yield f"[ERROR] An unexpected error occurred: {e}"
def unified_handler(user_text, audio_file, chat_history):
    chat_history = chat_history or []
    msg_from_user = None

    if user_text and user_text.strip():
        msg_from_user = user_text
    elif audio_file:
        # Consume the transcription generator; the last non-error message is the transcript
        transcription = None
        for out in transcribe_audio(audio_file):
            if isinstance(out, str) and not out.startswith("[ERROR]"):
                transcription = out
        if transcription:
            msg_from_user = transcription

    if msg_from_user:
        chat_history.append(("User", msg_from_user))
        wisal_reply = process_query(msg_from_user)
        chat_history.append(("Wisal", wisal_reply))
    return chat_history, "", None
# process_query and transcribe_audio (defined above) are wired to the chat UI here.
def wisal_handler(user_text, audio_file, chat_history):
    chat_history = chat_history or []

    # If the user typed a message
    if user_text and user_text.strip():
        response = process_query(user_text)
        chat_history.append(("User", user_text))
        chat_history.append(("Wisal", response))
        return chat_history, "", None  # Clear the input box

    # If the user provided audio
    if audio_file:
        transcription = None
        for out in transcribe_audio(audio_file):
            if isinstance(out, str) and out.startswith("Uploading"):
                continue
            if isinstance(out, str) and out.startswith("[ERROR]"):
                chat_history.append(("System", out))
                return chat_history, "", None
            if isinstance(out, str):
                transcription = out
        if transcription:
            chat_history.append(("User", transcription))  # Show the transcription
            wisal_reply = process_query(transcription)
            chat_history.append(("Wisal", wisal_reply))
        return chat_history, "", None

    return chat_history, "", None  # Nothing sent
# Logo path: resolved relative to this file (assumes the logo image ships
# alongside this script rather than at a machine-specific absolute path)
image_path = os.path.join(os.path.dirname(__file__), "Compumacy-Logo-Trans2.png")
with gr.Blocks(title="Wisal Chatbot", theme='Yntec/HaleyCH_Theme_craiyon_alt') as demo:
    chat_history = gr.State([])

    # Logo (local image); elem_id matches the #logo-row rule in theme_css
    with gr.Row(elem_id="logo-row"):
        gr.Image(value=image_path, show_label=False, container=False, height=100)

    gr.Markdown("# 🤖 Wisal: Autism AI Assistant")
    gr.CheckboxGroup(["Doctor", "Patient"], label="Checkbox Group")
    chatbot = gr.Chatbot(label="Wisal Chat", height=500)

    with gr.Row():
        user_input = gr.Textbox(placeholder="Type your question here...", label="", lines=1)
        audio_input = gr.Audio(
            sources=["microphone", "upload"],
            type="filepath",
            label="Record or Upload Audio",
        )
        send_btn = gr.Button("Send", variant="primary", elem_id="send-button")
    with gr.Row():
        audio_output = gr.Audio(label="TTS Audio Output", interactive=True)

    # Send the typed text or recorded audio to wisal_handler
    send_btn.click(
        fn=wisal_handler,
        inputs=[user_input, audio_input, chat_history],
        outputs=[chatbot, user_input, audio_output],
        api_name="wisal_handler",
    )
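    # Live voice chat: a WebRTC widget streamed through GeminiHandler (from
    # Live_audio). On Hugging Face Spaces the stream is capped at 3 minutes and
    # 2 concurrent sessions.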
    with gr.Row() as row2:
        with gr.Column():
            webrtc2 = WebRTC(
                label="Live Chat",
                modality="audio",
                mode="send-receive",
                elem_id="audio-source",
                rtc_configuration=get_cloudflare_turn_credentials_async,
                icon="https://www.gstatic.com/lamda/images/gemini_favicon_f069958c85030456e93de685481c559f160ea06b.png",
                pulse_color="rgb(255, 255, 255)",
                icon_button_color="rgb(255, 255, 255)",
            )
            webrtc2.stream(
                GeminiHandler(),
                inputs=[webrtc2],
                outputs=[webrtc2],
                time_limit=180 if get_space() else None,
                concurrency_limit=2 if get_space() else None,
            )
    doc_file = gr.File(label="📎 Upload Document (PDF, DOCX, TXT)", file_types=[".pdf", ".docx", ".txt"])
    doc_type = gr.Radio(
        ["None", "Knowledge Document", "User-Specific Document"],
        value="None",
        label="Document Type",
    )
    user_doc_option = gr.Radio(
        ["New Document", "Old Document"],
        label="Select User Document Type",
        visible=False,
    )

    def toggle_user_doc_visibility(selected_type):
        return gr.update(visible=(selected_type == "User-Specific Document"))

    doc_type.change(
        toggle_user_doc_visibility,
        inputs=doc_type,
        outputs=user_doc_option,
    )

    send_btn.click(
        fn=pipeline_with_history,
        inputs=[user_input, doc_file, doc_type, chatbot],
        outputs=[chatbot, user_input],
    )

    clear_btn = gr.Button("Clear Chat", elem_id="clear-button")
    clear_btn.click(lambda: [], outputs=[chatbot])
# Add custom theme CSS to the app
theme_css = """
/* Logo Row */
#logo-row {
    display: flex;
    justify-content: center;
    align-items: center;
    padding: 1rem;
    background-color: #222222; /* Dark gray background for the logo row */
}
#logo-row img {
    max-width: 300px;
    object-fit: contain;
}
/* Send Button */
#send-button {
    background-color: #f44336; /* Red color for the Send button */
    color: white;
    font-size: 16px;
    padding: 10px 24px;
    border: none;
    border-radius: 5px;
    cursor: pointer;
}
#send-button:hover {
    background-color: #e53935;
}
/* Clear Button */
#clear-button {
    background-color: #f44336; /* Red color for the Clear button */
    color: white;
    font-size: 16px;
    padding: 10px 24px;
    border: none;
    border-radius: 5px;
    cursor: pointer;
}
#clear-button:hover {
    background-color: #e53935; /* Darker red on hover */
}
/* Main Container Background */
.gradio-container {
    background-color: #2C2C2C; /* Dark background color */
    padding: 20px;
    color: white;
}
/* Saved State Item */
.saved-state-item {
    padding: 10px;
    margin: 5px 0;
    border-radius: 5px;
    background-color: #333333; /* Dark gray background for saved state items */
    color: #ffffff; /* White text color */
    cursor: pointer;
    transition: background-color 0.2s;
    border: 1px solid #444444;
}
.saved-state-item:hover {
    background-color: #444444; /* Slightly lighter gray on hover */
}
/* Delete Button */
.delete-button {
    color: #ff6b6b; /* Red color for the delete button */
    margin-left: 10px;
    float: right;
    font-weight: bold;
}
/* Filesystem Sessions Container */
.filesystem-sessions-container {
    max-height: 400px;
    overflow-y: auto;
    padding: 5px;
    border: 1px solid #444;
    border-radius: 5px;
    background-color: #222222; /* Dark background for the session container */
}
/* Highlight effect when clicking */
.saved-state-item:active {
    background-color: #555555; /* Darker gray when clicking */
}
"""
demo.css = theme_css

if __name__ == "__main__":
    demo.launch(debug=True)