Spaces:
Sleeping
Sleeping
import logging | |
from collections import Counter | |
from dataclasses import dataclass | |
import pandas as pd | |
from components.parser.abbreviations.abbreviation import Abbreviation | |
logger = logging.getLogger(__name__) | |
class ParsedRow: | |
""" | |
Класс для хранения данных, полученных из строки таблицы. | |
""" | |
index: int | |
cols: list[str] | |
def apply_abbreviations(self, abbreviations: list) -> None: | |
""" | |
Применяет список аббревиатур к строке таблицы. | |
Args: | |
abbreviations: list[Abbreviation] - список аббревиатур, которые нужно применить | |
""" | |
for abbreviation in abbreviations: | |
self.cols = [abbreviation.apply(column) for column in self.cols] | |
def to_text(self, header: list[str] | None = None) -> str: | |
""" | |
Преобразование строки таблицы в текст. | |
Пример такого преобразования: | |
``` | |
ПиП : Привет и Пока | |
``` | |
Args: | |
header: list[str] | None - шапка таблицы, если обрабатывается многоколоночная таблица | |
Returns: | |
str - строка таблицы в текстовом формате | |
""" | |
if header is not None: | |
return '\n'.join(self._apply_header(header)).strip() | |
else: | |
return ' : '.join(self.cols).strip() | |
def _apply_header(self, header: list[str]) -> list[str]: | |
""" | |
Применение шапки таблицы к строке. | |
Args: | |
header: list[str] - шапка таблицы | |
Returns: | |
list[str] - список колонок с применённой шапкой | |
""" | |
if len(header) != len(self.cols): | |
logging.debug( | |
f'Количество колонок в строке {self.index} не совпадает с количеством колонок в шапке таблицы' | |
) | |
named_part = [ | |
f'{header[col_index]}: {col_value}' | |
for col_index, col_value in enumerate(self.cols[: len(header)]) | |
] | |
unnamed_part = self.cols[len(header) :] | |
return named_part + unnamed_part | |
class ParsedTable: | |
""" | |
Класс для хранения данных, полученных из таблицы. | |
index: int - номер таблицы | |
short_type: str | None - либо "сокращения", либо "регламентирующие документы", для других таблиц не заполняется | |
rows: list[ParsedRow] - строки таблицы | |
name: str | None - название таблицы, если найдено | |
""" | |
index: int | |
short_type: str | None | |
header: list[str] | None | |
rows: list[ParsedRow] | |
name: str | None = None | |
subtables: list['ParsedTable'] | None = None | |
note: str | None = None | |
rows_count: int = 0 # Количество строк в таблице | |
modal_cols_count: int = 0 # Модальное (самое частое) количество столбцов | |
has_merged_cells: bool = False # Наличие объединенных ячеек | |
def apply_abbreviations(self, abbreviations) -> None: | |
""" | |
Применяет список аббревиатур ко всем элементам таблицы. | |
Args: | |
abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур | |
""" | |
# Преобразуем одиночную аббревиатуру в список для унификации обработки | |
if not isinstance(abbreviations, list): | |
abbreviations = [abbreviations] | |
# Применяем к названию таблицы, если оно есть | |
if self.name: | |
for abbreviation in abbreviations: | |
self.name = abbreviation.apply(self.name) | |
# Применяем к заголовку таблицы, если он есть | |
if self.header: | |
for abbreviation in abbreviations: | |
self.header = [abbreviation.apply(column) for column in self.header] | |
# Применяем к строкам таблицы | |
for row in self.rows: | |
row.apply_abbreviations(abbreviations) | |
# Применяем к примечанию, если оно есть | |
if self.note: | |
for abbreviation in abbreviations: | |
self.note = abbreviation.apply(self.note) | |
# Применяем к подтаблицам, если они есть | |
if self.subtables: | |
for subtable in self.subtables: | |
subtable.apply_abbreviations(abbreviations) | |
def to_text(self) -> str: | |
""" | |
Преобразование таблицы в текст для дальнейшего разбиения на чанки. | |
Если таблица имеет менее 12 строк, менее 5 столбцов и не содержит объединенных ячеек, | |
то она будет преобразована в формат Markdown. | |
Returns: | |
str - таблица в текстовом формате | |
""" | |
# Если таблица соответствует критериям для Markdown форматирования | |
if (self.rows_count < 12 and self.modal_cols_count < 5 and not self.has_merged_cells): | |
return self._to_markdown() | |
# Иначе используем стандартный текстовый формат | |
result = [] | |
# Основная таблица | |
result.append('\n\n'.join(self._rich_row(row) for row in self.rows)) | |
# Подтаблицы | |
if self.subtables: | |
for subtable in self.subtables: | |
result.append(subtable.to_text()) | |
# Примечание | |
if self.note: | |
result.append(f"Примечание к таблице {self.index + 1}: {self.note}") | |
return '\n\n'.join(result) | |
def _to_markdown(self) -> str: | |
""" | |
Преобразование таблицы в формат Markdown. | |
Returns: | |
str - таблица в формате Markdown | |
""" | |
result = [] | |
# Добавляем название таблицы, если оно есть | |
if self.name: | |
result.append(f"### {self.name}") | |
result.append("") | |
# Собираем заголовок таблицы | |
if self.header: | |
header_row = "| " + " | ".join(self.header) + " |" | |
separator = "| " + " | ".join(["---"] * len(self.header)) + " |" | |
result.append(header_row) | |
result.append(separator) | |
else: | |
# Если нет заголовка, используем максимальное количество колонок | |
max_cols = max([len(row.cols) for row in self.rows]) if self.rows else 0 | |
if max_cols > 0: | |
separator = "| " + " | ".join(["---"] * max_cols) + " |" | |
result.append(separator) | |
# Добавляем строки таблицы | |
for row in self.rows: | |
# Формируем строку в формате Markdown | |
markdown_row = "| " + " | ".join(row.cols) + " |" | |
result.append(markdown_row) | |
# Добавляем примечание, если оно есть | |
if self.note: | |
result.append("") | |
result.append(f"*Примечание: {self.note}*") | |
# Добавляем подтаблицы, если они есть | |
if self.subtables: | |
for subtable in self.subtables: | |
result.append("") | |
result.append(subtable.to_text()) | |
return "\n".join(result) | |
def _rich_row(self, row: ParsedRow) -> str: | |
""" | |
Преобразование строки таблицы в текст с учётом самой таблицы. | |
Примеры такого преобразования: | |
``` | |
Т1 сокращения [Название таблицы] | |
1 | |
ПиП : Привет и Пока | |
``` | |
``` | |
Т2 [Название таблицы] | |
1 | |
Столбец 1 : Значение 1 | |
Столбец 2 : Значение 2 | |
``` | |
Args: | |
row: ParsedRow - строка таблицы | |
Returns: | |
str - строка таблицы в текстовом формате | |
""" | |
table_header = f'Т{self.index + 1}' | |
if self.short_type is not None: | |
table_header += f' {self.short_type}' | |
if self.name is not None: | |
table_header += f' [{self.name}]' | |
return f'{table_header}\n{row.index}\n{row.to_text(self.header)}' | |
def normalize(self) -> 'ParsedTable': | |
""" | |
Нормализует таблицу, обрабатывая подтаблицы и примечания. | |
Нормализация включает: | |
1. Определение нормального количества столбцов (мода) | |
2. Выделение подтаблиц, если встречаются строки с одним столбцом, | |
когда нормальное число столбцов не равно 1 | |
3. Обработка примечаний (последняя строка с одним столбцом) | |
4. Вычисление количества строк, модального количества столбцов | |
5. Определение наличия объединенных ячеек | |
Returns: | |
ParsedTable - нормализованная таблица | |
""" | |
if not self.rows: | |
return self | |
# Находим моду по количеству столбцов | |
col_counts = [len(row.cols) for row in self.rows] | |
mode_count = Counter(col_counts).most_common(1)[0][0] | |
# Устанавливаем статистику таблицы | |
rows_count = len(self.rows) | |
modal_cols_count = mode_count | |
# Проверяем наличие объединенных ячеек - если есть строки с разным количеством колонок | |
has_merged_cells = len(set(col_counts)) > 1 | |
# Если мода не равна 1, ищем строки с одним столбцом для обработки | |
if mode_count != 1: | |
normalized_rows = [] | |
subtables = [] | |
current_subtable_rows = [] | |
current_subtable_name = None | |
last_row_index = len(self.rows) - 1 | |
note = None | |
for i, row in enumerate(self.rows): | |
if len(row.cols) == 1 and i != last_row_index: | |
# Это может быть подзаголовок подтаблицы | |
if current_subtable_rows: | |
# Создаем подтаблицу из накопленных строк | |
subtable = ParsedTable( | |
index=len(subtables), | |
short_type=self.short_type, | |
header=self.header, # Используем хедер основной таблицы | |
rows=current_subtable_rows, | |
name=current_subtable_name, | |
rows_count=len(current_subtable_rows), | |
modal_cols_count=mode_count, | |
has_merged_cells=has_merged_cells | |
) | |
subtables.append(subtable) | |
# Начинаем новую подтаблицу | |
# Формируем имя подтаблицы как комбинацию имени таблицы и текста подзаголовка | |
current_subtable_name = ( | |
f"{self.name}: {row.cols[0]}" if self.name else row.cols[0] | |
) | |
current_subtable_rows = [] | |
elif len(row.cols) == 1 and i == last_row_index: | |
# Это примечание | |
note = row.cols[0] | |
else: | |
# Обычная строка | |
if current_subtable_name: | |
# Добавляем в текущую подтаблицу | |
current_subtable_rows.append(row) | |
else: | |
# Добавляем в основную таблицу | |
normalized_rows.append(row) | |
# Добавляем последнюю подтаблицу, если она есть | |
if current_subtable_rows: | |
subtable = ParsedTable( | |
index=len(subtables), | |
short_type=self.short_type, # Используем тип основной таблицы | |
header=self.header, # Используем хедер основной таблицы | |
rows=current_subtable_rows, | |
name=current_subtable_name, | |
rows_count=len(current_subtable_rows), | |
modal_cols_count=mode_count, | |
has_merged_cells=has_merged_cells | |
) | |
subtables.append(subtable) | |
# Создаем новую таблицу с обновленными статистическими полями | |
return ParsedTable( | |
index=self.index, | |
short_type=self.short_type, | |
header=self.header, | |
rows=normalized_rows, | |
name=self.name, | |
subtables=subtables if subtables else None, | |
note=note, | |
rows_count=len(normalized_rows), | |
modal_cols_count=modal_cols_count, | |
has_merged_cells=has_merged_cells | |
) | |
# Если нет специальной обработки, просто обновляем статистические поля | |
self.rows_count = rows_count | |
self.modal_cols_count = modal_cols_count | |
self.has_merged_cells = has_merged_cells | |
return self | |
class ParsedTables: | |
""" | |
Класс для хранения данных, полученных из всех таблиц файла. | |
""" | |
tables: list[ParsedTable] | |
def apply_abbreviations(self, abbreviations) -> None: | |
""" | |
Применяет список аббревиатур ко всем таблицам. | |
Args: | |
abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур | |
""" | |
# Преобразуем одиночную аббревиатуру в список для унификации обработки | |
if not isinstance(abbreviations, list): | |
abbreviations = [abbreviations] | |
for table in self.tables: | |
table.apply_abbreviations(abbreviations) | |
def to_text(self) -> str: | |
""" | |
Преобразование всех таблиц в текст для дальнейшего разбиения на чанки. | |
Returns: | |
str - все таблицы в текстовом формате | |
""" | |
return '\n\n'.join(table.to_text() for table in self.tables) | |
def normalize(self) -> 'ParsedTables': | |
""" | |
Нормализует все таблицы, обрабатывая подтаблицы и примечания. | |
Returns: | |
ParsedTables - нормализованные таблицы | |
""" | |
normalized_tables = [table.normalize() for table in self.tables] | |
return ParsedTables(tables=normalized_tables) | |
class ParsedText: | |
""" | |
Класс для хранения текста, полученного из XML файла. | |
""" | |
content: list[str] | |
def apply_abbreviations( | |
self, abbreviations: list[Abbreviation] | Abbreviation | |
) -> None: | |
""" | |
Применяет список аббревиатур ко всем строкам текста. | |
Args: | |
abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур | |
""" | |
# Преобразуем одиночную аббревиатуру в список для унификации обработки | |
if not isinstance(abbreviations, list): | |
abbreviations = [abbreviations] | |
for abbreviation in abbreviations: | |
self.content = [abbreviation.apply(line) for line in self.content] | |
def to_text(self) -> str: | |
""" | |
Возвращает текстовое представление. | |
Returns: | |
str - текст документа | |
""" | |
return "\n\n".join(self.content) | |
class ParsedXML: | |
""" | |
Класс для хранения данных, полученных из xml файла. | |
""" | |
status: str | |
name: str | None | |
owner: str | None | |
filename: str | |
tables: ParsedTables | None = None | |
text: ParsedText | None = None | |
abbreviations: list = None # Список аббревиатур, извлеченных из документа | |
id: int | None = None | |
def apply_abbreviations( | |
self, abbreviations: list[Abbreviation] | Abbreviation | |
) -> None: | |
""" | |
Применяет список аббревиатур ко всем элементам документа. | |
Args: | |
abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур | |
""" | |
# Преобразуем одиночную аббревиатуру в список для унификации обработки | |
if not isinstance(abbreviations, list): | |
abbreviations = [abbreviations] | |
# Применяем к содержимому таблиц, если они есть | |
if self.tables: | |
self.tables.apply_abbreviations(abbreviations) | |
# Применяем к текстовому содержимому, если оно есть | |
if self.text: | |
self.text.apply_abbreviations(abbreviations) | |
def apply_document_abbreviations(self) -> None: | |
""" | |
Применяет аббревиатуры, извлеченные из документа, ко всему его содержимому. | |
""" | |
if self.abbreviations: | |
self.apply_abbreviations(self.abbreviations) | |
def __post_init__(self) -> None: | |
""" | |
Пост-инициализация объекта ParsedXML. | |
""" | |
logger.debug( | |
f'Initializing ParsedXML: name="{self.name}", owner="{self.owner}", status="{self.status}"' | |
) | |
def only_info(self) -> 'ParsedXML': | |
""" | |
Создает новый объект ParsedXML только с базовой информацией, без контента. | |
""" | |
return ParsedXML( | |
status=self.status, | |
name=self.name, | |
owner=self.owner, | |
filename=self.filename, | |
id=self.id, | |
) | |
def to_text(self) -> str: | |
""" | |
Возвращает текстовое представление всего документа, включая таблицы и текст. | |
Returns: | |
str - полный текст документа | |
""" | |
result = [] | |
# Добавляем текст таблиц, если они есть | |
if self.tables: | |
result.append(self.tables.to_text()) | |
# Добавляем основной текст, если он есть | |
if self.text: | |
result.append(self.text.to_text()) | |
return "\n\n".join(result) | |
class ParsedXMLs: | |
""" | |
Класс для хранения данных, полученных из всех xml файлов. | |
""" | |
xmls: list[ParsedXML] | |
def to_pandas(self) -> pd.DataFrame: | |
""" | |
Преобразование данных в pandas DataFrame. | |
""" | |
return pd.DataFrame( | |
[ | |
{ | |
'status': xml.status, | |
'name': xml.name, | |
'owner': xml.owner, | |
'filename': xml.filename, | |
} | |
for xml in self.xmls | |
] | |
) | |