Ali2206 committed (verified)
Commit 58a777c · Parent(s): 029059d

Update app.py

Files changed (1):
  1. app.py  +59 -82

app.py CHANGED
@@ -98,77 +98,66 @@ def extract_all_pages(file_path: str, progress_callback=None) -> str:
         logger.error("PDF processing error: %s", e)
         return f"PDF processing error: {str(e)}"

-def excel_to_ndjson(file_path: str) -> Generator[str, None, None]:
-    """Stream Excel file as NDJSON for maximum performance"""
+def excel_to_json(file_path: str) -> List[dict]:
+    """Convert Excel file to JSON data with proper error handling"""
     try:
-        # Use openpyxl in streaming mode
-        with pd.ExcelFile(file_path, engine='openpyxl') as xls:
-            for sheet_name in xls.sheet_names:
-                for chunk in pd.read_excel(
-                    xls,
-                    sheet_name=sheet_name,
-                    header=None,
-                    dtype=str,
-                    chunksize=1000
-                ):
-                    for _, row in chunk.iterrows():
-                        yield json.dumps({
-                            "sheet": sheet_name,
-                            "row": row.fillna("").astype(str).tolist()
-                        }) + "\n"
+        # First try with openpyxl (faster for xlsx)
+        try:
+            df = pd.read_excel(file_path, engine='openpyxl', header=None, dtype=str)
+        except Exception:
+            # Fall back to xlrd if needed
+            df = pd.read_excel(file_path, engine='xlrd', header=None, dtype=str)
+
+        # Convert to list of lists
+        content = df.fillna("").astype(str).values.tolist()
+
+        return [{
+            "filename": os.path.basename(file_path),
+            "rows": content
+        }]
     except Exception as e:
-        logger.error(f"Error streaming Excel: {e}")
-        raise
+        logger.error(f"Error processing Excel file: {e}")
+        return [{"error": f"Error processing Excel file: {str(e)}"}]

-def csv_to_ndjson(file_path: str) -> Generator[str, None, None]:
-    """Stream CSV file as NDJSON for maximum performance"""
+def csv_to_json(file_path: str) -> List[dict]:
+    """Convert CSV file to JSON data with proper error handling"""
     try:
-        for chunk in pd.read_csv(
+        df = pd.read_csv(
             file_path,
             header=None,
             dtype=str,
-            chunksize=1000,
             encoding_errors='replace',
             on_bad_lines='skip'
-        ):
-            for _, row in chunk.iterrows():
-                yield json.dumps({
-                    "row": row.fillna("").astype(str).tolist()
-                }) + "\n"
+        )
+        content = df.fillna("").astype(str).values.tolist()
+
+        return [{
+            "filename": os.path.basename(file_path),
+            "rows": content
+        }]
     except Exception as e:
-        logger.error(f"Error streaming CSV: {e}")
-        raise
+        logger.error(f"Error processing CSV file: {e}")
+        return [{"error": f"Error processing CSV file: {str(e)}"}]

-def stream_file_to_json(file_path: str, file_type: str) -> Generator[str, None, None]:
-    """Stream file content as JSON chunks"""
+def process_file(file_path: str, file_type: str) -> List[dict]:
+    """Process file based on type and return JSON data"""
     try:
         if file_type == "pdf":
             text = extract_all_pages(file_path)
-            yield json.dumps({
+            return [{
                 "filename": os.path.basename(file_path),
                 "content": text,
                 "status": "initial"
-            })
-        elif file_type in ["csv", "xls", "xlsx"]:
-            # Stream the file content
-            yield json.dumps({
-                "filename": os.path.basename(file_path),
-                "streaming": True,
-                "type": file_type
-            })
-
-            if file_type == "csv":
-                stream_gen = csv_to_ndjson(file_path)
-            else:
-                stream_gen = excel_to_ndjson(file_path)
-
-            for chunk in stream_gen:
-                yield chunk
+            }]
+        elif file_type in ["xls", "xlsx"]:
+            return excel_to_json(file_path)
+        elif file_type == "csv":
+            return csv_to_json(file_path)
         else:
-            yield json.dumps({"error": f"Unsupported file type: {file_type}"})
+            return [{"error": f"Unsupported file type: {file_type}"}]
     except Exception as e:
         logger.error("Error processing %s: %s", os.path.basename(file_path), e)
-        yield json.dumps({"error": f"Error processing {os.path.basename(file_path)}: {str(e)}"})
+        return [{"error": f"Error processing {os.path.basename(file_path)}: {str(e)}"}]

 def log_system_usage(tag=""):
     try:
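
Note on the hunk above: the removed excel_to_ndjson could never actually stream, because pd.read_excel, unlike pd.read_csv, accepts no chunksize argument, so that call would raise a TypeError at runtime; the eager excel_to_json replacement avoids the issue. Also worth knowing: the engine='xlrd' fallback only helps for legacy .xls files, since xlrd 2.x dropped .xlsx support. If lazy row iteration over .xlsx is ever wanted again, openpyxl's read-only mode can supply it. A minimal sketch, not part of this commit (iter_xlsx_rows_ndjson is a hypothetical name):

    # Sketch only: lazy NDJSON rows from an .xlsx file using openpyxl's
    # read-only mode, which walks rows without loading the whole sheet.
    import json
    from openpyxl import load_workbook

    def iter_xlsx_rows_ndjson(file_path: str):
        wb = load_workbook(file_path, read_only=True, data_only=True)
        try:
            for ws in wb.worksheets:
                for row in ws.iter_rows(values_only=True):
                    yield json.dumps({
                        "sheet": ws.title,
                        "row": ["" if cell is None else str(cell) for cell in row],
                    }) + "\n"
        finally:
            wb.close()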
@@ -272,15 +261,6 @@ def init_agent():
     logger.info("Agent Ready")
     return agent

-def batched(iterable, n):
-    """Batch data into tuples of length n. The last batch may be shorter."""
-    it = iter(iterable)
-    while True:
-        batch = list(islice(it, n))
-        if not batch:
-            return
-        yield batch
-
 def create_ui(agent):
     with gr.Blocks(theme=gr.themes.Soft()) as demo:
         gr.Markdown("<h1 style='text-align: center;'>🩺 Clinical Oversight Assistant</h1>")
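
Aside on the deleted batched helper: Python 3.12 ships identical behavior as itertools.batched, so on newer runtimes the hand-rolled version (and its islice import) was already redundant:

    # Stdlib equivalent (Python 3.12+); yields tuples, the last one shorter.
    from itertools import batched

    print(list(batched("ABCDEFG", 3)))
    # [('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)]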
@@ -306,14 +286,15 @@ Patient Record Excerpt (Chunk {0} of {1}):
         file_hash_value = ""

         if files:
-            # Process files in parallel with streaming
+            # Process files in parallel
             with ThreadPoolExecutor(max_workers=4) as executor:
                 futures = []
                 for f in files:
                     file_type = f.name.split(".")[-1].lower()
                     futures.append(executor.submit(
-                        lambda f: list(stream_file_to_json(f.name, file_type)),
-                        f
+                        process_file,
+                        f.name,
+                        file_type
                     ))

                 for future in as_completed(futures):
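
The executor.submit change above also fixes a classic closure bug: the old lambda took f as a parameter but still read file_type from the enclosing scope, so a worker that started after the loop had advanced could see another file's extension. Passing process_file, f.name, file_type binds both values at submit time. A small self-contained illustration of the pitfall, with illustrative names only:

    from concurrent.futures import ThreadPoolExecutor

    names = ["a.csv", "b.xlsx", "c.pdf"]
    with ThreadPoolExecutor(max_workers=1) as ex:
        risky, safe = [], []
        for name in names:
            ftype = name.split(".")[-1]
            # Risky: ftype is looked up when the lambda runs, not here,
            # so slow-starting tasks may all observe the final value.
            risky.append(ex.submit(lambda n: (n, ftype), name))
            # Safe: both values are bound as arguments right now.
            safe.append(ex.submit(lambda n, t: (n, t), name, ftype))
        print([fut.result() for fut in risky])  # extensions may be wrong
        print([fut.result() for fut in safe])   # always the matching pair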
@@ -321,39 +302,35 @@ Patient Record Excerpt (Chunk {0} of {1}):
                         extracted.extend(future.result())
                     except Exception as e:
                         logger.error(f"File processing error: {e}")
-                        extracted.append(json.dumps({
-                            "error": f"Error processing file: {str(e)}"
-                        }))
+                        extracted.append({"error": f"Error processing file: {str(e)}"})

         file_hash_value = file_hash(files[0].name) if files else ""
         history.append({"role": "assistant", "content": "✅ File processing complete"})
         yield history, None, ""

+        # Convert extracted data to text
+        text_content = "\n".join(json.dumps(item) for item in extracted)
+
         # Process chunks in parallel with dynamic batching
-        chunk_size = 8000  # Larger chunks reduce overhead
+        chunk_size = 8000
+        chunks = [text_content[i:i+chunk_size] for i in range(0, len(text_content), chunk_size)]
         combined_response = ""
+        batch_size = 4  # Optimal for most GPUs

         try:
-            # Convert extracted data to text chunks
-            text_content = "\n".join(extracted)
-            chunks = [text_content[i:i+chunk_size] for i in range(0, len(text_content), chunk_size)]
-
-            # Process chunks in parallel batches
-            batch_size = 4  # Optimal for most GPUs
-            total_chunks = len(chunks)
-
-            for batch_idx, batch_chunks in enumerate(batched(chunks, batch_size)):
+            for batch_idx in range(0, len(chunks), batch_size):
+                batch_chunks = chunks[batch_idx:batch_idx + batch_size]
                 batch_prompts = [
                     prompt_template.format(
-                        batch_idx * batch_size + i + 1,
-                        total_chunks,
-                        chunk=chunk[:6000]  # Slightly larger context
+                        batch_idx + i + 1,
+                        len(chunks),
+                        chunk=chunk[:6000]
                     )
                     for i, chunk in enumerate(batch_chunks)
                 ]

-                progress((batch_idx * batch_size) / total_chunks,
-                         desc=f"Analyzing batch {batch_idx + 1}/{(total_chunks + batch_size - 1) // batch_size}")
+                progress(batch_idx / len(chunks),
+                         desc=f"Analyzing batch {(batch_idx // batch_size) + 1}/{(len(chunks) + batch_size - 1) // batch_size}")

                 # Process batch in parallel
                 with ThreadPoolExecutor(max_workers=len(batch_prompts)) as executor:
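
With the batched helper gone, batch_idx in the rewritten loop counts chunk offsets (0, 4, 8, ...) rather than batch numbers, which is why the chunk label becomes batch_idx + i + 1 and the batch number (batch_idx // batch_size) + 1. A quick check of the arithmetic with assumed sizes:

    chunks = [f"chunk{n}" for n in range(10)]  # pretend there are 10 chunks
    batch_size = 4
    for batch_idx in range(0, len(chunks), batch_size):
        batch = chunks[batch_idx:batch_idx + batch_size]
        labels = [batch_idx + i + 1 for i in range(len(batch))]
        print(f"batch {(batch_idx // batch_size) + 1}: chunks {labels}")
    # batch 1: chunks [1, 2, 3, 4]
    # batch 2: chunks [5, 6, 7, 8]
    # batch 3: chunks [9, 10]

The same offset also feeds the heading in the final hunk, so each batch is labeled by its first chunk (Chunk 1, Chunk 5, Chunk 9, ...).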
@@ -381,7 +358,7 @@ Patient Record Excerpt (Chunk {0} of {1}):
                         if cleaned:
                             chunk_response += cleaned + " "

-                combined_response += f"--- Analysis for Chunk {batch_idx * batch_size + 1} ---\n{chunk_response.strip()}\n"
+                combined_response += f"--- Analysis for Chunk {batch_idx + 1} ---\n{chunk_response.strip()}\n"
                 history[-1] = {"role": "assistant", "content": combined_response.strip()}
                 yield history, None, ""
 