import re
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Union

from docx import Document
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer


@dataclass
class DocumentChunk:
    """Typed record mirroring the chunk dictionaries built by ``process_document``.

    The visible code never instantiates this class; ``process_document``
    returns plain dicts that carry the same four keys.
    """

    chunk_id: int  # 1-based position of the chunk within its document
    text: str  # header + questions + content, as passed to the embedder
    embedding: List[float]  # vector produced by the sentence-transformer model
    metadata: Dict  # raw chunk fields plus title/category/topics/confidence


class DocumentChunker:
    """Split a document into header-delimited chunks and annotate each one.

    Each chunk receives an embedding, keyword-derived categories, top terms
    and a heuristic confidence score.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """Load the embedding model and set up the regex tables.

        Args:
            model_name: Name passed to ``SentenceTransformer``. The default
                is the model the class has always used.
        """
        self.embed_model = SentenceTransformer(model_name)

        # Category name -> regexes whose hits vote for that category.
        # Patterns are lowercase because they are matched against lowercased text.
        self.category_patterns = {
            "Project Summary": [r"\bsummary\b", r"\bproject overview\b"],
            "Contact Information": [r"\bcontact\b", r"\bemail\b", r"\bphone\b", r"\baddress\b"],
            "Problem/ Need": [r"\bproblem\b", r"\bneed\b", r"\bchallenge\b"],
            "Mission Statement": [r"\bmission\b", r"\bvision\b"],
            "Fit or Alignment to Grant": [r"\balignment\b", r"\bfit\b", r"\bgrant (focus|priority)\b"],
            "Goals/ Vision / Objectives": [r"\bgoals?\b", r"\bobjectives?\b", r"\bvision\b"],
            "Our Solution *PROGRAMS* and Approach": [r"\bsolution\b", r"\bprogram\b", r"\bapproach\b"],
            "Impact, Results, or Outcomes": [r"\bimpact\b", r"\bresults?\b", r"\boutcomes?\b"],
            "Beneficiaries": [r"\bbeneficiaries\b", r"\bwho we serve\b", r"\btarget audience\b"],
            "Differentiation with Competitors": [r"\bcompetitor\b", r"\bdifferent\b", r"\bvalue proposition\b"],
            "Plan and Timeline": [r"\btimeline\b", r"\bschedule\b", r"\bmilestone\b"],
            "Budget and Funding": [r"\bbudget\b", r"\bfunding\b", r"\bcost\b"],
            "Sustainability and Strategy": [r"\bsustainability\b", r"\bexit strategy\b"],
            "Organization's History": [r"\bhistory\b", r"\borganization background\b"],
            "Team Member Descriptions": [r"\bteam\b", r"\bstaff\b", r"\blived experience\b"],
        }

        # Document type -> line-level regexes used by extract_headers().
        self.patterns = {
            'grant_application': {
                'header_patterns': [
                    r'\*\*([^*]+)\*\*',
                    r'^([A-Z][^a-z]*[A-Z])$',
                    r'^([A-Z][A-Za-z\s]+)$',
                ],
                'question_patterns': [
                    r'^.+\?$',
                    r'^\*?Please .+',
                    r'^How .+',
                    r'^What .+',
                    r'^Describe .+',
                ]
            }
        }

    def match_category(self, text: str, return_first: bool = True) -> Union[Optional[str], List[str]]:
        """Score ``text`` against every category and return the matches.

        Args:
            text: Text to classify; it is lowercased before matching.
            return_first: When true, return only the best category.

        Returns:
            With ``return_first=True``: the category with the most regex hits,
            or ``None`` when nothing matched. With ``return_first=False``: all
            categories with at least one hit, ordered by descending hit count
            (ties keep the declaration order of ``category_patterns``); an
            empty list when nothing matched.
        """
        lower_text = text.lower()

        # Only categories with at least one hit are kept, so a text that
        # matches nothing yields None/[] rather than an arbitrary zero-score
        # category.
        scored = []
        for category, patterns in self.category_patterns.items():
            hits = sum(len(re.findall(pattern, lower_text)) for pattern in patterns)
            if hits:
                scored.append((category, hits))

        if not scored:
            return None if return_first else []

        # list.sort is stable, so equal scores keep declaration order.
        scored.sort(key=lambda item: -item[1])
        if return_first:
            return scored[0][0]
        return [category for category, _ in scored]

    def extract_text_from_docx(self, file_path: str) -> str:
        """Return the paragraphs of a .docx file joined by newlines.

        A paragraph containing at least one bold run is wrapped in ``**``
        markers.
        """
        doc = Document(file_path)
        rendered = []
        for paragraph in doc.paragraphs:
            if any(run.bold for run in paragraph.runs):
                rendered.append(f"**{paragraph.text}**")
            else:
                rendered.append(paragraph.text)
        return '\n'.join(rendered)

    def detect_document_type(self, text: str) -> str:
        """Return 'grant_application' when at least two keywords occur, else 'generic'.

        The keywords are 'grant', 'funding' and 'mission', matched as
        case-insensitive substrings.
        """
        keywords = ['grant', 'funding', 'mission']
        lowered = text.lower()
        hits = sum(keyword in lowered for keyword in keywords)
        return 'grant_application' if hits >= 2 else 'generic'

    def extract_headers(self, text: str, doc_type: str) -> List[Dict]:
        """Find the lines of ``text`` that look like questions or section headers.

        Args:
            text: Full document text, one logical line per ``\\n``.
            doc_type: Key into ``self.patterns``; unknown types fall back to
                the 'grant_application' pattern set.

        Returns:
            One dict per detected line with keys 'text' (the line with
            surrounding asterisks and spaces removed), 'line_number' (0-based
            index into ``text.split('\\n')``) and 'pattern_type' ('question'
            or 'header'). Question patterns take precedence.
        """
        pattern_set = self.patterns.get(doc_type, self.patterns['grant_application'])
        question_patterns = pattern_set['question_patterns']
        header_patterns = pattern_set['header_patterns']

        headers = []
        for line_number, raw_line in enumerate(text.split('\n')):
            # NOTE(review): asterisks are stripped before matching, so the
            # bold-marker pattern r'\*\*([^*]+)\*\*' can never match here; bold
            # lines are only detected when they satisfy one of the other
            # header patterns. Confirm whether bold-only lines should count
            # as headers before changing this.
            line = raw_line.strip("* ")
            if any(re.match(p, line, re.IGNORECASE) for p in question_patterns):
                headers.append({'text': line, 'line_number': line_number, 'pattern_type': 'question'})
            elif any(re.match(p, line) for p in header_patterns):
                headers.append({'text': line, 'line_number': line_number, 'pattern_type': 'header'})
        return headers

    def chunk_by_headers(self, text: str, headers: List[Dict], max_words=150) -> List[Dict]:
        """Cut ``text`` into chunks of at most ``max_words`` words per section.

        Args:
            text: Full document text.
            headers: Output of ``extract_headers`` for the same text.
            max_words: Maximum number of content words per chunk.

        Returns:
            A list of dicts with keys 'chunk_id' (1-based), 'header',
            'questions', 'content', 'pattern_type' and 'split_index' (0-based
            index of the piece within its section). Without headers the whole
            text is split into 'auto' chunks. With headers, text preceding the
            first header is emitted as 'auto' chunks and every header section
            follows. A section with no content words produces no chunk.

        Raises:
            ValueError: If ``max_words`` is not positive.
        """
        if max_words <= 0:
            raise ValueError("max_words must be a positive integer")

        chunks: List[Dict] = []

        def emit(words: List[str], header_text: str, questions: List[str], pattern_type: str) -> None:
            # Append one chunk per max_words-sized slice of ``words``.
            for split_index, offset in enumerate(range(0, len(words), max_words)):
                chunks.append({
                    'chunk_id': len(chunks) + 1,
                    'header': header_text,
                    'questions': list(questions),
                    'content': ' '.join(words[offset:offset + max_words]),
                    'pattern_type': pattern_type,
                    'split_index': split_index,
                })

        if not headers:
            # Fallback chunking: no structure detected, split on word count only.
            emit(text.split(), '', [], 'auto')
            return chunks

        lines = text.split('\n')

        # Text that precedes the first header would otherwise be dropped.
        preamble_words = ' '.join(lines[:headers[0]['line_number']]).split()
        emit(preamble_words, '', [], 'auto')

        for i, header in enumerate(headers):
            start = header['line_number']
            end = headers[i + 1]['line_number'] if i + 1 < len(headers) else len(lines)
            section = [line.strip() for line in lines[start + 1:end]]

            # Short lines ending in '?' are treated as questions, not content.
            found_questions = [line for line in section if line.endswith('?') and len(line.split()) <= 20]
            question_set = set(found_questions)
            words = ' '.join(line for line in section if line and line not in question_set).split()

            pattern_type = header['pattern_type']
            header_text = header['text'] if pattern_type == 'header' else ''
            if pattern_type == 'question':
                # The question line that opened the section belongs to the
                # chunk too; without it the question text never reaches the
                # embedded text.
                questions = [header['text']] + found_questions
            else:
                questions = []

            emit(words, header_text, questions, pattern_type)

        return chunks

    def extract_topics_tfidf(self, text: str, max_features: int = 3) -> List[str]:
        """Return up to ``max_features`` top-weighted terms of ``text``.

        Punctuation is replaced by spaces and English stop words are ignored.
        Returns an empty list when ``max_features`` is not positive or when
        the text holds no usable term (empty or stop words only).
        """
        if max_features <= 0:
            return []

        clean = re.sub(r'[^\w\s]', ' ', text.lower())
        vectorizer = TfidfVectorizer(max_features=max_features * 2, stop_words='english')
        try:
            tfidf = vectorizer.fit_transform([clean])
        except ValueError:
            # scikit-learn raises ValueError for an empty vocabulary; a chunk
            # without usable terms simply has no topics.
            return []

        terms = vectorizer.get_feature_names_out()
        scores = tfidf.toarray()[0]
        ranked = sorted(zip(terms, scores), key=lambda pair: -pair[1])
        top_terms = [term for term, score in ranked if score > 0]
        return top_terms[:max_features]

    def calculate_confidence_score(self, chunk: Dict) -> float:
        """Return a heuristic score for how well-structured ``chunk`` is.

        Adds 0.3 for a non-empty header, 0.3 for more than 20 content words
        and 0.2 for any questions; the result is capped at 1.0.
        """
        score = 0.0
        if chunk.get('header'):
            score += 0.3
        if chunk.get('content') and len(chunk['content'].split()) > 20:
            score += 0.3
        if chunk.get('questions'):
            score += 0.2
        return min(score, 1.0)

    def process_document(self, file_path: str, title: Optional[str] = None) -> List[Dict]:
        """Read a document, chunk it and annotate every chunk.

        Args:
            file_path: Path to a .docx file (suffix matched case-insensitively)
                or to a text file, which is read as UTF-8.
            title: Title stored in each chunk's metadata; defaults to the
                file name.

        Returns:
            One dict per chunk with keys 'chunk_id', 'text', 'embedding' and
            'metadata'. The metadata holds the raw chunk fields plus 'title',
            'category', 'categories', 'topics' and 'confidence_score'.
        """
        file_path = Path(file_path)
        if file_path.suffix.lower() == ".docx":
            text = self.extract_text_from_docx(str(file_path))
        else:
            # Explicit encoding keeps results independent of the platform locale.
            text = file_path.read_text(encoding="utf-8")

        doc_type = self.detect_document_type(text)
        headers = self.extract_headers(text, doc_type)
        raw_chunks = self.chunk_by_headers(text, headers)

        final_chunks = []
        for chunk in raw_chunks:
            full_text = f"{chunk['header']} {' '.join(chunk['questions'])} {chunk['content']}".strip()

            # One scoring pass: the ranked list's first entry is the best category.
            categories = self.match_category(full_text, return_first=False)
            category = categories[0] if categories else None

            embedding = self.embed_model.encode(full_text).tolist()
            topics = self.extract_topics_tfidf(full_text)
            confidence = self.calculate_confidence_score(chunk)

            final_chunks.append({
                "chunk_id": chunk['chunk_id'],
                "text": full_text,
                "embedding": embedding,
                "metadata": {
                    **chunk,
                    "title": title or file_path.name,
                    "category": category,
                    "categories": categories,
                    "topics": topics,
                    "confidence_score": confidence
                }
            })
        return final_chunks