import os
from dotenv import load_dotenv
from langchain.schema import HumanMessage
from langchain_openai import ChatOpenAI
from langchain_voyageai import VoyageAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain.prompts import PromptTemplate
from pinecone import Pinecone
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import openai
import gradio as gr
# Load API keys from the environment
load_dotenv()
openai.api_key = os.environ.get("OPENAI_API_KEY")
pinecone_api_key = os.environ.get("PINECONE_API_KEY")
voyage_api_key = os.environ.get("VOYAGE_API_KEY")
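# A .env file next to this script is assumed, with the three keys read above:
#   OPENAI_API_KEY=...
#   PINECONE_API_KEY=...
#   VOYAGE_API_KEY=...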
# Initialize Pinecone and the Voyage embedding model
pc = Pinecone(api_key=pinecone_api_key)
embeddings = VoyageAIEmbeddings(voyage_api_key=voyage_api_key, model="voyage-law-2")
# 🔹 Query Expansion using GPT-4
def expand_query(query):
    """Rewrite a vague query into a more specific one; fall back to the original on an empty response."""
    llm = ChatOpenAI(model="gpt-4", openai_api_key=openai.api_key, temperature=0.3)
    prompt = f"Rewrite this vague query into a more specific one:\nQuery: {query}\nSpecific Query:"
    refined_query = llm.invoke([HumanMessage(content=prompt)]).content.strip()
    return refined_query if refined_query else query
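# Illustrative only (hypothetical output; the actual rewrite depends on the model):
#   "contract issues" -> "What legal remedies are available for breach of contract?"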
# 🔹 Hybrid Search (TF-IDF + Semantic Retrieval)
def hybrid_search(query, user_groups, index_name="briefmeta", min_score=0.01, fetch_k=50):
    vector_store = PineconeVectorStore(index_name=index_name, embedding=embeddings)
    semantic_results = vector_store.max_marginal_relevance_search(query, k=10, fetch_k=fetch_k)
    if not semantic_results:  # TfidfVectorizer raises on an empty corpus
        return []
    # Keyword scores: TF-IDF cosine similarity between the query and each retrieved chunk
    all_texts = [doc.page_content for doc in semantic_results]
    vectorizer = TfidfVectorizer(stop_words="english")
    tfidf_matrix = vectorizer.fit_transform(all_texts)
    query_tfidf = vectorizer.transform([query])
    keyword_scores = cosine_similarity(query_tfidf, tfidf_matrix).flatten()
    combined_results, seen_ids = [], set()
    for i, doc in enumerate(semantic_results):
        doc_id, doc_groups = doc.metadata.get("id"), doc.metadata.get("groups", [])
        semantic_score = float(doc.metadata.get("score", 0))  # defaults to 0 when the store returns no score
        keyword_score = float(keyword_scores[i])
        final_score = 0.7 * semantic_score + 0.3 * keyword_score  # hybrid score
        # Keep the chunk only if it is new, the user is allowed to see it, and it clears the floor
        if doc_id not in seen_ids and any(group in user_groups for group in doc_groups) and final_score > min_score:
            seen_ids.add(doc_id)
            doc.metadata["final_score"] = final_score
            combined_results.append(doc)
    combined_results.sort(key=lambda x: x.metadata["final_score"], reverse=True)
    return [
        {
            "doc_id": doc.metadata.get("doc_id", "N/A"),
            "chunk_id": doc.metadata.get("id", "N/A"),
            "title": doc.metadata.get("source", "N/A"),
            "text": doc.page_content,
            "page_number": str(doc.metadata.get("page_number", "N/A")),
            "score": str(doc.metadata.get("final_score", "N/A")),
        }
        for doc in combined_results
    ]
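# Worked example of the hybrid score above: a chunk with semantic score 0.8 and
# TF-IDF score 0.5 gets 0.7 * 0.8 + 0.3 * 0.5 = 0.71 (numbers are illustrative).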
# 🔹 Metadata-Weighted Reranking
def rerank(query, context):
    reranker = pc.inference.rerank(
        model="bge-reranker-v2-m3", query=query, documents=context, top_n=10, return_documents=True
    )
    final_reranked = []
    for entry in reranker.data:
        doc, score = entry["document"], float(entry["score"])
        # Boost heavily cited and recently uploaded documents on top of the reranker score
        citation_boost = 1.2 if "high_citations" in doc.get("tags", []) else 1.0
        recency_boost = 1.1 if "recent_upload" in doc.get("tags", []) else 1.0
        final_score = score * citation_boost * recency_boost
        doc["final_score"] = final_score
        final_reranked.append(doc)
    final_reranked.sort(key=lambda x: x["final_score"], reverse=True)
    return final_reranked
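# Worked example of the boosts above: a reranker score of 0.9 on a document tagged
# both "high_citations" and "recent_upload" becomes 0.9 * 1.2 * 1.1 = 1.188.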
# 🔹 Intelligent Search Summary Generator
def generate_search_summary(search_results, query):
    if not search_results:
        return "No relevant documents found. Try refining your query."
    num_results = len(search_results)
    doc_titles = [doc.get("title", "Unknown Document") for doc in search_results]
    doc_pages = [doc.get("page_number", "N/A") for doc in search_results]
    relevance_scores = [float(doc.get("score", 0)) for doc in search_results]
    summary_prompt = f"""
    Generate a concise 1-3 sentence summary:
    - User Query: "{query}"
    - Matching Documents: {num_results} found
    - Titles: {", ".join(set(doc_titles))}
    - Pages Referenced: {", ".join(set(doc_pages))}
    - Relevance Scores (0-1): {relevance_scores}
    Provide a clear, user-friendly summary with an action suggestion.
    """
    llm = ChatOpenAI(model="gpt-4", openai_api_key=openai.api_key, temperature=0.5)
    summary = llm.invoke([HumanMessage(content=summary_prompt)]).content.strip()
    return summary if summary else "No intelligent summary available."
# 🔹 LLM-based Answer Generation
def generate_output(context, query):
    if not context.strip():
        return "No relevant information found. Try refining your query."
    llm = ChatOpenAI(model="gpt-4", openai_api_key=openai.api_key, temperature=0.5)
    prompt_template = PromptTemplate(
        template="Use the following context to answer the question:\nContext: {context}\nQuestion: {question}\nAnswer:",
        input_variables=["context", "question"],
    )
    prompt = prompt_template.format(context=context, question=query)
    response = llm.invoke([HumanMessage(content=prompt)]).content.strip()
    return response if response else "No relevant answer found."
# 🔹 Full Workflow
def complete_workflow(query, user_groups, index_name="briefmeta"):
    try:
        # The Gradio textbox delivers user_groups as a string such as "['KarthikPersonal']";
        # normalize it into a list of group names before filtering
        if isinstance(user_groups, str):
            user_groups = [g.strip(" '\"") for g in user_groups.strip("[]").split(",") if g.strip()]
        index_name = index_name or "briefmeta"  # fall back when the textbox is left empty
        refined_query = expand_query(query)
        context_data = hybrid_search(refined_query, user_groups, index_name=index_name)
        reranked_results = rerank(refined_query, context_data)
        context_data = [
            {
                'chunk_id': doc["chunk_id"],
                'doc_id': doc["doc_id"],
                'title': doc["title"],
                'text': doc["text"],
                'page_number': str(doc["page_number"]),
                'score': str(doc["final_score"])
            }
            for doc in reranked_results
        ]
        document_titles = list({os.path.basename(doc["title"]) for doc in context_data})
        formatted_titles = "\n".join(document_titles)
        intelligent_search_summary = generate_search_summary(context_data, refined_query)
        results = {
            "results": [
                {
                    "natural_language_output": generate_output(doc["text"], refined_query),
                    "chunk_id": doc["chunk_id"],
                    "document_id": doc["doc_id"],
                    "title": doc["title"],
                    "text": doc["text"],
                    "page_number": doc["page_number"],
                    "score": doc["score"],
                }
                for doc in context_data
            ],
            "total_results": len(context_data),
            "intelligent_search_summary": intelligent_search_summary
        }
        return results, formatted_titles, intelligent_search_summary
    except Exception as e:
        # Return three values so every Gradio output component is populated
        return (
            {"results": [], "total_results": 0, "intelligent_search_summary": "Error generating summary."},
            f"Error in workflow: {str(e)}",
            "Error generating summary.",
        )
# 🔹 Gradio UI
def gradio_app():
    with gr.Blocks() as app:
        gr.Markdown("### 🔍 Intelligent Document Search Prototype-v0.2")
        user_query = gr.Textbox(label="🔍 Enter Search Query")
        user_groups = gr.Textbox(label="👥 User Groups", placeholder="e.g., ['KarthikPersonal']")
        index_name = gr.Textbox(label="📂 Index Name", placeholder="Default: briefmeta")
        search_btn = gr.Button("🔍 Search")
        search_summary = gr.Textbox(label="📝 Intelligent Search Summary", interactive=False)
        result_output = gr.JSON(label="📄 Search Results")
        titles_output = gr.Textbox(label="📑 Retrieved Document Titles", interactive=False)
        search_btn.click(
            complete_workflow,
            inputs=[user_query, user_groups, index_name],
            outputs=[result_output, titles_output, search_summary],
        )
    return app
# Launch the app
if __name__ == "__main__":
    gradio_app().launch()