Spaces:
Sleeping
Sleeping
""" | |
Модуль для парсинга DOCX документов. | |
""" | |
import io | |
import logging | |
import os | |
import shutil | |
import tempfile | |
import zipfile | |
from typing import Any, BinaryIO | |
from bs4 import BeautifulSoup | |
from ...data_classes import ParsedDocument | |
from ..abstract_parser import AbstractParser | |
from ..file_types import FileType | |
from .docx.page_estimator import DocxPageEstimator | |
from .docx.relationships_parser import RelationshipsParser | |
from .xml_parser import XMLParser | |
logger = logging.getLogger(__name__) | |
class DocxParser(AbstractParser): | |
""" | |
Парсер для DOCX документов. | |
""" | |
def __init__(self): | |
""" | |
Инициализирует парсер DOCX. | |
""" | |
super().__init__(FileType.DOCX) | |
self.xml_parser = None | |
self.page_estimator = DocxPageEstimator() | |
self.relationships_parser = RelationshipsParser() | |
def parse_by_path(self, file_path: str) -> ParsedDocument: | |
""" | |
Парсит DOCX документ по пути к файлу. | |
Args: | |
file_path (str): Путь к DOCX файлу. | |
Returns: | |
ParsedDocument: Распарсенный документ. | |
""" | |
with open(file_path, 'rb') as f: | |
parsed_document = self.parse(f) | |
parsed_document.name = os.path.basename(file_path) | |
parsed_document.type = FileType.DOCX.name | |
return parsed_document | |
def parse(self, file: BinaryIO) -> ParsedDocument: | |
""" | |
Парсит DOCX документ из файлового объекта. | |
Args: | |
file (BinaryIO): Файловый объект DOCX документа. | |
Returns: | |
ParsedDocument: Распарсенный документ. | |
""" | |
# Создаем временную директорию для распаковки | |
temp_dir = tempfile.mkdtemp() | |
try: | |
# Распаковываем DOCX во временную директорию | |
with zipfile.ZipFile(file) as docx: | |
docx.extractall(temp_dir) | |
# Извлекаем стили | |
styles = self._extract_styles(temp_dir) | |
# Извлекаем нумерацию | |
numbering = self._extract_numbering(temp_dir) | |
# Извлекаем связи | |
relationships = self.relationships_parser.parse(temp_dir) | |
# Создаем XML парсер с кэшами | |
self.xml_parser = XMLParser(styles, numbering, relationships) | |
# Парсим основное содержимое | |
document_path = os.path.join(temp_dir, 'word', 'document.xml') | |
if not os.path.exists(document_path): | |
logger.error(f"Document file not found: {document_path}") | |
return ParsedDocument([], {}) | |
# Читаем и парсим основной документ | |
with open(document_path, 'rb') as f: | |
content = f.read() | |
# Получаем метаданные | |
metadata = self._extract_metadata(temp_dir) | |
# Предварительно оцениваем номера страниц | |
estimated_pages = self._estimate_page_numbers(content) | |
# Парсим документ через XMLParser, оборачивая байты в BytesIO | |
doc = self.xml_parser.parse(io.BytesIO(content)) | |
doc.meta.note = metadata | |
# Применяем номера страниц к элементам документа | |
self._apply_page_numbers(doc, estimated_pages) | |
return doc | |
finally: | |
# Удаляем временную директорию | |
shutil.rmtree(temp_dir) | |
def _extract_styles(self, temp_dir: str) -> dict[str, Any]: | |
""" | |
Извлекает стили из word/styles.xml. | |
Args: | |
temp_dir (str): Путь к временной директории с распакованным DOCX. | |
Returns: | |
dict[str, Any]: Словарь с информацией о стилях. | |
""" | |
styles_path = os.path.join(temp_dir, 'word', 'styles.xml') | |
if not os.path.exists(styles_path): | |
logger.warning(f"Styles file not found: {styles_path}") | |
return {} | |
try: | |
with open(styles_path, 'rb') as f: | |
content = f.read() | |
# Парсим XML с помощью BeautifulSoup | |
soup = BeautifulSoup(content, 'xml') | |
# Извлекаем информацию о стилях | |
styles = {} | |
for style in soup.find_all('w:style'): | |
if 'w:styleId' in style.attrs: | |
style_id = style['w:styleId'] | |
style_info = {} | |
# Имя стиля | |
name = style.find('w:name') | |
if name and 'w:val' in name.attrs: | |
style_info['name'] = name['w:val'] | |
# Тип стиля (paragraph, character, table, numbering) | |
if 'w:type' in style.attrs: | |
style_info['type'] = style['w:type'] | |
# Базовый стиль | |
base_style = style.find('w:basedOn') | |
if base_style and 'w:val' in base_style.attrs: | |
style_info['based_on'] = base_style['w:val'] | |
# Следующий стиль | |
next_style = style.find('w:next') | |
if next_style and 'w:val' in next_style.attrs: | |
style_info['next'] = next_style['w:val'] | |
styles[style_id] = style_info | |
logger.debug(f"Extracted {len(styles)} styles") | |
return styles | |
except Exception as e: | |
logger.error(f"Error extracting styles: {e}") | |
return {} | |
def _extract_numbering(self, temp_dir: str) -> dict[str, Any]: | |
""" | |
Извлекает информацию о нумерации из word/numbering.xml. | |
Args: | |
temp_dir (str): Путь к временной директории с распакованным DOCX. | |
Returns: | |
dict[str, Any]: Словарь с информацией о нумерации. | |
""" | |
numbering_path = os.path.join(temp_dir, 'word', 'numbering.xml') | |
if not os.path.exists(numbering_path): | |
logger.warning(f"Numbering file not found: {numbering_path}") | |
return {} | |
try: | |
with open(numbering_path, 'rb') as f: | |
content = f.read() | |
# Парсим XML с помощью BeautifulSoup | |
soup = BeautifulSoup(content, 'xml') | |
# Извлекаем определения абстрактной нумерации | |
abstract_nums = {} | |
for abstract_num in soup.find_all('w:abstractNum'): | |
if 'w:abstractNumId' in abstract_num.attrs: | |
abstract_id = abstract_num['w:abstractNumId'] | |
levels = {} | |
# Извлекаем информацию о каждом уровне нумерации | |
for level in abstract_num.find_all('w:lvl'): | |
if 'w:ilvl' in level.attrs: | |
level_id = level['w:ilvl'] | |
level_info = {} | |
# Формат нумерации (decimal, bullet, etc.) | |
num_fmt = level.find('w:numFmt') | |
if num_fmt and 'w:val' in num_fmt.attrs: | |
level_info['format'] = num_fmt['w:val'] | |
# Текст до и после номера | |
level_text = level.find('w:lvlText') | |
if level_text and 'w:val' in level_text.attrs: | |
level_info['text'] = level_text['w:val'] | |
# Выравнивание | |
jc = level.find('w:lvlJc') | |
if jc and 'w:val' in jc.attrs: | |
level_info['alignment'] = jc['w:val'] | |
levels[level_id] = level_info | |
abstract_nums[abstract_id] = levels | |
# Извлекаем конкретные определения нумерации | |
numbering = {} | |
for num in soup.find_all('w:num'): | |
if 'w:numId' in num.attrs: | |
num_id = num['w:numId'] | |
abstract_num_id = None | |
# Получаем ссылку на абстрактную нумерацию | |
abstract_num = num.find('w:abstractNumId') | |
if abstract_num and 'w:val' in abstract_num.attrs: | |
abstract_num_id = abstract_num['w:val'] | |
if abstract_num_id in abstract_nums: | |
numbering[num_id] = { | |
'abstract_num_id': abstract_num_id, | |
'levels': abstract_nums[abstract_num_id], | |
} | |
logger.debug(f"Extracted {len(numbering)} numbering definitions") | |
return numbering | |
except Exception as e: | |
logger.error(f"Error extracting numbering: {e}") | |
return {} | |
def _extract_metadata(self, temp_dir: str) -> dict[str, Any]: | |
""" | |
Извлекает метаданные из docProps/core.xml и docProps/app.xml. | |
Args: | |
temp_dir (str): Путь к временной директории с распакованным DOCX. | |
Returns: | |
dict[str, Any]: Словарь с метаданными. | |
""" | |
metadata = {} | |
# Извлекаем основные свойства | |
core_props_path = os.path.join(temp_dir, 'docProps', 'core.xml') | |
if os.path.exists(core_props_path): | |
try: | |
with open(core_props_path, 'rb') as f: | |
content = f.read() | |
soup = BeautifulSoup(content, 'xml') | |
# Автор | |
creator = soup.find('dc:creator') | |
if creator: | |
metadata['creator'] = creator.text | |
# Заголовок | |
title = soup.find('dc:title') | |
if title: | |
metadata['title'] = title.text | |
# Тема | |
subject = soup.find('dc:subject') | |
if subject: | |
metadata['subject'] = subject.text | |
# Описание | |
description = soup.find('dc:description') | |
if description: | |
metadata['description'] = description.text | |
# Ключевые слова | |
keywords = soup.find('cp:keywords') | |
if keywords: | |
metadata['keywords'] = keywords.text | |
# Даты создания и изменения | |
created = soup.find('dcterms:created') | |
if created: | |
metadata['created'] = created.text | |
modified = soup.find('dcterms:modified') | |
if modified: | |
metadata['modified'] = modified.text | |
except Exception as e: | |
logger.error(f"Error extracting core properties: {e}") | |
# Извлекаем свойства приложения | |
app_props_path = os.path.join(temp_dir, 'docProps', 'app.xml') | |
if os.path.exists(app_props_path): | |
try: | |
with open(app_props_path, 'rb') as f: | |
content = f.read() | |
soup = BeautifulSoup(content, 'xml') | |
# Статистика документа | |
pages = soup.find('Pages') | |
if pages: | |
metadata['pages'] = int(pages.text) | |
words = soup.find('Words') | |
if words: | |
metadata['words'] = int(words.text) | |
characters = soup.find('Characters') | |
if characters: | |
metadata['characters'] = int(characters.text) | |
# Информация о приложении | |
application = soup.find('Application') | |
if application: | |
metadata['application'] = application.text | |
app_version = soup.find('AppVersion') | |
if app_version: | |
metadata['app_version'] = app_version.text | |
# Информация о компании | |
company = soup.find('Company') | |
if company: | |
metadata['company'] = company.text | |
# Время редактирования | |
total_time = soup.find('TotalTime') | |
if total_time: | |
metadata['total_time'] = int(total_time.text) | |
except Exception as e: | |
logger.error(f"Error extracting app properties: {e}") | |
# Сохраняем метаданные как атрибут для доступа из других методов | |
self._metadata = metadata | |
return metadata | |
def _estimate_page_numbers(self, content: bytes) -> dict[str, int]: | |
""" | |
Оценивает номера страниц для элементов документа. | |
Args: | |
content (bytes): Содержимое документа. | |
Returns: | |
dict[str, int]: Словарь соответствий id элемента и номера страницы. | |
""" | |
logger.debug("Estimating page numbers for document elements") | |
# Создаем словарь для хранения номеров страниц | |
page_numbers = {} | |
try: | |
# Получаем метаданные, включая количество страниц из metadata | |
total_pages = self._metadata.get("pages", 0) if hasattr(self, "_metadata") else 0 | |
if total_pages <= 0: | |
total_pages = 1 # Минимум одна страница | |
# Парсим XML с помощью BeautifulSoup (это быстрая операция) | |
soup = BeautifulSoup(content, 'xml') | |
# Используем упрощенный метод расчета | |
paragraph_pages, table_pages = self.page_estimator.process_document( | |
soup, | |
metadata=self._metadata if hasattr(self, "_metadata") else None | |
) | |
# Сохраняем информацию в page_numbers | |
page_numbers['paragraphs'] = paragraph_pages | |
page_numbers['tables'] = table_pages | |
page_numbers['total_pages'] = total_pages | |
logger.debug(f"Estimated document has {total_pages} pages") | |
logger.debug(f"Assigned page numbers for {len(paragraph_pages)} paragraphs and {len(table_pages)} tables") | |
except Exception as e: | |
logger.error(f"Error estimating page numbers: {e}") | |
return page_numbers | |
def _apply_page_numbers(self, doc: ParsedDocument, page_numbers: dict[str, int]) -> None: | |
""" | |
Применяет оценки номеров страниц к элементам документа. | |
Args: | |
doc (ParsedDocument): Документ для обновления. | |
page_numbers (dict[str, int]): Словарь соответствий id элемента и номера страницы. | |
""" | |
logger.debug("Applying page numbers to document elements") | |
# Получаем информацию о страницах | |
paragraph_pages = page_numbers.get('paragraphs', {}) | |
table_pages = page_numbers.get('tables', {}) | |
total_pages = page_numbers.get('total_pages', 1) | |
logger.debug(f"Applying page numbers: document has {total_pages} pages") | |
# Устанавливаем индексы документа и номера страниц для параграфов | |
for i, paragraph in enumerate(doc.paragraphs): | |
# Индекс в документе (хотя это также делается в XMLParser._link_elements) | |
paragraph.index_in_document = i | |
# Номер страницы | |
page_num = paragraph_pages.get(i, 1) | |
paragraph.page_number = page_num | |
# Устанавливаем индексы и номера страниц для таблиц | |
for i, table in enumerate(doc.tables): | |
# Индекс в документе (хотя это также делается в XMLParser._link_elements) | |
table.index_in_document = i | |
# Номер страницы | |
page_num = table_pages.get(i, 1) | |
table.page_number = page_num | |
# Для изображений | |
for i, image in enumerate(doc.images): | |
# Индекс в документе (хотя это также делается в XMLParser._link_elements) | |
image.index_in_document = i | |
# Номер страницы (примерно) | |
image.page_number = min(total_pages, (i % total_pages) + 1) | |
# Для формул | |
for i, formula in enumerate(doc.formulas): | |
# Индекс в документе (хотя это также делается в XMLParser._link_elements) | |
formula.index_in_document = i | |
# Номер страницы (примерно) | |
formula.page_number = min(total_pages, (i % total_pages) + 1) | |