File size: 4,540 Bytes
c74033b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from datetime import datetime
import json
import base64
from googleapiclient.discovery import build
from utils.auth import authenticate_google_services

class Gmail_Agent:
    def __init__(self, creds=None):
        if not creds:
            creds = authenticate_google_services()
        self.service = build('gmail', 'v1', credentials=creds)

    def read_gmail_messages(self, queries, max_results=5):
        query = " ".join(queries)
        results = self.service.users().messages().list(userId='me', maxResults=max_results, q=query).execute()
        messages = results.get('messages', [])
        return messages

    def format_messages(self, messages):
        formatted_content = "\n\n==========\n\n"
        for message in messages:
            msg = self.service.users().messages().get(userId='me', id=message['id']).execute()
            import json
            with open('results.json', 'w', encoding='utf-8') as f:
                json.dump(msg, f)
            internal_Date = int(msg['internalDate'])
            formatted_Date = datetime.fromtimestamp(internal_Date / 1000).strftime('%Y-%m-%d %H:%M:%S')
            formatted_content += f"信件日期: {formatted_Date}\n"
            content = self._parse_gmail_response(msg['payload']['parts'])
            formatted_content+=f"信件內容: {content}\n\n==========\n\n"
        return formatted_content

    def send_email(self, to, subject, message_content):
        try:
            from email.message import EmailMessage
            message = EmailMessage()
            message.set_content(message_content)
            message["To"] = ", ".join(to)
            message["Subject"] = subject
            encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
            create_message = {"raw": encoded_message}
            send_message = (
                self.service.users()
                .messages()
                .send(userId="me", body=create_message)
                .execute()
            )
            print(f'Message Id: {send_message["id"]}')
            formatted_content = "✅ Gmail 已傳送給所有使用者,請至您的 Gmail 寄件備份確認!"
        except Exception as error:
            print(f"An error occurred: {error}")
            send_message = None
            formatted_content = f"❌ 信件未能成功寄出,以下是錯誤訊息(英文):{error}"
        return formatted_content

    def get_all_tools(self):
        LLM_tools = []
        LLM_tools.append({
            "name": "query_gmail_tool",
            "description": "query the gmails with specified queries",
            "parameters": {
                "type": "object",
                "properties": {
                    "queries": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Queries that can be used to filter the gmails. (e.g., ['before:2025/06/01', 'after:2025/05/01', 'from:[email protected]'])",
                    },
                },
                "required": ["queries"]
            },
        })
        LLM_tools.append({
            "name": "send_email_tool",
            "description": "Send an email to a specified recipient with a specified subject and message",
            "parameters": {
                "type": "object",
                "properties": {
                    "to": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "An array of email addresses of all the recipients"
                        },
                    "subject": {
                        "type": "string",
                        "description": "The subject of the email"
                    },
                    "message_content": {
                        "type": "string",
                        "description": "The content of the email"
                    }
                },
                "required": ["to", "subject", "message_content"]
            }
        })
        return LLM_tools

    def _parse_gmail_response(self, parts):
        for part in parts:
            if "parts" in part:
                content = self._parse_gmail_response(part["parts"])
                if content:
                    return content
            if part['mimeType'] == 'text/plain' and part['body']['size'] != 0:
                decoded_data = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8')
                return decoded_data
        return None