""" Модуль с универсальным парсером, объединяющим все специфичные парсеры. """ import logging import os from typing import BinaryIO from ..data_classes import ParsedDocument from .abstract_parser import AbstractParser from .file_types import FileType from .parser_factory import ParserFactory from .specific_parsers import ( DocParser, DocxParser, EmailParser, HTMLParser, MarkdownParser, PDFParser, XMLParser, ) logger = logging.getLogger(__name__) class UniversalParser: """ Универсальный парсер, объединяющий все специфичные парсеры. Использует фабрику парсеров для выбора подходящего парсера на основе типа файла. """ def __init__(self): """ Инициализирует универсальный парсер и регистрирует все доступные парсеры. """ self.factory = ParserFactory() # Регистрируем все доступные парсеры self.register_parsers( [ XMLParser(), # Реализованный парсер PDFParser(), # Нереализованный парсер DocParser(), # Нереализованный парсер DocxParser(), # Реализованный парсер EmailParser(), # Нереализованный парсер MarkdownParser(), # Нереализованный парсер HTMLParser(), # Нереализованный парсер ] ) def register_parser(self, parser: AbstractParser) -> None: """ Регистрирует парсер в фабрике. Args: parser (AbstractParser): Парсер для регистрации. """ self.factory.register_parser(parser) def register_parsers(self, parsers: list[AbstractParser]) -> None: """ Регистрирует несколько парсеров в фабрике. Args: parsers (list[AbstractParser]): Список парсеров для регистрации. """ for parser in parsers: self.register_parser(parser) def parse_by_path(self, file_path: str) -> ParsedDocument | None: """ Парсит документ по пути к файлу, используя подходящий парсер. Args: file_path (str): Путь к файлу для парсинга. Returns: ParsedDocument | None: Структурное представление документа или None, если подходящий парсер не найден. Raises: ValueError: Если файл не существует или не может быть прочитан. """ if not os.path.exists(file_path): raise ValueError(f"Файл не найден: {file_path}") # Находим подходящий парсер parser = self.factory.get_parser(file_path) if not parser: logger.warning(f"Не найден подходящий парсер для файла: {file_path}") return None # Парсим документ try: return parser.parse_by_path(file_path) except Exception as e: logger.error(f"Ошибка при парсинге файла {file_path}: {e}") raise def parse( self, file: BinaryIO, file_type: FileType | str | None = None ) -> ParsedDocument | None: """ Парсит документ из объекта файла, используя подходящий парсер. Args: file (BinaryIO): Объект файла для парсинга. file_type: Тип файла, может быть объектом FileType или строкой с расширением. Например: FileType.XML или ".xml" Returns: ParsedDocument | None: Структурное представление документа или None, если подходящий парсер не найден. Raises: ValueError: Если файл не может быть прочитан или распарсен. """ # Преобразуем строковое расширение в FileType, если нужно ft = None if isinstance(file_type, str): try: ft = FileType.from_extension(file_type) except ValueError: logger.warning(f"Неизвестное расширение файла: {file_type}") return None else: ft = file_type if ft is None: logger.warning("Тип файла не указан при парсинге из объекта файла") return None # Получаем парсер для указанного типа файла parsers = [p for p in self.factory.parsers if p.supports_file(ft)] if not parsers: logger.warning(f"Не найден подходящий парсер для типа файла: {ft}") return None # Используем первый подходящий парсер parser = parsers[0] # Парсим документ try: return parser.parse(file, ft) except Exception as e: logger.error(f"Ошибка при парсинге файла: {e}") raise