|
from src.data_processing.loader import MultiFormatDocumentLoader |
|
from src.data_processing.chunker import SDPMChunker, BGEM3Embeddings |
|
|
|
import pandas as pd |
|
from typing import List, Dict, Any |
|
from pinecone import Pinecone, ServerlessSpec |
|
import time |
|
from tqdm import tqdm |
|
from dotenv import load_dotenv |
|
import os |
|
|
|
|
|
# Load environment variables (e.g. PINECONE_API_KEY) from a local .env file.
load_dotenv()


# Pinecone API key pulled from the environment; None when unset.
PINECONE_API_KEY = os.getenv('PINECONE_API_KEY')

# Module-level embedding model shared by chunking, ingestion, and retrieval.
# NOTE: instantiated at import time — loading BGE-M3 may be slow on first import.
embedding_model = BGEM3Embeddings(model_name="BAAI/bge-m3")
|
|
|
|
|
def load_documents(file_paths: List[str], output_path: str = './data/output.md') -> str:
    """Load documents from multiple sources and combine them into a single
    markdown file.

    Each document is written as a YAML-style front-matter header built from
    its metadata, followed by its page content.

    Args:
        file_paths: Paths of the source documents to load.
        output_path: Destination markdown file (overwritten if present).

    Returns:
        The path of the combined markdown file.
    """
    loader = MultiFormatDocumentLoader(
        file_paths=file_paths,
        enable_ocr=False,
        enable_tables=True
    )

    # Explicit UTF-8: the platform default encoding (e.g. cp1252 on Windows)
    # can raise UnicodeEncodeError on non-ASCII document text.
    with open(output_path, 'w', encoding='utf-8') as f:
        for doc in loader.lazy_load():
            # YAML-like front matter carrying the document metadata.
            f.write('---\n')
            for key, value in doc.metadata.items():
                f.write(f'{key}: {value}\n')
            f.write('---\n\n')
            f.write(doc.page_content)
            f.write('\n\n')

    return output_path
|
|
|
def process_chunks(markdown_path: str, chunk_size: int = 256,
                   threshold: float = 0.7, skip_window: int = 2) -> List[Dict[str, Any]]:
    """Split a markdown file into semantic chunks ready for vector storage.

    Uses the module-level ``embedding_model`` for semantic similarity.

    Args:
        markdown_path: Path of the markdown file to chunk.
        chunk_size: Target chunk size in tokens.
        threshold: Similarity threshold for merging adjacent chunks.
        skip_window: Skip-window size for the SDPM chunker.

    Returns:
        List of plain dicts with keys ``text``, ``token_count``,
        ``start_index``, ``end_index`` and ``num_sentences``.
    """
    chunker = SDPMChunker(
        embedding_model=embedding_model,
        chunk_size=chunk_size,
        threshold=threshold,
        skip_window=skip_window
    )

    # Read as UTF-8 to match the encoding load_documents() should write.
    with open(markdown_path, 'r', encoding='utf-8') as file:
        text = file.read()

    chunks = chunker.chunk(text)

    # Flatten chunk objects into plain dicts (Parquet/Pinecone friendly).
    return [
        {
            'text': chunk.text,
            'token_count': chunk.token_count,
            'start_index': chunk.start_index,
            'end_index': chunk.end_index,
            'num_sentences': len(chunk.sentences),
        }
        for chunk in chunks
    ]
|
|
|
def save_to_parquet(chunks: List[Dict[str, Any]], output_path='./data/chunks.parquet'): |
|
""" |
|
Save processed chunks to a Parquet file |
|
""" |
|
df = pd.DataFrame(chunks) |
|
print(f"Saving to Parquet: {output_path}") |
|
df.to_parquet(output_path) |
|
print(f"Saved to Parquet: {output_path}") |
|
return output_path |
|
|
|
|
|
class PineconeRetriever:
    """Similarity-search retriever backed by a Pinecone index."""

    def __init__(
        self,
        pinecone_client: Pinecone,
        index_name: str,
        namespace: str,
        embedding_generator: BGEM3Embeddings
    ):
        """Initialize the retriever with Pinecone client and embedding generator.

        Args:
            pinecone_client: Initialized Pinecone client
            index_name: Name of the Pinecone index
            namespace: Namespace in the index
            embedding_generator: BGEM3Embeddings instance
        """
        self.pinecone = pinecone_client
        self.index = pinecone_client.Index(index_name)
        self.namespace = namespace
        self.embedding_generator = embedding_generator

    def invoke(self, question: str, top_k: int = 5):
        """Return the most similar stored chunks for a question.

        Args:
            question: Query string
            top_k: Number of results to return

        Returns:
            List of dictionaries containing retrieved documents
        """
        # Embed the query and convert to a plain list, the format the
        # Pinecone client expects for the query vector.
        query_vector = self.embedding_generator.embed(question).tolist()

        response = self.index.query(
            namespace=self.namespace,
            vector=query_vector,
            top_k=top_k,
            include_values=False,
            include_metadata=True
        )

        # Reshape each match into a {page_content, score} record.
        documents = []
        for match in response.matches:
            documents.append(
                {"page_content": match.metadata["text"], "score": match.score}
            )
        return documents
|
|
|
def ingest_data(
    pc,  # NOTE(review): unused inside this function — kept for caller compatibility
    parquet_path: str,
    text_column: str,
    pinecone_client: Pinecone,
    index_name: str = "vector-index",
    namespace: str = "rag",
    batch_size: int = 100
):
    """Ingest data from a Parquet file into Pinecone.

    Creates the index (serverless, aws/us-east-1) if it does not exist,
    waits until it is ready, then embeds and upserts rows in batches
    using the module-level ``embedding_model``.

    Args:
        pc: Unused; retained for backward compatibility.
        parquet_path: Path to the Parquet file
        text_column: Name of the column containing text data
        pinecone_client: Initialized Pinecone client
        index_name: Name of the Pinecone index
        namespace: Namespace in the index
        batch_size: Batch size for processing
    """

    print(f"Reading Parquet file: {parquet_path}")
    df = pd.read_parquet(parquet_path)
    print(f"Total records: {len(df)}")

    # Create the index on first run only.
    if not pinecone_client.has_index(index_name):
        pinecone_client.create_index(
            name=index_name,
            # 1024 presumably matches BGE-M3's embedding dimension — confirm
            # against the embedding model before changing either side.
            dimension=1024,
            metric="cosine",
            spec=ServerlessSpec(
                cloud='aws',
                region='us-east-1'
            )
        )

    # Block until the (possibly freshly created) index reports ready.
    while not pinecone_client.describe_index(index_name).status['ready']:
        time.sleep(1)

    index = pinecone_client.Index(index_name)

    # Embed and upsert the dataframe in fixed-size batches.
    for i in tqdm(range(0, len(df), batch_size)):
        batch_df = df.iloc[i:i+batch_size]

        texts = batch_df[text_column].tolist()
        embeddings = embedding_model.embed_batch(texts)
        print(f"embeddings for batch: {i}")

        # Build Pinecone upsert records; ids come from the dataframe index
        # (row.name), so re-ingesting the same file overwrites in place.
        records = []
        for idx, (_, row) in enumerate(batch_df.iterrows()):
            records.append({
                "id": str(row.name),
                "values": embeddings[idx],
                "metadata": {"text": row[text_column]}
            })

        index.upsert(vectors=records, namespace=namespace)

        # Brief pause between batches — presumably to stay under Pinecone
        # rate limits; verify against current API quotas.
        time.sleep(0.5)
|
|
|
def get_retriever(
    pinecone_client: Pinecone,
    index_name= "vector-index",
    namespace= "rag"
):
    """Build a PineconeRetriever bound to the module-level embedding model.

    Args:
        pinecone_client: Initialized Pinecone client
        index_name: Name of the Pinecone index
        namespace: Namespace in the index

    Returns:
        Configured PineconeRetriever instance
    """
    retriever = PineconeRetriever(
        pinecone_client=pinecone_client,
        index_name=index_name,
        namespace=namespace,
        embedding_generator=embedding_model
    )
    return retriever
|
|
|
def main():
    """Run a retrieval smoke test against the configured Pinecone index."""
    pc = Pinecone(api_key=PINECONE_API_KEY)

    # Source documents for the ingestion pipeline (ingestion steps are
    # currently not invoked here; only retrieval is exercised).
    file_paths = [
        './data/UNIT 2 GENDER BASED VIOLENCE.pptx'
    ]

    try:
        print("\nTesting retrieval...")

        retriever = get_retriever(
            pinecone_client=pc,
            index_name="vector-index",
            namespace="rag"
        )
        results = retriever.invoke(
            question="describe the gender based violence",
            top_k=5
        )

        # Print each hit with its rank and similarity score.
        for rank, doc in enumerate(results, 1):
            print(f"\nResult {rank}:")
            print(f"Content: {doc['page_content']}...")
            print(f"Score: {doc['score']}")

    except Exception as e:
        print(f"Error in pipeline: {str(e)}")
|
|
|
# Run the retrieval demo only when executed as a script.
if __name__ == "__main__":
    main()