Spaces:
Build error
Build error
import gradio as gr | |
import os | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
from langchain_community.vectorstores import Chroma | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_community.document_loaders import TextLoader, PyPDFLoader | |
from langchain.chains import RetrievalQA | |
from langchain_community.llms import HuggingFaceHub | |
import tempfile | |
import shutil | |
from langchain.prompts import PromptTemplate | |
# Define a proper prompt template | |
prompt_template = """Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer. | |
{context} | |
Question: {question} | |
Answer:""" | |
PROMPT = PromptTemplate( | |
template=prompt_template, input_variables=["context", "question"] | |
) | |
# Load environment variables | |
TOKEN = os.getenv("HF_TOKEN") | |
os.environ["HUGGINGFACEHUB_API_TOKEN"] = TOKEN | |
# Initialize LangChain components | |
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
# Create vector store | |
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings) | |
# Initialize LLM | |
llm = HuggingFaceHub( | |
repo_id="meta-llama/Meta-Llama-3.1-405B-Instruct-FP8", | |
model_kwargs={"temperature": 0.7, "max_length": 512} | |
) | |
# Create RetrievalQA chain | |
qa_chain = RetrievalQA.from_chain_type( | |
llm=llm, | |
chain_type="stuff", | |
retriever=vectorstore.as_retriever(), | |
return_source_documents=True, | |
chain_type_kwargs={"prompt": PROMPT} | |
) | |
def process_uploaded_file(file): | |
if file is None: | |
return "No file uploaded." | |
try: | |
# Create a temporary directory | |
with tempfile.TemporaryDirectory() as temp_dir: | |
# Create a path for the temporary file | |
temp_file_path = os.path.join(temp_dir, os.path.basename(file.name)) | |
# Save the uploaded file to the temporary path | |
with open(temp_file_path, 'wb') as temp_file: | |
temp_file.write(file.read()) | |
# Determine file type and load accordingly | |
if file.name.endswith('.pdf'): | |
loader = PyPDFLoader(temp_file_path) | |
else: | |
loader = TextLoader(temp_file_path) | |
documents = loader.load() | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200) | |
texts = text_splitter.split_documents(documents) | |
# Update the vector store with new documents | |
vectorstore.add_documents(texts) | |
vectorstore.persist() | |
return f"File processed and added to the knowledge base. {len(texts)} chunks created." | |
except Exception as e: | |
return f"An error occurred while processing the file: {str(e)}" | |
def respond(message, history, system_message, max_tokens, temperature, top_p): | |
full_prompt = f"{system_message}\n\nHuman: {message}" | |
# Use the RetrievalQA chain to get the answer | |
result = qa_chain({"query": full_prompt}) | |
answer = result['result'] | |
# Return only the answer | |
yield answer | |
with gr.Blocks() as demo: | |
gr.Markdown("# RAG Chatbot with Content Upload") | |
with gr.Row(): | |
with gr.Column(scale=3): | |
chatbot = gr.Chatbot() | |
msg = gr.Textbox() | |
clear = gr.Button("Clear") | |
with gr.Column(scale=1): | |
file_upload = gr.File(label="Upload Content for RAG (TXT or PDF)") | |
upload_button = gr.Button("Process Uploaded File") | |
system_message = gr.Textbox(value="You are a friendly Chatbot.", label="System message") | |
max_tokens = gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens") | |
temperature = gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature") | |
top_p = gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)") | |
def user(user_message, history): | |
return "", history + [[user_message, None]] | |
def bot(history, system_message, max_tokens, temperature, top_p): | |
user_message = history[-1][0] | |
bot_message = next(respond(user_message, history[:-1], system_message, max_tokens, temperature, top_p)) | |
history[-1][1] = bot_message | |
return history | |
msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then( | |
bot, [chatbot, system_message, max_tokens, temperature, top_p], chatbot | |
) | |
clear.click(lambda: None, None, chatbot, queue=False) | |
upload_button.click(process_uploaded_file, inputs=[file_upload], outputs=[gr.Textbox()]) | |
if __name__ == "__main__": | |
demo.launch() |