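"""
Q&A Document AI RAG Chatbot.

A Streamlit app that answers questions over uploaded PDF, Word, and Excel
files (or scraped web pages) with a retrieval-augmented generation chain:
HuggingFace MiniLM embeddings + a FAISS retriever + Mixtral-8x7B-Instruct
served via Together.
"""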
import os
import shutil

import streamlit as st
import requests
from bs4 import BeautifulSoup
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_community.llms import Together
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import UnstructuredPDFLoader
from langchain_community.document_loaders import UnstructuredWordDocumentLoader
from langchain_community.document_loaders import UnstructuredExcelLoader
from langchain_community.document_loaders import TextLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import CharacterTextSplitter
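# Assumed third-party dependencies (not pinned in the source; adjust to
# your environment):
#   pip install streamlit requests beautifulsoup4 langchain langchain-community \
#       faiss-cpu sentence-transformers unstructured together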
# Ensure the Together API key is available; the Together client reads
# TOGETHER_API_KEY from the environment.
if not os.getenv("TOGETHER_API_KEY"):
    raise EnvironmentError("TOGETHER_API_KEY environment variable is not set.")
def inference(chain, input_query):
    """Invoke the processing chain with the input query."""
    result = chain.invoke(input_query)
    return result
def create_chain(retriever, prompt, model):
    """Compose the processing chain with the specified components."""
    # LCEL pipeline: the retriever fills {context} with relevant chunks,
    # the raw query passes through unchanged as {question}, then the
    # prompt feeds the LLM, whose output is parsed to a plain string.
    chain = (
        {"context": retriever, "question": RunnablePassthrough()}
        | prompt
        | model
        | StrOutputParser()
    )
    return chain
def generate_prompt():
    """Define the prompt template for question answering."""
    template = """<s>[INST] Answer the question in a simple sentence based only on the following context:
    {context}
    Question: {question} [/INST]
    """
    return ChatPromptTemplate.from_template(template)
def configure_model():
    """Configure the language model with specified parameters."""
    return Together(
        model="mistralai/Mixtral-8x7B-Instruct-v0.1",
        temperature=0.1,
        max_tokens=3000,
        top_k=50,
        top_p=0.7,
        repetition_penalty=1.1,
    )
def configure_retriever(documents):
    """Configure the retriever with embeddings and a FAISS vector store."""
    embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    vector_db = FAISS.from_documents(documents, embeddings)
    return vector_db.as_retriever()
def load_pdf_documents(path):
    """Load and preprocess PDF documents from the specified path."""
    documents = []
    for file in os.listdir(path):
        if file.endswith('.pdf'):
            filepath = os.path.join(path, file)
            loader = UnstructuredPDFLoader(filepath)
            documents.extend(loader.load())
    return documents
def load_word_documents(path):
    """Load and preprocess Word documents from the specified path."""
    documents = []
    for file in os.listdir(path):
        if file.endswith('.docx'):
            filepath = os.path.join(path, file)
            loader = UnstructuredWordDocumentLoader(filepath)
            documents.extend(loader.load())
    return documents
def load_excel_documents(path):
    """Load and preprocess Excel documents from the specified path."""
    documents = []
    for file in os.listdir(path):
        if file.endswith('.xlsx'):
            filepath = os.path.join(path, file)
            loader = UnstructuredExcelLoader(filepath)
            documents.extend(loader.load())
    return documents
def load_documents(path):
    """Load and preprocess documents from PDF, Word, Excel, and plain-text files."""
    pdf_docs = load_pdf_documents(path)
    word_docs = load_word_documents(path)
    excel_docs = load_excel_documents(path)
    # Also pick up scraped URL content, which scrape_url saves as a .txt file.
    text_docs = []
    for file in os.listdir(path):
        if file.endswith('.txt'):
            text_docs.extend(TextLoader(os.path.join(path, file)).load())
    return pdf_docs + word_docs + excel_docs + text_docs
def scrape_url(url, save_dir='/tmp/1'):
    """Scrape content from a given URL and save it to a text file."""
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()  # Ensure we notice bad responses
        soup = BeautifulSoup(response.content, 'html.parser')
        text = soup.get_text()
        # Save the text into the same folder the document loaders read from,
        # so the scraped content is picked up by load_documents.
        os.makedirs(save_dir, exist_ok=True)
        text_file_path = os.path.join(save_dir, "scraped_content.txt")
        with open(text_file_path, "w") as file:
            file.write(text)
        return text_file_path
    except requests.RequestException as e:
        st.error(f"Error fetching the URL: {e}")
        return None
def process_document(path, input_query):
    """Process the documents by setting up the chain and invoking it with the input query."""
    documents = load_documents(path)
    if not documents:
        st.error("No documents found. Please check the uploaded files or scraped content.")
        return "No documents found."
    text_splitter = CharacterTextSplitter(chunk_size=18000, chunk_overlap=10)
    split_docs = text_splitter.split_documents(documents)
    if not split_docs:
        st.error("No text could be extracted from the documents.")
        return "No text could be extracted."
    llm_model = configure_model()
    prompt = generate_prompt()
    retriever = configure_retriever(split_docs)
    chain = create_chain(retriever, prompt, llm_model)
    response = inference(chain, input_query)
    return response
def main():
    """Main function to run the Streamlit app."""
    tmp_folder = '/tmp/1'
    os.makedirs(tmp_folder, exist_ok=True)
    st.title("Q&A Document AI RAG Chatbot")
    uploaded_files = st.sidebar.file_uploader("Choose PDF, Word, or Excel files", accept_multiple_files=True, type=['pdf', 'docx', 'xlsx'])
    if uploaded_files:
        for file in uploaded_files:
            with open(os.path.join(tmp_folder, file.name), 'wb') as f:
                f.write(file.getbuffer())
        st.success('Files successfully uploaded. Start prompting!')
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    if uploaded_files:
        with st.form(key='question_form'):
            user_query = st.text_input("Ask a question:", key="query_input")
            if st.form_submit_button("Ask") and user_query:
                response = process_document(tmp_folder, user_query)
                if response:  # Only record non-empty answers
                    st.session_state.chat_history.append({"question": user_query, "answer": response})
        if st.button("Clear Chat History"):
            st.session_state.chat_history = []
        for chat in st.session_state.chat_history:
            st.markdown(f"**Q:** {chat['question']}")
            st.markdown(f"**A:** {chat['answer']}")
            st.markdown("---")
    else:
        st.success('Upload Documents to Start Processing!')
    url_input = st.sidebar.text_input("Or enter a URL to scrape content from:")
    if st.sidebar.button("Scrape URL"):
        if url_input:
            # Save the scraped text into tmp_folder so load_documents finds it.
            file_path = scrape_url(url_input, tmp_folder)
            if file_path:
                documents = load_documents(tmp_folder)
                if documents:  # Check if documents are loaded after scraping
                    response = process_document(tmp_folder, "What is the content of the URL?")
                    if response:  # Check if response is not empty
                        st.session_state.chat_history.append({"question": "What is the content of the URL?", "answer": response})
                        st.success("URL content processed successfully!")
                else:
                    st.error("Failed to load any documents from the scraped URL content.")
            else:
                st.error("Failed to process URL content.")
        else:
            st.warning("Please enter a valid URL.")
    if st.sidebar.button("REMOVE UPLOADED FILES"):
        if os.listdir(tmp_folder):
            shutil.rmtree(tmp_folder)
            os.makedirs(tmp_folder, exist_ok=True)  # Recreate so later uploads still work
            st.sidebar.write("FILES DELETED SUCCESSFULLY!")
        else:
            st.sidebar.write("NO DOCUMENT FOUND TO DELETE! PLEASE UPLOAD DOCUMENTS TO START PROCESS!")


if __name__ == "__main__":
    main()
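# To run locally (assuming this file is saved as app.py and TOGETHER_API_KEY
# is exported in the shell):
#   streamlit run app.py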