Spaces:
Sleeping
Sleeping
import os
import re
import tempfile
from io import BytesIO

import pandas as pd
from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential
from docx import Document
from dotenv import load_dotenv
from fastapi import APIRouter, File, HTTPException, UploadFile
from fastapi.responses import JSONResponse, StreamingResponse
# Load environment variables (e.g. from a local .env file) before any
# configuration is read through os.getenv.
load_dotenv()

# Router object for this module.
# NOTE(review): none of the handlers visible in this file are registered on
# this router - confirm the route decorators were not lost.
router = APIRouter()
async def convert_to_markdown(file: UploadFile = File(...)):
    """
    Convert an uploaded document to Markdown.

    The upload is written to a private temporary directory, analyzed with
    ``analyze_document``, rendered with ``create_markdown_file`` and returned
    as a downloadable ``.md`` file.

    Args:
        file: The uploaded document (e.g. a PDF) to convert.

    Returns:
        StreamingResponse: The generated Markdown file as an attachment.

    Raises:
        HTTPException: With status 500 if any step of the conversion fails.
    """
    try:
        content = await file.read()

        # The client-supplied name is untrusted and may be missing: keep only
        # its final path component and use it solely for the extension and
        # the download name.
        original_name = os.path.basename(file.filename or "document")
        stem, extension = os.path.splitext(original_name)

        # A per-request temporary directory keeps concurrent requests from
        # overwriting each other's files and is removed even when a step
        # below raises.
        with tempfile.TemporaryDirectory() as work_dir:
            source_path = os.path.join(work_dir, "input" + extension)
            with open(source_path, "wb") as f:
                f.write(content)

            # NOTE(review): analyze_document waits synchronously for the
            # service result, so this call blocks the event loop.
            result = analyze_document(source_path)

            markdown_path = os.path.join(work_dir, "output.md")
            create_markdown_file(result, markdown_path)
            with open(markdown_path, "rb") as f:
                markdown_content = f.read()

        return StreamingResponse(
            BytesIO(markdown_content),
            media_type="text/markdown",
            headers={
                "Content-Disposition": f"attachment; filename={stem or 'document'}.md"
            },
        )
    except Exception as e:
        # Endpoint boundary: report any failure as a 500 with its message.
        raise HTTPException(status_code=500, detail=str(e)) from e
async def convert_to_excel(file: UploadFile = File(...)):
    """
    Extract the tables of an uploaded document into an Excel workbook.

    The upload is written to a private temporary directory and analyzed with
    ``analyze_document``; every detected table becomes one sheet of the
    workbook built by ``create_excel_from_markdown_tables``.

    Args:
        file: The uploaded document (e.g. a PDF) whose tables are exported.

    Returns:
        StreamingResponse: Excel file containing all detected tables.

    Raises:
        HTTPException: With status 500 if any step of the conversion fails.
    """
    try:
        content = await file.read()

        # The client-supplied name is untrusted and may be missing: keep only
        # its final path component and use it solely for the extension and
        # the download name.
        original_name = os.path.basename(file.filename or "document")
        stem, extension = os.path.splitext(original_name)

        # A per-request temporary directory avoids clashes between concurrent
        # requests and guarantees the upload is deleted afterwards (the file
        # was previously left behind on disk).
        with tempfile.TemporaryDirectory() as work_dir:
            source_path = os.path.join(work_dir, "input" + extension)
            with open(source_path, "wb") as f:
                f.write(content)

            # NOTE(review): analyze_document waits synchronously for the
            # service result, so this call blocks the event loop.
            result = analyze_document(source_path)

        # Flatten each detected table into the list-of-cell-dicts layout the
        # workbook builder pivots on.
        tables = [
            [
                {
                    "row_index": cell.row_index,
                    "column_index": cell.column_index,
                    "text": cell.content,
                }
                for cell in table.cells
            ]
            for table in (result.tables or [])
        ]

        excel_buffer = create_excel_from_markdown_tables(tables)

        return StreamingResponse(
            excel_buffer,
            media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            headers={
                "Content-Disposition": f"attachment; filename={stem or 'document'}_tables.xlsx"
            },
        )
    except Exception as e:
        # Endpoint boundary: report any failure as a 500 with its message.
        raise HTTPException(status_code=500, detail=str(e)) from e
async def convert_to_word(file: UploadFile = File(...)):
    """
    Convert an uploaded document to a Word (.docx) file.

    The upload is written to a private temporary directory, analyzed with
    ``analyze_document``, rendered with ``create_word_file`` and returned as
    a downloadable ``.docx`` file.

    Args:
        file: The uploaded document (e.g. a PDF) to convert.

    Returns:
        StreamingResponse: The generated Word document as an attachment.

    Raises:
        HTTPException: With status 500 if any step of the conversion fails.
    """
    try:
        content = await file.read()

        # The client-supplied name is untrusted and may be missing: keep only
        # its final path component and use it solely for the extension and
        # the download name.
        original_name = os.path.basename(file.filename or "document")
        stem, extension = os.path.splitext(original_name)

        # A per-request temporary directory keeps concurrent requests from
        # overwriting each other's files and is removed even when a step
        # below raises.
        with tempfile.TemporaryDirectory() as work_dir:
            source_path = os.path.join(work_dir, "input" + extension)
            with open(source_path, "wb") as f:
                f.write(content)

            # NOTE(review): analyze_document waits synchronously for the
            # service result, so this call blocks the event loop.
            result = analyze_document(source_path)

            word_path = os.path.join(work_dir, "output.docx")
            create_word_file(result, word_path)
            with open(word_path, "rb") as f:
                word_content = f.read()

        return StreamingResponse(
            BytesIO(word_content),
            media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            headers={
                "Content-Disposition": f"attachment; filename={stem or 'document'}.docx"
            },
        )
    except Exception as e:
        # Endpoint boundary: report any failure as a 500 with its message.
        raise HTTPException(status_code=500, detail=str(e)) from e
def analyze_document(file_path):
    """Analyze a document with the Azure Form Recognizer ``prebuilt-layout`` model.

    The API key is read from the ``AZURE_FORM_RECOGNIZER_KEY`` environment
    variable. The service endpoint can be overridden with
    ``AZURE_FORM_RECOGNIZER_ENDPOINT``; when that variable is unset or empty
    the built-in default endpoint is used.

    Args:
        file_path: Path of the document file to send for analysis.

    Returns:
        The analysis result produced by the service poller.

    Raises:
        RuntimeError: If ``AZURE_FORM_RECOGNIZER_KEY`` is not set.
    """
    key = os.getenv("AZURE_FORM_RECOGNIZER_KEY")
    if not key:
        # Fail early with an actionable message instead of an opaque error
        # from the credential constructor.
        raise RuntimeError(
            "AZURE_FORM_RECOGNIZER_KEY is not set; cannot call Azure Form Recognizer"
        )

    endpoint = (
        os.getenv("AZURE_FORM_RECOGNIZER_ENDPOINT")
        or "https://aal-ocr-ai-azureapi.cognitiveservices.azure.com/"
    )

    document_analysis_client = DocumentAnalysisClient(
        endpoint=endpoint, credential=AzureKeyCredential(key)
    )
    with open(file_path, "rb") as f:
        poller = document_analysis_client.begin_analyze_document(
            "prebuilt-layout", document=f
        )
        # Wait for the result while the file is still open.
        result = poller.result()
    return result
def extract_tables_from_markdown(markdown_text):
    """Collect the pipe-delimited tables found in a markdown string.

    Consecutive lines containing ``|`` form one table; the first line without
    a pipe ends it. Separator lines (e.g. ``|---|---|``) are ignored. Only
    the text between the first and the last pipe of a line is used, so rows
    are expected to carry leading and trailing pipes.

    Args:
        markdown_text: The markdown source to scan.

    Returns:
        A list of tables; each table is a list of rows and each row a list
        of stripped cell strings.
    """
    separator = re.compile(r'^[\s|:-]+$')
    found = []
    pending = []
    for raw in markdown_text.split('\n'):
        if '|' not in raw:
            # A pipe-free line closes whatever table was being collected.
            if pending:
                found.append(pending)
                pending = []
            continue
        if separator.match(raw):
            continue
        row = [part.strip() for part in raw.split('|')[1:-1]]
        if row:
            pending.append(row)
    # Flush a table that runs up to the end of the text.
    if pending:
        found.append(pending)
    return found
def create_excel_from_markdown_tables(tables):
    """Build an in-memory Excel workbook with one sheet per table.

    Despite the name, the input is not markdown text: each table is a list
    of cell dictionaries that is pivoted into a row/column grid.

    Args:
        tables: A list of tables; each table is a list of dicts with the
            keys ``row_index``, ``column_index`` and ``text``.

    Returns:
        BytesIO: The workbook contents, positioned at the start. Sheets are
        named ``Sheet1``, ``Sheet2``, ... in input order; the first row of
        each sheet holds the column indices.
    """
    excel_buffer = BytesIO()
    with pd.ExcelWriter(excel_buffer, engine='xlsxwriter') as writer:
        for sheet_number, table in enumerate(tables, start=1):
            cells = pd.DataFrame(table)
            if cells.empty:
                # A table without cells has no columns to pivot on; write an
                # empty sheet so sheet numbers still match the input order.
                grid = pd.DataFrame()
            else:
                grid = cells.pivot(
                    index='row_index', columns='column_index', values='text'
                )
            grid.to_excel(writer, sheet_name=f'Sheet{sheet_number}', index=False)
    excel_buffer.seek(0)
    return excel_buffer
def create_markdown_file(result, output_file):
    """Create a markdown file from a layout analysis result.

    For every page, paragraphs and tables are written in reading order
    (top to bottom, with a slight left-to-right bias). Among the first five
    paragraphs of a page, the one closest to the top-center is written as a
    level-1 heading. Paragraphs whose center lies inside a table on the same
    page are skipped because their text is already part of the table.

    Tables are written as pipe tables with a delimiter row after the first
    row. Cell text has selection-mark tokens removed, whitespace (including
    line breaks) collapsed, and ``|`` escaped so it cannot break the row.

    Args:
        result: Analysis result exposing ``pages``, ``paragraphs`` and
            ``tables``.
        output_file: Path of the markdown file to write (UTF-8).
    """

    def reading_order(item):
        # Top-to-bottom first; x only breaks ties between items on one line.
        corner = item.bounding_regions[0].polygon[0]
        return corner.y + corner.x * 0.01

    def strip_selection_marks(text):
        return text.replace(":selected:", "").replace(":unselected:", "")

    def table_cell_text(text):
        # Line breaks and unescaped pipes would split a markdown table row.
        collapsed = " ".join(strip_selection_marks(text).split())
        return collapsed.replace("|", "\\|")

    # The result may carry None instead of an empty list.
    all_paragraphs = result.paragraphs or []
    all_tables = result.tables or []

    with open(output_file, 'w', encoding='utf-8') as md_file:
        for page in result.pages:
            page_tables = [
                table for table in all_tables
                if table.bounding_regions[0].page_number == page.page_number
            ]
            elements = [
                (reading_order(paragraph), 'paragraph', paragraph)
                for paragraph in all_paragraphs
                if paragraph.bounding_regions[0].page_number == page.page_number
            ]
            elements.sort(key=lambda item: item[0])

            # Title heuristic: nearest of the first five paragraphs to the
            # top-center point of the page.
            page_center_x = page.width / 2
            min_distance = float('inf')
            title_paragraph = None
            for _, _, paragraph in elements[:5]:
                polygon = paragraph.bounding_regions[0].polygon
                midpoint_x = (polygon[0].x + polygon[1].x) / 2
                midpoint_y = polygon[0].y
                distance = ((midpoint_x - page_center_x) ** 2 + midpoint_y ** 2) ** 0.5
                if distance < min_distance:
                    min_distance = distance
                    title_paragraph = paragraph

            if title_paragraph is not None:
                # Compare by identity so only the chosen paragraph is removed.
                elements = [item for item in elements if item[2] is not title_paragraph]
                md_file.write(f"# {title_paragraph.content}\n\n")

            elements.extend(
                (reading_order(table), 'table', table) for table in page_tables
            )
            elements.sort(key=lambda item: item[0])

            for _, element_type, element in elements:
                if element_type == 'paragraph':
                    # Text inside a table is emitted with the table itself.
                    if any(
                        is_element_inside_table(element, get_table_max_polygon(table))
                        for table in page_tables
                    ):
                        continue
                    md_file.write(f"{strip_selection_marks(element.content)}\n\n")
                elif element_type == 'table':
                    # Index cells once; the first cell seen for a position wins.
                    first_cell_at = {}
                    for cell in element.cells:
                        first_cell_at.setdefault((cell.row_index, cell.column_index), cell)

                    for row_idx in range(element.row_count):
                        row_texts = []
                        for col_idx in range(element.column_count):
                            cell = first_cell_at.get((row_idx, col_idx))
                            row_texts.append(
                                table_cell_text(cell.content) if cell is not None else ""
                            )
                        md_file.write("| " + "".join(f"{text} | " for text in row_texts) + "\n")
                        if row_idx == 0 and element.column_count > 0:
                            # Markdown only renders a pipe table when a
                            # delimiter row follows the header row.
                            md_file.write("| " + "--- | " * element.column_count + "\n")
                    md_file.write("\n")
def create_word_file(result, output_file):
    """Create a Word document from a layout analysis result.

    Each page starts with a level-2 heading ``File Page <n>``. Among the
    first five paragraphs of a page, the one closest to the top-center is
    added as a level-1 heading. The remaining paragraphs and the tables
    follow in reading order (top to bottom, with a slight left-to-right
    bias). Paragraphs whose center lies inside a table on the same page are
    skipped because their text is already part of the table.

    Args:
        result: Analysis result exposing ``pages``, ``paragraphs`` and
            ``tables``.
        output_file: Path of the ``.docx`` file to write.
    """

    def reading_order(item):
        # Top-to-bottom first; x only breaks ties between items on one line.
        corner = item.bounding_regions[0].polygon[0]
        return corner.y + corner.x * 0.01

    def strip_selection_marks(text):
        return text.replace(":selected:", "").replace(":unselected:", "")

    # The result may carry None instead of an empty list.
    all_paragraphs = result.paragraphs or []
    all_tables = result.tables or []

    doc = Document()

    for page in result.pages:
        doc.add_heading(f"File Page {page.page_number}", level=2)

        page_tables = [
            table for table in all_tables
            if table.bounding_regions[0].page_number == page.page_number
        ]
        elements = [
            (reading_order(paragraph), 'paragraph', paragraph)
            for paragraph in all_paragraphs
            if paragraph.bounding_regions[0].page_number == page.page_number
        ]
        elements.sort(key=lambda item: item[0])

        # Title heuristic: nearest of the first five paragraphs to the
        # top-center point of the page.
        page_center_x = page.width / 2
        min_distance = float('inf')
        title_paragraph = None
        for _, _, paragraph in elements[:5]:
            polygon = paragraph.bounding_regions[0].polygon
            midpoint_x = (polygon[0].x + polygon[1].x) / 2
            midpoint_y = polygon[0].y
            distance = ((midpoint_x - page_center_x) ** 2 + midpoint_y ** 2) ** 0.5
            if distance < min_distance:
                min_distance = distance
                title_paragraph = paragraph

        if title_paragraph is not None:
            # Compare by identity so only the chosen paragraph is removed.
            elements = [item for item in elements if item[2] is not title_paragraph]
            doc.add_heading(title_paragraph.content, level=1)

        # Merge the page's tables in and restore reading order.
        elements.extend(
            (reading_order(table), 'table', table) for table in page_tables
        )
        elements.sort(key=lambda item: item[0])

        for _, element_type, element in elements:
            if element_type == 'paragraph':
                # Text inside a table is emitted with the table itself.
                if any(
                    is_element_inside_table(element, get_table_max_polygon(table))
                    for table in page_tables
                ):
                    continue
                doc.add_paragraph(strip_selection_marks(element.content))
            elif element_type == 'table':
                word_table = doc.add_table(rows=element.row_count, cols=element.column_count)
                word_table.style = 'Table Grid'

                # Index cells once; the first cell seen for a position wins.
                first_cell_at = {}
                for cell in element.cells:
                    first_cell_at.setdefault((cell.row_index, cell.column_index), cell)

                for row_idx in range(element.row_count):
                    row_cells = word_table.rows[row_idx].cells
                    for col_idx in range(element.column_count):
                        cell = first_cell_at.get((row_idx, col_idx))
                        row_cells[col_idx].text = (
                            strip_selection_marks(cell.content) if cell is not None else ""
                        )

    doc.save(output_file)
def format_polygon(polygon):
    """Render polygon points as ``"[x, y], [x, y], ..."``.

    Args:
        polygon: Sequence of points exposing ``x`` and ``y``; may be empty
            or ``None``.

    Returns:
        The formatted coordinate string, or ``"N/A"`` when there is no
        polygon.
    """
    if polygon:
        return ", ".join("[{}, {}]".format(point.x, point.y) for point in polygon)
    return "N/A"
def get_table_max_polygon(table):
    """Return two diagonal corners of the table's first bounding region.

    Args:
        table: Object whose ``bounding_regions[0].polygon`` holds at least
            three points.

    Returns:
        A two-item list: the polygon's first point and its third point.
    """
    corners = table.bounding_regions[0].polygon
    return [corners[0], corners[2]]
def is_element_inside_table(element, table_max_polygon):
    """Return whether the element's center lies inside the table's box.

    The element's center is the midpoint of the diagonal between the first
    and third points of its first bounding polygon. The table box is the
    axis-aligned rectangle spanned by the two corner points in
    ``table_max_polygon``. The corners may be given in either order, so the
    check also holds when the first corner is not the top-left one (for
    example with rotated content).

    Args:
        element: Object whose ``bounding_regions[0].polygon`` holds at least
            three points exposing ``x`` and ``y``.
        table_max_polygon: Two diagonal corner points of the table, as
            returned by ``get_table_max_polygon``.

    Returns:
        bool: True if the center is inside the box or on its border.
    """
    element_polygon = element.bounding_regions[0].polygon
    center_x = (element_polygon[0].x + element_polygon[2].x) / 2
    center_y = (element_polygon[0].y + element_polygon[2].y) / 2

    corner_a, corner_b = table_max_polygon[0], table_max_polygon[1]
    # Normalize the bounds instead of assuming corner_a is the top-left and
    # corner_b the bottom-right corner.
    min_x, max_x = sorted((corner_a.x, corner_b.x))
    min_y, max_y = sorted((corner_a.y, corner_b.y))

    return min_x <= center_x <= max_x and min_y <= center_y <= max_y