muryshev's picture
update
86c402d
raw
history blame
5.94 kB
"""
Модуль с универсальным парсером, объединяющим все специфичные парсеры.
"""
import logging
import os
from typing import BinaryIO
from ..data_classes import ParsedDocument
from .abstract_parser import AbstractParser
from .file_types import FileType
from .parser_factory import ParserFactory
from .specific_parsers import (
DocParser,
DocxParser,
EmailParser,
HTMLParser,
MarkdownParser,
PDFParser,
XMLParser,
)
logger = logging.getLogger(__name__)
class UniversalParser:
"""
Универсальный парсер, объединяющий все специфичные парсеры.
Использует фабрику парсеров для выбора подходящего парсера
на основе типа файла.
"""
def __init__(self):
"""
Инициализирует универсальный парсер и регистрирует все доступные парсеры.
"""
self.factory = ParserFactory()
# Регистрируем все доступные парсеры
self.register_parsers(
[
XMLParser(), # Реализованный парсер
PDFParser(), # Нереализованный парсер
DocParser(), # Нереализованный парсер
DocxParser(), # Реализованный парсер
EmailParser(), # Нереализованный парсер
MarkdownParser(), # Нереализованный парсер
HTMLParser(), # Нереализованный парсер
]
)
def register_parser(self, parser: AbstractParser) -> None:
"""
Регистрирует парсер в фабрике.
Args:
parser (AbstractParser): Парсер для регистрации.
"""
self.factory.register_parser(parser)
def register_parsers(self, parsers: list[AbstractParser]) -> None:
"""
Регистрирует несколько парсеров в фабрике.
Args:
parsers (list[AbstractParser]): Список парсеров для регистрации.
"""
for parser in parsers:
self.register_parser(parser)
def parse_by_path(self, file_path: str) -> ParsedDocument | None:
"""
Парсит документ по пути к файлу, используя подходящий парсер.
Args:
file_path (str): Путь к файлу для парсинга.
Returns:
ParsedDocument | None: Структурное представление документа или None,
если подходящий парсер не найден.
Raises:
ValueError: Если файл не существует или не может быть прочитан.
"""
if not os.path.exists(file_path):
raise ValueError(f"Файл не найден: {file_path}")
# Находим подходящий парсер
parser = self.factory.get_parser(file_path)
if not parser:
logger.warning(f"Не найден подходящий парсер для файла: {file_path}")
return None
# Парсим документ
try:
return parser.parse_by_path(file_path)
except Exception as e:
logger.error(f"Ошибка при парсинге файла {file_path}: {e}")
raise
def parse(
self, file: BinaryIO, file_type: FileType | str | None = None
) -> ParsedDocument | None:
"""
Парсит документ из объекта файла, используя подходящий парсер.
Args:
file (BinaryIO): Объект файла для парсинга.
file_type: Тип файла, может быть объектом FileType или строкой с расширением.
Например: FileType.XML или ".xml"
Returns:
ParsedDocument | None: Структурное представление документа или None,
если подходящий парсер не найден.
Raises:
ValueError: Если файл не может быть прочитан или распарсен.
"""
# Преобразуем строковое расширение в FileType, если нужно
ft = None
if isinstance(file_type, str):
try:
ft = FileType.from_extension(file_type)
except ValueError:
logger.warning(f"Неизвестное расширение файла: {file_type}")
return None
else:
ft = file_type
if ft is None:
logger.warning("Тип файла не указан при парсинге из объекта файла")
return None
# Получаем парсер для указанного типа файла
parsers = [p for p in self.factory.parsers if p.supports_file(ft)]
if not parsers:
logger.warning(f"Не найден подходящий парсер для типа файла: {ft}")
return None
# Используем первый подходящий парсер
parser = parsers[0]
# Парсим документ
try:
return parser.parse(file, ft)
except Exception as e:
logger.error(f"Ошибка при парсинге файла: {e}")
raise