File size: 4,540 Bytes
c74033b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
from datetime import datetime
import json
import base64
from googleapiclient.discovery import build
from utils.auth import authenticate_google_services
class Gmail_Agent:
    """Thin wrapper around the Gmail v1 API for listing, reading and sending mail.

    The instance holds a single ``self.service`` object created with
    ``googleapiclient.discovery.build('gmail', 'v1', ...)``; every public
    method talks to the API through it using ``userId='me'``.
    """

    # Separator placed before the first message and after every message.
    _SEPARATOR = "\n\n==========\n\n"

    def __init__(self, creds=None):
        """Build the Gmail service client.

        Args:
            creds: Google credentials object. When falsy,
                ``authenticate_google_services()`` is called to obtain one.
        """
        if not creds:
            creds = authenticate_google_services()
        self.service = build('gmail', 'v1', credentials=creds)

    @staticmethod
    def _as_list(value):
        """Return *value* as a list, wrapping a bare string in a one-item list.

        Without this, ``" ".join("from:a")`` would join the individual
        characters of the string instead of treating it as one item.
        """
        if isinstance(value, str):
            return [value]
        return list(value)

    def read_gmail_messages(self, queries, max_results=5):
        """List messages matching the given Gmail search terms.

        Args:
            queries: Iterable of search terms (or a single string); the terms
                are joined with spaces into one ``q`` parameter.
            max_results: Value passed as ``maxResults`` to the list call.

        Returns:
            The ``messages`` list from the API response, or ``[]`` when the
            response has no such key.
        """
        query = " ".join(self._as_list(queries))
        results = (
            self.service.users()
            .messages()
            .list(userId='me', maxResults=max_results, q=query)
            .execute()
        )
        return results.get('messages', [])

    def _format_message(self, msg):
        """Render one fetched message dict as a date line plus a content line.

        ``internalDate`` is read as epoch milliseconds and rendered with
        ``datetime.fromtimestamp`` (i.e. in the machine's local time zone).
        """
        sent_at = datetime.fromtimestamp(int(msg['internalDate']) / 1000)
        formatted_date = sent_at.strftime('%Y-%m-%d %H:%M:%S')
        # Treat the payload itself as the root part: single-part messages have
        # no 'parts' key and carry their body directly on the payload.
        content = self._parse_gmail_response([msg.get('payload') or {}])
        return (
            f"信件日期: {formatted_date}\n"
            f"信件內容: {content}{self._SEPARATOR}"
        )

    def format_messages(self, messages, *, debug_dump_path=None):
        """Fetch each listed message and format them into one text block.

        Args:
            messages: Iterable of dicts with an ``id`` key, as returned by
                :meth:`read_gmail_messages`.
            debug_dump_path: Optional file path. When given, the raw API
                responses are written there as a JSON list. Nothing is
                written by default, so mail content is not left on disk
                unintentionally.

        Returns:
            A string that starts with the separator, followed by one
            date/content section per message, each ending with the separator.
            The content is the literal text ``None`` when no non-empty
            ``text/plain`` part is found.
        """
        sections = [self._SEPARATOR]
        fetched = []
        for message in messages:
            msg = (
                self.service.users()
                .messages()
                .get(userId='me', id=message['id'])
                .execute()
            )
            if debug_dump_path:
                fetched.append(msg)
            sections.append(self._format_message(msg))

        if debug_dump_path:
            with open(debug_dump_path, 'w', encoding='utf-8') as f:
                json.dump(fetched, f)

        return "".join(sections)

    def _build_raw_message(self, to, subject, message_content):
        """Build the ``{"raw": ...}`` request body for the send call.

        The message is serialized and base64url-encoded, which is the form
        the ``raw`` field is populated with.
        """
        from email.message import EmailMessage

        message = EmailMessage()
        message.set_content(message_content)
        message["To"] = ", ".join(self._as_list(to))
        message["Subject"] = subject
        encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
        return {"raw": encoded_message}

    def send_email(self, to, subject, message_content):
        """Send a plain-text email and return a user-facing status string.

        Args:
            to: Iterable of recipient addresses (or a single address string).
            subject: Subject header value.
            message_content: Plain-text body.

        Returns:
            A success message, or a failure message embedding the error text.
            Errors are reported through the return value rather than raised,
            so the caller always receives a string.
        """
        try:
            create_message = self._build_raw_message(to, subject, message_content)
            send_message = (
                self.service.users()
                .messages()
                .send(userId="me", body=create_message)
                .execute()
            )
            print(f'Message Id: {send_message["id"]}')
            formatted_content = "✅ Gmail 已傳送給所有使用者,請至您的 Gmail 寄件備份確認!"
        except Exception as error:
            # Deliberately broad: any failure is turned into a status string.
            print(f"An error occurred: {error}")
            formatted_content = f"❌ 信件未能成功寄出,以下是錯誤訊息(英文):{error}"
        return formatted_content

    def get_all_tools(self):
        """Return the tool schemas (name, description, JSON-schema parameters).

        Two tools are described: ``query_gmail_tool`` (takes ``queries``) and
        ``send_email_tool`` (takes ``to``, ``subject``, ``message_content``).
        """
        query_tool = {
            "name": "query_gmail_tool",
            "description": "query the gmails with specified queries",
            "parameters": {
                "type": "object",
                "properties": {
                    "queries": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Queries that can be used to filter the gmails. (e.g., ['before:2025/06/01', 'after:2025/05/01', 'from:someone@example.com'])",
                    },
                },
                "required": ["queries"]
            },
        }
        send_tool = {
            "name": "send_email_tool",
            "description": "Send an email to a specified recipient with a specified subject and message",
            "parameters": {
                "type": "object",
                "properties": {
                    "to": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "An array of email addresses of all the recipients"
                    },
                    "subject": {
                        "type": "string",
                        "description": "The subject of the email"
                    },
                    "message_content": {
                        "type": "string",
                        "description": "The content of the email"
                    }
                },
                "required": ["to", "subject", "message_content"]
            }
        }
        return [query_tool, send_tool]

    def _parse_gmail_response(self, parts):
        """Return the first non-empty ``text/plain`` body found in *parts*.

        The search is depth-first: a part's nested ``parts`` are examined
        before the part itself, and the first decoded text wins.

        Args:
            parts: Iterable of message-part dicts (``mimeType``, ``body``,
                optional nested ``parts``), or ``None``.

        Returns:
            The decoded text, or ``None`` when no part qualifies.
        """
        for part in parts or []:
            nested = part.get("parts")
            if nested:
                content = self._parse_gmail_response(nested)
                if content:
                    return content

            if part.get('mimeType') != 'text/plain':
                continue
            # Parts stored as attachments report a size but carry no inline
            # 'data', so require the data itself rather than trusting 'size'.
            data = (part.get('body') or {}).get('data')
            if not data:
                continue
            # Restore any stripped '=' padding before decoding.
            padded = data + "=" * (-len(data) % 4)
            # Replace undecodable bytes instead of failing on non-UTF-8 mail.
            return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')
        return None
|