Cylanoid committed on
Commit
4d504fd
·
verified ·
1 Parent(s): 36b5bed

Update document_analyzer.py

Browse files
Files changed (1) hide show
  1. document_analyzer.py +39 -249
document_analyzer.py CHANGED
@@ -1,261 +1,51 @@
1
  # document_analyzer.py
2
- # Enhanced document analysis module for healthcare fraud detection with Llama 4 (text-only)
3
 
4
  import torch
5
- import re
6
- from typing import List, Dict, Any
7
  import nltk
8
  from nltk.tokenize import sent_tokenize
9
 
10
- try:
11
- nltk.data.find('tokenizers/punkt')
12
- except LookupError:
13
- nltk.download('punkt')
14
-
15
class HealthcareFraudAnalyzer:
    """Chunk a healthcare document, query a language model per chunk, and
    combine the model's answers with keyword matches into a report.

    The analyzer keeps two vocabularies:

    * ``fraud_categories`` - category names that are listed in the prompt and
      searched for (case-insensitively) in the model's answer.
    * ``key_terms`` - keyword groups searched for directly in the document
      text, independent of the model.
    """

    def __init__(self, model, tokenizer, device=None):
        """Store the model/tokenizer pair and move the model to ``device``.

        Args:
            model: Object exposing ``to()``, ``eval()`` and ``generate()``.
            tokenizer: Callable tokenizer that also exposes ``decode()``.
            device: Target device; when falsy, "cuda" is used if
                ``torch.cuda.is_available()`` and "cpu" otherwise.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.device = device if device else "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device)
        self.model.eval()

        # Category names inserted into the prompt and searched for in the answer.
        self.fraud_categories = [
            "Consent violations",
            "Documentation issues",
            "Visitation restrictions",
            "Medication misuse",
            "Chemical restraint",
            "Fraudulent billing",
            "False testimony",
            "Information concealment",
            "Patient neglect",
            "Hospice certification issues"
        ]

        # Keyword groups matched as plain lowercase substrings of the document.
        self.key_terms = {
            "medication": ["haloperidol", "lorazepam", "sedation", "chemical", "restraint",
                           "prn", "as needed", "antipsychotic", "sedative", "benadryl",
                           "ativan", "seroquel", "comfort kit", "medication"],
            "documentation": ["record", "documentation", "log", "chart", "note", "missing",
                              "altered", "backdated", "omit", "selective", "inconsistent"],
            "visitation": ["visit", "restriction", "limit", "family", "spouse", "access",
                           "barrier", "monitor", "disruptive", "uncooperative"],
            "consent": ["consent", "authorize", "approval", "permission", "against wishes",
                        "refused", "decline", "without knowledge"],
            "hospice": ["hospice", "terminal", "end of life", "palliative", "comfort care",
                        "six months", "6 months", "prognosis", "certification"],
            "billing": ["charge", "bill", "payment", "medicare", "medicaid", "insurance",
                        "reimbursement", "fee", "additional", "extra"]
        }

    def chunk_document(self, text: str, chunk_size: int = 1024, overlap: int = 256) -> List[str]:
        """Split ``text`` into sentence-aligned chunks of roughly ``chunk_size`` characters.

        Sentences are accumulated until adding the next one would exceed
        ``chunk_size``. The chunk is then emitted and the next chunk starts
        with the last ``overlap`` characters of the previous one (a character
        slice, so it may begin mid-word). A single sentence longer than
        ``chunk_size`` is kept whole rather than split.

        Returns:
            The list of non-empty, stripped chunks in document order.
        """
        chunks = []
        current_chunk = ""

        for sentence in sent_tokenize(text):
            if len(current_chunk) + len(sentence) <= chunk_size:
                current_chunk += sentence + " "
                continue

            # Skip the flush while nothing has accumulated yet (an oversized
            # first sentence); otherwise an empty chunk would be emitted and
            # later sent to the model.
            if current_chunk.strip():
                chunks.append(current_chunk.strip())
            overlap_start = max(0, len(current_chunk) - overlap)
            current_chunk = current_chunk[overlap_start:] + sentence + " "

        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        return chunks

    def analyze_chunk(self, chunk: str) -> Dict[str, Any]:
        """Ask the model to analyse one chunk and collect keyword matches for it.

        Returns:
            A dict with ``analysis`` (the model's answer), ``term_matches``
            (output of ``_find_key_terms``) and ``chunk_text`` (the chunk,
            shortened to 200 characters plus "..." when longer).
        """
        prompt = (
            "<s>[INST] Analyze the following healthcare document text for evidence of "
            "fraud, neglect, abuse, or criminal conduct.\n"
            f"Focus on: {', '.join(self.fraud_categories)}.\n"
            "Provide specific indicators and cite the relevant text.\n"
            "\n"
            "DOCUMENT TEXT:\n"
            f"{chunk}\n"
            "\n"
            "ANALYSIS: [/INST]"
        )

        inputs = self.tokenizer(
            prompt, return_tensors="pt", padding=True, truncation=True, max_length=2048
        ).to(self.device)

        with torch.no_grad():
            output = self.model.generate(
                **inputs,
                max_new_tokens=512,
                temperature=0.1,
                top_p=0.9,
                repetition_penalty=1.2
            )

        # Decode only the tokens produced after the prompt. Splitting the full
        # text on "ANALYSIS:" left the "[/INST]" marker in the answer and broke
        # whenever truncation cut the marker out of the prompt.
        # NOTE(review): assumes generate() returns prompt tokens followed by
        # the new tokens (decoder-only behaviour) - confirm for the model used.
        prompt_length = inputs["input_ids"].shape[-1]
        analysis = self.tokenizer.decode(
            output[0][prompt_length:], skip_special_tokens=True
        ).strip()

        return {
            "analysis": analysis,
            "term_matches": self._find_key_terms(chunk),
            "chunk_text": chunk[:200] + "..." if len(chunk) > 200 else chunk
        }

    def _find_key_terms(self, text: str) -> Dict[str, List[str]]:
        """Return keyword hits in ``text`` grouped by ``key_terms`` category.

        Matching is a case-insensitive substring search. Each hit is returned
        with up to 50 characters of context on either side, wrapped in "...".
        Categories without any hit are omitted from the result.
        """
        text = text.lower()
        results = {}

        for category, terms in self.key_terms.items():
            matches = []
            for term in terms:
                pattern = r'.{0,50}' + re.escape(term) + r'.{0,50}'
                matches.extend(
                    "..." + match.group(0) + "..." for match in re.finditer(pattern, text)
                )

            if matches:
                results[category] = matches

        return results

    def analyze_document(self, document_text: str) -> Dict[str, Any]:
        """Run the full pipeline on ``document_text``.

        Whitespace is collapsed to single spaces, the text is chunked, every
        chunk is analysed, and the per-chunk results are consolidated.

        Returns:
            A dict with ``summary``, ``detailed_findings``, ``chunk_analyses``
            and ``document_metadata`` (normalised length and chunk count).
        """
        document_text = document_text.replace('\n', ' ').replace('\r', ' ')
        document_text = re.sub(r'\s+', ' ', document_text)

        chunks = self.chunk_document(document_text)
        chunk_analyses = [self.analyze_chunk(chunk) for chunk in chunks]
        consolidated_findings = self._consolidate_analyses(chunk_analyses)

        return {
            "summary": self._generate_summary(consolidated_findings, document_text),
            "detailed_findings": consolidated_findings,
            "chunk_analyses": chunk_analyses,
            "document_metadata": {
                "length": len(document_text),
                "chunk_count": len(chunks)
            }
        }

    def _consolidate_analyses(self, chunk_analyses: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Merge per-chunk results into document-level findings.

        Keyword snippets are pooled per category, exact duplicates and
        snippets contained in a longer snippet are dropped, and at most five
        are kept. Sentences of the model's answers that mention a fraud
        category by name are collected under that category.
        """
        all_term_matches = {category: [] for category in self.key_terms}

        for analysis in chunk_analyses:
            for category, matches in analysis.get("term_matches", {}).items():
                all_term_matches.setdefault(category, []).extend(matches)

        for category, matches in all_term_matches.items():
            # Overlapping chunks yield identical snippets; drop exact repeats
            # first (order preserved), then snippets swallowed by a longer one.
            unique = list(dict.fromkeys(matches))
            kept = [
                match for match in unique
                if not any(match in other and match != other for other in unique)
            ]
            all_term_matches[category] = kept[:5]

        categorized_findings = {category: [] for category in self.fraud_categories}

        for analysis in chunk_analyses:
            analysis_text = analysis.get("analysis", "")
            lowered = analysis_text.lower()
            mentioned = [c for c in self.fraud_categories if c.lower() in lowered]
            if not mentioned:
                continue

            # Tokenise once per answer instead of once per matching category.
            sentences = sent_tokenize(analysis_text)
            for category in mentioned:
                needle = category.lower()
                categorized_findings[category].extend(
                    s for s in sentences if needle in s.lower()
                )

        return {
            "term_matches": all_term_matches,
            "categorized_findings": categorized_findings
        }

    def _generate_summary(self, findings: Dict[str, Any], full_text: str) -> str:
        """Render a Markdown summary of the three highest-ranked categories.

        A category's rank is the number of model sentences filed under it plus
        the number of keyword snippets from related term groups. A term group
        is related to a category when the group name occurs in the category
        name (e.g. "billing" in "Fraudulent billing").

        Args:
            findings: Output of ``_consolidate_analyses``.
            full_text: Unused; kept so the signature stays stable.

        Returns:
            The summary as a single newline-joined string.
        """
        categorized = findings["categorized_findings"]
        term_matches = findings["term_matches"]

        indicator_counts = {
            category: len(categorized.get(category, []))
            for category in self.fraud_categories
        }

        # Term groups are keyed by short names ("medication") while categories
        # are full phrases ("Medication misuse"), so they are linked by
        # substring. Looking the counts up by category name always gave 0.
        related_terms = {
            category: [
                matches
                for term_category, matches in term_matches.items()
                if matches and term_category.lower() in category.lower()
            ]
            for category in self.fraud_categories
        }
        term_match_counts = {
            category: sum(len(matches) for matches in groups)
            for category, groups in related_terms.items()
        }

        sorted_categories = sorted(
            self.fraud_categories,
            key=lambda x: indicator_counts.get(x, 0) + term_match_counts.get(x, 0),
            reverse=True
        )

        summary_lines = ["# Healthcare Fraud Detection Analysis", ""]
        summary_lines.append("## Key Concerns Identified")

        for category in sorted_categories[:3]:
            if indicator_counts.get(category, 0) == 0 and term_match_counts.get(category, 0) == 0:
                continue

            summary_lines.append(f"### {category}")

            if categorized.get(category):
                summary_lines.append("Model analysis indicates:")
                summary_lines.extend(f"- {finding}" for finding in categorized[category][:3])

            for matches in related_terms[category]:
                summary_lines.append("Key terms identified:")
                summary_lines.extend(f"- {match}" for match in matches[:3])

            summary_lines.append("")

        summary_lines.append("## Recommended Actions")
        total_indicators = sum(indicator_counts.values())
        if total_indicators > 5:
            summary_lines.append("- **Urgent review recommended** - Multiple indicators of potential fraud detected")
            summary_lines.append("- Consider referral to appropriate regulatory authorities")
            summary_lines.append("- Document preservation should be prioritized")
        elif total_indicators > 2:
            summary_lines.append("- **Further investigation recommended** - Several potential indicators identified")
            summary_lines.append("- Conduct interviews with involved personnel")
            summary_lines.append("- Secure additional documentation for verification")
        else:
            summary_lines.append("- **Monitor situation** - Limited indicators detected")
            summary_lines.append("- Consider more specific document analysis")

        return "\n".join(summary_lines)

    def print_report(self, results: Dict[str, Any]) -> None:
        """Print the summary, categorised findings and keyword matches from
        an ``analyze_document`` result to standard output."""
        print("\n" + "="*80)
        print("HEALTHCARE FRAUD DETECTION REPORT")
        print("="*80 + "\n")

        print(results["summary"])

        print("\n" + "="*80)
        print("DETAILED FINDINGS")
        print("="*80)

        for category, findings in results["detailed_findings"]["categorized_findings"].items():
            if findings:
                print(f"\n## {category.upper()}")
                for i, finding in enumerate(findings, 1):
                    print(f"{i}. {finding}")

        print("\n" + "="*80)
        print("KEY TERM MATCHES")
        print("="*80)

        for category, matches in results["detailed_findings"]["term_matches"].items():
            if matches:
                print(f"\n## {category.upper()}")
                for match in matches:
                    print(f"- {match}")

        print("\n" + "="*80 + "\n")
248
-
249
def analyze_pdf_for_fraud(pdf_path, model, tokenizer):
    """Extract the text of a PDF, analyse it for fraud indicators and print a report.

    Args:
        pdf_path: Path passed to ``pdfplumber.open``.
        model: Model handed to ``HealthcareFraudAnalyzer``.
        tokenizer: Tokenizer handed to ``HealthcareFraudAnalyzer``.

    Returns:
        The result dict produced by ``HealthcareFraudAnalyzer.analyze_document``.
    """
    # Imported here so the module loads even where pdfplumber is not installed.
    import pdfplumber

    with pdfplumber.open(pdf_path) as pdf:
        # Pages with no extractable text yield None, hence the "or ''".
        page_texts = [page.extract_text() or "" for page in pdf.pages]

    # Join pages with a newline: plain concatenation fused the last word of
    # one page with the first word of the next, corrupting sentence splitting
    # and keyword matching at every page boundary.
    text = "\n".join(page_texts)

    analyzer = HealthcareFraudAnalyzer(model, tokenizer)
    results = analyzer.analyze_document(text)

    analyzer.print_report(results)
    return results
 
1
  # document_analyzer.py
2
+ # Analyzer for healthcare fraud detection using Llama 4 Maverick (text-only)
3
 
4
  import torch
 
 
5
  import nltk
6
  from nltk.tokenize import sent_tokenize
7
 
 
 
 
 
 
8
class HealthcareFraudAnalyzer:
    """Screen individual sentences for healthcare fraud indicators by
    prompting a language model and parsing its dict-formatted answer."""

    def __init__(self, model, tokenizer, accelerator):
        """Store the model, tokenizer and accelerator.

        Args:
            model: Object exposing ``generate()``.
            tokenizer: Callable tokenizer that also exposes ``decode()``.
            accelerator: Object whose ``device`` attribute names the device
                the tokenized inputs are moved to.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.accelerator = accelerator
        self.device = self.accelerator.device
        # Fetch the NLTK "punkt" tokenizer data if it is not present locally.
        try:
            nltk.data.find('tokenizers/punkt')
        except LookupError:
            nltk.download('punkt')

    @staticmethod
    def _parse_model_response(response):
        """Extract the first ``{...}`` span of ``response`` and parse it as a
        Python or JSON literal; return the dict, or ``None`` if that fails."""
        import ast
        import json

        start = response.find("{")
        end = response.rfind("}")
        if start == -1 or end <= start:
            return None
        candidate = response[start:end + 1]

        # SECURITY: this text is untrusted model output. It was previously
        # passed to eval(), which executes arbitrary expressions; literal_eval
        # only accepts plain literals.
        try:
            parsed = ast.literal_eval(candidate)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # The model may answer in JSON (true/false) instead of Python syntax.
            try:
                parsed = json.loads(candidate)
            except ValueError:
                return None

        return parsed if isinstance(parsed, dict) else None

    def analyze_document(self, sentences):
        """Query the model once per sentence and collect the flagged ones.

        Args:
            sentences: Iterable of sentence strings.

        Returns:
            A list of ``{"sentence", "reason", "confidence"}`` dicts, one per
            sentence whose parsed answer has a truthy ``fraud_detected``.
            Sentences whose answer cannot be parsed, or lacks ``reason`` or
            ``confidence``, are skipped.
        """
        fraud_indicators = []
        for sentence in sentences:
            prompt = (
                f"Analyze the following sentence for potential healthcare fraud indicators, "
                f"such as consent violations, medication misuse, or billing irregularities. "
                f"Provide a reason and confidence score (0-1). "
                f"Sentence: {sentence}\nOutput format: {{'fraud_detected': bool, 'reason': str, 'confidence': float}}"
            )
            inputs = self.tokenizer(
                prompt, return_tensors="pt", padding=True, truncation=True, max_length=512
            ).to(self.device)

            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_new_tokens=256,
                    temperature=0.7,
                    top_p=0.9,
                    do_sample=True
                )

            # Decode only what was generated after the prompt. Decoding the
            # whole sequence put the prompt first, so the old
            # startswith("{") check rejected every answer.
            # NOTE(review): assumes generate() returns prompt tokens followed
            # by the new tokens (decoder-only behaviour) - confirm for the
            # model in use.
            prompt_length = inputs["input_ids"].shape[-1]
            response = self.tokenizer.decode(
                outputs[0][prompt_length:], skip_special_tokens=True
            )

            result = self._parse_model_response(response)
            if not result or not result.get("fraud_detected"):
                continue
            if "reason" not in result or "confidence" not in result:
                continue

            fraud_indicators.append({
                "sentence": sentence,
                "reason": result["reason"],
                "confidence": result["confidence"]
            })

        return fraud_indicators