Spaces:
Sleeping
Sleeping
from io import BytesIO | |
import streamlit as st | |
import shutil | |
import requests | |
import os | |
from PyPDF2 import PdfReader | |
from langchain.text_splitter import CharacterTextSplitter | |
from langchain_openai import OpenAIEmbeddings | |
from langchain_community.vectorstores import Chroma | |
from langchain_openai import ChatOpenAI | |
from langchain.memory import ConversationBufferMemory | |
from langchain.chains import ConversationalRetrievalChain | |
import time | |
def getpdfdoc(): | |
with st.spinner("Loading PDF..."): | |
filename = '48lawsofpower.pdf' | |
if os.path.exists(filename): | |
with open(filename, 'rb') as f: | |
pdf_doc = f.read() | |
return pdf_doc | |
else: | |
url = 'https://pgcag.files.wordpress.com/2010/01/48lawsofpower.pdf' | |
response = requests.get(url) | |
with open(filename, 'wb') as f: | |
f.write(response.content) | |
return getpdfdoc() | |
def extract_text_from_pdf(pdf_file_obj): | |
with st.spinner("Extracting text from PDF..."): | |
pdf_reader = PdfReader(BytesIO(pdf_file_obj)) | |
text = "" | |
for page_num in range(len(pdf_reader.pages)): | |
page_obj = pdf_reader.pages[page_num] | |
text += page_obj.extract_text() | |
return text | |
def get_text_chunks(text): | |
with st.spinner("Splitting text into chunks..."): | |
text_splitter = CharacterTextSplitter( | |
separator="\n", | |
chunk_size=1000, | |
chunk_overlap=200, | |
length_function=len | |
) | |
chunks = text_splitter.split_text(text) | |
return chunks | |
def get_vectorstore(text_chunks): | |
with st.spinner("Creating vectorstore..."): | |
st.markdown("Creating vector store") | |
time.sleep(10) | |
metadatas = [{"source": f"{i}-pl"} for i in range(len(text_chunks))] | |
embeddings = OpenAIEmbeddings() | |
vectorstore = Chroma.from_texts(texts=text_chunks, embedding=embeddings, persist_directory="./data/vectorstore", metadatas=metadatas) | |
return vectorstore | |
def get_conversation_chain(vectorstore): | |
with st.spinner("Loading LLM..."): | |
llm = ChatOpenAI() | |
memory = ConversationBufferMemory( | |
memory_key='chat_history', return_messages=True) | |
conversation_chain = ConversationalRetrievalChain.from_llm( | |
llm=llm, | |
retriever=vectorstore.as_retriever(), | |
memory=memory | |
) | |
return conversation_chain | |
def retrain_model(): | |
st.session_state.conversation = None | |
st.session_state.chat_history = None | |
pdf_doc = getpdfdoc() # get pdf | |
raw_text = extract_text_from_pdf(pdf_doc) # get pdf text | |
text_chunks = get_text_chunks(raw_text) # get the text chunks | |
vectorstore = get_vectorstore(text_chunks) # create vector store | |
st.session_state.conversation = get_conversation_chain(vectorstore) # create conversation chain | |
def handle_userinput(user_question): | |
response = st.session_state.conversation({'question': user_question}) | |
st.session_state.chat_history = response['chat_history'] | |
for i, message in enumerate(st.session_state.chat_history): | |
if i % 2 == 0: | |
st.markdown("**User:**") | |
st.markdown(message.content) | |
else: | |
st.markdown("**AI:**") | |
st.markdown(message.content) | |
def main(): | |
if "conversation" not in st.session_state: | |
st.session_state.conversation = None | |
if "chat_history" not in st.session_state: | |
st.session_state.chat_history = None | |
if st.session_state.conversation is None: | |
if os.path.isdir("./data/vectorstore"): | |
if os.listdir("./data/vectorstore"): | |
with st.spinner("Loading vector store..."): | |
vectorstore = Chroma(persist_directory="./data/vectorstore", embedding_function=OpenAIEmbeddings()) | |
st.session_state.conversation = get_conversation_chain(vectorstore) | |
else: | |
retrain_model() | |
else: | |
retrain_model() | |
if st.session_state.conversation is not None: | |
st.sidebar.button("Retrain model", on_click=retrain_model) | |
st.header("Ask questions from 48 Laws of Power:books:") | |
user_question = st.chat_input("Ask a question about your documents:") | |
if user_question: | |
handle_userinput(user_question) | |
if __name__ == '__main__': | |
main() |