|
from datetime import datetime |
|
import json |
|
import base64 |
|
from googleapiclient.discovery import build |
|
from utils.auth import authenticate_google_services |
|
|
|
class Gmail_Agent:
    """Wrapper around a Gmail v1 API client that also exposes LLM tool schemas.

    The instance holds a single attribute, ``service``, which is the client
    object returned by ``build('gmail', 'v1', ...)``. Every method that talks
    to Gmail goes through it with ``userId='me'``.
    """

    def __init__(self, creds=None):
        """Create the Gmail API client.

        Args:
            creds: Credentials handed to ``build``. When falsy, they are
                obtained by calling ``authenticate_google_services()``.
        """
        if not creds:
            creds = authenticate_google_services()
        self.service = build('gmail', 'v1', credentials=creds)

    def read_gmail_messages(self, queries, max_results=5):
        """List messages matching the given search terms.

        Args:
            queries: Iterable of search terms; they are joined with single
                spaces into one ``q`` string. A bare string is accepted and
                treated as a single term.
            max_results: Value passed as ``maxResults`` to the list call.

        Returns:
            The ``messages`` list from the API response, or ``[]`` when the
            response has no such key.
        """
        # A bare string would otherwise be joined character by character.
        if isinstance(queries, str):
            queries = [queries]
        query = " ".join(queries or [])
        results = (
            self.service.users()
            .messages()
            .list(userId='me', maxResults=max_results, q=query)
            .execute()
        )
        return results.get('messages', [])

    def format_messages(self, messages):
        """Fetch each listed message and render all of them as one string.

        Args:
            messages: Iterable of dicts carrying an ``'id'`` key, as returned
                by ``read_gmail_messages``.

        Returns:
            A string that starts with a ``==========`` separator followed by
            one date/content section per message, each ending with the same
            separator.
        """
        chunks = ["\n\n==========\n\n"]
        for message in messages:
            msg = (
                self.service.users()
                .messages()
                .get(userId='me', id=message['id'])
                .execute()
            )
            chunks.append(self._format_message(msg))
        return "".join(chunks)

    def _format_message(self, msg):
        """Render one fetched message dict as a date line plus a content line."""
        # internalDate is divided by 1000 before conversion, i.e. it is
        # treated as epoch milliseconds; the result is in local time.
        internal_date = int(msg['internalDate'])
        formatted_date = datetime.fromtimestamp(internal_date / 1000).strftime(
            '%Y-%m-%d %H:%M:%S'
        )
        # Treat the payload itself as the root part: a single-part message
        # has no 'parts' key and carries its body directly on the payload.
        content = self._parse_gmail_response([msg['payload']])
        return (
            f"信件日期: {formatted_date}\n"
            f"信件內容: {content}\n\n==========\n\n"
        )

    def send_email(self, to, subject, message_content):
        """Send a plain-text email and report the outcome as a status string.

        Args:
            to: Iterable of recipient addresses (joined with ``", "``), or a
                single address given as a string.
            subject: Value of the ``Subject`` header.
            message_content: Text body of the email.

        Returns:
            A success message, or a failure message that embeds the error
            text. Exceptions are not propagated to the caller.
        """
        try:
            create_message = self._build_raw_message(to, subject, message_content)
            send_message = (
                self.service.users()
                .messages()
                .send(userId="me", body=create_message)
                .execute()
            )
            print(f'Message Id: {send_message["id"]}')
            formatted_content = "✅ Gmail 已傳送給所有使用者,請至您的 Gmail 寄件備份確認!"
        except Exception as error:
            # Deliberately broad: the failure is reported back as text
            # instead of being raised.
            print(f"An error occurred: {error}")
            formatted_content = f"❌ 信件未能成功寄出,以下是錯誤訊息(英文):{error}"
        return formatted_content

    @staticmethod
    def _build_raw_message(to, subject, message_content):
        """Build the ``{"raw": ...}`` request body for the send call."""
        from email.message import EmailMessage

        # A bare string would otherwise be joined character by character.
        recipients = [to] if isinstance(to, str) else list(to)

        message = EmailMessage()
        message.set_content(message_content)
        message["To"] = ", ".join(recipients)
        message["Subject"] = subject
        encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode()
        return {"raw": encoded_message}

    def get_all_tools(self):
        """Return the tool schemas (name, description, parameters) for the LLM.

        Returns:
            A list with two dicts: ``query_gmail_tool`` and
            ``send_email_tool``.
        """
        query_tool = {
            "name": "query_gmail_tool",
            "description": "query the gmails with specified queries",
            "parameters": {
                "type": "object",
                "properties": {
                    "queries": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Queries that can be used to filter the gmails. (e.g., ['before:2025/06/01', 'after:2025/05/01', 'from:someone@example.com'])",
                    },
                },
                "required": ["queries"],
            },
        }
        send_tool = {
            "name": "send_email_tool",
            "description": "Send an email to a specified recipient with a specified subject and message",
            "parameters": {
                "type": "object",
                "properties": {
                    "to": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "An array of email addresses of all the recipients",
                    },
                    "subject": {
                        "type": "string",
                        "description": "The subject of the email",
                    },
                    "message_content": {
                        "type": "string",
                        "description": "The content of the email",
                    },
                },
                "required": ["to", "subject", "message_content"],
            },
        }
        return [query_tool, send_tool]

    def _parse_gmail_response(self, parts):
        """Return the first non-empty ``text/plain`` body found in ``parts``.

        The search is depth-first: a part's nested ``parts`` are examined
        before the part itself.

        Args:
            parts: Iterable of message-part dicts (may be ``None`` or empty).

        Returns:
            The decoded text, or ``None`` when no plain-text body is found.
        """
        for part in parts or []:
            if "parts" in part:
                content = self._parse_gmail_response(part["parts"])
                if content:
                    return content

            body = part.get('body') or {}
            data = body.get('data')
            # Skip parts with no inline data instead of raising KeyError.
            if part.get('mimeType') == 'text/plain' and body.get('size') != 0 and data:
                # Restore any stripped base64 padding before decoding.
                padded = data + "=" * (-len(data) % 4)
                # Undecodable bytes are replaced rather than aborting the
                # whole listing on one badly encoded message.
                return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')
        return None
|
|