import logging from collections import Counter from dataclasses import dataclass import pandas as pd from components.parser.abbreviations.abbreviation import Abbreviation logger = logging.getLogger(__name__) @dataclass class ParsedRow: """ Класс для хранения данных, полученных из строки таблицы. """ index: int cols: list[str] def apply_abbreviations(self, abbreviations: list) -> None: """ Применяет список аббревиатур к строке таблицы. Args: abbreviations: list[Abbreviation] - список аббревиатур, которые нужно применить """ for abbreviation in abbreviations: self.cols = [abbreviation.apply(column) for column in self.cols] def to_text(self, header: list[str] | None = None) -> str: """ Преобразование строки таблицы в текст. Пример такого преобразования: ``` ПиП : Привет и Пока ``` Args: header: list[str] | None - шапка таблицы, если обрабатывается многоколоночная таблица Returns: str - строка таблицы в текстовом формате """ if header is not None: return '\n'.join(self._apply_header(header)).strip() else: return ' : '.join(self.cols).strip() def _apply_header(self, header: list[str]) -> list[str]: """ Применение шапки таблицы к строке. Args: header: list[str] - шапка таблицы Returns: list[str] - список колонок с применённой шапкой """ if len(header) != len(self.cols): logging.debug( f'Количество колонок в строке {self.index} не совпадает с количеством колонок в шапке таблицы' ) named_part = [ f'{header[col_index]}: {col_value}' for col_index, col_value in enumerate(self.cols[: len(header)]) ] unnamed_part = self.cols[len(header) :] return named_part + unnamed_part @dataclass class ParsedTable: """ Класс для хранения данных, полученных из таблицы. index: int - номер таблицы short_type: str | None - либо "сокращения", либо "регламентирующие документы", для других таблиц не заполняется rows: list[ParsedRow] - строки таблицы name: str | None - название таблицы, если найдено """ index: int short_type: str | None header: list[str] | None rows: list[ParsedRow] name: str | None = None subtables: list['ParsedTable'] | None = None note: str | None = None rows_count: int = 0 # Количество строк в таблице modal_cols_count: int = 0 # Модальное (самое частое) количество столбцов has_merged_cells: bool = False # Наличие объединенных ячеек def apply_abbreviations(self, abbreviations) -> None: """ Применяет список аббревиатур ко всем элементам таблицы. Args: abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур """ # Преобразуем одиночную аббревиатуру в список для унификации обработки if not isinstance(abbreviations, list): abbreviations = [abbreviations] # Применяем к названию таблицы, если оно есть if self.name: for abbreviation in abbreviations: self.name = abbreviation.apply(self.name) # Применяем к заголовку таблицы, если он есть if self.header: for abbreviation in abbreviations: self.header = [abbreviation.apply(column) for column in self.header] # Применяем к строкам таблицы for row in self.rows: row.apply_abbreviations(abbreviations) # Применяем к примечанию, если оно есть if self.note: for abbreviation in abbreviations: self.note = abbreviation.apply(self.note) # Применяем к подтаблицам, если они есть if self.subtables: for subtable in self.subtables: subtable.apply_abbreviations(abbreviations) def to_text(self) -> str: """ Преобразование таблицы в текст для дальнейшего разбиения на чанки. Если таблица имеет менее 12 строк, менее 5 столбцов и не содержит объединенных ячеек, то она будет преобразована в формат Markdown. Returns: str - таблица в текстовом формате """ # Если таблица соответствует критериям для Markdown форматирования if (self.rows_count < 12 and self.modal_cols_count < 5 and not self.has_merged_cells): return self._to_markdown() # Иначе используем стандартный текстовый формат result = [] # Основная таблица result.append('\n\n'.join(self._rich_row(row) for row in self.rows)) # Подтаблицы if self.subtables: for subtable in self.subtables: result.append(subtable.to_text()) # Примечание if self.note: result.append(f"Примечание к таблице {self.index + 1}: {self.note}") return '\n\n'.join(result) def _to_markdown(self) -> str: """ Преобразование таблицы в формат Markdown. Returns: str - таблица в формате Markdown """ result = [] # Добавляем название таблицы, если оно есть if self.name: result.append(f"### {self.name}") result.append("") # Собираем заголовок таблицы if self.header: header_row = "| " + " | ".join(self.header) + " |" separator = "| " + " | ".join(["---"] * len(self.header)) + " |" result.append(header_row) result.append(separator) else: # Если нет заголовка, используем максимальное количество колонок max_cols = max([len(row.cols) for row in self.rows]) if self.rows else 0 if max_cols > 0: separator = "| " + " | ".join(["---"] * max_cols) + " |" result.append(separator) # Добавляем строки таблицы for row in self.rows: # Формируем строку в формате Markdown markdown_row = "| " + " | ".join(row.cols) + " |" result.append(markdown_row) # Добавляем примечание, если оно есть if self.note: result.append("") result.append(f"*Примечание: {self.note}*") # Добавляем подтаблицы, если они есть if self.subtables: for subtable in self.subtables: result.append("") result.append(subtable.to_text()) return "\n".join(result) def _rich_row(self, row: ParsedRow) -> str: """ Преобразование строки таблицы в текст с учётом самой таблицы. Примеры такого преобразования: ``` Т1 сокращения [Название таблицы] 1 ПиП : Привет и Пока ``` ``` Т2 [Название таблицы] 1 Столбец 1 : Значение 1 Столбец 2 : Значение 2 ``` Args: row: ParsedRow - строка таблицы Returns: str - строка таблицы в текстовом формате """ table_header = f'Т{self.index + 1}' if self.short_type is not None: table_header += f' {self.short_type}' if self.name is not None: table_header += f' [{self.name}]' return f'{table_header}\n{row.index}\n{row.to_text(self.header)}' def normalize(self) -> 'ParsedTable': """ Нормализует таблицу, обрабатывая подтаблицы и примечания. Нормализация включает: 1. Определение нормального количества столбцов (мода) 2. Выделение подтаблиц, если встречаются строки с одним столбцом, когда нормальное число столбцов не равно 1 3. Обработка примечаний (последняя строка с одним столбцом) 4. Вычисление количества строк, модального количества столбцов 5. Определение наличия объединенных ячеек Returns: ParsedTable - нормализованная таблица """ if not self.rows: return self # Находим моду по количеству столбцов col_counts = [len(row.cols) for row in self.rows] mode_count = Counter(col_counts).most_common(1)[0][0] # Устанавливаем статистику таблицы rows_count = len(self.rows) modal_cols_count = mode_count # Проверяем наличие объединенных ячеек - если есть строки с разным количеством колонок has_merged_cells = len(set(col_counts)) > 1 # Если мода не равна 1, ищем строки с одним столбцом для обработки if mode_count != 1: normalized_rows = [] subtables = [] current_subtable_rows = [] current_subtable_name = None last_row_index = len(self.rows) - 1 note = None for i, row in enumerate(self.rows): if len(row.cols) == 1 and i != last_row_index: # Это может быть подзаголовок подтаблицы if current_subtable_rows: # Создаем подтаблицу из накопленных строк subtable = ParsedTable( index=len(subtables), short_type=self.short_type, header=self.header, # Используем хедер основной таблицы rows=current_subtable_rows, name=current_subtable_name, rows_count=len(current_subtable_rows), modal_cols_count=mode_count, has_merged_cells=has_merged_cells ) subtables.append(subtable) # Начинаем новую подтаблицу # Формируем имя подтаблицы как комбинацию имени таблицы и текста подзаголовка current_subtable_name = ( f"{self.name}: {row.cols[0]}" if self.name else row.cols[0] ) current_subtable_rows = [] elif len(row.cols) == 1 and i == last_row_index: # Это примечание note = row.cols[0] else: # Обычная строка if current_subtable_name: # Добавляем в текущую подтаблицу current_subtable_rows.append(row) else: # Добавляем в основную таблицу normalized_rows.append(row) # Добавляем последнюю подтаблицу, если она есть if current_subtable_rows: subtable = ParsedTable( index=len(subtables), short_type=self.short_type, # Используем тип основной таблицы header=self.header, # Используем хедер основной таблицы rows=current_subtable_rows, name=current_subtable_name, rows_count=len(current_subtable_rows), modal_cols_count=mode_count, has_merged_cells=has_merged_cells ) subtables.append(subtable) # Создаем новую таблицу с обновленными статистическими полями return ParsedTable( index=self.index, short_type=self.short_type, header=self.header, rows=normalized_rows, name=self.name, subtables=subtables if subtables else None, note=note, rows_count=len(normalized_rows), modal_cols_count=modal_cols_count, has_merged_cells=has_merged_cells ) # Если нет специальной обработки, просто обновляем статистические поля self.rows_count = rows_count self.modal_cols_count = modal_cols_count self.has_merged_cells = has_merged_cells return self @dataclass class ParsedTables: """ Класс для хранения данных, полученных из всех таблиц файла. """ tables: list[ParsedTable] def apply_abbreviations(self, abbreviations) -> None: """ Применяет список аббревиатур ко всем таблицам. Args: abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур """ # Преобразуем одиночную аббревиатуру в список для унификации обработки if not isinstance(abbreviations, list): abbreviations = [abbreviations] for table in self.tables: table.apply_abbreviations(abbreviations) def to_text(self) -> str: """ Преобразование всех таблиц в текст для дальнейшего разбиения на чанки. Returns: str - все таблицы в текстовом формате """ return '\n\n'.join(table.to_text() for table in self.tables) def normalize(self) -> 'ParsedTables': """ Нормализует все таблицы, обрабатывая подтаблицы и примечания. Returns: ParsedTables - нормализованные таблицы """ normalized_tables = [table.normalize() for table in self.tables] return ParsedTables(tables=normalized_tables) @dataclass class ParsedText: """ Класс для хранения текста, полученного из XML файла. """ content: list[str] def apply_abbreviations( self, abbreviations: list[Abbreviation] | Abbreviation ) -> None: """ Применяет список аббревиатур ко всем строкам текста. Args: abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур """ # Преобразуем одиночную аббревиатуру в список для унификации обработки if not isinstance(abbreviations, list): abbreviations = [abbreviations] for abbreviation in abbreviations: self.content = [abbreviation.apply(line) for line in self.content] def to_text(self) -> str: """ Возвращает текстовое представление. Returns: str - текст документа """ return "\n\n".join(self.content) @dataclass class ParsedXML: """ Класс для хранения данных, полученных из xml файла. """ status: str name: str | None owner: str | None filename: str tables: ParsedTables | None = None text: ParsedText | None = None abbreviations: list = None # Список аббревиатур, извлеченных из документа id: int | None = None def apply_abbreviations( self, abbreviations: list[Abbreviation] | Abbreviation ) -> None: """ Применяет список аббревиатур ко всем элементам документа. Args: abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур """ # Преобразуем одиночную аббревиатуру в список для унификации обработки if not isinstance(abbreviations, list): abbreviations = [abbreviations] # Применяем к содержимому таблиц, если они есть if self.tables: self.tables.apply_abbreviations(abbreviations) # Применяем к текстовому содержимому, если оно есть if self.text: self.text.apply_abbreviations(abbreviations) def apply_document_abbreviations(self) -> None: """ Применяет аббревиатуры, извлеченные из документа, ко всему его содержимому. """ if self.abbreviations: self.apply_abbreviations(self.abbreviations) def __post_init__(self) -> None: """ Пост-инициализация объекта ParsedXML. """ logger.debug( f'Initializing ParsedXML: name="{self.name}", owner="{self.owner}", status="{self.status}"' ) def only_info(self) -> 'ParsedXML': """ Создает новый объект ParsedXML только с базовой информацией, без контента. """ return ParsedXML( status=self.status, name=self.name, owner=self.owner, filename=self.filename, id=self.id, ) def to_text(self) -> str: """ Возвращает текстовое представление всего документа, включая таблицы и текст. Returns: str - полный текст документа """ result = [] # Добавляем текст таблиц, если они есть if self.tables: result.append(self.tables.to_text()) # Добавляем основной текст, если он есть if self.text: result.append(self.text.to_text()) return "\n\n".join(result) @dataclass class ParsedXMLs: """ Класс для хранения данных, полученных из всех xml файлов. """ xmls: list[ParsedXML] def to_pandas(self) -> pd.DataFrame: """ Преобразование данных в pandas DataFrame. """ return pd.DataFrame( [ { 'status': xml.status, 'name': xml.name, 'owner': xml.owner, 'filename': xml.filename, } for xml in self.xmls ] )