import base64
import json
from datetime import datetime
from email.message import EmailMessage

from googleapiclient.discovery import build

from utils.auth import authenticate_google_services


class Gmail_Agent:
    """Small wrapper around a Gmail v1 API client.

    Provides message search, plain-text rendering of the matched messages,
    sending of plain-text mail, and the tool schemas that describe those
    capabilities to an LLM.
    """

    def __init__(self, creds=None):
        """Build the Gmail service client.

        Args:
            creds: Credentials passed to ``build``. When falsy, they are
                obtained by calling ``authenticate_google_services()``.
        """
        if not creds:
            creds = authenticate_google_services()
        self.service = build('gmail', 'v1', credentials=creds)

    def read_gmail_messages(self, queries, max_results=5):
        """List messages of the authenticated user matching the given queries.

        Args:
            queries: Iterable of Gmail search terms; they are joined with a
                single space into one query string. A bare string is treated
                as a single term.
            max_results: Value forwarded as ``maxResults`` to the list call.

        Returns:
            The ``messages`` entry of the API response, or an empty list when
            the response has no such key.
        """
        # A bare string would otherwise be joined character by character.
        if isinstance(queries, str):
            queries = [queries]
        query = " ".join(queries)
        results = (
            self.service.users()
            .messages()
            .list(userId='me', maxResults=max_results, q=query)
            .execute()
        )
        return results.get('messages', [])

    def format_messages(self, messages, dump_path='results.json'):
        """Fetch each message and render date and plain-text body as one string.

        Args:
            messages: Iterable of dicts carrying an ``id`` key (the shape
                returned by ``read_gmail_messages``).
            dump_path: File to which each fetched raw message is written as
                JSON. The file is reopened in write mode for every message,
                so it ends up holding only the last one. Pass ``None`` (or
                any falsy value) to skip the dump.

        Returns:
            A string that starts with a separator line and contains, for each
            message, its date, its content and a trailing separator. The
            content is rendered as ``None`` when no plain-text body is found.
        """
        separator = "\n\n==========\n\n"
        chunks = [separator]
        for message in messages:
            msg = (
                self.service.users()
                .messages()
                .get(userId='me', id=message['id'])
                .execute()
            )
            if dump_path:
                with open(dump_path, 'w', encoding='utf-8') as f:
                    json.dump(msg, f)

            # internalDate is treated as epoch milliseconds; the result is a
            # naive datetime in the local timezone.
            received_at = datetime.fromtimestamp(int(msg['internalDate']) / 1000)
            formatted_date = received_at.strftime('%Y-%m-%d %H:%M:%S')
            chunks.append(f"信件日期: {formatted_date}\n")

            # Single-part messages have no 'parts' key and keep the body on
            # the payload itself, so the payload is handed over as the root
            # part; the parser descends into 'parts' when they exist.
            content = self._parse_gmail_response([msg['payload']])
            chunks.append(f"信件內容: {content}{separator}")
        return "".join(chunks)

    def send_email(self, to, subject, message_content):
        """Send a plain-text email from the authenticated account.

        Args:
            to: Iterable of recipient addresses; a bare string is treated as
                a single recipient.
            subject: Subject line.
            message_content: Plain-text body.

        Returns:
            A user-facing status string: a success notice, or a failure
            notice that embeds the error text. Errors are caught and reported
            through the return value rather than raised.
        """
        try:
            # A bare string would otherwise be joined character by character.
            recipients = [to] if isinstance(to, str) else to

            message = EmailMessage()
            message.set_content(message_content)
            message["To"] = ", ".join(recipients)
            message["Subject"] = subject

            # The API takes the whole RFC 822 message, base64url-encoded,
            # under the "raw" key.
            encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
            create_message = {"raw": encoded_message}
            send_message = (
                self.service.users()
                .messages()
                .send(userId="me", body=create_message)
                .execute()
            )
            print(f'Message Id: {send_message["id"]}')
            formatted_content = "✅ Gmail 已傳送給所有使用者,請至您的 Gmail 寄件備份確認!"
        except Exception as error:
            # Boundary handler: the caller receives the failure as text.
            print(f"An error occurred: {error}")
            formatted_content = f"❌ 信件未能成功寄出,以下是錯誤訊息(英文):{error}"
        return formatted_content

    def get_all_tools(self):
        """Return the tool schemas describing the query and send capabilities.

        Returns:
            A list of two dicts, each with ``name``, ``description`` and a
            JSON-schema style ``parameters`` object.
        """
        return [
            {
                "name": "query_gmail_tool",
                "description": "query the gmails with specified queries",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "queries": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Queries that can be used to filter the gmails. (e.g., ['before:2025/06/01', 'after:2025/05/01', 'from:abd@gmail.com'])",
                        },
                    },
                    "required": ["queries"],
                },
            },
            {
                "name": "send_email_tool",
                "description": "Send an email to a specified recipient with a specified subject and message",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "to": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "An array of email addresses of all the recipients",
                        },
                        "subject": {
                            "type": "string",
                            "description": "The subject of the email",
                        },
                        "message_content": {
                            "type": "string",
                            "description": "The content of the email",
                        },
                    },
                    "required": ["to", "subject", "message_content"],
                },
            },
        ]

    def _parse_gmail_response(self, parts):
        """Return the first non-empty ``text/plain`` body found in ``parts``.

        Parts are visited in order. A part that has nested ``parts`` is
        searched recursively before the part itself is examined.

        Args:
            parts: Iterable of message-part dicts.

        Returns:
            The decoded text, or ``None`` when no usable plain-text part
            exists.
        """
        for part in parts:
            if "parts" in part:
                content = self._parse_gmail_response(part["parts"])
                if content:
                    return content

            if part.get('mimeType') != 'text/plain':
                continue
            body = part.get('body') or {}
            data = body.get('data')
            # A part may report a size yet carry no inline data; skip it
            # instead of failing on the missing key.
            if body.get('size', 0) == 0 or not data:
                continue
            # Restore any stripped '=' padding so the decoder accepts it.
            padded = data + "=" * (-len(data) % 4)
            # Undecodable bytes are replaced rather than aborting the read.
            return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')
        return None