# NOTE: "Spaces: Running Running" — page-header residue captured when this file was extracted; not part of the module.
from typing import Union
from urllib.parse import urlparse

import requests

from core.app.app_config.entities import FileExtraConfig
from core.file.file_obj import FileBelongsTo, FileTransferMethod, FileType, FileVar
from extensions.ext_database import db
from models.account import Account
from models.model import EndUser, MessageFile, UploadFile
from services.file_service import IMAGE_EXTENSIONS
class MessageFileParser:
    """
    Validate file arguments supplied with a message and convert them (or stored
    ``MessageFile`` rows) into ``FileVar`` objects grouped by ``FileType``.
    """

    # (connect, read) timeout in seconds for the remote URL check; without a
    # timeout ``requests`` waits indefinitely on a stalled host.
    _REMOTE_URL_CHECK_TIMEOUT = (5, 10)

    def __init__(self, tenant_id: str, app_id: str) -> None:
        """
        :param tenant_id: tenant id stamped on every FileVar and used to scope upload lookups
        :param app_id: app id, stored on the instance
        """
        self.tenant_id = tenant_id
        self.app_id = app_id

    def validate_and_transform_files_arg(self, files: list[dict], file_extra_config: FileExtraConfig,
                                         user: Union[Account, EndUser]) -> list[FileVar]:
        """
        validate and transform files arg

        :param files: raw file dicts with 'type', 'transfer_method' and either 'url' or 'upload_file_id'
        :param file_extra_config: file feature config; ``image_config`` is read from it
        :param user: the account or end user the uploaded files must belong to
        :return: validated file objs; empty when no files are given or the image feature is disabled
        :raises ValueError: if any file arg is malformed or violates the image config
        """
        # nothing to validate or transform
        if not files:
            return []

        for file in files:
            self._validate_file_arg(file)

        # transform files to file objs
        type_file_objs = self._to_file_objs(files, file_extra_config)

        # validate files
        new_files = []
        for file_type, file_objs in type_file_objs.items():
            if file_type == FileType.IMAGE:
                image_config = file_extra_config.image_config

                # check if image file feature is enabled
                if not image_config:
                    continue

                # Validate number of files: count only the image group, not every submitted file
                if len(file_objs) > image_config['number_limits']:
                    raise ValueError(f"Number of image files exceeds the maximum limit {image_config['number_limits']}")

                for file_obj in file_objs:
                    self._validate_image_file_obj(file_obj, image_config, user)
                    new_files.append(file_obj)

        # return all file objs
        return new_files

    @staticmethod
    def _is_http_url(url) -> bool:
        """
        check whether url is a string with an http(s) scheme and a host

        :param url: candidate value taken from the file arg
        :return: True only for a parseable http/https url with a non-empty host part
        """
        if not isinstance(url, str):
            return False
        try:
            parsed = urlparse(url)
        except ValueError:
            # urlparse rejects some malformed inputs (e.g. a broken IPv6 literal)
            return False
        return parsed.scheme in ('http', 'https') and bool(parsed.netloc)

    @staticmethod
    def _validate_file_arg(file) -> None:
        """
        validate the shape of a single raw file arg

        :param file: raw file arg, expected to be a dict
        :raises ValueError: if a required key is missing or the url is not an http(s) url
        """
        if not isinstance(file, dict):
            raise ValueError('Invalid file format, must be dict')

        if not file.get('type'):
            raise ValueError('Missing file type')
        # called for validation only; presumably raises on an unknown value -- TODO confirm
        FileType.value_of(file.get('type'))

        transfer_method = file.get('transfer_method')
        if not transfer_method:
            raise ValueError('Missing file transfer method')
        # called for validation only; presumably raises on an unknown value -- TODO confirm
        FileTransferMethod.value_of(transfer_method)

        if transfer_method == FileTransferMethod.REMOTE_URL.value:
            url = file.get('url')
            if not url:
                raise ValueError('Missing file url')
            if not MessageFileParser._is_http_url(url):
                raise ValueError('Invalid file url')

        if transfer_method == FileTransferMethod.LOCAL_FILE.value and not file.get('upload_file_id'):
            raise ValueError('Missing file upload_file_id')

    def _validate_image_file_obj(self, file_obj: FileVar, image_config, user: Union[Account, EndUser]) -> None:
        """
        validate one image file obj against the image config and its backing resource

        :param file_obj: file obj to validate
        :param image_config: image config; 'transfer_methods' is read from it
        :param user: the account or end user a local upload must belong to
        :raises ValueError: if the transfer method or type is not allowed, the remote url
            check fails, or no matching upload file is found
        """
        # Validate transfer method
        if file_obj.transfer_method.value not in image_config['transfer_methods']:
            raise ValueError(f'Invalid transfer method: {file_obj.transfer_method.value}')

        # Validate file type
        if file_obj.type != FileType.IMAGE:
            raise ValueError(f'Invalid file type: {file_obj.type}')

        if file_obj.transfer_method == FileTransferMethod.REMOTE_URL:
            # check the remote url is reachable (content type is not inspected)
            result, error = self._check_image_remote_url(file_obj.url)
            if result is False:
                raise ValueError(error)
        elif file_obj.transfer_method == FileTransferMethod.LOCAL_FILE:
            # get upload file from upload_file_id, scoped to tenant, creator and image extensions
            upload_file = (db.session.query(UploadFile)
                           .filter(
                UploadFile.id == file_obj.related_id,
                UploadFile.tenant_id == self.tenant_id,
                UploadFile.created_by == user.id,
                UploadFile.created_by_role == ('account' if isinstance(user, Account) else 'end_user'),
                UploadFile.extension.in_(IMAGE_EXTENSIONS)
            ).first())

            # check upload file is belong to tenant and user
            if not upload_file:
                raise ValueError('Invalid upload file')

    def transform_message_files(self, files: list[MessageFile], file_extra_config: FileExtraConfig) -> list[FileVar]:
        """
        transform message files

        :param files: stored message files; those belonging to the assistant are skipped
        :param file_extra_config: file feature config attached to every resulting file obj
        :return: flat list of file objs across all supported file types
        """
        # transform files to file objs
        type_file_objs = self._to_file_objs(files, file_extra_config)

        # return all file objs
        return [file_obj for file_objs in type_file_objs.values() for file_obj in file_objs]

    def _to_file_objs(self, files: list[Union[dict, MessageFile]],
                      file_extra_config: FileExtraConfig) -> dict[FileType, list[FileVar]]:
        """
        transform files to file objs

        :param files: raw file dicts or stored message files
        :param file_extra_config: file feature config attached to every resulting file obj
        :return: file objs grouped by file type; unsupported types are dropped
        """
        type_file_objs: dict[FileType, list[FileVar]] = {
            # Currently only support image
            FileType.IMAGE: []
        }

        if not files:
            return type_file_objs

        # group by file type and convert file args or message files to FileObj
        for file in files:
            # message files produced by the assistant are not converted
            if isinstance(file, MessageFile) and file.belongs_to == FileBelongsTo.ASSISTANT.value:
                continue

            file_obj = self._to_file_obj(file, file_extra_config)
            if file_obj.type not in type_file_objs:
                continue

            type_file_objs[file_obj.type].append(file_obj)

        return type_file_objs

    def _to_file_obj(self, file: Union[dict, MessageFile], file_extra_config: FileExtraConfig) -> FileVar:
        """
        transform file to file obj

        :param file: raw file dict or stored message file
        :param file_extra_config: file feature config attached to the resulting file obj
        :return: file obj; for dict input only the field matching the transfer method
            ('url' or 'upload_file_id') is carried over
        """
        if isinstance(file, dict):
            transfer_method = FileTransferMethod.value_of(file.get('transfer_method'))
            return FileVar(
                tenant_id=self.tenant_id,
                type=FileType.value_of(file.get('type')),
                transfer_method=transfer_method,
                url=file.get('url') if transfer_method == FileTransferMethod.REMOTE_URL else None,
                related_id=file.get('upload_file_id') if transfer_method == FileTransferMethod.LOCAL_FILE else None,
                extra_config=file_extra_config
            )

        return FileVar(
            id=file.id,
            tenant_id=self.tenant_id,
            type=FileType.value_of(file.type),
            transfer_method=FileTransferMethod.value_of(file.transfer_method),
            url=file.url,
            related_id=file.upload_file_id or None,
            extra_config=file_extra_config
        )

    def _check_image_remote_url(self, url) -> tuple[bool, str]:
        """
        check that a remote url answers a HEAD request with status 200

        Only reachability is checked; the response content type is not inspected.

        :param url: remote url to probe
        :return: (True, "") on status 200, otherwise (False, error message)
        """
        # NOTE(review): url is user supplied and fetched server-side, redirects included;
        # consider an SSRF-safe client or an allow-list if internal hosts must be protected.
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }

        try:
            # bounded wait: a timeout surfaces as RequestException and is reported below
            response = requests.head(url, headers=headers, allow_redirects=True,
                                     timeout=self._REMOTE_URL_CHECK_TIMEOUT)
        except requests.RequestException as e:
            return False, f"Error checking URL: {e}"

        if response.status_code == 200:
            return True, ""
        return False, "URL does not exist."