# document_chunker.py
import re
from typing import List, Dict, Optional, Union
from pathlib import Path
from collections import defaultdict
from dataclasses import dataclass

from docx import Document
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer


@dataclass
class DocumentChunk:
    chunk_id: int
    text: str
    embedding: List[float]
    metadata: Dict
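
# Note: process_document() below returns plain dicts with the same top-level keys
# as DocumentChunk ("chunk_id", "text", "embedding", "metadata"). A minimal sketch
# (an assumption about intended usage, not part of the original API) for getting
# typed objects instead:
#
#     typed_chunks = [DocumentChunk(**c) for c in chunker.process_document("file.docx")]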


class DocumentChunker:
    def __init__(self):
        self.embed_model = SentenceTransformer("all-MiniLM-L6-v2")
        # Regex patterns used to assign grant-proposal categories to chunks.
        self.category_patterns = {
            "Project Summary": [r"\bsummary\b", r"\bproject overview\b"],
            "Contact Information": [r"\bcontact\b", r"\bemail\b", r"\bphone\b", r"\baddress\b"],
            "Problem/ Need": [r"\bproblem\b", r"\bneed\b", r"\bchallenge\b"],
            "Mission Statement": [r"\bmission\b", r"\bvision\b"],
            "Fit or Alignment to Grant": [r"\balignment\b", r"\bfit\b", r"\bgrant (focus|priority)\b"],
            "Goals/ Vision / Objectives": [r"\bgoals?\b", r"\bobjectives?\b", r"\bvision\b"],
            "Our Solution *PROGRAMS* and Approach": [r"\bsolution\b", r"\bprogram\b", r"\bapproach\b"],
            "Impact, Results, or Outcomes": [r"\bimpact\b", r"\bresults?\b", r"\boutcomes?\b"],
            "Beneficiaries": [r"\bbeneficiaries\b", r"\bwho we serve\b", r"\btarget audience\b"],
            "Differentiation with Competitors": [r"\bcompetitor\b", r"\bdifferent\b", r"\bvalue proposition\b"],
            "Plan and Timeline": [r"\btimeline\b", r"\bschedule\b", r"\bmilestone\b"],
            "Budget and Funding": [r"\bbudget\b", r"\bfunding\b", r"\bcost\b"],
            "Sustainability and Strategy": [r"\bsustainability\b", r"\bexit strategy\b"],
            "Organization's History": [r"\bhistory\b", r"\borganization background\b"],
            "Team Member Descriptions": [r"\bteam\b", r"\bstaff\b", r"\blived experience\b"],
        }
        # Patterns used to detect section headers and questions per document type.
        self.patterns = {
            'grant_application': {
                'header_patterns': [
                    r'\*\*([^*]+)\*\*',        # bold paragraphs marked as **...** by extract_text_from_docx
                    r'^([A-Z][^a-z]*[A-Z])$',  # ALL-CAPS lines
                    r'^([A-Z][A-Za-z\s]+)$',   # capitalized title lines
                ],
                'question_patterns': [
                    r'^.+\?$',
                    r'^\*?Please .+',
                    r'^How .+',
                    r'^What .+',
                    r'^Describe .+',
                ]
            }
        }

    def match_category(self, text: str, return_first: bool = True) -> Union[str, List[str], None]:
        lower_text = text.lower()
        match_scores = defaultdict(int)
        for category, patterns in self.category_patterns.items():
            for pattern in patterns:
                matches = re.findall(pattern, lower_text)
                match_scores[category] += len(matches)
        # defaultdict lookups create zero-count entries, so check the counts rather than the dict itself.
        if not any(match_scores.values()):
            return None if return_first else []
        sorted_categories = sorted(match_scores.items(), key=lambda x: -x[1])
        return sorted_categories[0][0] if return_first else [cat for cat, _ in sorted_categories if match_scores[cat] > 0]

    def extract_text_from_docx(self, file_path: str) -> str:
        doc = Document(file_path)
        return '\n'.join([f"**{p.text}**" if any(r.bold for r in p.runs) else p.text for p in doc.paragraphs])

    def detect_document_type(self, text: str) -> str:
        keywords = ['grant', 'funding', 'mission']
        return 'grant_application' if sum(k in text.lower() for k in keywords) >= 2 else 'generic'

    def extract_headers(self, text: str, doc_type: str) -> List[Dict]:
        lines = text.split('\n')
        headers = []
        patterns = self.patterns.get(doc_type, self.patterns['grant_application'])
        for i, line in enumerate(lines):
            line = line.strip("* ")
            if any(re.match(p, line, re.IGNORECASE) for p in patterns['question_patterns']):
                headers.append({'text': line, 'line_number': i, 'pattern_type': 'question'})
            elif any(re.match(p, line) for p in patterns['header_patterns']):
                headers.append({'text': line, 'line_number': i, 'pattern_type': 'header'})
        return headers

    def chunk_by_headers(self, text: str, headers: List[Dict], max_words=150) -> List[Dict]:
        lines = text.split('\n')
        chunks = []
        if not headers:
            # Fallback: no headers detected, so chunk the raw text by word count.
            words = text.split()
            for i in range(0, len(words), max_words):
                piece = ' '.join(words[i:i + max_words])
                chunks.append({
                    'chunk_id': len(chunks) + 1,
                    'header': '',
                    'questions': [],
                    'content': piece,
                    'pattern_type': 'auto'
                })
            return chunks
        for i, header in enumerate(headers):
            # Each section runs from this header to the next header (or to the end of the document).
            start = header['line_number']
            end = headers[i + 1]['line_number'] if i + 1 < len(headers) else len(lines)
            content_lines = lines[start + 1:end]
            questions = [l.strip() for l in content_lines if l.strip().endswith('?') and len(l.split()) <= 20]
            content = ' '.join([l.strip() for l in content_lines if l.strip() and l.strip() not in questions])
            # Split long sections into pieces of at most max_words words.
            for j in range(0, len(content.split()), max_words):
                chunk_text = ' '.join(content.split()[j:j + max_words])
                chunks.append({
                    'chunk_id': len(chunks) + 1,
                    'header': header['text'] if header['pattern_type'] == 'header' else '',
                    'questions': questions if header['pattern_type'] == 'question' else [],
                    'content': chunk_text,
                    'pattern_type': header['pattern_type'],
                    'split_index': j // max_words
                })
        return chunks

    def extract_topics_tfidf(self, text: str, max_features: int = 3) -> List[str]:
        clean = re.sub(r'[^\w\s]', ' ', text.lower())
        vectorizer = TfidfVectorizer(max_features=max_features * 2, stop_words='english')
        tfidf = vectorizer.fit_transform([clean])
        terms = vectorizer.get_feature_names_out()
        scores = tfidf.toarray()[0]
        top_terms = [term for term, score in sorted(zip(terms, scores), key=lambda x: -x[1]) if score > 0]
        return top_terms[:max_features]

    def calculate_confidence_score(self, chunk: Dict) -> float:
        score = 0.0
        if chunk.get('header'): score += 0.3
        if chunk.get('content') and len(chunk['content'].split()) > 20: score += 0.3
        if chunk.get('questions'): score += 0.2
        return min(score, 1.0)

    def process_document(self, file_path: str, title: Optional[str] = None) -> List[Dict]:
        """Chunk a document and attach embeddings, categories, topics, and a confidence score."""
        file_path = Path(file_path)
        text = self.extract_text_from_docx(str(file_path)) if file_path.suffix == ".docx" else file_path.read_text()
        doc_type = self.detect_document_type(text)
        headers = self.extract_headers(text, doc_type)
        raw_chunks = self.chunk_by_headers(text, headers)
        final_chunks = []
        for chunk in raw_chunks:
            full_text = f"{chunk['header']} {' '.join(chunk['questions'])} {chunk['content']}".strip()
            category = self.match_category(full_text, return_first=True)
            categories = self.match_category(full_text, return_first=False)
            embedding = self.embed_model.encode(full_text).tolist()
            topics = self.extract_topics_tfidf(full_text)
            confidence = self.calculate_confidence_score(chunk)
            final_chunks.append({
                "chunk_id": chunk['chunk_id'],
                "text": full_text,
                "embedding": embedding,
                "metadata": {
                    **chunk,
                    "title": title or file_path.name,
                    "category": category,
                    "categories": categories,
                    "topics": topics,
                    "confidence_score": confidence
                }
            })
        return final_chunks
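

# A minimal usage sketch, assuming a local grant-proposal file named
# "proposal.docx" (the filename and title are illustrative, not part of the original code):
if __name__ == "__main__":
    chunker = DocumentChunker()
    chunks = chunker.process_document("proposal.docx", title="Sample Proposal")
    for c in chunks[:3]:
        meta = c["metadata"]
        print(c["chunk_id"], meta["category"], meta["topics"], round(meta["confidence_score"], 2))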