# Source exported from a Hugging Face repository upload
# ("Upload 19 files", commit db17bc0 verified) by anindya-hf-2002.
from src.data_processing.loader import MultiFormatDocumentLoader
from src.data_processing.chunker import SDPMChunker, BGEM3Embeddings
import pandas as pd
from typing import List, Dict, Any
from pinecone import Pinecone, ServerlessSpec
import time
from tqdm import tqdm
from dotenv import load_dotenv
import os
# Load environment variables (e.g. PINECONE_API_KEY) from a local .env file.
load_dotenv()
# API Keys
PINECONE_API_KEY = os.getenv('PINECONE_API_KEY')  # None when not set in the environment
# Shared embedding model used by every pipeline step below (1024-dim, per the
# create_index call in ingest_data).
embedding_model = BGEM3Embeddings(model_name="BAAI/bge-m3")
def load_documents(file_paths: List[str], output_path: str = './data/output.md') -> str:
    """Load documents from multiple sources and combine them into one markdown file.

    Each document is written with its metadata rendered as a YAML frontmatter
    block, followed by its page content.

    Args:
        file_paths: Paths of the input documents to load.
        output_path: Destination markdown file (overwritten if it exists).

    Returns:
        The path of the combined markdown file.
    """
    loader = MultiFormatDocumentLoader(
        file_paths=file_paths,
        enable_ocr=False,
        enable_tables=True
    )
    # Explicit utf-8: loaded documents may contain non-ASCII characters and the
    # platform default encoding is not guaranteed to handle them.
    with open(output_path, 'w', encoding='utf-8') as f:
        for doc in loader.lazy_load():
            # Metadata as YAML frontmatter so downstream tools can parse it
            f.write('---\n')
            for key, value in doc.metadata.items():
                f.write(f'{key}: {value}\n')
            f.write('---\n\n')
            f.write(doc.page_content)
            f.write('\n\n')
    return output_path
def process_chunks(markdown_path: str, chunk_size: int = 256,
                   threshold: float = 0.7, skip_window: int = 2) -> List[Dict[str, Any]]:
    """Split a markdown file into semantic chunks ready for vector storage.

    Args:
        markdown_path: Path of the markdown file to chunk.
        chunk_size: Target chunk size in tokens.
        threshold: Similarity threshold used by the semantic chunker.
        skip_window: Skip-window size for the SDPM chunking algorithm.

    Returns:
        One dict per chunk with keys 'text', 'token_count', 'start_index',
        'end_index' and 'num_sentences'.
    """
    chunker = SDPMChunker(
        embedding_model=embedding_model,
        chunk_size=chunk_size,
        threshold=threshold,
        skip_window=skip_window
    )
    # Explicit utf-8 to match how load_documents writes the file.
    with open(markdown_path, 'r', encoding='utf-8') as file:
        text = file.read()
    chunks = chunker.chunk(text)
    # Flatten chunk objects into plain dicts (Parquet/DataFrame friendly).
    return [
        {
            'text': chunk.text,
            'token_count': chunk.token_count,
            'start_index': chunk.start_index,
            'end_index': chunk.end_index,
            'num_sentences': len(chunk.sentences),
        }
        for chunk in chunks
    ]
def save_to_parquet(chunks: List[Dict[str, Any]], output_path='./data/chunks.parquet'):
"""
Save processed chunks to a Parquet file
"""
df = pd.DataFrame(chunks)
print(f"Saving to Parquet: {output_path}")
df.to_parquet(output_path)
print(f"Saved to Parquet: {output_path}")
return output_path
class PineconeRetriever:
    """Similarity-search retriever backed by a Pinecone index."""

    def __init__(
        self,
        pinecone_client: Pinecone,
        index_name: str,
        namespace: str,
        embedding_generator: BGEM3Embeddings
    ):
        """Set up the retriever.

        Args:
            pinecone_client: Initialized Pinecone client
            index_name: Name of the Pinecone index
            namespace: Namespace in the index
            embedding_generator: BGEM3Embeddings instance
        """
        self.pinecone = pinecone_client
        self.index = pinecone_client.Index(index_name)
        self.namespace = namespace
        self.embedding_generator = embedding_generator

    def invoke(self, question: str, top_k: int = 5):
        """Retrieve the documents most similar to *question*.

        Args:
            question: Query string
            top_k: Number of results to return
        Returns:
            List of dictionaries containing retrieved documents
        """
        # Embed the query; Pinecone expects a plain Python list, not an array.
        query_vector = self.embedding_generator.embed(question).tolist()
        response = self.index.query(
            namespace=self.namespace,
            vector=query_vector,
            top_k=top_k,
            include_values=False,
            include_metadata=True
        )
        # Shape each match into the page_content/score dict callers expect.
        documents = []
        for match in response.matches:
            documents.append({"page_content": match.metadata["text"], "score": match.score})
        return documents
def ingest_data(
    pc,
    parquet_path: str,
    text_column: str,
    pinecone_client: Pinecone,
    index_name: str = "vector-index",
    namespace: str = "rag",
    batch_size: int = 100
):
    """Ingest data from a Parquet file into Pinecone.

    Args:
        pc: Unused; retained for backward compatibility with existing callers
            that pass the client positionally.
        parquet_path: Path to the Parquet file
        text_column: Name of the column containing text data
        pinecone_client: Initialized Pinecone client
        index_name: Name of the Pinecone index
        namespace: Namespace in the index
        batch_size: Batch size for processing
    """
    # Read Parquet file
    print(f"Reading Parquet file: {parquet_path}")
    df = pd.read_parquet(parquet_path)
    print(f"Total records: {len(df)}")
    # Create the index if it does not exist yet
    if not pinecone_client.has_index(index_name):
        pinecone_client.create_index(
            name=index_name,
            dimension=1024,  # BGE-M3 embedding dimension
            metric="cosine",
            spec=ServerlessSpec(
                cloud='aws',
                region='us-east-1'
            )
        )
        # Block until the new index is ready to accept upserts
        while not pinecone_client.describe_index(index_name).status['ready']:
            time.sleep(1)
    index = pinecone_client.Index(index_name)
    # Embed and upsert in batches to bound memory use and request size
    for start in tqdm(range(0, len(df), batch_size)):
        batch_df = df.iloc[start:start + batch_size]
        texts = batch_df[text_column].tolist()
        embeddings = embedding_model.embed_batch(texts)
        print(f"embeddings for batch: {start}")
        records = [
            {
                "id": str(row.name),  # Using DataFrame index as ID
                "values": embeddings[pos],
                "metadata": {"text": row[text_column]},
            }
            for pos, (_, row) in enumerate(batch_df.iterrows())
        ]
        index.upsert(vectors=records, namespace=namespace)
        # Small delay to handle rate limits
        time.sleep(0.5)
def get_retriever(
    pinecone_client: Pinecone,
    index_name= "vector-index",
    namespace= "rag"
):
    """Build a PineconeRetriever wired to the module-level embedding model.

    Args:
        pinecone_client: Initialized Pinecone client
        index_name: Name of the Pinecone index
        namespace: Namespace in the index
    Returns:
        Configured PineconeRetriever instance
    """
    retriever = PineconeRetriever(
        pinecone_client=pinecone_client,
        index_name=index_name,
        namespace=namespace,
        embedding_generator=embedding_model
    )
    return retriever
def main():
    """Run the (partially disabled) ingestion pipeline and a retrieval smoke test."""
    # Initialize Pinecone client
    pc = Pinecone(api_key=PINECONE_API_KEY)
    # Define input files
    file_paths = [
        # './data/2404.19756v1.pdf',
        # './data/OD429347375590223100.pdf',
        # './data/Project Report Format.docx',
        './data/UNIT 2 GENDER BASED VIOLENCE.pptx'
    ]
    # Process pipeline. NOTE: steps 1-4 are intentionally disabled; re-enable
    # them to (re)build and ingest the index before testing retrieval.
    try:
        # Step 1: Load and combine documents
        # print("Loading documents...")
        # markdown_path = load_documents(file_paths)
        # # Step 2: Process into chunks with embeddings
        # print("Processing chunks...")
        # chunks = process_chunks(markdown_path)
        # # Step 3: Save to Parquet
        # print("Saving to Parquet...")
        # parquet_path = save_to_parquet(chunks)
        # # Step 4: Ingest into Pinecone
        # print("Ingesting into Pinecone...")
        # ingest_data(
        #     pc,
        #     parquet_path=parquet_path,
        #     text_column="text",
        #     pinecone_client=pc,
        # )
        # Step 5: Test retrieval
        print("\nTesting retrieval...")
        retriever = get_retriever(
            pinecone_client=pc,
            index_name="vector-index",
            namespace="rag"
        )
        results = retriever.invoke(
            question="describe the gender based violence",
            top_k=5
        )
        for i, doc in enumerate(results, 1):
            print(f"\nResult {i}:")
            print(f"Content: {doc['page_content']}...")
            print(f"Score: {doc['score']}")
    except Exception as e:
        # Print the full traceback, not just the message, so failures in the
        # pipeline are debuggable instead of silently reduced to one line.
        import traceback
        traceback.print_exc()
        print(f"Error in pipeline: {str(e)}")


if __name__ == "__main__":
    main()