# Spaces: Runtime error (page-status text captured with this listing; not part of the module)
from datetime import datetime | |
from cachetools import TTLCache | |
from functools import reduce | |
from google.oauth2.service_account import Credentials | |
from googleapiclient.discovery import build, Resource | |
from googleapiclient.http import MediaIoBaseDownload | |
from googleapiclient.errors import HttpError | |
from io import BytesIO | |
from itertools import chain | |
from loguru import logger | |
from pydantic import ConfigDict, PrivateAttr | |
from typing import Any, ClassVar, Collection, Optional, Self | |
from ctp_slack_bot.core import ApplicationComponentBase, Settings | |
from ctp_slack_bot.models import GoogleDriveMetadata | |
class GoogleDriveService(ApplicationComponentBase): | |
"""Service for interacting with Google Drive.""" | |
model_config = ConfigDict(frozen=True) | |
_FOLDER_MIME_TYPE: ClassVar[str] = "application/vnd.google-apps.folder" | |
_PATH_SEPARATOR: ClassVar[str] = "/" | |
settings: Settings | |
_google_drive_client: Resource | |
_folder_cache: TTLCache = PrivateAttr(default_factory=lambda: TTLCache(maxsize=256, ttl=60)) | |
def model_post_init(self: Self, context: Any, /) -> None: | |
super().model_post_init(context) | |
credentials = Credentials.from_service_account_info({ | |
"type": "service_account", | |
"project_id": self.settings.google_project_id, | |
"private_key_id": self.settings.google_private_key_id.get_secret_value(), | |
"private_key": self.settings.google_private_key.get_secret_value(), | |
"client_email": self.settings.google_client_email, | |
"client_id": self.settings.google_client_id, | |
"token_uri": self.settings.google_token_uri, | |
}, scopes=["https://www.googleapis.com/auth/drive"]) | |
self._google_drive_client = build('drive', 'v3', credentials=credentials) | |
def _resolve_folder_id(self: Self, folder_path: str) -> Optional[str]: | |
"""Resolve a folder path to a Google Drive ID.""" | |
if not folder_path: | |
return self.settings.google_drive_root_id | |
if folder_path in self._folder_cache: | |
return self._folder_cache[folder_path] | |
current_id = self.settings.google_drive_root_id | |
try: | |
for part in folder_path.split(self._PATH_SEPARATOR): | |
results = self._google_drive_client.files().list( | |
q=f"name='{part.replace("\\", "\\\\").replace("'", "\\'")}' and mimeType='{self._FOLDER_MIME_TYPE}' and '{current_id}' in parents", | |
fields="files(id,name)", | |
supportsAllDrives=True, | |
includeItemsFromAllDrives=True | |
).execute() | |
match results: | |
case {"files": [ {"id": id} ]}: | |
current_id = id | |
case _: | |
logger.debug("Folder not found by path: {}", folder_path) | |
return None | |
except HttpError as e: | |
logger.error("Error resolving folder path: {}", folder_path) | |
return None | |
self._folder_cache[folder_path] = current_id | |
return current_id | |
def _list_directory(self: Self, folder_path: str, folder_id: str, recursive: bool = False) -> Collection[GoogleDriveMetadata]: | |
try: | |
results = tuple(GoogleDriveMetadata.from_folder_path_and_dict(folder_path, result) | |
for result | |
in self._google_drive_client.files().list( | |
q=f"'{folder_id}' in parents", | |
fields="files(id,name,mimeType,modifiedTime)", | |
supportsAllDrives=True, | |
includeItemsFromAllDrives=True, | |
pageSize=1000 | |
).execute().get('files', ())) | |
if not recursive: | |
return results | |
return tuple(reduce(chain, | |
(self._list_directory(f"{folder_path}{self._PATH_SEPARATOR}{result.name}", result.id, True) | |
for result | |
in results | |
if result.mime_type == self._FOLDER_MIME_TYPE), | |
results)) | |
except HttpError as e: | |
logger.error("Error listing folder by path, {}: {}", folder_path, e) | |
return () | |
def list_directory(self: Self, folder_path: str, recursive: bool = False) -> Collection[GoogleDriveMetadata]: | |
"""List contents of a directory with basic metadata.""" | |
folder_id = self._resolve_folder_id(folder_path) | |
if not folder_id: | |
logger.debug("Folder not found by path: {}", folder_path) | |
return () | |
return self._list_directory(folder_path, folder_id, recursive) | |
def get_metadata(self: Self, item_path: str) -> Optional[GoogleDriveMetadata]: | |
"""Get metadata for a specific file/folder by path.""" | |
match item_path.rsplit(self._PATH_SEPARATOR, 1): | |
case [item_name]: | |
folder_path = "" | |
folder_id = self.settings.google_drive_root_id | |
case [folder_path, item_name]: | |
folder_id = self._resolve_folder_id(folder_path) | |
if not folder_id: | |
logger.debug("Folder not found by path: {}", folder_path) | |
return None | |
try: | |
results = self._google_drive_client.files().list( | |
q=f"name='{item_name}' and '{folder_id}' in parents", | |
fields="files(id,name,mimeType,modifiedTime)", | |
supportsAllDrives=True, | |
includeItemsFromAllDrives=True, | |
pageSize=1 | |
).execute() | |
match results: | |
case {"files": [result]}: | |
return GoogleDriveMetadata.from_folder_path_and_dict(folder_path, result) | |
except HttpError as e: | |
logger.error("Error getting metadata for item by path, {}: {}", item_path, e) | |
logger.debug("Item not found by path: {}", item_path) | |
return None | |
def read_file_by_id(self: Self, file_id: str) -> Optional[bytes]: | |
"""Read contents of a file by its unique identifier.""" | |
try: | |
request = self._google_drive_client.files().get_media(fileId=file_id) | |
buffer = BytesIO() | |
downloader = MediaIoBaseDownload(buffer, request) | |
done = False | |
while not done: | |
_, done = downloader.next_chunk() | |
return buffer.getvalue() | |
except HttpError as e: | |
logger.error("Error reading file by ID, {}: {}", file_id, e) | |
return None | |
def name(self: Self) -> str: | |
return "google_drive_service" | |