# NOTE: "Spaces: Sleeping" status lines captured from the hosting page during
# export; kept here as a comment so the module remains valid Python.
# Stdlib
import os
from io import BytesIO
# Third-party: data handling, web framework, Azure OCR, env loading, Word output.
import pandas as pd
from fastapi import APIRouter, UploadFile, File, HTTPException
from fastapi.responses import StreamingResponse, JSONResponse
from azure.core.credentials import AzureKeyCredential
from azure.ai.formrecognizer import DocumentAnalysisClient
from dotenv import load_dotenv
from docx import Document
import re

# Load environment variables (e.g. AZURE_FORM_RECOGNIZER_KEY) from a .env file.
load_dotenv()

# Router for the document-conversion endpoints defined in this module.
# NOTE(review): no @router decorators are visible on the handlers below —
# presumably routes are registered elsewhere; confirm.
router = APIRouter()
async def convert_to_markdown(file: UploadFile = File(...)):
    """
    Convert an uploaded document (e.g. PDF) to markdown format.

    Args:
        file: The uploaded document to convert.

    Returns:
        StreamingResponse: The generated markdown file as a download
        named ``<original stem>.md``.

    Raises:
        HTTPException: 500 with the underlying error message on failure.
    """
    temp_pdf_path = "temp." + file.filename.split('.')[-1]
    temp_md_path = "temp.md"
    try:
        # Persist the upload so the Form Recognizer client can read it.
        content = await file.read()
        with open(temp_pdf_path, "wb") as f:
            f.write(content)

        # Run Azure layout analysis and render the result as markdown.
        result = analyze_document(temp_pdf_path)
        create_markdown_file(result, temp_md_path)

        with open(temp_md_path, "rb") as f:
            markdown_content = f.read()

        return StreamingResponse(
            BytesIO(markdown_content),
            media_type="text/markdown",
            headers={
                "Content-Disposition": f"attachment; filename={file.filename.rsplit('.', 1)[0]}.md"
            },
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Bug fix: temp files were previously removed only on the success
        # path, so any analysis failure leaked them on disk.
        for path in (temp_pdf_path, temp_md_path):
            if os.path.exists(path):
                os.remove(path)
async def convert_to_excel(file: UploadFile = File(...)):
    """
    Extract tables from an uploaded document and return an Excel workbook.

    Args:
        file: The uploaded document to analyze.

    Returns:
        StreamingResponse: Excel file with one sheet per detected table,
        named ``<original stem>_tables.xlsx``.

    Raises:
        HTTPException: 500 with the underlying error message on failure.
    """
    temp_path = "temp." + file.filename.split('.')[-1]
    try:
        # Persist the upload so the Form Recognizer client can read it.
        content = await file.read()
        with open(temp_path, "wb") as f:
            f.write(content)

        result = analyze_document(temp_path)

        # Flatten each detected table into the cell-dict layout expected
        # by create_excel_from_markdown_tables.
        tables = [
            [
                {
                    "row_index": cell.row_index,
                    "column_index": cell.column_index,
                    "text": cell.content,
                }
                for cell in table.cells
            ]
            for table in result.tables
        ]

        excel_buffer = create_excel_from_markdown_tables(tables)

        return StreamingResponse(
            excel_buffer,
            media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            headers={
                "Content-Disposition": f"attachment; filename={file.filename.rsplit('.', 1)[0]}_tables.xlsx"
            },
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Bug fix: the original never deleted the temporary upload copy,
        # leaking one file per request.
        if os.path.exists(temp_path):
            os.remove(temp_path)
async def convert_to_word(file: UploadFile = File(...)):
    """
    Convert an uploaded document (e.g. PDF) to a Word document.

    Note: despite the original docstring, the input is the raw document,
    which is analyzed with Azure Form Recognizer — not a markdown file.

    Args:
        file: The uploaded document to convert.

    Returns:
        StreamingResponse: The generated .docx file as a download
        named ``<original stem>.docx``.

    Raises:
        HTTPException: 500 with the underlying error message on failure.
    """
    temp_pdf_path = "temp." + file.filename.split('.')[-1]
    temp_word_path = "temp.docx"
    try:
        # Persist the upload so the Form Recognizer client can read it.
        content = await file.read()
        with open(temp_pdf_path, "wb") as f:
            f.write(content)

        # Run Azure layout analysis and render the result as a Word file.
        result = analyze_document(temp_pdf_path)
        create_word_file(result, temp_word_path)

        with open(temp_word_path, "rb") as f:
            word_content = f.read()

        return StreamingResponse(
            BytesIO(word_content),
            media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            headers={
                "Content-Disposition": f"attachment; filename={file.filename.rsplit('.', 1)[0]}.docx"
            },
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Bug fix: temp files were previously removed only on the success
        # path, so any analysis failure leaked them on disk.
        for path in (temp_pdf_path, temp_word_path):
            if os.path.exists(path):
                os.remove(path)
def analyze_document(file_path):
    """Analyze a document with Azure Form Recognizer's prebuilt-layout model.

    Args:
        file_path: Path to the local document to analyze.

    Returns:
        The layout ``AnalyzeResult`` (pages, paragraphs, tables, ...).

    Raises:
        RuntimeError: If AZURE_FORM_RECOGNIZER_KEY is not configured.
    """
    # Allow the endpoint to be overridden via the environment; fall back to
    # the previously hard-coded service endpoint for backward compatibility.
    endpoint = os.getenv(
        "AZURE_FORM_RECOGNIZER_ENDPOINT",
        "https://aal-ocr-ai-azureapi.cognitiveservices.azure.com/",
    )
    key = os.getenv("AZURE_FORM_RECOGNIZER_KEY")
    if not key:
        # Fail fast with a clear message instead of an opaque SDK error
        # from AzureKeyCredential(None).
        raise RuntimeError("AZURE_FORM_RECOGNIZER_KEY environment variable is not set")

    document_analysis_client = DocumentAnalysisClient(
        endpoint=endpoint, credential=AzureKeyCredential(key)
    )
    with open(file_path, "rb") as f:
        poller = document_analysis_client.begin_analyze_document(
            "prebuilt-layout", document=f
        )
        # Block until the long-running analysis completes.
        result = poller.result()
    return result
def extract_tables_from_markdown(markdown_text):
    """Parse pipe-delimited tables out of markdown text.

    Args:
        markdown_text: Raw markdown source.

    Returns:
        A list of tables; each table is a list of rows, and each row a
        list of stripped cell strings. Separator rows (``|---|---|``)
        are skipped, and any non-table line closes the current table.
    """
    separator = re.compile(r'^[\s|:-]+$')
    tables = []
    current = []
    for line in markdown_text.split('\n'):
        if '|' not in line:
            # A plain line terminates any table being accumulated.
            if current:
                tables.append(current)
                current = []
            continue
        if separator.match(line):
            # Header/body divider row — carries no data.
            continue
        row = [cell.strip() for cell in line.split('|')[1:-1]]
        if row:
            current.append(row)
    # Flush a table that runs to the end of the text.
    if current:
        tables.append(current)
    return tables
def create_excel_from_markdown_tables(tables):
    """Build an in-memory Excel workbook from extracted table data.

    Args:
        tables: List of tables; each table is a list of cell dicts with
            ``row_index``, ``column_index`` and ``text`` keys.

    Returns:
        BytesIO: Workbook bytes rewound to offset 0, with one sheet per
        table (``Sheet1``, ``Sheet2``, ...).
    """
    excel_buffer = BytesIO()
    # Let pandas select any available xlsx engine instead of hard-requiring
    # xlsxwriter — the original failed even when e.g. openpyxl was installed.
    with pd.ExcelWriter(excel_buffer) as writer:
        for i, table in enumerate(tables):
            # Reshape the flat cell list into a row x column grid.
            df = pd.DataFrame(table)
            grid = df.pivot(index='row_index', columns='column_index', values='text')
            grid.to_excel(writer, sheet_name=f'Sheet{i + 1}', index=False)
    excel_buffer.seek(0)
    return excel_buffer
def create_markdown_file(result, output_file):
    """Create markdown file from analysis result.

    Writes one stream of paragraphs and pipe-tables per page in reading
    order, with a heuristically detected title emitted as an H1 heading.

    Args:
        result: Form Recognizer layout result (pages/paragraphs/tables).
        output_file: Path of the markdown file to write.
    """
    with open(output_file, 'w', encoding='utf-8') as md_file:
        for page in result.pages:
            # md_file.write(f"### Page {page.page_number}\n\n")
            elements = []
            # Sort key = y + 0.01*x: predominantly top-to-bottom, with the
            # x coordinate as a small left-to-right tiebreaker.
            elements.extend([(paragraph.bounding_regions[0].polygon[0].y + paragraph.bounding_regions[0].polygon[0].x*0.01, 'paragraph', paragraph)
                             for paragraph in result.paragraphs if paragraph.bounding_regions[0].page_number == page.page_number])
            elements.sort(key=lambda x: x[0])
            # Title heuristic: among the first five elements, pick the
            # paragraph whose top-edge midpoint is closest to the top-center
            # of the page.
            page_width = page.width / 2
            min_distance = float('inf')
            title_paragraph = None
            for element in elements[:5]:
                if element[1] == 'paragraph':
                    paragraph = element[2]
                    midpoint_x = (paragraph.bounding_regions[0].polygon[0].x + paragraph.bounding_regions[0].polygon[1].x) / 2
                    midpoint_y = paragraph.bounding_regions[0].polygon[0].y
                    distance = ((midpoint_x - page_width) ** 2 + midpoint_y ** 2) ** 0.5
                    if distance < min_distance:
                        min_distance = distance
                        title_paragraph = paragraph
            if title_paragraph:
                # Emit the title once as a heading and drop it from the flow.
                elements = [element for element in elements if element[2] != title_paragraph]
                md_file.write(f"# {title_paragraph.content}\n\n")
            # Merge this page's tables into the same position-sorted stream.
            elements.extend([(table.bounding_regions[0].polygon[0].y + table.bounding_regions[0].polygon[0].x*0.01, 'table', table)
                             for table in result.tables if table.bounding_regions[0].page_number == page.page_number])
            elements.sort(key=lambda x: x[0])
            # NOTE(review): table_cells is populated below but never read —
            # looks like dead state; confirm before removing.
            table_cells = set()
            for _, element_type, element in elements:
                if element_type == 'paragraph':
                    # Skip paragraphs whose midpoint falls inside any table on
                    # this page — their text is emitted via the table cells.
                    if any(is_element_inside_table(element, get_table_max_polygon(table)) for table in result.tables if table.bounding_regions[0].page_number == page.page_number):
                        continue
                    # Strip Form Recognizer checkbox markers from the text.
                    content = element.content.replace(":selected:", "").replace(":unselected:", "")
                    md_file.write(f"{content}\n\n")
                elif element_type == 'table':
                    # Render the table as markdown pipe rows (no separator row).
                    for row_idx in range(element.row_count):
                        row_content = "| "
                        for col_idx in range(element.column_count):
                            cell_content = ""
                            for cell in element.cells:
                                if cell.row_index == row_idx and cell.column_index == col_idx:
                                    cell_content = cell.content.replace(":selected:", "").replace(":unselected:", "")
                                    table_cells.add((cell.bounding_regions[0].polygon[0].x, cell.bounding_regions[0].polygon[0].y))
                                    break
                            row_content += f"{cell_content} | "
                        md_file.write(row_content + "\n")
                    md_file.write("\n")
def create_word_file(result, output_file):
    """Create Word document from analysis result.

    Mirrors create_markdown_file: writes each page's paragraphs and tables
    in reading order, with a heuristically detected title as a level-1
    heading and a "File Page N" level-2 heading per page.

    Args:
        result: Form Recognizer layout result (pages/paragraphs/tables).
        output_file: Path of the .docx file to write.
    """
    # Create a new Word document
    doc = Document()
    # Analyze pages
    for page in result.pages:
        doc.add_heading(f"File Page {page.page_number}", level=2)
        # Combine paragraphs, tables, and selection marks in the order they appear on the page
        elements = []
        # Sort key = y + 0.01*x: predominantly top-to-bottom, with the
        # x coordinate as a small left-to-right tiebreaker.
        elements.extend([(paragraph.bounding_regions[0].polygon[0].y + paragraph.bounding_regions[0].polygon[0].x*0.01, 'paragraph', paragraph)
                         for paragraph in result.paragraphs if paragraph.bounding_regions[0].page_number == page.page_number])
        elements.sort(key=lambda x: x[0])
        # Find the paragraph which is possible to be document title:
        # among the first five elements, pick the paragraph whose top-edge
        # midpoint is closest to the top-center of the page.
        page_width = page.width / 2
        min_distance = float('inf')
        title_paragraph = None
        for element in elements[:5]:
            if element[1] == 'paragraph':
                paragraph = element[2]
                midpoint_x = (paragraph.bounding_regions[0].polygon[0].x + paragraph.bounding_regions[0].polygon[1].x) / 2
                midpoint_y = paragraph.bounding_regions[0].polygon[0].y
                distance = ((midpoint_x - page_width) ** 2 + midpoint_y ** 2) ** 0.5
                if distance < min_distance:
                    min_distance = distance
                    title_paragraph = paragraph
        if title_paragraph:
            # Emit the title once as a heading and drop it from the flow.
            elements = [element for element in elements if element[2] != title_paragraph]
            title = title_paragraph
            doc.add_heading(title.content, level=1)
        # Continuous combine paragraphs, tables, and selection marks in the order they appear on the page
        elements.extend([(table.bounding_regions[0].polygon[0].y + table.bounding_regions[0].polygon[0].x*0.01, 'table', table)
                         for table in result.tables if table.bounding_regions[0].page_number == page.page_number])
        # Sort elements by the sum of their horizontal and vertical positions on the page
        elements.sort(key=lambda x: x[0])
        # Track table cells to avoid duplicating content
        # NOTE(review): table_cells is populated below but never read —
        # looks like dead state; confirm before removing.
        table_cells = set()
        for _, element_type, element in elements:
            if element_type == 'paragraph':
                # Skip lines that are part of a table (their text is
                # emitted via the table cells instead).
                if any(is_element_inside_table(element, get_table_max_polygon(table)) for table in result.tables if table.bounding_regions[0].page_number == page.page_number):
                    continue
                # Strip Form Recognizer checkbox markers from the text.
                content = element.content.replace(":selected:", "").replace(":unselected:", "")
                doc.add_paragraph(content)
            elif element_type == 'table':
                table = doc.add_table(rows=element.row_count, cols=element.column_count)
                table.style = 'Table Grid'
                for row_idx in range(element.row_count):
                    row_cells = table.rows[row_idx].cells
                    for col_idx in range(element.column_count):
                        cell_content = ""
                        for cell in element.cells:
                            if cell.row_index == row_idx and cell.column_index == col_idx:
                                cell_content = cell.content.replace(":selected:", "").replace(":unselected:", "")
                                table_cells.add((cell.bounding_regions[0].polygon[0].x, cell.bounding_regions[0].polygon[0].y))
                                break
                        row_cells[col_idx].text = cell_content
    # Save Word document
    doc.save(output_file)
def format_polygon(polygon):
    """Render a polygon as a comma-separated string of "[x, y]" pairs.

    Args:
        polygon: Sequence of points with ``x``/``y`` attributes, or falsy.

    Returns:
        The formatted coordinate string, or "N/A" for an empty polygon.
    """
    if not polygon:
        return "N/A"
    pairs = (f"[{point.x}, {point.y}]" for point in polygon)
    return ", ".join(pairs)
def get_table_max_polygon(table):
    """Return the table's bounding box as a [first_corner, opposite_corner] pair.

    Uses polygon index 0 and index 2 of the table's first bounding region —
    two diagonally opposite corners that together span the full table area.
    """
    corners = table.bounding_regions[0].polygon
    return [corners[0], corners[2]]
def is_element_inside_table(element, table_max_polygon):
    """Return True when the element's center falls within the table's box.

    Args:
        element: Layout element with a bounding polygon; its midpoint is
            taken from corners 0 and 2 (diagonally opposite).
        table_max_polygon: ``[first_corner, opposite_corner]`` pair as
            produced by ``get_table_max_polygon`` — the two corners bound
            the x/y ranges tested here.

    Returns:
        bool: Whether the element midpoint lies inside the box.
    """
    corners = element.bounding_regions[0].polygon
    center_x = (corners[0].x + corners[2].x) / 2
    center_y = (corners[0].y + corners[2].y) / 2
    lo, hi = table_max_polygon
    within_x = lo.x <= center_x <= hi.x
    within_y = lo.y <= center_y <= hi.y
    return within_x and within_y