import streamlit as st
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
import google.generativeai as genai
from langchain.vectorstores import FAISS
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
import os
import json
from transformers import AutoModelForCausalLM, AutoTokenizer
#### CREDIT ####
# Credit to the author (Sri Laxmi) of the original code reference: SriLaxmi1993
# Sri Laxmi GitHub link: https://github.com/SriLaxmi1993/Document-Genie-using-RAG-Framwork
# Sri Laxmi YouTube: https://www.youtube.com/watch?v=SkY2u4UUr6M&t=112s
################
# Redundant on hosts that install requirements.txt automatically (e.g. Hugging Face
# Spaces); kept as a defensive fallback for bare environments.
os.system("pip install -r requirements.txt")

model = AutoModelForCausalLM.from_pretrained("Writer/palmyra-small")
tokenizer = AutoTokenizer.from_pretrained("Writer/palmyra-small")
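# Note: this module-level palmyra-small model/tokenizer pair is only consumed by the
# commented-out variant of user_input() further down; the active Gemini pipeline
# does not reference it.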
st.set_page_config(page_title="Gemini RAG", layout="wide")

# The API key is read once here; main() reuses this module-level value.
# Assumes the GOOGLE_API_KEY environment variable is set (never hard-code secrets in source).
api_key = os.getenv("GOOGLE_API_KEY")

#os.mkdir('faiss_index')

# Empty faiss_index and chat_history.json
def delete_files_in_folder(folder_path):
    """Remove chat_history.json and every file inside folder_path (resets the app state)."""
    try:
        chat_history_file = "chat_history.json"
        if os.path.exists(chat_history_file):
            os.remove(chat_history_file)
        for file_name in os.listdir(folder_path):
            file_path = os.path.join(folder_path, file_name)
            if os.path.isfile(file_path):  # Only delete regular files, leave subfolders intact
                os.remove(file_path)
                print(f"Deleted file: {file_path}")
        print("All files within the folder have been deleted successfully!")
    except Exception as e:
        print(f"An error occurred: {e}")
with st.sidebar:
    st.title("Menu:")
    if st.button("Reset Files", key="reset_button"):
        folder_path = 'faiss_index'
        delete_files_in_folder(folder_path)
    CH_size = st.slider("Chunk Size", 0, 1000, 450)
    CH_overlap = st.slider("Chunk Overlap", 0, 1000, 50)
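# CH_size and CH_overlap are module-level globals consumed by get_text_chunks() below.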
def get_pdf_text(pdf_docs):
    """Concatenate the extracted text of every page across all uploaded PDFs."""
    text = ""
    for pdf in pdf_docs:
        pdf_reader = PdfReader(pdf)
        for page in pdf_reader.pages:
            # extract_text() can return None for image-only pages; guard against it
            text += page.extract_text() or ""
    return text
def get_text_chunks(text):
    """Split the raw text into overlapping chunks using the sidebar-configured sizes."""
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=CH_size, chunk_overlap=CH_overlap)
    chunks = text_splitter.split_text(text)
    return chunks
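# Example (hypothetical input): with CH_size=450 and CH_overlap=50, a 1,000-character
# document yields roughly three chunks of at most 450 characters, with adjacent
# chunks sharing about 50 characters of overlap.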
def get_vector_store(text_chunks, api_key):
    """Embed the chunks with Gemini embeddings and persist a FAISS index to disk."""
    embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=api_key)
    vector_store = FAISS.from_texts(text_chunks, embedding=embeddings)
    vector_store.save_local("faiss_index")
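# The index is persisted to ./faiss_index so user_input() can reload it at query time
# without re-embedding the uploaded documents.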
def get_conversational_chain():
    """Build a "stuff" QA chain: all retrieved chunks are packed into a single Gemini prompt."""
    prompt_template = """
    Answer the question as thoroughly as possible from the provided context, making sure to include all relevant details. If the answer is not in
    the provided context, just say, "answer is not available in the context"; do not provide a wrong answer. When answering, try to include every mention of the subject being asked about in your response.\n\n
    Context:\n {context}?\n
    Question: \n{question}\n

    Answer:
    """
    model = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.2, google_api_key=api_key)
    prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"])
    chain = load_qa_chain(model, chain_type="stuff", prompt=prompt)
    return chain
# Chat history functionality
def update_chat_history(question, reply):
    # Check if the chat history file exists
    chat_history_file = "chat_history.json"
    if os.path.exists(chat_history_file):
        # If the file exists, load the existing chat history
        with open(chat_history_file, "r") as file:
            chat_history = json.load(file)
    else:
        # If the file doesn't exist, initialize an empty chat history
        chat_history = {"conversations": []}
    # Add the current conversation to the chat history
    chat_history["conversations"].append({"question": question, "reply": reply})
    # Write the updated chat history back to the file
    with open(chat_history_file, "w") as file:
        json.dump(chat_history, file, indent=4)
    # Display the chat history
    st.subheader("Chat History")
    for conversation in chat_history["conversations"]:
        st.write(f"**Question:** {conversation['question']}")
        st.write(f"**Reply:** {conversation['reply']}")
        st.write("---")
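# Resulting chat_history.json layout:
# {"conversations": [{"question": "...", "reply": "..."}, ...]}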
def user_input(user_question, api_key):
    embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=api_key)
    # Recent langchain releases require allow_dangerous_deserialization=True to load a
    # locally pickled FAISS index; drop the flag on older versions.
    new_db = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
    docs = new_db.similarity_search(user_question)
    chain = get_conversational_chain()
    response = chain({"input_documents": docs, "question": user_question}, return_only_outputs=True)
    st.write("Reply: ", response["output_text"])
    # Chat history
    update_chat_history(user_question, response["output_text"])
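# similarity_search is called without an explicit k, so langchain's default (4 at the
# time of writing) controls how many chunks get stuffed into the prompt.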
'''
# Alternative user_input: post-processes the Gemini answer with the local palmyra-small
# model to make it more conversational. Kept commented out for reference.
def user_input(user_question, api_key):
    embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=api_key)
    new_db = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
    docs = new_db.similarity_search(user_question)
    chain = get_conversational_chain()
    response_gemini = chain({"input_documents": docs, "question": user_question}, return_only_outputs=True)

    # Initialize the Hugging Face model and tokenizer
    model_name_or_path = "Writer/palmyra-small"
    model = AutoModelForCausalLM.from_pretrained(model_name_or_path)
    tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)

    # Define the prompt template
    prompt_template = f"""
    Transform the following response into a more conversational tone without adding new information:

    Response:
    {response_gemini["output_text"]}

    Transformed Response:
    """

    # Tokenize the prompt
    inputs = tokenizer(prompt_template, return_tensors="pt", truncation=True, max_length=512)

    # Generate the transformed response (cap new tokens so generation terminates promptly)
    outputs = model.generate(**inputs, max_new_tokens=256)

    # Decode the generated response
    transformed_response = tokenizer.decode(outputs[0], skip_special_tokens=True)

    # Display the transformed response
    st.write("Reply: ", transformed_response)

    # Update the chat history
    update_chat_history(user_question, transformed_response)
'''
def main():
    st.header("RAG based LLM Application")

    user_question = st.text_input("Ask a Question from the PDF Files", key="user_question")

    if user_question and api_key:  # Ensure both the question and the API key are provided
        user_input(user_question, api_key)

    with st.sidebar:
        st.title("Menu:")
        pdf_docs = st.file_uploader("Upload your PDF Files and Click on the Submit & Process Button", accept_multiple_files=True, key="pdf_uploader")
        if st.button("Submit & Process", key="process_button") and api_key:
            with st.spinner("Processing..."):
                raw_text = get_pdf_text(pdf_docs)
                text_chunks = get_text_chunks(raw_text)
                get_vector_store(text_chunks, api_key)
                st.success("Done")

if __name__ == "__main__":
    main()