Spaces:
Sleeping
Sleeping
import pdf2image | |
import numpy as np | |
from PIL import Image | |
import matplotlib.pyplot as plt | |
import layoutparser as lp | |
import cv2 | |
from PyPDF2 import PdfReader, PdfWriter | |
import pandas as pd | |
import pdfminer.high_level | |
import pdfminer.layout | |
from operator import itemgetter | |
# Inputs: pdf_file, page number, bounding box (optional; lower-left/upper-right or upper-left/lower-right), output_bbox
class TableExtractor(object):
    """Locate tables and figures on PDF pages and extract their text content.

    Layout regions come from a detection model (``self.model``) that is run on
    page images.  Text is then read from the PDF itself with pdfminer, inside
    the detected regions after they are converted to PDF coordinates.

    All bounding boxes handled here are ``(x0, y0, x1, y1)`` sequences.  In PDF
    coordinates ``y`` grows upwards, so ``y1`` is the top edge of a box.
    """

    # Two-character prefixes treated as the start of a footnote: a lowercase
    # "a" immediately followed by an uppercase letter or a digit.  Order
    # matters: the first marker (in this order) found in a text box decides
    # where the footnote text is cut.
    _FOOTNOTE_MARKERS = tuple(
        'a' + suffix for suffix in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890'
    )

    def __init__(self, output_bbox=True):
        """Create an extractor with no PDF, page, or model attached yet.

        Args:
            output_bbox: when true, returned titles, footnotes, column headers
                and cells carry their bounding boxes next to their text.
        """
        self.pdf_file = ""
        self.page = ""
        # DPI the page images are assumed to be rendered at, and the PDF unit
        # density; their ratio rescales image boxes to PDF units.
        self.image_dpi = 200
        self.pdf_dpi = 72
        self.output_bbox = output_bbox
        # Detected layout blocks of the current page, keyed by lowercase type.
        self.blocks = {}
        # Scratch values compared in extract_table_information().
        self.title_y = 0
        self.column_header_y = 0
        self.model = None
        self.img = None
        self.output_image = True
        # Tag -> keywords.  A header gets the first tag (in this order) one of
        # whose keywords occurs in it as a case-sensitive substring.
        self.tagging = {
            'substance': ['compound', 'salt', 'base', 'solvent', 'CBr4', 'collidine', 'InX3', 'substrate', 'ligand', 'PPh3', 'PdL2', 'Cu', 'compd', 'reagent', 'reagant', 'acid', 'aldehyde', 'amine', 'Ln', 'H2O', 'enzyme', 'cofactor', 'oxidant', 'Pt(COD)Cl2', 'CuBr2', 'additive'],
            'ratio': [':'],
            'measurement': ['μM', 'nM', 'IC50', 'CI', 'excitation', 'emission', 'Φ', 'φ', 'shift', 'ee', 'ΔG', 'ΔH', 'TΔS', 'Δ', 'distance', 'trajectory', 'V', 'eV'],
            'temperature': ['temp', 'temperature', 'T', '°C'],
            'time': ['time', 't(', 't ('],
            'result': ['yield', 'aa', 'result', 'product', 'conversion', '(%)'],
            'alkyl group': ['R', 'Ar', 'X', 'Y'],
            'solvent': ['solvent'],
            'counter': ['entry', 'no.'],
            'catalyst': ['catalyst', 'cat.'],
            'conditions': ['condition'],
            'reactant': ['reactant'],
        }

    def set_output_image(self, oi):
        """Choose whether results include the cropped region image."""
        self.output_image = oi

    def set_pdf_file(self, pdf):
        """Set the PDF that text and page geometry are read from."""
        self.pdf_file = pdf

    def set_page_num(self, pn):
        """Set the zero-based index of the page being processed."""
        self.page = pn

    def set_output_bbox(self, ob):
        """Choose whether results include bounding boxes."""
        self.output_bbox = ob

    def run_model(self, page_info):
        """Run the layout model on one page image and store the blocks by type.

        ``self.model`` must already be set and expose ``detect(image)``
        returning blocks with a ``type`` of 'Text', 'Title', 'List', 'Table' or
        'Figure' (presumably a layoutparser model with a PubLayNet label map --
        confirm against the caller).

        Args:
            page_info: the page image, in any form ``np.asarray`` accepts.
        """
        img = np.asarray(page_info)
        self.img = img
        layout_result = self.model.detect(img)
        for label in ('Text', 'Title', 'List', 'Table', 'Figure'):
            self.blocks[label.lower()] = lp.Layout(
                [b for b in layout_result if b.type == label]
            )

    # type is what coordinates you want to get. it comes in text, title, list, table, and figure
    def convert_to_pdf_coordinates(self, type):
        """Convert the stored blocks of one type to PDF-space bounding boxes.

        Args:
            type: key into ``self.blocks`` ('text', 'title', 'list', 'table'
                or 'figure').

        Returns:
            A list of ``(x0, y0, x1, y1)`` tuples, one per block, rescaled from
            image pixels to PDF units and flipped so ``y`` is measured upwards
            from the bottom of the page.
        """
        ratio = self.pdf_dpi / self.image_dpi
        scaled = [block.scale(ratio) for block in self.blocks[type]]
        reader = PdfReader(self.pdf_file)
        corner = reader.pages[self.page].mediabox.upper_left
        # NOTE(review): to_numeric presumably guards against non-float number
        # objects returned by some PyPDF2 versions -- confirm before replacing.
        page_top = pd.to_numeric(corner[1])
        return [
            (s.block.x_1, page_top - s.block.y_2, s.block.x_2, page_top - s.block.y_1)
            for s in scaled
        ]

    def _tag_for(self, text):
        """Return the first tag whose keyword occurs in *text*, else 'unknown'."""
        for tag, keywords in self.tagging.items():
            if any(word in text for word in keywords):
                return tag
        return 'unknown'

    @staticmethod
    def _take_row(lines, start):
        """Collect the run of vertically overlapping lines beginning at *start*.

        *lines* must be ordered top to bottom.  A line joins the row while its
        top edge is above the bottom edge of the line before it.

        Returns:
            ``(row, next_index)`` where *next_index* is the first unused line.
        """
        row = [lines[start]]
        cursor = start + 1
        while cursor < len(lines) and lines[cursor][3] > lines[cursor - 1][1]:
            row.append(lines[cursor])
            cursor += 1
        return row, cursor

    @staticmethod
    def _merge_overlapping(cells):
        """Sort cells left to right and fuse horizontally overlapping neighbours.

        Walks from the right so that a fused cell is compared with its own left
        neighbour next.  The fused cell covers both boxes and joins the texts
        with a space.
        """
        cells = sorted(cells, key=itemgetter(0))
        for idx in range(len(cells) - 1, 0, -1):
            right, left = cells[idx], cells[idx - 1]
            if right[0] < left[2]:
                left[0] = min(right[0], left[0])
                left[1] = min(right[1], left[1])
                left[2] = max(right[2], left[2])
                left[3] = max(right[3], left[3])
                left[4] = left[4].strip() + " " + right[4]
                cells.pop(idx)
        return cells

    @staticmethod
    def _align_to_columns(cells, bounds):
        """Insert blank cells so each cell sits under its column header.

        *bounds* is the header row wrapped in two sentinels carrying the table's
        left and right edges.  The cell for column ``n`` must start at or after
        the right edge of header ``n - 1`` and end at or before the left edge
        of header ``n + 1``; otherwise a blank placeholder takes that position.
        """
        col = 1
        while col < len(bounds) - 1:
            if col > len(cells):
                cells.append([0, 0, 0, 0, '\n'])
            else:
                cell = cells[col - 1]
                fits = cell[0] >= bounds[col - 1][2] and cell[2] <= bounds[col + 1][0]
                if not fits:
                    cells.insert(col - 1, [0, 0, 0, 0, '\n'])
            col += 1
        return cells

    def _as_column(self, cell):
        """Turn a row cell (dict or plain string) into a tagged column entry."""
        if isinstance(cell, dict):
            cell['tag'] = self._tag_for(cell['text'])
            return cell
        # Without bounding boxes row cells are bare strings; give them the
        # same shape as ordinary column headers.
        return {'text': cell, 'tag': self._tag_for(cell)}

    def _build_table(self, lines, new_coords):
        """Arrange text lines into column headers and rows.

        Args:
            lines: ``[x0, y0, x1, y1, text]`` lists lying inside the table.
            new_coords: the table bounding box in PDF coordinates.

        Returns:
            ``{"columns": [...], "rows": [...]}``, or ``None`` when there is at
            most one line.  Also records the header's top edge in
            ``self.column_header_y``.
        """
        # Top to bottom; ties keep left-to-right order (both sorts are stable).
        ordered = sorted(sorted(lines, key=itemgetter(0)), key=itemgetter(3), reverse=True)
        if len(ordered) <= 1:
            return None

        header, cursor = self._take_row(ordered, 0)
        header = self._merge_overlapping(header)
        columns = []
        for cell in header:
            text = cell[4].strip()
            entry = {'text': text, 'tag': self._tag_for(text)}
            if self.output_bbox:
                entry['bbox'] = cell[:4]
            columns.append(entry)
        self.column_header_y = max(header[-1][1], header[-1][3])

        bounds = [[0, 0, new_coords[0], 0, '']] + header + [[new_coords[2], 0, 0, 0, '']]
        rows = []
        while cursor < len(ordered):
            cells, cursor = self._take_row(ordered, cursor)
            cells = self._align_to_columns(self._merge_overlapping(cells), bounds)
            if self.output_bbox:
                rows.append([{'text': c[4].strip(), 'bbox': c[:4]} for c in cells])
            else:
                rows.append([c[4].strip() for c in cells])

        # A first row whose width differs from the header's is taken to be the
        # real header line; promote it and tag its cells.
        if rows and len(rows[0]) != len(columns):
            columns = [self._as_column(cell) for cell in rows[0]]
            rows = rows[1:]
        return {"columns": columns, "rows": rows}

    def _lines_inside(self, page_layout, new_coords):
        """Return the horizontal text lines lying strictly inside *new_coords*."""
        x_lo, x_hi = min(new_coords[0], new_coords[2]), max(new_coords[0], new_coords[2])
        y_lo, y_hi = min(new_coords[1], new_coords[3]), max(new_coords[1], new_coords[3])
        found = []
        for element in page_layout:
            if not isinstance(element, pdfminer.layout.LTTextBox):
                continue
            for line in element:
                if not isinstance(line, pdfminer.layout.LTTextLineHorizontal):
                    continue
                x0, y0, x1, y1 = line.bbox
                if x_lo < x0 < x_hi and x_lo < x1 < x_hi and y_lo < y0 < y_hi and y_lo < y1 < y_hi:
                    found.append([x0, y0, x1, y1, line.get_text()])
        return found

    # output: list of bounding boxes for tables but in pdf coordinates
    # input: new_coords is singular table bounding box in pdf coordinates
    def extract_singular_table(self, new_coords):
        """Extract the column headers and rows of one table region.

        Args:
            new_coords: the table bounding box in PDF coordinates.

        Returns:
            ``{"columns": [...], "rows": [...]}``.  Columns are dicts with
            'text' and 'tag' (plus 'bbox' when ``output_bbox`` is set); row
            cells are dicts with 'text' and 'bbox', or plain strings when
            ``output_bbox`` is off.  ``None`` when the region holds at most
            one text line.
        """
        pages = pdfminer.high_level.extract_pages(self.pdf_file, page_numbers=[self.page])
        for page_layout in pages:
            table = self._build_table(self._lines_inside(page_layout, new_coords), new_coords)
            if table is not None:
                return table
        return None

    def _select_title_and_footnote(self, boxes, tb_coords):
        """Pick the caption and footnote nearest to a region.

        Args:
            boxes: iterable of ``(bbox, text)`` pairs for the page's text boxes.
            tb_coords: the region bounding box in PDF coordinates.

        Returns:
            The ``(title, footnote)`` pair described in
            ``get_title_and_footnotes``.  Also stores the title's lower edge in
            ``self.title_y``.
        """
        left, bottom, right, top = tb_coords[0], tb_coords[1], tb_coords[2], tb_coords[3]
        title = (0, 0, 0, 0, '')
        footnote = (0, 0, 0, 0, '')
        # Candidates must be closer than 30 units, and closer than the best so far.
        title_gap = 30
        footnote_gap = 30
        for bbox, text in boxes:
            overlaps_x = (
                left <= bbox[0] <= right
                or left <= bbox[2] <= right
                or bbox[0] <= left <= bbox[2]
                or bbox[0] <= right <= bbox[2]
            )
            if not overlaps_x:
                continue
            # Caption: distance from the box's bottom edge to the region's top.
            gap = abs(bbox[1] - top)
            for keyword in ('Table', 'Scheme'):
                if keyword in text and gap < title_gap:
                    title = tuple(bbox) + (text[text.index(keyword):].replace('\n', ' '),)
                    title_gap = gap
            # Text lying vertically inside the region cannot be a footnote.
            if bbox[1] >= bottom and bbox[3] <= top:
                continue
            marker = next((m for m in self._FOOTNOTE_MARKERS if m in text), None)
            if marker is None:
                continue
            # Footnote: distance from the box's top edge to the region's bottom.
            gap = abs(bbox[3] - bottom)
            if gap < footnote_gap:
                footnote = tuple(bbox) + (text[text.index(marker):].replace('\n', ' '),)
                footnote_gap = gap
        self.title_y = min(title[1], title[3])
        if self.output_bbox:
            return (
                {'text': title[4], 'bbox': list(title[:4])},
                {'text': footnote[4], 'bbox': list(footnote[:4])},
            )
        return (title[4], footnote[4])

    def get_title_and_footnotes(self, tb_coords):
        """Find the caption and footnote belonging to a table or figure region.

        A caption is a text box containing 'Table' or 'Scheme'; a footnote is a
        text box containing one of the footnote markers.  Both must overlap the
        region horizontally and lie within 30 units of it.

        Args:
            tb_coords: the region bounding box in PDF coordinates.

        Returns:
            ``(title, footnote)``.  Each is a ``{'text', 'bbox'}`` dict when
            ``output_bbox`` is set, otherwise the text alone.  Missing items
            have empty text (and an all-zero box).
        """
        pages = pdfminer.high_level.extract_pages(self.pdf_file, page_numbers=[self.page])
        for page_layout in pages:
            boxes = [
                (element.bbox, element.get_text())
                for element in page_layout
                if isinstance(element, pdfminer.layout.LTTextBoxHorizontal)
            ]
            return self._select_title_and_footnote(boxes, tb_coords)
        # No page was produced: report "nothing found" instead of returning None.
        return self._select_title_and_footnote([], tb_coords)

    def extract_table_information(self):
        """Describe every table detected on the current page.

        Requires ``run_model`` to have been called for this page.

        Returns:
            A list of dicts with keys 'title', 'figure', 'table', 'footnote'
            and 'page'.  'table' holds the padded PDF bounding box and the
            extracted content; 'figure' holds the cropped image (when
            ``output_image`` is set) and a bbox that is filled in only when the
            caption and column header are more than 50 units apart.
        """
        table_blocks = self.blocks['table']
        pdf_boxes = self.convert_to_pdf_coordinates('table')
        pad = 20  # widen the region horizontally so edge columns are not clipped
        results = []
        for block, box in zip(table_blocks, pdf_boxes):
            coordinate = [box[0] - pad, box[1], box[2] + pad, box[3]]
            content = self.extract_singular_table(coordinate)
            title, footnote = self.get_title_and_footnotes(coordinate)
            image = Image.fromarray(block.crop_image(self.img)) if self.output_image else None
            # NOTE(review): column_header_y keeps its previous value when the
            # region had too few text lines -- confirm that is intended.
            far_apart = abs(self.title_y - self.column_header_y) > 50
            results.append({
                'title': title,
                'figure': {'image': image, 'bbox': list(coordinate) if far_apart else []},
                'table': {'bbox': list(coordinate), 'content': content},
                'footnote': footnote,
                'page': self.page,
            })
        return results

    def extract_figure_information(self):
        """Describe every figure detected on the current page.

        Requires ``run_model`` to have been called for this page.

        Returns:
            A list of dicts shaped like those of ``extract_table_information``,
            with an empty 'table' entry and the figure's PDF bounding box.
        """
        figure_blocks = self.blocks['figure']
        pdf_boxes = self.convert_to_pdf_coordinates('figure')
        results = []
        for block, coordinate in zip(figure_blocks, pdf_boxes):
            title, footnote = self.get_title_and_footnotes(coordinate)
            image = Image.fromarray(block.crop_image(self.img)) if self.output_image else None
            results.append({
                'title': title,
                'figure': {'image': image, 'bbox': list(coordinate)},
                'table': {'bbox': [], 'content': None},
                'footnote': footnote,
                'page': self.page,
            })
        return results

    def extract_all_tables_and_figures(self, pages, pdfparser, content=None):
        """Process every page image and gather the requested results.

        Args:
            pages: page images, in page order starting at page 0.
            pdfparser: the layout model to use; stored as ``self.model``.
            content: 'tables' for tables only; 'figures' for figures plus the
                tables that carry a figure bbox; anything else for both.

        Returns:
            One flat list of result dicts over all pages.
        """
        self.model = pdfparser
        results = []
        for page_number, page_image in enumerate(pages):
            self.set_page_num(page_number)
            self.run_model(page_image)
            table_info = self.extract_table_information()
            if content == 'tables':
                # Figures are not requested, so skip their extraction entirely.
                results += table_info
                continue
            figure_info = self.extract_figure_information()
            if content == 'figures':
                results += figure_info
                results += [t for t in table_info if t['figure']['bbox'] != []]
            else:
                results += table_info
                results += figure_info
        return results