muryshev's picture
update
86c402d
raw
history blame
9.33 kB
"""
Модуль с парсером для XML документов.
"""
import logging
import os
import re
from typing import Any, BinaryIO
from bs4 import BeautifulSoup
from ...data_classes import ParsedDocument
from ..abstract_parser import AbstractParser
from ..file_types import FileType
from .xml.formula_parser import XMLFormulaParser
from .xml.image_parser import XMLImageParser
from .xml.meta_parser import XMLMetaParser
from .xml.paragraph_parser import XMLParagraphParser
from .xml.table_parser import XMLTableParser
logger = logging.getLogger(__name__)
class XMLParser(AbstractParser):
"""
Парсер для XML документов.
Поддерживает извлечение текста, таблиц, формул и других элементов
из XML файлов, используя BeautifulSoup.
"""
def __init__(
self,
style_cache: dict[str, Any] | None = None,
numbering_cache: dict[str, Any] | None = None,
relationships_cache: dict[str, Any] | None = None,
):
"""
Инициализирует XML парсер и его компоненты.
Args:
style_cache (dict[str, Any] | None): Кэш стилей для передачи парсеру параграфов
numbering_cache (dict[str, Any] | None): Кэш нумерации для передачи парсеру параграфов
relationships_cache (dict[str, Any] | None): Кэш связей для обработки референсов
"""
super().__init__(FileType.XML)
self.table_parser = XMLTableParser()
self.paragraph_parser = XMLParagraphParser(style_cache, numbering_cache, relationships_cache)
self.image_parser = XMLImageParser()
self.formula_parser = XMLFormulaParser()
self.meta_parser = XMLMetaParser()
def _detect_encoding(self, content: bytes) -> str:
"""
Определяет кодировку из XML заголовка или возвращает cp866.
Args:
content (bytes): Содержимое XML файла.
Returns:
str: Определенная кодировка или cp866 по умолчанию.
"""
try:
# Пытаемся прочитать первые строки как UTF-8 для поиска объявления XML
header = content[:1000].decode('utf-8', errors='ignore')
if '<?xml' in header and 'encoding=' in header:
match = re.search(r'encoding=["\']([^"\']+)["\']', header)
if match:
return match.group(1)
except Exception as e:
logger.debug(f"Error detecting encoding: {e}")
return 'cp866'
def parse_by_path(self, file_path: str) -> ParsedDocument:
"""
Парсит XML документ по пути к файлу и возвращает его структурное представление.
Args:
file_path (str): Путь к XML файлу для парсинга.
Returns:
ParsedDocument: Структурное представление документа.
Raises:
ValueError: Если файл не может быть прочитан или распарсен.
"""
logger.debug(f"Parsing XML file: {file_path}")
if not os.path.exists(file_path):
raise ValueError(f"File not found: {file_path}")
with open(file_path, 'rb') as f:
content = f.read()
# Извлекаем имя файла из пути
filename = os.path.basename(file_path)
return self._parse_content(content, filename, file_path)
def parse(
self,
file: BinaryIO,
file_type: FileType | str | None = None,
) -> ParsedDocument:
"""
Парсит XML документ из объекта файла и возвращает его структурное представление.
Args:
file (BinaryIO): Объект файла для парсинга.
file_type: Тип файла, если известен.
Может быть объектом FileType или строкой с расширением (".xml").
Returns:
ParsedDocument: Структурное представление документа.
Raises:
ValueError: Если файл не может быть прочитан или распарсен.
"""
logger.debug("Parsing XML from file object")
if file_type and isinstance(file_type, FileType) and file_type != FileType.XML:
logger.warning(
f"Provided file_type {file_type} doesn't match parser type {FileType.XML}"
)
# Читаем содержимое файла
content = file.read()
return self._parse_content(content, "unknown.xml", None)
def _parse_content(
self,
content: bytes,
filename: str,
filepath: str | None,
) -> ParsedDocument:
"""
Внутренний метод для парсинга содержимого XML файла.
Args:
content (bytes): Содержимое XML файла.
filename (str): Имя файла для документа.
filepath (str | None): Путь к файлу (или None, если из объекта).
Returns:
ParsedDocument: Структурное представление документа.
Raises:
ValueError: Если содержимое не может быть распарсено.
"""
# Определение кодировки из XML заголовка
encoding = self._detect_encoding(content)
logger.debug(f"Detected encoding: {encoding}")
try:
xml_text = content.decode(encoding)
except UnicodeDecodeError as e:
logger.error(f"Failed to decode XML with {encoding} encoding: {e}")
raise ValueError(f"Cannot decode XML content with {encoding} encoding")
# Создание BeautifulSoup один раз
try:
soup = BeautifulSoup(xml_text, features='xml')
logger.debug("Created BeautifulSoup object")
except Exception as e:
logger.error(f"Failed to parse XML: {e}")
raise ValueError("Cannot parse XML content")
# Создание базового документа
doc = ParsedDocument(name=filename, type="XML")
# Извлечение метаданных
doc.meta = self.meta_parser.parse(soup, filepath)
logger.debug("Parsed metadata")
# Последовательный вызов парсеров
try:
# Вызываем парсеры, которые не модифицируют soup
doc.tables.extend(self.table_parser.parse(soup))
logger.debug(f"Parsed {len(doc.tables)} tables")
doc.images.extend(self.image_parser.parse(soup))
logger.debug(f"Parsed {len(doc.images)} images")
doc.formulas.extend(self.formula_parser.parse(soup))
logger.debug(f"Parsed {len(doc.formulas)} formulas")
# Вызываем парсер параграфов последним, т.к. он модифицирует soup
# (удаляет таблицы, изображения и др. элементы)
doc.paragraphs.extend(self.paragraph_parser.parse(soup))
logger.debug(f"Parsed {len(doc.paragraphs)} paragraphs")
# Связываем элементы на основе полнотекстового совпадения
self._link_elements(doc)
logger.debug("Linked elements based on text matching")
except Exception as e:
logger.error(f"Error during parsing components: {e}")
raise ValueError("Error parsing document components")
return doc
def _link_elements(self, doc: ParsedDocument) -> None:
"""
Связывает таблицы, изображения и формулы с соответствующими параграфами
на основе полнотекстового совпадения.
Args:
doc (ParsedDocument): Документ для обработки.
"""
# Индексируем параграфы документа
for i, paragraph in enumerate(doc.paragraphs):
paragraph.index_in_document = i
for i, table in enumerate(doc.tables):
table.index_in_document = i
last_index = 0
for table in doc.tables:
for i in range(last_index, len(doc.paragraphs)):
paragraph = doc.paragraphs[i]
if table.title == paragraph.text:
table.title_index_in_paragraphs = i
paragraph.title_of_table = table.index_in_document
last_index = i
break