from datetime import datetime from cachetools import TTLCache from functools import reduce from google.oauth2.service_account import Credentials from googleapiclient.discovery import build, Resource from googleapiclient.http import MediaIoBaseDownload from googleapiclient.errors import HttpError from io import BytesIO from itertools import chain from loguru import logger from pydantic import ConfigDict, PrivateAttr from typing import Any, ClassVar, Collection, Optional, Self from ctp_slack_bot.core import ApplicationComponentBase, Settings from ctp_slack_bot.models import GoogleDriveMetadata class GoogleDriveService(ApplicationComponentBase): """Service for interacting with Google Drive.""" model_config = ConfigDict(frozen=True) _FOLDER_MIME_TYPE: ClassVar[str] = "application/vnd.google-apps.folder" _PATH_SEPARATOR: ClassVar[str] = "/" settings: Settings _google_drive_client: Resource _folder_cache: TTLCache = PrivateAttr(default_factory=lambda: TTLCache(maxsize=256, ttl=60)) def model_post_init(self: Self, context: Any, /) -> None: super().model_post_init(context) credentials = Credentials.from_service_account_info({ "type": "service_account", "project_id": self.settings.google_project_id, "private_key_id": self.settings.google_private_key_id.get_secret_value(), "private_key": self.settings.google_private_key.get_secret_value(), "client_email": self.settings.google_client_email, "client_id": self.settings.google_client_id, "token_uri": self.settings.google_token_uri, }, scopes=["https://www.googleapis.com/auth/drive"]) self._google_drive_client = build('drive', 'v3', credentials=credentials) def _resolve_folder_id(self: Self, folder_path: str) -> Optional[str]: """Resolve a folder path to a Google Drive ID.""" if not folder_path: return self.settings.google_drive_root_id if folder_path in self._folder_cache: return self._folder_cache[folder_path] current_id = self.settings.google_drive_root_id try: for part in folder_path.split(self._PATH_SEPARATOR): results = self._google_drive_client.files().list( q=f"name='{part.replace("\\", "\\\\").replace("'", "\\'")}' and mimeType='{self._FOLDER_MIME_TYPE}' and '{current_id}' in parents", fields="files(id,name)", supportsAllDrives=True, includeItemsFromAllDrives=True ).execute() match results: case {"files": [ {"id": id} ]}: current_id = id case _: logger.debug("Folder not found by path: {}", folder_path) return None except HttpError as e: logger.error("Error resolving folder path: {}", folder_path) return None self._folder_cache[folder_path] = current_id return current_id def _list_directory(self: Self, folder_path: str, folder_id: str, recursive: bool = False) -> Collection[GoogleDriveMetadata]: try: results = tuple(GoogleDriveMetadata.from_folder_path_and_dict(folder_path, result) for result in self._google_drive_client.files().list( q=f"'{folder_id}' in parents", fields="files(id,name,mimeType,modifiedTime)", supportsAllDrives=True, includeItemsFromAllDrives=True, pageSize=1000 ).execute().get('files', ())) if not recursive: return results return tuple(reduce(chain, (self._list_directory(f"{folder_path}{self._PATH_SEPARATOR}{result.name}", result.id, True) for result in results if result.mime_type == self._FOLDER_MIME_TYPE), results)) except HttpError as e: logger.error("Error listing folder by path, {}: {}", folder_path, e) return () def list_directory(self: Self, folder_path: str, recursive: bool = False) -> Collection[GoogleDriveMetadata]: """List contents of a directory with basic metadata.""" folder_id = self._resolve_folder_id(folder_path) if not folder_id: logger.debug("Folder not found by path: {}", folder_path) return () return self._list_directory(folder_path, folder_id, recursive) def get_metadata(self: Self, item_path: str) -> Optional[GoogleDriveMetadata]: """Get metadata for a specific file/folder by path.""" match item_path.rsplit(self._PATH_SEPARATOR, 1): case [item_name]: folder_path = "" folder_id = self.settings.google_drive_root_id case [folder_path, item_name]: folder_id = self._resolve_folder_id(folder_path) if not folder_id: logger.debug("Folder not found by path: {}", folder_path) return None try: results = self._google_drive_client.files().list( q=f"name='{item_name}' and '{folder_id}' in parents", fields="files(id,name,mimeType,modifiedTime)", supportsAllDrives=True, includeItemsFromAllDrives=True, pageSize=1 ).execute() match results: case {"files": [result]}: return GoogleDriveMetadata.from_folder_path_and_dict(folder_path, result) except HttpError as e: logger.error("Error getting metadata for item by path, {}: {}", item_path, e) logger.debug("Item not found by path: {}", item_path) return None def read_file_by_id(self: Self, file_id: str) -> Optional[bytes]: """Read contents of a file by its unique identifier.""" try: request = self._google_drive_client.files().get_media(fileId=file_id) buffer = BytesIO() downloader = MediaIoBaseDownload(buffer, request) done = False while not done: _, done = downloader.next_chunk() return buffer.getvalue() except HttpError as e: logger.error("Error reading file by ID, {}: {}", file_id, e) return None @property def name(self: Self) -> str: return "google_drive_service"