Spaces:
Sleeping
Sleeping
""" | |
Модуль с парсером для XML документов. | |
""" | |
import logging | |
import os | |
import re | |
from typing import Any, BinaryIO | |
from bs4 import BeautifulSoup | |
from ...data_classes import ParsedDocument | |
from ..abstract_parser import AbstractParser | |
from ..file_types import FileType | |
from .xml.formula_parser import XMLFormulaParser | |
from .xml.image_parser import XMLImageParser | |
from .xml.meta_parser import XMLMetaParser | |
from .xml.paragraph_parser import XMLParagraphParser | |
from .xml.table_parser import XMLTableParser | |
logger = logging.getLogger(__name__) | |
class XMLParser(AbstractParser): | |
""" | |
Парсер для XML документов. | |
Поддерживает извлечение текста, таблиц, формул и других элементов | |
из XML файлов, используя BeautifulSoup. | |
""" | |
def __init__( | |
self, | |
style_cache: dict[str, Any] | None = None, | |
numbering_cache: dict[str, Any] | None = None, | |
relationships_cache: dict[str, Any] | None = None, | |
): | |
""" | |
Инициализирует XML парсер и его компоненты. | |
Args: | |
style_cache (dict[str, Any] | None): Кэш стилей для передачи парсеру параграфов | |
numbering_cache (dict[str, Any] | None): Кэш нумерации для передачи парсеру параграфов | |
relationships_cache (dict[str, Any] | None): Кэш связей для обработки референсов | |
""" | |
super().__init__(FileType.XML) | |
self.table_parser = XMLTableParser() | |
self.paragraph_parser = XMLParagraphParser(style_cache, numbering_cache, relationships_cache) | |
self.image_parser = XMLImageParser() | |
self.formula_parser = XMLFormulaParser() | |
self.meta_parser = XMLMetaParser() | |
def _detect_encoding(self, content: bytes) -> str: | |
""" | |
Определяет кодировку из XML заголовка или возвращает cp866. | |
Args: | |
content (bytes): Содержимое XML файла. | |
Returns: | |
str: Определенная кодировка или cp866 по умолчанию. | |
""" | |
try: | |
# Пытаемся прочитать первые строки как UTF-8 для поиска объявления XML | |
header = content[:1000].decode('utf-8', errors='ignore') | |
if '<?xml' in header and 'encoding=' in header: | |
match = re.search(r'encoding=["\']([^"\']+)["\']', header) | |
if match: | |
return match.group(1) | |
except Exception as e: | |
logger.debug(f"Error detecting encoding: {e}") | |
return 'cp866' | |
def parse_by_path(self, file_path: str) -> ParsedDocument: | |
""" | |
Парсит XML документ по пути к файлу и возвращает его структурное представление. | |
Args: | |
file_path (str): Путь к XML файлу для парсинга. | |
Returns: | |
ParsedDocument: Структурное представление документа. | |
Raises: | |
ValueError: Если файл не может быть прочитан или распарсен. | |
""" | |
logger.debug(f"Parsing XML file: {file_path}") | |
if not os.path.exists(file_path): | |
raise ValueError(f"File not found: {file_path}") | |
with open(file_path, 'rb') as f: | |
content = f.read() | |
# Извлекаем имя файла из пути | |
filename = os.path.basename(file_path) | |
return self._parse_content(content, filename, file_path) | |
def parse( | |
self, | |
file: BinaryIO, | |
file_type: FileType | str | None = None, | |
) -> ParsedDocument: | |
""" | |
Парсит XML документ из объекта файла и возвращает его структурное представление. | |
Args: | |
file (BinaryIO): Объект файла для парсинга. | |
file_type: Тип файла, если известен. | |
Может быть объектом FileType или строкой с расширением (".xml"). | |
Returns: | |
ParsedDocument: Структурное представление документа. | |
Raises: | |
ValueError: Если файл не может быть прочитан или распарсен. | |
""" | |
logger.debug("Parsing XML from file object") | |
if file_type and isinstance(file_type, FileType) and file_type != FileType.XML: | |
logger.warning( | |
f"Provided file_type {file_type} doesn't match parser type {FileType.XML}" | |
) | |
# Читаем содержимое файла | |
content = file.read() | |
return self._parse_content(content, "unknown.xml", None) | |
def _parse_content( | |
self, | |
content: bytes, | |
filename: str, | |
filepath: str | None, | |
) -> ParsedDocument: | |
""" | |
Внутренний метод для парсинга содержимого XML файла. | |
Args: | |
content (bytes): Содержимое XML файла. | |
filename (str): Имя файла для документа. | |
filepath (str | None): Путь к файлу (или None, если из объекта). | |
Returns: | |
ParsedDocument: Структурное представление документа. | |
Raises: | |
ValueError: Если содержимое не может быть распарсено. | |
""" | |
# Определение кодировки из XML заголовка | |
encoding = self._detect_encoding(content) | |
logger.debug(f"Detected encoding: {encoding}") | |
try: | |
xml_text = content.decode(encoding) | |
except UnicodeDecodeError as e: | |
logger.error(f"Failed to decode XML with {encoding} encoding: {e}") | |
raise ValueError(f"Cannot decode XML content with {encoding} encoding") | |
# Создание BeautifulSoup один раз | |
try: | |
soup = BeautifulSoup(xml_text, features='xml') | |
logger.debug("Created BeautifulSoup object") | |
except Exception as e: | |
logger.error(f"Failed to parse XML: {e}") | |
raise ValueError("Cannot parse XML content") | |
# Создание базового документа | |
doc = ParsedDocument(name=filename, type="XML") | |
# Извлечение метаданных | |
doc.meta = self.meta_parser.parse(soup, filepath) | |
logger.debug("Parsed metadata") | |
# Последовательный вызов парсеров | |
try: | |
# Вызываем парсеры, которые не модифицируют soup | |
doc.tables.extend(self.table_parser.parse(soup)) | |
logger.debug(f"Parsed {len(doc.tables)} tables") | |
doc.images.extend(self.image_parser.parse(soup)) | |
logger.debug(f"Parsed {len(doc.images)} images") | |
doc.formulas.extend(self.formula_parser.parse(soup)) | |
logger.debug(f"Parsed {len(doc.formulas)} formulas") | |
# Вызываем парсер параграфов последним, т.к. он модифицирует soup | |
# (удаляет таблицы, изображения и др. элементы) | |
doc.paragraphs.extend(self.paragraph_parser.parse(soup)) | |
logger.debug(f"Parsed {len(doc.paragraphs)} paragraphs") | |
# Связываем элементы на основе полнотекстового совпадения | |
self._link_elements(doc) | |
logger.debug("Linked elements based on text matching") | |
except Exception as e: | |
logger.error(f"Error during parsing components: {e}") | |
raise ValueError("Error parsing document components") | |
return doc | |
def _link_elements(self, doc: ParsedDocument) -> None: | |
""" | |
Связывает таблицы, изображения и формулы с соответствующими параграфами | |
на основе полнотекстового совпадения. | |
Args: | |
doc (ParsedDocument): Документ для обработки. | |
""" | |
# Индексируем параграфы документа | |
for i, paragraph in enumerate(doc.paragraphs): | |
paragraph.index_in_document = i | |
for i, table in enumerate(doc.tables): | |
table.index_in_document = i | |
last_index = 0 | |
for table in doc.tables: | |
for i in range(last_index, len(doc.paragraphs)): | |
paragraph = doc.paragraphs[i] | |
if table.title == paragraph.text: | |
table.title_index_in_paragraphs = i | |
paragraph.title_of_table = table.index_in_document | |
last_index = i | |
break | |