muryshev's picture
update
86c402d
raw
history blame
18.3 kB
"""
Модуль для парсинга DOCX документов.
"""
import io
import logging
import os
import shutil
import tempfile
import zipfile
from typing import Any, BinaryIO
from bs4 import BeautifulSoup
from ...data_classes import ParsedDocument
from ..abstract_parser import AbstractParser
from ..file_types import FileType
from .docx.page_estimator import DocxPageEstimator
from .docx.relationships_parser import RelationshipsParser
from .xml_parser import XMLParser
logger = logging.getLogger(__name__)
class DocxParser(AbstractParser):
"""
Парсер для DOCX документов.
"""
def __init__(self):
"""
Инициализирует парсер DOCX.
"""
super().__init__(FileType.DOCX)
self.xml_parser = None
self.page_estimator = DocxPageEstimator()
self.relationships_parser = RelationshipsParser()
def parse_by_path(self, file_path: str) -> ParsedDocument:
"""
Парсит DOCX документ по пути к файлу.
Args:
file_path (str): Путь к DOCX файлу.
Returns:
ParsedDocument: Распарсенный документ.
"""
with open(file_path, 'rb') as f:
parsed_document = self.parse(f)
parsed_document.name = os.path.basename(file_path)
parsed_document.type = FileType.DOCX.name
return parsed_document
def parse(self, file: BinaryIO) -> ParsedDocument:
"""
Парсит DOCX документ из файлового объекта.
Args:
file (BinaryIO): Файловый объект DOCX документа.
Returns:
ParsedDocument: Распарсенный документ.
"""
# Создаем временную директорию для распаковки
temp_dir = tempfile.mkdtemp()
try:
# Распаковываем DOCX во временную директорию
with zipfile.ZipFile(file) as docx:
docx.extractall(temp_dir)
# Извлекаем стили
styles = self._extract_styles(temp_dir)
# Извлекаем нумерацию
numbering = self._extract_numbering(temp_dir)
# Извлекаем связи
relationships = self.relationships_parser.parse(temp_dir)
# Создаем XML парсер с кэшами
self.xml_parser = XMLParser(styles, numbering, relationships)
# Парсим основное содержимое
document_path = os.path.join(temp_dir, 'word', 'document.xml')
if not os.path.exists(document_path):
logger.error(f"Document file not found: {document_path}")
return ParsedDocument([], {})
# Читаем и парсим основной документ
with open(document_path, 'rb') as f:
content = f.read()
# Получаем метаданные
metadata = self._extract_metadata(temp_dir)
# Предварительно оцениваем номера страниц
estimated_pages = self._estimate_page_numbers(content)
# Парсим документ через XMLParser, оборачивая байты в BytesIO
doc = self.xml_parser.parse(io.BytesIO(content))
doc.meta.note = metadata
# Применяем номера страниц к элементам документа
self._apply_page_numbers(doc, estimated_pages)
return doc
finally:
# Удаляем временную директорию
shutil.rmtree(temp_dir)
def _extract_styles(self, temp_dir: str) -> dict[str, Any]:
"""
Извлекает стили из word/styles.xml.
Args:
temp_dir (str): Путь к временной директории с распакованным DOCX.
Returns:
dict[str, Any]: Словарь с информацией о стилях.
"""
styles_path = os.path.join(temp_dir, 'word', 'styles.xml')
if not os.path.exists(styles_path):
logger.warning(f"Styles file not found: {styles_path}")
return {}
try:
with open(styles_path, 'rb') as f:
content = f.read()
# Парсим XML с помощью BeautifulSoup
soup = BeautifulSoup(content, 'xml')
# Извлекаем информацию о стилях
styles = {}
for style in soup.find_all('w:style'):
if 'w:styleId' in style.attrs:
style_id = style['w:styleId']
style_info = {}
# Имя стиля
name = style.find('w:name')
if name and 'w:val' in name.attrs:
style_info['name'] = name['w:val']
# Тип стиля (paragraph, character, table, numbering)
if 'w:type' in style.attrs:
style_info['type'] = style['w:type']
# Базовый стиль
base_style = style.find('w:basedOn')
if base_style and 'w:val' in base_style.attrs:
style_info['based_on'] = base_style['w:val']
# Следующий стиль
next_style = style.find('w:next')
if next_style and 'w:val' in next_style.attrs:
style_info['next'] = next_style['w:val']
styles[style_id] = style_info
logger.debug(f"Extracted {len(styles)} styles")
return styles
except Exception as e:
logger.error(f"Error extracting styles: {e}")
return {}
def _extract_numbering(self, temp_dir: str) -> dict[str, Any]:
"""
Извлекает информацию о нумерации из word/numbering.xml.
Args:
temp_dir (str): Путь к временной директории с распакованным DOCX.
Returns:
dict[str, Any]: Словарь с информацией о нумерации.
"""
numbering_path = os.path.join(temp_dir, 'word', 'numbering.xml')
if not os.path.exists(numbering_path):
logger.warning(f"Numbering file not found: {numbering_path}")
return {}
try:
with open(numbering_path, 'rb') as f:
content = f.read()
# Парсим XML с помощью BeautifulSoup
soup = BeautifulSoup(content, 'xml')
# Извлекаем определения абстрактной нумерации
abstract_nums = {}
for abstract_num in soup.find_all('w:abstractNum'):
if 'w:abstractNumId' in abstract_num.attrs:
abstract_id = abstract_num['w:abstractNumId']
levels = {}
# Извлекаем информацию о каждом уровне нумерации
for level in abstract_num.find_all('w:lvl'):
if 'w:ilvl' in level.attrs:
level_id = level['w:ilvl']
level_info = {}
# Формат нумерации (decimal, bullet, etc.)
num_fmt = level.find('w:numFmt')
if num_fmt and 'w:val' in num_fmt.attrs:
level_info['format'] = num_fmt['w:val']
# Текст до и после номера
level_text = level.find('w:lvlText')
if level_text and 'w:val' in level_text.attrs:
level_info['text'] = level_text['w:val']
# Выравнивание
jc = level.find('w:lvlJc')
if jc and 'w:val' in jc.attrs:
level_info['alignment'] = jc['w:val']
levels[level_id] = level_info
abstract_nums[abstract_id] = levels
# Извлекаем конкретные определения нумерации
numbering = {}
for num in soup.find_all('w:num'):
if 'w:numId' in num.attrs:
num_id = num['w:numId']
abstract_num_id = None
# Получаем ссылку на абстрактную нумерацию
abstract_num = num.find('w:abstractNumId')
if abstract_num and 'w:val' in abstract_num.attrs:
abstract_num_id = abstract_num['w:val']
if abstract_num_id in abstract_nums:
numbering[num_id] = {
'abstract_num_id': abstract_num_id,
'levels': abstract_nums[abstract_num_id],
}
logger.debug(f"Extracted {len(numbering)} numbering definitions")
return numbering
except Exception as e:
logger.error(f"Error extracting numbering: {e}")
return {}
def _extract_metadata(self, temp_dir: str) -> dict[str, Any]:
"""
Извлекает метаданные из docProps/core.xml и docProps/app.xml.
Args:
temp_dir (str): Путь к временной директории с распакованным DOCX.
Returns:
dict[str, Any]: Словарь с метаданными.
"""
metadata = {}
# Извлекаем основные свойства
core_props_path = os.path.join(temp_dir, 'docProps', 'core.xml')
if os.path.exists(core_props_path):
try:
with open(core_props_path, 'rb') as f:
content = f.read()
soup = BeautifulSoup(content, 'xml')
# Автор
creator = soup.find('dc:creator')
if creator:
metadata['creator'] = creator.text
# Заголовок
title = soup.find('dc:title')
if title:
metadata['title'] = title.text
# Тема
subject = soup.find('dc:subject')
if subject:
metadata['subject'] = subject.text
# Описание
description = soup.find('dc:description')
if description:
metadata['description'] = description.text
# Ключевые слова
keywords = soup.find('cp:keywords')
if keywords:
metadata['keywords'] = keywords.text
# Даты создания и изменения
created = soup.find('dcterms:created')
if created:
metadata['created'] = created.text
modified = soup.find('dcterms:modified')
if modified:
metadata['modified'] = modified.text
except Exception as e:
logger.error(f"Error extracting core properties: {e}")
# Извлекаем свойства приложения
app_props_path = os.path.join(temp_dir, 'docProps', 'app.xml')
if os.path.exists(app_props_path):
try:
with open(app_props_path, 'rb') as f:
content = f.read()
soup = BeautifulSoup(content, 'xml')
# Статистика документа
pages = soup.find('Pages')
if pages:
metadata['pages'] = int(pages.text)
words = soup.find('Words')
if words:
metadata['words'] = int(words.text)
characters = soup.find('Characters')
if characters:
metadata['characters'] = int(characters.text)
# Информация о приложении
application = soup.find('Application')
if application:
metadata['application'] = application.text
app_version = soup.find('AppVersion')
if app_version:
metadata['app_version'] = app_version.text
# Информация о компании
company = soup.find('Company')
if company:
metadata['company'] = company.text
# Время редактирования
total_time = soup.find('TotalTime')
if total_time:
metadata['total_time'] = int(total_time.text)
except Exception as e:
logger.error(f"Error extracting app properties: {e}")
# Сохраняем метаданные как атрибут для доступа из других методов
self._metadata = metadata
return metadata
def _estimate_page_numbers(self, content: bytes) -> dict[str, int]:
"""
Оценивает номера страниц для элементов документа.
Args:
content (bytes): Содержимое документа.
Returns:
dict[str, int]: Словарь соответствий id элемента и номера страницы.
"""
logger.debug("Estimating page numbers for document elements")
# Создаем словарь для хранения номеров страниц
page_numbers = {}
try:
# Получаем метаданные, включая количество страниц из metadata
total_pages = self._metadata.get("pages", 0) if hasattr(self, "_metadata") else 0
if total_pages <= 0:
total_pages = 1 # Минимум одна страница
# Парсим XML с помощью BeautifulSoup (это быстрая операция)
soup = BeautifulSoup(content, 'xml')
# Используем упрощенный метод расчета
paragraph_pages, table_pages = self.page_estimator.process_document(
soup,
metadata=self._metadata if hasattr(self, "_metadata") else None
)
# Сохраняем информацию в page_numbers
page_numbers['paragraphs'] = paragraph_pages
page_numbers['tables'] = table_pages
page_numbers['total_pages'] = total_pages
logger.debug(f"Estimated document has {total_pages} pages")
logger.debug(f"Assigned page numbers for {len(paragraph_pages)} paragraphs and {len(table_pages)} tables")
except Exception as e:
logger.error(f"Error estimating page numbers: {e}")
return page_numbers
def _apply_page_numbers(self, doc: ParsedDocument, page_numbers: dict[str, int]) -> None:
"""
Применяет оценки номеров страниц к элементам документа.
Args:
doc (ParsedDocument): Документ для обновления.
page_numbers (dict[str, int]): Словарь соответствий id элемента и номера страницы.
"""
logger.debug("Applying page numbers to document elements")
# Получаем информацию о страницах
paragraph_pages = page_numbers.get('paragraphs', {})
table_pages = page_numbers.get('tables', {})
total_pages = page_numbers.get('total_pages', 1)
logger.debug(f"Applying page numbers: document has {total_pages} pages")
# Устанавливаем индексы документа и номера страниц для параграфов
for i, paragraph in enumerate(doc.paragraphs):
# Индекс в документе (хотя это также делается в XMLParser._link_elements)
paragraph.index_in_document = i
# Номер страницы
page_num = paragraph_pages.get(i, 1)
paragraph.page_number = page_num
# Устанавливаем индексы и номера страниц для таблиц
for i, table in enumerate(doc.tables):
# Индекс в документе (хотя это также делается в XMLParser._link_elements)
table.index_in_document = i
# Номер страницы
page_num = table_pages.get(i, 1)
table.page_number = page_num
# Для изображений
for i, image in enumerate(doc.images):
# Индекс в документе (хотя это также делается в XMLParser._link_elements)
image.index_in_document = i
# Номер страницы (примерно)
image.page_number = min(total_pages, (i % total_pages) + 1)
# Для формул
for i, formula in enumerate(doc.formulas):
# Индекс в документе (хотя это также делается в XMLParser._link_elements)
formula.index_in_document = i
# Номер страницы (примерно)
formula.page_number = min(total_pages, (i % total_pages) + 1)