import asyncio
import os
import re
import shutil
import uuid
from collections import defaultdict

import pytesseract
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
# Kept after the langchain_core import so the final binding of ``Document``
# is the same one the original import order produced.
from langchain.schema import Document  # noqa: F811
from unstructured.partition.auto import partition
from unstructured.partition.pdf import partition_pdf

# Location of the Tesseract executable that pytesseract invokes.
pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'

# Configurations
UPLOAD_FOLDER = "./uploads"
VECTOR_DB_FOLDER = "./VectorDB"
IMAGE_DB_FOLDER = "./ImageDB"

# Make sure the working folders exist before any document is processed.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(VECTOR_DB_FOLDER, exist_ok=True)
os.makedirs(IMAGE_DB_FOLDER, exist_ok=True)

########################################################################################################################################################
####-------------------------------------------------------------- Document Loader ---------------------------------------------------------------####
########################################################################################################################################################
# Loaders for loading Document text, tables and images from any file format.
# Substrings of an element's class name that mark it as plain text content.
_TEXT_ELEMENT_KEYWORDS = (
    "CompositeElement",
    "Text",
    "NarrativeText",
    "Title",
    "Header",
    "Footer",
    "FigureCaption",
    "ListItem",
    "UncategorizedText",
    "Formula",
    "CodeSnippet",
    "Address",
    "EmailAddress",
    "PageBreak",
)


def _partition_file(file_path, file_name):
    """Partition one file into elements, using the PDF-specific path for PDFs."""
    stem, extension = os.path.splitext(file_name.lower())
    if extension == ".pdf":
        return partition_pdf(
            filename=file_path,
            strategy="hi_res",  # Use layout detection
            infer_table_structure=True,
            hi_res_model_name="yolox",
            extract_images_in_pdf=True,
            extract_image_block_types=["Image", "Table"],
            extract_image_block_output_dir=f"./ImageDB/{stem}/",
            show_progress=True,
        )
    # Default to auto partition if no specific handler is found.
    return partition(
        filename=file_path,
        strategy="hi_res",
        infer_table_structure=True,
        show_progress=True,
    )


def _categorize_elements(elements):
    """Sort partitioned elements into ``tables``, ``images`` and ``text`` buckets."""
    categorized = {
        "tables": {"content": [], "Metadata": []},
        "images": {"content": [], "Metadata": []},
        "text": {"content": [], "Metadata": []},
    }
    table_count = 0
    for element in elements:
        # Classification is done on the class name, as substring matches.
        type_name = str(type(element))
        metadata = element.metadata.to_dict() if element.metadata else {}
        text = getattr(element, "text", None)

        if "Table" in type_name:  # also covers "TableChunk"
            table_count += 1
            categorized["tables"]["content"].append(text)
            categorized["tables"]["Metadata"].append(metadata)
            # The table's HTML is also indexed as text so that it ends up in
            # the text chunks.
            html = metadata.get("text_as_html") or ""
            categorized["text"]["content"].append(
                f"Table number {table_count} " + html + " "
            )
            categorized["text"]["Metadata"].append(metadata)
        elif "Image" in type_name:
            categorized["images"]["content"].append(text)
            categorized["images"]["Metadata"].append(metadata)
        elif any(keyword in type_name for keyword in _TEXT_ELEMENT_KEYWORDS):
            categorized["text"]["content"].append(text)
            categorized["text"]["Metadata"].append(metadata)
    return categorized


def _build_table_documents(processed_documents):
    """Create one Document per table, combined with the text found on its page."""
    table_documents = []
    for doc in processed_documents:
        source = doc.get("source")
        text_section = doc.get("text", {})

        # Group the text once per page instead of rescanning it for every table.
        text_by_page = defaultdict(list)
        for text_metadata, text_content in zip(
            text_section.get("Metadata", []), text_section.get("content", [])
        ):
            text_by_page[text_metadata.get("page_number")].append(text_content or "")

        tables_metadata = doc.get("tables", {}).get("Metadata", [])
        for table_number, table_metadata in enumerate(tables_metadata, start=1):
            page_number = table_metadata.get("page_number")
            page_texts = text_by_page.get(page_number, [])
            if page_texts:
                print(f"Matching text found for source: {source}, page: {page_number}")

            html = table_metadata.get("text_as_html") or ""
            table_metadata["page_content"] = (
                f"Table number {table_number} " + html + " " + " ".join(page_texts).strip()
            )
            table_metadata["text_as_html"] = html  # also stored separately
            table_metadata["Table_number"] = table_number  # used at retrieval time

            # Numeric metadata is stored as strings (see the note in save_to_chroma).
            table_documents.append(
                Document(
                    id=str(uuid.uuid4()),
                    page_content=table_metadata.get("page_content", ""),
                    metadata={
                        "source": source,
                        "text_as_html": table_metadata.get("text_as_html", ""),
                        "filetype": table_metadata.get("filetype", ""),
                        "page_number": str(table_metadata.get("page_number", 0)),
                        "image_path": table_metadata.get("image_path", ""),
                        "file_directory": table_metadata.get("file_directory", ""),
                        "filename": table_metadata.get("filename", ""),
                        "Table_number": str(table_metadata.get("Table_number", 0)),
                    },
                )
            )
    return table_documents


def _build_grouped_documents(processed_documents):
    """Merge all text of each processed file into a single Document."""
    grouped_documents = []
    for doc in processed_documents:
        source = doc.get("source")
        text_section = doc.get("text", {})
        metadata_list = text_section.get("Metadata", [])

        # Always provide a metadata dict, even when the file produced no text
        # elements, instead of passing None to Document.
        metadata = {"source": source}
        if metadata_list:
            first_metadata = metadata_list[0]  # Assuming metadata is consistent
            metadata["file_directory"] = first_metadata.get("file_directory")
            metadata["filename"] = first_metadata.get("filename")

        # Elements without text contribute an empty string rather than
        # breaking the join.
        merged_text = " ".join(part or "" for part in text_section.get("content", []))
        grouped_documents.append(
            Document(
                id=doc.get("doc_id"),
                page_content=merged_text.strip(),
                metadata=metadata,
            )
        )
    return grouped_documents


def _filename_from_source(path):
    """Return the last component of *path*, accepting both ``/`` and ``\\`` separators."""
    if not path:
        return ""
    return re.split(r"[\\/]", path.rstrip("\\/"))[-1]


def load_document(data_path):
    """Load every file under *data_path* and build the documents to index.

    Each file found by walking *data_path* is partitioned into elements
    (PDFs with ``partition_pdf``, everything else with ``partition``).
    Files that fail to partition are reported and skipped.

    Args:
        data_path: Directory containing the files to ingest.

    Returns:
        A tuple ``(grouped_documents, documents, table_document)``:
        ``grouped_documents`` holds one Document per file with all of its
        text merged, ``documents`` holds the Documents produced by
        ``DirectoryLoader`` (each given an id and a ``filename`` metadata
        entry), and ``table_document`` holds one Document per detected table
        together with the text found on the same page.
    """
    processed_documents = []
    for root, _, files in os.walk(data_path):
        for file in files:
            file_path = os.path.join(root, file)
            doc_id = str(uuid.uuid4())  # Generate a unique ID for the document
            print(f"Processing document ID: {doc_id}, Path: {file_path}")
            try:
                elements = _partition_file(file_path, file)
            except Exception as e:
                print(f"Failed to process document {file_path}: {e}")
                continue

            processed_documents.append({
                "doc_id": doc_id,
                "source": file_path,
                **_categorize_elements(elements),
            })

    table_document = _build_table_documents(processed_documents)
    grouped_documents = _build_grouped_documents(processed_documents)

    # Directory loader for loading the text data only.
    loader = DirectoryLoader(data_path, glob="*.*")
    documents = loader.load()

    # Give every loaded document an id and record its file name in the metadata.
    for doc in documents:
        doc.id = str(uuid.uuid4())
        doc.metadata.update({"filename": _filename_from_source(doc.metadata.get("source"))})

    return grouped_documents, documents, table_document


########################################################################################################################################################
####-------------------------------------------------------------- Chunking the Text --------------------------------------------------------------####
########################################################################################################################################################

def split_text(documents: "list[Document]"):
    """Split *documents* into overlapping chunks.

    Args:
        documents: Documents to split.

    Returns:
        The list of chunks (2000 characters, 600 overlap). Each chunk's
        ``start_index`` metadata is converted to a string.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=2000,
        chunk_overlap=600,
        length_function=len,
        add_start_index=True,
    )
    chunks = text_splitter.split_documents(documents)
    for chunk in chunks:
        # The int metadata is converted to str so it can be stored in sqlite3.
        if "start_index" in chunk.metadata:
            chunk.metadata["start_index"] = str(chunk.metadata["start_index"])
    print(f"Split {len(documents)} documents into {len(chunks)} chunks.")
    return chunks


########################################################################################################################################################
####---------------------------------------------------- Creating and Storing Data in Vector DB ---------------------------------------------------####
########################################################################################################################################################

async def save_to_chroma(chunks: "list[Document]", name: str, tables: "list[Document]"):
    """Create fresh Chroma stores for the text chunks and, if any, the tables.

    Existing stores with the same *name* are deleted first.

    Args:
        chunks: Text chunks to index under ``./VectorDB/chroma_{name}``.
        name: Name of the database.
        tables: Table documents to index under ``./TableDB/chroma_{name}``;
            no table store is created when this is empty.

    Returns:
        ``(db, tdb)`` on success, where ``tdb`` is ``None`` when there are no
        tables; ``None`` if anything failed (the error is printed).
    """
    CHROMA_PATH = f"./VectorDB/chroma_{name}"
    TABLE_PATH = f"./TableDB/chroma_{name}"

    for stale_path in (CHROMA_PATH, TABLE_PATH):
        if os.path.exists(stale_path):
            shutil.rmtree(stale_path)

    try:
        # Load the embedding model
        embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2", show_progress=True)

        # NOTE: Some of the data is converted to string because int and float
        # show null if added.
        print("Creating document vector database...")
        db = Chroma.from_documents(
            documents=chunks,
            embedding=embedding_function,
            persist_directory=CHROMA_PATH,
        )
        print("Persisting the document database...")
        db.persist()
        print("Document database successfully saved.")

        if tables:
            print("Creating table vector database...")
            tdb = Chroma.from_documents(
                documents=tables,
                embedding=embedding_function,
                persist_directory=TABLE_PATH,
            )
            print("Persisting the table database...")
            tdb.persist()  # persist the table store, not the document store again
            print("Table database successfully saved.")
        else:
            tdb = None

        return db, tdb
    except Exception as e:
        print("Error while saving to Chroma:", e)
        return None


########################################################################################################################################################
####----------------------------------------------------------- Updating Existing Data in Vector DB -----------------------------------------------####
########################################################################################################################################################

async def add_document_to_existing_db(new_chunks: "list[Document]", db_name: str, tables: "list[Document]"):
    """Add chunks (and tables) to an already existing pair of Chroma stores.

    Args:
        new_chunks: Text chunks to add to ``./VectorDB/{db_name}``.
        db_name: Directory name of the existing database.
        tables: Table documents to add to ``./TableDB/{db_name}``; the table
            store is created if it does not exist yet.

    Returns:
        ``(db, tdb)`` on success, where ``tdb`` is ``None`` when there are no
        tables; ``None`` if the document store does not exist or an error
        occurred (a message is printed in both cases).
    """
    # NOTE(review): save_to_chroma writes to "./VectorDB/chroma_{name}" while
    # this function reads "./VectorDB/{db_name}"; presumably callers pass the
    # directory name including the "chroma_" prefix here -- confirm.
    CHROMA_PATH = f"./VectorDB/{db_name}"
    TABLE_PATH = f"./TableDB/{db_name}"

    if not os.path.exists(CHROMA_PATH):
        print(f"Database '{db_name}' does not exist. Please create it first.")
        return None

    try:
        # Load the embedding model
        embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2", show_progress=True)

        # NOTE: Some of the data is converted to string because int and float
        # show null if added.
        print("Creating document vector database...")
        db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)
        print("Persisting the document database...")
        db.add_documents(new_chunks)
        db.persist()
        print("Document database successfully saved.")

        if tables:
            print("Creating table vector database...")
            if not os.path.exists(TABLE_PATH):
                print(f"Database '{db_name}' does not exist. Lets create it first.")
                print("Persisting the table database...")
                tdb = Chroma.from_documents(
                    documents=tables,
                    embedding=embedding_function,
                    persist_directory=TABLE_PATH,
                )
            else:
                tdb = Chroma(persist_directory=TABLE_PATH, embedding_function=embedding_function)
                print("Persisting the table database...")
                # Tables belong in the table store, not in the document store.
                tdb.add_documents(tables)
            tdb.persist()
            print("Table database successfully saved.")
        else:
            tdb = None

        return db, tdb
    except Exception as e:
        print("Error while saving to Chroma:", e)
        return None


def delete_chunks_by_source(chroma_path, source_to_delete):
    """Delete every chunk whose ``source`` metadata equals *source_to_delete*.

    Args:
        chroma_path: Directory of the Chroma store to modify.
        source_to_delete: Value of the ``source`` metadata entry to match.

    Returns:
        None. Progress and errors are printed.
    """
    if not os.path.exists(chroma_path):
        print(f"Database at path '{chroma_path}' does not exist.")
        return

    try:
        # NOTE(review): the stores in this file are built with
        # "all-MiniLM-L6-v2"; this function loads a different model --
        # confirm that this is intended.
        embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
        db = Chroma(persist_directory=chroma_path, embedding_function=embedding_function)

        print(f"Retrieving all metadata to identify chunks with source '{source_to_delete}'...")
        records = db.get()

        # Chroma deletes by record id, so collect the ids of matching chunks.
        ids_to_delete = [
            record_id
            for record_id, metadata in zip(records["ids"], records["metadatas"])
            if (metadata or {}).get("source") == source_to_delete
        ]

        if not ids_to_delete:
            print(f"No chunks found with source '{source_to_delete}'.")
            return

        print(f"Deleting {len(ids_to_delete)} chunks with source '{source_to_delete}'...")
        db.delete(ids=ids_to_delete)
        db.persist()
        print("Chunks deleted and database updated successfully.")
    except Exception as e:
        print(f"Error while deleting chunks by source: {e}")


def _load_and_split(file_path):
    """Load and chunk the files under *file_path*; return ``(chunks, tables)`` or ``None``."""
    try:
        documents, _loader_documents, table_document = load_document(file_path)
        print("Documents loaded successfully.")
    except Exception as e:
        print(f"Error loading documents: {e}")
        return None

    try:
        chunks = split_text(documents)
        print(f"Text split into {len(chunks)} chunks.")
    except Exception as e:
        print(f"Error splitting text: {e}")
        return None

    return chunks, table_document


########################################################################################################################################################
####-----------------------------------------------Combine Process of upload, Chunk and Store (FOR NEW DOC)----------------------------------------####
########################################################################################################################################################

async def update_data_store(file_path, db_name):
    """Load, chunk and add the files under *file_path* to an existing database.

    Args:
        file_path: Directory containing the new files.
        db_name: Name passed on to ``add_document_to_existing_db``.

    Returns:
        None. Each stage prints its outcome; processing stops at the first
        stage that fails.
    """
    print(f"Filepath ===> {file_path} DB Name ====> {db_name}")

    prepared = _load_and_split(file_path)
    if prepared is None:
        return
    chunks, table_document = prepared

    try:
        result = await add_document_to_existing_db(chunks, db_name, table_document)
    except Exception as e:
        print(f"Error saving to Chroma: {e}")
        return

    # The callee returns None (after printing the reason) when nothing was saved.
    if result is not None:
        print(f"Data saved to Chroma for database {db_name}.")


########################################################################################################################################################
####------------------------------------------------------- Combine Process of Load, Chunk and Store ----------------------------------------------####
########################################################################################################################################################

async def generate_data_store(file_path, db_name):
    """Load, chunk and store the files under *file_path* in a new database.

    Args:
        file_path: Directory containing the files to ingest.
        db_name: Name passed on to ``save_to_chroma``.

    Returns:
        None. Each stage prints its outcome; processing stops at the first
        stage that fails.
    """
    print(f"Filepath ===> {file_path} DB Name ====> {db_name}")

    prepared = _load_and_split(file_path)
    if prepared is None:
        return
    chunks, table_document = prepared

    try:
        result = await save_to_chroma(chunks, db_name, table_document)
    except Exception as e:
        print(f"Error saving to Chroma: {e}")
        return

    # The callee returns None (after printing the reason) when nothing was saved.
    if result is not None:
        print(f"Data saved to Chroma for database {db_name}.")


########################################################################################################################################################
####-------------------------------------------------------------------- Token counter -----------------------------------------------------------####
########################################################################################################################################################

def approximate_bpe_token_counter(text):
    """Return a rough token count for *text*.

    Every run of word characters and every single punctuation character
    counts as one token. Empty or missing text counts as zero tokens.
    """
    if not text:
        return 0
    return len(re.findall(r"\w+|[^\w\s]", text))