ctp-slack-bot / src /ctp_slack_bot /services /google_drive_service.py
LiKenun's picture
Refactor #6
f0fe0fd
from datetime import datetime
from cachetools import TTLCache
from functools import reduce
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build, Resource
from googleapiclient.http import MediaIoBaseDownload
from googleapiclient.errors import HttpError
from io import BytesIO
from itertools import chain
from loguru import logger
from pydantic import ConfigDict, PrivateAttr
from typing import Any, ClassVar, Collection, Optional, Self
from ctp_slack_bot.core import ApplicationComponentBase, Settings
from ctp_slack_bot.models import GoogleDriveMetadata
class GoogleDriveService(ApplicationComponentBase):
"""Service for interacting with Google Drive."""
model_config = ConfigDict(frozen=True)
_FOLDER_MIME_TYPE: ClassVar[str] = "application/vnd.google-apps.folder"
_PATH_SEPARATOR: ClassVar[str] = "/"
settings: Settings
_google_drive_client: Resource
_folder_cache: TTLCache = PrivateAttr(default_factory=lambda: TTLCache(maxsize=256, ttl=60))
def model_post_init(self: Self, context: Any, /) -> None:
super().model_post_init(context)
credentials = Credentials.from_service_account_info({
"type": "service_account",
"project_id": self.settings.google_project_id,
"private_key_id": self.settings.google_private_key_id.get_secret_value(),
"private_key": self.settings.google_private_key.get_secret_value(),
"client_email": self.settings.google_client_email,
"client_id": self.settings.google_client_id,
"token_uri": self.settings.google_token_uri,
}, scopes=["https://www.googleapis.com/auth/drive"])
self._google_drive_client = build('drive', 'v3', credentials=credentials)
def _resolve_folder_id(self: Self, folder_path: str) -> Optional[str]:
"""Resolve a folder path to a Google Drive ID."""
if not folder_path:
return self.settings.google_drive_root_id
if folder_path in self._folder_cache:
return self._folder_cache[folder_path]
current_id = self.settings.google_drive_root_id
try:
for part in folder_path.split(self._PATH_SEPARATOR):
results = self._google_drive_client.files().list(
q=f"name='{part.replace("\\", "\\\\").replace("'", "\\'")}' and mimeType='{self._FOLDER_MIME_TYPE}' and '{current_id}' in parents",
fields="files(id,name)",
supportsAllDrives=True,
includeItemsFromAllDrives=True
).execute()
match results:
case {"files": [ {"id": id} ]}:
current_id = id
case _:
logger.debug("Folder not found by path: {}", folder_path)
return None
except HttpError as e:
logger.error("Error resolving folder path: {}", folder_path)
return None
self._folder_cache[folder_path] = current_id
return current_id
def _list_directory(self: Self, folder_path: str, folder_id: str, recursive: bool = False) -> Collection[GoogleDriveMetadata]:
try:
results = tuple(GoogleDriveMetadata.from_folder_path_and_dict(folder_path, result)
for result
in self._google_drive_client.files().list(
q=f"'{folder_id}' in parents",
fields="files(id,name,mimeType,modifiedTime)",
supportsAllDrives=True,
includeItemsFromAllDrives=True,
pageSize=1000
).execute().get('files', ()))
if not recursive:
return results
return tuple(reduce(chain,
(self._list_directory(f"{folder_path}{self._PATH_SEPARATOR}{result.name}", result.id, True)
for result
in results
if result.mime_type == self._FOLDER_MIME_TYPE),
results))
except HttpError as e:
logger.error("Error listing folder by path, {}: {}", folder_path, e)
return ()
def list_directory(self: Self, folder_path: str, recursive: bool = False) -> Collection[GoogleDriveMetadata]:
"""List contents of a directory with basic metadata."""
folder_id = self._resolve_folder_id(folder_path)
if not folder_id:
logger.debug("Folder not found by path: {}", folder_path)
return ()
return self._list_directory(folder_path, folder_id, recursive)
def get_metadata(self: Self, item_path: str) -> Optional[GoogleDriveMetadata]:
"""Get metadata for a specific file/folder by path."""
match item_path.rsplit(self._PATH_SEPARATOR, 1):
case [item_name]:
folder_path = ""
folder_id = self.settings.google_drive_root_id
case [folder_path, item_name]:
folder_id = self._resolve_folder_id(folder_path)
if not folder_id:
logger.debug("Folder not found by path: {}", folder_path)
return None
try:
results = self._google_drive_client.files().list(
q=f"name='{item_name}' and '{folder_id}' in parents",
fields="files(id,name,mimeType,modifiedTime)",
supportsAllDrives=True,
includeItemsFromAllDrives=True,
pageSize=1
).execute()
match results:
case {"files": [result]}:
return GoogleDriveMetadata.from_folder_path_and_dict(folder_path, result)
except HttpError as e:
logger.error("Error getting metadata for item by path, {}: {}", item_path, e)
logger.debug("Item not found by path: {}", item_path)
return None
def read_file_by_id(self: Self, file_id: str) -> Optional[bytes]:
"""Read contents of a file by its unique identifier."""
try:
request = self._google_drive_client.files().get_media(fileId=file_id)
buffer = BytesIO()
downloader = MediaIoBaseDownload(buffer, request)
done = False
while not done:
_, done = downloader.next_chunk()
return buffer.getvalue()
except HttpError as e:
logger.error("Error reading file by ID, {}: {}", file_id, e)
return None
@property
def name(self: Self) -> str:
return "google_drive_service"