import os
import sys
import time
import requests
import json
import hashlib
import logging
import threading
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from urllib.parse import unquote

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)


class Config:
    """Configuration loaded from environment variables."""

    def __init__(self):
        self.data_dir = Path(os.getenv("DATA_DIR", "/app"))
        self.api_key = os.getenv("API_AUTH_KEY")
        self.api_endpoint = os.getenv("API_ENDPOINT")
        self.sync_interval = int(os.getenv("SYNC_INTERVAL", "300"))
        self.use_hash_check = os.getenv("USE_HASH_CHECK", "true").lower() == "true"
        self.max_upload_workers = int(os.getenv("MAX_UPLOAD_WORKERS", "500"))
        self.upload_retry_delay = int(os.getenv("UPLOAD_RETRY_DELAY", "60"))

        # Suffixes to ignore (comma separated, case-insensitive).
        ignore_suffixes = os.getenv("IGNORE_SUFFIXES", ".db-journal,.tmp,.log,sqlite3-journal")
        self.ignore_suffixes = [s.strip().lower() for s in ignore_suffixes.split(",") if s.strip()]

        # Suffixes to sync exclusively (takes precedence over IGNORE_SUFFIXES).
        only_suffixes = os.getenv("ONLY_SUFFIXES", ".db")
        self.only_suffixes = [s.strip().lower() for s in only_suffixes.split(",") if s.strip()]

        if not all([self.api_key, self.api_endpoint]):
            raise ValueError("The API_AUTH_KEY and API_ENDPOINT environment variables must be set")

        if self.ignore_suffixes and self.only_suffixes:
            logger.warning("Both IGNORE_SUFFIXES and ONLY_SUFFIXES are set; ONLY_SUFFIXES takes precedence")
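
# Example environment for this script (illustrative values only; the endpoint and
# key below are placeholders, not real credentials):
#   DATA_DIR=/app
#   API_AUTH_KEY=<your-key>
#   API_ENDPOINT=https://your-r2-proxy.example.workers.dev
#   SYNC_INTERVAL=300
#   ONLY_SUFFIXES=.db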


class SyncHandler(FileSystemEventHandler):
    """Filesystem event handler that forwards qualifying file changes to a callback."""

    def __init__(self, callback, config):
        self.callback = callback
        self.config = config

    def should_sync(self, file_path):
        """Return True if the file matches the sync rules."""
        file_path = str(file_path).lower()

        # ONLY_SUFFIXES takes precedence: sync only matching files.
        if self.config.only_suffixes:
            return any(file_path.endswith(suffix) for suffix in self.config.only_suffixes)

        # Otherwise, skip files whose suffix is on the ignore list.
        if self.config.ignore_suffixes:
            return not any(file_path.endswith(suffix) for suffix in self.config.ignore_suffixes)

        return True

    def on_modified(self, event):
        if not event.is_directory and self.should_sync(event.src_path):
            self.callback(event.src_path)

    def on_created(self, event):
        if not event.is_directory and self.should_sync(event.src_path):
            self.callback(event.src_path)


class R2Sync:
    """Synchronizes a local directory with R2 storage through the proxy API."""

    def __init__(self, config):
        self.config = config
        self.file_states = {}
        self.upload_queue = {}
        self.remote_files_cache = None
        self.cache_valid = False
        self.executor = ThreadPoolExecutor(max_workers=self.config.max_upload_workers)
        self.cache_lock = threading.Lock()
        self.upload_queue_lock = threading.Lock()

        self.config.data_dir.mkdir(parents=True, exist_ok=True)

    def should_sync_file(self, file_path):
        """Return True if the file matches the sync rules."""
        file_path = str(file_path).lower()

        # ONLY_SUFFIXES takes precedence: sync only matching files.
        if self.config.only_suffixes:
            return any(file_path.endswith(suffix) for suffix in self.config.only_suffixes)

        # Otherwise, skip files whose suffix is on the ignore list.
        if self.config.ignore_suffixes:
            return not any(file_path.endswith(suffix) for suffix in self.config.ignore_suffixes)

        return True

    def r2_api_request(self, method, path, data=None, max_retries=3):
        """Send an API request, retrying with a linear backoff on failure."""
        url = f"{self.config.api_endpoint}/{path}"
        headers = {
            'X-API-Key': self.config.api_key,
            'Content-Type': 'application/octet-stream'
        }

        for attempt in range(max_retries):
            try:
                if method == "GET":
                    resp = requests.get(url, headers=headers)
                elif method == "POST":
                    resp = requests.post(url, data=data, headers=headers)
                elif method == "PUT":
                    resp = requests.put(url, data=data, headers=headers)
                elif method == "DELETE":
                    resp = requests.delete(url, headers=headers)
                else:
                    raise ValueError(f"Unsupported HTTP method: {method}")

                resp.raise_for_status()

                if method == "GET":
                    return resp.content
                elif method == "DELETE":
                    return True
                return resp.json()

            except requests.exceptions.RequestException as e:
                if attempt == max_retries - 1:
                    logger.error(f"API request failed after all retries: {e}")
                    return None
                wait_time = (attempt + 1) * 2
                logger.warning(f"API request failed (attempt {attempt + 1}/{max_retries}), retrying in {wait_time}s...")
                time.sleep(wait_time)

    def get_file_hash(self, file_path):
        """Compute the BLAKE2b hash of a file."""
        hash_obj = hashlib.blake2b()
        with open(file_path, 'rb') as f:
            while chunk := f.read(8192):
                hash_obj.update(chunk)
        return hash_obj.hexdigest()

    def get_remote_files(self, force_refresh=False):
        """Return the remote file list, using the cache when it is valid."""
        with self.cache_lock:
            if force_refresh or not self.cache_valid or self.remote_files_cache is None:
                self.remote_files_cache = self._fetch_remote_files()
                # Only mark the cache valid when the fetch actually succeeded.
                self.cache_valid = self.remote_files_cache is not None
            if self.remote_files_cache is None:
                return None
            return self.remote_files_cache.copy()

    def _fetch_remote_files(self):
        """Fetch the remote file list from the API."""
        resp = self.r2_api_request("GET", "list")
        if not resp:
            return None

        try:
            raw_resp = resp.decode('utf-8').strip()
            if raw_resp == "[]":
                return []

            data = json.loads(raw_resp)

            # Normalize the response into a list of objects.
            if isinstance(data, dict):
                if 'objects' in data:
                    data = data['objects']
                else:
                    data = [data]
            elif not isinstance(data, list):
                data = [data]

            # Decode URL-encoded keys.
            for item in data:
                if isinstance(item, dict) and 'key' in item:
                    try:
                        item['key'] = unquote(item['key'])
                    except Exception:
                        pass

            return data
        except Exception as e:
            logger.error(f"Failed to parse remote file list: {e}")
            return []

    def invalidate_cache(self):
        """Invalidate the remote file list cache."""
        with self.cache_lock:
            self.cache_valid = False

    def download_file(self, key, dest_path):
        """Download a remote object to dest_path."""
        if not self.should_sync_file(dest_path):
            logger.debug(f"Skipping download (does not match sync rules): {key}")
            return False

        path = f"download/{key}"
        logger.debug(f"Download target: {dest_path} (original key: {key})")

        resp = self.r2_api_request("GET", path)
        if not resp:
            return False

        dest_path.parent.mkdir(parents=True, exist_ok=True)

        # GET requests return the raw response body as bytes.
        content = resp

        with open(dest_path, 'wb') as f:
            f.write(content)

        logger.info(f"File saved to {dest_path} ({len(content)} bytes)")
        return True

    def _upload_file_task(self, file_path, key):
        """Upload task executed in the thread pool."""
        if not self.should_sync_file(file_path):
            logger.debug(f"Skipping upload (does not match sync rules): {key}")
            return False

        path = f"upload/{key}"
        try:
            with open(file_path, 'rb') as f:
                result = self.r2_api_request("POST", path, f.read()) is not None
            if result:
                self.invalidate_cache()
            return result
        except Exception as e:
            logger.error(f"Upload failed: {e}")
            return False

    def upload_file(self, file_path, key):
        """Upload a file, blocking until the pooled task completes."""
        future = self.executor.submit(self._upload_file_task, file_path, key)
        return future.result()

    def delete_file(self, key):
        """Delete a remote file and refresh the cache."""
        if not self.should_sync_file(key):
            logger.debug(f"Skipping delete (does not match sync rules): {key}")
            return False

        logger.info(f"Deleting remote file: {key}")
        path = f"delete/{key}"
        if not self.r2_api_request("DELETE", path):
            return False

        self.invalidate_cache()

        # Verify that the key is gone from the refreshed listing.
        remote_files = self.get_remote_files(force_refresh=True)
        if remote_files is None:
            return False

        return not any(file_info['key'] == key for file_info in remote_files)

    def check_api_health(self):
        """Check that the list API is reachable."""
        logger.info("Checking API health...")

        files = self.get_remote_files()
        if files is None:
            logger.error("List API check failed")
            return False

        logger.info("API health check passed")
        return True

    def _download_and_update_state(self, file_info):
        """Thread task: download one file and record its local state."""
        key = file_info['key']
        dest_path = self.config.data_dir / key

        if not self.should_sync_file(dest_path):
            logger.debug(f"Skipping download (does not match sync rules): {key}")
            return

        logger.info(f"Downloading: {key} -> {dest_path}")
        if self.download_file(key, dest_path):
            # Record size, mtime and (optionally) hash of the freshly downloaded file.
            stat = dest_path.stat()
            file_state = (stat.st_size, stat.st_mtime)
            if self.config.use_hash_check:
                file_state += (self.get_file_hash(dest_path),)
            with self.cache_lock:
                self.file_states[key] = file_state

    def init_sync(self):
        """Perform the initial download of all remote files."""
        logger.info("Initializing data directory...")

        if not self.check_api_health():
            logger.error("API check failed; verify the configuration and network connectivity")
            sys.exit(1)

        remote_files = self.get_remote_files()
        if remote_files is None:
            logger.error("Failed to fetch the remote file list")
            sys.exit(1)

        logger.info(f"Found {len(remote_files)} remote files")
        if not remote_files:
            logger.info("Remote bucket is empty; nothing to sync")
            return

        logger.info("Starting parallel download of remote files...")

        futures = []
        for file_info in remote_files:
            futures.append(self.executor.submit(self._download_and_update_state, file_info))

        # Wait for all downloads to finish and surface any errors.
        for future in futures:
            try:
                future.result()
            except Exception as e:
                logger.error(f"Download task failed: {e}")

    def handle_file_change(self, file_path):
        """Handle a file change: queue it for upload and schedule a delayed check."""
        try:
            if not self.should_sync_file(file_path):
                logger.debug(f"Ignoring change (does not match sync rules): {file_path}")
                return

            rel_path = Path(file_path).relative_to(self.config.data_dir)
            key = str(rel_path).replace('\\', '/')

            stat = os.stat(file_path)
            current_mtime = stat.st_mtime

            with self.upload_queue_lock:
                # Cancel any pending timer for this file.
                if file_path in self.upload_queue:
                    _, _, timer = self.upload_queue[file_path]
                    timer.cancel()

                # Schedule the delayed upload check.
                timer = threading.Timer(
                    self.config.upload_retry_delay,
                    self._process_upload_queue,
                    args=(file_path,)
                )
                timer.start()

                self.upload_queue[file_path] = (key, current_mtime, timer)
                logger.info(f"Change detected, queued for upload: {key} (will be checked in {self.config.upload_retry_delay}s)")

        except Exception as e:
            logger.error(f"Error handling file change: {e}")

    def is_file_modified(self, file_path, last_known_state):
        """Return True if the file differs from its last known state."""
        try:
            stat = os.stat(file_path)
            current_size = stat.st_size

            if last_known_state[0] == current_size:
                logger.info(f"File size unchanged: {file_path} (size: {current_size})")

                if not self.config.use_hash_check:
                    logger.info(f"Hash check disabled; treating {file_path} as unmodified")
                    return False
                logger.info(f"Same size, comparing hashes: {file_path}")
                current_hash = self.get_file_hash(file_path)
                if len(last_known_state) <= 2:
                    # No stored hash yet: record the current state and treat the file as modified.
                    rel_path = str(Path(file_path).relative_to(self.config.data_dir)).replace('\\', '/')
                    self.file_states[rel_path] = (current_size, stat.st_mtime, current_hash)
                    return True
                return current_hash != last_known_state[2]
            return True
        except Exception as e:
            logger.error(f"Error checking file modification state: {e}")
            return False

    def _process_upload_queue(self, file_path):
        """Process a queued file once its debounce timer fires."""
        with self.upload_queue_lock:
            if file_path not in self.upload_queue:
                return

            key, original_mtime, _ = self.upload_queue[file_path]
            del self.upload_queue[file_path]

        try:
            # The file may have been deleted while it was waiting in the queue.
            if not os.path.exists(file_path):
                logger.info(f"File was deleted, cancelling upload: {key}")
                return

            current_stat = os.stat(file_path)
            current_mtime = current_stat.st_mtime

            # If the file changed again while waiting, re-queue it and start a new wait.
            if current_mtime != original_mtime:
                logger.info(f"File {key} changed again while waiting; re-queueing")
                self.handle_file_change(file_path)
                return

            if not self.is_file_modified(file_path, self.file_states.get(key, (0, 0))):
                logger.info(f"File {key} has no new changes, skipping upload")
                return

            logger.info(f"Uploading file: {key}")
            if self.upload_file(file_path, key):
                # Record the uploaded state so unchanged files are skipped next time.
                file_state = (current_stat.st_size, current_mtime)
                if self.config.use_hash_check:
                    file_state += (self.get_file_hash(file_path),)
                self.file_states[key] = file_state
            else:
                logger.error(f"Upload failed: {key}")

        except Exception as e:
            logger.error(f"Error processing upload queue: {e}")

    def sync_deleted_files(self):
        """Delete remote files that no longer exist locally."""
        try:
            remote_files = self.get_remote_files()
            if remote_files is None:
                return

            local_files = {
                str(f.relative_to(self.config.data_dir)).replace('\\', '/')
                for f in self.config.data_dir.rglob('*')
                if f.is_file() and self.should_sync_file(f)
            }

            # Keys currently waiting in the upload queue must not be deleted remotely.
            with self.upload_queue_lock:
                queued_keys = {q_key for (q_key, _, _) in self.upload_queue.values()}

            for file_info in remote_files:
                key = file_info['key']
                if key not in local_files and key not in queued_keys:
                    logger.info(f"Deleting remote file: {key}")
                    if self.delete_file(key):
                        if key in self.file_states:
                            del self.file_states[key]
                    else:
                        logger.error(f"Delete failed: {key}")
        except Exception as e:
            logger.error(f"Error syncing deletions: {e}")

    def watch_and_sync(self):
        """Watch the data directory and keep it in sync continuously."""
        logger.info("Starting continuous sync service...")

        # Record the initial state of every local file that matches the sync rules.
        for file in self.config.data_dir.rglob('*'):
            if file.is_file() and self.should_sync_file(file):
                rel_path = str(file.relative_to(self.config.data_dir)).replace('\\', '/')
                stat = file.stat()
                file_state = (stat.st_size, stat.st_mtime)
                if self.config.use_hash_check:
                    file_state += (self.get_file_hash(file),)
                self.file_states[rel_path] = file_state

        # Watch the data directory for filesystem events.
        event_handler = SyncHandler(self.handle_file_change, self.config)
        observer = Observer()
        observer.schedule(event_handler, path=str(self.config.data_dir), recursive=True)
        observer.start()

        try:
            while True:
                # Periodically propagate local deletions to the remote side.
                self.sync_deleted_files()
                time.sleep(self.config.sync_interval)
        except KeyboardInterrupt:
            logger.info("Stop signal received, shutting down watcher...")

            # Cancel any pending upload timers before exiting.
            with self.upload_queue_lock:
                for file_path, (_, _, timer) in self.upload_queue.items():
                    timer.cancel()
                self.upload_queue.clear()
            observer.stop()
            observer.join()
            self.executor.shutdown()


def main():
    if len(sys.argv) < 2:
        logger.info("Usage: python r2_sync.py [init|sync]")
        sys.exit(1)

    try:
        config = Config()
    except ValueError as e:
        logger.error(str(e))
        sys.exit(1)

    r2_sync = R2Sync(config)
    command = sys.argv[1]

    if command == "init":
        r2_sync.init_sync()
    elif command == "sync":
        logger.info(f"Starting sync service, interval: {config.sync_interval}s")
        r2_sync.watch_and_sync()
    else:
        logger.error(f"Unknown command: {command}")
        sys.exit(1)


if __name__ == "__main__":
    main()
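
# Typical invocations (illustrative; assumes the environment variables above are set):
#   python r2_sync.py init   # one-time download of all remote files into DATA_DIR
#   python r2_sync.py sync   # watch DATA_DIR and continuously sync changes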