import streamlit as st |
import os |
import base64 |
import time |
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM |
from transformers import pipeline |
import torch |
import textwrap |
from dotenv import find_dotenv, load_dotenv |
from langchain.document_loaders import PyPDFLoader, DirectoryLoader, PDFMinerLoader |
from langchain.text_splitter import RecursiveCharacterTextSplitter |
from langchain.embeddings import SentenceTransformerEmbeddings |
from langchain.vectorstores import Chroma |
from langchain.llms import HuggingFacePipeline |
from langchain.chains import RetrievalQA |
from constants import CHROMA_SETTINGS |
from streamlit_chat import message |
load_dotenv(find_dotenv()) |
st.set_page_config(layout="wide") |
checkpoint = "google/flan-ul2" |
tokenizer = AutoTokenizer.from_pretrained(checkpoint) |
base_model = AutoModelForSeq2SeqLM.from_pretrained( |
checkpoint, |
device_map="auto", |
torch_dtype = torch.float32 |
) |
persist_directory = "db" |
@st.cache_resource |
def data_ingestion(): |
for root, dirs, files in os.walk("docs"): |
for file in files: |
if file.endswith(".pdf"): |
print(file) |
loader = PDFMinerLoader(os.path.join(root, file)) |
documents = loader.load() |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=500) |
texts = text_splitter.split_documents(documents) |
embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2") |
db = Chroma.from_documents(texts, embeddings, persist_directory=persist_directory, client_settings=CHROMA_SETTINGS) |
db.persist() |
db=None |
@st.cache_resource |
def llm_pipeline(): |
pipe = pipeline( |
'text2text-generation', |
tokenizer = tokenizer, |
max_length = 256, |
do_sample = True, |
temperature = 0.3, |
top_p= 0.95 |
) |
local_llm = HuggingFacePipeline(pipeline=pipe) |
return local_llm |
@st.cache_resource |
def qa_llm(): |
llm = llm_pipeline() |
embeddings = SentenceTransformerEmbeddings(model_name="all-MiniLM-L6-v2") |
db = Chroma(persist_directory="db", embedding_function = embeddings, client_settings=CHROMA_SETTINGS) |
retriever = db.as_retriever() |
qa = RetrievalQA.from_chain_type( |
llm = llm, |
chain_type = "stuff", |
retriever = retriever, |
return_source_documents=True |
) |
return qa |
def process_answer(instruction): |
response = '' |
instruction = instruction |
qa = qa_llm() |
generated_text = qa(instruction) |
answer = generated_text['result'] |
return answer |
def get_file_size(file): |
file.seek(0, os.SEEK_END) |
file_size = file.tell() |
file.seek(0) |
return file_size |
@st.cache_data |
def displayPDF(file): |
with open(file, "rb") as f: |
base64_pdf = base64.b64encode(f.read()).decode('utf-8') |
pdf_display = F'<iframe src="data:application/pdf;base64,{base64_pdf}" width="100%" height="600" type="application/pdf"></iframe>' |
st.markdown(pdf_display, unsafe_allow_html=True) |
def display_conversation(history): |
for i in range(len(history["generated"])): |
message(history["past"][i], is_user=True, key=str(i) + "_user") |
message(history["generated"][i],key=str(i)) |
def main(): |
st.markdown("<h1 style='text-align: center; color: blue;'>ChatPDFv2</h1>", unsafe_allow_html=True) |
st.markdown("<h2 style='text-align: center; color:red;'>Upload your PDF</h2>", unsafe_allow_html=True) |
uploaded_file = st.file_uploader("", type=["pdf"]) |
if uploaded_file is not None: |
file_details = { |
"Filename": uploaded_file.name, |
"File size": get_file_size(uploaded_file) |
} |
filepath = "docs/"+uploaded_file.name |
with open(filepath, "wb") as temp_file: |
temp_file.write(uploaded_file.read()) |
col1, col2= st.columns([1,2]) |
with col1: |
st.markdown("<h4 style color:black;'>File details</h4>", unsafe_allow_html=True) |
st.json(file_details) |
st.markdown("<h4 style color:black;'>File preview</h4>", unsafe_allow_html=True) |
pdf_view = displayPDF(filepath) |
with col2: |
with st.spinner('Embeddings are in process...'): |
ingested_data = data_ingestion() |
st.success('Embeddings are created successfully!') |
st.markdown("<h4 style color:black;'>Chat Here</h4>", unsafe_allow_html=True) |
user_input = st.text_input("", key="input") |
if "generated" not in st.session_state: |
st.session_state["generated"] = ["I am ready to help you"] |
if "past" not in st.session_state: |
st.session_state["past"] = ["Hey there!"] |
if user_input: |
answer = process_answer({'query': user_input}) |
st.session_state["past"].append(user_input) |
response = answer |
st.session_state["generated"].append(response) |
if st.session_state["generated"]: |
display_conversation(st.session_state) |
if __name__ == "__main__": |
main() |