nchandu2023 committed on
Commit
9603774
·
verified ·
1 Parent(s): 5960b3d

Update responseparser.py

Browse files
Files changed (1) hide show
  1. responseparser.py +325 -344
responseparser.py CHANGED
@@ -1,333 +1,421 @@
 
1
  import lxml.etree as etree
2
  from datetime import datetime
3
  from typing import List, Dict, Optional, Union
 
4
 
5
  class PatientDataExtractor:
6
- """Class to extract all fields from a FHIR Patient resource in a Bundle response (XML format)."""
7
 
8
- def __init__(self, patient_data: str):
9
- """Initialize with patient data in XML string format."""
10
- # Parse XML string or use pre-parsed data
11
- self.data = etree.fromstring(patient_data) if isinstance(patient_data, str) else patient_data
12
- # Define FHIR namespace for XPath queries
13
- self.ns = {'fhir': 'http://hl7.org/fhir'}
 
 
 
 
 
14
  self.patients = self._extract_patients()
15
- self.current_patient_idx = 0 # Default to first patient
 
 
 
 
 
 
 
 
 
 
16
 
17
- def _extract_patients(self) -> List[etree._Element]:
18
- """Extract all patient entries from the Bundle."""
19
- # Use XPath to find all Patient elements in the Bundle
20
- return self.data.xpath("//fhir:entry/fhir:resource/fhir:Patient", namespaces=self.ns)
 
 
 
 
21
 
22
  def set_patient_by_index(self, index: int) -> bool:
23
- """Set the current patient by index. Returns True if successful."""
24
  if 0 <= index < len(self.patients):
25
  self.current_patient_idx = index
26
  return True
27
  return False
28
 
29
- def set_patient_by_id(self, patient_id: str) -> bool:
30
- """Set the current patient by FHIR Patient ID. Returns True if successful."""
31
- for i, patient in enumerate(self.patients):
32
- if patient.attrib.get("id") == patient_id:
33
- self.current_patient_idx = i
34
- return True
35
- return False
36
-
37
- def _get_current_patient(self) -> etree._Element:
38
  """Get the currently selected patient resource."""
39
  return self.patients[self.current_patient_idx]
40
 
41
  # Basic Identification Fields
42
  def get_id(self) -> str:
43
- """Extract FHIR Patient ID."""
44
- return self._get_current_patient().attrib.get("id", "")
45
-
46
- def get_resource_type(self) -> str:
47
- """Extract resource type (should always be 'Patient')."""
48
- return etree.QName(self._get_current_patient().tag).localname
49
-
50
- def get_meta_last_updated(self) -> str:
51
- """Extract last updated timestamp from meta."""
52
  patient = self._get_current_patient()
53
- last_updated = patient.xpath("fhir:meta/fhir:lastUpdated/@value", namespaces=self.ns)
54
- return last_updated[0] if last_updated else ""
 
 
 
55
 
56
- def get_meta_profile(self) -> List[str]:
57
- """Extract profile URIs from meta."""
58
  patient = self._get_current_patient()
59
- profiles = patient.xpath("fhir:meta/fhir:profile/@value", namespaces=self.ns)
60
- return profiles
 
 
61
 
62
- def get_text_div(self) -> str:
63
- """Extract generated text narrative (div content)."""
64
  patient = self._get_current_patient()
65
- div = patient.xpath("fhir:text/fhir:div", namespaces=self.ns)
66
- if div:
67
- return etree.tostring(div[0], encoding="unicode")
68
- return ""
 
69
 
70
  # Name Fields
71
  def get_first_name(self) -> str:
72
- """Extract patient's first name."""
73
  patient = self._get_current_patient()
74
- official_names = patient.xpath("fhir:name[fhir:use/@value='official']", namespaces=self.ns)
75
- if official_names:
76
- given = official_names[0].xpath("fhir:given/@value", namespaces=self.ns)
77
- if given:
78
- return given[0]
 
 
79
  return ""
80
 
81
  def get_last_name(self) -> str:
82
- """Extract patient's last name."""
83
- patient = self._get_current_patient()
84
- official_names = patient.xpath("fhir:name[fhir:use/@value='official']", namespaces=self.ns)
85
- if official_names:
86
- family = official_names[0].xpath("fhir:family/@value", namespaces=self.ns)
87
- if family:
88
- return family[0]
89
- return ""
90
-
91
- def get_middle_initial(self) -> str:
92
- """Extract patient's middle initial (second given name initial if present)."""
93
  patient = self._get_current_patient()
94
- official_names = patient.xpath("fhir:name[fhir:use/@value='official']", namespaces=self.ns)
95
- if official_names:
96
- given = official_names[0].xpath("fhir:given/@value", namespaces=self.ns)
97
- if len(given) > 1:
98
- return given[1][0]
 
 
99
  return ""
100
 
101
  def get_name_prefix(self) -> str:
102
- """Extract patient's name prefix (e.g., Mr., Mrs.)."""
103
- patient = self._get_current_patient()
104
- official_names = patient.xpath("fhir:name[fhir:use/@value='official']", namespaces=self.ns)
105
- if official_names:
106
- prefix = official_names[0].xpath("fhir:prefix/@value", namespaces=self.ns)
107
- if prefix:
108
- return prefix[0]
109
- return ""
110
-
111
- def get_maiden_name(self) -> str:
112
- """Extract patient's maiden name if available."""
113
  patient = self._get_current_patient()
114
- maiden_names = patient.xpath("fhir:name[fhir:use/@value='maiden']", namespaces=self.ns)
115
- if maiden_names:
116
- family = maiden_names[0].xpath("fhir:family/@value", namespaces=self.ns)
117
- if family:
118
- return family[0]
 
 
119
  return ""
120
 
121
  # Demographic Fields
122
  def get_dob(self) -> str:
123
- """Extract patient's date of birth."""
124
  patient = self._get_current_patient()
125
- dob = patient.xpath("fhir:birthDate/@value", namespaces=self.ns)
126
- return dob[0] if dob else ""
 
 
 
127
 
128
  def get_age(self) -> str:
129
- """Calculate patient's age based on birth date."""
130
  dob = self.get_dob()
131
  if not dob:
132
  return ""
133
- birth_date = datetime.strptime(dob, "%Y-%m-%d")
134
- today = datetime.now()
135
- age = today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day))
136
- return str(age)
 
 
 
137
 
138
  def get_gender(self) -> str:
139
- """Extract patient's gender."""
140
- patient = self._get_current_patient()
141
- gender = patient.xpath("fhir:gender/@value", namespaces=self.ns)
142
- return gender[0].capitalize() if gender else ""
143
-
144
- def get_birth_sex(self) -> str:
145
- """Extract patient's birth sex from extensions."""
146
  patient = self._get_current_patient()
147
- birth_sex = patient.xpath("fhir:extension[@url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-birthsex']/fhir:valueCode/@value", namespaces=self.ns)
148
- return birth_sex[0] if birth_sex else ""
149
-
150
- def get_multiple_birth(self) -> Union[bool, None]:
151
- """Extract multiple birth status."""
152
- patient = self._get_current_patient()
153
- multiple_birth = patient.xpath("fhir:multipleBirthBoolean/@value", namespaces=self.ns)
154
- return multiple_birth[0] == "true" if multiple_birth else None
155
 
156
  # Address Fields
157
  def get_address_line(self) -> str:
158
- """Extract patient's street address."""
159
  patient = self._get_current_patient()
160
- line = patient.xpath("fhir:address/fhir:line/@value", namespaces=self.ns)
161
- return line[0] if line else ""
 
 
 
 
162
 
163
  def get_city(self) -> str:
164
- """Extract patient's city."""
165
  patient = self._get_current_patient()
166
- city = patient.xpath("fhir:address/fhir:city/@value", namespaces=self.ns)
167
- return city[0] if city else ""
 
 
 
 
168
 
169
  def get_state(self) -> str:
170
- """Extract patient's state."""
171
  patient = self._get_current_patient()
172
- state = patient.xpath("fhir:address/fhir:state/@value", namespaces=self.ns)
173
- return state[0] if state else ""
 
 
 
 
174
 
175
  def get_zip_code(self) -> str:
176
- """Extract patient's postal code."""
177
- patient = self._get_current_patient()
178
- postal_code = patient.xpath("fhir:address/fhir:postalCode/@value", namespaces=self.ns)
179
- return postal_code[0] if postal_code else ""
180
-
181
- def get_country(self) -> str:
182
- """Extract patient's country."""
183
  patient = self._get_current_patient()
184
- country = patient.xpath("fhir:address/fhir:country/@value", namespaces=self.ns)
185
- return country[0] if country else ""
186
-
187
- def get_geolocation(self) -> Dict[str, float]:
188
- """Extract geolocation (latitude and longitude) from address extension."""
189
- patient = self._get_current_patient()
190
- lat = patient.xpath("fhir:address/fhir:extension[@url='http://hl7.org/fhir/StructureDefinition/geolocation']/fhir:extension[@url='latitude']/fhir:valueDecimal/@value", namespaces=self.ns)
191
- lon = patient.xpath("fhir:address/fhir:extension[@url='http://hl7.org/fhir/StructureDefinition/geolocation']/fhir:extension[@url='longitude']/fhir:valueDecimal/@value", namespaces=self.ns)
192
- return {
193
- "latitude": float(lat[0]) if lat else None,
194
- "longitude": float(lon[0]) if lon else None
195
- }
196
 
197
  # Contact Fields
198
  def get_phone(self) -> str:
199
- """Extract patient's phone number."""
200
- patient = self._get_current_patient()
201
- phone = patient.xpath("fhir:telecom[fhir:system/@value='phone' and fhir:use/@value='home']/fhir:value/@value", namespaces=self.ns)
202
- return phone[0] if phone else ""
203
-
204
- # Identifiers
205
- def get_identifiers(self) -> Dict[str, str]:
206
- """Extract all identifiers (e.g., SSN, MRN, Driver's License)."""
207
  patient = self._get_current_patient()
208
- id_dict = {}
209
- identifiers = patient.xpath("fhir:identifier", namespaces=self.ns)
210
- for id_entry in identifiers:
211
- id_type = id_entry.xpath("fhir:type/fhir:text/@value", namespaces=self.ns)
212
- id_value = id_entry.xpath("fhir:value/@value", namespaces=self.ns)
213
- if id_type and id_value:
214
- id_dict[id_type[0]] = id_value[0]
215
- return id_dict
216
 
217
- # Extensions
218
  def get_race(self) -> str:
219
- """Extract patient's race from extensions."""
220
  patient = self._get_current_patient()
221
- race = patient.xpath("fhir:extension[@url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-race']/fhir:extension[@url='text']/fhir:valueString/@value", namespaces=self.ns)
222
- return race[0] if race else ""
 
 
 
 
 
 
 
 
223
 
224
  def get_ethnicity(self) -> str:
225
- """Extract patient's ethnicity from extensions."""
226
  patient = self._get_current_patient()
227
- ethnicity = patient.xpath("fhir:extension[@url='http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity']/fhir:extension[@url='text']/fhir:valueString/@value", namespaces=self.ns)
228
- return ethnicity[0] if ethnicity else ""
229
-
230
- def get_mothers_maiden_name(self) -> str:
231
- """Extract patient's mother's maiden name from extensions."""
232
- patient = self._get_current_patient()
233
- mothers_maiden = patient.xpath("fhir:extension[@url='http://hl7.org/fhir/StructureDefinition/patient-mothersMaidenName']/fhir:valueString/@value", namespaces=self.ns)
234
- return mothers_maiden[0] if mothers_maiden else ""
235
-
236
- def get_birth_place(self) -> Dict[str, str]:
237
- """Extract patient's birth place from extensions."""
238
- patient = self._get_current_patient()
239
- birth_place = patient.xpath("fhir:extension[@url='http://hl7.org/fhir/StructureDefinition/patient-birthPlace']/fhir:valueAddress", namespaces=self.ns)
240
- if birth_place:
241
- city = birth_place[0].xpath("fhir:city/@value", namespaces=self.ns)
242
- state = birth_place[0].xpath("fhir:state/@value", namespaces=self.ns)
243
- country = birth_place[0].xpath("fhir:country/@value", namespaces=self.ns)
244
- return {
245
- "city": city[0] if city else "",
246
- "state": state[0] if state else "",
247
- "country": country[0] if country else ""
248
- }
249
- return {"city": "", "state": "", "country": ""}
250
-
251
- def get_disability_adjusted_life_years(self) -> Optional[float]:
252
- """Extract disability-adjusted life years from extensions."""
253
- patient = self._get_current_patient()
254
- daly = patient.xpath("fhir:extension[@url='http://synthetichealth.github.io/synthea/disability-adjusted-life-years']/fhir:valueDecimal/@value", namespaces=self.ns)
255
- return float(daly[0]) if daly else None
256
-
257
- def get_quality_adjusted_life_years(self) -> Optional[float]:
258
- """Extract quality-adjusted life years from extensions."""
259
- patient = self._get_current_patient()
260
- qaly = patient.xpath("fhir:extension[@url='http://synthetichealth.github.io/synthea/quality-adjusted-life-years']/fhir:valueDecimal/@value", namespaces=self.ns)
261
- return float(qaly[0]) if qaly else None
262
-
263
- # Marital Status
264
- def get_marital_status(self) -> str:
265
- """Extract patient's marital status."""
266
- patient = self._get_current_patient()
267
- status = patient.xpath("fhir:maritalStatus/fhir:text/@value", namespaces=self.ns)
268
- if status:
269
- return status[0]
270
- coding = patient.xpath("fhir:maritalStatus/fhir:coding/fhir:display/@value", namespaces=self.ns)
271
- return coding[0] if coding else ""
272
 
273
- # Communication
274
  def get_language(self) -> str:
275
- """Extract patient's preferred language."""
276
  patient = self._get_current_patient()
277
- language = patient.xpath("fhir:communication/fhir:language/fhir:text/@value", namespaces=self.ns)
278
- return language[0] if language else ""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
279
 
280
  # Comprehensive Extraction
281
- def get_all_patient_data(self) -> Dict[str, Union[str, Dict, List, float, bool, None]]:
282
  """Extract all available data for the current patient."""
283
  return {
284
  "id": self.get_id(),
285
  "resource_type": self.get_resource_type(),
286
  "meta_last_updated": self.get_meta_last_updated(),
287
- "meta_profile": self.get_meta_profile(),
288
- "text_div": self.get_text_div(),
289
  "first_name": self.get_first_name(),
290
  "last_name": self.get_last_name(),
291
- "middle_initial": self.get_middle_initial(),
292
  "name_prefix": self.get_name_prefix(),
293
- "maiden_name": self.get_maiden_name(),
294
  "dob": self.get_dob(),
295
  "age": self.get_age(),
296
  "gender": self.get_gender(),
297
- "birth_sex": self.get_birth_sex(),
298
- "multiple_birth": self.get_multiple_birth(),
299
  "address_line": self.get_address_line(),
300
  "city": self.get_city(),
301
  "state": self.get_state(),
302
  "zip_code": self.get_zip_code(),
303
- "country": self.get_country(),
304
- "geolocation": self.get_geolocation(),
305
  "phone": self.get_phone(),
306
- "identifiers": self.get_identifiers(),
307
  "race": self.get_race(),
308
  "ethnicity": self.get_ethnicity(),
309
- "mothers_maiden_name": self.get_mothers_maiden_name(),
310
- "birth_place": self.get_birth_place(),
311
- "disability_adjusted_life_years": self.get_disability_adjusted_life_years(),
312
- "quality_adjusted_life_years": self.get_quality_adjusted_life_years(),
313
- "marital_status": self.get_marital_status(),
314
- "language": self.get_language()
315
  }
316
 
317
  def get_patient_dict(self) -> Dict[str, str]:
318
- """Return a dictionary of patient data mapped to discharge form fields (for app.py compatibility)."""
319
- patient_data = self.get_all_patient_data()
 
 
 
320
  return {
321
- "first_name": patient_data["first_name"],
322
- "last_name": patient_data["last_name"],
323
- "middle_initial": patient_data["middle_initial"],
324
- "dob": patient_data["dob"],
325
- "age": patient_data["age"],
326
- "sex": patient_data["gender"],
327
- "address": patient_data["address_line"],
328
- "city": patient_data["city"],
329
- "state": patient_data["state"],
330
- "zip_code": patient_data["zip_code"],
331
  "doctor_first_name": "",
332
  "doctor_last_name": "",
333
  "doctor_middle_initial": "",
@@ -336,21 +424,21 @@ class PatientDataExtractor:
336
  "doctor_city": "",
337
  "doctor_state": "",
338
  "doctor_zip": "",
339
- "admission_date": "",
340
  "referral_source": "",
341
  "admission_method": "",
342
- "discharge_date": "",
343
  "discharge_reason": "",
344
  "date_of_death": "",
345
- "diagnosis": "",
346
  "procedures": "",
347
- "medications": "",
348
  "preparer_name": "",
349
  "preparer_job_title": ""
350
  }
351
 
352
  def get_all_patients(self) -> List[Dict[str, str]]:
353
- """Return a list of dictionaries for all patients (for app.py)."""
354
  original_idx = self.current_patient_idx
355
  all_patients = []
356
  for i in range(len(self.patients)):
@@ -360,113 +448,6 @@ class PatientDataExtractor:
360
  return all_patients
361
 
362
  def get_patient_ids(self) -> List[str]:
363
- """Return a list of all patient IDs in the Bundle."""
364
- return [patient.attrib.get("id", "") for patient in self.patients]
365
-
366
- # # Example usage with integration into app.py
367
- # def integrate_with_app(patient_data: str):
368
- # """Integrate PatientDataExtractor with the Gradio app."""
369
- # extractor = PatientDataExtractor(patient_data)
370
-
371
- # # Function to populate form with selected patient's data
372
- # def populate_form(patient_id: str):
373
- # if extractor.set_patient_by_id(patient_id):
374
- # patient_dict = extractor.get_patient_dict()
375
- # return list(patient_dict.values()) # Return values in order expected by display_form
376
- # return [""] * 28 # Return empty values if patient not found
377
-
378
- # # Modify the Gradio app to include patient selection
379
- # with gr.Blocks() as demo:
380
- # gr.Markdown("# Patient Discharge Form with MeldRx Integration")
381
- # with gr.Tab("Authenticate with MeldRx"):
382
- # gr.Markdown("## SMART on FHIR Authentication")
383
- # auth_url_output = gr.Textbox(label="Authorization URL", value=CALLBACK_MANAGER.get_auth_url(), interactive=False)
384
- # gr.Markdown("Copy the URL above, open it in a browser, log in, and paste the 'code' from the redirect URL below.")
385
- # auth_code_input = gr.Textbox(label="Authorization Code")
386
- # auth_submit = gr.Button("Submit Code")
387
- # auth_result = gr.Textbox(label="Authentication Result")
388
- # patient_data_button = gr.Button("Fetch Patient Data")
389
- # patient_data_output = gr.Textbox(label="Patient Data")
390
- # auth_submit.click(fn=CALLBACK_MANAGER.set_auth_code, inputs=auth_code_input, outputs=auth_result)
391
- # patient_data_button.click(fn=CALLBACK_MANAGER.get_patient_data, inputs=None, outputs=patient_data_output)
392
-
393
- # with gr.Tab("Discharge Form"):
394
- # gr.Markdown("## Select Patient")
395
- # patient_dropdown = gr.Dropdown(choices=extractor.get_patient_ids(), label="Select Patient ID")
396
- # populate_button = gr.Button("Populate Form with Patient Data")
397
-
398
- # gr.Markdown("## Patient Details")
399
- # with gr.Row():
400
- # first_name = gr.Textbox(label="First Name")
401
- # last_name = gr.Textbox(label="Last Name")
402
- # middle_initial = gr.Textbox(label="Middle Initial")
403
- # with gr.Row():
404
- # dob = gr.Textbox(label="Date of Birth")
405
- # age = gr.Textbox(label="Age")
406
- # sex = gr.Textbox(label="Sex")
407
- # address = gr.Textbox(label="Address")
408
- # with gr.Row():
409
- # city = gr.Textbox(label="City")
410
- # state = gr.Textbox(label="State")
411
- # zip_code = gr.Textbox(label="Zip Code")
412
- # gr.Markdown("## Primary Healthcare Professional Details")
413
- # with gr.Row():
414
- # doctor_first_name = gr.Textbox(label="Doctor's First Name")
415
- # doctor_last_name = gr.Textbox(label="Doctor's Last Name")
416
- # doctor_middle_initial = gr.Textbox(label="Middle Initial")
417
- # hospital_name = gr.Textbox(label="Hospital/Clinic Name")
418
- # doctor_address = gr.Textbox(label="Address")
419
- # with gr.Row():
420
- # doctor_city = gr.Textbox(label="City")
421
- # doctor_state = gr.Textbox(label="State")
422
- # doctor_zip = gr.Textbox(label="Zip Code")
423
- # gr.Markdown("## Admission and Discharge Details")
424
- # with gr.Row():
425
- # admission_date = gr.Textbox(label="Date of Admission")
426
- # referral_source = gr.Textbox(label="Source of Referral")
427
- # admission_method = gr.Textbox(label="Method of Admission")
428
- # with gr.Row():
429
- # discharge_date = gr.Textbox(label="Date of Discharge")
430
- # discharge_reason = gr.Radio(["Treated", "Transferred", "Discharge Against Advice", "Patient Died"], label="Discharge Reason")
431
- # date_of_death = gr.Textbox(label="Date of Death (if applicable)")
432
- # gr.Markdown("## Diagnosis & Procedures")
433
- # diagnosis = gr.Textbox(label="Diagnosis")
434
- # procedures = gr.Textbox(label="Operation & Procedures")
435
- # gr.Markdown("## Medication Details")
436
- # medications = gr.Textbox(label="Medication on Discharge")
437
- # gr.Markdown("## Prepared By")
438
- # with gr.Row():
439
- # preparer_name = gr.Textbox(label="Name")
440
- # preparer_job_title = gr.Textbox(label="Job Title")
441
- # submit = gr.Button("Generate Form")
442
- # output = gr.Markdown()
443
-
444
- # # Inputs list for populate_form and display_form
445
- # inputs_list = [
446
- # first_name, last_name, middle_initial, dob, age, sex, address, city, state, zip_code,
447
- # doctor_first_name, doctor_last_name, doctor_middle_initial, hospital_name, doctor_address,
448
- # doctor_city, doctor_state, doctor_zip,
449
- # admission_date, referral_source, admission_method, discharge_date, discharge_reason, date_of_death,
450
- # diagnosis, procedures, medications, preparer_name, preparer_job_title
451
- # ]
452
-
453
- # # Populate form with patient data when button is clicked
454
- # populate_button.click(
455
- # fn=populate_form,
456
- # inputs=patient_dropdown,
457
- # outputs=inputs_list
458
- # )
459
-
460
- # # Generate the form output
461
- # submit.click(
462
- # display_form,
463
- # inputs=inputs_list,
464
- # outputs=output
465
- # )
466
-
467
- # return demo
468
 
469
- # # Assuming patient_data is the JSON string from your example
470
- # # patient_data = <your JSON string here>
471
- # # demo = integrate_with_app(patient_data)
472
- # # demo.launch()
 
1
+ import json
2
  import lxml.etree as etree
3
  from datetime import datetime
4
  from typing import List, Dict, Optional, Union
5
+ import base64
6
 
7
  class PatientDataExtractor:
8
+ """Class to extract fields from FHIR Patient Bundle (JSON) or C-CDA (XML)."""
9
 
10
+ def __init__(self, patient_data: str, format_type: str = None):
11
+ """Initialize with patient data and optional format type."""
12
+ self.format = format_type.lower() if format_type else self._detect_format(patient_data)
13
+ if self.format == "xml":
14
+ self.data = etree.fromstring(patient_data.encode('utf-8')) if isinstance(patient_data, str) else patient_data
15
+ self.ns = {'hl7': 'urn:hl7-org:v3'}
16
+ elif self.format == "json":
17
+ self.data = json.loads(patient_data) if isinstance(patient_data, str) else patient_data
18
+ else:
19
+ raise ValueError("Unsupported format. Use 'xml' or 'json'")
20
+
21
  self.patients = self._extract_patients()
22
+ self.current_patient_idx = 0
23
+
24
+ def _detect_format(self, data: str) -> str:
25
+ """Detect the format of the input data."""
26
+ if isinstance(data, str):
27
+ data = data.strip()
28
+ if data.startswith('<'):
29
+ return 'xml'
30
+ elif data.startswith('{') or data.startswith('['):
31
+ return 'json'
32
+ raise ValueError("Cannot determine data format")
33
 
34
+ def _extract_patients(self) -> List:
35
+ """Extract all patient entries based on format."""
36
+ if self.format == "xml":
37
+ return [self.data] # C-CDA has one patient per document
38
+ elif self.format == "json":
39
+ if self.data.get("resourceType") != "Bundle" or "entry" not in self.data:
40
+ raise ValueError("Invalid FHIR Bundle format")
41
+ return [entry["resource"] for entry in self.data["entry"] if entry["resource"]["resourceType"] == "Patient"]
42
 
43
  def set_patient_by_index(self, index: int) -> bool:
44
+ """Set the current patient by index."""
45
  if 0 <= index < len(self.patients):
46
  self.current_patient_idx = index
47
  return True
48
  return False
49
 
50
+ def _get_current_patient(self):
 
 
 
 
 
 
 
 
51
  """Get the currently selected patient resource."""
52
  return self.patients[self.current_patient_idx]
53
 
54
  # Basic Identification Fields
55
  def get_id(self) -> str:
 
 
 
 
 
 
 
 
 
56
  patient = self._get_current_patient()
57
+ if self.format == "xml":
58
+ id_list = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:id/@extension", namespaces=self.ns)
59
+ return id_list[0] if id_list else ""
60
+ elif self.format == "json":
61
+ return patient.get("id", "")
62
 
63
def get_resource_type(self) -> str:
    """Return the type label of the current record.

    XML: always ``"ClinicalDocument"``.
    JSON: the ``resourceType`` member of the patient resource, or ``""``.
    """
    if self.format == "xml":
        return "ClinicalDocument"
    if self.format == "json":
        return self._get_current_patient().get("resourceType") or ""
    return ""
69
 
70
def get_meta_last_updated(self) -> str:
    """Return the record's timestamp, or ``""`` when absent.

    XML: the ``value`` attribute of the ``effectiveTime`` element that is a
    direct child of the ``ClinicalDocument`` root.
    JSON: ``meta.lastUpdated`` of the patient resource.
    """
    patient = self._get_current_patient()
    if self.format == "xml":
        # Anchor at the document root: a bare ``//hl7:effectiveTime`` also
        # matches timestamps nested in sections when the header one is missing.
        time_list = patient.xpath("/hl7:ClinicalDocument/hl7:effectiveTime/@value", namespaces=self.ns)
        return time_list[0] if time_list else ""
    if self.format == "json":
        meta = patient.get("meta") or {}
        return meta.get("lastUpdated") or ""
    return ""
77
 
78
  # Name Fields
79
  def get_first_name(self) -> str:
 
80
  patient = self._get_current_patient()
81
+ if self.format == "xml":
82
+ given = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:name/hl7:given/text()", namespaces=self.ns)
83
+ return given[0] if given else ""
84
+ elif self.format == "json":
85
+ for name in patient.get("name", []):
86
+ if name.get("use") == "official" and "given" in name:
87
+ return name["given"][0]
88
  return ""
89
 
90
  def get_last_name(self) -> str:
 
 
 
 
 
 
 
 
 
 
 
91
  patient = self._get_current_patient()
92
+ if self.format == "xml":
93
+ family = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:name/hl7:family/text()", namespaces=self.ns)
94
+ return family[0] if family else ""
95
+ elif self.format == "json":
96
+ for name in patient.get("name", []):
97
+ if name.get("use") == "official" and "family" in name:
98
+ return name["family"]
99
  return ""
100
 
101
  def get_name_prefix(self) -> str:
 
 
 
 
 
 
 
 
 
 
 
102
  patient = self._get_current_patient()
103
+ if self.format == "xml":
104
+ prefix = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:name/hl7:prefix/text()", namespaces=self.ns)
105
+ return prefix[0] if prefix else ""
106
+ elif self.format == "json":
107
+ for name in patient.get("name", []):
108
+ if name.get("use") == "official" and "prefix" in name:
109
+ return name["prefix"][0]
110
  return ""
111
 
112
  # Demographic Fields
113
  def get_dob(self) -> str:
 
114
  patient = self._get_current_patient()
115
+ if self.format == "xml":
116
+ dob = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:birthTime/@value", namespaces=self.ns)
117
+ return dob[0] if dob else ""
118
+ elif self.format == "json":
119
+ return patient.get("birthDate", "")
120
 
121
  def get_age(self) -> str:
 
122
  dob = self.get_dob()
123
  if not dob:
124
  return ""
125
+ try:
126
+ birth_date = datetime.strptime(dob[:8], "%Y%m%d")
127
+ today = datetime.now()
128
+ age = today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day))
129
+ return str(age)
130
+ except ValueError:
131
+ return ""
132
 
133
  def get_gender(self) -> str:
 
 
 
 
 
 
 
134
  patient = self._get_current_patient()
135
+ if self.format == "xml":
136
+ gender = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:administrativeGenderCode/@code", namespaces=self.ns)
137
+ return "Male" if gender and gender[0] == "M" else "Female" if gender and gender[0] == "F" else ""
138
+ elif self.format == "json":
139
+ return patient.get("gender", "").capitalize()
 
 
 
140
 
141
  # Address Fields
142
  def get_address_line(self) -> str:
 
143
  patient = self._get_current_patient()
144
+ if self.format == "xml":
145
+ line = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:addr/hl7:streetAddressLine/text()", namespaces=self.ns)
146
+ return line[0] if line else ""
147
+ elif self.format == "json":
148
+ addresses = patient.get("address", [])
149
+ return addresses[0]["line"][0] if addresses and "line" in addresses[0] else ""
150
 
151
  def get_city(self) -> str:
 
152
  patient = self._get_current_patient()
153
+ if self.format == "xml":
154
+ city = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:addr/hl7:city/text()", namespaces=self.ns)
155
+ return city[0] if city else ""
156
+ elif self.format == "json":
157
+ addresses = patient.get("address", [])
158
+ return addresses[0]["city"] if addresses and "city" in addresses[0] else ""
159
 
160
  def get_state(self) -> str:
 
161
  patient = self._get_current_patient()
162
+ if self.format == "xml":
163
+ state = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:addr/hl7:state/text()", namespaces=self.ns)
164
+ return state[0] if state else ""
165
+ elif self.format == "json":
166
+ addresses = patient.get("address", [])
167
+ return addresses[0]["state"] if addresses and "state" in addresses[0] else ""
168
 
169
  def get_zip_code(self) -> str:
 
 
 
 
 
 
 
170
  patient = self._get_current_patient()
171
+ if self.format == "xml":
172
+ zip = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:addr/hl7:postalCode/text()", namespaces=self.ns)
173
+ return zip[0] if zip else ""
174
+ elif self.format == "json":
175
+ addresses = patient.get("address", [])
176
+ return addresses[0]["postalCode"] if addresses and "postalCode" in addresses[0] else ""
 
 
 
 
 
 
177
 
178
  # Contact Fields
179
  def get_phone(self) -> str:
 
 
 
 
 
 
 
 
180
  patient = self._get_current_patient()
181
+ if self.format == "xml":
182
+ telecom = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:telecom/@value", namespaces=self.ns)
183
+ return telecom[0].replace("tel:", "") if telecom and "tel:" in telecom[0] else ""
184
+ elif self.format == "json":
185
+ for telecom in patient.get("telecom", []):
186
+ if telecom.get("system") == "phone" and telecom.get("use") == "home":
187
+ return telecom.get("value", "")
188
+ return ""
189
 
190
+ # Extensions and Additional Fields
191
  def get_race(self) -> str:
 
192
  patient = self._get_current_patient()
193
+ if self.format == "xml":
194
+ race = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:raceCode/@displayName", namespaces=self.ns)
195
+ return race[0] if race else ""
196
+ elif self.format == "json":
197
+ for ext in patient.get("extension", []):
198
+ if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race":
199
+ for sub_ext in ext.get("extension", []):
200
+ if sub_ext.get("url") == "text":
201
+ return sub_ext.get("valueString", "")
202
+ return ""
203
 
204
  def get_ethnicity(self) -> str:
 
205
  patient = self._get_current_patient()
206
+ if self.format == "xml":
207
+ ethnicity = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:ethnicGroupCode/@displayName", namespaces=self.ns)
208
+ return ethnicity[0] if ethnicity else ""
209
+ elif self.format == "json":
210
+ for ext in patient.get("extension", []):
211
+ if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity":
212
+ for sub_ext in ext.get("extension", []):
213
+ if sub_ext.get("url") == "text":
214
+ return sub_ext.get("valueString", "")
215
+ return ""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
216
 
 
217
  def get_language(self) -> str:
 
218
  patient = self._get_current_patient()
219
+ if self.format == "xml":
220
+ lang = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:languageCommunication/hl7:languageCode/@code", namespaces=self.ns)
221
+ return lang[0] if lang else ""
222
+ elif self.format == "json":
223
+ comms = patient.get("communication", [])
224
+ return comms[0]["language"]["text"] if comms and "language" in comms[0] else ""
225
+
226
+ # Medications
227
def get_medications(self) -> List[Dict[str, str]]:
    """List the medications found in ``self.data``.

    Each item is a dict with the keys ``start``, ``stop``, ``description``
    and ``code``; a value that is missing in the source is ``""``.

    XML: one item per ``substanceAdministration`` inside the first section
    whose ``code/@code`` is ``10160-0``.
    JSON: one item per Bundle entry whose resource has
    ``resourceType == "MedicationRequest"``.

    Returns:
        The list of medication dicts; empty when nothing matches.
    """
    result = []

    if self.format == "xml":
        section = self.data.xpath("//hl7:section[hl7:code/@code='10160-0']", namespaces=self.ns)
        if not section:
            return result
        # Output key -> XPath relative to one substanceAdministration element.
        field_queries = {
            "start": ".//hl7:effectiveTime/hl7:low/@value",
            "stop": ".//hl7:effectiveTime/hl7:high/@value",
            "description": ".//hl7:manufacturedMaterial/hl7:code/@displayName",
            "code": ".//hl7:manufacturedMaterial/hl7:code/@code",
        }
        for med in section[0].xpath(".//hl7:substanceAdministration", namespaces=self.ns):
            record = {}
            for key, query in field_queries.items():
                hits = med.xpath(query, namespaces=self.ns)
                record[key] = hits[0] if hits else ""
            result.append(record)
        return result

    if self.format == "json":
        for entry in self.data.get("entry") or []:
            # Entries without a resource are skipped instead of raising.
            med = (entry.get("resource") if isinstance(entry, dict) else None) or {}
            if med.get("resourceType") != "MedicationRequest":
                continue
            concept = med.get("medicationCodeableConcept") or {}
            # Fall back to one empty coding so an empty list cannot raise.
            first_coding = (concept.get("coding") or [{}])[0]
            validity = (med.get("dispenseRequest") or {}).get("validityPeriod") or {}
            result.append({
                "start": med.get("authoredOn") or "",
                "stop": validity.get("end") or "",
                "description": concept.get("text") or first_coding.get("display") or "",
                "code": first_coding.get("code") or "",
            })

    return result
257
+
258
+ # Encounters
259
def get_encounters(self) -> List[Dict[str, str]]:
    """Return the encounters found in the loaded document.

    XML input: a single record built from the first
    ``documentationOf/serviceEvent`` element, with the fixed description
    ``"Patient Care"`` and an empty code.
    JSON input: one record per Bundle entry whose resource type is
    ``Encounter``, described by the first element of its ``type`` list.

    Returns:
        A list of dicts with keys ``start``, ``end``, ``description`` and
        ``code``; any missing value is ``""``. The list is empty when there
        is nothing to report or ``self.format`` is unrecognised.
    """
    if self.format == "xml":
        service = self.data.xpath("//hl7:documentationOf/hl7:serviceEvent", namespaces=self.ns)
        if not service:
            return []
        event = service[0]
        start_list = event.xpath(".//hl7:effectiveTime/hl7:low/@value", namespaces=self.ns)
        end_list = event.xpath(".//hl7:effectiveTime/hl7:high/@value", namespaces=self.ns)
        return [{
            "start": start_list[0] if start_list else "",
            "end": end_list[0] if end_list else "",
            "description": "Patient Care",
            "code": "",
        }]
    if self.format == "json":
        result = []
        for entry in self.data.get("entry", []):
            # Bundle entries are not guaranteed to carry a resource.
            enc = entry.get("resource") or {}
            if enc.get("resourceType") != "Encounter":
                continue
            period = enc.get("period") or {}
            # `or [{}]` also covers explicitly empty type/coding lists.
            first_type = (enc.get("type") or [{}])[0]
            first_coding = (first_type.get("coding") or [{}])[0]
            result.append({
                "start": period.get("start", ""),
                "end": period.get("end", ""),
                "description": first_type.get("text", ""),
                "code": first_coding.get("code", ""),
            })
        return result
    return []
281
+
282
+ # Conditions/Diagnoses
283
def get_conditions(self) -> List[Dict[str, str]]:
    """Return the conditions/diagnoses found in the loaded document.

    XML input: every ``entry/act/entryRelationship/observation`` inside the
    first section whose code is ``11450-4`` (presumably the LOINC problem
    list section -- confirm against the document spec).
    JSON input: every Bundle entry whose resource type is ``Condition``.

    Returns:
        A list of dicts with keys ``onset``, ``description`` and ``code``;
        any missing value is ``""``. The list is empty when there is nothing
        to report or ``self.format`` is unrecognised.
    """
    if self.format == "xml":
        section = self.data.xpath("//hl7:section[hl7:code/@code='11450-4']", namespaces=self.ns)
        if not section:
            return []

        def first(node, path):
            # First XPath hit relative to `node`, or "" when nothing matches.
            hits = node.xpath(path, namespaces=self.ns)
            return hits[0] if hits else ""

        observations = section[0].xpath(".//hl7:entry/hl7:act/hl7:entryRelationship/hl7:observation", namespaces=self.ns)
        return [
            {
                "onset": first(obs, ".//hl7:effectiveTime/hl7:low/@value"),
                "description": first(obs, ".//hl7:value/@displayName"),
                "code": first(obs, ".//hl7:value/@code"),
            }
            for obs in observations
        ]
    if self.format == "json":
        result = []
        for entry in self.data.get("entry", []):
            # Bundle entries are not guaranteed to carry a resource.
            cond = entry.get("resource") or {}
            if cond.get("resourceType") != "Condition":
                continue
            concept = cond.get("code") or {}
            # `or [{}]` also covers an explicitly empty coding list.
            first_coding = (concept.get("coding") or [{}])[0]
            result.append({
                "onset": cond.get("onsetDateTime", ""),
                "description": concept.get("text", ""),
                "code": first_coding.get("code", ""),
            })
        return result
    return []
310
+
311
+ # Immunizations
312
def get_immunizations(self) -> List[Dict[str, str]]:
    """Return the immunizations found in the loaded document.

    XML input: every ``substanceAdministration`` inside the first section
    whose code is ``11369-6`` (presumably the LOINC immunization history
    section -- confirm against the document spec).
    JSON input: every Bundle entry whose resource type is ``Immunization``.

    Returns:
        A list of dicts with keys ``date``, ``description`` and ``code``;
        any missing value is ``""``. The list is empty when there is nothing
        to report or ``self.format`` is unrecognised.
    """
    if self.format == "xml":
        section = self.data.xpath("//hl7:section[hl7:code/@code='11369-6']", namespaces=self.ns)
        if not section:
            return []

        def first(node, path):
            # First XPath hit relative to `node`, or "" when nothing matches.
            hits = node.xpath(path, namespaces=self.ns)
            return hits[0] if hits else ""

        return [
            {
                "date": first(imm, ".//hl7:effectiveTime/@value"),
                "description": first(imm, ".//hl7:consumable/hl7:manufacturedProduct/hl7:manufacturedMaterial/hl7:code/@displayName"),
                "code": first(imm, ".//hl7:consumable/hl7:manufacturedProduct/hl7:manufacturedMaterial/hl7:code/@code"),
            }
            for imm in section[0].xpath(".//hl7:substanceAdministration", namespaces=self.ns)
        ]
    if self.format == "json":
        result = []
        for entry in self.data.get("entry", []):
            # Bundle entries are not guaranteed to carry a resource.
            imm = entry.get("resource") or {}
            if imm.get("resourceType") != "Immunization":
                continue
            vaccine = imm.get("vaccineCode") or {}
            # `or [{}]` also covers an explicitly empty coding list.
            first_coding = (vaccine.get("coding") or [{}])[0]
            result.append({
                "date": imm.get("occurrenceDateTime", ""),
                "description": vaccine.get("text", ""),
                "code": first_coding.get("code", ""),
            })
        return result
    return []
339
+
340
+ # Diagnostic Reports
341
def get_diagnostic_reports(self) -> List[Dict[str, str]]:
    """Return the diagnostic reports found in the loaded document.

    XML input: every ``organizer`` inside the first section whose code is
    ``30954-2`` (presumably the LOINC diagnostic results section -- confirm
    against the document spec).
    JSON input: every Bundle entry whose resource type is
    ``DiagnosticReport``. When the first ``presentedForm`` carries base64
    ``data`` that decodes to UTF-8 text, that text is added under
    ``content``.

    Returns:
        A list of dicts with keys ``start``, ``description`` and ``code``
        (plus ``content`` for JSON reports with decodable text); any missing
        value is ``""``. The list is empty when there is nothing to report
        or ``self.format`` is unrecognised.
    """
    if self.format == "xml":
        section = self.data.xpath("//hl7:section[hl7:code/@code='30954-2']", namespaces=self.ns)
        if not section:
            return []

        def first(node, path):
            # First XPath hit relative to `node`, or "" when nothing matches.
            hits = node.xpath(path, namespaces=self.ns)
            return hits[0] if hits else ""

        return [
            {
                "start": first(report, ".//hl7:effectiveTime/hl7:low/@value"),
                "description": first(report, ".//hl7:code/@displayName"),
                "code": first(report, ".//hl7:code/@code"),
            }
            for report in section[0].xpath(".//hl7:organizer", namespaces=self.ns)
        ]
    if self.format == "json":
        result = []
        for entry in self.data.get("entry", []):
            # Bundle entries are not guaranteed to carry a resource.
            report = entry.get("resource") or {}
            if report.get("resourceType") != "DiagnosticReport":
                continue
            concept = report.get("code") or {}
            # `or [{}]` also covers explicitly empty lists.
            first_coding = (concept.get("coding") or [{}])[0]
            item = {
                "start": report.get("effectiveDateTime", ""),
                "description": concept.get("text", ""),
                "code": first_coding.get("code", ""),
            }
            data = (report.get("presentedForm") or [{}])[0].get("data", "")
            if data:
                try:
                    item["content"] = base64.b64decode(data).decode("utf-8")
                except ValueError:
                    # Deliberate best-effort: malformed base64 (binascii.Error)
                    # and non-text payloads such as PDFs (UnicodeDecodeError)
                    # are both ValueError subclasses. The report is kept
                    # without "content" instead of aborting the extraction.
                    pass
            result.append(item)
        return result
    return []
373
 
374
  # Comprehensive Extraction
375
def get_all_patient_data(self) -> Dict[str, Union[str, List, Dict]]:
    """Collect every supported field for the currently selected patient.

    Each getter below is called exactly once, in the order listed, and its
    result is stored under the paired key.
    """
    getters = (
        ("id", self.get_id),
        ("resource_type", self.get_resource_type),
        ("meta_last_updated", self.get_meta_last_updated),
        ("first_name", self.get_first_name),
        ("last_name", self.get_last_name),
        ("name_prefix", self.get_name_prefix),
        ("dob", self.get_dob),
        ("age", self.get_age),
        ("gender", self.get_gender),
        ("address_line", self.get_address_line),
        ("city", self.get_city),
        ("state", self.get_state),
        ("zip_code", self.get_zip_code),
        ("phone", self.get_phone),
        ("race", self.get_race),
        ("ethnicity", self.get_ethnicity),
        ("language", self.get_language),
        ("medications", self.get_medications),
        ("encounters", self.get_encounters),
        ("conditions", self.get_conditions),
        ("immunizations", self.get_immunizations),
        ("diagnostic_reports", self.get_diagnostic_reports),
    )
    return {key: getter() for key, getter in getters}
401
 
402
  def get_patient_dict(self) -> Dict[str, str]:
403
+ """Return a dictionary of patient data mapped to discharge form fields."""
404
+ data = self.get_all_patient_data()
405
+ latest_encounter = data["encounters"][-1] if data["encounters"] else {}
406
+ latest_condition = data["conditions"][-1] if data["conditions"] else {}
407
+ medications_str = "; ".join([m["description"] for m in data["medications"]])
408
  return {
409
+ "first_name": data["first_name"],
410
+ "last_name": data["last_name"],
411
+ "middle_initial": "",
412
+ "dob": data["dob"],
413
+ "age": data["age"],
414
+ "sex": data["gender"],
415
+ "address": data["address_line"],
416
+ "city": data["city"],
417
+ "state": data["state"],
418
+ "zip_code": data["zip_code"],
419
  "doctor_first_name": "",
420
  "doctor_last_name": "",
421
  "doctor_middle_initial": "",
 
424
  "doctor_city": "",
425
  "doctor_state": "",
426
  "doctor_zip": "",
427
+ "admission_date": latest_encounter.get("start", ""),
428
  "referral_source": "",
429
  "admission_method": "",
430
+ "discharge_date": latest_encounter.get("end", ""),
431
  "discharge_reason": "",
432
  "date_of_death": "",
433
+ "diagnosis": latest_condition.get("description", ""),
434
  "procedures": "",
435
+ "medications": medications_str,
436
  "preparer_name": "",
437
  "preparer_job_title": ""
438
  }
439
 
440
  def get_all_patients(self) -> List[Dict[str, str]]:
441
+ """Return a list of dictionaries for all patients."""
442
  original_idx = self.current_patient_idx
443
  all_patients = []
444
  for i in range(len(self.patients)):
 
448
  return all_patients
449
 
450
  def get_patient_ids(self) -> List[str]:
451
+ """Return a list of all patient IDs."""
452
+ return [self.get_id() for _ in self.patients]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
453