Spaces:
Sleeping
Sleeping
import os | |
import json | |
import re | |
import gradio as gr | |
import requests | |
from duckduckgo_search import DDGS | |
from typing import List | |
from pydantic import BaseModel, Field | |
from tempfile import NamedTemporaryFile | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.document_loaders import PyPDFLoader | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from llama_parse import LlamaParse | |
from langchain_core.documents import Document | |
from huggingface_hub import InferenceClient | |
import inspect | |
# Environment variables and configurations | |
huggingface_token = os.environ.get("HUGGINGFACE_TOKEN") | |
llama_cloud_api_key = os.environ.get("LLAMA_CLOUD_API_KEY") | |
MODELS = [ | |
"mistralai/Mistral-7B-Instruct-v0.3", | |
"mistralai/Mixtral-8x7B-Instruct-v0.1", | |
"microsoft/Phi-3-mini-4k-instruct" | |
] | |
# Initialize LlamaParse | |
llama_parser = LlamaParse( | |
api_key=llama_cloud_api_key, | |
result_type="markdown", | |
num_workers=4, | |
verbose=True, | |
language="en", | |
) | |
def load_document(file: NamedTemporaryFile, parser: str = "llamaparse") -> List[Document]: | |
"""Loads and splits the document into pages.""" | |
if parser == "pypdf": | |
loader = PyPDFLoader(file.name) | |
return loader.load_and_split() | |
elif parser == "llamaparse": | |
try: | |
documents = llama_parser.load_data(file.name) | |
return [Document(page_content=doc.text, metadata={"source": file.name}) for doc in documents] | |
except Exception as e: | |
print(f"Error using Llama Parse: {str(e)}") | |
print("Falling back to PyPDF parser") | |
loader = PyPDFLoader(file.name) | |
return loader.load_and_split() | |
else: | |
raise ValueError("Invalid parser specified. Use 'pypdf' or 'llamaparse'.") | |
def get_embeddings(): | |
return HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2") | |
def update_vectors(files, parser): | |
if not files: | |
return "Please upload at least one PDF file." | |
embed = get_embeddings() | |
total_chunks = 0 | |
all_data = [] | |
for file in files: | |
data = load_document(file, parser) | |
all_data.extend(data) | |
total_chunks += len(data) | |
if os.path.exists("faiss_database"): | |
database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True) | |
database.add_documents(all_data) | |
else: | |
database = FAISS.from_documents(all_data, embed) | |
database.save_local("faiss_database") | |
return f"Vector store updated successfully. Processed {total_chunks} chunks from {len(files)} files using {parser}." | |
def generate_chunked_response(prompt, model, max_tokens=1000, num_calls=3, temperature=0.2, should_stop=False): | |
print(f"Starting generate_chunked_response with {num_calls} calls") | |
client = InferenceClient(model, token=huggingface_token) | |
full_response = "" | |
messages = [{"role": "user", "content": prompt}] | |
for i in range(num_calls): | |
print(f"Starting API call {i+1}") | |
if should_stop: | |
print("Stop clicked, breaking loop") | |
break | |
try: | |
for message in client.chat_completion( | |
messages=messages, | |
max_tokens=max_tokens, | |
temperature=temperature, | |
stream=True, | |
): | |
if should_stop: | |
print("Stop clicked during streaming, breaking") | |
break | |
if message.choices and message.choices[0].delta and message.choices[0].delta.content: | |
chunk = message.choices[0].delta.content | |
full_response += chunk | |
print(f"API call {i+1} completed") | |
except Exception as e: | |
print(f"Error in generating response: {str(e)}") | |
# Clean up the response | |
clean_response = re.sub(r'<s>\[INST\].*?\[/INST\]\s*', '', full_response, flags=re.DOTALL) | |
clean_response = clean_response.replace("Using the following context:", "").strip() | |
clean_response = clean_response.replace("Using the following context from the PDF documents:", "").strip() | |
# Remove duplicate paragraphs and sentences | |
paragraphs = clean_response.split('\n\n') | |
unique_paragraphs = [] | |
for paragraph in paragraphs: | |
if paragraph not in unique_paragraphs: | |
sentences = paragraph.split('. ') | |
unique_sentences = [] | |
for sentence in sentences: | |
if sentence not in unique_sentences: | |
unique_sentences.append(sentence) | |
unique_paragraphs.append('. '.join(unique_sentences)) | |
final_response = '\n\n'.join(unique_paragraphs) | |
print(f"Final clean response: {final_response[:100]}...") | |
return final_response | |
def duckduckgo_search(query): | |
with DDGS() as ddgs: | |
results = ddgs.text(query, max_results=5) | |
return results | |
class CitingSources(BaseModel): | |
sources: List[str] = Field( | |
..., | |
description="List of sources to cite. Should be an URL of the source." | |
) | |
def chatbot_interface(message, history, use_web_search, model, temperature, num_calls): | |
if not message.strip(): | |
return "", history | |
history = history + [(message, "")] | |
try: | |
if use_web_search: | |
for main_content, sources in get_response_with_search(message, model, num_calls=num_calls, temperature=temperature): | |
history[-1] = (message, f"{main_content}\n\n{sources}") | |
yield history | |
else: | |
for partial_response in get_response_from_pdf(message, model, num_calls=num_calls, temperature=temperature): | |
history[-1] = (message, partial_response) | |
yield history | |
except gr.CancelledError: | |
yield history | |
def retry_last_response(history, use_web_search, model, temperature, num_calls): | |
if not history: | |
return history | |
last_user_msg = history[-1][0] | |
history = history[:-1] # Remove the last response | |
return chatbot_interface(last_user_msg, history, use_web_search, model, temperature, num_calls) | |
def respond(message, history, model, temperature, num_calls, use_web_search): | |
if use_web_search: | |
for main_content, sources in get_response_with_search(message, model, num_calls=num_calls, temperature=temperature): | |
yield f"{main_content}\n\n{sources}" | |
else: | |
for partial_response in get_response_from_pdf(message, model, num_calls=num_calls, temperature=temperature): | |
yield partial_response | |
def get_response_with_search(query, model, num_calls=3, temperature=0.2): | |
search_results = duckduckgo_search(query) | |
context = "\n".join(f"{result['title']}\n{result['body']}\nSource: {result['href']}\n" | |
for result in search_results if 'body' in result) | |
prompt = f"""Using the following context: | |
{context} | |
Write a detailed and complete research document that fulfills the following user request: '{query}' | |
After writing the document, please provide a list of sources used in your response.""" | |
client = InferenceClient(model, token=huggingface_token) | |
main_content = "" | |
for i in range(num_calls): | |
for message in client.chat_completion( | |
messages=[{"role": "user", "content": prompt}], | |
max_tokens=1000, | |
temperature=temperature, | |
stream=True, | |
): | |
if message.choices and message.choices[0].delta and message.choices[0].delta.content: | |
chunk = message.choices[0].delta.content | |
main_content += chunk | |
yield main_content, "" # Yield partial main content without sources | |
def get_response_from_pdf(query, model, num_calls=3, temperature=0.2): | |
embed = get_embeddings() | |
if os.path.exists("faiss_database"): | |
database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True) | |
else: | |
yield "No documents available. Please upload PDF documents to answer questions." | |
return | |
retriever = database.as_retriever() | |
relevant_docs = retriever.get_relevant_documents(query) | |
context_str = "\n".join([doc.page_content for doc in relevant_docs]) | |
prompt = f"""Using the following context from the PDF documents: | |
{context_str} | |
Write a detailed and complete response that answers the following user question: '{query}'""" | |
client = InferenceClient(model, token=huggingface_token) | |
response = "" | |
for i in range(num_calls): | |
for message in client.chat_completion( | |
messages=[{"role": "user", "content": prompt}], | |
max_tokens=1000, | |
temperature=temperature, | |
stream=True, | |
): | |
if message.choices and message.choices[0].delta and message.choices[0].delta.content: | |
chunk = message.choices[0].delta.content | |
response += chunk | |
yield response # Yield partial response | |
def vote(data: gr.LikeData): | |
if data.liked: | |
print(f"You upvoted this response: {data.value}") | |
else: | |
print(f"You downvoted this response: {data.value}") | |
css = """ | |
/* Add your custom CSS here */ | |
""" | |
# Define the checkbox outside the demo block | |
use_web_search = gr.Checkbox(label="Use Web Search", value=False) | |
demo = gr.ChatInterface( | |
respond, | |
additional_inputs=[ | |
gr.Dropdown(choices=MODELS, label="Select Model", value=MODELS[0]), | |
gr.Slider(minimum=0.1, maximum=1.0, value=0.2, step=0.1, label="Temperature"), | |
gr.Slider(minimum=1, maximum=5, value=1, step=1, label="Number of API Calls"), | |
use_web_search # Add this line to include the checkbox | |
], | |
title="AI-powered Web Search and PDF Chat Assistant", | |
description="Chat with your PDFs or use web search to answer questions.", | |
theme=gr.themes.Soft( | |
primary_hue="orange", | |
secondary_hue="amber", | |
neutral_hue="gray", | |
font=[gr.themes.GoogleFont("Exo"), "ui-sans-serif", "system-ui", "sans-serif"] | |
).set( | |
body_background_fill_dark="#0c0505", | |
block_background_fill_dark="#0c0505", | |
block_border_width="1px", | |
block_title_background_fill_dark="#1b0f0f", | |
input_background_fill_dark="#140b0b", | |
button_secondary_background_fill_dark="#140b0b", | |
border_color_accent_dark="#1b0f0f", | |
border_color_primary_dark="#1b0f0f", | |
background_fill_secondary_dark="#0c0505", | |
color_accent_soft_dark="transparent", | |
code_background_fill_dark="#140b0b" | |
), | |
css=css, | |
examples=[ | |
["Tell me about the contents of the uploaded PDFs."], | |
["What are the main topics discussed in the documents?"], | |
["Can you summarize the key points from the PDFs?"] | |
], | |
cache_examples=False, | |
analytics_enabled=False, | |
) | |
# Add file upload functionality | |
with demo: | |
gr.Markdown("## Upload PDF Documents") | |
with gr.Row(): | |
file_input = gr.Files(label="Upload your PDF documents", file_types=[".pdf"]) | |
parser_dropdown = gr.Dropdown(choices=["pypdf", "llamaparse"], label="Select PDF Parser", value="llamaparse") | |
update_button = gr.Button("Upload Document") | |
update_output = gr.Textbox(label="Update Status") | |
update_button.click(update_vectors, inputs=[file_input, parser_dropdown], outputs=update_output) | |
gr.Markdown( | |
""" | |
## How to use | |
1. Upload PDF documents using the file input at the top. | |
2. Select the PDF parser (pypdf or llamaparse) and click "Upload Document" to update the vector store. | |
3. Ask questions in the chat interface. | |
4. Toggle "Use Web Search" to switch between PDF chat and web search, the toggle box is present inside additional inputs dropdown. | |
5. Adjust Temperature and Number of API Calls to fine-tune the response generation. | |
6. Use the provided examples or ask your own questions. | |
""" | |
) | |
if __name__ == "__main__": | |
demo.launch(share=True) |