from langchain_community.document_loaders import DirectoryLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.auto import partition
from collections import defaultdict
import pytesseract
import shutil
import uuid
import os
import re

pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'

# Configurations
UPLOAD_FOLDER = "./uploads"
VECTOR_DB_FOLDER = "./VectorDB"
IMAGE_DB_FOLDER = "./Images"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(VECTOR_DB_FOLDER, exist_ok=True)

########################################################################################################################################################
####-------------------------------------------------------------- Document Loader ----------------------------------------------------------------####
########################################################################################################################################################
# Loaders for extracting text, tables and images from any supported file format.
# data_path = r"H:\DEV PATEL\2025\RAG Project\test_data\google data"

def load_document(data_path):
    processed_documents = []
    element_content = []
    table_document = []

    # PDFs get a dedicated partitioning path; every other format falls back to auto partition.
    for root, _, files in os.walk(data_path):
        for file in files:
            file_path = os.path.join(root, file)
            doc_id = str(uuid.uuid4())  # Generate a unique ID for the document
            print(f"Processing document ID: {doc_id}, Path: {file_path}")

            try:
                # Determine the file type based on extension
                filename, file_extension = os.path.splitext(file.lower())
                image_output = f"./Images/{filename}/"

                # Use specific partition techniques based on file extension
                if file_extension == ".pdf":
                    elements = partition_pdf(
                        filename=file_path,
                        strategy="hi_res",  # Use layout detection
                        infer_table_structure=True,
                        hi_res_model_name="yolox",
                        extract_images_in_pdf=True,
                        extract_image_block_types=["Image", "Table"],
                        extract_image_block_output_dir=image_output,
                        show_progress=True,
                        # chunking_strategy="by_title",
                    )
                else:
                    # Default to auto partition if no specific handler is found
                    elements = partition(
                        filename=file_path,
                        strategy="hi_res",
                        infer_table_structure=True,
                        show_progress=True,
                        # chunking_strategy="by_title",
                    )
            except Exception as e:
                print(f"Failed to process document {file_path}: {e}")
                continue

            categorized_content = {
                "tables": {"content": [], "Metadata": []},
                "images": {"content": [], "Metadata": []},
                "text": {"content": [], "Metadata": []},
                "text2": {"content": [], "Metadata": []},
            }
            element_content.append(elements)

            CNT = 1
            for chunk in elements:
                # Safely extract metadata and text
                chunk_type = str(type(chunk))
                chunk_metadata = chunk.metadata.to_dict() if chunk.metadata else {}
                chunk_text = getattr(chunk, "text", None)

                # Separate content into categories
                if any(keyword in chunk_type for keyword in ["Table", "TableChunk"]):
                    categorized_content["tables"]["content"].append(chunk_text)
                    categorized_content["tables"]["Metadata"].append(chunk_metadata)
                    # Also push the table HTML into the text stream so it is embedded alongside the page text
                    TABLE_DATA = f"Table number {CNT} " + chunk_metadata.get("text_as_html", "") + " "
                    CNT += 1
                    categorized_content["text"]["content"].append(TABLE_DATA)
                    categorized_content["text"]["Metadata"].append(chunk_metadata)
                elif "Image" in chunk_type:
categorized_content["images"]["content"].append(chunk_text) categorized_content["images"]["Metadata"].append(chunk_metadata) elif any( keyword in chunk_type for keyword in [ "CompositeElement", "Text", "NarrativeText", "Title", "Header", "Footer", "FigureCaption", "ListItem", "UncategorizedText", "Formula", "CodeSnippet", "Address", "EmailAddress", "PageBreak", ] ): categorized_content["text"]["content"].append(chunk_text) categorized_content["text"]["Metadata"].append(chunk_metadata) else: continue # Append processed document processed_documents.append({ "doc_id": doc_id, "source": file_path, **categorized_content, }) # Loop over tables and match text from the same document and page for doc in processed_documents: cnt=1 # count for storing number of the table for table_metadata in doc.get("tables", {}).get("Metadata", []): page_number = table_metadata.get("page_number") source = doc.get("source") page_content = "" for text_metadata, text_content in zip( doc.get("text", {}).get("Metadata", []), doc.get("text", {}).get("content", []) ): page_number2 = text_metadata.get("page_number") source2 = doc.get("source") if source == source2 and page_number == page_number2: print(f"Matching text found for source: {source}, page: {page_number}") page_content += f"{text_content} " # Concatenate text with a space # Add the matched content to the table metadata table_metadata["page_content"] =f"Table number {cnt} "+table_metadata.get("text_as_html", "")+" "+page_content.strip() # Remove trailing spaces and have the content proper here table_metadata["text_as_html"] = table_metadata.get("text_as_html", "") # we are also storing it seperatly table_metadata["Table_number"] = cnt # addiing the table number it will be use in retrival cnt+=1 # Custom loader of document which will store the table along with the text on that page specifically # making document of each table with its content unique_id = str(uuid.uuid4()) table_document.append( Document( id =unique_id, # Add doc_id directly page_content=table_metadata.get("page_content", ""), # Get page_content from metadata, default to empty string if missing metadata={ "source": doc["source"], "text_as_html": table_metadata.get("text_as_html", ""), "filetype": table_metadata.get("filetype", ""), "page_number": str(table_metadata.get("page_number", 0)), # Default to 0 if missing "image_path": table_metadata.get("image_path", ""), "file_directory": table_metadata.get("file_directory", ""), "filename": table_metadata.get("filename", ""), "Table_number": str(table_metadata.get("Table_number", 0)) # Default to 0 if missing } ) ) # Initialize a structure to group content by doc_id grouped_by_doc_id = defaultdict(lambda: { "text_content": [], "metadata": None, # Metadata will only be set once per doc_id }) for doc in processed_documents: doc_id = doc.get("doc_id") source = doc.get("source") text_content = doc.get("text", {}).get("content", []) metadata_list = doc.get("text", {}).get("Metadata", []) # Merge text content grouped_by_doc_id[doc_id]["text_content"].extend(text_content) # Set metadata (if not already set) if grouped_by_doc_id[doc_id]["metadata"] is None and metadata_list: metadata = metadata_list[0] # Assuming metadata is consistent grouped_by_doc_id[doc_id]["metadata"] = { "source": source, "filetype": metadata.get("filetype"), "file_directory": metadata.get("file_directory"), "filename": metadata.get("filename"), "languages": str(metadata.get("languages")), } # Convert grouped content into Document objects grouped_documents = [] for doc_id, data in 

    # Convert grouped content into Document objects
    grouped_documents = []
    for doc_id, data in grouped_by_doc_id.items():
        grouped_documents.append(
            Document(
                id=doc_id,
                page_content=" ".join(data["text_content"]).strip(),
                metadata=data["metadata"],
            )
        )

    # Output the grouped documents
    for document in grouped_documents:
        print(document)

    # Directory loader for loading the raw text only, which goes to its own DB
    loader = DirectoryLoader(data_path, glob="*.*")
    documents = loader.load()

    # Update the metadata, adding the filename
    for doc in documents:
        unique_id = str(uuid.uuid4())
        doc.id = unique_id
        path = doc.metadata.get("source")
        match = re.search(r'([^\\/]+\.[^\\/]+)$', path)  # handle both Windows and POSIX separators
        if match:
            doc.metadata.update({"filename": match.group(1)})

    return documents, grouped_documents

# documents, grouped_documents = load_document(data_path)

########################################################################################################################################################
####-------------------------------------------------------------- Chunking the Text --------------------------------------------------------------####
########################################################################################################################################################

def split_text(documents: list[Document]):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=500,
        length_function=len,
        add_start_index=True,
    )
    chunks = text_splitter.split_documents(documents)  # split the documents into chunks

    # start_index is converted to str so it can be stored as metadata in the SQLite-backed vector store
    for chunk in chunks:
        chunk.metadata["start_index"] = str(chunk.metadata["start_index"])

    print(f"Split {len(documents)} documents into {len(chunks)} chunks.")
    return chunks

########################################################################################################################################################
####---------------------------------------------------- Creating and Storing Data in Vector DB ---------------------------------------------------####
########################################################################################################################################################

# def save_to_chroma(chunks: list[Document], name: str, tables: list[Document]):
def save_to_chroma(chunks: list[Document], name: str):
    CHROMA_PATH = f"./VectorDB/chroma_{name}"
    # TABLE_PATH = f"./TableDB/chroma_{name}"
    if os.path.exists(CHROMA_PATH):
        shutil.rmtree(CHROMA_PATH)
    # if os.path.exists(TABLE_PATH):
    #     shutil.rmtree(TABLE_PATH)

    try:
        # Load the embedding model
        # embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
        embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")

        # Create Chroma DB for documents using from_documents
        # [NOTE: some metadata values are stored as strings because int and float values showed up as null]
        print("Creating document vector database...")
        db = Chroma.from_documents(
            documents=chunks,
            embedding=embedding_function,
            persist_directory=CHROMA_PATH,
        )
        print("Document database successfully saved.")

        # # Create Chroma DB for tables if available
        # if tables:
        #     print("Creating table vector database...")
        #     tdb = Chroma.from_documents(
        #         documents=tables,
        #         embedding=embedding_function,
        #         persist_directory=TABLE_PATH,
        #     )
        #     print("Table database successfully saved.")
        # else:
        #     tdb = None
        # return db, tdb
        return db
    except Exception as e:
        print("Error while saving to Chroma:", e)
        return None
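
# A minimal read-back sketch (not part of the original pipeline): load a store persisted by save_to_chroma
# and run a similarity search against it. The helper name query_chroma, the default k, and the example query
# are illustrative assumptions; the embedding model must match the one used when the database was created.
def query_chroma(name: str, query: str, k: int = 4):
    CHROMA_PATH = f"./VectorDB/chroma_{name}"
    embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
    db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)
    return db.similarity_search(query, k=k)  # list of Documents ranked by embedding similarity

# Example: results = query_chroma("my_db", "What does the first table describe?")
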
# def get_unique_sources(chroma_path):
#     db = Chroma(persist_directory=chroma_path)
#     metadata_list = db.get()["metadatas"]
#     unique_sources = {metadata["source"] for metadata in metadata_list if "source" in metadata}
#     return list(unique_sources)

########################################################################################################################################################
####----------------------------------------------------------- Updating Existing Data in Vector DB -----------------------------------------------####
########################################################################################################################################################

# def add_document_to_existing_db(new_documents: list[Document], db_name: str):
#     CHROMA_PATH = f"./VectorDB/chroma_{db_name}"
#     if not os.path.exists(CHROMA_PATH):
#         print(f"Database '{db_name}' does not exist. Please create it first.")
#         return
#     try:
#         embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
#         # embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
#         db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)
#         print("Adding new documents to the existing database...")
#         chunks = split_text(new_documents)
#         db.add_documents(chunks)
#         db.persist()
#         print("New documents added and database updated successfully.")
#     except Exception as e:
#         print("Error while adding documents to existing database:", e)

# def delete_chunks_by_source(chroma_path, source_to_delete):
#     if not os.path.exists(chroma_path):
#         print(f"Database at path '{chroma_path}' does not exist.")
#         return
#     try:
#         # embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
#         embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
#         db = Chroma(persist_directory=chroma_path, embedding_function=embedding_function)
#         print(f"Retrieving all metadata to identify chunks with source '{source_to_delete}'...")
#         metadata_list = db.get()["metadatas"]
#         # Identify indices of chunks to delete
#         indices_to_delete = [
#             idx for idx, metadata in enumerate(metadata_list) if metadata.get("source") == source_to_delete
#         ]
#         if not indices_to_delete:
#             print(f"No chunks found with source '{source_to_delete}'.")
#             return
#         print(f"Deleting {len(indices_to_delete)} chunks with source '{source_to_delete}'...")
#         db.delete(indices=indices_to_delete)
#         db.persist()
#         print("Chunks deleted and database updated successfully.")
#     except Exception as e:
#         print(f"Error while deleting chunks by source: {e}")

# # Update an existing data store
# def update_data_store(file_path, db_name):
#     CHROMA_PATH = f"./VectorDB/chroma_{db_name}"
#     print(f"Filepath ===> {file_path} DB Name ====> {db_name}")
#     try:
#         documents, table_document = load_document(file_path)
#         print("Documents loaded successfully.")
#     except Exception as e:
#         print(f"Error loading documents: {e}")
#         return
#     try:
#         chunks = split_text(documents)
#         print(f"Text split into {len(chunks)} chunks.")
#     except Exception as e:
#         print(f"Error splitting text: {e}")
#         return
#     try:
#         save_to_chroma(chunks, db_name, table_document)
#         print(f"Data saved to Chroma for database {db_name}.")
#     except Exception as e:
#         print(f"Error saving to Chroma: {e}")
#         return

########################################################################################################################################################
####------------------------------------------------------- Combine Process of Load, Chunk and Store ----------------------------------------------####
########################################################################################################################################################

def generate_data_store(file_path, db_name):
    CHROMA_PATH = f"./VectorDB/chroma_{db_name}"
    print(f"Filepath ===> {file_path} DB Name ====> {db_name}")
    try:
        documents, grouped_documents = load_document(file_path)
        print("Documents loaded successfully.")
    except Exception as e:
        print(f"Error loading documents: {e}")
        return
    try:
        chunks = split_text(grouped_documents)
        print(f"Text split into {len(chunks)} chunks.")
    except Exception as e:
        print(f"Error splitting text: {e}")
        return
    try:
        # save_to_chroma(chunks, db_name, table_document)
        save_to_chroma(chunks, db_name)  # save_to_chroma is synchronous, so it is called directly
        print(f"Data saved to Chroma for database {db_name}.")
    except Exception as e:
        print(f"Error saving to Chroma: {e}")
        return
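
# Example entry point (a sketch, not part of the original module): ingest everything under UPLOAD_FOLDER
# into a database named "default". Both the folder and the database name are placeholders; adjust them
# to your own data before running.
if __name__ == "__main__":
    generate_data_store(UPLOAD_FOLDER, "default")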