from langchain_community.document_loaders import DirectoryLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
import os
import shutil
import asyncio
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.auto import partition
import pytesseract
import re
import uuid
from collections import defaultdict

# Point pytesseract at the system Tesseract binary (used for OCR during hi_res partitioning)
pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'
# Configuration
UPLOAD_FOLDER = "./uploads"
VECTOR_DB_FOLDER = "./VectorDB"
IMAGE_DB_FOLDER = "./Images"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(VECTOR_DB_FOLDER, exist_ok=True)
os.makedirs(IMAGE_DB_FOLDER, exist_ok=True)
########################################################################################################################################################
####-------------------------------------------------------------- Document Loader ----------------------------------------------------------------####
########################################################################################################################################################
# Loader for extracting text, tables and images from any supported file format.
#data_path = r"H:\DEV PATEL\2025\RAG Project\test_data\google data"
def load_document(data_path):
    processed_documents = []
    element_content = []
    table_document = []
    # PDFs get a dedicated partitioning path; every other format falls back to auto partition.
    for root, _, files in os.walk(data_path):
        for file in files:
            file_path = os.path.join(root, file)
            doc_id = str(uuid.uuid4())  # Generate a unique ID for the document
            print(f"Processing document ID: {doc_id}, Path: {file_path}")
            try:
                # Determine the file type based on its extension
                filename, file_extension = os.path.splitext(file.lower())
                image_output = f"./Images/{filename}/"
                # Use a specific partition technique based on the file extension
                if file_extension == ".pdf":
                    elements = partition_pdf(
                        filename=file_path,
                        strategy="hi_res",  # Use layout detection
                        infer_table_structure=True,
                        hi_res_model_name="yolox",
                        extract_images_in_pdf=True,
                        extract_image_block_types=["Image", "Table"],
                        extract_image_block_output_dir=image_output,
                        show_progress=True,
                        #chunking_strategy="by_title",
                    )
                else:
                    # Default to auto partition if no specific handler is found
                    elements = partition(
                        filename=file_path,
                        strategy="hi_res",
                        infer_table_structure=True,
                        show_progress=True,
                        #chunking_strategy="by_title",
                    )
            except Exception as e:
                print(f"Failed to process document {file_path}: {e}")
                continue
            categorized_content = {
                "tables": {"content": [], "Metadata": []},
                "images": {"content": [], "Metadata": []},
                "text": {"content": [], "Metadata": []},
                "text2": {"content": [], "Metadata": []},
            }
            element_content.append(elements)
            table_count = 1
            for chunk in elements:
                # Safely extract metadata and text
                chunk_type = str(type(chunk))
                chunk_metadata = chunk.metadata.to_dict() if chunk.metadata else {}
                chunk_text = getattr(chunk, "text", None)
                # Separate content into categories
                if any(keyword in chunk_type for keyword in ["Table", "TableChunk"]):
                    categorized_content["tables"]["content"].append(chunk_text)
                    categorized_content["tables"]["Metadata"].append(chunk_metadata)
                    # Tables are also inlined into the text stream as HTML so they get embedded with their surrounding context
                    table_data = f"Table number {table_count} " + chunk_metadata.get("text_as_html", "") + " "
                    table_count += 1
                    categorized_content["text"]["content"].append(table_data)
                    categorized_content["text"]["Metadata"].append(chunk_metadata)
                elif "Image" in chunk_type:
                    categorized_content["images"]["content"].append(chunk_text)
                    categorized_content["images"]["Metadata"].append(chunk_metadata)
                elif any(
                    keyword in chunk_type
                    for keyword in [
                        "CompositeElement",
                        "Text",
                        "NarrativeText",
                        "Title",
                        "Header",
                        "Footer",
                        "FigureCaption",
                        "ListItem",
                        "UncategorizedText",
                        "Formula",
                        "CodeSnippet",
                        "Address",
                        "EmailAddress",
                        "PageBreak",
                    ]
                ):
                    categorized_content["text"]["content"].append(chunk_text)
                    categorized_content["text"]["Metadata"].append(chunk_metadata)
                else:
                    continue
            # Append the processed document
            processed_documents.append({
                "doc_id": doc_id,
                "source": file_path,
                **categorized_content,
            })
    # Loop over tables and match text from the same document and page
    '''
    for doc in processed_documents:
        cnt = 1  # counter used to number the tables
        for table_metadata in doc.get("tables", {}).get("Metadata", []):
            page_number = table_metadata.get("page_number")
            source = doc.get("source")
            page_content = ""
            for text_metadata, text_content in zip(
                doc.get("text", {}).get("Metadata", []),
                doc.get("text", {}).get("content", [])
            ):
                page_number2 = text_metadata.get("page_number")
                source2 = doc.get("source")
                if source == source2 and page_number == page_number2:
                    print(f"Matching text found for source: {source}, page: {page_number}")
                    page_content += f"{text_content} "  # Concatenate text with a space
            # Add the matched content to the table metadata
            table_metadata["page_content"] = f"Table number {cnt} " + table_metadata.get("text_as_html", "") + " " + page_content.strip()  # Strip trailing spaces so the content is clean
            table_metadata["text_as_html"] = table_metadata.get("text_as_html", "")  # Also stored separately
            table_metadata["Table_number"] = cnt  # The table number is used during retrieval
            cnt += 1
            # Custom loader: build a Document for each table together with the text on its page
            unique_id = str(uuid.uuid4())
            table_document.append(
                Document(
                    id=unique_id,
                    page_content=table_metadata.get("page_content", ""),  # Default to empty string if missing
                    metadata={
                        "source": doc["source"],
                        "text_as_html": table_metadata.get("text_as_html", ""),
                        "filetype": table_metadata.get("filetype", ""),
                        "page_number": str(table_metadata.get("page_number", 0)),  # Default to 0 if missing
                        "image_path": table_metadata.get("image_path", ""),
                        "file_directory": table_metadata.get("file_directory", ""),
                        "filename": table_metadata.get("filename", ""),
                        "Table_number": str(table_metadata.get("Table_number", 0))  # Default to 0 if missing
                    }
                )
            )
    '''
    # Initialize a structure to group content by doc_id
    grouped_by_doc_id = defaultdict(lambda: {
        "text_content": [],
        "metadata": None,  # Metadata is only set once per doc_id
    })
    for doc in processed_documents:
        doc_id = doc.get("doc_id")
        source = doc.get("source")
        text_content = doc.get("text", {}).get("content", [])
        metadata_list = doc.get("text", {}).get("Metadata", [])
        # Merge text content
        grouped_by_doc_id[doc_id]["text_content"].extend(text_content)
        # Set metadata (if not already set)
        if grouped_by_doc_id[doc_id]["metadata"] is None and metadata_list:
            metadata = metadata_list[0]  # Assuming metadata is consistent within a document
            grouped_by_doc_id[doc_id]["metadata"] = {
                "source": source,
                "filetype": metadata.get("filetype"),
                "file_directory": metadata.get("file_directory"),
                "filename": metadata.get("filename"),
                "languages": str(metadata.get("languages")),
            }
    # Convert the grouped content into Document objects
    grouped_documents = []
    for doc_id, data in grouped_by_doc_id.items():
        grouped_documents.append(
            Document(
                id=doc_id,
                # Filter out None/empty entries so join() never fails on elements without text
                page_content=" ".join(t for t in data["text_content"] if t).strip(),
                metadata=data["metadata"],
            )
        )
    # Output the grouped documents
    for document in grouped_documents:
        print(document)
    # DirectoryLoader variant for loading plain text only into a dedicated DB
    '''
    loader = DirectoryLoader(data_path, glob="*.*")
    documents = loader.load()
    # Update the metadata, adding the filename extracted from the source path
    for doc in documents:
        unique_id = str(uuid.uuid4())
        doc.id = unique_id
        path = doc.metadata.get("source")
        match = re.search(r'([^\\]+\.[^\\]+)$', path)
        doc.metadata.update({"filename": match.group(1)})
    return documents
    '''
    return grouped_documents


#grouped_documents = load_document(data_path)
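# A minimal sketch of what load_document returns (values below are hypothetical): one Document
# per source file, whose page_content is the concatenated element text with tables inlined as
# "Table number N <table html>", and whose metadata carries the fields collected above, e.g.
#   Document(id='3f2b...', page_content='Intro text ... Table number 1 <table>...</table> ...',
#            metadata={'source': './uploads/report.pdf', 'filetype': 'application/pdf',
#                      'file_directory': './uploads', 'filename': 'report.pdf', 'languages': "['eng']"})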
########################################################################################################################################################
####-------------------------------------------------------------- Chunking the Text --------------------------------------------------------------####
########################################################################################################################################################
def split_text(documents: list[Document]):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=500,
        length_function=len,
        add_start_index=True,
    )
    chunks = text_splitter.split_documents(documents)  # Split the documents into overlapping chunks
    for chunk in chunks:
        # start_index is converted from int to str so it can be stored as SQLite-compatible metadata
        chunk.metadata["start_index"] = str(chunk.metadata["start_index"])
    print(f"Split {len(documents)} documents into {len(chunks)} chunks.")
    return chunks
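# Example usage (hypothetical, commented so it does not run on import):
# chunks = split_text(load_document("./uploads"))
# With chunk_size=1000 and chunk_overlap=500, consecutive chunks share up to 500 characters of context.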
########################################################################################################################################################
####---------------------------------------------------- Creating and Storing Data in Vector DB ---------------------------------------------------####
########################################################################################################################################################
#def save_to_chroma(chunks: list[Document], name: str, tables: list[Document]):
def save_to_chroma(chunks: list[Document], name: str):
    CHROMA_PATH = f"./VectorDB/chroma_{name}"
    #TABLE_PATH = f"./TableDB/chroma_{name}"
    if os.path.exists(CHROMA_PATH):
        shutil.rmtree(CHROMA_PATH)
    # if os.path.exists(TABLE_PATH):
    #     shutil.rmtree(TABLE_PATH)
    try:
        # Load the embedding model
        embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
        #embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
        # Create the Chroma DB for documents [NOTE: some metadata values are stored as strings because int/float values were showing up as null]
        print("Creating document vector database...")
        db = Chroma.from_documents(
            documents=chunks,
            embedding=embedding_function,
            persist_directory=CHROMA_PATH,
        )
        print("Document database successfully saved.")
        # # Create a Chroma DB for tables if available
        # if tables:
        #     print("Creating table vector database...")
        #     tdb = Chroma.from_documents(
        #         documents=tables,
        #         embedding=embedding_function,
        #         persist_directory=TABLE_PATH,
        #     )
        #     print("Table database successfully saved.")
        # else:
        #     tdb = None
        #return db, tdb
        return db
    except Exception as e:
        print("Error while saving to Chroma:", e)
        return None
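# A minimal retrieval sketch (not part of the original pipeline): reopen a persisted Chroma
# store with the same embedding model and run a similarity search. The database name and
# query in the commented usage below are placeholders.
def load_chroma_db(name: str):
    CHROMA_PATH = f"./VectorDB/chroma_{name}"
    embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    return Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)
# Example usage (hypothetical):
# db = load_chroma_db("demo")
# results = db.similarity_search("What does the document say about pricing?", k=4)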
# def get_unique_sources(chroma_path):
#     db = Chroma(persist_directory=chroma_path)
#     metadata_list = db.get()["metadatas"]
#     unique_sources = {metadata["source"] for metadata in metadata_list if "source" in metadata}
#     return list(unique_sources)
########################################################################################################################################################
####----------------------------------------------------------- Updating Existing Data in Vector DB -----------------------------------------------####
########################################################################################################################################################
# def add_document_to_existing_db(new_documents: list[Document], db_name: str):
#     CHROMA_PATH = f"./VectorDB/chroma_{db_name}"
#     if not os.path.exists(CHROMA_PATH):
#         print(f"Database '{db_name}' does not exist. Please create it first.")
#         return
#     try:
#         embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
#         #embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
#         db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)
#         print("Adding new documents to the existing database...")
#         chunks = split_text(new_documents)
#         db.add_documents(chunks)
#         db.persist()
#         print("New documents added and database updated successfully.")
#     except Exception as e:
#         print("Error while adding documents to existing database:", e)
# def delete_chunks_by_source(chroma_path, source_to_delete):
#     if not os.path.exists(chroma_path):
#         print(f"Database at path '{chroma_path}' does not exist.")
#         return
#     try:
#         #embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
#         embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
#         db = Chroma(persist_directory=chroma_path, embedding_function=embedding_function)
#         print(f"Retrieving all metadata to identify chunks with source '{source_to_delete}'...")
#         metadata_list = db.get()["metadatas"]
#         # Identify indices of chunks to delete
#         indices_to_delete = [
#             idx for idx, metadata in enumerate(metadata_list) if metadata.get("source") == source_to_delete
#         ]
#         if not indices_to_delete:
#             print(f"No chunks found with source '{source_to_delete}'.")
#             return
#         print(f"Deleting {len(indices_to_delete)} chunks with source '{source_to_delete}'...")
#         db.delete(indices=indices_to_delete)
#         db.persist()
#         print("Chunks deleted and database updated successfully.")
#     except Exception as e:
#         print(f"Error while deleting chunks by source: {e}")
# # Update an existing data store
# def update_data_store(file_path, db_name):
#     CHROMA_PATH = f"./VectorDB/chroma_{db_name}"
#     print(f"Filepath ===> {file_path} DB Name ====> {db_name}")
#     try:
#         documents, table_document = load_document(file_path)
#         print("Documents loaded successfully.")
#     except Exception as e:
#         print(f"Error loading documents: {e}")
#         return
#     try:
#         chunks = split_text(documents)
#         print(f"Text split into {len(chunks)} chunks.")
#     except Exception as e:
#         print(f"Error splitting text: {e}")
#         return
#     try:
#         save_to_chroma(chunks, db_name, table_document)
#         print(f"Data saved to Chroma for database {db_name}.")
#     except Exception as e:
#         print(f"Error saving to Chroma: {e}")
#         return
########################################################################################################################################################
####------------------------------------------------------- Combined Process of Load, Chunk and Store ---------------------------------------------####
########################################################################################################################################################
def generate_data_store(file_path, db_name):
    CHROMA_PATH = f"./VectorDB/chroma_{db_name}"
    print(f"Filepath ===> {file_path} DB Name ====> {db_name}")
    try:
        #documents, grouped_documents = load_document(file_path)
        grouped_documents = load_document(file_path)
        print("Documents loaded successfully.")
    except Exception as e:
        print(f"Error loading documents: {e}")
        return
    try:
        chunks = split_text(grouped_documents)
        print(f"Text split into {len(chunks)} chunks.")
    except Exception as e:
        print(f"Error splitting text: {e}")
        return
    try:
        # save_to_chroma is a regular (synchronous) function, so it is called directly
        save_to_chroma(chunks, db_name)
        print(f"Data saved to Chroma for database {db_name}.")
    except Exception as e:
        print(f"Error saving to Chroma: {e}")
        return
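# A minimal end-to-end sketch: index a folder of files into a named Chroma database.
# The folder path and database name below are placeholders, not values from the original project.
if __name__ == "__main__":
    sample_data_path = UPLOAD_FOLDER   # hypothetical: drop test files into ./uploads
    sample_db_name = "demo"            # hypothetical database name -> ./VectorDB/chroma_demo
    generate_data_store(sample_data_path, sample_db_name)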