import os
import json
import re
import shutil
import random
import urllib.parse
from tempfile import NamedTemporaryFile
from typing import List, Dict

import gradio as gr
import pandas as pd
import requests
import spacy
import nltk
from nltk.tokenize import word_tokenize, sent_tokenize
from bs4 import BeautifulSoup

from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import HuggingFaceHub
from langchain_core.documents import Document

huggingface_token = os.environ.get("HUGGINGFACE_TOKEN")

# NLTK resources used by word_tokenize and nltk.pos_tag.
# Note: newer NLTK releases may also require 'punkt_tab' and
# 'averaged_perceptron_tagger_eng'.
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')


class Agent1:
    def __init__(self):
        self.question_words = set(["what", "when", "where", "who", "whom", "which", "whose", "why", "how"])
        self.conjunctions = set(["and", "or"])
        self.pronouns = set(["it", "its", "they", "their", "them", "he", "his", "him", "she", "her", "hers"])
        self.context = {}

    def is_question(self, text: str) -> bool:
        words = word_tokenize(text.lower())
        if not words:
            return False
        return (words[0] in self.question_words or
                text.strip().endswith('?') or
                any(word in self.question_words for word in words))

    def find_subject(self, sentence):
        # Return the first noun that appears before any preposition, or None.
        tokens = nltk.pos_tag(word_tokenize(sentence))
        subject = None
        for word, tag in tokens:
            if tag.startswith('NN'):
                subject = word
                break
            if tag == 'IN':
                break
        return subject

    def replace_pronoun(self, questions: List[str]) -> List[str]:
        if len(questions) < 2:
            return questions

        subject = self.find_subject(questions[0])

        if not subject:
            return questions

        for i in range(1, len(questions)):
            words = word_tokenize(questions[i])
            for j, word in enumerate(words):
                if word.lower() in self.pronouns:
                    words[j] = subject
            questions[i] = ' '.join(words)

        return questions

    def rephrase_and_split(self, user_input: str) -> List[str]:
        words = word_tokenize(user_input)
        questions = []
        current_question = []

        for word in words:
            if word.lower() in self.conjunctions and current_question:
                if self.is_question(' '.join(current_question)):
                    questions.append(' '.join(current_question))
                    current_question = []
            else:
                current_question.append(word)

        if current_question:
            if self.is_question(' '.join(current_question)):
                questions.append(' '.join(current_question))

        if not questions:
            return [user_input]

        questions = self.replace_pronoun(questions)

        return questions

    def update_context(self, query: str):
        tokens = nltk.pos_tag(word_tokenize(query))
        noun_phrases = []
        current_phrase = []

        for word, tag in tokens:
            if tag.startswith('NN') or tag.startswith('JJ'):
                current_phrase.append(word)
            else:
                if current_phrase:
                    noun_phrases.append(' '.join(current_phrase))
                    current_phrase = []

        if current_phrase:
            noun_phrases.append(' '.join(current_phrase))

        if noun_phrases:
            self.context['main_topic'] = noun_phrases[0]
            self.context['related_topics'] = noun_phrases[1:]
            self.context['last_query'] = query

    def apply_context(self, query: str) -> str:
        words = word_tokenize(query.lower())

        if (len(words) <= 5 or
                any(word in self.pronouns for word in words) or
                (self.context.get('main_topic') and self.context['main_topic'].lower() not in query.lower())):

            new_query_parts = []
            main_topic_added = False

            for word in words:
                if word in self.pronouns and self.context.get('main_topic'):
                    new_query_parts.append(self.context['main_topic'])
                    main_topic_added = True
                else:
                    new_query_parts.append(word)

            if not main_topic_added and self.context.get('main_topic'):
                new_query_parts.append(f"in the context of {self.context['main_topic']}")

            query = ' '.join(new_query_parts)

            if self.context.get('last_query'):
                query = f"{self.context['last_query']} and now {query}"

        return query

    def process(self, user_input: str) -> tuple[List[str], Dict[str, List[Dict[str, str]]]]:
        self.update_context(user_input)
        contextualized_input = self.apply_context(user_input)
        queries = self.rephrase_and_split(contextualized_input)
        print("Identified queries:", queries)

        results = {}
        for query in queries:
            results[query] = google_search(query)

        return queries, results
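
# Illustrative usage of Agent1 (hypothetical input/output, not from a real run):
#
#     agent = Agent1()
#     agent.rephrase_and_split("what is FAISS and how does it index vectors")
#     # -> ["what is FAISS", "how does it index vectors"]  (roughly; the exact
#     #    split depends on NLTK tokenization and the is_question heuristic)
#
# Agent1.process() additionally resolves pronouns against the last detected
# topic and fans each sub-question out to google_search().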


def load_document(file: NamedTemporaryFile) -> List[Document]:
    """Loads and splits the document into pages."""
    loader = PyPDFLoader(file.name)
    return loader.load_and_split()


def update_vectors(files):
    """Load the uploaded PDFs, embed them, and add them to the persistent FAISS store."""
    if not files:
        return "Please upload at least one PDF file."

    embed = get_embeddings()
    total_chunks = 0

    all_data = []
    for file in files:
        data = load_document(file)
        all_data.extend(data)
        total_chunks += len(data)

    if os.path.exists("faiss_database"):
        database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True)
        database.add_documents(all_data)
    else:
        database = FAISS.from_documents(all_data, embed)

    database.save_local("faiss_database")

    return f"Vector store updated successfully. Processed {total_chunks} chunks from {len(files)} files."


def get_embeddings():
    return HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")


def clear_cache():
    # FAISS.save_local writes a directory ("faiss_database/index.faiss" and
    # "index.pkl"), so it must be removed with shutil.rmtree, not os.remove.
    if os.path.exists("faiss_database"):
        shutil.rmtree("faiss_database")
        return "Cache cleared successfully."
    else:
        return "No cache to clear."


def get_model(temperature, top_p, repetition_penalty):
    return HuggingFaceHub(
        repo_id="mistralai/Mistral-7B-Instruct-v0.3",
        model_kwargs={
            "temperature": temperature,
            "top_p": top_p,
            "repetition_penalty": repetition_penalty,
            "max_length": 1000
        },
        huggingfacehub_api_token=huggingface_token
    )
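
# Note: langchain_community.llms.HuggingFaceHub is deprecated in recent
# LangChain releases. A rough equivalent (untested sketch, same parameters)
# would be HuggingFaceEndpoint:
#
#     from langchain_huggingface import HuggingFaceEndpoint
#     llm = HuggingFaceEndpoint(
#         repo_id="mistralai/Mistral-7B-Instruct-v0.3",
#         temperature=temperature,
#         top_p=top_p,
#         repetition_penalty=repetition_penalty,
#         max_new_tokens=1000,
#         huggingfacehub_api_token=huggingface_token,
#     )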


def generate_chunked_response(model, prompt, max_tokens=1000, max_chunks=5):
    # Generate up to max_chunks continuations, feeding the accumulated answer
    # back in with the prompt, and stop early once a chunk ends on
    # sentence-final punctuation.
    full_response = ""
    for i in range(max_chunks):
        try:
            chunk = model(prompt + full_response, max_new_tokens=max_tokens)
            chunk = chunk.strip()
            if chunk.endswith((".", "!", "?")):
                full_response += chunk
                break
            full_response += chunk
        except Exception as e:
            print(f"Error in generate_chunked_response: {e}")
            break
    return full_response.strip()
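
# Illustrative call (hypothetical values):
#
#     model = get_model(temperature=0.5, top_p=0.9, repetition_penalty=1.0)
#     text = generate_chunked_response(model, "Summarize FAISS in two sentences.")
#
# Each iteration re-sends prompt + partial answer, so the effective context
# grows by roughly one chunk per call.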


def extract_text_from_webpage(html):
    soup = BeautifulSoup(html, 'html.parser')
    for script in soup(["script", "style"]):
        script.extract()
    text = soup.get_text()
    # Collapse whitespace: strip each line, then split on double spaces so
    # headings and inline fragments end up on their own lines.
    lines = (line.strip() for line in text.splitlines())
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    text = '\n'.join(chunk for chunk in chunks if chunk)
    return text
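
# Example (hypothetical HTML):
#
#     html = "<html><body>\n  <h1>Title</h1>\n  <p>Body text.</p>\n  <script>x()</script>\n</body></html>"
#     extract_text_from_webpage(html)
#     # -> "Title\nBody text."   (roughly; the exact output depends on the
#     #    markup's own whitespace, since get_text() keeps the document's newlines)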


_useragent_list = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36",
]


def google_search(term, num_results=5, lang="en", timeout=5, safe="active", ssl_verify=None):
    escaped_term = urllib.parse.quote_plus(term)
    start = 0
    all_results = []
    max_chars_per_page = 8000

    print(f"Starting Google search for term: '{term}'")

    with requests.Session() as session:
        while start < num_results:
            try:
                user_agent = random.choice(_useragent_list)
                headers = {
                    'User-Agent': user_agent
                }
                resp = session.get(
                    url="https://www.google.com/search",
                    headers=headers,
                    params={
                        "q": term,
                        "num": num_results - start,
                        "hl": lang,
                        "start": start,
                        "safe": safe,
                    },
                    timeout=timeout,
                    verify=ssl_verify,
                )
                resp.raise_for_status()
                print(f"Successfully retrieved search results page (start={start})")
            except requests.exceptions.RequestException as e:
                print(f"Error retrieving search results: {e}")
                break

            soup = BeautifulSoup(resp.text, "html.parser")
            result_block = soup.find_all("div", attrs={"class": "g"})
            if not result_block:
                print("No results found on this page")
                break

            print(f"Found {len(result_block)} results on this page")
            for result in result_block:
                link = result.find("a", href=True)
                if link:
                    link = link["href"]
                    print(f"Processing link: {link}")
                    try:
                        webpage = session.get(link, headers=headers, timeout=timeout)
                        webpage.raise_for_status()
                        visible_text = extract_text_from_webpage(webpage.text)
                        if len(visible_text) > max_chars_per_page:
                            visible_text = visible_text[:max_chars_per_page] + "..."
                        all_results.append({"link": link, "text": visible_text})
                        print(f"Successfully extracted text from {link}")
                    except requests.exceptions.RequestException as e:
                        print(f"Error retrieving webpage content: {e}")
                        all_results.append({"link": link, "text": None})
                else:
                    print("No link found for this result")
                    all_results.append({"link": None, "text": None})
            start += len(result_block)

    print(f"Search completed. Total results: {len(all_results)}")

    if not all_results:
        print("No search results found. Returning a default message.")
        return [{"link": None, "text": "No information found in the web search results."}]

    return all_results
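
# Each result is a dict of the form {"link": <url or None>, "text": <extracted
# page text or None>}; for example (hypothetical):
#
#     results = google_search("mistral 7b instruct", num_results=3)
#     # -> [{"link": "https://example.com/...", "text": "page text..."}, ...]
#
# Scraping google.com/search directly is brittle (markup changes, rate limits),
# so failures surface as the default "No information found..." entry.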


def ask_question(question, temperature, top_p, repetition_penalty, web_search, agent1=None):
    if not question:
        return "Please enter a question."

    if agent1 is None:
        agent1 = Agent1()

    model = get_model(temperature, top_p, repetition_penalty)
    embed = get_embeddings()

    if os.path.exists("faiss_database"):
        database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True)
    else:
        database = None

    max_attempts = 3
    context_reduction_factor = 0.7

    agent1.update_context(question)
    contextualized_question = agent1.apply_context(question)

    if web_search:
        queries, search_results = agent1.process(contextualized_question)
        all_answers = []

        for query in queries:
            for attempt in range(max_attempts):
                try:
                    web_docs = [Document(page_content=result["text"], metadata={"source": result["link"], "query": query}) for result in search_results[query] if result["text"]]

                    if database is None:
                        database = FAISS.from_documents(web_docs, embed)
                    else:
                        database.add_documents(web_docs)

                    database.save_local("faiss_database")

                    context_str = "\n".join([f"Query: {doc.metadata['query']}\nSource: {doc.metadata['source']}\nContent: {doc.page_content}" for doc in web_docs])

                    prompt_template = """
                    Answer the question based on the following web search results:
                    Web Search Results:
                    {context}
                    Original Question: {question}
                    If the web search results don't contain relevant information, state that the information is not available in the search results.
                    Provide a summarized and direct answer to the original question without mentioning the web search or these instructions.
                    Do not include any source information in your answer.
                    """

                    prompt_val = ChatPromptTemplate.from_template(prompt_template)
                    formatted_prompt = prompt_val.format(context=context_str, question=query)

                    full_response = generate_chunked_response(model, formatted_prompt)

                    answer_patterns = [
                        r"Provide a concise and direct answer to the question without mentioning the web search or these instructions:",
                        r"Provide a concise and direct answer to the question:",
                        r"Answer:",
                        r"Provide a summarized and direct answer to the original question without mentioning the web search or these instructions:",
                        r"Do not include any source information in your answer."
                    ]

                    for pattern in answer_patterns:
                        match = re.split(pattern, full_response, flags=re.IGNORECASE)
                        if len(match) > 1:
                            answer = match[-1].strip()
                            break
                    else:
                        answer = full_response.strip()

                    all_answers.append(answer)
                    break

                except Exception as e:
                    print(f"Error in ask_question for query '{query}' (attempt {attempt + 1}): {e}")
                    if "Input validation error" in str(e) and attempt < max_attempts - 1:
                        print("Retrying with the next attempt")
                    elif attempt == max_attempts - 1:
                        all_answers.append(f"I apologize, but I'm having trouble processing the query '{query}' due to its length or complexity.")

        answer = "\n\n".join(all_answers)
        # Collect the distinct source links that contributed usable text.
        sources = set(result["link"] for results in search_results.values() for result in results if result["text"] and result["link"])
        sources_section = "\n\nSources:\n" + "\n".join(f"- {source}" for source in sources)
        answer += sources_section

        return answer

    else:
        for attempt in range(max_attempts):
            try:
                if database is None:
                    return "No documents available. Please upload documents or enable web search to answer questions."

                retriever = database.as_retriever()
                relevant_docs = retriever.get_relevant_documents(contextualized_question)
                context_str = "\n".join([doc.page_content for doc in relevant_docs])

                if attempt > 0:
                    words = context_str.split()
                    context_str = " ".join(words[:int(len(words) * context_reduction_factor)])

                prompt_template = """
                Answer the question based on the following context:
                Context:
                {context}
                Current Question: {question}
                If the context doesn't contain relevant information, state that the information is not available.
                Provide a summarized and direct answer to the question.
                Do not include any source information in your answer.
                """

                prompt_val = ChatPromptTemplate.from_template(prompt_template)
                formatted_prompt = prompt_val.format(context=context_str, question=contextualized_question)

                full_response = generate_chunked_response(model, formatted_prompt)

                answer_patterns = [
                    r"Provide a concise and direct answer to the question without mentioning the web search or these instructions:",
                    r"Provide a concise and direct answer to the question:",
                    r"Answer:",
                    r"Provide a summarized and direct answer to the original question without mentioning the web search or these instructions:",
                    r"Do not include any source information in your answer."
                ]

                for pattern in answer_patterns:
                    match = re.split(pattern, full_response, flags=re.IGNORECASE)
                    if len(match) > 1:
                        answer = match[-1].strip()
                        break
                else:
                    answer = full_response.strip()

                return answer

            except Exception as e:
                print(f"Error in ask_question (attempt {attempt + 1}): {e}")
                if "Input validation error" in str(e) and attempt < max_attempts - 1:
                    print("Reducing context length for next attempt")
                elif attempt == max_attempts - 1:
                    return "I apologize, but I'm having trouble processing your question due to its length or complexity. Could you please try rephrasing it more concisely?"

    return "An unexpected error occurred. Please try again later."


with gr.Blocks() as demo:
    gr.Markdown("# Chat with your PDF documents and Web Search")

    with gr.Row():
        file_input = gr.Files(label="Upload your PDF documents", file_types=[".pdf"])
        update_button = gr.Button("Upload PDF")

    update_output = gr.Textbox(label="Update Status")
    update_button.click(update_vectors, inputs=[file_input], outputs=update_output)

    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="Conversation")
            question_input = gr.Textbox(label="Perplexity AI lite: enable web search to retrieve live web results. Feel free to provide any feedback.")
            submit_button = gr.Button("Submit")
        with gr.Column(scale=1):
            temperature_slider = gr.Slider(label="Temperature", minimum=0.0, maximum=1.0, value=0.5, step=0.1)
            top_p_slider = gr.Slider(label="Top P", minimum=0.0, maximum=1.0, value=0.9, step=0.1)
            repetition_penalty_slider = gr.Slider(label="Repetition Penalty", minimum=1.0, maximum=2.0, value=1.0, step=0.1)
            web_search_checkbox = gr.Checkbox(label="Enable Web Search", value=False)

    agent1 = Agent1()

    def chat(question, history, temperature, top_p, repetition_penalty, web_search):
        answer = ask_question(question, temperature, top_p, repetition_penalty, web_search, agent1)
        history.append((question, answer))
        return "", history

    submit_button.click(chat, inputs=[question_input, chatbot, temperature_slider, top_p_slider, repetition_penalty_slider, web_search_checkbox], outputs=[question_input, chatbot])

    clear_button = gr.Button("Clear Cache")
    clear_output = gr.Textbox(label="Cache Status")
    clear_button.click(clear_cache, inputs=[], outputs=clear_output)


if __name__ == "__main__":
    demo.launch()