import logging
import re
from components.parser.features.documents_dataset import DatasetRow, DocumentsDataset
from components.parser.features.hierarchy_parser import Hierarchy
from components.parser.xml.structures import ParsedXML
logger = logging.getLogger(__name__)
class DatasetCreator:
"""
Класс для создания датасета из обработанных документов.
"""
    def __init__(self):
        """
        Initialize the dataset creator.
        """
self._index = 0
def create_dataset(
self,
parsed_xmls: dict[int, ParsedXML],
hierarchies: dict[int, tuple[Hierarchy, Hierarchy]],
start_index: int = 0,
) -> DocumentsDataset:
"""
Создание датасета из обработанных документов.
Аргументы:
parsed_xmls: Структура с данными из XML файлов
hierarchies: Словарь с иерархическими структурами чанков
Возвращает:
DocumentsDataset: Датасет, готовый для векторизации
"""
logger.info('Starting dataset creation from hierarchies')
self._index = start_index
dataset_rows = []
for doc_id, (text_hierarchy, table_hierarchy) in hierarchies.items():
xml_data = parsed_xmls[doc_id]
logger.debug(
f'Processing document {doc_id} with {len(text_hierarchy)} text sections and {len(table_hierarchy)} table sections'
)
text_rows = self._process_text_hierarchy(text_hierarchy, xml_data)
table_rows = self._process_table_hierarchy(table_hierarchy, xml_data)
dataset_rows.extend(text_rows)
dataset_rows.extend(table_rows)
logger.info(f'Created dataset with {len(dataset_rows)} rows')
return DocumentsDataset(dataset_rows)
def _process_text_hierarchy(
self,
text_hierarchy: Hierarchy,
xml_data: ParsedXML,
) -> list[DatasetRow]:
"""
Обработка иерархии текста.
"""
rows = []
for key in text_hierarchy.keys():
split_key = key.split('_')
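            # The hierarchy key appears to be an underscore-separated path built by
            # the hierarchy parser, with markers such as 'Содержание', 'Предисловие',
            # 'Приложение<letter>', 'PatternText', 'Duplicate' and 'PartLevel' encoded
            # in its segments. Fields that cannot be recovered from the key keep the
            # 'unknown' defaults below. (Format inferred from the parsing branches.)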
paragraph = 'unknown'
level_paragraph = 'unknown'
duplicate = 'unknown'
part_lvl1 = 'unknown'
part_lvl2 = 'unknown'
appendix = 'unknown'
paragraph_appendix = 'unknown'
level_paragraph_appendix = 'unknown'
duplicate_appendix = 'unknown'
part_lvl1_appendix = 'unknown'
if re.search(r'Содержание', key):
level_paragraph = -1
paragraph = split_key[1]
elif re.search(r'Предисловие', key):
level_paragraph = -1
paragraph = split_key[1]
if '^' in paragraph:
split_parag = paragraph.split('^')
paragraph = split_parag[0]
            # Handle appendix ("Приложение") sections
elif re.search(r'Приложение[А-Я]\d+', key):
appendix = split_key[1].replace('Приложение', '')[0]
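                # The number of key segments appears to determine how much appendix
                # metadata is available: 3 segments carry only a part number, 4 carry
                # a (possibly table) paragraph, 5 carry a paragraph plus a part level.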
if len(split_key) == 3:
part_lvl1_appendix = split_key[-1]
elif len(split_key) == 4:
if 'Таблица' in key:
level_paragraph_appendix = -1
paragraph_appendix = split_key[3]
else:
level_paragraph_appendix = split_key[2]
paragraph_appendix = split_key[3]
if ':' in paragraph_appendix:
paragraph_appendix, duplicate_appendix = (
paragraph_appendix.split(':')[:2]
)
paragraph_appendix = paragraph_appendix.replace(
'PatternText', ''
)
duplicate_appendix = duplicate_appendix.replace('Duplicate', '')
else:
paragraph_appendix = paragraph_appendix.replace(
'PatternText', ''
)
elif len(split_key) == 5:
level_paragraph_appendix = split_key[2]
paragraph_appendix = split_key[3]
if ':' in paragraph_appendix:
paragraph_appendix, duplicate_appendix = (
paragraph_appendix.split(':')[:2]
)
paragraph_appendix = paragraph_appendix.replace(
'PatternText', ''
)
duplicate_appendix = duplicate_appendix.replace('Duplicate', '')
else:
paragraph_appendix = paragraph_appendix.replace(
'PatternText', ''
)
part_lvl1_appendix = split_key[-1].replace('PartLevel', '')
else:
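                # Regular (non-appendix) body sections: 2 segments describe a
                # top-level chunk, 3 or more add the section level, paragraph
                # pattern and optional part levels.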
if len(split_key) == 2:
if '^' in split_key[1]:
split_parag = split_key[1].split('^')
level_paragraph = -1
# paragraph = split_key[1].split('^')[-1].replace('UniqueNumber', '')
part_lvl1 = int(split_parag[1].replace('PartLevel', ''))
else:
level_paragraph = -1
elif len(split_key) >= 3:
level_paragraph = split_key[1][-1]
paragraph = split_key[2].replace('PatternText', '')
if ':' in paragraph:
paragraph, duplicate = paragraph.split(':')[:2]
paragraph = paragraph.replace('PatternText', '')
duplicate = duplicate.replace('Duplicate', '')
if len(split_key) == 4:
if 'Table' in key:
part_lvl1 = split_key[3]
else:
part_lvl1 = split_key[3].replace('PartLevel', '')
if len(split_key) == 5:
part_lvl1 = split_key[3].replace('PartLevel', '')
                        part_lvl2 = split_key[4].replace('PartLevel', '')
rows.append(
DatasetRow(
Index=self._index,
Text=text_hierarchy[key],
DocName=f'{xml_data.id}.XML',
DocNumber=xml_data.id,
Title=xml_data.name,
LevelParagraph=level_paragraph,
Pargaraph=paragraph,
Duplicate=duplicate,
PartLevel1=part_lvl1,
PartLevel2=part_lvl2,
Appendix=appendix,
LevelParagraphAppendix=level_paragraph_appendix,
PargaraphAppendix=paragraph_appendix,
DuplicateAppendix=duplicate_appendix,
PartLevel1Appendix=part_lvl1_appendix,
)
)
self._index += 1
return rows
def _process_table_hierarchy(
self,
table_hierarchy: Hierarchy,
xml_data: ParsedXML,
) -> list[DatasetRow]:
"""
Обработка иерархии таблиц.
"""
rows = []
for key in table_hierarchy.keys():
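            # Table keys are expected to look like '<doc>_Table<N>...'; only the
            # table number segment is kept for the dataset row.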
rows.append(
DatasetRow(
Index=self._index,
Text=table_hierarchy[key],
DocName=f'{xml_data.id}.XML',
DocNumber=xml_data.id,
Title=xml_data.name,
Table=key.split('_')[1].replace('Table', ''),
)
)
self._index += 1
return rows
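

# A minimal usage sketch (the parser objects and variable names below are
# illustrative assumptions, not part of this module):
#
#     parsed_xmls: dict[int, ParsedXML] = ...                    # from the XML parser
#     hierarchies: dict[int, tuple[Hierarchy, Hierarchy]] = ...  # (text, table) per doc
#
#     creator = DatasetCreator()
#     dataset = creator.create_dataset(parsed_xmls, hierarchies, start_index=0)
#     # `dataset` is a DocumentsDataset ready for vectorization.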