import logging
import os
import shutil
from pathlib import Path

from fastapi import HTTPException, UploadFile
from ntr_fileparser import UniversalParser
from sqlalchemy.orm import Session

from common.common import get_source_format
from common.configuration import Configuration
from common.constants import PROCESSING_FORMATS
from components.dbo.models.dataset import Dataset
from components.dbo.models.dataset_document import DatasetDocument
from components.dbo.models.document import Document
from schemas.document import Document as DocumentSchema
from schemas.document import DocumentDownload
from components.services.dataset import DatasetService

logger = logging.getLogger(__name__)

class DocumentService:
    """
    Service for working with documents (retrieval, upload, deletion).
    """

    def __init__(
        self,
        dataset_service: DatasetService,
        config: Configuration,
        db: Session,
    ):
        logger.info("Initializing DocumentService")
        # `db` is used as a callable session factory below
        # (`with self.db() as session:`), e.g. a SQLAlchemy sessionmaker.
        self.db = db
        self.dataset_service = dataset_service
        self.parser = UniversalParser()
        self.documents_path = Path(config.db_config.files.documents_path)
    def get_document(
        self,
        document_id: int,
        dataset_id: int | None = None,
    ) -> DocumentDownload:
        """
        Get the filename and on-disk path needed to download a document by its ID.

        If no dataset_id is given, the currently active dataset is used.
        """
        logger.info(f"Getting document info for ID: {document_id}")
        if dataset_id is None:
            dataset_id = self.dataset_service.get_current_dataset().dataset_id

        self.dataset_service.raise_if_processing()

        with self.db() as session:
            # The document must be linked to the requested dataset.
            document_in_dataset = (
                session.query(DatasetDocument)
                .filter(
                    DatasetDocument.dataset_id == dataset_id,
                    DatasetDocument.document_id == document_id,
                )
                .first()
            )
            if not document_in_dataset:
                logger.warning(f"Document not found: {document_id}")
                raise HTTPException(status_code=404, detail="Document not found")

            document = (
                session.query(Document)
                .filter(
                    Document.id == document_id,
                )
                .first()
            )

            # Files are stored on disk as "<document id>.<source format>",
            # matching how add_document saves them.
            result = DocumentDownload(
                filename=f'{document.title[:40]}.{document.source_format}',
                filepath=self.documents_path
                / f'{document.id}.{document.source_format}',
            )

            logger.debug(f"Retrieved document: {result.filename}")
            return result
    def add_document(self, dataset_id: int, file: UploadFile) -> DocumentSchema:
        """
        Add an uploaded document to a dataset (the dataset must be a draft).
        """
        self.dataset_service.raise_if_processing()

        # Save the upload to a temporary location so the parser can read it from disk.
        file_location = (
            Path(os.environ.get("APP_TMP_PATH", '.')) / 'tmp' / file.filename
        )
        file_location.parent.mkdir(parents=True, exist_ok=True)
        with open(file_location, 'wb') as buffer:
            buffer.write(file.file.read())

        source_format = get_source_format(file.filename)

        logger.info(f"Parsing file: {file_location}")
        logger.info(f"Source format: {source_format}")

        try:
            parsed = self.parser.parse_by_path(str(file_location))
        except Exception:
            raise HTTPException(
                status_code=400, detail="Invalid file, service can't parse it"
            )

        with self.db() as session:
            dataset = (
                session.query(Dataset).filter(Dataset.id == dataset_id).first()
            )
            if not dataset:
                raise HTTPException(status_code=404, detail='Dataset not found')
            if not dataset.is_draft:
                raise HTTPException(status_code=403, detail='Dataset is not draft')

            document = Document(
                filename=file.filename,
                title=parsed.name,
                owner=parsed.meta.owner,
                status=parsed.meta.status,
                source_format=source_format,
            )
            logger.info(f"Document: {document}")
            session.add(document)
            session.flush()  # populate document.id before building the link

            logger.info(f"Document ID: {document.id}")

            link = DatasetDocument(
                dataset_id=dataset_id,
                document_id=document.id,
            )
            session.add(link)

            if source_format in PROCESSING_FORMATS:
                logger.info(
                    f"Moving file to: {self.documents_path / f'{document.id}.{source_format}'}"
                )
                shutil.move(
                    file_location,
                    self.documents_path / f'{document.id}.{source_format}',
                )
            else:
                logger.error(f"Unknown source format: {source_format}")
                raise HTTPException(status_code=400, detail='Unknown document format')

            # Clean up the temporary directory if it is now empty.
            if len(os.listdir(file_location.parent)) == 0:
                file_location.parent.rmdir()

            session.commit()
            session.refresh(document)

            result = DocumentSchema(
                id=document.id,
                name=document.title,
                owner=document.owner,
                status=document.status,
            )

            logger.debug(f"Retrieved document: {result.name}")
            return result
    def delete_document(self, dataset_id: int, document_id: int) -> None:
        """
        Remove a document from a dataset (the dataset must be a draft).

        The stored file and the document row are deleted only when no other
        dataset still references the document.
        """
        self.dataset_service.raise_if_processing()

        with self.db() as session:
            dataset_document = (
                session.query(DatasetDocument)
                .filter(
                    DatasetDocument.dataset_id == dataset_id,
                    DatasetDocument.document_id == document_id,
                )
                .first()
            )
            if not dataset_document:
                raise HTTPException(status_code=404, detail='Document not found')

            dataset = (
                session.query(Dataset).filter(Dataset.id == dataset_id).first()
            )
            if not dataset.is_draft:
                raise HTTPException(status_code=403, detail='Dataset is not draft')

            document = (
                session.query(Document).filter(Document.id == document_id).first()
            )

            # Count how many dataset links reference this document; the link
            # being removed in this call is still included in the count.
            is_used = (
                session.query(DatasetDocument)
                .filter(DatasetDocument.document_id == document_id)
                .count()
            )
            if is_used == 1:
                # This dataset held the only reference: remove the stored file
                # and the document row as well.
                os.remove(
                    self.documents_path / f'{document_id}.{document.source_format}'
                )
                session.delete(document)

            session.delete(dataset_document)
            session.commit()
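
# Example wiring (a sketch only; how Configuration and DatasetService are
# actually constructed is not shown in this file and is assumed here).
# `db` must be a callable session factory, e.g. a SQLAlchemy sessionmaker,
# because the methods above open sessions via `with self.db() as session:`.
#
#     from sqlalchemy import create_engine
#     from sqlalchemy.orm import sessionmaker
#
#     engine = create_engine("sqlite:///app.db")
#     SessionLocal = sessionmaker(bind=engine)
#
#     config = Configuration(...)            # loaded by the application (assumed)
#     dataset_service = DatasetService(...)  # constructed elsewhere (assumed)
#     service = DocumentService(dataset_service, config, SessionLocal)
#
#     with open("report.docx", "rb") as fh:
#         upload = UploadFile(file=fh, filename="report.docx")
#         service.add_document(dataset_id=1, file=upload)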