Tesneem committed
Commit 5b0ad58 · verified · 1 Parent(s): 2cdf48b

Update document_chunker.py

Files changed (1)
  1. document_chunker.py +232 -50
document_chunker.py CHANGED
@@ -3,12 +3,10 @@ from typing import List, Dict, Optional
 from pathlib import Path
 from collections import defaultdict
 from dataclasses import dataclass
-
+import fitz # PyMuPDF
 from docx import Document
 from sentence_transformers import SentenceTransformer
 from sklearn.feature_extraction.text import TfidfVectorizer
-import fitz # PyMuPDF
-
 
 @dataclass
 class DocumentChunk:
@@ -17,7 +15,6 @@ class DocumentChunk:
     embedding: List[float]
     metadata: Dict
 
-
 class DocumentChunker:
     def __init__(self):
         self.embed_model = SentenceTransformer("all-MiniLM-L6-v2")
@@ -42,21 +39,25 @@ class DocumentChunker:
 
         self.patterns = {
             'grant_application': {
-                'header_patterns': [
-                    r'\*\*([^*]+)\*\*',
-                    r'^([A-Z][^a-z]*[A-Z])$',
-                    r'^([A-Z][A-Za-z\s]+)$',
-                ],
-                'question_patterns': [
-                    r'^.+\?$',
-                    r'^\*?Please .+',
-                    r'^How .+',
-                    r'^What .+',
-                    r'^Describe .+',
-                ]
+                'header_patterns': [r'\*\*([^*]+)\*\*', r'^([A-Z][^a-z]*[A-Z])$', r'^([A-Z][A-Za-z\s]+)$'],
+                'question_patterns': [r'^.+\?$', r'^\*?Please .+', r'^How .+', r'^What .+', r'^Describe .+']
             }
         }
 
+    def match_category(self, text: str, return_first: bool = True) -> Optional[str] or List[str]:
+        lower_text = text.lower()
+        match_scores = defaultdict(int)
+        for category, patterns in self.category_patterns.items():
+            for pattern in patterns:
+                matches = re.findall(pattern, lower_text)
+                match_scores[category] += len(matches)
+
+        if not match_scores:
+            return None if return_first else []
+
+        sorted_categories = sorted(match_scores.items(), key=lambda x: -x[1])
+        return sorted_categories[0][0] if return_first else [cat for cat, _ in sorted_categories if match_scores[cat] > 0]
+
     def extract_text(self, file_path: str) -> str:
         if file_path.endswith(".docx"):
            doc = Document(file_path)
@@ -65,12 +66,10 @@
             text = ""
             with fitz.open(file_path) as doc:
                 for page in doc:
-                    text += page.get_text()
+                    text += page.get_text("text") # More accurate reading order
             return text
-        elif file_path.endswith(".txt"):
-            return Path(file_path).read_text()
         else:
-            raise ValueError("Unsupported file format")
+            return Path(file_path).read_text()
 
     def detect_document_type(self, text: str) -> str:
         keywords = ['grant', 'funding', 'mission']
@@ -88,23 +87,27 @@
                 headers.append({'text': line, 'line_number': i, 'pattern_type': 'header'})
         return headers
 
+    def fallback_chunking(self, text: str, max_words=150, stride=100) -> List[Dict]:
+        words = text.split()
+        chunks = []
+        for i in range(0, len(words), stride):
+            chunk_text = ' '.join(words[i:i + max_words])
+            if len(chunk_text.split()) < 20:
+                continue
+            chunks.append({
+                'chunk_id': len(chunks) + 1,
+                'header': '',
+                'questions': [],
+                'content': chunk_text,
+                'pattern_type': 'fallback',
+                'split_index': i // stride
+            })
+        return chunks
+
     def chunk_by_headers(self, text: str, headers: List[Dict], max_words=150) -> List[Dict]:
         lines = text.split('\n')
         chunks = []
 
-        if not headers:
-            words = text.split()
-            for i in range(0, len(words), max_words):
-                piece = ' '.join(words[i:i + max_words])
-                chunks.append({
-                    'chunk_id': len(chunks) + 1,
-                    'header': '',
-                    'questions': [],
-                    'content': piece,
-                    'pattern_type': 'auto'
-                })
-            return chunks
-
         for i, header in enumerate(headers):
             start, end = header['line_number'], headers[i + 1]['line_number'] if i + 1 < len(headers) else len(lines)
             content_lines = lines[start + 1:end]
@@ -113,6 +116,8 @@
 
             for j in range(0, len(content.split()), max_words):
                 chunk_text = ' '.join(content.split()[j:j + max_words])
+                if len(chunk_text.split()) < 20:
+                    continue
                 chunks.append({
                     'chunk_id': len(chunks) + 1,
                     'header': header['text'] if header['pattern_type'] == 'header' else '',
@@ -121,24 +126,11 @@
                     'pattern_type': header['pattern_type'],
                     'split_index': j // max_words
                 })
-        return chunks
-
-    def match_category(self, text: str, return_first: bool = True) -> Optional[str] or List[str]:
-        lower_text = text.lower()
-        match_scores = defaultdict(int)
-        for category, patterns in self.category_patterns.items():
-            for pattern in patterns:
-                matches = re.findall(pattern, lower_text)
-                match_scores[category] += len(matches)
 
-        if not match_scores:
-            return None if return_first else []
-
-        sorted_categories = sorted(match_scores.items(), key=lambda x: -x[1])
-        return sorted_categories[0][0] if return_first else [cat for cat, _ in sorted_categories if match_scores[cat] > 0]
+        return chunks
 
     def extract_topics_tfidf(self, text: str, max_features: int = 3) -> List[str]:
-        clean = re.sub(r'[^\w\s]', ' ', text.lower())
+        clean = re.sub(r'[^a-z0-9\s]', ' ', text.lower())
         vectorizer = TfidfVectorizer(max_features=max_features * 2, stop_words='english')
         tfidf = vectorizer.fit_transform([clean])
         terms = vectorizer.get_feature_names_out()
@@ -158,10 +150,12 @@
         text = self.extract_text(str(file_path))
         doc_type = self.detect_document_type(text)
         headers = self.extract_headers(text, doc_type)
-        raw_chunks = self.chunk_by_headers(text, headers)
+        chunks = self.chunk_by_headers(text, headers)
+        if not chunks:
+            chunks = self.fallback_chunking(text)
 
         final_chunks = []
-        for chunk in raw_chunks:
+        for chunk in chunks:
             full_text = f"{chunk['header']} {' '.join(chunk['questions'])} {chunk['content']}".strip()
             category = self.match_category(full_text, return_first=True)
             categories = self.match_category(full_text, return_first=False)
@@ -179,8 +173,196 @@
                     "category": category,
                     "categories": categories,
                     "topics": topics,
+                    "chunking_strategy": chunk['pattern_type'],
                     "confidence_score": confidence
                 }
             })
 
         return final_chunks
+
+# import re
+# from typing import List, Dict, Optional
+# from pathlib import Path
+# from collections import defaultdict
+# from dataclasses import dataclass
+
+# from docx import Document
+# from sentence_transformers import SentenceTransformer
+# from sklearn.feature_extraction.text import TfidfVectorizer
+# import fitz # PyMuPDF
+
+
+# @dataclass
+# class DocumentChunk:
+#     chunk_id: int
+#     text: str
+#     embedding: List[float]
+#     metadata: Dict
+
+
+# class DocumentChunker:
+#     def __init__(self):
+#         self.embed_model = SentenceTransformer("all-MiniLM-L6-v2")
+
+#         self.category_patterns = {
+#             "Project Summary": [r"\bsummary\b", r"\bproject overview\b"],
+#             "Contact Information": [r"\bcontact\b", r"\bemail\b", r"\bphone\b", r"\baddress\b"],
+#             "Problem/ Need": [r"\bproblem\b", r"\bneed\b", r"\bchallenge\b"],
+#             "Mission Statement": [r"\bmission\b", r"\bvision\b"],
+#             "Fit or Alignment to Grant": [r"\balignment\b", r"\bfit\b", r"\bgrant (focus|priority)\b"],
+#             "Goals/ Vision / Objectives": [r"\bgoals?\b", r"\bobjectives?\b", r"\bvision\b"],
+#             "Our Solution *PROGRAMS* and Approach": [r"\bsolution\b", r"\bprogram\b", r"\bapproach\b"],
+#             "Impact, Results, or Outcomes": [r"\bimpact\b", r"\bresults?\b", r"\boutcomes?\b"],
+#             "Beneficiaries": [r"\bbeneficiaries\b", r"\bwho we serve\b", r"\btarget audience\b"],
+#             "Differentiation with Competitors": [r"\bcompetitor\b", r"\bdifferent\b", r"\bvalue proposition\b"],
+#             "Plan and Timeline": [r"\btimeline\b", r"\bschedule\b", r"\bmilestone\b"],
+#             "Budget and Funding": [r"\bbudget\b", r"\bfunding\b", r"\bcost\b"],
+#             "Sustainability and Strategy": [r"\bsustainability\b", r"\bexit strategy\b"],
+#             "Organization's History": [r"\bhistory\b", r"\borganization background\b"],
+#             "Team Member Descriptions": [r"\bteam\b", r"\bstaff\b", r"\blived experience\b"],
+#         }
+
+#         self.patterns = {
+#             'grant_application': {
+#                 'header_patterns': [
+#                     r'\*\*([^*]+)\*\*',
+#                     r'^([A-Z][^a-z]*[A-Z])$',
+#                     r'^([A-Z][A-Za-z\s]+)$',
+#                 ],
+#                 'question_patterns': [
+#                     r'^.+\?$',
+#                     r'^\*?Please .+',
+#                     r'^How .+',
+#                     r'^What .+',
+#                     r'^Describe .+',
+#                 ]
+#             }
+#         }
+
+#     def extract_text(self, file_path: str) -> str:
+#         if file_path.endswith(".docx"):
+#             doc = Document(file_path)
+#             return '\n'.join([f"**{p.text}**" if any(r.bold for r in p.runs) else p.text for p in doc.paragraphs])
+#         elif file_path.endswith(".pdf"):
+#             text = ""
+#             with fitz.open(file_path) as doc:
+#                 for page in doc:
+#                     text += page.get_text()
+#             return text
+#         elif file_path.endswith(".txt"):
+#             return Path(file_path).read_text()
+#         else:
+#             raise ValueError("Unsupported file format")
+
+#     def detect_document_type(self, text: str) -> str:
+#         keywords = ['grant', 'funding', 'mission']
+#         return 'grant_application' if sum(k in text.lower() for k in keywords) >= 2 else 'generic'
+
+#     def extract_headers(self, text: str, doc_type: str) -> List[Dict]:
+#         lines = text.split('\n')
+#         headers = []
+#         patterns = self.patterns.get(doc_type, self.patterns['grant_application'])
+#         for i, line in enumerate(lines):
+#             line = line.strip("* ")
+#             if any(re.match(p, line, re.IGNORECASE) for p in patterns['question_patterns']):
+#                 headers.append({'text': line, 'line_number': i, 'pattern_type': 'question'})
+#             elif any(re.match(p, line) for p in patterns['header_patterns']):
+#                 headers.append({'text': line, 'line_number': i, 'pattern_type': 'header'})
+#         return headers
+
+#     def chunk_by_headers(self, text: str, headers: List[Dict], max_words=150) -> List[Dict]:
+#         lines = text.split('\n')
+#         chunks = []
+
+#         if not headers:
+#             words = text.split()
+#             for i in range(0, len(words), max_words):
+#                 piece = ' '.join(words[i:i + max_words])
+#                 chunks.append({
+#                     'chunk_id': len(chunks) + 1,
+#                     'header': '',
+#                     'questions': [],
+#                     'content': piece,
+#                     'pattern_type': 'auto'
+#                 })
+#             return chunks
+
+#         for i, header in enumerate(headers):
+#             start, end = header['line_number'], headers[i + 1]['line_number'] if i + 1 < len(headers) else len(lines)
+#             content_lines = lines[start + 1:end]
+#             questions = [l.strip() for l in content_lines if l.strip().endswith('?') and len(l.split()) <= 20]
+#             content = ' '.join([l.strip() for l in content_lines if l.strip() and l.strip() not in questions])
+
+#             for j in range(0, len(content.split()), max_words):
+#                 chunk_text = ' '.join(content.split()[j:j + max_words])
+#                 chunks.append({
+#                     'chunk_id': len(chunks) + 1,
+#                     'header': header['text'] if header['pattern_type'] == 'header' else '',
+#                     'questions': questions if header['pattern_type'] == 'question' else [],
+#                     'content': chunk_text,
+#                     'pattern_type': header['pattern_type'],
+#                     'split_index': j // max_words
+#                 })
+#         return chunks
+
+#     def match_category(self, text: str, return_first: bool = True) -> Optional[str] or List[str]:
+#         lower_text = text.lower()
+#         match_scores = defaultdict(int)
+#         for category, patterns in self.category_patterns.items():
+#             for pattern in patterns:
+#                 matches = re.findall(pattern, lower_text)
+#                 match_scores[category] += len(matches)
+
+#         if not match_scores:
+#             return None if return_first else []
+
+#         sorted_categories = sorted(match_scores.items(), key=lambda x: -x[1])
+#         return sorted_categories[0][0] if return_first else [cat for cat, _ in sorted_categories if match_scores[cat] > 0]
+
+#     def extract_topics_tfidf(self, text: str, max_features: int = 3) -> List[str]:
+#         clean = re.sub(r'[^\w\s]', ' ', text.lower())
+#         vectorizer = TfidfVectorizer(max_features=max_features * 2, stop_words='english')
+#         tfidf = vectorizer.fit_transform([clean])
+#         terms = vectorizer.get_feature_names_out()
+#         scores = tfidf.toarray()[0]
+#         top_terms = [term for term, score in sorted(zip(terms, scores), key=lambda x: -x[1]) if score > 0]
+#         return top_terms[:max_features]
+
+#     def calculate_confidence_score(self, chunk: Dict) -> float:
+#         score = 0.0
+#         if chunk.get('header'): score += 0.3
+#         if chunk.get('content') and len(chunk['content'].split()) > 20: score += 0.3
+#         if chunk.get('questions'): score += 0.2
+#         return min(score, 1.0)
+
+#     def process_document(self, file_path: str, title: Optional[str] = None) -> List[Dict]:
+#         file_path = Path(file_path)
+#         text = self.extract_text(str(file_path))
+#         doc_type = self.detect_document_type(text)
+#         headers = self.extract_headers(text, doc_type)
+#         raw_chunks = self.chunk_by_headers(text, headers)
+
+#         final_chunks = []
+#         for chunk in raw_chunks:
+#             full_text = f"{chunk['header']} {' '.join(chunk['questions'])} {chunk['content']}".strip()
+#             category = self.match_category(full_text, return_first=True)
+#             categories = self.match_category(full_text, return_first=False)
+#             embedding = self.embed_model.encode(full_text).tolist()
+#             topics = self.extract_topics_tfidf(full_text)
+#             confidence = self.calculate_confidence_score(chunk)
+
+#             final_chunks.append({
+#                 "chunk_id": chunk['chunk_id'],
+#                 "text": full_text,
+#                 "embedding": embedding,
+#                 "metadata": {
+#                     **chunk,
+#                     "title": title or file_path.name,
+#                     "category": category,
+#                     "categories": categories,
+#                     "topics": topics,
+#                     "confidence_score": confidence
+#                 }
+#             })
+
+#         return final_chunks
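
For reference, a minimal usage sketch of the updated chunker (assuming the file is importable as the module document_chunker; the input file name and title below are illustrative, not part of the commit):

    from document_chunker import DocumentChunker

    chunker = DocumentChunker()
    # process_document extracts text, detects the document type, chunks by headers,
    # and now falls back to fallback_chunking when no header-based chunks are found.
    # Each returned dict carries "chunk_id", "text", "embedding", and "metadata".
    chunks = chunker.process_document("sample_grant.docx", title="Sample Grant")
    for c in chunks:
        print(c["chunk_id"], c["metadata"]["category"], c["metadata"]["chunking_strategy"])

With the defaults max_words=150 and stride=100, consecutive fallback windows overlap by 50 words, and any window (or header section) shorter than 20 words is skipped rather than emitted as a chunk.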