Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import shutil | |
| import re | |
| from PyPDF2 import PdfReader, PdfWriter | |
| import pandas as pd | |
| import camelot | |
| import openpyxl | |
| from openpyxl.utils.dataframe import dataframe_to_rows | |
| from openpyxl.styles import numbers | |
| from openpyxl.worksheet.table import Table, TableStyleInfo | |
def extract_pages(pdf_path, start_page, end_page, output_path):
    """Copy an inclusive, 1-based page range of a PDF into a new PDF file.

    Args:
        pdf_path: Path of the PDF to read.
        start_page: First page to keep (1-based).
        end_page: Last page to keep (1-based, inclusive).
        output_path: Path the extracted pages are written to.

    Page numbers outside ``1..len(reader.pages)`` are skipped, so the output
    may contain fewer pages than requested (possibly none).
    """
    reader = PdfReader(pdf_path)
    writer = PdfWriter()
    page_count = len(reader.pages)

    # Clamp the range to valid page numbers. Without the lower bound, a page
    # number of 0 would turn into index -1 and silently copy the LAST page.
    first_page = max(start_page, 1)
    last_page = min(end_page, page_count)
    for page_num in range(first_page, last_page + 1):
        writer.add_page(reader.pages[page_num - 1])

    with open(output_path, 'wb') as output_pdf_file:
        writer.write(output_pdf_file)
def reduce_pdf(pdf_folder, reduced_pdf_folder):
    """Extract the page range encoded in each PDF file name into a new folder.

    ``reduced_pdf_folder`` is deleted if it exists and recreated empty. Every
    ``.pdf`` file in ``pdf_folder`` whose name contains ``_CbCR_<start>`` or
    ``_CbCR_<start>-<end>`` has those pages written to ``reduced_pdf_folder``
    via ``extract_pages``. Other files are ignored.
    """
    # Always start from an empty destination folder.
    if os.path.exists(reduced_pdf_folder):
        shutil.rmtree(reduced_pdf_folder)
    os.makedirs(reduced_pdf_folder)

    for filename in os.listdir(pdf_folder):
        if not filename.endswith('.pdf'):
            continue

        page_spec = re.search(r'_CbCR_(\d+)(?:-(\d+))?', filename)
        if not page_spec:
            continue

        first_text, last_text = page_spec.group(1), page_spec.group(2)
        start_page = int(first_text)
        # A single page number means a one-page range.
        end_page = int(last_text) if last_text else start_page

        # Drop the page-range suffix from the output file name.
        base_name = re.sub(r'_CbCR_\d+(?:-\d+)?\.pdf$', '_CbCR.pdf', filename)
        extract_pages(
            os.path.join(pdf_folder, filename),
            start_page,
            end_page,
            os.path.join(reduced_pdf_folder, base_name),
        )
        print(f'Processed {filename} -> {base_name}')
def extract_tables_camelot(pdf_path):
    """Return the tables Camelot finds on all pages of ``pdf_path`` (stream flavor)."""
    return camelot.read_pdf(pdf_path, pages='all', flavor='stream')
def get_numeric_count(row):
    """Count the cells of ``row`` that look numeric.

    A cell counts when its text, after removing thousands commas and
    surrounding parentheses, parses as a number, or when the cell is a lone
    dash placeholder (``-`` or ``–``).

    Args:
        row: Iterable of cell values (e.g. a DataFrame row).

    Returns:
        Number of numeric-looking cells.
    """
    count = 0
    for cell in row:
        # Non-string cells (None, NaN, numbers) are converted to text first;
        # calling .replace() on them directly would raise AttributeError.
        text = cell if isinstance(cell, str) else str(cell)
        parsed = pd.to_numeric(text.replace(",", "").strip('()'), errors='coerce')
        if pd.notna(parsed) or text in ['-', '–']:
            count += 1
    return count
def convert_to_numeric(value):
    """Convert a numeric-looking string to a number; return anything else as is.

    A string wrapped in parentheses is read as a negative amount, and commas
    are treated as thousands separators. Only strings made up solely of
    digits and the characters ``-``, ``,`` and ``.`` are converted; such a
    string that still fails to parse (e.g. ``""`` or ``"-"``) becomes NaN.

    Args:
        value: Cell value, typically a string.

    Returns:
        The parsed number (or NaN), or ``value`` unchanged when it is not a
        string or does not look numeric.
    """
    # Non-string values have no .replace(); they are already in final form.
    if not isinstance(value, str):
        return value

    # Work on a separate candidate so that parenthesised text such as
    # "(abc)" is returned untouched instead of being rewritten to "-abc".
    candidate = value
    if candidate.startswith('(') and candidate.endswith(')'):
        candidate = '-' + candidate[1:-1]

    if all(char.isdigit() or char in '-,.' for char in candidate):
        return pd.to_numeric(candidate.replace(',', ''), errors='coerce')
    return value
def get_headers(dataframes):
    """Build column names from the rows above the first numeric row.

    The second DataFrame is used when there are at least two, otherwise the
    first. The header rows are all rows before the first row that has at
    least two numeric cells (see ``get_numeric_count``). For each column,
    the non-empty texts of those rows are joined with spaces.

    Args:
        dataframes: List of DataFrames extracted from one document.

    Returns:
        List of header strings, one per column of the chosen DataFrame.
        Empty list when ``dataframes`` is empty.
    """
    if not dataframes:
        return []

    source_df = dataframes[1] if len(dataframes) >= 2 else dataframes[0]

    # Fall back to treating every row as header text when no numeric row is
    # found; previously the boundary variable was left unbound in that case.
    header_row_count = len(source_df)
    # Track the position rather than the index label, because the boundary
    # is used with .iloc below.
    for position, (_, row) in enumerate(source_df.iterrows()):
        if get_numeric_count(row) >= 2:
            header_row_count = position
            break

    text_df = source_df.astype(str).where(pd.notna(source_df), "")
    return [
        " ".join(filter(None, text_df.iloc[:header_row_count, col].values))
        for col in range(text_df.shape[1])
    ]
def _join_cell_text(parts):
    """Join cell fragments with spaces, skipping None, "None" and ""."""
    return " ".join(part for part in parts if part not in [None, "None", ""])


def clean_dataframe(df, header):
    """Apply ``header`` and fold non-numeric rows into the next numeric row.

    Rule: rows before the first numeric row are dropped. After that, each
    row with fewer than two numeric cells (see ``get_numeric_count``) is
    merged, column by column, into the next numeric row and then removed.
    Non-numeric rows with no numeric row after them are removed.

    Args:
        df: Table to clean. It is not modified; a copy is used.
        header: Column names to assign; must match the column count.

    Returns:
        A new DataFrame with a fresh 0-based index.
    """
    # Work on a copy so the caller's DataFrame keeps its own column labels.
    df = df.copy()
    df.columns = header

    # Position (not index label) of the first numeric row, since it feeds
    # .iloc. None keeps every row, as slicing from None starts at the top.
    first_numeric_pos = None
    for position, (_, row) in enumerate(df.iterrows()):
        if get_numeric_count(row) >= 2:
            first_numeric_pos = position
            break
    df = df.iloc[first_numeric_pos:].reset_index(drop=True)

    column_count = df.shape[1]
    merged_rows = []
    buffer = None  # accumulated text of pending non-numeric rows
    for i in range(len(df)):
        if get_numeric_count(df.iloc[i]) < 2:
            if buffer is None:
                buffer = list(df.iloc[i])
            else:
                buffer = [
                    _join_cell_text([buffer[j], df.iloc[i, j]])
                    for j in range(column_count)
                ]
            merged_rows.append(i)
        elif buffer is not None:
            # Prefix the numeric row with the text gathered above it.
            df.iloc[i] = [
                _join_cell_text([buffer[j], df.iloc[i, j]])
                for j in range(column_count)
            ]
            buffer = None

    return df.drop(merged_rows).reset_index(drop=True)
def clean_and_concatenate_tables(tables):
    """Clean every extracted table and stack them into one DataFrame.

    Each table's ``.df`` is trimmed to rows with at least one filled cell and
    columns with at least three filled cells. A shared header is derived with
    ``get_headers``, each trimmed table is passed through ``clean_dataframe``,
    and the results are concatenated with a fresh index.
    """
    def filled_cells(cells):
        # Cells that are neither missing nor the empty string.
        return cells.notna().sum() - (cells.astype(str) == "").sum()

    dataframes = []
    for table in tables:
        raw = table.df
        keep_rows = raw.apply(filled_cells, axis=1) >= 1
        keep_cols = raw.apply(filled_cells, axis=0) >= 3
        dataframes.append(raw.loc[keep_rows, keep_cols].reset_index(drop=True))

    new_header = get_headers(dataframes)
    cleaned_dfs = [clean_dataframe(frame, new_header) for frame in dataframes]
    return pd.concat(cleaned_dfs, ignore_index=True)
def _save_dataframe_as_excel(df, excel_path):
    """Write ``df`` to ``excel_path`` as a styled table with percent cells."""
    wb = openpyxl.Workbook()
    ws = wb.active
    percentage_cells = []

    # Add the DataFrame data to the worksheet
    for r_idx, r in enumerate(dataframe_to_rows(df, index=False, header=True)):
        ws.append(r)
        if r_idx == 0:
            # Header row: keep column titles as text even if they end in '%'.
            continue
        for c_idx, value in enumerate(r):
            if isinstance(value, str) and value.endswith('%'):
                numeric_value = pd.to_numeric(value.strip('%'), errors='coerce')
                # Only replace the text when it really parsed as a number;
                # otherwise the original string stays in the cell.
                if pd.notna(numeric_value):
                    ws.cell(row=r_idx + 1, column=c_idx + 1, value=numeric_value / 100)
                    percentage_cells.append((r_idx + 1, c_idx + 1))

    tab = Table(displayName="Table1", ref=ws.dimensions)
    tab.tableStyleInfo = TableStyleInfo(
        name="TableStyleMedium9",
        showFirstColumn=False,
        showLastColumn=False,
        showRowStripes=True,
        showColumnStripes=True
    )
    ws.add_table(tab)

    # Adjust the column widths (capped at 30 characters, plus padding)
    for column_cells in ws.columns:
        length = min(max(len(str(cell.value)) for cell in column_cells), 30)
        ws.column_dimensions[column_cells[0].column_letter].width = length + 2

    for row, col in percentage_cells:
        ws.cell(row=row, column=col).number_format = numbers.BUILTIN_FORMATS[10]

    wb.save(excel_path)


def convert_to_excel(reduced_pdf_folder, output_folder):
    """Convert every PDF in ``reduced_pdf_folder`` to an Excel file and zip them.

    ``output_folder`` is deleted if it exists and recreated. For each ``.pdf``
    file, tables are extracted with ``extract_tables_camelot``, merged with
    ``clean_and_concatenate_tables``, converted to numbers where possible and
    saved as ``<name>.xlsx`` in ``output_folder``. Finally ``output_folder``
    is archived to ``./output.zip``.

    Args:
        reduced_pdf_folder: Folder containing the PDFs to convert.
        output_folder: Folder that receives the Excel files.
    """
    if os.path.exists(output_folder):
        shutil.rmtree(output_folder)
    os.makedirs(output_folder)

    for filename in os.listdir(reduced_pdf_folder):
        if not filename.endswith('.pdf'):
            continue

        pdf_path = os.path.join(reduced_pdf_folder, filename)
        tables = extract_tables_camelot(pdf_path)
        if not tables:
            print(f'No tables found in {filename}')
            continue

        concatenated_df = clean_and_concatenate_tables(tables)
        # Swap only the trailing extension, not every ".pdf" in the name.
        excel_path = os.path.join(output_folder, os.path.splitext(filename)[0] + '.xlsx')

        # Convert a column when it has at least one non-blank cell that does
        # not start with '('.
        for col in concatenated_df.columns:
            if any(str(cell).strip() and not str(cell).strip().startswith('(') for cell in concatenated_df[col]):
                concatenated_df[col] = concatenated_df[col].apply(convert_to_numeric)

        _save_dataframe_as_excel(concatenated_df, excel_path)
        print(f'Saved {filename} as Excel file')

    # Archive the folder that was actually written to, rather than a
    # hard-coded "./outputs" that ignores the output_folder argument.
    shutil.make_archive(base_name="./output", format='zip', root_dir=output_folder)
def reduce_and_convert(input_folder):
    """Run both pipeline stages on ``input_folder``.

    The PDFs are first reduced into ``./reduced_pdf`` with ``reduce_pdf``,
    then converted into ``./outputs`` with ``convert_to_excel``.
    """
    pages_dir, excel_dir = "./reduced_pdf", './outputs'
    reduce_pdf(input_folder, pages_dir)
    convert_to_excel(pages_dir, excel_dir)
def ui(input_files):
    """Gradio callback: convert the uploaded files and return the zip path.

    Any previous ``./output.zip`` is removed, ``./input_folder`` is recreated
    empty, the uploaded files are copied into it and ``reduce_and_convert``
    is run on that folder.

    Args:
        input_files: Iterable of uploaded file paths, or None when nothing
            was uploaded.

    Returns:
        Path of the zip archive, ``./output.zip``.
    """
    output_zip = "./output.zip"
    if os.path.exists(output_zip):
        os.remove(output_zip)

    input_folder = "./input_folder"
    if os.path.exists(input_folder):
        shutil.rmtree(input_folder)
    os.makedirs(input_folder)

    # Copy the uploaded files into the input folder. The component passes
    # None when the button is clicked without an upload; treat it as empty
    # instead of failing on iteration.
    for file_path in input_files or []:
        print(file_path)
        shutil.copy(file_path, input_folder)

    reduce_and_convert(input_folder)
    return output_zip
# Gradio front end: a folder upload, a button that runs ``ui`` on the upload,
# and a file component that offers the resulting zip for download.
with gr.Blocks() as appli:
    gr.Markdown("## CBCR PDF to Excel Conversion Tool")
    input_files = gr.File(file_count="directory", label="Select an input folder")
    process_button = gr.Button("Process Files")
    download_link = gr.File(label="Processed Zip")
    process_button.click(ui, inputs=input_files, outputs=download_link)

appli.launch()