“vinit5112”
changes
65726e0
from google import genai
from .vector_store import VectorStore
import PyPDF2
from docx import Document
from typing import List
import os
from langchain_text_splitters import RecursiveCharacterTextSplitter
import asyncio
class RAG:
def __init__(self, google_api_key: str, collection_name: str = "ca-documents"):
# Setup Gemini
# The client gets the API key from the environment variable `GOOGLE_API_KEY`
# or from the `api_key` argument.
self.client = genai.Client(api_key=google_api_key)
# Setup Vector Store (Qdrant configuration is handled via environment variables)
self.vector_store = VectorStore()
# Setup Text Splitter
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
separators=["\n\n", "\n", ". ", " ", ""]
)
async def initialize(self):
"""Asynchronous initialization to be called after object creation."""
await self.vector_store.initialize()
await self.vector_store.verify_collection_health()
def process_pdf(self, file_path: str) -> List[str]:
"""Extract text from PDF and split into chunks using RecursiveTextSplitter"""
full_text = ""
with open(file_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
for page in pdf_reader.pages:
text = page.extract_text()
if text.strip():
full_text += text + "\n"
# Use RecursiveCharacterTextSplitter for better chunking
chunks = self.text_splitter.split_text(full_text)
return [chunk.strip() for chunk in chunks if chunk.strip()]
def process_docx(self, file_path: str) -> List[str]:
"""Extract text from DOCX and split into chunks using RecursiveTextSplitter"""
doc = Document(file_path)
full_text = "\n".join([paragraph.text for paragraph in doc.paragraphs])
# Use RecursiveCharacterTextSplitter for better chunking
chunks = self.text_splitter.split_text(full_text)
return [chunk.strip() for chunk in chunks if chunk.strip()]
async def upload_document(self, file_path: str) -> bool:
"""Upload and process document"""
try:
filename = os.path.basename(file_path)
if file_path.endswith('.pdf'):
chunks = self.process_pdf(file_path)
elif file_path.endswith('.docx'):
chunks = self.process_docx(file_path)
elif file_path.endswith('.txt'):
with open(file_path, 'r', encoding='utf-8') as f:
full_text = f.read()
chunks = self.text_splitter.split_text(full_text)
chunks = [chunk.strip() for chunk in chunks if chunk.strip()]
else:
print("Unsupported file format")
return False
# Store chunks in Qdrant
for i, chunk in enumerate(chunks):
await self.vector_store.add_document(
text=chunk,
metadata={"source": filename, "chunk_id": i}
)
print(f"Uploaded {len(chunks)} chunks from {filename}")
return True
except Exception as e:
print(f"Error uploading document: {e}")
return False
def is_casual_conversation(self, question: str) -> bool:
"""Determine if the question is casual conversation vs CA-specific query"""
question_lower = question.lower().strip()
# Pure casual greetings (exact matches or very short)
pure_casual = [
'hello', 'hi', 'hey', 'good morning', 'good afternoon', 'good evening',
'how are you', 'what\'s up', 'greetings', 'thanks', 'thank you',
'bye', 'goodbye', 'see you', 'nice to meet you', 'who are you',
'what can you do', 'help me', 'what is your name', 'introduce yourself',
'how do you work', 'what are you', 'can you help me'
]
# Check for exact matches first
if question_lower in pure_casual:
return True
# Check if it's a very short greeting (≤ 4 words) without technical terms
words = question_lower.split()
if len(words) <= 4:
# Technical/question indicators
technical_indicators = [
'what', 'how', 'why', 'when', 'where', 'explain', 'define', 'calculate',
'accounting', 'audit', 'tax', 'finance', 'depreciation', 'balance', 'sheet',
'profit', 'loss', 'asset', 'liability', 'equity', 'revenue', 'expense',
'journal', 'ledger', 'trial', 'cash', 'flow', 'ratio', 'analysis'
]
# If no technical indicators and contains casual words, it's casual
has_casual = any(casual in question_lower for casual in ['hello', 'hi', 'hey', 'thanks', 'bye'])
has_technical = any(tech in question_lower for tech in technical_indicators)
if has_casual and not has_technical:
return True
# Check for greetings followed by actual questions
# Pattern: "hello, what is..." or "hi there, how do..."
greeting_patterns = [
r'^(hello|hi|hey|good morning|good afternoon|good evening),?\s+(what|how|why|when|where|explain|define|tell|can you)',
r'^(hello|hi|hey)\s+(there|everyone)?,?\s+(what|how|why|when|where|explain|define|tell|can you)'
]
import re
for pattern in greeting_patterns:
if re.search(pattern, question_lower):
return False # It's a question with greeting prefix, not pure casual
return False
async def ask_question_stream(self, question: str):
"""Ask a question and get a streaming answer"""
try:
# 1. Check if this is casual conversation
if self.is_casual_conversation(question):
# Respond as a friendly CA assistant for casual conversation
casual_prompt = f"""You are a friendly CA (Chartered Accountant) study assistant. The user said: "{question}"
Respond naturally and warmly as a CA study assistant. Be helpful and mention that you can help with CA studies, accounting concepts, financial topics, etc. Keep it brief but friendly."""
for chunk in self.client.models.generate_content_stream(
model='gemini-2.5-flash',
contents=casual_prompt
):
yield chunk.text
return
# 2. For CA-specific questions, search for similar documents
similar_docs = await self.vector_store.search_similar(question, limit=3)
if similar_docs and len(similar_docs) > 0:
# 3. Create context from similar documents
context = "\n\n".join([doc["text"] for doc in similar_docs])
# 4. Create prompt for Gemini with context
prompt = f"""You are a CA study assistant. Based on the following context from uploaded documents, answer the question.
Context:
{context}
Question: {question}
Please provide a detailed answer based on the context above. If you need more specific information, suggest what documents might be helpful."""
else:
# 5. No documents found, but still be helpful
prompt = f"""You are a CA (Chartered Accountant) study assistant. The user asked: "{question}"
Even though no specific study materials have been uploaded yet, provide a helpful answer based on your knowledge of CA studies, accounting, finance, taxation, and auditing. Be informative and suggest that uploading relevant study materials would help provide more specific and detailed answers.
Question: {question}"""
# 6. Get answer from Gemini
for chunk in self.client.models.generate_content_stream(
model='gemini-2.5-flash',
contents=prompt
):
yield chunk.text
except Exception as e:
yield f"Error generating answer: {e}"
# Simple usage example
# if __name__ == "__main__":
# # Initialize
# rag = RAG(
# google_api_key="your_google_api_key",
# collection_name="ca-documents"
# )