muryshev's picture
init
57cf043
raw
history blame
21.4 kB
import logging
from collections import Counter
from dataclasses import dataclass
import pandas as pd
from components.parser.abbreviations.abbreviation import Abbreviation
logger = logging.getLogger(__name__)
@dataclass
class ParsedRow:
"""
Класс для хранения данных, полученных из строки таблицы.
"""
index: int
cols: list[str]
def apply_abbreviations(self, abbreviations: list) -> None:
"""
Применяет список аббревиатур к строке таблицы.
Args:
abbreviations: list[Abbreviation] - список аббревиатур, которые нужно применить
"""
for abbreviation in abbreviations:
self.cols = [abbreviation.apply(column) for column in self.cols]
def to_text(self, header: list[str] | None = None) -> str:
"""
Преобразование строки таблицы в текст.
Пример такого преобразования:
```
ПиП : Привет и Пока
```
Args:
header: list[str] | None - шапка таблицы, если обрабатывается многоколоночная таблица
Returns:
str - строка таблицы в текстовом формате
"""
if header is not None:
return '\n'.join(self._apply_header(header)).strip()
else:
return ' : '.join(self.cols).strip()
def _apply_header(self, header: list[str]) -> list[str]:
"""
Применение шапки таблицы к строке.
Args:
header: list[str] - шапка таблицы
Returns:
list[str] - список колонок с применённой шапкой
"""
if len(header) != len(self.cols):
logging.debug(
f'Количество колонок в строке {self.index} не совпадает с количеством колонок в шапке таблицы'
)
named_part = [
f'{header[col_index]}: {col_value}'
for col_index, col_value in enumerate(self.cols[: len(header)])
]
unnamed_part = self.cols[len(header) :]
return named_part + unnamed_part
@dataclass
class ParsedTable:
"""
Класс для хранения данных, полученных из таблицы.
index: int - номер таблицы
short_type: str | None - либо "сокращения", либо "регламентирующие документы", для других таблиц не заполняется
rows: list[ParsedRow] - строки таблицы
name: str | None - название таблицы, если найдено
"""
index: int
short_type: str | None
header: list[str] | None
rows: list[ParsedRow]
name: str | None = None
subtables: list['ParsedTable'] | None = None
note: str | None = None
rows_count: int = 0 # Количество строк в таблице
modal_cols_count: int = 0 # Модальное (самое частое) количество столбцов
has_merged_cells: bool = False # Наличие объединенных ячеек
def apply_abbreviations(self, abbreviations) -> None:
"""
Применяет список аббревиатур ко всем элементам таблицы.
Args:
abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур
"""
# Преобразуем одиночную аббревиатуру в список для унификации обработки
if not isinstance(abbreviations, list):
abbreviations = [abbreviations]
# Применяем к названию таблицы, если оно есть
if self.name:
for abbreviation in abbreviations:
self.name = abbreviation.apply(self.name)
# Применяем к заголовку таблицы, если он есть
if self.header:
for abbreviation in abbreviations:
self.header = [abbreviation.apply(column) for column in self.header]
# Применяем к строкам таблицы
for row in self.rows:
row.apply_abbreviations(abbreviations)
# Применяем к примечанию, если оно есть
if self.note:
for abbreviation in abbreviations:
self.note = abbreviation.apply(self.note)
# Применяем к подтаблицам, если они есть
if self.subtables:
for subtable in self.subtables:
subtable.apply_abbreviations(abbreviations)
def to_text(self) -> str:
"""
Преобразование таблицы в текст для дальнейшего разбиения на чанки.
Если таблица имеет менее 12 строк, менее 5 столбцов и не содержит объединенных ячеек,
то она будет преобразована в формат Markdown.
Returns:
str - таблица в текстовом формате
"""
# Если таблица соответствует критериям для Markdown форматирования
if (self.rows_count < 12 and self.modal_cols_count < 5 and not self.has_merged_cells):
return self._to_markdown()
# Иначе используем стандартный текстовый формат
result = []
# Основная таблица
result.append('\n\n'.join(self._rich_row(row) for row in self.rows))
# Подтаблицы
if self.subtables:
for subtable in self.subtables:
result.append(subtable.to_text())
# Примечание
if self.note:
result.append(f"Примечание к таблице {self.index + 1}: {self.note}")
return '\n\n'.join(result)
def _to_markdown(self) -> str:
"""
Преобразование таблицы в формат Markdown.
Returns:
str - таблица в формате Markdown
"""
result = []
# Добавляем название таблицы, если оно есть
if self.name:
result.append(f"### {self.name}")
result.append("")
# Собираем заголовок таблицы
if self.header:
header_row = "| " + " | ".join(self.header) + " |"
separator = "| " + " | ".join(["---"] * len(self.header)) + " |"
result.append(header_row)
result.append(separator)
else:
# Если нет заголовка, используем максимальное количество колонок
max_cols = max([len(row.cols) for row in self.rows]) if self.rows else 0
if max_cols > 0:
separator = "| " + " | ".join(["---"] * max_cols) + " |"
result.append(separator)
# Добавляем строки таблицы
for row in self.rows:
# Формируем строку в формате Markdown
markdown_row = "| " + " | ".join(row.cols) + " |"
result.append(markdown_row)
# Добавляем примечание, если оно есть
if self.note:
result.append("")
result.append(f"*Примечание: {self.note}*")
# Добавляем подтаблицы, если они есть
if self.subtables:
for subtable in self.subtables:
result.append("")
result.append(subtable.to_text())
return "\n".join(result)
def _rich_row(self, row: ParsedRow) -> str:
"""
Преобразование строки таблицы в текст с учётом самой таблицы.
Примеры такого преобразования:
```
Т1 сокращения [Название таблицы]
1
ПиП : Привет и Пока
```
```
Т2 [Название таблицы]
1
Столбец 1 : Значение 1
Столбец 2 : Значение 2
```
Args:
row: ParsedRow - строка таблицы
Returns:
str - строка таблицы в текстовом формате
"""
table_header = f'Т{self.index + 1}'
if self.short_type is not None:
table_header += f' {self.short_type}'
if self.name is not None:
table_header += f' [{self.name}]'
return f'{table_header}\n{row.index}\n{row.to_text(self.header)}'
def normalize(self) -> 'ParsedTable':
"""
Нормализует таблицу, обрабатывая подтаблицы и примечания.
Нормализация включает:
1. Определение нормального количества столбцов (мода)
2. Выделение подтаблиц, если встречаются строки с одним столбцом,
когда нормальное число столбцов не равно 1
3. Обработка примечаний (последняя строка с одним столбцом)
4. Вычисление количества строк, модального количества столбцов
5. Определение наличия объединенных ячеек
Returns:
ParsedTable - нормализованная таблица
"""
if not self.rows:
return self
# Находим моду по количеству столбцов
col_counts = [len(row.cols) for row in self.rows]
mode_count = Counter(col_counts).most_common(1)[0][0]
# Устанавливаем статистику таблицы
rows_count = len(self.rows)
modal_cols_count = mode_count
# Проверяем наличие объединенных ячеек - если есть строки с разным количеством колонок
has_merged_cells = len(set(col_counts)) > 1
# Если мода не равна 1, ищем строки с одним столбцом для обработки
if mode_count != 1:
normalized_rows = []
subtables = []
current_subtable_rows = []
current_subtable_name = None
last_row_index = len(self.rows) - 1
note = None
for i, row in enumerate(self.rows):
if len(row.cols) == 1 and i != last_row_index:
# Это может быть подзаголовок подтаблицы
if current_subtable_rows:
# Создаем подтаблицу из накопленных строк
subtable = ParsedTable(
index=len(subtables),
short_type=self.short_type,
header=self.header, # Используем хедер основной таблицы
rows=current_subtable_rows,
name=current_subtable_name,
rows_count=len(current_subtable_rows),
modal_cols_count=mode_count,
has_merged_cells=has_merged_cells
)
subtables.append(subtable)
# Начинаем новую подтаблицу
# Формируем имя подтаблицы как комбинацию имени таблицы и текста подзаголовка
current_subtable_name = (
f"{self.name}: {row.cols[0]}" if self.name else row.cols[0]
)
current_subtable_rows = []
elif len(row.cols) == 1 and i == last_row_index:
# Это примечание
note = row.cols[0]
else:
# Обычная строка
if current_subtable_name:
# Добавляем в текущую подтаблицу
current_subtable_rows.append(row)
else:
# Добавляем в основную таблицу
normalized_rows.append(row)
# Добавляем последнюю подтаблицу, если она есть
if current_subtable_rows:
subtable = ParsedTable(
index=len(subtables),
short_type=self.short_type, # Используем тип основной таблицы
header=self.header, # Используем хедер основной таблицы
rows=current_subtable_rows,
name=current_subtable_name,
rows_count=len(current_subtable_rows),
modal_cols_count=mode_count,
has_merged_cells=has_merged_cells
)
subtables.append(subtable)
# Создаем новую таблицу с обновленными статистическими полями
return ParsedTable(
index=self.index,
short_type=self.short_type,
header=self.header,
rows=normalized_rows,
name=self.name,
subtables=subtables if subtables else None,
note=note,
rows_count=len(normalized_rows),
modal_cols_count=modal_cols_count,
has_merged_cells=has_merged_cells
)
# Если нет специальной обработки, просто обновляем статистические поля
self.rows_count = rows_count
self.modal_cols_count = modal_cols_count
self.has_merged_cells = has_merged_cells
return self
@dataclass
class ParsedTables:
"""
Класс для хранения данных, полученных из всех таблиц файла.
"""
tables: list[ParsedTable]
def apply_abbreviations(self, abbreviations) -> None:
"""
Применяет список аббревиатур ко всем таблицам.
Args:
abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур
"""
# Преобразуем одиночную аббревиатуру в список для унификации обработки
if not isinstance(abbreviations, list):
abbreviations = [abbreviations]
for table in self.tables:
table.apply_abbreviations(abbreviations)
def to_text(self) -> str:
"""
Преобразование всех таблиц в текст для дальнейшего разбиения на чанки.
Returns:
str - все таблицы в текстовом формате
"""
return '\n\n'.join(table.to_text() for table in self.tables)
def normalize(self) -> 'ParsedTables':
"""
Нормализует все таблицы, обрабатывая подтаблицы и примечания.
Returns:
ParsedTables - нормализованные таблицы
"""
normalized_tables = [table.normalize() for table in self.tables]
return ParsedTables(tables=normalized_tables)
@dataclass
class ParsedText:
"""
Класс для хранения текста, полученного из XML файла.
"""
content: list[str]
def apply_abbreviations(
self, abbreviations: list[Abbreviation] | Abbreviation
) -> None:
"""
Применяет список аббревиатур ко всем строкам текста.
Args:
abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур
"""
# Преобразуем одиночную аббревиатуру в список для унификации обработки
if not isinstance(abbreviations, list):
abbreviations = [abbreviations]
for abbreviation in abbreviations:
self.content = [abbreviation.apply(line) for line in self.content]
def to_text(self) -> str:
"""
Возвращает текстовое представление.
Returns:
str - текст документа
"""
return "\n\n".join(self.content)
@dataclass
class ParsedXML:
"""
Класс для хранения данных, полученных из xml файла.
"""
status: str
name: str | None
owner: str | None
filename: str
tables: ParsedTables | None = None
text: ParsedText | None = None
abbreviations: list = None # Список аббревиатур, извлеченных из документа
id: int | None = None
def apply_abbreviations(
self, abbreviations: list[Abbreviation] | Abbreviation
) -> None:
"""
Применяет список аббревиатур ко всем элементам документа.
Args:
abbreviations: list[Abbreviation] | Abbreviation - объект или список объектов аббревиатур
"""
# Преобразуем одиночную аббревиатуру в список для унификации обработки
if not isinstance(abbreviations, list):
abbreviations = [abbreviations]
# Применяем к содержимому таблиц, если они есть
if self.tables:
self.tables.apply_abbreviations(abbreviations)
# Применяем к текстовому содержимому, если оно есть
if self.text:
self.text.apply_abbreviations(abbreviations)
def apply_document_abbreviations(self) -> None:
"""
Применяет аббревиатуры, извлеченные из документа, ко всему его содержимому.
"""
if self.abbreviations:
self.apply_abbreviations(self.abbreviations)
def __post_init__(self) -> None:
"""
Пост-инициализация объекта ParsedXML.
"""
logger.debug(
f'Initializing ParsedXML: name="{self.name}", owner="{self.owner}", status="{self.status}"'
)
def only_info(self) -> 'ParsedXML':
"""
Создает новый объект ParsedXML только с базовой информацией, без контента.
"""
return ParsedXML(
status=self.status,
name=self.name,
owner=self.owner,
filename=self.filename,
id=self.id,
)
def to_text(self) -> str:
"""
Возвращает текстовое представление всего документа, включая таблицы и текст.
Returns:
str - полный текст документа
"""
result = []
# Добавляем текст таблиц, если они есть
if self.tables:
result.append(self.tables.to_text())
# Добавляем основной текст, если он есть
if self.text:
result.append(self.text.to_text())
return "\n\n".join(result)
@dataclass
class ParsedXMLs:
"""
Класс для хранения данных, полученных из всех xml файлов.
"""
xmls: list[ParsedXML]
def to_pandas(self) -> pd.DataFrame:
"""
Преобразование данных в pandas DataFrame.
"""
return pd.DataFrame(
[
{
'status': xml.status,
'name': xml.name,
'owner': xml.owner,
'filename': xml.filename,
}
for xml in self.xmls
]
)