from langchain_community.document_loaders import DirectoryLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.vectorstores import Chroma
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.auto import partition
from collections import defaultdict
import pytesseract
import os
import shutil
import asyncio
import re
import uuid

pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'
# Configurations
UPLOAD_FOLDER = "./uploads"
VECTOR_DB_FOLDER = "./VectorDB"
IMAGE_DB_FOLDER = "./ImageDB"

os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(VECTOR_DB_FOLDER, exist_ok=True)
os.makedirs(IMAGE_DB_FOLDER, exist_ok=True)
########################################################################################################################################################
####-------------------------------------------------------------- Document Loader ----------------------------------------------------------------####
########################################################################################################################################################
# Loader for extracting text, tables, and images from any supported file format.
def load_document(data_path):
    processed_documents = []
    #element_content = []
    table_document = []
    # PDFs get a dedicated partitioning path; every other format falls back to auto partition.
    for root, _, files in os.walk(data_path):
        for file in files:
            file_path = os.path.join(root, file)
            doc_id = str(uuid.uuid4())  # Generate a unique ID for the document
            print(f"Processing document ID: {doc_id}, Path: {file_path}")
            try:
                # Determine the file type based on extension
                filename, file_extension = os.path.splitext(file.lower())
                image_output = f"./ImageDB/{filename}/"
                # Use a specific partition technique based on the file extension
                if file_extension == ".pdf":
                    elements = partition_pdf(
                        filename=file_path,
                        strategy="hi_res",  # Use layout detection
                        infer_table_structure=True,
                        hi_res_model_name="yolox",
                        extract_images_in_pdf=True,
                        extract_image_block_types=["Image", "Table"],
                        extract_image_block_output_dir=image_output,
                        show_progress=True,
                        #chunking_strategy="by_title",
                    )
                else:
                    # Default to auto partition if no specific handler is found
                    elements = partition(
                        filename=file_path,
                        strategy="hi_res",
                        infer_table_structure=True,
                        show_progress=True,
                        #chunking_strategy="by_title",
                    )
            except Exception as e:
                print(f"Failed to process document {file_path}: {e}")
                continue

            categorized_content = {
                "tables": {"content": [], "Metadata": []},
                "images": {"content": [], "Metadata": []},
                "text": {"content": [], "Metadata": []},
            }
            #element_content.append(elements)
            CNT = 1
            for chunk in elements:
                # Safely extract metadata and text
                chunk_type = str(type(chunk))
                chunk_metadata = chunk.metadata.to_dict() if chunk.metadata else {}
                chunk_text = getattr(chunk, "text", None)
                # Separate content into categories
                #if "Table" in chunk_type:
                if any(keyword in chunk_type for keyword in ["Table", "TableChunk"]):
                    categorized_content["tables"]["content"].append(chunk_text)
                    categorized_content["tables"]["Metadata"].append(chunk_metadata)
                    # Also push the table HTML into the text stream so the page-level text keeps the table
                    TABLE_DATA = f"Table number {CNT} " + chunk_metadata.get("text_as_html", "") + " "
                    CNT += 1
                    categorized_content["text"]["content"].append(TABLE_DATA)
                    categorized_content["text"]["Metadata"].append(chunk_metadata)
                elif "Image" in chunk_type:
                    categorized_content["images"]["content"].append(chunk_text)
                    categorized_content["images"]["Metadata"].append(chunk_metadata)
                elif any(
                    keyword in chunk_type
                    for keyword in [
                        "CompositeElement",
                        "Text",
                        "NarrativeText",
                        "Title",
                        "Header",
                        "Footer",
                        "FigureCaption",
                        "ListItem",
                        "UncategorizedText",
                        "Formula",
                        "CodeSnippet",
                        "Address",
                        "EmailAddress",
                        "PageBreak",
                    ]
                ):
                    categorized_content["text"]["content"].append(chunk_text)
                    categorized_content["text"]["Metadata"].append(chunk_metadata)
                else:
                    continue
            # Append the processed document
            processed_documents.append({
                "doc_id": doc_id,
                "source": file_path,
                **categorized_content,
            })
    # Loop over tables and match text from the same document and page
    for doc in processed_documents:
        cnt = 1  # running table number within the document
        for table_metadata in doc.get("tables", {}).get("Metadata", []):
            page_number = table_metadata.get("page_number")
            source = doc.get("source")
            page_content = ""
            for text_metadata, text_content in zip(
                doc.get("text", {}).get("Metadata", []),
                doc.get("text", {}).get("content", [])
            ):
                page_number2 = text_metadata.get("page_number")
                source2 = doc.get("source")
                if source == source2 and page_number == page_number2:
                    print(f"Matching text found for source: {source}, page: {page_number}")
                    page_content += f"{text_content} "  # Concatenate text with a space
            # Add the matched content to the table metadata
            table_metadata["page_content"] = f"Table number {cnt} " + table_metadata.get("text_as_html", "") + " " + page_content.strip()  # strip trailing spaces
            table_metadata["text_as_html"] = table_metadata.get("text_as_html", "")  # also stored separately
            table_metadata["Table_number"] = cnt  # table number, used later during retrieval
            cnt += 1
            # Build one Document per table, carrying the text of the page it appears on
            unique_id = str(uuid.uuid4())
            table_document.append(
                Document(
                    id=unique_id,
                    page_content=table_metadata.get("page_content", ""),  # default to empty string if missing
                    metadata={
                        "source": doc["source"],
                        "text_as_html": table_metadata.get("text_as_html", ""),
                        "filetype": table_metadata.get("filetype", ""),
                        "page_number": str(table_metadata.get("page_number", 0)),  # default to 0 if missing
                        "image_path": table_metadata.get("image_path", ""),
                        "file_directory": table_metadata.get("file_directory", ""),
                        "filename": table_metadata.get("filename", ""),
                        "Table_number": str(table_metadata.get("Table_number", 0)),  # default to 0 if missing
                    },
                )
            )
    # Initialize a structure to group text content by doc_id
    grouped_by_doc_id = defaultdict(lambda: {
        "text_content": [],
        "metadata": None,  # metadata is only set once per doc_id
    })
    for doc in processed_documents:
        doc_id = doc.get("doc_id")
        source = doc.get("source")
        text_content = doc.get("text", {}).get("content", [])
        metadata_list = doc.get("text", {}).get("Metadata", [])
        # Merge text content
        grouped_by_doc_id[doc_id]["text_content"].extend(text_content)
        # Set metadata (if not already set)
        if grouped_by_doc_id[doc_id]["metadata"] is None and metadata_list:
            metadata = metadata_list[0]  # assuming metadata is consistent within a document
            grouped_by_doc_id[doc_id]["metadata"] = {
                "source": source,
                #"filetype": metadata.get("filetype"),
                "file_directory": metadata.get("file_directory"),
                "filename": metadata.get("filename"),
                #"languages": str(metadata.get("languages")),
            }
    # Convert the grouped content into Document objects
    grouped_documents = []
    for doc_id, data in grouped_by_doc_id.items():
        grouped_documents.append(
            Document(
                id=doc_id,
                page_content=" ".join(filter(None, data["text_content"])).strip(),  # skip None entries
                metadata=data["metadata"],
            )
        )
    # DirectoryLoader for loading the plain text data into its own DB
    loader = DirectoryLoader(data_path, glob="*.*")
    documents = loader.load()
    # Update the metadata: add the filename
    for doc in documents:
        unique_id = str(uuid.uuid4())
        doc.id = unique_id
        path = doc.metadata.get("source")
        match = re.search(r'([^\\/]+\.[^\\/]+)$', path)  # filename = last path component (either separator)
        if match:
            doc.metadata.update({"filename": match.group(1)})
    return grouped_documents, documents, table_document

#grouped_documents, documents, table_document = load_document(data_path)
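# Example usage (sketch): the loader returns grouped page text, the raw DirectoryLoader
# documents, and one Document per extracted table. The folder name is a hypothetical example.
# grouped_docs, dir_docs, table_docs = load_document("./uploads/my_project")
# print(f"{len(grouped_docs)} grouped docs, {len(dir_docs)} raw docs, {len(table_docs)} table docs")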
########################################################################################################################################################
####-------------------------------------------------------------- Chunking the Text --------------------------------------------------------------####
########################################################################################################################################################
def split_text(documents: list[Document]):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=2000,
        chunk_overlap=600,
        length_function=len,
        add_start_index=True,
    )
    chunks = text_splitter.split_documents(documents)  # split the documents into chunks
    for chunk in chunks:
        chunk.metadata["start_index"] = str(chunk.metadata["start_index"])  # int metadata is converted to str so it can be stored in sqlite3
    print(f"Split {len(documents)} documents into {len(chunks)} chunks.")
    return chunks
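# Example usage (sketch): chunk the grouped documents before embedding. The Document below
# is a minimal hand-built example, not real loader output.
# demo_doc = Document(page_content="Quarterly report text. " * 300, metadata={"source": "demo.txt"})
# demo_chunks = split_text([demo_doc])
# print(demo_chunks[0].metadata["start_index"])  # stored as a string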
########################################################################################################################################################
####---------------------------------------------------- Creating and Storing Data in Vector DB ---------------------------------------------------####
########################################################################################################################################################
async def save_to_chroma(chunks: list[Document], name: str, tables: list[Document]):
    CHROMA_PATH = f"./VectorDB/chroma_{name}"
    TABLE_PATH = f"./TableDB/chroma_{name}"
    if os.path.exists(CHROMA_PATH):
        shutil.rmtree(CHROMA_PATH)
    if os.path.exists(TABLE_PATH):
        shutil.rmtree(TABLE_PATH)
    try:
        # Load the embedding model
        embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2", show_progress=True)
        #embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")

        # Create the Chroma DB for documents [NOTE: some metadata is stored as strings because int and float values showed up as null]
        print("Creating document vector database...")
        db = Chroma.from_documents(
            documents=chunks,
            embedding=embedding_function,
            persist_directory=CHROMA_PATH,
        )
        print("Persisting the document database...")
        db.persist()
        print("Document database successfully saved.")

        # Create the Chroma DB for tables, if any were extracted
        if tables:
            print("Creating table vector database...")
            tdb = Chroma.from_documents(
                documents=tables,
                embedding=embedding_function,
                persist_directory=TABLE_PATH,
            )
            print("Persisting the table database...")
            tdb.persist()
            print("Table database successfully saved.")
        else:
            tdb = None
        return db, tdb
    except Exception as e:
        print("Error while saving to Chroma:", e)
        return None
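# Example usage (sketch): save_to_chroma is a coroutine, so run it with asyncio or await it.
# "demo" is a hypothetical database name; chunks and table_docs would come from split_text
# and load_document above.
# db, tdb = asyncio.run(save_to_chroma(chunks, "demo", table_docs))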
########################################################################################################################################################
####----------------------------------------------------------- Updating Existing Data in Vector DB -----------------------------------------------####
########################################################################################################################################################
# Add documents to an existing DB
async def add_document_to_existing_db(new_chunks: list[Document], db_name: str, tables: list[Document]):
    CHROMA_PATH = f"./VectorDB/{db_name}"
    TABLE_PATH = f"./TableDB/{db_name}"
    if not os.path.exists(CHROMA_PATH):
        print(f"Database '{db_name}' does not exist. Please create it first.")
        return
    try:
        # Load the embedding model
        embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2", show_progress=True)
        #embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")

        # Open the existing document vector database and append the new chunks
        print("Opening document vector database...")
        db = Chroma(persist_directory=CHROMA_PATH, embedding_function=embedding_function)
        print("Persisting the document database...")
        db.add_documents(new_chunks)
        db.persist()
        print("Document database successfully saved.")

        # Create or update the table vector database, if any tables were extracted
        if tables:
            print("Updating table vector database...")
            if not os.path.exists(TABLE_PATH):
                print(f"Table database '{db_name}' does not exist. Creating it first.")
                print("Persisting the table database...")
                tdb = Chroma.from_documents(
                    documents=tables,
                    embedding=embedding_function,
                    persist_directory=TABLE_PATH,
                )
            else:
                tdb = Chroma(persist_directory=TABLE_PATH, embedding_function=embedding_function)
                print("Persisting the table database...")
                tdb.add_documents(tables)
                tdb.persist()
            print("Table database successfully saved.")
        else:
            tdb = None
        return db, tdb
    except Exception as e:
        print("Error while saving to Chroma:", e)
        return None
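# Example usage (sketch): append freshly split chunks to a database created earlier. Note that
# this function builds the path as ./VectorDB/<db_name> directly, so pass the full directory name
# (e.g. "chroma_demo") when the DB was created by save_to_chroma. Names here are hypothetical.
# db, tdb = asyncio.run(add_document_to_existing_db(new_chunks, "chroma_demo", tables=[]))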
# Delete chunks that came from a given source file
def delete_chunks_by_source(chroma_path, source_to_delete):
    if not os.path.exists(chroma_path):
        print(f"Database at path '{chroma_path}' does not exist.")
        return
    try:
        #embedding_function = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
        embedding_function = HuggingFaceEmbeddings(model_name="mixedbread-ai/mxbai-embed-large-v1")
        db = Chroma(persist_directory=chroma_path, embedding_function=embedding_function)
        print(f"Retrieving all metadata to identify chunks with source '{source_to_delete}'...")
        data = db.get()
        # Collect the IDs of chunks whose metadata matches the source (Chroma deletes by ID, not by index)
        ids_to_delete = [
            chunk_id
            for chunk_id, metadata in zip(data["ids"], data["metadatas"])
            if metadata.get("source") == source_to_delete
        ]
        if not ids_to_delete:
            print(f"No chunks found with source '{source_to_delete}'.")
            return
        print(f"Deleting {len(ids_to_delete)} chunks with source '{source_to_delete}'...")
        db.delete(ids=ids_to_delete)
        db.persist()
        print("Chunks deleted and database updated successfully.")
    except Exception as e:
        print(f"Error while deleting chunks by source: {e}")
########################################################################################################################################################
####----------------------------------------------- Combined Process of Upload, Chunk and Store (FOR NEW DOC) --------------------------------------####
########################################################################################################################################################
# Update an existing data store
async def update_data_store(file_path, db_name):
    CHROMA_PATH = f"./VectorDB/chroma_{db_name}"
    print(f"Filepath ===> {file_path} DB Name ====> {db_name}")
    try:
        grouped_documents, documents, table_document = load_document(file_path)
        print("Documents loaded successfully.")
    except Exception as e:
        print(f"Error loading documents: {e}")
        return
    try:
        chunks = split_text(grouped_documents)
        print(f"Text split into {len(chunks)} chunks.")
    except Exception as e:
        print(f"Error splitting text: {e}")
        return
    try:
        await add_document_to_existing_db(chunks, db_name, table_document)
        print(f"Data saved to Chroma for database {db_name}.")
    except Exception as e:
        print(f"Error saving to Chroma: {e}")
        return
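# Example usage (sketch): ingest a newly uploaded folder into an existing database.
# Paths and names are hypothetical examples.
# asyncio.run(update_data_store("./uploads/my_project", "demo"))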
########################################################################################################################################################
####------------------------------------------------------- Combined Process of Load, Chunk and Store ---------------------------------------------####
########################################################################################################################################################
async def generate_data_store(file_path, db_name):
    CHROMA_PATH = f"./VectorDB/chroma_{db_name}"
    print(f"Filepath ===> {file_path} DB Name ====> {db_name}")
    try:
        grouped_documents, documents, table_document = load_document(file_path)
        print("Documents loaded successfully.")
    except Exception as e:
        print(f"Error loading documents: {e}")
        return
    try:
        chunks = split_text(grouped_documents)
        print(f"Text split into {len(chunks)} chunks.")
    except Exception as e:
        print(f"Error splitting text: {e}")
        return
    try:
        await save_to_chroma(chunks, db_name, table_document)
        print(f"Data saved to Chroma for database {db_name}.")
    except Exception as e:
        print(f"Error saving to Chroma: {e}")
        return
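# Example usage (sketch): build a fresh database from a folder of files.
# Paths and names are hypothetical examples.
# asyncio.run(generate_data_store("./uploads/my_project", "demo"))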
########################################################################################################################################################
####-------------------------------------------------------------------- Token Counter ------------------------------------------------------------####
########################################################################################################################################################
def approximate_bpe_token_counter(text):
    # Rough approximation: count word and punctuation tokens as a stand-in for BPE tokens
    tokens = re.findall(r"\w+|[^\w\s]", text, re.UNICODE)
    return len(tokens)
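# Example usage (sketch): quick token estimate for budgeting prompt context.
# print(approximate_bpe_token_counter("Tables, text and images go into Chroma."))  # -> 9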