Spaces:
Sleeping
Sleeping
File size: 9,326 Bytes
86c402d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
"""
Модуль с парсером для XML документов.
"""
import logging
import os
import re
from typing import Any, BinaryIO
from bs4 import BeautifulSoup
from ...data_classes import ParsedDocument
from ..abstract_parser import AbstractParser
from ..file_types import FileType
from .xml.formula_parser import XMLFormulaParser
from .xml.image_parser import XMLImageParser
from .xml.meta_parser import XMLMetaParser
from .xml.paragraph_parser import XMLParagraphParser
from .xml.table_parser import XMLTableParser
logger = logging.getLogger(__name__)
class XMLParser(AbstractParser):
"""
Парсер для XML документов.
Поддерживает извлечение текста, таблиц, формул и других элементов
из XML файлов, используя BeautifulSoup.
"""
def __init__(
self,
style_cache: dict[str, Any] | None = None,
numbering_cache: dict[str, Any] | None = None,
relationships_cache: dict[str, Any] | None = None,
):
"""
Инициализирует XML парсер и его компоненты.
Args:
style_cache (dict[str, Any] | None): Кэш стилей для передачи парсеру параграфов
numbering_cache (dict[str, Any] | None): Кэш нумерации для передачи парсеру параграфов
relationships_cache (dict[str, Any] | None): Кэш связей для обработки референсов
"""
super().__init__(FileType.XML)
self.table_parser = XMLTableParser()
self.paragraph_parser = XMLParagraphParser(style_cache, numbering_cache, relationships_cache)
self.image_parser = XMLImageParser()
self.formula_parser = XMLFormulaParser()
self.meta_parser = XMLMetaParser()
def _detect_encoding(self, content: bytes) -> str:
"""
Определяет кодировку из XML заголовка или возвращает cp866.
Args:
content (bytes): Содержимое XML файла.
Returns:
str: Определенная кодировка или cp866 по умолчанию.
"""
try:
# Пытаемся прочитать первые строки как UTF-8 для поиска объявления XML
header = content[:1000].decode('utf-8', errors='ignore')
if '<?xml' in header and 'encoding=' in header:
match = re.search(r'encoding=["\']([^"\']+)["\']', header)
if match:
return match.group(1)
except Exception as e:
logger.debug(f"Error detecting encoding: {e}")
return 'cp866'
def parse_by_path(self, file_path: str) -> ParsedDocument:
"""
Парсит XML документ по пути к файлу и возвращает его структурное представление.
Args:
file_path (str): Путь к XML файлу для парсинга.
Returns:
ParsedDocument: Структурное представление документа.
Raises:
ValueError: Если файл не может быть прочитан или распарсен.
"""
logger.debug(f"Parsing XML file: {file_path}")
if not os.path.exists(file_path):
raise ValueError(f"File not found: {file_path}")
with open(file_path, 'rb') as f:
content = f.read()
# Извлекаем имя файла из пути
filename = os.path.basename(file_path)
return self._parse_content(content, filename, file_path)
def parse(
self,
file: BinaryIO,
file_type: FileType | str | None = None,
) -> ParsedDocument:
"""
Парсит XML документ из объекта файла и возвращает его структурное представление.
Args:
file (BinaryIO): Объект файла для парсинга.
file_type: Тип файла, если известен.
Может быть объектом FileType или строкой с расширением (".xml").
Returns:
ParsedDocument: Структурное представление документа.
Raises:
ValueError: Если файл не может быть прочитан или распарсен.
"""
logger.debug("Parsing XML from file object")
if file_type and isinstance(file_type, FileType) and file_type != FileType.XML:
logger.warning(
f"Provided file_type {file_type} doesn't match parser type {FileType.XML}"
)
# Читаем содержимое файла
content = file.read()
return self._parse_content(content, "unknown.xml", None)
def _parse_content(
self,
content: bytes,
filename: str,
filepath: str | None,
) -> ParsedDocument:
"""
Внутренний метод для парсинга содержимого XML файла.
Args:
content (bytes): Содержимое XML файла.
filename (str): Имя файла для документа.
filepath (str | None): Путь к файлу (или None, если из объекта).
Returns:
ParsedDocument: Структурное представление документа.
Raises:
ValueError: Если содержимое не может быть распарсено.
"""
# Определение кодировки из XML заголовка
encoding = self._detect_encoding(content)
logger.debug(f"Detected encoding: {encoding}")
try:
xml_text = content.decode(encoding)
except UnicodeDecodeError as e:
logger.error(f"Failed to decode XML with {encoding} encoding: {e}")
raise ValueError(f"Cannot decode XML content with {encoding} encoding")
# Создание BeautifulSoup один раз
try:
soup = BeautifulSoup(xml_text, features='xml')
logger.debug("Created BeautifulSoup object")
except Exception as e:
logger.error(f"Failed to parse XML: {e}")
raise ValueError("Cannot parse XML content")
# Создание базового документа
doc = ParsedDocument(name=filename, type="XML")
# Извлечение метаданных
doc.meta = self.meta_parser.parse(soup, filepath)
logger.debug("Parsed metadata")
# Последовательный вызов парсеров
try:
# Вызываем парсеры, которые не модифицируют soup
doc.tables.extend(self.table_parser.parse(soup))
logger.debug(f"Parsed {len(doc.tables)} tables")
doc.images.extend(self.image_parser.parse(soup))
logger.debug(f"Parsed {len(doc.images)} images")
doc.formulas.extend(self.formula_parser.parse(soup))
logger.debug(f"Parsed {len(doc.formulas)} formulas")
# Вызываем парсер параграфов последним, т.к. он модифицирует soup
# (удаляет таблицы, изображения и др. элементы)
doc.paragraphs.extend(self.paragraph_parser.parse(soup))
logger.debug(f"Parsed {len(doc.paragraphs)} paragraphs")
# Связываем элементы на основе полнотекстового совпадения
self._link_elements(doc)
logger.debug("Linked elements based on text matching")
except Exception as e:
logger.error(f"Error during parsing components: {e}")
raise ValueError("Error parsing document components")
return doc
def _link_elements(self, doc: ParsedDocument) -> None:
"""
Связывает таблицы, изображения и формулы с соответствующими параграфами
на основе полнотекстового совпадения.
Args:
doc (ParsedDocument): Документ для обработки.
"""
# Индексируем параграфы документа
for i, paragraph in enumerate(doc.paragraphs):
paragraph.index_in_document = i
for i, table in enumerate(doc.tables):
table.index_in_document = i
last_index = 0
for table in doc.tables:
for i in range(last_index, len(doc.paragraphs)):
paragraph = doc.paragraphs[i]
if table.title == paragraph.text:
table.title_index_in_paragraphs = i
paragraph.title_of_table = table.index_in_document
last_index = i
break
|