import logging
import re

from components.parser.features.documents_dataset import DatasetRow, DocumentsDataset
from components.parser.features.hierarchy_parser import Hierarchy
from components.parser.xml.structures import ParsedXML

logger = logging.getLogger(__name__)


class DatasetCreator:
    """
    Builds a ``DocumentsDataset`` from parsed documents.

    Every entry of a document's text hierarchy and table hierarchy becomes one
    ``DatasetRow``. Rows are numbered with a running index that keeps growing
    across all documents handled by a single ``create_dataset`` call.
    """

    # Placeholder for every key-derived field the chunk key does not provide.
    _UNKNOWN = 'unknown'

    # Keyword names passed to DatasetRow for the values derived from a text
    # chunk key. The spelling (including "Pargaraph") must match DatasetRow.
    _KEY_FIELDS = (
        'LevelParagraph',
        'Pargaraph',
        'Duplicate',
        'PartLevel1',
        'PartLevel2',
        'Appendix',
        'LevelParagraphAppendix',
        'PargaraphAppendix',
        'DuplicateAppendix',
        'PartLevel1Appendix',
    )

    # A key is treated as an appendix key when it contains "Приложение"
    # followed by a capital Cyrillic letter and at least one digit.
    _APPENDIX_PATTERN = re.compile(r'Приложение[А-Я]\d+')

    def __init__(
        self,
    ) -> None:
        """
        Initialise the creator with the running row index set to zero.
        """
        self._index = 0

    def create_dataset(
        self,
        parsed_xmls: dict[int, ParsedXML],
        hierarchies: dict[int, tuple[Hierarchy, Hierarchy]],
        start_index: int = 0,
    ) -> DocumentsDataset:
        """
        Create a dataset from processed documents.

        Args:
            parsed_xmls: Parsed XML data keyed by document id.
            hierarchies: ``(text_hierarchy, table_hierarchy)`` pairs keyed by
                document id.
            start_index: Index assigned to the first row; each following row
                gets the next integer.

        Returns:
            DocumentsDataset: Dataset holding, per document, the text rows
            followed by the table rows.

        Raises:
            KeyError: If a document id from ``hierarchies`` is missing in
                ``parsed_xmls``.
        """
        logger.info('Starting dataset creation from hierarchies')
        self._index = start_index
        dataset_rows: list[DatasetRow] = []

        for doc_id, (text_hierarchy, table_hierarchy) in hierarchies.items():
            xml_data = parsed_xmls[doc_id]
            # Lazy %-style arguments: the message is only rendered when the
            # DEBUG level is actually enabled.
            logger.debug(
                'Processing document %s with %s text sections and %s table sections',
                doc_id,
                len(text_hierarchy),
                len(table_hierarchy),
            )

            dataset_rows.extend(self._process_text_hierarchy(text_hierarchy, xml_data))
            dataset_rows.extend(
                self._process_table_hierarchy(table_hierarchy, xml_data)
            )

        logger.info('Created dataset with %s rows', len(dataset_rows))
        return DocumentsDataset(dataset_rows)

    def _process_text_hierarchy(
        self,
        text_hierarchy: Hierarchy,
        xml_data: ParsedXML,
    ) -> list[DatasetRow]:
        """
        Convert every entry of a text hierarchy into a ``DatasetRow``.

        The positional fields of a row (paragraph, level, duplicate, parts and
        their appendix counterparts) are derived from the entry key by
        ``_parse_text_key``. The running index is advanced once per row.

        Args:
            text_hierarchy: Mapping-like object from chunk key to chunk text.
            xml_data: Parsed document; its ``id`` and ``name`` are copied into
                every row.

        Returns:
            One row per key, in the iteration order of ``text_hierarchy``.
        """
        rows = []

        for key in text_hierarchy.keys():
            rows.append(
                DatasetRow(
                    Index=self._index,
                    Text=text_hierarchy[key],
                    DocName=f'{xml_data.id}.XML',
                    DocNumber=xml_data.id,
                    Title=xml_data.name,
                    **self._parse_text_key(key),
                )
            )
            self._index += 1

        return rows

    @classmethod
    def _parse_text_key(cls, key: str) -> dict[str, object]:
        """
        Derive the positional ``DatasetRow`` fields from a text chunk key.

        The key is split on ``'_'`` and interpreted according to the first
        matching rule: table of contents ("Содержание"), foreword
        ("Предисловие"), appendix (see ``_APPENDIX_PATTERN``), or a regular
        section. Fields the key does not provide stay ``'unknown'``.

        Args:
            key: Chunk key taken from a text hierarchy.

        Returns:
            Mapping with exactly the names listed in ``_KEY_FIELDS``.
        """
        parts = key.split('_')
        fields: dict[str, object] = dict.fromkeys(cls._KEY_FIELDS, cls._UNKNOWN)

        if 'Содержание' in key:
            fields['LevelParagraph'] = -1
            fields['Pargaraph'] = parts[1]
        elif 'Предисловие' in key:
            fields['LevelParagraph'] = -1
            # Only the text before the first '^' is kept as the paragraph.
            fields['Pargaraph'] = parts[1].split('^')[0]
        elif cls._APPENDIX_PATTERN.search(key):
            fields.update(cls._parse_appendix_key(key, parts))
        else:
            fields.update(cls._parse_section_key(key, parts))

        return fields

    @classmethod
    def _parse_appendix_key(cls, key: str, parts: list[str]) -> dict[str, object]:
        """
        Extract the appendix-related fields from an appendix chunk key.

        Args:
            key: The full chunk key.
            parts: ``key`` split on ``'_'``.

        Returns:
            Only the fields this key provides; the caller supplies defaults.
        """
        fields: dict[str, object] = {
            # First character left after removing the "Приложение" prefix.
            'Appendix': parts[1].replace('Приложение', '')[0],
        }
        size = len(parts)

        if size == 3:
            fields['PartLevel1Appendix'] = parts[-1]
        elif size in (4, 5):
            # A four-part key mentioning "Таблица" gets level -1 instead of
            # the level token stored in the third part.
            is_table = size == 4 and 'Таблица' in key
            fields['LevelParagraphAppendix'] = -1 if is_table else parts[2]

            paragraph, duplicate = cls._split_paragraph(parts[3], cls._UNKNOWN)
            fields['PargaraphAppendix'] = paragraph
            fields['DuplicateAppendix'] = duplicate

            if size == 5:
                fields['PartLevel1Appendix'] = parts[-1].replace('PartLevel', '')

        return fields

    @classmethod
    def _parse_section_key(cls, key: str, parts: list[str]) -> dict[str, object]:
        """
        Extract the fields from a regular (non-appendix) section chunk key.

        Args:
            key: The full chunk key.
            parts: ``key`` split on ``'_'``.

        Returns:
            Only the fields this key provides; the caller supplies defaults.

        Raises:
            ValueError: If a two-part key carries a ``'^'`` suffix whose part
                number is not an integer.
        """
        fields: dict[str, object] = {}
        size = len(parts)

        if size == 2:
            fields['LevelParagraph'] = -1
            if '^' in parts[1]:
                # The token after '^' holds the part number.
                part_token = parts[1].split('^')[1]
                fields['PartLevel1'] = int(part_token.replace('PartLevel', ''))
        elif size >= 3:
            # The level is the last character of the second key part.
            fields['LevelParagraph'] = parts[1][-1]

            paragraph = parts[2].replace('PatternText', '')
            duplicate = cls._UNKNOWN
            if ':' in paragraph:
                paragraph, duplicate = cls._split_paragraph(paragraph, duplicate)
            fields['Pargaraph'] = paragraph
            fields['Duplicate'] = duplicate

            if size == 4:
                if 'Table' in key:
                    fields['PartLevel1'] = parts[3]
                else:
                    fields['PartLevel1'] = parts[3].replace('PartLevel', '')
            if size == 5:
                fields['PartLevel1'] = parts[3].replace('PartLevel', '')
                # The original stripped only the misspelled 'PartLeveL', so a
                # correctly spelled prefix leaked into the value. Both
                # spellings are stripped to stay compatible with either form.
                fields['PartLevel2'] = (
                    parts[4].replace('PartLeveL', '').replace('PartLevel', '')
                )

        return fields

    @staticmethod
    def _split_paragraph(raw: str, duplicate: str) -> tuple[str, str]:
        """
        Split a ``paragraph[:duplicate]`` token and strip its marker prefixes.

        Args:
            raw: Key part holding the paragraph, optionally followed by ``':'``
                and a duplicate marker.
            duplicate: Value returned for the duplicate when ``raw`` has no
                ``':'``.

        Returns:
            ``(paragraph, duplicate)`` with ``'PatternText'`` removed from the
            paragraph and ``'Duplicate'`` removed from a parsed duplicate.
        """
        if ':' in raw:
            # Anything after a second ':' is ignored.
            raw, duplicate = raw.split(':')[:2]
            duplicate = duplicate.replace('Duplicate', '')
        return raw.replace('PatternText', ''), duplicate

    def _process_table_hierarchy(
        self,
        table_hierarchy: Hierarchy,
        xml_data: ParsedXML,
    ) -> list[DatasetRow]:
        """
        Convert every entry of a table hierarchy into a ``DatasetRow``.

        The ``Table`` field is the second ``'_'``-separated part of the key
        with the ``'Table'`` prefix removed. The running index is advanced
        once per row.

        Args:
            table_hierarchy: Mapping-like object from table key to table text.
            xml_data: Parsed document; its ``id`` and ``name`` are copied into
                every row.

        Returns:
            One row per key, in the iteration order of ``table_hierarchy``.
        """
        rows = []

        for key in table_hierarchy.keys():
            rows.append(
                DatasetRow(
                    Index=self._index,
                    Text=table_hierarchy[key],
                    DocName=f'{xml_data.id}.XML',
                    DocNumber=xml_data.id,
                    Title=xml_data.name,
                    Table=key.split('_')[1].replace('Table', ''),
                )
            )
            self._index += 1

        return rows