Spaces:
Sleeping
Sleeping
import os | |
import json | |
import re | |
import gradio as gr | |
import pandas as pd | |
import requests | |
import random | |
import urllib.parse | |
from tempfile import NamedTemporaryFile | |
from typing import List | |
from bs4 import BeautifulSoup | |
from langchain_core.prompts import ChatPromptTemplate | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.document_loaders import PyPDFLoader | |
from langchain_core.output_parsers import StrOutputParser | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from langchain_text_splitters import RecursiveCharacterTextSplitter | |
from langchain_community.llms import HuggingFaceHub | |
from langchain_core.runnables import RunnableParallel, RunnablePassthrough | |
from langchain_core.documents import Document | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.metrics.pairwise import cosine_similarity | |
huggingface_token = os.environ.get("HUGGINGFACE_TOKEN") | |
# Memory database to store question-answer pairs | |
memory_database = {} | |
conversation_history = [] | |
def load_and_split_document_basic(file): | |
"""Loads and splits the document into pages.""" | |
loader = PyPDFLoader(file.name) | |
data = loader.load_and_split() | |
return data | |
def load_and_split_document_recursive(file: NamedTemporaryFile) -> List[Document]: | |
"""Loads and splits the document into chunks.""" | |
loader = PyPDFLoader(file.name) | |
pages = loader.load() | |
text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size=1000, | |
chunk_overlap=200, | |
length_function=len, | |
) | |
chunks = text_splitter.split_documents(pages) | |
return chunks | |
def get_embeddings(): | |
return HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2") | |
def create_or_update_database(data, embeddings): | |
if os.path.exists("faiss_database"): | |
db = FAISS.load_local("faiss_database", embeddings, allow_dangerous_deserialization=True) | |
db.add_documents(data) | |
else: | |
db = FAISS.from_documents(data, embeddings) | |
db.save_local("faiss_database") | |
def clear_cache(): | |
if os.path.exists("faiss_database"): | |
os.remove("faiss_database") | |
return "Cache cleared successfully." | |
else: | |
return "No cache to clear." | |
def get_similarity(text1, text2): | |
vectorizer = TfidfVectorizer().fit_transform([text1, text2]) | |
return cosine_similarity(vectorizer[0:1], vectorizer[1:2])[0][0] | |
prompt = """ | |
Answer the question based on the following information: | |
Conversation History: | |
{history} | |
Context from documents: | |
{context} | |
Current Question: {question} | |
If the question is referring to the conversation history, use that information to answer. | |
If the question is not related to the conversation history, use the context from documents to answer. | |
If you don't have enough information to answer, say so. | |
Provide a concise and direct answer to the question: | |
""" | |
def get_model(temperature, top_p, repetition_penalty): | |
return HuggingFaceHub( | |
repo_id="mistralai/Mistral-7B-Instruct-v0.3", | |
model_kwargs={ | |
"temperature": temperature, | |
"top_p": top_p, | |
"repetition_penalty": repetition_penalty, | |
"max_length": 1000 | |
}, | |
huggingfacehub_api_token=huggingface_token | |
) | |
def generate_chunked_response(model, prompt, max_tokens=1000, max_chunks=5): | |
full_response = "" | |
for i in range(max_chunks): | |
chunk = model(prompt + full_response, max_new_tokens=max_tokens) | |
chunk = chunk.strip() | |
if chunk.endswith((".", "!", "?")): | |
full_response += chunk | |
break | |
full_response += chunk | |
return full_response.strip() | |
def manage_conversation_history(question, answer, history, max_history=5): | |
history.append({"question": question, "answer": answer}) | |
if len(history) > max_history: | |
history.pop(0) | |
return history | |
def is_related_to_history(question, history, threshold=0.3): | |
if not history: | |
return False | |
history_text = " ".join([f"{h['question']} {h['answer']}" for h in history]) | |
similarity = get_similarity(question, history_text) | |
return similarity > threshold | |
def extract_text_from_webpage(html): | |
soup = BeautifulSoup(html, 'html.parser') | |
for script in soup(["script", "style"]): | |
script.extract() # Remove scripts and styles | |
text = soup.get_text() | |
lines = (line.strip() for line in text.splitlines()) | |
chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) | |
text = '\n'.join(chunk for chunk in chunks if chunk) | |
return text | |
_useragent_list = [ | |
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", | |
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", | |
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36", | |
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Edge/91.0.864.59 Safari/537.36", | |
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36", | |
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Safari/537.36", | |
] | |
def google_search(term, num_results=5, lang="en", timeout=5, safe="active", ssl_verify=None): | |
escaped_term = urllib.parse.quote_plus(term) | |
start = 0 | |
all_results = [] | |
max_chars_per_page = 8000 # Limit the number of characters from each webpage to stay under the token limit | |
print(f"Starting Google search for term: '{term}'") | |
with requests.Session() as session: | |
while start < num_results: | |
try: | |
user_agent = random.choice(_useragent_list) | |
headers = { | |
'User-Agent': user_agent | |
} | |
resp = session.get( | |
url="https://www.google.com/search", | |
headers=headers, | |
params={ | |
"q": term, | |
"num": num_results - start, | |
"hl": lang, | |
"start": start, | |
"safe": safe, | |
}, | |
timeout=timeout, | |
verify=ssl_verify, | |
) | |
resp.raise_for_status() | |
print(f"Successfully retrieved search results page (start={start})") | |
except requests.exceptions.RequestException as e: | |
print(f"Error retrieving search results: {e}") | |
break | |
soup = BeautifulSoup(resp.text, "html.parser") | |
result_block = soup.find_all("div", attrs={"class": "g"}) | |
if not result_block: | |
print("No results found on this page") | |
break | |
print(f"Found {len(result_block)} results on this page") | |
for result in result_block: | |
link = result.find("a", href=True) | |
if link: | |
link = link["href"] | |
print(f"Processing link: {link}") | |
try: | |
webpage = session.get(link, headers=headers, timeout=timeout) | |
webpage.raise_for_status() | |
visible_text = extract_text_from_webpage(webpage.text) | |
if len(visible_text) > max_chars_per_page: | |
visible_text = visible_text[:max_chars_per_page] + "..." | |
all_results.append({"link": link, "text": visible_text}) | |
print(f"Successfully extracted text from {link}") | |
except requests.exceptions.RequestException as e: | |
print(f"Error retrieving webpage content: {e}") | |
all_results.append({"link": link, "text": None}) | |
else: | |
print("No link found for this result") | |
all_results.append({"link": None, "text": None}) | |
start += len(result_block) | |
print(f"Search completed. Total results: {len(all_results)}") | |
print("Search results:") | |
for i, result in enumerate(all_results, 1): | |
print(f"Result {i}:") | |
print(f" Link: {result['link']}") | |
if result['text']: | |
print(f" Text: {result['text'][:100]}...") # Display the first 100 characters of the text for brevity | |
else: | |
print(" No text extracted") | |
return all_results | |
def process_question(question, documents, history, temperature, top_p, repetition_penalty): | |
global conversation_history | |
embeddings = get_embeddings() | |
# Check the memory database for similar questions | |
for prev_question, prev_answer in memory_database.items(): | |
similarity = get_similarity(question, prev_question) | |
if similarity > 0.7: | |
return prev_answer | |
# Load the FAISS vector store if it exists | |
if os.path.exists("faiss_database"): | |
db = FAISS.load_local("faiss_database", embeddings, allow_dangerous_deserialization=True) | |
relevant_docs = db.similarity_search(question, k=3) | |
else: | |
relevant_docs = [] | |
if len(relevant_docs) == 0: | |
# Perform web search and update the vector store | |
web_search_results = google_search(question, num_results=5) | |
web_docs = [Document(page_content=res["text"] or "", metadata={"source": res["link"]}) for res in web_search_results if res["text"]] | |
if web_docs: | |
# Update the FAISS vector store with new documents | |
create_or_update_database(web_docs, embeddings) | |
# Reload the updated FAISS store and retrieve relevant documents | |
db = FAISS.load_local("faiss_database", embeddings, allow_dangerous_deserialization=True) | |
relevant_docs = db.similarity_search(question, k=3) | |
context = "\n\n".join([doc.page_content for doc in relevant_docs]) | |
if is_related_to_history(question, history): | |
context = "None" | |
else: | |
history_text = "\n".join([f"Q: {h['question']}\nA: {h['answer']}" for h in history]) | |
context = context if context else "None" | |
prompt_text = ChatPromptTemplate( | |
input_variables=["history", "context", "question"], | |
template=prompt | |
).format(history=history_text, context=context, question=question) | |
model = get_model(temperature, top_p, repetition_penalty) | |
answer = generate_chunked_response(model, prompt_text) | |
conversation_history = manage_conversation_history(question, answer, history) | |
memory_database[question] = answer | |
return answer | |
def process_uploaded_file(file, is_recursive): | |
if is_recursive: | |
data = load_and_split_document_recursive(file) | |
else: | |
data = load_and_split_document_basic(file) | |
embeddings = get_embeddings() | |
create_or_update_database(data, embeddings) | |
return "File processed and data added to the vector database." | |
def extract_db_to_excel(): | |
embed = get_embeddings() | |
database = FAISS.load_local("faiss_database", embed, allow_dangerous_deserialization=True) | |
documents = database.docstore._dict.values() | |
data = [{"page_content": doc.page_content, "metadata": json.dumps(doc.metadata)} for doc in documents] | |
df = pd.DataFrame(data) | |
with NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp: | |
excel_path = tmp.name | |
df.to_excel(excel_path, index=False) | |
return excel_path | |
def export_memory_db_to_excel(): | |
data = [{"question": question, "answer": answer} for question, answer in memory_database.items()] | |
df_memory = pd.DataFrame(data) | |
data_history = [{"question": item["question"], "answer": item["answer"]} for item in conversation_history] | |
df_history = pd.DataFrame(data_history) | |
with NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp: | |
excel_path = tmp.name | |
with pd.ExcelWriter(excel_path, engine='openpyxl') as writer: | |
df_memory.to_excel(writer, sheet_name='Memory Database', index=False) | |
df_history.to_excel(writer, sheet_name='Conversation History', index=False) | |
return excel_path | |
with gr.Blocks() as demo: | |
with gr.Tab("Upload PDF"): | |
with gr.Row(): | |
pdf_file = gr.File(label="Upload PDF") | |
with gr.Row(): | |
recursive_check = gr.Checkbox(label="Use Recursive Text Splitter") | |
upload_button = gr.Button("Upload and Process") | |
with gr.Row(): | |
upload_output = gr.Textbox(label="Upload Output") | |
with gr.Tab("Ask Questions"): | |
with gr.Row(): | |
question = gr.Textbox(label="Your Question") | |
with gr.Row(): | |
temperature = gr.Slider(minimum=0.0, maximum=1.0, value=0.7, label="Temperature") | |
top_p = gr.Slider(minimum=0.0, maximum=1.0, value=0.9, label="Top P") | |
repetition_penalty = gr.Slider(minimum=0.0, maximum=2.0, value=1.0, label="Repetition Penalty") | |
with gr.Row(): | |
ask_button = gr.Button("Ask") | |
with gr.Row(): | |
answer = gr.Textbox(label="Answer") | |
with gr.Tab("Clear Cache"): | |
with gr.Row(): | |
clear_button = gr.Button("Clear Cache") | |
with gr.Row(): | |
clear_output = gr.Textbox(label="Clear Output") | |
with gr.Tab("Export Data"): | |
with gr.Row(): | |
export_db_button = gr.Button("Export Database to Excel") | |
export_db_output = gr.Textbox(label="Export Output") | |
with gr.Row(): | |
export_memory_button = gr.Button("Export Memory DB to Excel") | |
export_memory_output = gr.Textbox(label="Export Output") | |
upload_button.click(process_uploaded_file, [pdf_file, recursive_check], upload_output) | |
ask_button.click(process_question, [question, pdf_file, recursive_check, temperature, top_p, repetition_penalty], answer) | |
clear_button.click(clear_cache, [], clear_output) | |
export_db_button.click(extract_db_to_excel, [], export_db_output) | |
export_memory_button.click(export_memory_db_to_excel, [], export_memory_output) | |
demo.launch() |