import os
import re
import unicodedata

import faiss
import gradio as gr
import numpy as np
import pdfplumber
from dotenv import load_dotenv
from huggingface_hub import login
from sentence_transformers import SentenceTransformer
from together import Together
from transformers import pipeline

load_dotenv()
# Together.AI API key must be set in the environment (e.g. via a .env file)
assert os.getenv("TOGETHER_API_KEY"), "TOGETHER_API_KEY is missing"
client = Together()  # the client reads TOGETHER_API_KEY from the environment

# Retrieve the Hugging Face token from secrets and authenticate
api_token = os.getenv("HUGGINGFACEHUB_API_TOKEN")
if api_token:
    login(token=api_token)  # needed for gated models such as LLaMA-2
# Load LLaMA-2 model (currently unused: the chatbot below queries Together.AI instead)
llama_pipe = pipeline("text-generation", model="meta-llama/Llama-2-7b-chat-hf")
# Load Sentence Transformer for embeddings
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

# Initialize FAISS index
embedding_dim = 384  # embedding size of the MiniLM model
index = faiss.IndexFlatL2(embedding_dim)
documents = []  # store raw text for reference
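# A minimal sketch (assumption, not wired into the app): MiniLM embeddings are
# typically compared by cosine similarity, which FAISS expresses as an inner
# product over L2-normalized vectors:
#
#   cosine_index = faiss.IndexFlatIP(embedding_dim)
#   vecs = embedding_model.encode(["some text"]).astype(np.float32)
#   faiss.normalize_L2(vecs)  # normalizes in place; do this before add/search
#   cosine_index.add(vecs)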
def store_document(text):
    print("storing document")
    embedding = embedding_model.encode([text])
    index.add(np.array(embedding, dtype=np.float32))
    documents.append(text)
    print(f"document stored ({len(documents)} total)")
    return "Document stored!"
def retrieve_document(query):
    print(f"retrieving doc based on {query}")
    query_embedding = embedding_model.encode([query])
    _, closest_idx = index.search(np.array(query_embedding, dtype=np.float32), 1)
    print(f"retrieved document index: {closest_idx[0][0]}")
    return documents[closest_idx[0][0]]
def clean_text(text):
    """Cleans extracted text for better processing by the model."""
    print("cleaning")
    text = unicodedata.normalize("NFKC", text)  # normalize Unicode characters
    text = re.sub(r'\s+', ' ', text).strip()  # collapse extra spaces and newlines
    text = re.sub(r'[^a-zA-Z0-9.,!?;:\'\"()\-]', ' ', text)  # keep letters, digits, basic punctuation
    text = re.sub(r'(?i)(page\s*\d+)', '', text)  # remove page numbers
    return text
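# Illustrative example of the cleaning behavior (note the leading space left
# where the page marker was removed):
#   clean_text("Page 3\nHello,\tworld!")  ->  " Hello, world!"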
def extract_text_from_pdf(pdf_file):
    """Extract and clean text from the uploaded PDF."""
    print("extracting")
    try:
        with pdfplumber.open(pdf_file) as pdf:
            text = " ".join(clean_text(page_text) for page in pdf.pages if (page_text := page.extract_text()))
        store_document(text)
        return text
    except Exception as e:
        print(f"Error extracting text: {e}")
        return None
def split_text(text, chunk_size=500):
    """Splits text into smaller chunks for better processing."""
    print("splitting")
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
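# A minimal sketch (assumption, not wired into the app): a more typical RAG
# setup embeds and indexes each chunk separately, so retrieval returns the
# most relevant passage rather than the whole document:
def store_document_chunks(text, chunk_size=500):
    """Illustrative helper: index per-chunk embeddings for finer retrieval."""
    chunks = split_text(text, chunk_size)
    embeddings = embedding_model.encode(chunks)  # one vector per chunk
    index.add(np.array(embeddings, dtype=np.float32))
    documents.extend(chunks)  # keep chunk order aligned with index rows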
def chatbot(pdf_file, user_question):
    """Processes the PDF and answers the user's question."""
    print("chatbot start")
    # Extract text from the PDF (this also stores it in the FAISS index)
    text = extract_text_from_pdf(pdf_file)
    if not text:
        return "Could not extract any text from the PDF."
    # Retrieve the stored document most relevant to the query
    doc = retrieve_document(user_question)
    # Split into smaller chunks
    chunks = split_text(doc)
    # Use only the first chunk (to optimize token usage)
    prompt = f"Based on this document, answer the question:\n\nDocument:\n{chunks[0]}\n\nQuestion: {user_question}"
    # Send to Together.AI (Mistral-7B)
    try:
        response = client.completions.create(
            model="mistralai/Mistral-7B-Instruct-v0.1",
            prompt=prompt,
            max_tokens=200,
            temperature=0.7,
        )
        # Return the chatbot's response
        return response.choices[0].text
    except Exception as e:
        return f"Error generating response: {e}"
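# A minimal sketch (assumption, not used above): instruct models such as
# Mistral-7B-Instruct are often called via the OpenAI-style chat endpoint
# instead of raw completions, which applies the model's chat template:
#
#   response = client.chat.completions.create(
#       model="mistralai/Mistral-7B-Instruct-v0.1",
#       messages=[{"role": "user", "content": prompt}],
#       max_tokens=200,
#   )
#   answer = response.choices[0].message.content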
# Gradio interface
iface = gr.Interface(
    fn=chatbot,
    inputs=[gr.File(label="Upload PDF"), gr.Textbox(label="Ask a Question")],
    outputs=gr.Textbox(label="Answer"),
    title="PDF Q&A Chatbot (Powered by Together.AI)",
)

# Launch the Gradio app
iface.launch()
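# To run locally (a sketch; the file name and values below are assumptions):
#   pip install gradio pdfplumber together transformers sentence-transformers \
#       faiss-cpu python-dotenv
#   echo 'TOGETHER_API_KEY=...' >> .env
#   echo 'HUGGINGFACEHUB_API_TOKEN=...' >> .env
#   python app.py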