# RAG_AI_V2 / retrival.py
# Source: WebashalarForML (commit f33b573, "Update retrival.py")
from langchain_community.document_loaders import DirectoryLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
import os
import shutil
import asyncio
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.auto import partition
import pytesseract
import os
import re
import uuid
from langchain.schema import Document
from collections import defaultdict
# Point pytesseract at the system Tesseract binary (Linux install path).
# (The original repeated this assignment twice; once is enough.)
pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'

# Configurations: working folders for uploads, vector stores, and
# extracted images/tables. Created up front so later code can assume
# they exist.
UPLOAD_FOLDER = "./uploads"
VECTOR_DB_FOLDER = "./VectorDB"
IMAGE_DB_FOLDER = "./ImageDB"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(VECTOR_DB_FOLDER, exist_ok=True)
os.makedirs(IMAGE_DB_FOLDER, exist_ok=True)
########################################################################################################################################################
####-------------------------------------------------------------- Documnet Loader ---------------------------------------------------------------####
########################################################################################################################################################
# Loaders for loading Document text, tables and images from any file format.
def load_document(data_path):
    """Walk *data_path* and load every file into three parallel outputs.

    Returns a 3-tuple:
      grouped_documents -- one Document per source file, holding all of its
                           text (with tables inlined as HTML) for the text DB.
      documents         -- raw Documents from DirectoryLoader (text only),
                           each given a fresh UUID and a "filename" metadata.
      table_document    -- one Document per extracted table, enriched with
                           the text of the page it appeared on.
    """
    processed_documents = []
    table_document = []
    for root, _, files in os.walk(data_path):
        for file in files:
            file_path = os.path.join(root, file)
            doc_id = str(uuid.uuid4())  # unique ID per source document
            print(f"Processing document ID: {doc_id}, Path: {file_path}")
            try:
                # Determine the file type based on extension.
                filename, file_extension = os.path.splitext(file.lower())
                # Extracted images/tables are dumped per-file under ImageDB.
                # BUG FIX: the original had a literal "(unknown)" placeholder
                # here instead of interpolating the file's base name.
                image_output = f"./ImageDB/{filename}/"
                if file_extension == ".pdf":
                    # hi_res layout detection so tables/images are located.
                    elements = partition_pdf(
                        filename=file_path,
                        strategy="hi_res",
                        infer_table_structure=True,
                        hi_res_model_name="yolox",
                        extract_images_in_pdf=True,
                        extract_image_block_types=["Image", "Table"],
                        extract_image_block_output_dir=image_output,
                        show_progress=True,
                    )
                else:
                    # Fall back to format auto-detection for non-PDF files.
                    elements = partition(
                        filename=file_path,
                        strategy="hi_res",
                        infer_table_structure=True,
                        show_progress=True,
                    )
            except Exception as e:
                print(f"Failed to process document {file_path}: {e}")
                continue

            categorized_content = {
                "tables": {"content": [], "Metadata": []},
                "images": {"content": [], "Metadata": []},
                "text": {"content": [], "Metadata": []},
            }
            CNT = 1  # running table number within this document
            for chunk in elements:
                # Safely extract metadata and text from the element.
                chunk_type = str(type(chunk))
                chunk_metadata = chunk.metadata.to_dict() if chunk.metadata else {}
                chunk_text = getattr(chunk, "text", None)
                if any(keyword in chunk_type for keyword in ["Table", "TableChunk"]):
                    categorized_content["tables"]["content"].append(chunk_text)
                    categorized_content["tables"]["Metadata"].append(chunk_metadata)
                    # Also inline the table (as HTML) into the plain-text
                    # stream so page-level text keeps its tables.
                    TABLE_DATA = f"Table number {CNT} " + chunk_metadata.get("text_as_html", "") + " "
                    CNT += 1
                    categorized_content["text"]["content"].append(TABLE_DATA)
                    categorized_content["text"]["Metadata"].append(chunk_metadata)
                elif "Image" in chunk_type:
                    categorized_content["images"]["content"].append(chunk_text)
                    categorized_content["images"]["Metadata"].append(chunk_metadata)
                elif any(
                    keyword in chunk_type
                    for keyword in [
                        "CompositeElement",
                        "Text",
                        "NarrativeText",
                        "Title",
                        "Header",
                        "Footer",
                        "FigureCaption",
                        "ListItem",
                        "UncategorizedText",
                        "Formula",
                        "CodeSnippet",
                        "Address",
                        "EmailAddress",
                        "PageBreak",
                    ]
                ):
                    categorized_content["text"]["content"].append(chunk_text)
                    categorized_content["text"]["Metadata"].append(chunk_metadata)
                else:
                    continue  # ignore any other element types

            processed_documents.append({
                "doc_id": doc_id,
                "source": file_path,
                **categorized_content,
            })

    # For every table, concatenate all text from the same source page so the
    # table Document carries the context it appeared in.
    for doc in processed_documents:
        cnt = 1  # table number within this document (used at retrieval time)
        for table_metadata in doc.get("tables", {}).get("Metadata", []):
            page_number = table_metadata.get("page_number")
            source = doc.get("source")
            page_content = ""
            for text_metadata, text_content in zip(
                doc.get("text", {}).get("Metadata", []),
                doc.get("text", {}).get("content", []),
            ):
                if source == doc.get("source") and page_number == text_metadata.get("page_number"):
                    print(f"Matching text found for source: {source}, page: {page_number}")
                    page_content += f"{text_content} "  # concatenate with a space
            # Store the table HTML plus its page text as this table's content.
            table_metadata["page_content"] = (
                f"Table number {cnt} "
                + table_metadata.get("text_as_html", "")
                + " "
                + page_content.strip()
            )
            table_metadata["text_as_html"] = table_metadata.get("text_as_html", "")
            table_metadata["Table_number"] = cnt  # used during retrieval
            cnt += 1
            # One Document per table, with its contextualised content.
            table_document.append(
                Document(
                    id=str(uuid.uuid4()),
                    page_content=table_metadata.get("page_content", ""),
                    metadata={
                        "source": doc["source"],
                        "text_as_html": table_metadata.get("text_as_html", ""),
                        "filetype": table_metadata.get("filetype", ""),
                        # ints are stringified because the vector store's
                        # sqlite metadata backend drops non-string values.
                        "page_number": str(table_metadata.get("page_number", 0)),
                        "image_path": table_metadata.get("image_path", ""),
                        "file_directory": table_metadata.get("file_directory", ""),
                        "filename": table_metadata.get("filename", ""),
                        "Table_number": str(table_metadata.get("Table_number", 0)),
                    },
                )
            )

    # Merge each document's text chunks into one Document per doc_id.
    grouped_by_doc_id = defaultdict(lambda: {
        "text_content": [],
        "metadata": None,  # set once per doc_id from the first text chunk
    })
    for doc in processed_documents:
        doc_id = doc.get("doc_id")
        source = doc.get("source")
        text_content = doc.get("text", {}).get("content", [])
        metadata_list = doc.get("text", {}).get("Metadata", [])
        grouped_by_doc_id[doc_id]["text_content"].extend(text_content)
        if grouped_by_doc_id[doc_id]["metadata"] is None and metadata_list:
            metadata = metadata_list[0]  # assume metadata is consistent per file
            grouped_by_doc_id[doc_id]["metadata"] = {
                "source": source,
                "file_directory": metadata.get("file_directory"),
                "filename": metadata.get("filename"),
            }

    # Convert grouped content into Document objects.
    grouped_documents = []
    for doc_id, data in grouped_by_doc_id.items():
        grouped_documents.append(
            Document(
                id=doc_id,
                page_content=" ".join(data["text_content"]).strip(),
                metadata=data["metadata"],
            )
        )

    # Plain directory load of the same path for the secondary (text-only) DB.
    loader = DirectoryLoader(data_path, glob="*.*")
    documents = loader.load()
    for doc in documents:
        doc.id = str(uuid.uuid4())
        # BUG FIX: the old regex assumed backslash-separated (Windows) paths,
        # so on POSIX it captured the whole path and crashed on dot-less
        # names; os.path.basename is portable and total.
        doc.metadata.update({"filename": os.path.basename(doc.metadata.get("source"))})
    return grouped_documents, documents, table_document
#grouped_documents = load_document(data_path)
#documents,processed_documents,table_document = load_document(data_path)
########################################################################################################################################################
####-------------------------------------------------------------- Chunking the Text --------------------------------------------------------------####
########################################################################################################################################################
def split_text(documents: list[Document]):
    """Split documents into overlapping chunks ready for embedding.

    Uses a recursive character splitter (2000-char chunks, 600 overlap) and
    records each chunk's starting offset in its metadata.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=2000,
        chunk_overlap=600,
        length_function=len,
        add_start_index=True,
    )
    chunks = splitter.split_documents(documents)
    # The int offset is stringified so it survives the sqlite metadata store.
    for chunk in chunks:
        chunk.metadata["start_index"] = str(chunk.metadata["start_index"])
    print(f"Split {len(documents)} documents into {len(chunks)} chunks.")
    return chunks
########################################################################################################################################################
####---------------------------------------------------- Creating and Storeing Data in Vector DB --------------------------------------------------####
########################################################################################################################################################
#def save_to_chroma(chunks: list[Document], name: str, tables: list[Document]):
async def save_to_chroma(chunks: list[Document], name: str, tables: list[Document]):
    """Create (or recreate) the text and table Chroma stores for *name*.

    Any existing stores under VectorDB/TableDB for this name are wiped first.
    Returns (db, tdb) on success — tdb is None when *tables* is empty —
    or None if anything fails.
    """
    CHROMA_PATH = f"./VectorDB/chroma_{name}"
    TABLE_PATH = f"./TableDB/chroma_{name}"
    # Start from a clean slate: drop any previous DBs for this name.
    if os.path.exists(CHROMA_PATH):
        shutil.rmtree(CHROMA_PATH)
    if os.path.exists(TABLE_PATH):
        shutil.rmtree(TABLE_PATH)
    try:
        # Load the embedding model (same model used module-wide).
        embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2", show_progress=True)
        print("Creating document vector database...")
        db = Chroma.from_documents(
            documents=chunks,
            embedding=embedding_function,
            persist_directory=CHROMA_PATH,
        )
        print("Persisting the document database...")
        db.persist()
        print("Document database successfully saved.")
        if tables:
            print("Creating table vector database...")
            tdb = Chroma.from_documents(
                documents=tables,
                embedding=embedding_function,
                persist_directory=TABLE_PATH,
            )
            print("Persisting the table database...")
            # BUG FIX: this previously called db.persist() again, so the
            # table database was never persisted to disk.
            tdb.persist()
            print("Table database successfully saved.")
        else:
            tdb = None
        return db, tdb
    except Exception as e:
        print("Error while saving to Chroma:", e)
        return None
########################################################################################################################################################
####----------------------------------------------------------- Updating Existing Data in Vector DB -----------------------------------------------####
########################################################################################################################################################
# adding document to Existing db
async def add_document_to_existing_db(new_chunks: list[Document], db_name: str, tables: list[Document]):
    """Append *new_chunks* (and optionally *tables*) to an existing Chroma DB.

    The text DB must already exist under VectorDB/<db_name>; the table DB is
    created on demand under TableDB/<db_name>. Returns (db, tdb) on success
    (tdb is None when *tables* is empty) or None on failure.
    """
    CHROMA_PATH = f"./VectorDB/{db_name}"
    TABLE_PATH = f"./TableDB/{db_name}"
    if not os.path.exists(CHROMA_PATH):
        print(f"Database '{db_name}' does not exist. Please create it first.")
        return
    try:
        # Load the embedding model (same model used module-wide).
        embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2", show_progress=True)
        print("Creating document vector database...")
        db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)
        print("Persisting the document database...")
        db.add_documents(new_chunks)
        db.persist()
        print("Document database successfully saved.")
        if tables:
            print("Creating table vector database...")
            if not os.path.exists(TABLE_PATH):
                # No table DB yet for this name — create it from scratch.
                print(f"Database '{db_name}' does not exist. Lets create it first.")
                print("Persisting the table database...")
                tdb = Chroma.from_documents(
                    documents=tables,
                    embedding=embedding_function,
                    persist_directory=TABLE_PATH,
                )
            else:
                tdb = Chroma(persist_directory=TABLE_PATH, embedding_function=embedding_function)
                print("Persisting the table database...")
                # BUG FIX: tables were previously added to the TEXT database
                # (db.add_documents / db.persist), corrupting it and leaving
                # the table database unchanged.
                tdb.add_documents(tables)
                tdb.persist()
            print("Table database successfully saved.")
        else:
            tdb = None
        return db, tdb
    except Exception as e:
        print("Error while saving to Chroma:", e)
        return None
#delete chunks by logics
def delete_chunks_by_source(chroma_path, source_to_delete):
    """Delete every chunk whose metadata "source" equals *source_to_delete*.

    Opens the Chroma store at *chroma_path*, finds matching chunk ids, and
    removes them. No-op (with a message) if the DB or matches are missing.
    """
    if not os.path.exists(chroma_path):
        print(f"Database at path '{chroma_path}' does not exist.")
        return
    try:
        # NOTE(review): the rest of this module embeds with all-MiniLM-L6-v2;
        # using the same model here keeps the store consistent.
        embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
        db = Chroma(persist_directory=chroma_path, embedding_function=embedding_function)
        print(f"Retrieving all metadata to identify chunks with source '{source_to_delete}'...")
        data = db.get()
        # BUG FIX: Chroma.delete takes ids, not positional "indices" —
        # pair each stored id with its metadata and collect the matches.
        ids_to_delete = [
            chunk_id
            for chunk_id, metadata in zip(data["ids"], data["metadatas"])
            if metadata.get("source") == source_to_delete
        ]
        if not ids_to_delete:
            print(f"No chunks found with source '{source_to_delete}'.")
            return
        print(f"Deleting {len(ids_to_delete)} chunks with source '{source_to_delete}'...")
        db.delete(ids=ids_to_delete)
        db.persist()
        print("Chunks deleted and database updated successfully.")
    except Exception as e:
        print(f"Error while deleting chunks by source: {e}")
########################################################################################################################################################
####-----------------------------------------------Combine Process of upload, Chunk and Store (FOR NEW DOC)----------------------------------------####
########################################################################################################################################################
# update a data store
async def update_data_store(file_path, db_name):
    """Load a new file, chunk it, and APPEND it to an existing database.

    Pipeline: load_document -> split_text -> add_document_to_existing_db.
    Errors at each stage are logged and abort the pipeline.
    """
    print(f"Filepath ===> {file_path} DB Name ====> {db_name}")
    try:
        # First element is the per-file grouped text; third is the tables.
        grouped_documents, raw_documents, table_document = load_document(file_path)
        print("Documents loaded successfully.")
    except Exception as e:
        print(f"Error loading documents: {e}")
        return
    try:
        chunks = split_text(grouped_documents)
        print(f"Text split into {len(chunks)} chunks.")
    except Exception as e:
        print(f"Error splitting text: {e}")
        return
    try:
        await add_document_to_existing_db(chunks, db_name, table_document)
        print(f"Data saved to Chroma for database {db_name}.")
    except Exception as e:
        print(f"Error saving to Chroma: {e}")
        return
########################################################################################################################################################
####------------------------------------------------------- Combine Process of Load, Chunk and Store ----------------------------------------------####
########################################################################################################################################################
async def generate_data_store(file_path, db_name):
    """Load a file, chunk it, and CREATE a fresh database for it.

    Pipeline: load_document -> split_text -> save_to_chroma.
    Errors at each stage are logged and abort the pipeline.
    """
    print(f"Filepath ===> {file_path} DB Name ====> {db_name}")
    try:
        # First element is the per-file grouped text; third is the tables.
        grouped_documents, raw_documents, table_document = load_document(file_path)
        print("Documents loaded successfully.")
    except Exception as e:
        print(f"Error loading documents: {e}")
        return
    try:
        chunks = split_text(grouped_documents)
        print(f"Text split into {len(chunks)} chunks.")
    except Exception as e:
        print(f"Error splitting text: {e}")
        return
    try:
        await save_to_chroma(chunks, db_name, table_document)
        print(f"Data saved to Chroma for database {db_name}.")
    except Exception as e:
        print(f"Error saving to Chroma: {e}")
        return
########################################################################################################################################################
####-------------------------------------------------------------------- Token counter -----------------------------------------------------------####
########################################################################################################################################################
def approximate_bpe_token_counter(text):
    """Roughly approximate a BPE token count.

    Counts every word (run of word characters) and every individual
    non-space symbol as one token.
    """
    word_or_symbol = re.compile(r"\w+|[^\w\s]", re.UNICODE)
    return sum(1 for _ in word_or_symbol.finditer(text))