Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import json | |
| import os | |
| import pdfplumber | |
| import together | |
| from sentence_transformers import SentenceTransformer | |
| import faiss | |
| import numpy as np | |
| import re | |
| import unicodedata | |
| from dotenv import load_dotenv | |
# Load variables from a local .env file (if any) into the process environment.
load_dotenv()

# Fail fast when the Together.AI key is not configured. An explicit raise is
# used instead of `assert`, because asserts are stripped under `python -O`.
if not os.getenv("TOGETHER_API_KEY"):
    raise RuntimeError("api key missing")

# Sentence-transformer used to embed both stored documents and queries.
# Alternative model that was tried previously: "BAAI/bge-base-en-v1.5".
embedding_model = SentenceTransformer(
    "togethercomputer/m2-bert-80M-8k-retrieval",
    trust_remote_code=True,  # allow the model repository's own code to run on load
)
embedding_dim = 768  # must equal the embedding model's output dimension

# In-memory FAISS index (exact L2 search) holding one vector per stored document.
index = faiss.IndexFlatL2(embedding_dim)
documents = []  # raw text for reference; not used by the functions visible in this file

# Locations of the persisted state.
DOCUMENT_DIR = os.path.join(os.path.dirname(__file__), "documents")
# NOTE: numpy.save appends ".npy" to a file name that lacks it, so the name
# must already end in ".npy" for the existence check below to find what was
# written.
INDEX_FILE = "faiss_index.npy"  # stores embeddings
METADATA_FILE = "metadata.json"  # stores document metadata (id -> file path)

# Make sure the document directory exists before anything is written to it.
os.makedirs(DOCUMENT_DIR, exist_ok=True)

# Restore previously saved embeddings into the FAISS index, if present.
if os.path.exists(INDEX_FILE):
    stored_embeddings = np.load(INDEX_FILE)
    if stored_embeddings.shape[0] > 0:
        index.add(stored_embeddings)

# Restore the document metadata. Keys come back from JSON as strings.
if os.path.exists(METADATA_FILE):
    with open(METADATA_FILE, "r") as f:
        metadata = json.load(f)
else:
    metadata = {}
def store_document(text):
    """Persist a document, index its embedding, and save the updated state.

    The text is written to a new file under DOCUMENT_DIR, embedded with
    `embedding_model`, added to the FAISS `index`, and recorded in `metadata`.
    Both the metadata and the full set of embeddings are then written to disk.

    Args:
        text: The document text to store.

    Returns:
        The string "Document stored!".
    """
    print("storing document")
    # A flat FAISS index numbers vectors sequentially, so the vector added
    # below receives id == current ntotal. That id is what `index.search`
    # returns later, so it (not len(metadata)) is used as the metadata key.
    doc_id = index.ntotal
    filename = os.path.join(DOCUMENT_DIR, f"doc_{doc_id + 1}.txt")
    print(filename)
    # Save document in a file
    with open(filename, "w") as f:
        f.write(text)
    print("document saved")
    # Generate and store embedding
    embedding = embedding_model.encode([text]).astype(np.float32)
    index.add(embedding)
    print("emeddings generated")
    # Update metadata. The key is a string because JSON object keys are
    # always strings once the file is loaded back.
    metadata[str(doc_id)] = filename
    with open(METADATA_FILE, "w") as f:
        json.dump(metadata, f)
    # Save all embeddings. Writing through an open file handle keeps the
    # on-disk name exactly INDEX_FILE; passing a path would make numpy.save
    # append ".npy" when the name lacks that suffix.
    with open(INDEX_FILE, "wb") as f:
        np.save(f, index.reconstruct_n(0, index.ntotal))
    print(f"your document has been stored at: {filename}")
    return "Document stored!"
def retrieve_document(query):
    """Return the text of the stored document closest to `query`.

    The query is embedded with `embedding_model` and the single nearest
    vector is looked up in the FAISS `index`; the matching file path is then
    resolved through `metadata`.

    Args:
        query: The user's question.

    Returns:
        The document text, or None when the index is empty or the nearest
        id has no metadata entry.
    """
    print(f"retrieving doc based on: \n{query}")
    if index.ntotal == 0:
        # Nothing indexed yet; skip the embedding work entirely.
        return None

    query_embedding = embedding_model.encode([query]).astype(np.float32)
    _, closest_idx = index.search(query_embedding, 1)

    # Convert the numpy integer to a plain int; FAISS reports -1 for "no result".
    row_id = int(closest_idx[0][0])
    if row_id < 0:
        return None

    # Keys are strings after a JSON round trip, but may be ints when they
    # were inserted during the current session, so try both forms.
    filename = metadata.get(str(row_id))
    if filename is None:
        filename = metadata.get(row_id)
    if filename is None:
        return None

    with open(filename, "r") as f:
        return f.read()
def clean_text(text):
    """Clean extracted text for better processing by the model.

    Steps, in order:
      1. Normalize Unicode with NFKC (e.g. ligatures become plain letters).
      2. Replace every character outside ASCII letters, digits and basic
         punctuation with a space (this also covers newlines and tabs).
      3. Remove "page <number>" markers, case-insensitively, only where
         "page" starts a word so that e.g. "homepage 3" is left alone.
      4. Collapse whitespace runs and trim the ends. This runs last because
         steps 2 and 3 introduce or leave behind extra spaces.

    Args:
        text: Raw text, e.g. as extracted from a PDF page.

    Returns:
        The cleaned, single-spaced text.
    """
    print("cleaning")
    text = unicodedata.normalize("NFKC", text)  # Normalize Unicode characters
    text = re.sub(r'[^a-zA-Z0-9.,!?;:\'\"()\-]', ' ', text)  # Keep basic punctuation
    text = re.sub(r'\bpage\s*\d+', '', text, flags=re.IGNORECASE)  # Remove page numbers
    return re.sub(r'\s+', ' ', text).strip()  # Remove extra spaces and newlines
def extract_text_from_pdf(pdf_file):
    """Extract and clean text from the uploaded PDF, then store it.

    Each page's text is extracted with pdfplumber, passed through
    `clean_text`, and the pages are joined with single spaces. Pages that
    yield no text are skipped. A non-blank result is handed to
    `store_document`; a blank result is returned without being stored so
    the index is not polluted with empty documents.

    Args:
        pdf_file: The uploaded PDF, in whatever form `pdfplumber.open` accepts.

    Returns:
        The cleaned text (possibly empty), or None when any step raised.
    """
    print("extracting")
    try:
        with pdfplumber.open(pdf_file) as pdf:
            text = " ".join(
                clean_text(raw)
                for page in pdf.pages
                if (raw := page.extract_text())
            )
            # Only persist and index documents that actually contain text.
            if text.strip():
                store_document(text)
            return text
    except Exception as e:
        # Best-effort: report the problem and let the caller handle None.
        print(f"Error extracting text: {e}")
        return None
def split_text(text, chunk_size=500):
    """Break `text` into consecutive pieces of at most `chunk_size` characters.

    Args:
        text: The string to split.
        chunk_size: Maximum length of each piece.

    Returns:
        A list of substrings in original order; empty when `text` is empty.
    """
    print("splitting")
    pieces = []
    for start in range(0, len(text), chunk_size):
        stop = start + chunk_size
        pieces.append(text[start:stop])
    return pieces
def chatbot(pdf_file, user_question):
    """Process an optional PDF upload and answer the user's question.

    When a PDF is supplied, its text is extracted (which also stores it).
    The stored document closest to the question is then retrieved and its
    first chunk is placed in the prompt; when nothing is retrieved, the bare
    question is used as the prompt. The prompt is sent to Together.AI.

    Args:
        pdf_file: The value from the Gradio file input; falsy when no file
            was uploaded.
        user_question: The question typed by the user.

    Returns:
        The model's answer, or a human-readable message describing what
        went wrong.
    """
    print("chatbot start")
    if pdf_file:
        # Extract text from the PDF
        text = extract_text_from_pdf(pdf_file)
        if not text:
            return "Could not extract any text from the PDF."

    # Reject blank questions before spending an embedding and an API call.
    if not user_question or not user_question.strip():
        return "Please enter a question."

    # Retrieve the document relevant to the query. Lookup or file errors are
    # reported as a message, matching the error handling of the API call below.
    try:
        doc = retrieve_document(user_question)
    except (KeyError, OSError) as e:
        return f"Error retrieving document: {e}"

    if doc:
        print("found doc")
        # Split into smaller chunks
        chunks = split_text(doc)
        # Use only the first chunk (to optimize token usage)
        prompt = f"Based on this document, answer the question:\n\nDocument:\n{chunks[0]}\n\nQuestion: {user_question}"
        print(f"prompt: \n{prompt}")
    else:
        prompt = user_question

    # Send to Together.AI (Mistral-7B)
    try:
        print("asking")
        response = together.Completion.create(
            model="mistralai/Mistral-7B-Instruct-v0.1",
            prompt=prompt,
            max_tokens=200,
            temperature=0.7,
        )
        # Return chatbot's response
        return response.choices[0].text
    except Exception as e:
        return f"Error generating response: {e}"
# Gradio Interface
# The input and output widgets are built first so the Interface call below
# reads as plain wiring between them and the `chatbot` handler.
_pdf_upload = gr.File(label="Upload PDF")
_question_box = gr.Textbox(label="Ask a Question")
_answer_box = gr.Textbox(label="Answer")

iface = gr.Interface(
    fn=chatbot,
    inputs=[_pdf_upload, _question_box],
    outputs=_answer_box,
    title="PDF Q&A Chatbot (Powered by Together.AI)",
)

# Launch Gradio app
iface.launch()