Jatin Mehra
Refactor and reorganize codebase for improved maintainability and clarity
ba907cd
import os
from langchain_community.document_loaders import PyMuPDFLoader
import faiss
from langchain_groq import ChatGroq
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.memory import ConversationBufferMemory
from sentence_transformers import SentenceTransformer
import dotenv
from langchain.tools import tool
import traceback
dotenv.load_dotenv()
# Initialize LLM and tools globally
def model_selection(model_name):
llm = ChatGroq(
model=model_name,
api_key=os.getenv("GROQ_API_KEY"),
temperature=0.1, # Lower temperature for more consistent tool calling
max_tokens=2048 # Reasonable limit for responses
)
return llm
# Create tools with better error handling
def create_tavily_tool():
try:
return TavilySearchResults(
max_results=5,
search_depth="advanced",
include_answer=True,
include_raw_content=False
)
except Exception as e:
print(f"Warning: Could not create Tavily tool: {e}")
return None
# Initialize tools globally but with error handling
_tavily_tool = create_tavily_tool()
tools = [_tavily_tool] if _tavily_tool else []
# Note: Memory should be created per session, not globally
def estimate_tokens(text):
"""Estimate the number of tokens in a text (rough approximation)."""
return len(text) // 4
def process_pdf_file(file_path):
"""Load a PDF file and extract its text with metadata."""
if not os.path.exists(file_path):
raise FileNotFoundError(f"The file {file_path} does not exist.")
loader = PyMuPDFLoader(file_path)
documents = loader.load()
return documents # Return list of Document objects with metadata
def chunk_text(documents, max_length=1000):
"""Split documents into chunks with metadata."""
chunks = []
for doc in documents:
text = doc.page_content
metadata = doc.metadata
paragraphs = text.split("\n\n")
current_chunk = ""
current_metadata = metadata.copy()
for paragraph in paragraphs:
# Skip very short paragraphs (less than 10 characters)
if len(paragraph.strip()) < 10:
continue
if estimate_tokens(current_chunk + paragraph) <= max_length // 4:
current_chunk += paragraph + "\n\n"
else:
# Only add chunks with meaningful content
if current_chunk.strip() and len(current_chunk.strip()) > 20:
chunks.append({"text": current_chunk.strip(), "metadata": current_metadata})
current_chunk = paragraph + "\n\n"
# Add the last chunk if it has meaningful content
if current_chunk.strip() and len(current_chunk.strip()) > 20:
chunks.append({"text": current_chunk.strip(), "metadata": current_metadata})
return chunks
def create_embeddings(chunks, model):
"""Create embeddings for a list of chunk texts."""
texts = [chunk["text"] for chunk in chunks]
embeddings = model.encode(texts, show_progress_bar=True, convert_to_tensor=True)
return embeddings.cpu().numpy(), chunks
def build_faiss_index(embeddings):
"""Build a FAISS HNSW index from embeddings for similarity search."""
dim = embeddings.shape[1]
index = faiss.IndexHNSWFlat(dim, 32) # 32 = number of neighbors in HNSW graph
index.hnsw.efConstruction = 200 # Higher = better quality, slower build
index.hnsw.efSearch = 50 # Higher = better accuracy, slower search
index.add(embeddings)
return index
def retrieve_similar_chunks(query, index, chunks_with_metadata, embedding_model, k=10, max_chunk_length=1000):
"""Retrieve top k similar chunks to the query from the FAISS index."""
query_embedding = embedding_model.encode([query], convert_to_tensor=True).cpu().numpy()
distances, indices = index.search(query_embedding, k)
# Ensure indices are within bounds and create mapping for correct distances
valid_results = []
for idx_pos, chunk_idx in enumerate(indices[0]):
if 0 <= chunk_idx < len(chunks_with_metadata):
chunk_text = chunks_with_metadata[chunk_idx]["text"][:max_chunk_length]
# Only include chunks with meaningful content
if chunk_text.strip(): # Skip empty chunks
valid_results.append((
chunk_text,
distances[0][idx_pos], # Use original position for correct distance
chunks_with_metadata[chunk_idx]["metadata"]
))
return valid_results
def create_vector_search_tool(faiss_index, document_chunks_with_metadata, embedding_model, k=3, max_chunk_length=1000):
@tool
def vector_database_search(query: str) -> str:
"""Search the uploaded PDF document for information related to the query.
Args:
query: The search query string to find relevant information in the document.
Returns:
A string containing relevant information found in the document.
"""
# Handle very short or empty queries
if not query or len(query.strip()) < 3:
return "Please provide a more specific search query with at least 3 characters."
try:
# Retrieve similar chunks using the provided session-specific components
similar_chunks_data = retrieve_similar_chunks(
query,
faiss_index,
document_chunks_with_metadata, # This is the list of dicts {text: ..., metadata: ...}
embedding_model,
k=k,
max_chunk_length=max_chunk_length
)
# Format the response
if not similar_chunks_data:
return "No relevant information found in the document for that query. Please try rephrasing your question or using different keywords."
# Filter out chunks with very high distance (low similarity)
filtered_chunks = [chunk for chunk in similar_chunks_data if chunk[1] < 1.5] # Adjust threshold as needed
if not filtered_chunks:
return "No sufficiently relevant information found in the document for that query. Please try rephrasing your question or using different keywords."
context = "\n\n---\n\n".join([chunk_text for chunk_text, _, _ in filtered_chunks])
return f"The following information was found in the document regarding '{query}':\n{context}"
except Exception as e:
print(f"Error in vector search tool: {e}")
return f"Error searching the document: {str(e)}"
return vector_database_search
def agentic_rag(llm, agent_specific_tools, query, context_chunks, memory, Use_Tavily=False):
# Validate inputs
if not query or not query.strip():
return {"output": "Please provide a valid question."}
if not agent_specific_tools:
print("Warning: No tools provided, using direct LLM response")
# Use direct LLM call without agent if no tools
fallback_prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant that answers questions about documents. Use the provided context to answer the user's question."),
("human", "Context: {context}\n\nQuestion: {input}")
])
try:
formatted_prompt = fallback_prompt.format_prompt(context="No context available", input=query).to_messages()
response = llm.invoke(formatted_prompt)
return {"output": response.content if hasattr(response, 'content') else str(response)}
except Exception as e:
print(f"Direct LLM call failed: {e}")
return {"output": "I'm sorry, I encountered an error processing your request."}
print(f"Available tools: {[tool.name for tool in agent_specific_tools]}")
# Sort chunks by relevance (lower distance = more relevant)
context_chunks = sorted(context_chunks, key=lambda x: x[1]) if context_chunks else []
context = ""
total_tokens = 0
max_tokens = 7000 # Leave room for prompt and response
# Filter out chunks with very high distance scores (low similarity)
relevant_chunks = [chunk for chunk in context_chunks if len(chunk) >= 3 and chunk[1] < 1.5]
for chunk, _, _ in relevant_chunks:
if chunk and chunk.strip(): # Ensure chunk has content
chunk_tokens = estimate_tokens(chunk)
if total_tokens + chunk_tokens <= max_tokens:
context += chunk + "\n\n"
total_tokens += chunk_tokens
else:
break
context = context.strip() if context else "No initial context provided from preliminary search."
print(f"Using context length: {len(context)} characters")
# Dynamically build the tool guidance for the prompt
# Tool names: 'vector_database_search', 'tavily_search_results_json'
has_document_search = any(t.name == "vector_database_search" for t in agent_specific_tools)
has_web_search = any(t.name == "tavily_search_results_json" for t in agent_specific_tools)
# Simplified tool guidance
tool_instructions = ""
if has_document_search:
tool_instructions += "Use vector_database_search to find information in the uploaded document. "
if has_web_search:
tool_instructions += "Use tavily_search_results_json for web searches when document search is insufficient. "
if not tool_instructions:
tool_instructions = "Answer based on the provided context only. "
prompt = ChatPromptTemplate.from_messages([
("system", f"""You are a helpful AI assistant that answers questions about documents.
Context: {{context}}
Tools available: {tool_instructions}
Instructions:
- Use the provided context first
- If context is insufficient, use available tools to search for more information
- Provide clear, helpful answers
- If you cannot find an answer, say so clearly"""),
("human", "{input}"),
MessagesPlaceholder(variable_name="chat_history"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
try:
print(f"Creating agent with {len(agent_specific_tools)} tools")
# Validate that tools are properly formatted
for tool in agent_specific_tools:
print(f"Tool: {tool.name} - {type(tool)}")
# Ensure tool has required attributes
if not hasattr(tool, 'name') or not hasattr(tool, 'description'):
raise ValueError(f"Tool {tool} is missing required attributes")
agent = create_tool_calling_agent(llm, agent_specific_tools, prompt)
agent_executor = AgentExecutor(
agent=agent,
tools=agent_specific_tools,
memory=memory,
verbose=True,
handle_parsing_errors=True,
max_iterations=2, # Reduced further to prevent issues
return_intermediate_steps=False,
early_stopping_method="generate"
)
print(f"Invoking agent with query: '{query}' and context length: {len(context)} chars")
# Create input with simpler structure
agent_input = {
"input": query,
"context": context,
}
response_payload = agent_executor.invoke(agent_input)
print(f"Agent response keys: {response_payload.keys() if response_payload else 'None'}")
# Extract and validate the output
agent_output = response_payload.get("output", "") if response_payload else ""
print(f"Agent output length: {len(agent_output)} chars")
print(f"Agent output preview: {agent_output[:100]}..." if len(agent_output) > 100 else f"Agent output: {agent_output}")
# Validate response quality
if not agent_output or len(agent_output.strip()) < 10:
print(f"Warning: Agent returned insufficient response (length: {len(agent_output)}), using fallback")
raise ValueError("Insufficient response from agent")
# Check if response is just a prefix without content
problematic_prefixes = [
"Based on the Document,",
"According to a web search,",
"Based on the available information,",
"I need to",
"Let me"
]
stripped_output = agent_output.strip()
if any(stripped_output == prefix.strip() or stripped_output == prefix.strip() + "." for prefix in problematic_prefixes):
print(f"Warning: Agent returned only prefix without content: '{stripped_output}', using fallback")
raise ValueError("Agent returned incomplete response")
return response_payload
except Exception as e:
error_msg = str(e)
print(f"Error during agent execution: {error_msg} \nTraceback: {traceback.format_exc()}")
# Check if it's a specific Groq function calling error
if "Failed to call a function" in error_msg or "function" in error_msg.lower():
print("Detected Groq function calling error, trying simpler approach...")
# Try with a simpler agent setup or direct LLM call
try:
# First, try to use tools individually without agent framework
if agent_specific_tools:
print("Attempting manual tool usage...")
tool_results = []
# Try vector search first if available
vector_tool = next((t for t in agent_specific_tools if t.name == "vector_database_search"), None)
if vector_tool:
try:
search_result = vector_tool.run(query)
if search_result and "No relevant information" not in search_result:
tool_results.append(f"Document Search: {search_result}")
except Exception as tool_error:
print(f"Vector tool error: {tool_error}")
# Try web search if needed and available
if Use_Tavily:
web_tool = next((t for t in agent_specific_tools if t.name == "tavily_search_results_json"), None)
if web_tool:
try:
web_result = web_tool.run(query)
if web_result:
tool_results.append(f"Web Search: {web_result}")
except Exception as tool_error:
print(f"Web tool error: {tool_error}")
# Combine tool results with context
enhanced_context = context
if tool_results:
enhanced_context += "\n\nAdditional Information:\n" + "\n\n".join(tool_results)
# Use direct LLM call with enhanced context
direct_prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant. Use the provided context and information to answer the user's question clearly and completely."),
("human", "Context and Information: {context}\n\nQuestion: {input}")
])
formatted_prompt = direct_prompt.format_prompt(context=enhanced_context, input=query).to_messages()
response = llm.invoke(formatted_prompt)
direct_output = response.content if hasattr(response, 'content') else str(response)
print(f"Direct tool usage response length: {len(direct_output)} chars")
return {"output": direct_output}
except Exception as manual_error:
print(f"Manual tool usage also failed: {manual_error}")
print("Using fallback direct LLM response...")
fallback_prompt_template = ChatPromptTemplate.from_messages([
("system", """You are a helpful assistant that answers questions about documents.
Use the provided context to answer the user's question.
If the context contains relevant information, start your answer with "Based on the Document, ..."
If the context is insufficient, clearly state what you don't know."""),
("human", "Context: {context}\n\nQuestion: {input}")
])
try:
# Format the prompt with the actual context and query
formatted_fallback_prompt = fallback_prompt_template.format_prompt(context=context, input=query).to_messages()
response = llm.invoke(formatted_fallback_prompt)
fallback_output = response.content if hasattr(response, 'content') else str(response)
print(f"Fallback response length: {len(fallback_output)} chars")
return {"output": fallback_output}
except Exception as fallback_error:
print(f"Fallback also failed: {str(fallback_error)}")
return {"output": "I'm sorry, I encountered an error processing your request. Please try again."}
"""if __name__ == "__main__":
# Process PDF and prepare index
dotenv.load_dotenv()
pdf_file = "JatinCV.pdf"
llm = model_selection("meta-llama/llama-4-scout-17b-16e-instruct")
texts = process_pdf_file(pdf_file)
chunks = chunk_text(texts, max_length=1500)
model = SentenceTransformer('all-MiniLM-L6-v2')
embeddings = create_embeddings(chunks, model)
index = build_faiss_index(embeddings)
# Chat loop
print("Chat with the assistant (type 'exit' or 'quit' to stop):")
while True:
query = input("User: ")
if query.lower() in ["exit", "quit"]:
break
# Retrieve similar chunks
similar_chunks = retrieve_similar_chunks(query, index, chunks, model, k=3)
# context = "\n".join([chunk for chunk, _ in similar_chunks])
# Generate response
response = agentic_rag(llm, tools, query=query, context=similar_chunks, Use_Tavily=True, memory=memory)
print("Assistant:", response["output"])"""