pdf-something / app.py
Penality's picture
Update app.py
5dc8c5b verified
raw
history blame
5.38 kB
import gradio as gr
import json
import os
import pdfplumber
import together
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import re
import unicodedata
from dotenv import load_dotenv
load_dotenv()
# Set up Together.AI API Key (Replace with your actual key)
assert os.getenv("TOGETHER_API_KEY"), "api key missing"
# Use a sentence transformer for embeddings
#'BAAI/bge-base-en-v1.5'
# embedding_model = SentenceTransformer("BAAI/bge-base-en-v1.5")
# 'togethercomputer/m2-bert-80M-8k-retrieval'
embedding_model = SentenceTransformer(
"togethercomputer/m2-bert-80M-8k-retrieval",
trust_remote_code=True # Allow remote code execution
)
embedding_dim = 768 # Adjust according to model
# Initialize FAISS index
index = faiss.IndexFlatL2(embedding_dim)
documents = [] # Store raw text for reference
# initialize the variables to store documents
DOCUMENT_DIR = os.path.join(os.path.dirname(__file__), "documents")
INDEX_FILE = "faiss_index.py" # stores embeddings
METADATA_FILE = "metadata.json" # stores Document metadata
# create the directory
os.makedirs(DOCUMENT_DIR, exist_ok=True)
# load the faiss indexes file
if os.path.exists(INDEX_FILE): # check if index file exists
stored_embeddings = np.load(INDEX_FILE) # load emeddings
if stored_embeddings.shape[0] > 0:
index.add(stored_embeddings)
# load the document metadata
if os.path.exists(METADATA_FILE): # check if metadata exists
with open(METADATA_FILE, "r") as f:
metadata = json.load(f)
else:
metadata = {}
def store_document(text):
print("storing document")
# Generate a unique filename
filename = os.path.join(DOCUMENT_DIR, f"doc_{len(metadata) + 1}.txt")
print(filename)
# Save document in a file
with open(filename, "w") as f:
f.write(text)
print("document saved")
# Generate and store embedding
embedding = embedding_model.encode([text]).astype(np.float32)
index.add(embedding)
print("emeddings generated")
# Update metadata
metadata[len(metadata)] = filename
with open(METADATA_FILE, "w") as f:
json.dump(metadata, f)
# Save FAISS index
np.save(INDEX_FILE, index.reconstruct_n(0, index.ntotal))
print(f"your document has been stored at: {filename}")
return "Document stored!"
def retrieve_document(query):
print(f"retrieving doc based on: \n{query}")
query_embedding = embedding_model.encode([query]).astype(np.float32)
_, closest_idx = index.search(query_embedding, 1)
if closest_idx[0][0] in metadata: # Ensure a valid match
filename = metadata[str(closest_idx[0][0])]
with open(filename, "r") as f:
return f.read()
else:
return None
def clean_text(text):
"""Cleans extracted text for better processing by the model."""
print("cleaning")
text = unicodedata.normalize("NFKC", text) # Normalize Unicode characters
text = re.sub(r'\s+', ' ', text).strip() # Remove extra spaces and newlines
text = re.sub(r'[^a-zA-Z0-9.,!?;:\'\"()\-]', ' ', text) # Keep basic punctuation
text = re.sub(r'(?i)(page\s*\d+)', '', text) # Remove page numbers
return text
def extract_text_from_pdf(pdf_file):
"""Extract and clean text from the uploaded PDF."""
print("extracting")
try:
with pdfplumber.open(pdf_file) as pdf:
text = " ".join(clean_text(text) for page in pdf.pages if (text := page.extract_text()))
store_document(text)
return text
except Exception as e:
print(f"Error extracting text: {e}")
return None
def split_text(text, chunk_size=500):
"""Splits text into smaller chunks for better processing."""
print("splitting")
return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]
def chatbot(pdf_file, user_question):
"""Processes the PDF and answers the user's question."""
print("chatbot start")
if pdf_file:
# Extract text from the PDF
text = extract_text_from_pdf(pdf_file)
if not text:
return "Could not extract any text from the PDF."
# retrieve the document relevant to the query
doc = retrieve_document(user_question)
if doc:
print("found doc")
# Split into smaller chunks
chunks = split_text(doc)
# Use only the first chunk (to optimize token usage)
prompt = f"Based on this document, answer the question:\n\nDocument:\n{chunks[0]}\n\nQuestion: {user_question}"
print(f"prompt: \n{prompt}")
else:
prompt=user_question
try:
print("asking")
response = together.Completion.create(
model="mistralai/Mistral-7B-Instruct-v0.1",
prompt=prompt,
max_tokens=200,
temperature=0.7,
)
# Return chatbot's response
return response.choices[0].text
except Exception as e:
return f"Error generating response: {e}"
# Send to Together.AI (Mistral-7B)
# Gradio Interface
iface = gr.Interface(
fn=chatbot,
inputs=[gr.File(label="Upload PDF"), gr.Textbox(label="Ask a Question")],
outputs=gr.Textbox(label="Answer"),
title="PDF Q&A Chatbot (Powered by Together.AI)"
)
# Launch Gradio app
iface.launch()