Spaces:
BG5
/
Running

BG5 commited on
Commit
a9a7f50
·
verified ·
1 Parent(s): afac4df

Delete 收发邮件.py

Browse files
Files changed (1) hide show
  1. 收发邮件.py +0 -316
收发邮件.py DELETED
@@ -1,316 +0,0 @@
1
- import base64
2
- import email
3
- import imaplib
4
- import smtplib
5
- import requests
6
- from email.header import decode_header, Header
7
- from email.mime.text import MIMEText
8
- from email.mime.multipart import MIMEMultipart
9
- from typing import List, Dict
10
- from unibo_auth2_get_AuthCode_RefreshToken_sucsses import OAuth2Authenticator
11
-
12
-
13
- class EmailClient:
14
- def __init__(self, username: str, password: str, email_address: str):
15
- """
16
- Initialize the EmailClient with user credentials.
17
-
18
- Args:
19
- username: The username for authentication
20
- password: The password for authentication
21
- email_address: The full email address
22
- """
23
- self.username = username
24
- self.password = password
25
- self.email_address = email_address
26
- self.client_id = "9e5f94bc-e8a4-4e73-b8be-63364c29d753"
27
- self.imap_server = "outlook.office365.com"
28
- self.imap_port = 993
29
- # self.smtp_server = "submission.unipi.it" # 使用密码登录
30
- self.smtp_server_XOAUTH2 = "smtp.office365.com" # 使用OAuth2登录
31
- self.smtp_port = 587
32
- self.refresh_token = None
33
- self.access_token = None
34
-
35
- def authenticate_oauth(self) -> None:
36
- """Authenticate using OAuth2 and get refresh/access tokens."""
37
- authenticator = OAuth2Authenticator(
38
- username=self.username,
39
- password=self.password
40
- )
41
-
42
- try:
43
- auth_code = authenticator.execute_flow()
44
- print(f"成功获取授权码: {auth_code}")
45
- self.refresh_token, self.access_token = authenticator.get_refresh_token(auth_code)
46
- except Exception as e:
47
- raise Exception(f"认证失败: {e}")
48
-
49
- def _generate_auth_string(self) -> bytes:
50
- """Generate XOAUTH2 authentication string."""
51
- auth_string = f"user={self.email_address}\x01auth=Bearer {self.access_token}\x01\x01"
52
- return auth_string.encode('utf-8')
53
-
54
- def connect_imap(self) -> imaplib.IMAP4:
55
- """
56
- Connect to IMAP server using XOAUTH2 authentication.
57
-
58
- Returns:
59
- IMAP4 connection object
60
-
61
- Raises:
62
- Exception: If authentication fails
63
- """
64
- try:
65
- imap = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
66
-
67
- if 'AUTH=XOAUTH2' not in imap.capabilities:
68
- raise Exception("Server does not support XOAUTH2 authentication")
69
-
70
- auth_bytes = self._generate_auth_string()
71
-
72
- def oauth2_callback(response):
73
- return auth_bytes
74
-
75
- print(f"Attempting XOAUTH2 authentication as {self.email_address}...")
76
- typ, data = imap.authenticate('XOAUTH2', oauth2_callback)
77
- if typ != 'OK':
78
- raise Exception(f"Authentication failed: {data[0].decode('utf-8', errors='replace')}")
79
-
80
- print("XOAUTH2 authentication successful")
81
- return imap
82
- except Exception as e:
83
- raise Exception(f"IMAP XOAUTH2 authentication failed: {str(e)}")
84
-
85
- def _get_decoded_header(self, header: str) -> str:
86
- """解码邮件头信息
87
-
88
- Args:
89
- header: 需要解码的邮件头字符串
90
-
91
- Returns:
92
- 解码后的字符串
93
- """
94
- try:
95
- decoded = decode_header(header)
96
-
97
- # 修复括号匹配问题
98
- return "".join(
99
- str(
100
- t[0].decode(t[1] or "utf-8", errors="replace")
101
- if isinstance(t[0], bytes)
102
- else t[0]
103
- )
104
- for t in decoded
105
- )
106
- except Exception as e:
107
- print(f"邮件头解码失败: {str(e)}")
108
- return header
109
-
110
- def _parse_email_message(self, msg: email.message.Message) -> Dict:
111
- """Parse an email message into a dictionary."""
112
- email_data = {}
113
-
114
- # Parse headers
115
- email_data["from"] = self._get_decoded_header(msg["from"]) if msg["from"] else "N/A"
116
- email_data["subject"] = self._get_decoded_header(msg["subject"]) if msg["subject"] else "N/A"
117
- email_data["date"] = msg["date"] if msg["date"] else "N/A"
118
- email_data["to"] = self._get_decoded_header(msg.get("to", ""))
119
-
120
- # Parse body content
121
- def get_body_content(part):
122
- charset = part.get_content_charset() or "utf-8"
123
- try:
124
- return part.get_payload(decode=True).decode(charset, errors="replace")
125
- except Exception as e:
126
- print(f"正文解码失败: {str(e)}")
127
- return ""
128
-
129
- if msg.is_multipart():
130
- for part in msg.walk():
131
- content_type = part.get_content_type()
132
- content_disposition = str(part.get("Content-Disposition"))
133
- if "attachment" in content_disposition:
134
- continue
135
- if content_type == "text/plain":
136
- email_data["text"] = get_body_content(part)
137
- elif content_type == "text/html":
138
- email_data["html"] = get_body_content(part)
139
- else:
140
- content_type = msg.get_content_type()
141
- if content_type == "text/plain":
142
- email_data["text"] = get_body_content(msg)
143
- elif content_type == "text/html":
144
- email_data["html"] = get_body_content(msg)
145
-
146
- return email_data
147
-
148
- def read_emails(self, mailbox: str = "INBOX", limit: int = 1, keyword: str = 'ALL') -> List[Dict]:
149
- """
150
- Read emails from the specified mailbox.
151
-
152
- Args:
153
- mailbox: Mailbox to read from (default: INBOX)
154
- limit: Maximum number of emails to return (default: 1)
155
- keyword: Search keyword (default: 'ALL')
156
-
157
- Returns:
158
- List of parsed email dictionaries
159
- """
160
- imap = self.connect_imap()
161
- try:
162
- status, _ = imap.select(mailbox, readonly=True)
163
- if status != 'OK':
164
- raise Exception(f"无法选择邮箱 {mailbox}")
165
-
166
- _, messages = imap.search(None, keyword)
167
- email_list = []
168
- message_ids = messages[0].split()
169
-
170
- if not message_ids:
171
- print(f"邮箱 {mailbox} 中没有邮件")
172
- return []
173
-
174
- for num in reversed(message_ids[:limit]):
175
- _, msg_data = imap.fetch(num, "(RFC822)")
176
- email_message = email.message_from_bytes(msg_data[0][1])
177
- email_data = self._parse_email_message(email_message)
178
- email_data["uid"] = num.decode()
179
- email_list.append(email_data)
180
-
181
- print(f"成功读取 {len(email_list)} 封邮件")
182
- return email_list
183
- finally:
184
- imap.logout()
185
-
186
- def send_email(self, receiver: str, subject: str, body: str = None, is_html: bool = True) -> None:
187
- """
188
- Send an email using SMTP.
189
-
190
- Args:
191
- receiver: Recipient email address
192
- subject: Email subject
193
- body: Email body content
194
- is_html: Whether the body is HTML (default: True)
195
-
196
- Raises:
197
- Exception: If email sending fails
198
- """
199
- if body is None:
200
- body = """
201
- <h1>Python SMTP 测试邮件</h1>
202
- <p>这是一封通过Python SMTP发送的测试邮件。</p>
203
- """
204
- is_html = True
205
-
206
- message = MIMEMultipart()
207
- # message['From'] = self.email_address
208
- message['To'] = receiver
209
- message['Subject'] = subject
210
-
211
- message.attach(MIMEText(body, 'html' if is_html else 'plain', 'utf-8'))
212
-
213
- try:
214
- if self.smtp_port == 465:
215
- server = smtplib.SMTP_SSL(self.smtp_server_XOAUTH2, self.smtp_port)
216
- else:
217
- server = smtplib.SMTP(self.smtp_server_XOAUTH2, self.smtp_port)
218
- server.starttls()
219
-
220
- # 建立 SMTP 连接
221
- server = smtplib.SMTP(self.smtp_server_XOAUTH2, self.smtp_port)
222
- # server.set_debuglevel(1) # 启用调试输出
223
- server.ehlo() # 识别客户端
224
- server.starttls() # 启用TLS加密
225
- server.ehlo() # 再次识别客户端
226
- # 准备OAuth2认证字符串
227
- auth_string = f"user={self.email_address}\1auth=Bearer {self.access_token}\1\1"
228
- auth_string = base64.b64encode(auth_string.encode('utf-8')).decode('utf-8')
229
-
230
- # 执行OAuth2认证
231
- code, response = server.docmd('AUTH', 'XOAUTH2 ' + auth_string)
232
- server.sendmail(self.email_address, [receiver], message.as_string())
233
- print("邮件发送成功")
234
- except Exception as e:
235
- raise Exception(f"邮件发送失败: {e}")
236
- finally:
237
- server.quit()
238
-
239
- def refresh_access_token(self) -> str:
240
- """
241
- Refresh the access token using the refresh token.
242
-
243
- Returns:
244
- New access token
245
-
246
- Raises:
247
- Exception: If token refresh fails
248
- """
249
- if not self.refresh_token:
250
- raise Exception("No refresh token available")
251
-
252
- url = "https://login.microsoftonline.com/common/oauth2/v2.0/token"
253
- headers = {
254
- "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:137.0) Gecko/20100101 Thunderbird/137.0',
255
- "Content-Type": 'application/x-www-form-urlencoded;charset=UTF-8',
256
- }
257
- data = {
258
- "client_id": self.client_id,
259
- "grant_type": "refresh_token",
260
- "refresh_token": self.refresh_token,
261
- "scope": "https://outlook.office.com/IMAP.AccessAsUser.All offline_access",
262
- }
263
-
264
- response = requests.post(url, headers=headers, data=data, verify=False)
265
- if response.status_code != 200:
266
- error_text = response.text
267
- if "invalid_grant" in error_text and "70000" in error_text:
268
- print("\n[安全警告] 账户需要安全验证!")
269
- print("微软检测到您的账户可能存在安全风险,需要进行额外验证。")
270
- print("请登录 https://account.microsoft.com 或 https://account.live.com 进行安全验证")
271
- print("验证完成后,您需要重新获取refreshtoken并更新Excel文件\n")
272
- raise Exception(f"获取access token失败: {error_text}")
273
-
274
- response_data = response.json()
275
- self.access_token = response_data["access_token"]
276
- if "refresh_token" in response_data:
277
- self.refresh_token = response_data["refresh_token"]
278
-
279
- return self.access_token
280
-
281
-
282
- if __name__ == "__main__":
283
- try:
284
- # Example usage
285
- client = EmailClient(
286
- username="[email protected]",
287
- password="123",
288
- email_address="[email protected]"
289
- )
290
-
291
- # Authenticate and get tokens
292
- client.authenticate_oauth()
293
- #
294
- # Read emails with GitHub keyword
295
- emails = client.read_emails(mailbox="INBOX",
296
- limit=5,keyword='BODY "jetbrain"')
297
-
298
- # Print email details
299
- for i, email_data in enumerate(emails, 1):
300
- print(f"\n邮件 {i}:")
301
- print(f"发件人: {email_data.get('from', 'N/A')}")
302
- print(f"主题: {email_data.get('subject', 'N/A')}")
303
- print(f"日期: {email_data.get('date', 'N/A')}")
304
- print(f"文本内容: {email_data.get('text', 'N/A')[:1000]}...")
305
- print("-" * 50)
306
-
307
- # Send a test email
308
- # client.send_email(
309
- # receiver="[email protected]",
310
- # subject="Test Email",
311
- # body="This is a test email"
312
- # )
313
-
314
- except Exception as e:
315
- print(f"Error: {str(e)}")
316
-