import base64
import json
import re
from collections.abc import Iterator
from json.decoder import JSONDecodeError
from typing import Any

from google.auth.exceptions import RefreshError
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from langchain_core.chat_sessions import ChatSession
from langchain_core.messages import HumanMessage
from langchain_google_community.gmail.loader import GMailLoader
from loguru import logger

from langflow.custom import Component
from langflow.inputs import MessageTextInput
from langflow.io import SecretStrInput
from langflow.schema import Data
from langflow.template import Output


class GmailLoaderComponent(Component):
    """Langflow component that loads Gmail messages as chat sessions.

    The user supplies OAuth 2.0 authorized-user token information as a JSON
    string, a comma-separated list of Gmail label IDs and a maximum number of
    messages. Each matching message becomes a ``ChatSession`` holding the
    cleaned text of the message and, when the message has an ``In-Reply-To``
    header, the cleaned text of the replied-to message from the same thread.
    """

    display_name = "Gmail Loader"
    description = "Loads emails from Gmail using provided credentials."
    icon = "Google"

    inputs = [
        SecretStrInput(
            name="json_string",
            display_name="JSON String of the Service Account Token",
            info="JSON string containing OAuth 2.0 access token information for service account access",
            required=True,
            # The template must itself be valid JSON: a trailing comma after the
            # last scope would make json.loads() reject even a filled-in template.
            value="""{
    "account": "",
    "client_id": "",
    "client_secret": "",
    "expiry": "",
    "refresh_token": "",
    "scopes": [
        "https://www.googleapis.com/auth/gmail.readonly"
    ],
    "token": "",
    "token_uri": "https://oauth2.googleapis.com/token",
    "universe_domain": "googleapis.com"
}""",
        ),
        MessageTextInput(
            name="label_ids",
            display_name="Label IDs",
            info="Comma-separated list of label IDs to filter emails.",
            required=True,
            value="INBOX,SENT,UNREAD,IMPORTANT",
        ),
        MessageTextInput(
            name="max_results",
            display_name="Max Results",
            info="Maximum number of emails to load.",
            required=True,
            value="10",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="load_emails"),
    ]

    def load_emails(self) -> Data:
        """Fetch emails from Gmail and return them wrapped in a ``Data`` object.

        Returns:
            ``Data`` whose ``"text"`` key holds the list of ``ChatSession``
            objects produced by the loader.

        Raises:
            ValueError: If the token JSON is not a valid JSON object,
                ``max_results`` is not a positive integer, the OAuth token
                cannot be refreshed, or loading fails for any other reason.
        """

        class CustomGMailLoader(GMailLoader):
            """``GMailLoader`` variant that filters by label IDs and cleans message text."""

            # Splits off quoted history ("\r\nOn ... wrote:\r\n") so only the newest
            # part of a reply is kept.
            _REPLY_SPLIT_PATTERN = re.compile(r"\r\nOn .+(?:\r\n)*wrote:\r\n")
            # `http\S+` already covers `https\S+`, so that alternative is not repeated.
            _URL_PATTERN = re.compile(r"http\S+|www\S+")
            _EMAIL_PATTERN = re.compile(r"\S+@\S+")
            _SPECIAL_CHARS_PATTERN = re.compile(r"[^A-Za-z0-9\s]+")
            _MULTI_SPACE_PATTERN = re.compile(r"\s{2,}")

            def __init__(
                self, creds: Any, *, n: int = 100, label_ids: list[str] | None = None, raise_error: bool = False
            ) -> None:
                super().__init__(creds, n, raise_error)
                self.label_ids = label_ids if label_ids is not None else ["SENT"]

            def clean_message_content(self, message: str) -> str:
                """Remove URLs, email addresses and non-alphanumeric characters, then collapse whitespace."""
                message = self._URL_PATTERN.sub("", message)
                message = self._EMAIL_PATTERN.sub("", message)
                message = self._SPECIAL_CHARS_PATTERN.sub(" ", message)
                message = self._MULTI_SPACE_PATTERN.sub(" ", message)
                return message.strip()

            @staticmethod
            def _get_header(headers: list[dict[str, Any]], header_name: str) -> str | None:
                """Return the value of the last header called ``header_name``, or None.

                Header names are compared case-insensitively, so e.g. "Message-Id"
                matches a lookup for "Message-ID".
                """
                wanted = header_name.lower()
                value = None
                for header in headers:
                    if header.get("name", "").lower() == wanted:
                        value = header.get("value")
                return value

            @staticmethod
            def _iter_payload_parts(payload: dict[str, Any]) -> Iterator[dict[str, Any]]:
                """Yield the MIME parts of ``payload`` breadth-first, descending into nested parts.

                A payload without a ``parts`` key is yielded itself. Breadth-first
                order means top-level parts are still considered before nested ones.
                """
                from collections import deque

                pending = deque(payload["parts"] if "parts" in payload else [payload])
                while pending:
                    part = pending.popleft()
                    yield part
                    pending.extend(part.get("parts", []))

            def _extract_email_content(self, msg: Any) -> HumanMessage:
                """Build a ``HumanMessage`` from the first ``text/plain`` part of a Gmail message.

                The sender (``From`` header) is stored in ``additional_kwargs["sender"]``.

                Raises:
                    ValueError: If there is no ``From`` header or no decodable
                        ``text/plain`` part.
                """
                payload = msg["payload"]
                from_email = self._get_header(payload["headers"], "From")
                if from_email is None:
                    error_msg = "From email not found."
                    raise ValueError(error_msg)

                for part in self._iter_payload_parts(payload):
                    if part.get("mimeType") != "text/plain":
                        continue
                    data = part.get("body", {}).get("data")
                    if data is None:
                        # No inline body data to decode for this part; try the next one.
                        continue
                    # Body data is base64url; tolerate missing padding and non-UTF-8
                    # bytes (replacement characters are stripped by the cleaner anyway).
                    padded = data + "=" * (-len(data) % 4)
                    text = base64.urlsafe_b64decode(padded).decode("utf-8", errors="replace")
                    newest_response = self._REPLY_SPLIT_PATTERN.split(text, maxsplit=1)[0]
                    return HumanMessage(
                        content=self.clean_message_content(newest_response),
                        additional_kwargs={"sender": from_email},
                    )

                error_msg = "No plain text part found in the email."
                raise ValueError(error_msg)

            def _get_message_data(self, service: Any, message: Any) -> ChatSession:
                """Fetch one message and wrap it in a ``ChatSession``.

                If the message has an ``In-Reply-To`` header, the replied-to message
                is looked up in the same thread and placed first in the session.

                Raises:
                    ValueError: If the replied-to message is not in the thread, or
                        if content extraction fails.
                """
                msg = service.users().messages().get(userId="me", id=message["id"]).execute()
                message_content = self._extract_email_content(msg)

                in_reply_to = self._get_header(msg["payload"]["headers"], "In-Reply-To")
                if not in_reply_to:
                    return ChatSession(messages=[message_content])

                thread = service.users().threads().get(userId="me", id=msg["threadId"]).execute()
                response_email = next(
                    (
                        thread_message
                        for thread_message in thread.get("messages", [])
                        if self._get_header(thread_message["payload"]["headers"], "Message-ID") == in_reply_to
                    ),
                    None,
                )
                if response_email is None:
                    error_msg = "Response email not found in the thread."
                    raise ValueError(error_msg)
                starter_content = self._extract_email_content(response_email)
                return ChatSession(messages=[starter_content, message_content])

            def lazy_load(self) -> Iterator[ChatSession]:
                """Yield one ``ChatSession`` per message matching ``label_ids``, requesting at most ``n``.

                Per-message failures are re-raised when ``raise_error`` is set;
                otherwise they are logged and the message is skipped.
                """
                service = build("gmail", "v1", credentials=self.creds)
                results = (
                    service.users().messages().list(userId="me", labelIds=self.label_ids, maxResults=self.n).execute()
                )
                messages = results.get("messages", [])
                if not messages:
                    logger.warning("No messages found with the specified labels.")
                for message in messages:
                    try:
                        session = self._get_message_data(service, message)
                    except Exception:
                        if self.raise_error:
                            raise
                        logger.exception(f"Error processing message {message['id']}")
                        continue
                    yield session

        # Strip whitespace around each ID ("INBOX, SENT" -> ["INBOX", "SENT"]) and
        # drop empty entries; fall back to INBOX when nothing usable remains.
        label_ids = [label.strip() for label in (self.label_ids or "").split(",") if label.strip()] or ["INBOX"]

        if self.max_results:
            try:
                max_results = int(self.max_results)
            except (TypeError, ValueError) as e:
                msg = f"Max Results must be a whole number, got {self.max_results!r}."
                raise ValueError(msg) from e
        else:
            max_results = 100
        if max_results < 1:
            msg = "Max Results must be at least 1."
            raise ValueError(msg)

        # Load the token information from the JSON string.
        try:
            token_info = json.loads(self.json_string)
        except (JSONDecodeError, TypeError) as e:
            msg = "Invalid JSON string"
            raise ValueError(msg) from e
        if not isinstance(token_info, dict):
            msg = "Invalid JSON string: expected a JSON object with the token fields."
            raise ValueError(msg)

        creds = Credentials.from_authorized_user_info(token_info)

        # Initialize the custom loader with the provided credentials.
        loader = CustomGMailLoader(creds=creds, n=max_results, label_ids=label_ids)

        try:
            docs = loader.load()
        except RefreshError as e:
            msg = "Authentication error: Unable to refresh authentication token. Please try to reauthenticate."
            raise ValueError(msg) from e
        except Exception as e:
            msg = f"Error loading documents: {e}"
            raise ValueError(msg) from e

        # Return the loaded documents.
        self.status = docs
        return Data(data={"text": docs})