Spaces:
Running
Running
import base64 | |
import json | |
import re | |
from collections.abc import Iterator | |
from json.decoder import JSONDecodeError | |
from typing import Any | |
from google.auth.exceptions import RefreshError | |
from google.oauth2.credentials import Credentials | |
from googleapiclient.discovery import build | |
from langchain_core.chat_sessions import ChatSession | |
from langchain_core.messages import HumanMessage | |
from langchain_google_community.gmail.loader import GMailLoader | |
from loguru import logger | |
from langflow.custom import Component | |
from langflow.inputs import MessageTextInput | |
from langflow.io import SecretStrInput | |
from langflow.schema import Data | |
from langflow.template import Output | |
class GmailLoaderComponent(Component):
    """Langflow component that loads Gmail messages as chat sessions.

    The component takes an authorized-user token (as a JSON string), a
    comma-separated list of Gmail label IDs and a maximum result count, and
    exposes a single ``data`` output produced by :meth:`load_emails`.
    """

    display_name = "Gmail Loader"
    description = "Loads emails from Gmail using provided credentials."
    icon = "Google"

    inputs = [
        SecretStrInput(
            name="json_string",
            display_name="JSON String of the Service Account Token",
            info="JSON string containing OAuth 2.0 access token information for service account access",
            required=True,
            # The template must be strict JSON because it is parsed with
            # json.loads(); a trailing comma inside "scopes" would make the
            # filled-in template fail with "Invalid JSON string".
            value="""{
    "account": "",
    "client_id": "",
    "client_secret": "",
    "expiry": "",
    "refresh_token": "",
    "scopes": [
        "https://www.googleapis.com/auth/gmail.readonly"
    ],
    "token": "",
    "token_uri": "https://oauth2.googleapis.com/token",
    "universe_domain": "googleapis.com"
}""",
        ),
        MessageTextInput(
            name="label_ids",
            display_name="Label IDs",
            info="Comma-separated list of label IDs to filter emails.",
            required=True,
            value="INBOX,SENT,UNREAD,IMPORTANT",
        ),
        MessageTextInput(
            name="max_results",
            display_name="Max Results",
            info="Maximum number of emails to load.",
            required=True,
            value="10",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="load_emails"),
    ]

    def load_emails(self) -> Data:
        """Load emails matching the configured labels and wrap them in ``Data``.

        Returns:
            A ``Data`` object whose ``text`` entry holds whatever the loader's
            ``load()`` call returned (chat sessions built by ``lazy_load``).

        Raises:
            ValueError: If ``max_results`` is not an integer, if ``json_string``
                is not valid JSON, if the token cannot be refreshed, or if
                loading fails for any other reason.
        """

        class CustomGMailLoader(GMailLoader):
            """``GMailLoader`` variant that filters by label and cleans bodies."""

            # Splits the newest reply from the quoted "On ... wrote:" history.
            # Compiled once here instead of once per message part.
            _QUOTED_REPLY_PATTERN = re.compile(r"\r\nOn .+(\r\n)*wrote:\r\n")

            def __init__(
                self, creds: Any, *, n: int = 100, label_ids: list[str] | None = None, raise_error: bool = False
            ) -> None:
                """Store the label filter; remaining arguments go to ``GMailLoader``."""
                super().__init__(creds, n, raise_error)
                self.label_ids = label_ids if label_ids is not None else ["SENT"]

            @staticmethod
            def _last_header_value(headers: Any, name: str) -> Any:
                """Return the value of the last header called ``name``, or ``None``.

                Header field names are compared case-insensitively, so
                ``Message-ID`` and ``Message-Id`` are treated as the same field.
                """
                wanted = name.lower()
                found = None
                for header in headers:
                    if header["name"].lower() == wanted:
                        found = header["value"]
                return found

            def clean_message_content(self, message):
                """Strip URLs, e-mail addresses, punctuation and extra whitespace."""
                # Remove URLs
                message = re.sub(r"http\S+|www\S+|https\S+", "", message, flags=re.MULTILINE)
                # Remove email addresses
                message = re.sub(r"\S+@\S+", "", message)
                # Remove special characters and excessive whitespace
                message = re.sub(r"[^A-Za-z0-9\s]+", " ", message)
                message = re.sub(r"\s{2,}", " ", message)
                # Trim leading and trailing whitespace
                return message.strip()

            def _extract_email_content(self, msg: Any) -> HumanMessage:
                """Build a ``HumanMessage`` from the first ``text/plain`` part.

                Raises:
                    ValueError: If the message has no ``From`` header or no
                        ``text/plain`` part among its top-level parts.
                """
                payload = msg["payload"]
                from_email = self._last_header_value(payload["headers"], "From")
                if from_email is None:
                    error_message = "From email not found."
                    raise ValueError(error_message)

                # Single-part messages carry the body on the payload itself.
                parts = payload["parts"] if "parts" in payload else [payload]

                for part in parts:
                    if part["mimeType"] != "text/plain":
                        continue
                    decoded = base64.urlsafe_b64decode(part["body"]["data"]).decode("utf-8")
                    # Keep only the text before the quoted reply history.
                    newest_response = self._QUOTED_REPLY_PATTERN.split(decoded)[0]
                    return HumanMessage(
                        content=self.clean_message_content(newest_response),
                        additional_kwargs={"sender": from_email},
                    )

                error_message = "No plain text part found in the email."
                raise ValueError(error_message)

            def _get_message_data(self, service: Any, message: Any) -> ChatSession:
                """Fetch one message and, for replies, the message it answers.

                Returns:
                    A ``ChatSession`` holding ``[replied_to, message]`` when the
                    message has an ``In-Reply-To`` header, else ``[message]``.

                Raises:
                    ValueError: If the replied-to message is not in the thread.
                """
                msg = service.users().messages().get(userId="me", id=message["id"]).execute()
                message_content = self._extract_email_content(msg)

                in_reply_to = self._last_header_value(msg["payload"]["headers"], "In-Reply-To")
                if not in_reply_to:
                    return ChatSession(messages=[message_content])

                thread = service.users().threads().get(userId="me", id=msg["threadId"]).execute()

                # Find the thread message whose Message-ID equals In-Reply-To.
                # As before, the last matching message in the thread wins.
                response_email = None
                for candidate in thread["messages"]:
                    for header in candidate["payload"]["headers"]:
                        if header["name"].lower() == "message-id" and header["value"] == in_reply_to:
                            response_email = candidate

                if response_email is None:
                    error_message = "Response email not found in the thread."
                    raise ValueError(error_message)

                starter_content = self._extract_email_content(response_email)
                return ChatSession(messages=[starter_content, message_content])

            def lazy_load(self) -> Iterator[ChatSession]:
                """Yield one ``ChatSession`` per listed message.

                A message that fails to process is logged and skipped unless
                ``self.raise_error`` is truthy, in which case the error is
                re-raised.
                """
                # self.creds, self.n and self.raise_error are presumably set by
                # GMailLoader.__init__ -- confirm against the installed version.
                service = build("gmail", "v1", credentials=self.creds)
                results = (
                    service.users().messages().list(userId="me", labelIds=self.label_ids, maxResults=self.n).execute()
                )
                messages = results.get("messages", [])
                if not messages:
                    logger.warning("No messages found with the specified labels.")
                for message in messages:
                    try:
                        yield self._get_message_data(service, message)
                    except Exception:
                        if self.raise_error:
                            raise
                        logger.exception(f"Error processing message {message['id']}")

        json_string = self.json_string

        # Trim whitespace and drop empty entries so "INBOX, SENT" or a trailing
        # comma does not send malformed label IDs to the API.
        label_ids = [label.strip() for label in self.label_ids.split(",") if label.strip()] if self.label_ids else []
        if not label_ids:
            label_ids = ["INBOX"]

        try:
            max_results = int(self.max_results) if self.max_results else 100
        except (TypeError, ValueError) as e:
            msg = f"Max Results must be an integer, got {self.max_results!r}"
            raise ValueError(msg) from e

        # Load the token information from the JSON string
        try:
            token_info = json.loads(json_string)
        except JSONDecodeError as e:
            msg = "Invalid JSON string"
            raise ValueError(msg) from e

        creds = Credentials.from_authorized_user_info(token_info)

        # Initialize the custom loader with the provided credentials
        loader = CustomGMailLoader(creds=creds, n=max_results, label_ids=label_ids)

        try:
            docs = loader.load()
        except RefreshError as e:
            msg = "Authentication error: Unable to refresh authentication token. Please try to reauthenticate."
            raise ValueError(msg) from e
        except Exception as e:
            msg = f"Error loading documents: {e}"
            raise ValueError(msg) from e

        # Return the loaded documents
        self.status = docs
        return Data(data={"text": docs})