Spaces:
Running
Running
from flask import Flask, render_template, request, redirect, url_for, session | |
import os | |
from werkzeug.utils import secure_filename | |
#from retrival import generate_data_store | |
from retrival import generate_data_store #,add_document_to_existing_db, delete_chunks_by_source | |
from langchain_community.vectorstores import Chroma | |
from langchain.embeddings import HuggingFaceEmbeddings | |
from langchain.prompts import ChatPromptTemplate | |
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate | |
from langchain_huggingface import HuggingFaceEndpoint | |
from huggingface_hub import InferenceClient | |
from langchain.schema import Document | |
from langchain_core.documents import Document | |
from dotenv import load_dotenv | |
import re | |
import glob | |
import shutil | |
from werkzeug.utils import secure_filename | |
import asyncio | |
import nltk | |
nltk.download('punkt_tab') | |
import nltk | |
nltk.download('averaged_perceptron_tagger_eng') | |
app = Flask(__name__) | |
# Set the secret key for session management | |
app.secret_key = os.urandom(24) | |
# Configurations | |
UPLOAD_FOLDER = "uploads/" | |
VECTOR_DB_FOLDER = "VectorDB/" | |
#TABLE_DB_FOLDER = "TableDB/" | |
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER | |
os.makedirs(UPLOAD_FOLDER, exist_ok=True) | |
os.makedirs(VECTOR_DB_FOLDER, exist_ok=True) | |
#os.makedirs(TABLE_DB_FOLDER, exist_ok=True) | |
# Global variables | |
CHROMA_PATH = None | |
#TABLE_PATH = None | |
#System prompt | |
PROMPT_TEMPLATE = """ | |
You are working with a retrieval-augmented generation (RAG) setup. Your task is to generate a response based on the context provided and the question asked. Consider only the following context strictly, and use it to answer the question. Do not include any external information. | |
Context: | |
{context} | |
--- | |
Question: | |
{question} | |
Response: | |
""" | |
# PROMPT_TEMPLATE = """ | |
# You are working with a retrieval-augmented generation (RAG) setup. Your task is to generate a response based on the provided context, table data, and the question asked. Consider only the given inputs strictly and use them to answer the question. Do not include any external information. | |
# If the table variable contains tabular data, analyze and extract all relevant details from it. Provide a structured response summarizing the table data if it is relevant to the question. If the table data is not relevant, base your answer only on the context. | |
# Context: | |
# {context} | |
# Table: | |
# {table} | |
# --- | |
# Question: | |
# {question} | |
# Response: | |
# """ | |
#HFT = os.getenv('HF_TOKEN') | |
#client = InferenceClient(api_key=HFT) | |
def home(): | |
return render_template('home.html') | |
def chat(): | |
if 'history' not in session: | |
session['history'] = [] | |
print("sessionhist1",session['history']) | |
global CHROMA_PATH | |
#global TABLE_PATH | |
old_db = session.get('old_db', None) | |
print(f"Selected DB: {CHROMA_PATH}") | |
# if old_db != None: | |
# if CHROMA_PATH != old_db: | |
# session['history'] = [] | |
#print("sessionhist1",session['history']) | |
if request.method == 'POST': | |
query_text = request.form['query_text'] | |
if CHROMA_PATH is None: | |
return render_template('chat.html', error="No vector database selected!", history=[]) | |
# Load the selected Document Database | |
#embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2") | |
embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1") | |
db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function) | |
results_document = db.similarity_search_with_relevance_scores(query_text, k=3) | |
print("results------------------->",results_document) | |
context_text_document = "\n\n---\n\n".join([doc.page_content for doc, _score in results_document]) | |
# # Load the selected Table Database | |
# #embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2") | |
# embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1") | |
# tdb = Chroma(persist_directory=TABLE_PATH, embedding_function=embedding_function) | |
# results_table = tdb.similarity_search_with_relevance_scores(query_text, k=2) | |
# print("results------------------->",results_table) | |
# context_text_table = "\n\n---\n\n".join([doc.page_content for doc, _score in results_table]) | |
# Prepare the prompt and query the model | |
prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE) | |
prompt = prompt_template.format(context=context_text_document,question=query_text) | |
#prompt = prompt_template.format(context=context_text_document,table=context_text_table, question=query_text) | |
print("results------------------->",prompt) | |
#Model Defining and its use | |
repo_id = "mistralai/Mistral-7B-Instruct-v0.3" | |
HFT = os.environ["HF_TOKEN"] | |
llm = HuggingFaceEndpoint( | |
repo_id=repo_id, | |
max_tokens=3000, | |
temperature=0.8, | |
huggingfacehub_api_token=HFT, | |
) | |
data= llm(prompt) | |
#data = response.choices[0].message.content | |
# filtering the uneccessary context. | |
if re.search(r'\bmention\b|\bnot mention\b|\bnot mentioned\b|\bnot contain\b|\bnot include\b|\bnot provide\b|\bdoes not\b|\bnot explicitly\b|\bnot explicitly mentioned\b', data, re.IGNORECASE): | |
data = "We do not have information related to your query on our end." | |
# Save the query and answer to the session history | |
session['history'].append((query_text, data)) | |
# Mark the session as modified to ensure it gets saved | |
session.modified = True | |
print("sessionhist2",session['history']) | |
return render_template('chat.html', query_text=query_text, answer=data, history=session['history'],old_db=CHROMA_PATH) | |
return render_template('chat.html', history=session['history'], old_db=CHROMA_PATH) | |
def create_db(): | |
if request.method == 'POST': | |
db_name = request.form['db_name'] | |
# Get all files from the uploaded folder | |
files = request.files.getlist('folder') | |
if not files: | |
return "No files uploaded", 400 | |
# if not exist | |
os.makedirs(UPLOAD_FOLDER, exist_ok=True) | |
# Define the base upload path | |
upload_base_path = os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(db_name)) | |
#upload_base_path = upload_base_path.replace("\\", "/") | |
print(f"Base Upload Path: {upload_base_path}") | |
os.makedirs(upload_base_path, exist_ok=True) | |
# Save each file and recreate folder structure | |
for file in files: | |
print("file , files",files,file) | |
#relative_path = file.filename # This should contain the subfolder structure | |
file_path = os.path.join(upload_base_path) | |
#file_path = file_path.replace("\\", "/") | |
# Ensure the directory exists before saving the file | |
print(f"Saving to: {file_path}") | |
os.makedirs(os.path.dirname(file_path), exist_ok=True) | |
# Get the file path and save it | |
file_path = os.path.join(upload_base_path, secure_filename(file.filename)) | |
file.save(file_path) | |
# Generate datastore | |
generate_data_store(upload_base_path, db_name) | |
# # Clean up uploaded files (if needed) | |
#if os.path.exists(app.config['UPLOAD_FOLDER']): | |
# shutil.rmtree(app.config['UPLOAD_FOLDER']) | |
return redirect(url_for('list_dbs')) | |
return render_template('create_db.html') | |
def list_dbs(): | |
vector_dbs = [name for name in os.listdir(VECTOR_DB_FOLDER) if os.path.isdir(os.path.join(VECTOR_DB_FOLDER, name))] | |
return render_template('list_dbs.html', vector_dbs=vector_dbs) | |
def select_db(db_name): | |
#Selecting the Documnet Vector DB | |
global CHROMA_PATH | |
print(f"Selected DB: {CHROMA_PATH}") | |
CHROMA_PATH = os.path.join(VECTOR_DB_FOLDER, db_name) | |
CHROMA_PATH = CHROMA_PATH.replace("\\", "/") | |
print(f"Selected DB: {CHROMA_PATH}") | |
#Selecting the Table Vector DB | |
# global TABLE_PATH | |
# print(f"Selected DB: {TABLE_PATH}") | |
# TABLE_PATH = os.path.join(TABLE_DB_FOLDER, db_name) | |
# TABLE_PATH = TABLE_PATH.replace("\\", "/") | |
# print(f"Selected DB: {TABLE_PATH}") | |
return redirect(url_for('chat')) | |
# @app.route('/update-dbs/<db_name>', methods=['GET','POST']) | |
# def update_db(db_name): | |
# if request.method == 'POST': | |
# db_name = request.form['db_name'] | |
# # Get all files from the uploaded folder | |
# files = request.files.getlist('folder') | |
# if not files: | |
# return "No files uploaded", 400 | |
# print(f"Selected DB: {db_name}") | |
# DB_PATH = os.path.join(VECTOR_DB_FOLDER, db_name) | |
# DB_PATH = DB_PATH.replace("\\", "/") | |
# print(f"Selected DB: {DB_PATH}") | |
# generate_data_store(DB_PATH, db_name) | |
# return redirect(url_for('list_dbs')) | |
# return render_template('update_db.html') | |
if __name__ == "__main__": | |
app.run(debug=False, use_reloader=False) | |