import os from langchain_community.document_loaders import PyMuPDFLoader import faiss from langchain_groq import ChatGroq from langchain.agents import AgentExecutor, create_tool_calling_agent from langchain_community.tools.tavily_search import TavilySearchResults from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain.memory import ConversationBufferMemory from sentence_transformers import SentenceTransformer import dotenv from langchain.tools import tool import traceback dotenv.load_dotenv() # Initialize LLM and tools globally def model_selection(model_name): llm = ChatGroq( model=model_name, api_key=os.getenv("GROQ_API_KEY"), temperature=0.1, # Lower temperature for more consistent tool calling max_tokens=2048 # Reasonable limit for responses ) return llm # Create tools with better error handling def create_tavily_tool(): try: return TavilySearchResults( max_results=5, search_depth="advanced", include_answer=True, include_raw_content=False ) except Exception as e: print(f"Warning: Could not create Tavily tool: {e}") return None # Initialize tools globally but with error handling _tavily_tool = create_tavily_tool() tools = [_tavily_tool] if _tavily_tool else [] # Note: Memory should be created per session, not globally def estimate_tokens(text): """Estimate the number of tokens in a text (rough approximation).""" return len(text) // 4 def process_pdf_file(file_path): """Load a PDF file and extract its text with metadata.""" if not os.path.exists(file_path): raise FileNotFoundError(f"The file {file_path} does not exist.") loader = PyMuPDFLoader(file_path) documents = loader.load() return documents # Return list of Document objects with metadata def chunk_text(documents, max_length=1000): """Split documents into chunks with metadata.""" chunks = [] for doc in documents: text = doc.page_content metadata = doc.metadata paragraphs = text.split("\n\n") current_chunk = "" current_metadata = metadata.copy() for paragraph in paragraphs: # Skip very short paragraphs (less than 10 characters) if len(paragraph.strip()) < 10: continue if estimate_tokens(current_chunk + paragraph) <= max_length // 4: current_chunk += paragraph + "\n\n" else: # Only add chunks with meaningful content if current_chunk.strip() and len(current_chunk.strip()) > 20: chunks.append({"text": current_chunk.strip(), "metadata": current_metadata}) current_chunk = paragraph + "\n\n" # Add the last chunk if it has meaningful content if current_chunk.strip() and len(current_chunk.strip()) > 20: chunks.append({"text": current_chunk.strip(), "metadata": current_metadata}) return chunks def create_embeddings(chunks, model): """Create embeddings for a list of chunk texts.""" texts = [chunk["text"] for chunk in chunks] embeddings = model.encode(texts, show_progress_bar=True, convert_to_tensor=True) return embeddings.cpu().numpy(), chunks def build_faiss_index(embeddings): """Build a FAISS HNSW index from embeddings for similarity search.""" dim = embeddings.shape[1] index = faiss.IndexHNSWFlat(dim, 32) # 32 = number of neighbors in HNSW graph index.hnsw.efConstruction = 200 # Higher = better quality, slower build index.hnsw.efSearch = 50 # Higher = better accuracy, slower search index.add(embeddings) return index def retrieve_similar_chunks(query, index, chunks_with_metadata, embedding_model, k=10, max_chunk_length=1000): """Retrieve top k similar chunks to the query from the FAISS index.""" query_embedding = embedding_model.encode([query], convert_to_tensor=True).cpu().numpy() distances, indices = index.search(query_embedding, k) # Ensure indices are within bounds and create mapping for correct distances valid_results = [] for idx_pos, chunk_idx in enumerate(indices[0]): if 0 <= chunk_idx < len(chunks_with_metadata): chunk_text = chunks_with_metadata[chunk_idx]["text"][:max_chunk_length] # Only include chunks with meaningful content if chunk_text.strip(): # Skip empty chunks valid_results.append(( chunk_text, distances[0][idx_pos], # Use original position for correct distance chunks_with_metadata[chunk_idx]["metadata"] )) return valid_results def create_vector_search_tool(faiss_index, document_chunks_with_metadata, embedding_model, k=3, max_chunk_length=1000): @tool def vector_database_search(query: str) -> str: """Search the uploaded PDF document for information related to the query. Args: query: The search query string to find relevant information in the document. Returns: A string containing relevant information found in the document. """ # Handle very short or empty queries if not query or len(query.strip()) < 3: return "Please provide a more specific search query with at least 3 characters." try: # Retrieve similar chunks using the provided session-specific components similar_chunks_data = retrieve_similar_chunks( query, faiss_index, document_chunks_with_metadata, # This is the list of dicts {text: ..., metadata: ...} embedding_model, k=k, max_chunk_length=max_chunk_length ) # Format the response if not similar_chunks_data: return "No relevant information found in the document for that query. Please try rephrasing your question or using different keywords." # Filter out chunks with very high distance (low similarity) filtered_chunks = [chunk for chunk in similar_chunks_data if chunk[1] < 1.5] # Adjust threshold as needed if not filtered_chunks: return "No sufficiently relevant information found in the document for that query. Please try rephrasing your question or using different keywords." context = "\n\n---\n\n".join([chunk_text for chunk_text, _, _ in filtered_chunks]) return f"The following information was found in the document regarding '{query}':\n{context}" except Exception as e: print(f"Error in vector search tool: {e}") return f"Error searching the document: {str(e)}" return vector_database_search def agentic_rag(llm, agent_specific_tools, query, context_chunks, memory, Use_Tavily=False): # Validate inputs if not query or not query.strip(): return {"output": "Please provide a valid question."} if not agent_specific_tools: print("Warning: No tools provided, using direct LLM response") # Use direct LLM call without agent if no tools fallback_prompt = ChatPromptTemplate.from_messages([ ("system", "You are a helpful assistant that answers questions about documents. Use the provided context to answer the user's question."), ("human", "Context: {context}\n\nQuestion: {input}") ]) try: formatted_prompt = fallback_prompt.format_prompt(context="No context available", input=query).to_messages() response = llm.invoke(formatted_prompt) return {"output": response.content if hasattr(response, 'content') else str(response)} except Exception as e: print(f"Direct LLM call failed: {e}") return {"output": "I'm sorry, I encountered an error processing your request."} print(f"Available tools: {[tool.name for tool in agent_specific_tools]}") # Sort chunks by relevance (lower distance = more relevant) context_chunks = sorted(context_chunks, key=lambda x: x[1]) if context_chunks else [] context = "" total_tokens = 0 max_tokens = 7000 # Leave room for prompt and response # Filter out chunks with very high distance scores (low similarity) relevant_chunks = [chunk for chunk in context_chunks if len(chunk) >= 3 and chunk[1] < 1.5] for chunk, _, _ in relevant_chunks: if chunk and chunk.strip(): # Ensure chunk has content chunk_tokens = estimate_tokens(chunk) if total_tokens + chunk_tokens <= max_tokens: context += chunk + "\n\n" total_tokens += chunk_tokens else: break context = context.strip() if context else "No initial context provided from preliminary search." print(f"Using context length: {len(context)} characters") # Dynamically build the tool guidance for the prompt # Tool names: 'vector_database_search', 'tavily_search_results_json' has_document_search = any(t.name == "vector_database_search" for t in agent_specific_tools) has_web_search = any(t.name == "tavily_search_results_json" for t in agent_specific_tools) # Simplified tool guidance tool_instructions = "" if has_document_search: tool_instructions += "Use vector_database_search to find information in the uploaded document. " if has_web_search: tool_instructions += "Use tavily_search_results_json for web searches when document search is insufficient. " if not tool_instructions: tool_instructions = "Answer based on the provided context only. " prompt = ChatPromptTemplate.from_messages([ ("system", f"""You are a helpful AI assistant that answers questions about documents. Context: {{context}} Tools available: {tool_instructions} Instructions: - Use the provided context first - If context is insufficient, use available tools to search for more information - Provide clear, helpful answers - If you cannot find an answer, say so clearly"""), ("human", "{input}"), MessagesPlaceholder(variable_name="chat_history"), MessagesPlaceholder(variable_name="agent_scratchpad"), ]) try: print(f"Creating agent with {len(agent_specific_tools)} tools") # Validate that tools are properly formatted for tool in agent_specific_tools: print(f"Tool: {tool.name} - {type(tool)}") # Ensure tool has required attributes if not hasattr(tool, 'name') or not hasattr(tool, 'description'): raise ValueError(f"Tool {tool} is missing required attributes") agent = create_tool_calling_agent(llm, agent_specific_tools, prompt) agent_executor = AgentExecutor( agent=agent, tools=agent_specific_tools, memory=memory, verbose=True, handle_parsing_errors=True, max_iterations=2, # Reduced further to prevent issues return_intermediate_steps=False, early_stopping_method="generate" ) print(f"Invoking agent with query: '{query}' and context length: {len(context)} chars") # Create input with simpler structure agent_input = { "input": query, "context": context, } response_payload = agent_executor.invoke(agent_input) print(f"Agent response keys: {response_payload.keys() if response_payload else 'None'}") # Extract and validate the output agent_output = response_payload.get("output", "") if response_payload else "" print(f"Agent output length: {len(agent_output)} chars") print(f"Agent output preview: {agent_output[:100]}..." if len(agent_output) > 100 else f"Agent output: {agent_output}") # Validate response quality if not agent_output or len(agent_output.strip()) < 10: print(f"Warning: Agent returned insufficient response (length: {len(agent_output)}), using fallback") raise ValueError("Insufficient response from agent") # Check if response is just a prefix without content problematic_prefixes = [ "Based on the Document,", "According to a web search,", "Based on the available information,", "I need to", "Let me" ] stripped_output = agent_output.strip() if any(stripped_output == prefix.strip() or stripped_output == prefix.strip() + "." for prefix in problematic_prefixes): print(f"Warning: Agent returned only prefix without content: '{stripped_output}', using fallback") raise ValueError("Agent returned incomplete response") return response_payload except Exception as e: error_msg = str(e) print(f"Error during agent execution: {error_msg} \nTraceback: {traceback.format_exc()}") # Check if it's a specific Groq function calling error if "Failed to call a function" in error_msg or "function" in error_msg.lower(): print("Detected Groq function calling error, trying simpler approach...") # Try with a simpler agent setup or direct LLM call try: # First, try to use tools individually without agent framework if agent_specific_tools: print("Attempting manual tool usage...") tool_results = [] # Try vector search first if available vector_tool = next((t for t in agent_specific_tools if t.name == "vector_database_search"), None) if vector_tool: try: search_result = vector_tool.run(query) if search_result and "No relevant information" not in search_result: tool_results.append(f"Document Search: {search_result}") except Exception as tool_error: print(f"Vector tool error: {tool_error}") # Try web search if needed and available if Use_Tavily: web_tool = next((t for t in agent_specific_tools if t.name == "tavily_search_results_json"), None) if web_tool: try: web_result = web_tool.run(query) if web_result: tool_results.append(f"Web Search: {web_result}") except Exception as tool_error: print(f"Web tool error: {tool_error}") # Combine tool results with context enhanced_context = context if tool_results: enhanced_context += "\n\nAdditional Information:\n" + "\n\n".join(tool_results) # Use direct LLM call with enhanced context direct_prompt = ChatPromptTemplate.from_messages([ ("system", "You are a helpful assistant. Use the provided context and information to answer the user's question clearly and completely."), ("human", "Context and Information: {context}\n\nQuestion: {input}") ]) formatted_prompt = direct_prompt.format_prompt(context=enhanced_context, input=query).to_messages() response = llm.invoke(formatted_prompt) direct_output = response.content if hasattr(response, 'content') else str(response) print(f"Direct tool usage response length: {len(direct_output)} chars") return {"output": direct_output} except Exception as manual_error: print(f"Manual tool usage also failed: {manual_error}") print("Using fallback direct LLM response...") fallback_prompt_template = ChatPromptTemplate.from_messages([ ("system", """You are a helpful assistant that answers questions about documents. Use the provided context to answer the user's question. If the context contains relevant information, start your answer with "Based on the Document, ..." If the context is insufficient, clearly state what you don't know."""), ("human", "Context: {context}\n\nQuestion: {input}") ]) try: # Format the prompt with the actual context and query formatted_fallback_prompt = fallback_prompt_template.format_prompt(context=context, input=query).to_messages() response = llm.invoke(formatted_fallback_prompt) fallback_output = response.content if hasattr(response, 'content') else str(response) print(f"Fallback response length: {len(fallback_output)} chars") return {"output": fallback_output} except Exception as fallback_error: print(f"Fallback also failed: {str(fallback_error)}") return {"output": "I'm sorry, I encountered an error processing your request. Please try again."} """if __name__ == "__main__": # Process PDF and prepare index dotenv.load_dotenv() pdf_file = "JatinCV.pdf" llm = model_selection("meta-llama/llama-4-scout-17b-16e-instruct") texts = process_pdf_file(pdf_file) chunks = chunk_text(texts, max_length=1500) model = SentenceTransformer('all-MiniLM-L6-v2') embeddings = create_embeddings(chunks, model) index = build_faiss_index(embeddings) # Chat loop print("Chat with the assistant (type 'exit' or 'quit' to stop):") while True: query = input("User: ") if query.lower() in ["exit", "quit"]: break # Retrieve similar chunks similar_chunks = retrieve_similar_chunks(query, index, chunks, model, k=3) # context = "\n".join([chunk for chunk, _ in similar_chunks]) # Generate response response = agentic_rag(llm, tools, query=query, context=similar_chunks, Use_Tavily=True, memory=memory) print("Assistant:", response["output"])"""