BG5 committed on
Commit
9c8df27
·
verified ·
1 Parent(s): f9cbb48

Create r2_sync.py

Browse files
Files changed (1) hide show
  1. r2_sync.py +531 -0
r2_sync.py ADDED
@@ -0,0 +1,531 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ import os
3
+ import sys
4
+ import time
5
+ import requests
6
+ import json
7
+ import hashlib
8
+ import logging
9
+ import threading
10
+ from pathlib import Path
11
+ from concurrent.futures import ThreadPoolExecutor
12
+ from watchdog.observers import Observer
13
+ from watchdog.events import FileSystemEventHandler
14
+ from urllib.parse import unquote
15
+
16
# Configure application-wide logging: INFO level, timestamped single-line format.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)
26
+
27
class Config:
    """Runtime configuration loaded entirely from environment variables."""

    def __init__(self):
        env = os.getenv
        self.data_dir = Path(env("DATA_DIR", "/app"))
        self.api_key = env("API_AUTH_KEY")
        self.api_endpoint = env("API_ENDPOINT")
        self.sync_interval = int(env("SYNC_INTERVAL", "300"))  # seconds; default 5 minutes
        self.use_hash_check = env("USE_HASH_CHECK", "false").lower() == "true"
        self.max_upload_workers = int(env("MAX_UPLOAD_WORKERS", "500"))
        self.upload_retry_delay = int(env("UPLOAD_RETRY_DELAY", "60"))  # seconds

        # Suffix blacklist: files ending with any of these are skipped
        # (comma-separated, e.g. ".tmp,.log").
        self.ignore_suffixes = self._parse_suffixes(
            env("IGNORE_SUFFIXES", ".db-journal,.tmp,.log,sqlite3-journal"))

        # Suffix whitelist: when non-empty it takes precedence over the blacklist
        # (comma-separated, e.g. ".jpg,.png").
        self.only_suffixes = self._parse_suffixes(env("ONLY_SUFFIXES", ".db"))

        # Both API settings are mandatory.
        if not (self.api_key and self.api_endpoint):
            raise ValueError("必须设置API_AUTH_KEY和API_ENDPOINT环境变量")

        # Warn when both rule sets are configured; whitelist wins at runtime.
        if self.ignore_suffixes and self.only_suffixes:
            logger.warning("同时设置了IGNORE_SUFFIXES和ONLY_SUFFIXES,将优先使用ONLY_SUFFIXES")

    @staticmethod
    def _parse_suffixes(raw):
        """Split a comma-separated suffix list into lower-cased entries, dropping blanks."""
        return [part.strip().lower() for part in raw.split(",") if part.strip()]
53
+
54
class SyncHandler(FileSystemEventHandler):
    """Routes watchdog file events to a sync callback, filtered by suffix rules."""

    def __init__(self, callback, config):
        self.callback = callback
        self.config = config

    def should_sync(self, file_path):
        """Return True when *file_path* matches the configured suffix rules."""
        lowered = str(file_path).lower()

        # Whitelist mode wins when configured.
        if self.config.only_suffixes:
            return lowered.endswith(tuple(self.config.only_suffixes))

        # Otherwise fall back to the blacklist.
        if self.config.ignore_suffixes:
            return not lowered.endswith(tuple(self.config.ignore_suffixes))

        # No rules configured: everything syncs.
        return True

    def _maybe_dispatch(self, event):
        # Directory events are ignored; only matching files trigger the callback.
        if not event.is_directory and self.should_sync(event.src_path):
            self.callback(event.src_path)

    def on_modified(self, event):
        self._maybe_dispatch(event)

    def on_created(self, event):
        self._maybe_dispatch(event)
82
+
83
class R2Sync:
    """R2 object-storage synchroniser.

    Mirrors a local data directory against a remote bucket through a small
    HTTP API (list/download/upload/delete). Uploads are debounced with
    per-file ``threading.Timer`` objects and executed on a thread pool;
    the remote listing is cached and invalidated after mutations.
    """
    def __init__(self, config):
        self.config = config
        self.file_states = {}  # last known state per key: (size, mtime) or (size, mtime, hash)
        self.upload_queue = {}  # debounced uploads: {file_path: (key, mtime, timer)}
        self.remote_files_cache = None  # cached remote listing (list of dicts)
        self.cache_valid = False  # True while remote_files_cache is current
        self.executor = ThreadPoolExecutor(max_workers=self.config.max_upload_workers)
        self.cache_lock = threading.Lock()
        self.upload_queue_lock = threading.Lock()

        # Make sure the local data directory exists
        self.config.data_dir.mkdir(parents=True, exist_ok=True)

    def should_sync_file(self, file_path):
        """Return True when *file_path* matches the configured suffix rules."""
        file_path = str(file_path).lower()

        # Whitelist mode takes precedence
        if self.config.only_suffixes:
            return any(file_path.endswith(suffix) for suffix in self.config.only_suffixes)

        # Then the blacklist
        if self.config.ignore_suffixes:
            return not any(file_path.endswith(suffix) for suffix in self.config.ignore_suffixes)

        # No rules configured: sync everything
        return True

    def r2_api_request(self, method, path, data=None, max_retries=3):
        """Send one API request with linear-backoff retries.

        Returns raw bytes for GET, True for DELETE, parsed JSON for other
        verbs, or None after *max_retries* consecutive failures.
        """
        url = f"{self.config.api_endpoint}/{path}"
        headers = {
            'X-API-Key': self.config.api_key,
            'Content-Type': 'application/octet-stream'
        }

        for attempt in range(max_retries):
            try:
                # NOTE(review): an unrecognised *method* would leave `resp`
                # unbound and raise NameError below; callers only pass the
                # four verbs handled here.
                if method == "GET":
                    resp = requests.get(url, headers=headers)
                elif method == "POST":
                    resp = requests.post(url, data=data, headers=headers)
                elif method == "PUT":
                    resp = requests.put(url, data=data, headers=headers)
                elif method == "DELETE":
                    resp = requests.delete(url, headers=headers)

                resp.raise_for_status()

                if method == "GET":
                    return resp.content
                elif method == "DELETE":
                    return True
                return resp.json()

            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    logger.error(f"API请求最终失败: {e}")
                    return None
                wait_time = (attempt + 1) * 2  # linear backoff: 2s, 4s, ...
                logger.warning(f"API请求失败(尝试 {attempt + 1}/{max_retries}), {wait_time}秒后重试...")
                time.sleep(wait_time)

    def get_file_hash(self, file_path):
        """Compute the blake2b hex digest of a file, streamed in 8 KiB chunks."""
        hash_obj = hashlib.blake2b()
        with open(file_path, 'rb') as f:
            while chunk := f.read(8192):
                hash_obj.update(chunk)
        return hash_obj.hexdigest()


    def get_remote_files(self, force_refresh=False):
        """Return a copy of the remote listing, refreshing the cache as needed."""
        with self.cache_lock:
            if force_refresh or not self.cache_valid or self.remote_files_cache is None:
                self.remote_files_cache = self._fetch_remote_files()
                self.cache_valid = True
            # NOTE(review): _fetch_remote_files may return None on request
            # failure, in which case .copy() raises AttributeError here even
            # though callers test for a None return — confirm intended handling.
            return self.remote_files_cache.copy()

    def _fetch_remote_files(self):
        """Fetch the remote listing and normalise it into a list of dicts."""
        resp = self.r2_api_request("GET", "list")
        if not resp:
            return None

        try:
            raw_resp = resp.decode('utf-8').strip()
            if raw_resp == "[]":
                return []

            data = json.loads(raw_resp)

            # Normalise every response shape to a list
            if isinstance(data, dict):
                if 'objects' in data:
                    data = data['objects']
                else:
                    data = [data]
            elif not isinstance(data, list):
                data = [data]

            # Percent-decode object keys (best effort; key kept as-is on failure)
            for item in data:
                if isinstance(item, dict) and 'key' in item:
                    try:
                        item['key'] = unquote(item['key'])
                    except:  # noqa: E722 — deliberate best-effort decode
                        pass

            return data
        except Exception as e:
            logger.error(f"解析远程文件列表出错: {e}")
            return []

    def invalidate_cache(self):
        """Mark the cached remote listing as stale."""
        with self.cache_lock:
            self.cache_valid = False

    def download_file(self, key, dest_path):
        """Download remote object *key* to *dest_path*; return True on success."""
        if not self.should_sync_file(dest_path):
            logger.debug(f"忽略下载文件(不匹配同步规则): {key}")
            return False

        path = f"download/{key}"
        logger.debug(f"下载路径: {dest_path} (原始key: {key})")

        resp = self.r2_api_request("GET", path)
        if not resp:
            return False

        # Make sure the destination directory exists
        dest_path.parent.mkdir(parents=True, exist_ok=True)

        # GET returns bytes, so the decode/encode round-trip below is
        # effectively a defensive no-op for non-bytes payloads.
        if isinstance(resp, bytes):
            content = resp
        else:
            try:
                content = resp.decode('utf-8').encode('utf-8')
            except UnicodeDecodeError:
                content = resp

        with open(dest_path, 'wb') as f:
            f.write(content)

        logger.info(f"文件已保存到: {dest_path} (大小: {len(content)}字节)")
        return True

    def _upload_file_task(self, file_path, key):
        """Thread-pool task: read the file and POST it to the upload endpoint."""
        if not self.should_sync_file(file_path):
            logger.debug(f"忽略上传文件(不匹配同步规则): {key}")
            return False

        path = f"upload/{key}"
        try:
            with open(file_path, 'rb') as f:
                result = self.r2_api_request("POST", path, f.read()) is not None
            if result:
                self.invalidate_cache()  # upload succeeded — remote listing is stale
            return result
        except Exception as e:
            logger.error(f"上传文件出错: {e}")
            return False

    def upload_file(self, file_path, key):
        """Upload a file via the thread pool, blocking until it finishes."""
        # Submit to the executor, then wait — synchronous from the caller's view.
        future = self.executor.submit(self._upload_file_task, file_path, key)
        return future.result()  # block for the result

    def delete_file(self, key):
        """Delete remote object *key* and verify it disappeared from the listing."""
        if not self.should_sync_file(key):
            logger.debug(f"忽略删除文件(不匹配同步规则): {key}")
            return False

        logger.info(f"删除远程文件: {key}")
        path = f"delete/{key}"
        if not self.r2_api_request("DELETE", path):
            return False

        # The listing is stale now
        self.invalidate_cache()

        # Verify the object is really gone
        remote_files = self.get_remote_files(force_refresh=True)
        if remote_files is None:
            return False

        return not any(file_info['key'] == key for file_info in remote_files)

    def check_api_health(self):
        """Smoke-test the API by fetching the remote listing."""
        logger.info("检查API健康状态...")

        # Exercise the list endpoint
        files = self.get_remote_files()
        if files is None:
            logger.error("列表API测试失败")
            return False

        logger.info("API健康状态检查通过")
        return True

    def _download_and_update_state(self, file_info):
        """Thread task: download one remote file and record its local state."""
        key = file_info['key']
        dest_path = self.config.data_dir / key

        if not self.should_sync_file(dest_path):
            logger.debug(f"忽略下载文件(不匹配同步规则): {key}")
            return

        logger.info(f"下载: {key} -> {dest_path}")
        if self.download_file(key, dest_path):
            # Record the file's initial (size, mtime[, hash]) state
            stat = dest_path.stat()
            file_state = (stat.st_size, stat.st_mtime)
            if self.config.use_hash_check:
                file_state += (self.get_file_hash(dest_path),)
            with self.cache_lock:  # guard concurrent file_states updates
                self.file_states[key] = file_state

    def init_sync(self):
        """One-shot initial sync: download every remote file in parallel.

        Exits the process when the API is unreachable or listing fails.
        """
        logger.info("初始化数据目录...")

        if not self.check_api_health():
            logger.error("API检查失败,请检查配置和网络连接")
            sys.exit(1)

        # Fetch the remote listing
        remote_files = self.get_remote_files()
        if remote_files is None:
            logger.error("获取远程文件列表失败")
            sys.exit(1)

        logger.info(f"找到 {len(remote_files)} 个远程文件")
        if not remote_files:
            logger.info("远程存储桶为空,无需同步")
            return

        logger.info("开始并行同步远程文件...")
        # Fan the downloads out across the thread pool
        futures = []
        for file_info in remote_files:
            futures.append(self.executor.submit(self._download_and_update_state, file_info))

        # Wait for every download to finish
        for future in futures:
            try:
                future.result()  # surfaces any exception raised in the task
            except Exception as e:
                logger.error(f"文件下载任务出错: {e}")

    def handle_file_change(self, file_path):
        """Queue a changed file for upload, debounced by a per-file timer."""
        try:
            if not self.should_sync_file(file_path):
                logger.debug(f"忽略文件变化(不匹配同步规则): {file_path}")
                return

            rel_path = Path(file_path).relative_to(self.config.data_dir)
            key = str(rel_path).replace('\\', '/')

            # Snapshot the file's current modification time
            stat = os.stat(file_path)
            current_mtime = stat.st_mtime

            with self.upload_queue_lock:
                # Already queued: cancel the old timer so the debounce restarts
                if file_path in self.upload_queue:
                    _, _, timer = self.upload_queue[file_path]
                    timer.cancel()

                # Arm a fresh timer; the entry is processed after
                # upload_retry_delay seconds of quiet
                timer = threading.Timer(
                    self.config.upload_retry_delay,
                    self._process_upload_queue,
                    args=(file_path,)
                )
                timer.start()

                # Insert or refresh the queue entry
                self.upload_queue[file_path] = (key, current_mtime, timer)
                logger.info(f"检测到文件变化,加入上传队列: {key} (将在{self.config.upload_retry_delay}秒后检查)")

        except Exception as e:
            logger.error(f"处理文件变化出错: {e}")
    def is_file_modified(self, file_path, last_known_state):
        """Decide whether the file differs from *last_known_state*.

        Compares size first; when sizes match and hash checking is enabled,
        falls back to a blake2b comparison, seeding the stored hash on first use.
        """
        try:
            stat = os.stat(file_path)
            current_size = stat.st_size

            # Size comparison first — cheap and usually decisive
            if last_known_state[0] == current_size:
                logger.info(f"文件大小没有变化: {file_path} (大小: {current_size})")
                if not self.config.use_hash_check:
                    logger.info(f"不使用hash检查: {file_path} 视为未修改")
                    return False  # size-only mode: equal size means unchanged
                logger.info(f"文件大小相同,开始检查hash: {file_path}")
                current_hash = self.get_file_hash(file_path)
                if len(last_known_state) <= 2:  # no hash stored yet
                    # Derive the key used to index file_states
                    rel_path = str(Path(file_path).relative_to(self.config.data_dir)).replace('\\', '/')
                    # Store (size, mtime, hash) so future checks can compare hashes
                    self.file_states[rel_path] = (current_size, stat.st_mtime, current_hash)
                    return True  # treat as modified once to seed the stored hash
                return current_hash != last_known_state[2]  # hash comparison
            return True  # size differs — definitely modified
        except Exception as e:
            logger.error(f"检查文件修改状态出错: {e}")
            return False
    def _process_upload_queue(self, file_path):
        """Timer callback: upload the queued file once it has settled."""
        with self.upload_queue_lock:
            if file_path not in self.upload_queue:
                return

            key, original_mtime, _ = self.upload_queue[file_path]
            del self.upload_queue[file_path]  # dequeue

        try:
            # The file may have been deleted while waiting
            if not os.path.exists(file_path):
                logger.info(f"文件已被删除,取消上传: {key}")
                return

            # Snapshot the current state
            current_stat = os.stat(file_path)
            current_mtime = current_stat.st_mtime

            # Modified again during the debounce window? Re-queue and bail.
            if current_mtime != original_mtime:
                logger.info(f"文件 {key} 在等待期间又有新变化,重新加入队列")
                self.handle_file_change(file_path)  # re-enter the queue
                return

            if not self.is_file_modified(file_path, self.file_states.get(key, (0, 0))):
                logger.info(f"文件 {key} 没有新变化,跳过上传")
                return
            # File has settled and genuinely differs — upload it
            logger.info(f"开始上传文件: {key}")
            if self.upload_file(file_path, key):
                # Record the freshly-uploaded state
                file_state = (current_stat.st_size, current_mtime)
                if self.config.use_hash_check:
                    file_state += (self.get_file_hash(file_path),)
                self.file_states[key] = file_state
            else:
                logger.error(f"上传失败: {key}")

        except Exception as e:
            logger.error(f"处理上传队列出错: {e}")

    def sync_deleted_files(self):
        """Delete remote objects that no longer exist locally."""
        try:
            remote_files = self.get_remote_files()
            if remote_files is None:
                return

            local_files = {
                str(f.relative_to(self.config.data_dir)).replace('\\', '/')
                for f in self.config.data_dir.rglob('*')
                if f.is_file() and self.should_sync_file(f)
            }

            for file_info in remote_files:
                key = file_info['key']
                # NOTE(review): upload_queue is keyed by absolute file path,
                # so `key not in self.upload_queue` is unlikely to ever be
                # False — verify the intended guard.
                if key not in local_files and key not in self.upload_queue:
                    logger.info(f"删除远程文件: {key}")
                    if self.delete_file(key):
                        if key in self.file_states:
                            del self.file_states[key]
                    else:
                        logger.error(f"删除失败: {key}")
        except Exception as e:
            logger.error(f"同步删除操作出错: {e}")

    def watch_and_sync(self):
        """Run the continuous sync loop until interrupted.

        Seeds file_states from disk, starts a recursive watchdog observer,
        then periodically mirrors local deletions to the remote bucket.
        """
        logger.info("启动持续同步服务...")

        # Seed file_states for everything currently on disk (per sync rules)
        for file in self.config.data_dir.rglob('*'):
            if file.is_file() and self.should_sync_file(file):
                rel_path = str(file.relative_to(self.config.data_dir)).replace('\\', '/')
                stat = file.stat()
                file_state = (stat.st_size, stat.st_mtime)
                if self.config.use_hash_check:
                    file_state += (self.get_file_hash(file),)
                self.file_states[rel_path] = file_state

        # Watch the data directory recursively (per sync rules)
        event_handler = SyncHandler(self.handle_file_change, self.config)
        observer = Observer()
        observer.schedule(event_handler, path=str(self.config.data_dir), recursive=True)
        observer.start()

        try:
            while True:
                # Periodically propagate local deletions to the remote bucket
                self.sync_deleted_files()
                time.sleep(self.config.sync_interval)
        except KeyboardInterrupt:
            logger.info("收到停止信号,关闭监控...")
            # Cancel every pending debounce timer before shutdown
            with self.upload_queue_lock:
                for file_path, (_, _, timer) in self.upload_queue.items():
                    timer.cancel()
                self.upload_queue.clear()
            observer.stop()
            observer.join()
            self.executor.shutdown()
506
+
507
def main():
    """CLI entry point: ``init`` runs a one-shot sync, ``sync`` watches continuously."""
    args = sys.argv
    if len(args) < 2:
        logger.info("Usage: python r2_sync.py [init|sync]")
        sys.exit(1)

    try:
        config = Config()
    except ValueError as err:
        logger.error(str(err))
        sys.exit(1)

    syncer = R2Sync(config)
    command = args[1]

    if command == "init":
        syncer.init_sync()
        return
    if command == "sync":
        logger.info(f"启动同步服务,间隔: {config.sync_interval}秒")
        syncer.watch_and_sync()
        return
    logger.error(f"未知命令: {command}")
    sys.exit(1)

if __name__ == "__main__":
    main()