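"""Q&A Document AI RAG Chatbot.

A Streamlit app that answers questions over uploaded PDF, Word, and Excel
files (and optionally over scraped web pages) by retrieving relevant chunks
from a FAISS vector store of MiniLM embeddings and passing them to a
Together-hosted Mixtral-8x7B-Instruct model.

Assumed (unpinned) dependencies: streamlit, requests, beautifulsoup4,
langchain, langchain-community, faiss-cpu, sentence-transformers, unstructured.
"""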
import os
import shutil

import requests
import streamlit as st
from bs4 import BeautifulSoup
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_community.llms import Together
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import (
    TextLoader,
    UnstructuredExcelLoader,
    UnstructuredPDFLoader,
    UnstructuredWordDocumentLoader,
)
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import CharacterTextSplitter

# Fail fast if the Together API key is not set in the environment.
if not os.getenv("TOGETHER_API_KEY"):
    raise RuntimeError("TOGETHER_API_KEY environment variable is not set.")

def inference(chain, input_query):
    """Invoke the processing chain with the input query."""
    result = chain.invoke(input_query)
    return result

def create_chain(retriever, prompt, model):
    """Compose the processing chain with the specified components."""
    chain = (
        {"context": retriever, "question": RunnablePassthrough()}
        | prompt
        | model
        | StrOutputParser()
    )
    return chain

def generate_prompt():
    """Define the prompt template for question answering."""
    template = """<s>[INST] Answer the question in a simple sentence based only on the following context:
{context}

Question: {question} [/INST]
"""
    return ChatPromptTemplate.from_template(template)

def configure_model():
    """Configure the language model with specified parameters."""
    return Together(
        model="mistralai/Mixtral-8x7B-Instruct-v0.1",
        temperature=0.1,
        max_tokens=3000,
        top_k=50,
        top_p=0.7,
        repetition_penalty=1.1,
    )

def configure_retriever(documents):
    """Configure the retriever with embeddings and a FAISS vector store."""
    embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    vector_db = FAISS.from_documents(documents, embeddings)
    return vector_db.as_retriever()

def load_pdf_documents(path):
    """Load and preprocess PDF documents from the specified path."""
    documents = []
    for file in os.listdir(path):
        if file.endswith('.pdf'):
            filepath = os.path.join(path, file)
            loader = UnstructuredPDFLoader(filepath)
            documents.extend(loader.load())
    return documents

def load_word_documents(path):
    """Load and preprocess Word documents from the specified path."""
    documents = []
    for file in os.listdir(path):
        if file.endswith('.docx'):
            filepath = os.path.join(path, file)
            loader = UnstructuredWordDocumentLoader(filepath)
            documents.extend(loader.load())
    return documents

def load_excel_documents(path):
    """Load and preprocess Excel documents from the specified path."""
    documents = []
    for file in os.listdir(path):
        if file.endswith('.xlsx'):
            filepath = os.path.join(path, file)
            loader = UnstructuredExcelLoader(filepath)
            documents.extend(loader.load())
    return documents

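# Companion loader for plain-text files; without it, the .txt file written by
# scrape_url() would never reach the retriever.
def load_text_documents(path):
    """Load and preprocess plain-text documents from the specified path."""
    documents = []
    for file in os.listdir(path):
        if file.endswith('.txt'):
            filepath = os.path.join(path, file)
            loader = TextLoader(filepath, encoding="utf-8")
            documents.extend(loader.load())
    return documents
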
def load_documents(path):
    """Load and preprocess documents from PDF, Word, Excel, and text files."""
    pdf_docs = load_pdf_documents(path)
    word_docs = load_word_documents(path)
    excel_docs = load_excel_documents(path)
    text_docs = load_text_documents(path)
    return pdf_docs + word_docs + excel_docs + text_docs

def scrape_url(url, path):
    """Scrape content from a given URL and save it as a text file in `path`."""
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()  # Ensure we notice bad responses
        soup = BeautifulSoup(response.content, 'html.parser')
        text = soup.get_text()
        # Save the text into the document folder so load_documents() picks it up
        text_file_path = os.path.join(path, "scraped_content.txt")
        with open(text_file_path, "w", encoding="utf-8") as file:
            file.write(text)
        return text_file_path
    except requests.RequestException as e:
        st.error(f"Error fetching the URL: {e}")
        return None

def process_document(path, input_query):
    """Process the documents by setting up the chain and invoking it with the input query."""
    documents = load_documents(path)
    if not documents:
        st.error("No documents found. Please check the uploaded files or scraped content.")
        return "No documents found."
    # Keep chunks small enough for the MiniLM embedding model, which silently
    # truncates long inputs.
    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    split_docs = text_splitter.split_documents(documents)
    if not split_docs:
        st.error("No text could be extracted from the documents.")
        return "No text could be extracted."
    llm_model = configure_model()
    prompt = generate_prompt()
    retriever = configure_retriever(split_docs)
    chain = create_chain(retriever, prompt, llm_model)
    response = inference(chain, input_query)
    return response

def main():
    """Main function to run the Streamlit app."""
    tmp_folder = '/tmp/1'
    os.makedirs(tmp_folder, exist_ok=True)
    st.title("Q&A Document AI RAG Chatbot")
    uploaded_files = st.sidebar.file_uploader(
        "Choose PDF, Word, or Excel files",
        accept_multiple_files=True,
        type=['pdf', 'docx', 'xlsx'],
    )
    if uploaded_files:
        for file in uploaded_files:
            with open(os.path.join(tmp_folder, file.name), 'wb') as f:
                f.write(file.getbuffer())
        st.success('Files successfully uploaded. Start prompting!')
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    if uploaded_files:
        with st.form(key='question_form'):
            user_query = st.text_input("Ask a question:", key="query_input")
            if st.form_submit_button("Ask") and user_query:
                response = process_document(tmp_folder, user_query)
                if response:  # Check if response is not empty
                    st.session_state.chat_history.append({"question": user_query, "answer": response})
        if st.button("Clear Chat History"):
            st.session_state.chat_history = []
        for chat in st.session_state.chat_history:
            st.markdown(f"**Q:** {chat['question']}")
            st.markdown(f"**A:** {chat['answer']}")
            st.markdown("---")
    else:
        st.success('Upload Documents to Start Processing!')
    url_input = st.sidebar.text_input("Or enter a URL to scrape content from:")
    if st.sidebar.button("Scrape URL"):
        if url_input:
            file_path = scrape_url(url_input, tmp_folder)
            if file_path:
                documents = load_documents(tmp_folder)
                if documents:  # Check if documents are loaded after scraping
                    response = process_document(tmp_folder, "What is the content of the URL?")
                    if response:  # Check if response is not empty
                        st.session_state.chat_history.append({"question": "What is the content of the URL?", "answer": response})
                        st.success("URL content processed successfully!")
                else:
                    st.error("Failed to load any documents from the scraped URL content.")
            else:
                st.error("Failed to process URL content.")
        else:
            st.warning("Please enter a valid URL.")
    if st.sidebar.button("REMOVE UPLOADED FILES"):
        if os.listdir(tmp_folder):
            shutil.rmtree(tmp_folder)
            os.makedirs(tmp_folder, exist_ok=True)  # Recreate the folder so later uploads still work
            st.sidebar.write("FILES DELETED SUCCESSFULLY!")
        else:
            st.sidebar.write("NO DOCUMENT FOUND TO DELETE! PLEASE UPLOAD DOCUMENTS TO START PROCESS!")


if __name__ == "__main__":
    main()
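
# Launch locally (assuming this script is saved as app.py):
#   streamlit run app.py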