# CbCR_to_Excel / reduce_and_convert_PDF.py
# Author: ADucatez
# Commit 71aaedb: Add Tabula functionality + exception management
import os
import shutil
import re
from PyPDF2 import PdfReader, PdfWriter
import pandas as pd
import camelot
import openpyxl
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.styles import numbers
from openpyxl.worksheet.table import Table, TableStyleInfo
import tabula
def extract_pages(pdf_path, start_page, end_page, output_path):
    """Copy an inclusive, 1-based page range of a PDF into a new PDF file.

    Args:
        pdf_path: Path of the PDF to read.
        start_page: First page to copy (1-based).
        end_page: Last page to copy (1-based, inclusive).
        output_path: Path of the PDF to write (opened in ``'wb'`` mode).

    Pages that fall outside the document are skipped, so the output may hold
    fewer pages than requested.
    """
    reader = PdfReader(pdf_path)
    writer = PdfWriter()
    page_count = len(reader.pages)
    # Clamp to the real page range. Without the lower bound, page 0 would be
    # looked up as index -1 and silently copy the *last* page of the document.
    first_page = max(start_page, 1)
    last_page = min(end_page, page_count)
    for page_num in range(first_page, last_page + 1):
        writer.add_page(reader.pages[page_num - 1])
    with open(output_path, 'wb') as output_pdf_file:
        writer.write(output_pdf_file)
def reduce_pdf(pdf_folder,reduced_pdf_folder):
    """Cut every PDF of a folder down to the page range encoded in its name.

    A file whose name contains ``_CbCR_<start>`` or ``_CbCR_<start>-<end>`` is
    reduced to those pages (a single page when ``<end>`` is absent) and
    written to ``reduced_pdf_folder``. In the output name, a trailing
    ``_CbCR_<start>[-<end>].pdf`` is replaced by ``_CbCR.pdf``.

    Args:
        pdf_folder: Folder holding the source PDFs.
        reduced_pdf_folder: Destination folder; deleted and recreated first.
    """
    # Always start from an empty destination folder.
    if os.path.exists(reduced_pdf_folder):
        shutil.rmtree(reduced_pdf_folder)
    os.makedirs(reduced_pdf_folder)

    page_range_pattern = r'_CbCR_(\d+)(?:-(\d+))?'
    output_suffix_pattern = r'_CbCR_\d+(?:-\d+)?\.pdf$'

    for filename in os.listdir(pdf_folder):
        if not filename.endswith('.pdf'):
            continue
        found = re.search(page_range_pattern, filename)
        if not found:
            continue

        first_group, second_group = found.group(1), found.group(2)
        start_page = int(first_group)
        # No "-<end>" part means a single-page extract.
        end_page = int(second_group) if second_group else start_page

        base_name = re.sub(output_suffix_pattern, '_CbCR.pdf', filename)
        extract_pages(
            os.path.join(pdf_folder, filename),
            start_page,
            end_page,
            os.path.join(reduced_pdf_folder, base_name),
        )
        print(f'Processed {filename} -> {base_name}')
def extract_tables_camelot_or_tabula(pdf_path):
    """Read every table of a PDF with Camelot, falling back to Tabula on error.

    Camelot is run on all pages with the ``stream`` flavor. If anything in
    that step raises, the error is printed and Tabula is run on all pages
    with ``multiple_tables=True`` instead.

    Args:
        pdf_path: Path of the PDF to parse.

    Returns:
        The list of ``table.df`` dataframes from Camelot, or whatever
        ``tabula.read_pdf`` returns when the fallback is used.
    """
    try:
        camelot_tables = camelot.read_pdf(pdf_path, pages='all', flavor='stream')
        frames = []
        for table in camelot_tables:
            frames.append(table.df)
    except Exception as e:
        # Any Camelot failure switches to the second extraction engine.
        print(f"Camelot failed with error: {e}")
        print("Trying with Tabula...")
        return tabula.read_pdf(pdf_path, pages='all', multiple_tables=True)
    return frames
def get_numeric_count(row):
    """Count the cells of *row* that look numeric.

    A cell counts when its text, with commas removed and leading/trailing
    parentheses stripped, parses as a number, or when the cell is a lone
    dash placeholder (``-`` or ``–``).
    """
    dash_placeholders = ['-', '–']
    count = 0
    for cell in row:
        text = str(cell).replace(",", "").strip('()')
        parsed = pd.to_numeric(text, errors='coerce')
        if pd.notna(parsed) or cell in dash_placeholders:
            count += 1
    return count
def convert_to_numeric(value):
    """Convert a number-like cell to a numeric value, else return it unchanged.

    Accounting-style negatives written in parentheses (``"(1,234)"``) are read
    as negative numbers, and commas are treated as thousands separators.

    Args:
        value: The cell content (string or any other object).

    Returns:
        The parsed number when the text is made only of digits and ``-,.``
        characters and parses successfully. NaN when such a text holds no
        digit at all (``""``, ``"-"``). Otherwise the original ``value``,
        untouched.
    """
    candidate = str(value)
    if isinstance(value, str) and value.startswith('(') and value.endswith(')'):
        # Only the parsing candidate is rewritten; `value` itself is kept so
        # that parenthesised text such as "(unaudited)" is returned as-is.
        candidate = '-' + value[1:-1]
    if not all(char.isdigit() or char in '-,.' for char in candidate):
        return value
    parsed = pd.to_numeric(candidate.replace(',', ''), errors='coerce')
    # Texts like "2020-21" or "31.12.2021" pass the character filter but are
    # not numbers: keep them as text instead of blanking them to NaN.
    if pd.isna(parsed) and any(char.isdigit() for char in candidate):
        return value
    return parsed
def get_headers(dataframes):
    """Build the column names from the rows that precede the first numeric row.

    Tables are inspected starting with the second one and ending with the
    first one. The first table containing a row with at least two numeric
    cells (as counted by ``get_numeric_count``) is used: for each column, the
    non-empty cells above that row are joined with spaces. If no table has
    such a row, every row of the last inspected table (the first table) is
    joined.

    Args:
        dataframes: Non-empty list of dataframes.

    Returns:
        A list with one header string per column of the table that was used.
    """
    candidate_order = list(range(1, len(dataframes))) + [0]
    header_source = None
    header_row_count = None
    for table_idx in candidate_order:
        header_source = dataframes[table_idx]
        # Work on text only, with missing cells turned into empty strings.
        header_source = header_source.astype(str).where(pd.notna(header_source), "")
        # Count rows by position: the result feeds a positional `iloc` slice,
        # so the index label yielded by iterrows() must not be used.
        for position, (_, row) in enumerate(header_source.iterrows()):
            if get_numeric_count(row) >= 2:
                header_row_count = position
                break
        if header_row_count is not None:
            break
    # header_row_count is None when no numeric row exists; iloc[:None] then
    # selects every row.
    return [
        " ".join(filter(None, header_source.iloc[:header_row_count, col].values))
        for col in range(header_source.shape[1])
    ]
def _cell_text(cell):
    """Return *cell* as text, mapping None, "None" and missing values to ''."""
    if isinstance(cell, str):
        return "" if cell == "None" else cell
    if cell is None or (pd.api.types.is_scalar(cell) and pd.isna(cell)):
        return ""
    return str(cell)


def _merge_cells(upper, lower):
    """Join two rows cell by cell with a space, skipping empty cells."""
    return [
        " ".join(text for text in (_cell_text(a), _cell_text(b)) if text)
        for a, b in zip(upper, lower)
    ]


def clean_dataframe(df,header):
    """Rename the columns and fold text-only rows into the next numeric row.

    Rule: a row with fewer than two numeric cells (as counted by
    ``get_numeric_count``) is not a data row; its text is merged, cell by
    cell, into the next numeric row and the text-only row is dropped.
    Everything before the first numeric row is discarded, and text-only rows
    with no numeric row after them are dropped.

    Args:
        df: The table to clean. It is not modified.
        header: List of column names. It is truncated when longer than the
            table, and padded with ``Unnamed_<i>`` when shorter.

    Returns:
        A new dataframe with a fresh 0-based index. It is empty when the
        table holds no numeric row.
    """
    n_cols = len(df.columns)
    # Copy as object dtype: the caller's frame stays untouched, and merged
    # text can be written into columns that were parsed as numbers.
    df = df.astype(object)
    if len(header) < n_cols:
        df.columns = list(header) + [f"Unnamed_{i}" for i in range(len(header), n_cols)]
    else:
        df.columns = list(header[:n_cols])

    # Position (not index label) of the first numeric row, for the iloc slice.
    first_numeric_pos = None
    for position, (_, row) in enumerate(df.iterrows()):
        if get_numeric_count(row) >= 2:
            first_numeric_pos = position
            break
    # iloc[None:] keeps every row when no numeric row was found.
    df = df.iloc[first_numeric_pos:].reset_index(drop=True)

    text_only_rows = []
    pending = None  # text of the text-only rows waiting for a numeric row
    for i in range(len(df)):
        row = df.iloc[i]
        if get_numeric_count(row) < 2:
            cells = list(row)
            pending = cells if pending is None else _merge_cells(pending, cells)
            text_only_rows.append(i)
        elif pending is not None:
            df.iloc[i] = _merge_cells(pending, list(row))
            pending = None
    return df.drop(text_only_rows).reset_index(drop=True)
def clean_and_concatenate_tables(dataframes):
    """Clean every extracted table and stack them into one dataframe.

    Steps:
      1. Drop rows without any filled cell and columns with fewer than three
         filled cells (a cell is filled when it is neither missing nor an
         empty string).
      2. Compute a shared header with ``get_headers``.
      3. Clean every table that still has at least three columns with
         ``clean_dataframe`` and discard empty results.
      4. Rename empty column names to ``col_<position>`` and concatenate.

    Args:
        dataframes: List of dataframes. The list and its frames are left
            unmodified.

    Returns:
        The concatenated dataframe.

    Raises:
        ValueError: If there is nothing to clean, nothing is left after
            cleaning, or the result has 4 rows or fewer / 2 columns or fewer.
    """
    if not dataframes:
        raise ValueError("No dataframe to clean.")

    trimmed = []
    for df in dataframes:
        filled = df.notna() & (df.astype(str) != "")
        keep_rows = filled.sum(axis=1) >= 1
        keep_cols = filled.sum(axis=0) >= 3
        trimmed.append(df.loc[keep_rows, keep_cols].reset_index(drop=True))

    new_header = get_headers(trimmed)

    cleaned_dfs = []
    for df in trimmed:
        if len(df.columns) < 3:
            continue
        cleaned = clean_dataframe(df, new_header)
        if isinstance(cleaned, pd.DataFrame) and not cleaned.empty:
            cleaned_dfs.append(cleaned.reset_index(drop=True))
    if not cleaned_dfs:
        raise ValueError("After cleaning, no valid dataframe left.")

    for df in cleaned_dfs:
        if any(col == '' for col in df.columns):  # Check if there are empty column names
            df.columns = [f"col_{j}" if col == '' else col for j, col in enumerate(df.columns)]

    concatenated_df = pd.concat(cleaned_dfs, ignore_index=True)
    if concatenated_df.shape[0] <= 4 :
        raise ValueError("Dataframe too small, probable mistake")
    if concatenated_df.shape[1] <= 2 :
        raise ValueError("Less than 3 columns, probable mistake")
    print("Success of conversion : dataframe of shape ",concatenated_df.shape)
    return concatenated_df
def _write_excel_table(dataframe, excel_path):
    """Save *dataframe* to *excel_path* as a styled Excel table."""
    wb = openpyxl.Workbook()
    ws = wb.active
    percentage_cells = []
    # Add the DataFrame data to the worksheet (row 1 is the header).
    for r_idx, r in enumerate(dataframe_to_rows(dataframe, index=False, header=True)):
        ws.append(r)
        if r_idx == 0:
            # Never rewrite header cells: a name such as "Rate %" must stay text.
            continue
        for c_idx, value in enumerate(r):
            if isinstance(value, str) and value.endswith('%'):
                numeric_value = pd.to_numeric(value.strip('%'), errors='coerce')
                # Keep the original text when what precedes '%' is not a number.
                if pd.notna(numeric_value):
                    ws.cell(row=r_idx + 1, column=c_idx + 1, value=float(numeric_value) / 100)
                    percentage_cells.append((r_idx + 1, c_idx + 1))
    tab = Table(displayName="Table1", ref=ws.dimensions)
    style = TableStyleInfo(
        name="TableStyleMedium9",
        showFirstColumn=False,
        showLastColumn=False,
        showRowStripes=True,
        showColumnStripes=True
    )
    tab.tableStyleInfo = style
    ws.add_table(tab)
    # Adjust the column widths: longest cell text, capped at 30, plus padding.
    for column_cells in ws.columns:
        length = min(max(len(str(cell.value)) for cell in column_cells), 30)
        ws.column_dimensions[column_cells[0].column_letter].width = length + 2
    for row, col in percentage_cells:
        cell = ws.cell(row=row, column=col)
        cell.number_format = numbers.BUILTIN_FORMATS[10]
    wb.save(excel_path)


def convert_to_excel(reduced_pdf_folder, output_folder):
    """Convert every PDF of a folder to an Excel file and zip the results.

    For each ``.pdf`` file, the tables are extracted, cleaned, concatenated,
    converted to numbers where possible and saved as ``<name>.xlsx`` in
    ``output_folder``. A PDF that fails at any step is copied to
    ``<output_folder>/failed_to_convert`` and the error is appended to
    ``./log_errors.txt``.

    Side effects: ``output_folder`` is deleted and recreated, any existing
    ``./log_errors.txt`` is removed, a summary is printed, and
    ``output_folder`` is archived as ``./output.zip``.

    Args:
        reduced_pdf_folder: Folder holding the PDFs to convert.
        output_folder: Folder receiving the Excel files.
    """
    log_path = "./log_errors.txt"
    if os.path.exists(output_folder):
        shutil.rmtree(output_folder)
    os.makedirs(output_folder)
    failed_folder = os.path.join(output_folder, "failed_to_convert")
    os.makedirs(failed_folder, exist_ok=True)
    if os.path.exists(log_path):
        os.remove(log_path)

    number_of_files = 0
    number_of_fails = 0
    for filename in os.listdir(reduced_pdf_folder):
        if not filename.endswith('.pdf'):
            continue
        number_of_files += 1
        print("Trying to convert :", filename, "to excel")
        pdf_path = os.path.join(reduced_pdf_folder, filename)
        try:
            dataframes = extract_tables_camelot_or_tabula(pdf_path)
            if not dataframes:
                raise ValueError(f'No tables found in {filename}')
            concatenated_df = clean_and_concatenate_tables(dataframes)
            # Swap only the extension; str.replace would also rewrite a
            # '.pdf' found in the middle of the name.
            excel_name = os.path.splitext(filename)[0] + '.xlsx'
            excel_path = os.path.join(output_folder, excel_name)
            for col in concatenated_df.columns:
                if any(str(cell).strip() and not str(cell).strip().startswith('(') for cell in concatenated_df[col]):
                    concatenated_df[col] = concatenated_df[col].apply(convert_to_numeric)
            _write_excel_table(concatenated_df, excel_path)
        except Exception as e:
            # Deliberate catch-all: one unreadable PDF must not stop the batch.
            error_message = f"Error converting {filename}: {e}"
            print(error_message)
            number_of_fails += 1
            shutil.copy(pdf_path, os.path.join(failed_folder, filename))
            # Explicit encoding so non-ASCII file names cannot break logging.
            with open(log_path, "a", encoding="utf-8") as log_file:
                log_file.write(error_message + "\n")

    print("Number of files considered : ",number_of_files)
    print("Number of success : ", number_of_files - number_of_fails)
    print("Number of fails : ", number_of_fails)
    shutil.make_archive(base_name="./output", format='zip', root_dir=output_folder)
def reduce_and_convert(input_folder, reduced_pdf_folder="./reduced_pdf", output_folder="./outputs"):
    """Run the whole pipeline: reduce the PDFs, then convert them to Excel.

    Args:
        input_folder: Folder holding the source PDFs.
        reduced_pdf_folder: Working folder for the page-reduced PDFs
            (default ``./reduced_pdf``).
        output_folder: Folder receiving the Excel files
            (default ``./outputs``).
    """
    reduce_pdf(input_folder, reduced_pdf_folder)
    convert_to_excel(reduced_pdf_folder, output_folder)
if __name__ == "__main__":
    # Script entry point: run the pipeline on the example folder, resolved
    # relative to the current working directory.
    input_folder = "../example_pdf"
    reduce_and_convert(input_folder=input_folder)