Spaces:
BG5
/
Running

BG5 committed on
Commit
e02b07d
·
verified ·
1 Parent(s): 0fd60ee

Delete r2_sync.py

Browse files
Files changed (1) hide show
  1. r2_sync.py +0 -531
r2_sync.py DELETED
@@ -1,531 +0,0 @@
1
- #!/usr/bin/env python3
2
- import os
3
- import sys
4
- import time
5
- import requests
6
- import json
7
- import hashlib
8
- import logging
9
- import threading
10
- from pathlib import Path
11
- from concurrent.futures import ThreadPoolExecutor
12
- from watchdog.observers import Observer
13
- from watchdog.events import FileSystemEventHandler
14
- from urllib.parse import unquote
15
-
16
# Configure logging: INFO level with timestamped records. No explicit handlers
# are passed, so basicConfig() installs its default stream handler (stderr).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
# Module-level logger shared by every class and function in this file.
logger = logging.getLogger(__name__)
26
-
27
class Config:
    """Runtime configuration read from environment variables.

    Attributes:
        data_dir: Local directory to synchronise (``DATA_DIR``, default ``/app``).
        api_key: Value sent in the ``X-API-Key`` header (``API_AUTH_KEY``, required).
        api_endpoint: Base URL of the storage API (``API_ENDPOINT``, required).
        sync_interval: Seconds between remote-deletion passes
            (``SYNC_INTERVAL``, default 300).
        use_hash_check: Whether content hashes are used to detect changes
            (``USE_HASH_CHECK``, default ``true``).
        max_upload_workers: Thread-pool size (``MAX_UPLOAD_WORKERS``, default 500).
        upload_retry_delay: Seconds to wait after a change before uploading
            (``UPLOAD_RETRY_DELAY``, default 60).
        ignore_suffixes: Lower-cased suffixes to skip (``IGNORE_SUFFIXES``).
        only_suffixes: Lower-cased suffixes to sync exclusively
            (``ONLY_SUFFIXES``); takes priority over ``ignore_suffixes``.

    Raises:
        ValueError: If a required variable is missing, or an integer variable
            is not an integer or is below its allowed minimum.
    """

    def __init__(self):
        self.data_dir = Path(os.getenv("DATA_DIR", "/app"))
        self.api_key = os.getenv("API_AUTH_KEY")
        self.api_endpoint = os.getenv("API_ENDPOINT")
        self.sync_interval = self._env_int("SYNC_INTERVAL", 300, minimum=0)  # default: 5 minutes
        self.use_hash_check = os.getenv("USE_HASH_CHECK", "true").lower() == "true"
        # A thread pool needs at least one worker; reject 0/negative here so the
        # failure names the variable instead of surfacing later in the executor.
        self.max_upload_workers = self._env_int("MAX_UPLOAD_WORKERS", 500, minimum=1)
        self.upload_retry_delay = self._env_int("UPLOAD_RETRY_DELAY", 60, minimum=0)  # seconds

        # Suffixes to ignore (comma separated, e.g. ".tmp,.log").
        self.ignore_suffixes = self._env_suffixes(
            "IGNORE_SUFFIXES", ".db-journal,.tmp,.log,sqlite3-journal"
        )
        # Suffixes to sync exclusively (comma separated, e.g. ".jpg,.png").
        self.only_suffixes = self._env_suffixes("ONLY_SUFFIXES", ".db")

        # Both credentials are mandatory.
        if not all([self.api_key, self.api_endpoint]):
            raise ValueError("必须设置API_AUTH_KEY和API_ENDPOINT环境变量")

        # Warn when both filters are active: only the whitelist is honoured.
        if self.ignore_suffixes and self.only_suffixes:
            logger.warning("同时设置了IGNORE_SUFFIXES和ONLY_SUFFIXES,将优先使用ONLY_SUFFIXES")

    @staticmethod
    def _env_int(name, default, minimum=None):
        """Return env var *name* as an int, raising a ValueError that names it."""
        raw = os.getenv(name, str(default))
        try:
            value = int(raw)
        except ValueError:
            raise ValueError(f"环境变量{name}必须是整数: {raw!r}") from None
        if minimum is not None and value < minimum:
            raise ValueError(f"环境变量{name}不能小于{minimum}: {value}")
        return value

    @staticmethod
    def _env_suffixes(name, default):
        """Return env var *name* split on commas, stripped, lower-cased, no blanks."""
        raw = os.getenv(name, default)
        return [part.strip().lower() for part in raw.split(",") if part.strip()]
53
-
54
class SyncHandler(FileSystemEventHandler):
    """Watchdog event handler that forwards matching file events to a callback.

    ``callback`` is called with the event's ``src_path`` for every created or
    modified file (directories are skipped) whose path passes ``should_sync``.
    ``config`` supplies the ``only_suffixes`` and ``ignore_suffixes`` lists.
    """

    def __init__(self, callback, config):
        self.callback = callback
        self.config = config

    def should_sync(self, file_path):
        """Return True when *file_path* passes the configured suffix filters."""
        lowered = str(file_path).lower()

        # Whitelist mode wins: when set, it alone decides.
        whitelist = self.config.only_suffixes
        if whitelist:
            return lowered.endswith(tuple(whitelist))

        # Otherwise fall back to the blacklist.
        blacklist = self.config.ignore_suffixes
        if blacklist:
            return not lowered.endswith(tuple(blacklist))

        # No filters configured: sync everything.
        return True

    def _forward(self, event):
        """Pass a non-directory event for a syncable path on to the callback."""
        if event.is_directory:
            return
        if self.should_sync(event.src_path):
            self.callback(event.src_path)

    def on_modified(self, event):
        self._forward(event)

    def on_created(self, event):
        self._forward(event)
82
-
83
class R2Sync:
    """Synchronise a local data directory with R2 storage through an HTTP API.

    The object keeps three pieces of state:

    * ``file_states``: ``{key: (size, mtime[, hash])}`` for the last known
      state of each synced file (the hash is present only when
      ``config.use_hash_check`` is enabled).
    * ``upload_queue``: ``{file_path: (key, mtime, timer)}`` for files waiting
      for their debounce timer to fire, keyed by the path reported by the
      file watcher.
    * ``remote_files_cache``: the most recent successful remote listing.
    """

    # HTTP verbs that r2_api_request() knows how to send.
    _SUPPORTED_METHODS = ("GET", "POST", "PUT", "DELETE")

    def __init__(self, config):
        self.config = config
        self.file_states = {}           # last known state per key: (size, mtime, hash?)
        self.upload_queue = {}          # pending uploads {file_path: (key, mtime, timer)}
        self.remote_files_cache = None  # cached remote listing
        self.cache_valid = False        # whether the cached listing may be reused
        self.executor = ThreadPoolExecutor(max_workers=self.config.max_upload_workers)
        self.cache_lock = threading.Lock()
        self.upload_queue_lock = threading.Lock()

        # Make sure the data directory exists.
        self.config.data_dir.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    def _relative_key(self, file_path):
        """Return *file_path* relative to the data dir, using '/' separators."""
        return str(Path(file_path).relative_to(self.config.data_dir)).replace('\\', '/')

    def _snapshot_state(self, file_path, stat_result=None):
        """Return ``(size, mtime)`` or ``(size, mtime, hash)`` for *file_path*."""
        if stat_result is None:
            stat_result = os.stat(file_path)
        state = (stat_result.st_size, stat_result.st_mtime)
        if self.config.use_hash_check:
            state += (self.get_file_hash(file_path),)
        return state

    @staticmethod
    def _is_safe_key(key):
        """Return True when a remote key cannot escape the data directory."""
        if not isinstance(key, str) or not key:
            return False
        if key.startswith(('/', '\\')) or Path(key).is_absolute():
            return False
        return '..' not in key.replace('\\', '/').split('/')

    # ------------------------------------------------------------------
    # Filtering
    # ------------------------------------------------------------------
    def should_sync_file(self, file_path):
        """Return True when *file_path* passes the configured suffix filters.

        ``only_suffixes`` (whitelist) takes priority; ``ignore_suffixes``
        (blacklist) is consulted only when the whitelist is empty. The
        comparison is case-insensitive.
        """
        lowered = str(file_path).lower()

        if self.config.only_suffixes:
            return lowered.endswith(tuple(self.config.only_suffixes))

        if self.config.ignore_suffixes:
            return not lowered.endswith(tuple(self.config.ignore_suffixes))

        # No filters configured: sync everything.
        return True

    # ------------------------------------------------------------------
    # HTTP layer
    # ------------------------------------------------------------------
    def r2_api_request(self, method, path, data=None, max_retries=3, timeout=60):
        """Send one API request, retrying on transport/HTTP errors.

        Args:
            method: One of "GET", "POST", "PUT", "DELETE".
            path: Path appended to ``config.api_endpoint``.
            data: Request body; only sent for POST and PUT.
            max_retries: Number of attempts before giving up.
            timeout: Timeout in seconds passed to ``requests`` so a stalled
                connection cannot block a worker forever.

        Returns:
            The raw body (bytes) for GET, ``True`` for DELETE, and for
            POST/PUT the decoded JSON body (or ``True`` when the successful
            response body is not JSON). ``None`` when every attempt failed.

        Raises:
            ValueError: If *method* is not supported.
        """
        if method not in self._SUPPORTED_METHODS:
            raise ValueError(f"不支持的HTTP方法: {method}")

        # Tolerate an endpoint configured with a trailing slash.
        url = f"{self.config.api_endpoint.rstrip('/')}/{path}"
        headers = {
            'X-API-Key': self.config.api_key,
            'Content-Type': 'application/octet-stream'
        }
        body = data if method in ("POST", "PUT") else None

        for attempt in range(max_retries):
            try:
                resp = requests.request(
                    method, url, data=body, headers=headers, timeout=timeout
                )
                resp.raise_for_status()
            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    logger.error(f"API请求最终失败: {e}")
                    return None
                wait_time = (attempt + 1) * 2
                logger.warning(f"API请求失败(尝试 {attempt + 1}/{max_retries}), {wait_time}秒后重试...")
                time.sleep(wait_time)
                continue

            if method == "GET":
                return resp.content
            if method == "DELETE":
                return True
            try:
                return resp.json()
            except ValueError:
                # The request itself succeeded; a non-JSON body must not be
                # mistaken for a failure (which would re-send the upload).
                return True

        return None

    def get_file_hash(self, file_path):
        """Return the BLAKE2b hex digest of the file's content."""
        hash_obj = hashlib.blake2b()
        with open(file_path, 'rb') as f:
            while chunk := f.read(8192):
                hash_obj.update(chunk)
        return hash_obj.hexdigest()

    # ------------------------------------------------------------------
    # Remote listing and cache
    # ------------------------------------------------------------------
    def get_remote_files(self, force_refresh=False):
        """Return a copy of the remote file list, or ``None`` if it cannot be fetched.

        The listing is cached until ``invalidate_cache`` is called or
        *force_refresh* is true. A failed fetch is never cached.
        """
        with self.cache_lock:
            if force_refresh or not self.cache_valid or self.remote_files_cache is None:
                fetched = self._fetch_remote_files()
                if fetched is None:
                    # Report the failure instead of calling .copy() on None.
                    self.cache_valid = False
                    return None
                self.remote_files_cache = fetched
                self.cache_valid = True
            return self.remote_files_cache.copy()

    def _fetch_remote_files(self):
        """Fetch and normalise the remote listing.

        Returns:
            A list of entries (URL-decoded ``key`` on dict entries), an empty
            list when the body cannot be parsed, or ``None`` when the request
            failed or returned an empty body.
        """
        resp = self.r2_api_request("GET", "list")
        if not resp:
            return None

        try:
            raw_resp = resp.decode('utf-8').strip()
            if raw_resp == "[]":
                return []

            data = json.loads(raw_resp)

            # Normalise every response shape to a list.
            if isinstance(data, dict):
                data = data['objects'] if 'objects' in data else [data]
            elif not isinstance(data, list):
                data = [data]

            # Keys may be URL-encoded; decode string keys, leave others alone.
            for item in data:
                if isinstance(item, dict) and isinstance(item.get('key'), str):
                    item['key'] = unquote(item['key'])

            return data
        except Exception as e:
            logger.error(f"解析远程文件列表出错: {e}")
            return []

    def invalidate_cache(self):
        """Mark the cached remote listing as stale."""
        with self.cache_lock:
            self.cache_valid = False

    # ------------------------------------------------------------------
    # Single-file operations
    # ------------------------------------------------------------------
    def download_file(self, key, dest_path):
        """Download remote *key* to *dest_path*.

        Returns:
            True when the file was written, False when it was filtered out by
            the sync rules or the request failed. An empty remote file is a
            valid download and produces an empty local file.
        """
        if not self.should_sync_file(dest_path):
            logger.debug(f"忽略下载文件(不匹配同步规则): {key}")
            return False

        path = f"download/{key}"
        logger.debug(f"下载路径: {dest_path} (原始key: {key})")

        content = self.r2_api_request("GET", path)
        if content is None:  # b"" is a legitimate (empty) file, not a failure
            return False

        # Make sure the target directory exists.
        dest_path = Path(dest_path)
        dest_path.parent.mkdir(parents=True, exist_ok=True)

        with open(dest_path, 'wb') as f:
            f.write(content)

        logger.info(f"文件已保存到: {dest_path} (大小: {len(content)}字节)")
        return True

    def _upload_file_task(self, file_path, key):
        """Upload one file (runs inside the thread pool). Returns True on success."""
        if not self.should_sync_file(file_path):
            logger.debug(f"忽略上传文件(不匹配同步规则): {key}")
            return False

        path = f"upload/{key}"
        try:
            with open(file_path, 'rb') as f:
                result = self.r2_api_request("POST", path, f.read()) is not None
            if result:
                self.invalidate_cache()  # remote content changed
            return result
        except Exception as e:
            logger.error(f"上传文件出错: {e}")
            return False

    def upload_file(self, file_path, key):
        """Upload *file_path* as *key* via the thread pool and wait for the result."""
        future = self.executor.submit(self._upload_file_task, file_path, key)
        return future.result()  # block until the upload finished

    def delete_file(self, key):
        """Delete remote *key* and confirm it no longer appears in the listing.

        Returns:
            True only when the delete request succeeded and a fresh listing
            no longer contains *key*.
        """
        if not self.should_sync_file(key):
            logger.debug(f"忽略删除文件(不匹配同步规则): {key}")
            return False

        logger.info(f"删除远程文件: {key}")
        path = f"delete/{key}"
        if not self.r2_api_request("DELETE", path):
            return False

        self.invalidate_cache()

        # Verify the file is really gone.
        remote_files = self.get_remote_files(force_refresh=True)
        if remote_files is None:
            return False

        return not any(
            isinstance(file_info, dict) and file_info.get('key') == key
            for file_info in remote_files
        )

    def check_api_health(self):
        """Return True when the list API responds with a usable listing."""
        logger.info("检查API健康状态...")

        files = self.get_remote_files()
        if files is None:
            logger.error("列表API测试失败")
            return False

        logger.info("API健康状态检查通过")
        return True

    # ------------------------------------------------------------------
    # Initial sync (remote -> local)
    # ------------------------------------------------------------------
    def _download_and_update_state(self, file_info):
        """Thread task: download one listed file and record its local state."""
        key = file_info['key']

        # Remote keys are untrusted input: never let one point outside data_dir.
        if not self._is_safe_key(key):
            logger.warning(f"忽略不安全的远程key: {key}")
            return

        dest_path = self.config.data_dir / key

        if not self.should_sync_file(dest_path):
            logger.debug(f"忽略下载文件(不匹配同步规则): {key}")
            return

        logger.info(f"下载: {key} -> {dest_path}")
        if self.download_file(key, dest_path):
            file_state = self._snapshot_state(dest_path, dest_path.stat())
            with self.cache_lock:  # guards concurrent file_states updates
                self.file_states[key] = file_state

    def init_sync(self):
        """Download every remote file into the data directory.

        Exits the process with status 1 when the API is unreachable.
        """
        logger.info("初始化数据目录...")

        if not self.check_api_health():
            logger.error("API检查失败,请检查配置和网络连接")
            sys.exit(1)

        remote_files = self.get_remote_files()
        if remote_files is None:
            logger.error("获取远程文件列表失败")
            sys.exit(1)

        logger.info(f"找到 {len(remote_files)} 个远程文件")
        if not remote_files:
            logger.info("远程存储桶为空,无需同步")
            return

        logger.info("开始并行同步远程文件...")
        futures = [
            self.executor.submit(self._download_and_update_state, file_info)
            for file_info in remote_files
        ]

        # Wait for every task; log failures without aborting the others.
        for future in futures:
            try:
                future.result()
            except Exception as e:
                logger.error(f"文件下载任务出错: {e}")

    # ------------------------------------------------------------------
    # Continuous sync (local -> remote)
    # ------------------------------------------------------------------
    def handle_file_change(self, file_path):
        """Queue *file_path* for upload after a quiet period.

        Each new event for the same path cancels the previous timer, so the
        upload is attempted ``config.upload_retry_delay`` seconds after the
        most recent change.
        """
        try:
            if not self.should_sync_file(file_path):
                logger.debug(f"忽略文件变化(不匹配同步规则): {file_path}")
                return

            key = self._relative_key(file_path)
            current_mtime = os.stat(file_path).st_mtime

            with self.upload_queue_lock:
                # Already queued: cancel the old timer before replacing it.
                pending = self.upload_queue.get(file_path)
                if pending is not None:
                    pending[2].cancel()

                timer = threading.Timer(
                    self.config.upload_retry_delay,
                    self._process_upload_queue,
                    args=(file_path,)
                )
                timer.start()

                self.upload_queue[file_path] = (key, current_mtime, timer)
            logger.info(f"检测到文件变化,加入上传队列: {key} (将在{self.config.upload_retry_delay}秒后检查)")

        except Exception as e:
            logger.error(f"处理文件变化出错: {e}")

    def is_file_modified(self, file_path, last_known_state):
        """Return True when *file_path* differs from *last_known_state*.

        A size difference always counts as modified. With equal sizes the
        result depends on ``config.use_hash_check``: when disabled the file is
        treated as unmodified; when enabled the content hash is compared. If
        the known state carries no hash yet, the state is refreshed in
        ``file_states`` and True is returned. Any error yields False.
        """
        try:
            stat = os.stat(file_path)
            current_size = stat.st_size

            if last_known_state[0] != current_size:
                return True  # size changed

            logger.info(f"文件大小没有变化: {file_path} (大小: {current_size})")
            if not self.config.use_hash_check:
                logger.info(f"不使用hash检查: {file_path} 视为未修改")
                return False

            logger.info(f"文件大小相同,开始检查hash: {file_path}")
            current_hash = self.get_file_hash(file_path)
            if len(last_known_state) <= 2:
                # No hash recorded yet: store the full state and force an update.
                self.file_states[self._relative_key(file_path)] = (
                    current_size, stat.st_mtime, current_hash
                )
                return True
            return current_hash != last_known_state[2]
        except Exception as e:
            logger.error(f"检查文件修改状态出错: {e}")
            return False

    def _process_upload_queue(self, file_path):
        """Timer callback: upload *file_path* if it is stable and really changed."""
        with self.upload_queue_lock:
            if file_path not in self.upload_queue:
                return
            key, original_mtime, _ = self.upload_queue.pop(file_path)

        try:
            if not os.path.exists(file_path):
                logger.info(f"文件已被删除,取消上传: {key}")
                return

            current_stat = os.stat(file_path)
            current_mtime = current_stat.st_mtime

            # The file changed again while waiting: restart the quiet period.
            if current_mtime != original_mtime:
                logger.info(f"文件 {key} 在等待期间又有新变化,重新加入队列")
                self.handle_file_change(file_path)
                return

            # A file with no recorded state is new and must always be uploaded
            # (comparing against a dummy (0, 0) state would skip empty files
            # when hash checking is disabled).
            known_state = self.file_states.get(key)
            if known_state is not None and not self.is_file_modified(file_path, known_state):
                logger.info(f"文件 {key} 没有新变化,跳过上传")
                return

            logger.info(f"开始上传文件: {key}")
            if self.upload_file(file_path, key):
                self.file_states[key] = self._snapshot_state(file_path, current_stat)
            else:
                logger.error(f"上传失败: {key}")

        except Exception as e:
            logger.error(f"处理上传队列出错: {e}")

    def sync_deleted_files(self):
        """Delete remote files that no longer exist locally.

        Files that are still waiting in the upload queue are left alone.
        """
        try:
            remote_files = self.get_remote_files()
            if remote_files is None:
                return

            local_files = {
                str(f.relative_to(self.config.data_dir)).replace('\\', '/')
                for f in self.config.data_dir.rglob('*')
                if f.is_file() and self.should_sync_file(f)
            }

            # upload_queue is keyed by file path; compare against the queued
            # *keys* and read them under the lock.
            with self.upload_queue_lock:
                pending_keys = {queued_key for queued_key, _, _ in self.upload_queue.values()}

            for file_info in remote_files:
                if not isinstance(file_info, dict) or 'key' not in file_info:
                    continue  # malformed entry; nothing to compare
                key = file_info['key']
                if key in local_files or key in pending_keys:
                    continue

                logger.info(f"删除远程文件: {key}")
                if self.delete_file(key):
                    self.file_states.pop(key, None)
                else:
                    logger.error(f"删除失败: {key}")
        except Exception as e:
            logger.error(f"同步删除操作出错: {e}")

    def watch_and_sync(self):
        """Watch the data directory and keep the remote side in sync.

        Runs until interrupted; the observer, pending timers and the thread
        pool are shut down on the way out, whatever ended the loop.
        """
        logger.info("启动持续同步服务...")

        # Record the starting state of every file that matches the sync rules.
        for file in self.config.data_dir.rglob('*'):
            if file.is_file() and self.should_sync_file(file):
                self.file_states[self._relative_key(file)] = self._snapshot_state(
                    file, file.stat()
                )

        event_handler = SyncHandler(self.handle_file_change, self.config)
        observer = Observer()
        observer.schedule(event_handler, path=str(self.config.data_dir), recursive=True)
        observer.start()

        try:
            while True:
                # Periodically propagate local deletions to the remote side.
                self.sync_deleted_files()
                time.sleep(self.config.sync_interval)
        except KeyboardInterrupt:
            logger.info("收到停止信号,关闭监控...")
        finally:
            # Cancel every pending upload timer before tearing things down.
            with self.upload_queue_lock:
                for _, _, timer in self.upload_queue.values():
                    timer.cancel()
                self.upload_queue.clear()
            observer.stop()
            observer.join()
            self.executor.shutdown()
506
-
507
def main():
    """Command-line entry point: ``python r2_sync.py [init|sync]``.

    ``init`` downloads the remote files once; ``sync`` starts the continuous
    watch-and-upload service. A missing argument, an invalid configuration or
    an unknown command ends the process with exit status 1.
    """
    if len(sys.argv) < 2:
        logger.info("Usage: python r2_sync.py [init|sync]")
        sys.exit(1)

    try:
        config = Config()
    except ValueError as config_error:
        logger.error(str(config_error))
        sys.exit(1)

    syncer = R2Sync(config)
    command = sys.argv[1]

    if command == "init":
        syncer.init_sync()
        return

    if command != "sync":
        logger.error(f"未知命令: {command}")
        sys.exit(1)

    logger.info(f"启动同步服务,间隔: {config.sync_interval}秒")
    syncer.watch_and_sync()


if __name__ == "__main__":
    main()