Spaces:
Running
Running
from flask import Flask, render_template, request, redirect, url_for, session | |
import os | |
from werkzeug.utils import secure_filename | |
from retrival import generate_data_store | |
from langchain_community.vectorstores import Chroma | |
from langchain.embeddings import HuggingFaceEmbeddings | |
from langchain.prompts import ChatPromptTemplate | |
from huggingface_hub import InferenceClient | |
from langchain.schema import Document | |
from langchain_core.documents import Document | |
from dotenv import load_dotenv | |
import re | |
import glob | |
import shutil | |
from werkzeug.utils import secure_filename | |
app = Flask(__name__)

# Load variables from a local .env file (when one exists) into the process
# environment BEFORE any of the os.environ / os.getenv lookups below.
# Variables that are already set in the environment are left untouched.
load_dotenv()

# Secret key used to sign the session cookie.
# A key supplied through FLASK_SECRET_KEY keeps sessions valid across restarts
# and across multiple worker processes; without it we fall back to a random
# per-process key, which means every restart discards existing sessions.
app.secret_key = os.environ.get("FLASK_SECRET_KEY") or os.urandom(24)

# Configurations
UPLOAD_FOLDER = "uploads/"        # where uploaded source documents are stored
VECTOR_DB_FOLDER = "VectorDB/"    # parent folder of the vector databases
NLTK_FOLDER = "nltk_data/"
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
os.environ["MPLCONFIGDIR"] = "/tmp"

# Make sure the working directories exist before any request is served.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(VECTOR_DB_FOLDER, exist_ok=True)
os.makedirs(NLTK_FOLDER, exist_ok=True)

# Global variables
# Path of the currently selected Chroma database; None until select_db() sets
# it. NOTE: this is module-level state, so the selection is shared by every
# client served by this process.
CHROMA_PATH = None

# Prompt sent to the model; {context} and {question} are filled in by chat().
PROMPT_TEMPLATE = """
You are working with a retrieval-augmented generation (RAG) setup. Your task is to generate a response based on the context provided and the question asked. Consider only the following context strictly, and use it to answer the question. Do not include any external information.
Context:
{context}
---
Question:
{question}
Response:
"""

# Hugging Face API token; None when HF_TOKEN is not set in the environment.
HFT = os.getenv('HF_TOKEN')
client = InferenceClient(api_key=HFT)
def home():
    """Render the landing page.

    NOTE(review): no route decorator is visible in this view of the file;
    presumably this view is registered with the Flask app elsewhere - confirm.

    Returns:
        The rendered ``home.html`` template.
    """
    landing_page = render_template('home.html')
    return landing_page
# Phrases that suggest the model could not answer from the supplied context.
# This is a coarse heuristic: ANY answer containing e.g. "mention" or
# "does not" is replaced by the fallback message in chat().
_NO_ANSWER_PATTERN = re.compile(
    r'\bmention\b|\bnot mention\b|\bnot mentioned\b|\bnot contain\b|\bnot include\b|\bnot provide\b|\bdoes not\b|\bnot explicitly\b|\bnot explicitly mentioned\b',
    re.IGNORECASE,
)

# Lazily created embedding model, shared by all requests (see helper below).
_EMBEDDING_FUNCTION = None


def _get_embedding_function():
    """Return the shared embedding model, creating it on first use."""
    global _EMBEDDING_FUNCTION
    if _EMBEDDING_FUNCTION is None:
        # Loading the model is expensive, so do it once instead of per request.
        _EMBEDDING_FUNCTION = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    return _EMBEDDING_FUNCTION


def chat():
    """Serve the chat page and answer questions against the selected database.

    GET renders ``chat.html`` with the conversation history stored in the
    session. POST reads ``query_text`` from the form, retrieves the 3 most
    relevant chunks from the Chroma database at ``CHROMA_PATH``, asks the
    hosted model to answer using only that context, and appends the
    ``(question, answer)`` pair to the session history.

    The history is cleared when the selected database differs from the one
    this session last chatted with (tracked in ``session['old_db']``).

    NOTE(review): no route decorator is visible in this view of the file;
    presumably this view is registered for GET and POST elsewhere - confirm.

    Returns:
        The rendered ``chat.html`` template.
    """
    if 'history' not in session:
        session['history'] = []
    print("sessionhist1", session['history'])

    print(f"Selected DB: {CHROMA_PATH}")
    previous_db = session.get('old_db')
    if previous_db != CHROMA_PATH:
        # A different database was selected since this session last chatted:
        # the old conversation no longer applies. Nothing to clear on the very
        # first visit (no database recorded yet).
        if previous_db is not None:
            session['history'] = []
        # Remember the database this history belongs to, so the next switch
        # can be detected.
        session['old_db'] = CHROMA_PATH

    if request.method != 'POST':
        return render_template('chat.html', history=session['history'], old_db=CHROMA_PATH)

    query_text = request.form['query_text']
    if CHROMA_PATH is None:
        return render_template('chat.html', error="No vector database selected!", history=[])

    # Load the selected database and fetch the most relevant chunks.
    db = Chroma(persist_directory=CHROMA_PATH, embedding_function=_get_embedding_function())
    results = db.similarity_search_with_relevance_scores(query_text, k=3)
    context_text = "\n\n---\n\n".join(doc.page_content for doc, _score in results)

    # Prepare the prompt and query the model.
    prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
    prompt = prompt_template.format(context=context_text, question=query_text)
    print("results------------------->", prompt)
    response = client.chat.completions.create(
        model="mistralai/Mistral-7B-Instruct-v0.3",
        messages=[
            {"role": "system", "content": "You are an assistant specifically designed to generate responses based on the context provided. Your task is to answer questions strictly using the context without adding any external knowledge or information. Please ensure that your responses are relevant, accurate, and based solely on the given context."},
            {"role": "user", "content": prompt},
        ],
        max_tokens=1000,
        temperature=0.3,
    )
    data = response.choices[0].message.content

    # Replace "the context does not say" style answers with a fixed message.
    if _NO_ANSWER_PATTERN.search(data):
        data = "We do not have information related to your query on our end."

    # Save the query and answer to the session history.
    session['history'].append((query_text, data))
    # In-place mutation of a list is not detected by the session object, so
    # flag it explicitly to make sure the history gets saved.
    session.modified = True
    print("sessionhist2", session['history'])
    return render_template('chat.html', query_text=query_text, answer=data, history=session['history'], old_db=CHROMA_PATH)
def create_db():
    """Create a vector database from an uploaded folder of documents.

    GET renders the ``create_db.html`` form. POST reads ``db_name`` and the
    files uploaded under the ``folder`` field, stores the files in
    ``<UPLOAD_FOLDER>/<db_name>/`` and calls ``generate_data_store`` on that
    directory.

    The database name is sanitised with ``secure_filename`` and the sanitised
    name is used both for the upload directory and for the data store, so a
    crafted name (e.g. containing ``..``) cannot point outside the intended
    folders. Note that sanitising also changes some harmless names, e.g.
    spaces become underscores.

    NOTE(review): no route decorator is visible in this view of the file;
    presumably this view is registered for GET and POST elsewhere - confirm.

    Returns:
        The rendered form on GET; a redirect to ``list_dbs`` after a
        successful POST; a ``(message, 400)`` tuple when the name is unusable
        or no file was uploaded.
    """
    if request.method != 'POST':
        return render_template('create_db.html')

    db_name = secure_filename(request.form['db_name'])
    if not db_name:
        return "Invalid database name", 400

    # Browsers submit a single empty file part when nothing was chosen, so an
    # emptiness check on the list alone is not enough: drop every entry whose
    # (sanitised) filename is empty, otherwise save() would target a directory.
    uploads = [
        upload for upload in request.files.getlist('folder')
        if upload.filename and secure_filename(upload.filename)
    ]
    if not uploads:
        return "No files uploaded", 400

    # Define the base upload path (makedirs also creates UPLOAD_FOLDER itself).
    upload_base_path = os.path.join(app.config['UPLOAD_FOLDER'], db_name)
    print(f"Base Upload Path: {upload_base_path}")
    os.makedirs(upload_base_path, exist_ok=True)

    # Save each file. secure_filename flattens any sub-folder components of
    # the uploaded name into a single file name.
    for upload in uploads:
        file_path = os.path.join(upload_base_path, secure_filename(upload.filename))
        print(f"Saving to: {file_path}")
        upload.save(file_path)

    # Generate datastore
    generate_data_store(upload_base_path, db_name)
    return redirect(url_for('list_dbs'))
def list_dbs():
    """Show the available vector databases.

    Every sub-directory of ``VECTOR_DB_FOLDER`` is treated as one database;
    plain files in that folder are ignored.

    NOTE(review): no route decorator is visible in this view of the file;
    presumably this view is registered with the Flask app elsewhere - confirm.

    Returns:
        The rendered ``list_dbs.html`` template, given the directory names as
        ``vector_dbs``.
    """
    vector_dbs = []
    for entry in os.listdir(VECTOR_DB_FOLDER):
        entry_path = os.path.join(VECTOR_DB_FOLDER, entry)
        if os.path.isdir(entry_path):
            vector_dbs.append(entry)
    return render_template('list_dbs.html', vector_dbs=vector_dbs)
def select_db(db_name):
    """Select the vector database that chat() should query.

    Sets the module-level ``CHROMA_PATH`` to ``<VECTOR_DB_FOLDER>/<db_name>``
    (with forward slashes) and redirects to the chat page. Because
    ``CHROMA_PATH`` is module-level, the selection applies to every client
    served by this process.

    NOTE(review): no route decorator is visible in this view of the file;
    presumably ``db_name`` comes from the URL - confirm.

    Args:
        db_name: Name of a directory directly inside ``VECTOR_DB_FOLDER``.

    Returns:
        A redirect to ``chat`` on success, or a ``(message, 404)`` tuple when
        ``db_name`` is not the name of an existing database directory.
    """
    global CHROMA_PATH

    # db_name is untrusted input: accept only a plain directory name, never a
    # path ("a/b", "a\\b") or a relative reference (".", "..") that could
    # point outside VECTOR_DB_FOLDER.
    is_plain_name = (
        db_name not in ("", ".", "..")
        and db_name == os.path.basename(db_name.replace("\\", "/"))
    )
    if not is_plain_name:
        return "Vector database not found", 404

    candidate = os.path.join(VECTOR_DB_FOLDER, db_name).replace("\\", "/")
    if not os.path.isdir(candidate):
        return "Vector database not found", 404

    CHROMA_PATH = candidate
    print(f"Selected DB: {CHROMA_PATH}")
    return redirect(url_for('chat'))
if __name__ == "__main__":
    # Start Flask's built-in server only when this file is executed directly,
    # with debug mode and the auto-reloader both switched off.
    run_options = {"debug": False, "use_reloader": False}
    app.run(**run_options)