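# NOTE: the following lines are page chrome captured from the Hugging Face
# file viewer; they are not part of the program.
#   Chris4K's picture
#   Update app.py
#   9efbb97 verified
#   raw / history blame
#   10.3 kB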
import os
import time
import pdfplumber
import docx
import nltk
import gradio as gr
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.embeddings import (
OpenAIEmbeddings,
CohereEmbeddings,
)
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS, Chroma
from langchain_text_splitters import (
RecursiveCharacterTextSplitter,
TokenTextSplitter,
)
from typing import List, Dict, Any
import pandas as pd
# Download the NLTK 'punkt' resource when the module is imported.
# NOTE(review): no other use of nltk is visible in this file — confirm whether
# one of the text splitters actually needs it.
nltk.download('punkt', quiet=True)
# Directory scanned by process_files when no explicit file path is given.
FILES_DIR = './files'
# Embedding model registry: provider type -> {short display name -> model id}.
# get_embedding_model resolves an id as MODELS[model_type][model_name], and the
# Gradio checkbox groups below are populated from these keys.
# NOTE(review): 'gte-large' and 'gbert-base' carry no organisation prefix,
# unlike 'danielheinz/e5-base-sts-en-de' — verify that they resolve on the hub.
MODELS = {
'HuggingFace': {
'e5-base-de': "danielheinz/e5-base-sts-en-de",
'paraphrase-miniLM': "paraphrase-multilingual-MiniLM-L12-v2",
'paraphrase-mpnet': "paraphrase-multilingual-mpnet-base-v2",
'gte-large': "gte-large",
'gbert-base': "gbert-base"
},
'OpenAI': {
'text-embedding-ada-002': "text-embedding-ada-002"
},
'Cohere': {
'embed-multilingual-v2.0': "embed-multilingual-v2.0"
}
}
class FileHandler:
    """Extract plain text from PDF, DOCX and TXT files."""

    @staticmethod
    def extract_text(file_path):
        """Return the text content of ``file_path``.

        The extractor is chosen from the file extension, compared
        case-insensitively.

        Args:
            file_path: Path to a ``.pdf``, ``.docx`` or ``.txt`` file.

        Returns:
            The extracted text as a single string.

        Raises:
            ValueError: If the extension is not one of the supported types.
        """
        ext = os.path.splitext(file_path)[-1].lower()
        if ext == '.pdf':
            return FileHandler._extract_from_pdf(file_path)
        elif ext == '.docx':
            return FileHandler._extract_from_docx(file_path)
        elif ext == '.txt':
            return FileHandler._extract_from_txt(file_path)
        else:
            raise ValueError(f"Unsupported file type: {ext}")

    @staticmethod
    def _extract_from_pdf(file_path):
        """Return the text of every page of a PDF, joined by single spaces."""
        with pdfplumber.open(file_path) as pdf:
            # A page without extractable text (e.g. a scanned image) may yield
            # None; substitute '' so str.join does not raise TypeError.
            return ' '.join(page.extract_text() or '' for page in pdf.pages)

    @staticmethod
    def _extract_from_docx(file_path):
        """Return the text of every paragraph of a DOCX, joined by spaces."""
        doc = docx.Document(file_path)
        return ' '.join(para.text for para in doc.paragraphs)

    @staticmethod
    def _extract_from_txt(file_path):
        """Return the full content of a UTF-8 encoded text file."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
def get_embedding_model(model_type, model_name):
    """Instantiate the embedding backend for a provider/model pair.

    Args:
        model_type: Provider key: 'HuggingFace', 'OpenAI' or 'Cohere'.
        model_name: Short model name; a key of ``MODELS[model_type]``.

    Returns:
        The embeddings object constructed for the resolved model identifier.

    Raises:
        ValueError: If ``model_type`` is not one of the three providers.
        KeyError: If ``model_name`` is not registered under ``model_type``.
    """
    # Reject unknown providers first so the registry lookup below is only
    # attempted for a supported provider.
    if model_type not in ('HuggingFace', 'OpenAI', 'Cohere'):
        raise ValueError(f"Unsupported model type: {model_type}")

    model_id = MODELS[model_type][model_name]

    # The HuggingFace wrapper takes the identifier as `model_name`; the other
    # two take it as `model`.
    if model_type == 'HuggingFace':
        return HuggingFaceEmbeddings(model_name=model_id)
    if model_type == 'OpenAI':
        return OpenAIEmbeddings(model=model_id)
    return CohereEmbeddings(model=model_id)
def get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators=None):
    """Build the text splitter for the requested strategy.

    Args:
        split_strategy: 'token' or 'recursive'.
        chunk_size: Passed to the splitter as ``chunk_size``.
        overlap_size: Passed to the splitter as ``chunk_overlap``.
        custom_separators: Optional separator list for the recursive strategy;
            when empty or None the defaults ``["\\n\\n", "\\n", " ", ""]``
            are used. Ignored by the token strategy.

    Returns:
        A ``TokenTextSplitter`` or ``RecursiveCharacterTextSplitter``.

    Raises:
        ValueError: If ``split_strategy`` is neither 'token' nor 'recursive'.
    """
    if split_strategy not in ('token', 'recursive'):
        raise ValueError(f"Unsupported split strategy: {split_strategy}")

    if split_strategy == 'token':
        return TokenTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap_size)

    separators = custom_separators or ["\n\n", "\n", " ", ""]
    return RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=overlap_size,
        separators=separators,
    )
def get_vector_store(store_type, texts, embedding_model):
    """Create a vector store of the requested type from raw texts.

    Args:
        store_type: 'FAISS' or 'Chroma'.
        texts: The text chunks to index.
        embedding_model: Embeddings object used to embed ``texts``.

    Returns:
        The store returned by the selected class's ``from_texts``.

    Raises:
        ValueError: If ``store_type`` is neither 'FAISS' nor 'Chroma'.
    """
    # Choose the backend class first; both expose the same from_texts call.
    if store_type == 'FAISS':
        store_cls = FAISS
    elif store_type == 'Chroma':
        store_cls = Chroma
    else:
        raise ValueError(f"Unsupported vector store type: {store_type}")
    return store_cls.from_texts(texts, embedding_model)
def get_retriever(vector_store, search_type, search_kwargs=None):
    """Wrap a vector store in a retriever using the given search type.

    Args:
        vector_store: Object exposing ``as_retriever(search_type=...,
            search_kwargs=...)``.
        search_type: 'similarity' or 'mmr'.
        search_kwargs: Optional dict of search options (e.g. ``{"k": 5}``).
            ``None`` is treated as an empty dict.

    Returns:
        Whatever ``vector_store.as_retriever`` returns.

    Raises:
        ValueError: If ``search_type`` is neither 'similarity' nor 'mmr'.
    """
    if search_type not in ('similarity', 'mmr'):
        raise ValueError(f"Unsupported search type: {search_type}")

    # Never forward None: the retriever expects a dict of search options, so
    # the default must be an empty dict rather than the None placeholder.
    if search_kwargs is None:
        search_kwargs = {}

    return vector_store.as_retriever(search_type=search_type, search_kwargs=search_kwargs)
def process_files(file_path, model_type, model_name, split_strategy, chunk_size, overlap_size, custom_separators):
    """Load text, split it into chunks and build the embedding model.

    Args:
        file_path: Path of a single file to read. When falsy, every regular
            file directly inside ``FILES_DIR`` is read instead.
        model_type: Provider key forwarded to ``get_embedding_model``.
        model_name: Model short name forwarded to ``get_embedding_model``.
        split_strategy: Strategy forwarded to ``get_text_splitter``.
        chunk_size: Chunk size forwarded to ``get_text_splitter``.
        overlap_size: Chunk overlap forwarded to ``get_text_splitter``.
        custom_separators: Separator list (or None) forwarded to
            ``get_text_splitter``.

    Returns:
        A tuple ``(chunks, embedding_model, num_tokens)`` where ``num_tokens``
        is the whitespace-separated word count of the loaded text.

    Raises:
        ValueError: If a file has an unsupported extension, or the model type
            or split strategy is not supported.
        FileNotFoundError: If ``FILES_DIR`` is scanned but does not exist.
    """
    if file_path:
        text = FileHandler.extract_text(file_path)
    else:
        extracted = []
        # Sort so the combined text (and therefore the chunks) does not depend
        # on the arbitrary order returned by os.listdir.
        for entry in sorted(os.listdir(FILES_DIR)):
            candidate = os.path.join(FILES_DIR, entry)
            # Subdirectories have no extension and would only trigger an
            # "Unsupported file type" error; skip anything that is not a file.
            if not os.path.isfile(candidate):
                continue
            extracted.append(FileHandler.extract_text(candidate))
        # Join with a newline so the last word of one file is not fused with
        # the first word of the next (which would also skew the word count).
        text = "\n".join(extracted)

    text_splitter = get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators)
    chunks = text_splitter.split_text(text)
    embedding_model = get_embedding_model(model_type, model_name)
    return chunks, embedding_model, len(text.split())
def search_embeddings(chunks, embedding_model, vector_store_type, search_type, query, top_k):
    """Index the chunks and run a single timed retrieval for ``query``.

    Args:
        chunks: Text chunks to index.
        embedding_model: Embeddings object used to build the vector store.
        vector_store_type: Store type forwarded to ``get_vector_store``.
        search_type: Search type forwarded to ``get_retriever``.
        query: The search query string.
        top_k: Number of results requested, passed as ``{"k": top_k}``.

    Returns:
        A tuple ``(results, elapsed_seconds, vector_store)``. The elapsed time
        covers only the retrieval call, not the construction of the index.
    """
    vector_store = get_vector_store(vector_store_type, chunks, embedding_model)
    retriever = get_retriever(vector_store, search_type, {"k": top_k})

    # perf_counter is monotonic and high resolution, which makes it the right
    # clock for measuring a short interval (time.time can jump).
    started = time.perf_counter()
    # `invoke` is the supported retriever entry point; the older
    # `get_relevant_documents` is deprecated in current LangChain releases.
    results = retriever.invoke(query)
    elapsed = time.perf_counter() - started

    return results, elapsed, vector_store
def calculate_statistics(results, search_time, vector_store, num_tokens, embedding_model):
    """Summarise one retrieval run as a flat dict of statistics.

    Args:
        results: Retrieved documents; each must expose ``page_content``.
        search_time: Elapsed retrieval time in seconds.
        vector_store: The vector store that was searched.
        num_tokens: Word count of the source text.
        embedding_model: The embeddings object used for the run.

    Returns:
        A dict with the keys ``num_results``, ``avg_content_length``,
        ``search_time``, ``vector_store_size``, ``num_documents``,
        ``num_tokens`` and ``embedding_vocab_size``. Values that the given
        store or model cannot provide are reported as ``"N/A"``.
    """
    # LangChain's FAISS wrapper exposes the raw index as `index`; `_index` is
    # still probed so any store that does use that name keeps working.
    index = getattr(vector_store, 'index', None)
    if index is None:
        index = getattr(vector_store, '_index', None)
    vector_store_size = getattr(index, 'ntotal', "N/A")

    # Only docstore-backed stores have `docstore._dict`; accessing it
    # unconditionally raised AttributeError for other stores (e.g. Chroma).
    doc_map = getattr(getattr(vector_store, 'docstore', None), '_dict', None)
    if doc_map is not None:
        num_documents = len(doc_map)
    else:
        # NOTE(review): presumably Chroma exposes its collection as
        # `_collection` with a `count()` method — confirm against the
        # installed version. Guarded, so a mismatch just yields "N/A".
        count = getattr(getattr(vector_store, '_collection', None), 'count', None)
        num_documents = count() if callable(count) else "N/A"

    get_vocab_size = getattr(getattr(embedding_model, 'client', None), 'get_vocab_size', None)
    embedding_vocab_size = get_vocab_size() if callable(get_vocab_size) else "N/A"

    if results:
        avg_content_length = sum(len(doc.page_content) for doc in results) / len(results)
    else:
        # Avoid dividing by zero when nothing was retrieved.
        avg_content_length = 0

    return {
        "num_results": len(results),
        "avg_content_length": avg_content_length,
        "search_time": search_time,
        "vector_store_size": vector_store_size,
        "num_documents": num_documents,
        "num_tokens": num_tokens,
        "embedding_vocab_size": embedding_vocab_size,
    }
def compare_embeddings(file, query, model_types, model_names, split_strategy, chunk_size, overlap_size, custom_separators, vector_store_type, search_type, top_k):
    """Run the same query against each selected embedding model.

    Args:
        file: Uploaded file (an object with a ``name`` path attribute, or a
            plain path string), or a falsy value to use ``FILES_DIR``.
        query: The search query string.
        model_types: Selected provider keys of ``MODELS``.
        model_names: Selected model short names.
        split_strategy: 'token' or 'recursive'.
        chunk_size: Chunk size for the text splitter.
        overlap_size: Chunk overlap for the text splitter.
        custom_separators: Comma-separated separators, or an empty string.
        vector_store_type: 'FAISS' or 'Chroma'.
        search_type: 'similarity' or 'mmr'.
        top_k: Number of results to retrieve per model.

    Returns:
        A tuple ``(results_df, stats_df)`` of pandas DataFrames: one row per
        retrieved document, and one row of statistics per model.

    Raises:
        ValueError: If no selected model name belongs to a selected type.
    """
    settings = {
        "split_strategy": split_strategy,
        "chunk_size": chunk_size,
        "overlap_size": overlap_size,
        "custom_separators": custom_separators,
        "vector_store_type": vector_store_type,
        "search_type": search_type,
        "top_k": top_k,
    }

    # Loop-invariant inputs, computed once.
    separators = custom_separators.split(',') if custom_separators else None
    # Accept either an upload object carrying `.name` or a bare path string.
    source_path = getattr(file, "name", file) if file else None

    # The two checkbox groups are independent, so pairing them by position
    # (zip) dropped models or looked a name up under the wrong provider.
    # Pair every selected name with the selected provider that registers it.
    model_pairs = [
        (model_type, model_name)
        for model_type in (model_types or [])
        for model_name in (model_names or [])
        if model_name in MODELS.get(model_type, {})
    ]
    if not model_pairs:
        raise ValueError(
            "None of the selected embedding models belongs to a selected model type."
        )

    all_results = []
    all_stats = []
    for model_type, model_name in model_pairs:
        # process_files bundles text loading with model creation, so the
        # source text is re-read for every model.
        chunks, embedding_model, num_tokens = process_files(
            source_path,
            model_type,
            model_name,
            split_strategy,
            chunk_size,
            overlap_size,
            separators,
        )
        results, search_time, vector_store = search_embeddings(
            chunks,
            embedding_model,
            vector_store_type,
            search_type,
            query,
            top_k,
        )
        stats = calculate_statistics(results, search_time, vector_store, num_tokens, embedding_model)
        stats["model"] = f"{model_type} - {model_name}"
        stats.update(settings)

        all_results.extend(format_results(results, stats))
        all_stats.append(stats)

    results_df = pd.DataFrame(all_results)
    stats_df = pd.DataFrame(all_stats)
    return results_df, stats_df
def format_results(results, stats):
    """Flatten retrieved documents into row dicts for the results table.

    Args:
        results: Documents exposing ``page_content`` and ``metadata``.
        stats: Statistics dict for the run; must contain ``"model"`` when
            ``results`` is non-empty.

    Returns:
        One dict per document holding ``"Content"``, ``"Model"``, the
        document's metadata and every entry of ``stats`` except ``"model"``.
        Later groups win on key collisions (metadata over the two fixed
        columns, stats over metadata).
    """
    # Columns shared by every row: all statistics except the model label,
    # which is surfaced separately under the "Model" key.
    shared_columns = {key: value for key, value in stats.items() if key != "model"}
    return [
        {
            "Content": document.page_content,
            "Model": stats["model"],
            **document.metadata,
            **shared_columns,
        }
        for document in results
    ]
# Gradio interface: a single form wired to compare_embeddings.
# The `inputs` list is positional — its eleven components map, in order, onto
# the eleven parameters of compare_embeddings (file, query, model_types,
# model_names, split_strategy, chunk_size, overlap_size, custom_separators,
# vector_store_type, search_type, top_k). Keep the two in sync.
iface = gr.Interface(
fn=compare_embeddings,
inputs=[
gr.File(label="Upload File (Optional)"),
gr.Textbox(label="Search Query"),
# Provider types come from the top-level keys of MODELS.
gr.CheckboxGroup(choices=list(MODELS.keys()), label="Embedding Model Types", value=["HuggingFace"]),
# Model names are flattened across all providers, so this group is not
# filtered by the provider selection above.
gr.CheckboxGroup(choices=[model for models in MODELS.values() for model in models], label="Embedding Models", value=["e5-base-de"]),
gr.Radio(choices=["token", "recursive"], label="Split Strategy", value="recursive"),
gr.Slider(100, 1000, step=100, value=500, label="Chunk Size"),
gr.Slider(0, 100, step=10, value=50, label="Overlap Size"),
# compare_embeddings splits this text on ',' to build the separator list.
gr.Textbox(label="Custom Split Separators (comma-separated, optional)"),
gr.Radio(choices=["FAISS", "Chroma"], label="Vector Store Type", value="FAISS"),
gr.Radio(choices=["similarity", "mmr"], label="Search Type", value="similarity"),
gr.Slider(1, 10, step=1, value=5, label="Top K")
],
# The two outputs match the (results_df, stats_df) tuple returned by
# compare_embeddings.
outputs=[
gr.Dataframe(label="Results"),
gr.Dataframe(label="Statistics")
],
title="Embedding Comparison Tool",
description="Compare different embedding models and retrieval strategies",
# One example row, in the same positional order as `inputs`.
# NOTE(review): assumes an "example.pdf" exists next to the app — confirm.
examples=[
["example.pdf", "What is machine learning?", ["HuggingFace"], ["e5-base-de"], "recursive", 500, 50, "", "FAISS", "similarity", 5]
],
allow_flagging="never"
)
# Markdown text shown on the "Tutorial" tab; it describes the eleven form
# inputs in the same order as the Interface above.
tutorial_md = """
# Embedding Comparison Tool Tutorial
This tool allows you to compare different embedding models and retrieval strategies for document search. Here's how to use it:
1. **File Upload**: Optionally upload a file (PDF, DOCX, or TXT) or leave it empty to use files in the `./files` directory.
2. **Search Query**: Enter the search query you want to use for retrieving relevant documents.
3. **Embedding Model Types**: Select one or more embedding model types (HuggingFace, OpenAI, Cohere).
4. **Embedding Models**: Choose specific models for each selected model type.
5. **Split Strategy**: Select either 'token' or 'recursive' for text splitting.
6. **Chunk Size**: Set the size of text chunks (100-1000).
7. **Overlap Size**: Set the overlap between chunks (0-100).
8. **Custom Split Separators**: Optionally enter custom separators for text splitting.
9. **Vector Store Type**: Choose between FAISS and Chroma for storing vectors.
10. **Search Type**: Select 'similarity' or 'mmr' (Maximum Marginal Relevance) search.
11. **Top K**: Set the number of top results to retrieve (1-10).
After setting these parameters, click "Submit" to run the comparison. The results will be displayed in two tables:
- **Results**: Shows the retrieved document contents and metadata for each model.
- **Statistics**: Provides performance metrics and settings for each model.
You can download the results as CSV files for further analysis.
Experiment with different settings to find the best combination for your specific use case!
"""
# Rebind `iface` to a two-tab wrapper: the comparison form built above plus
# the tutorial Markdown. From here on `iface` refers to the tabbed app.
iface = gr.TabbedInterface(
[iface, gr.Markdown(tutorial_md)],
["Embedding Comparison", "Tutorial"]
)
# Start the app when this module is executed or imported (there is no
# __main__ guard). share=True is passed to request a public share link.
iface.launch(share=True)