Spaces:
Running
Running
File size: 2,248 Bytes
186440e 0d11e5e 186440e 0d6128e 7b8f88d 0d11e5e 7b8f88d bf7d327 0d6128e 065a96f 0d6128e 0d11e5e 7b8f88d 0d11e5e 186440e 0d6128e 0d11e5e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
import os
import re
import shutil
import requests
from tqdm import tqdm
from datetime import datetime
from zoneinfo import ZoneInfo
from tzlocal import get_localzone
# True unless the LANG environment variable is exactly "zh_CN.UTF-8".
EN_US = os.getenv("LANG") != "zh_CN.UTF-8"

# API settings read from the environment; each is None when the variable is unset.
API_TIKTOK = os.getenv("api_tiktok")
API_BILI = os.getenv("api_bili")
API_BILI_1 = os.getenv("api_bili_1")
API_BILI_2 = os.getenv("api_bili_2")

# Stop at import time when a required variable is unset or empty.
# NOTE(review): API_BILI is not part of this check - confirm it is optional.
if not (API_TIKTOK and API_BILI_1 and API_BILI_2):
    print("请检查环境变量")
    # `exit()` is injected by the `site` module for interactive sessions and is
    # missing under `python -S` or frozen interpreters; raising SystemExit is
    # always available and keeps the same exit status (0).
    # NOTE(review): a non-zero status would signal the failure to callers.
    raise SystemExit

# NOTE(review): presumably a timeout for network calls (None = wait forever);
# no use of it is visible in this part of the file - confirm.
TIMEOUT = None

# NOTE(review): presumably a scratch directory; it shares its name with
# Python's bytecode cache directory - confirm this is intended.
TMP_DIR = "./__pycache__"

# Browser-like User-Agent header sent with outgoing HTTP requests.
HEADER = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36"
}
def timestamp(naive_time: datetime = None, target_tz=ZoneInfo("Asia/Shanghai")):
    """Format a moment in time as ``YYYY-MM-DD HH:MM:SS`` in *target_tz*.

    Args:
        naive_time: The datetime to format. A naive value is interpreted as
            wall-clock time in the machine's local zone; an aware value is
            converted from its own zone. ``None`` means the current time.
        target_tz: Time zone of the rendered string (default Asia/Shanghai).

    Returns:
        The formatted timestamp string.
    """
    if naive_time is None:
        # Taking "now" directly in the target zone names the same instant as
        # local-now converted, without the ambiguity of a repeated DST hour.
        moment = datetime.now(target_tz)
    elif naive_time.tzinfo is None or naive_time.utcoffset() is None:
        # Naive input: label it with the local zone, then convert.
        moment = naive_time.replace(tzinfo=get_localzone()).astimezone(target_tz)
    else:
        # Aware input already pins an instant; overwriting its tzinfo would
        # shift that instant, so convert it as-is.
        moment = naive_time.astimezone(target_tz)
    return moment.strftime("%Y-%m-%d %H:%M:%S")
def mk_dir(dirpath: str):
    """Create *dirpath*, including missing parents, if it does not exist yet.

    Args:
        dirpath: Directory path to create.

    Raises:
        FileExistsError: If *dirpath* exists but is not a directory.
    """
    # exist_ok=True avoids the race between testing for the path and creating it.
    os.makedirs(dirpath, exist_ok=True)
def rm_dir(dirpath: str):
    """Delete the directory tree at *dirpath*; do nothing if the path is absent.

    Args:
        dirpath: Directory path to remove together with its contents.
    """
    if not os.path.exists(dirpath):
        return
    shutil.rmtree(dirpath)
def clean_dir(dirpath: str):
    """Reset *dirpath* to an empty directory.

    Any existing tree at the path is removed first, then the directory
    (and missing parents) is created again.

    Args:
        dirpath: Directory path to empty and recreate.
    """
    rm_dir(dirpath)  # drop previous contents, if any
    os.makedirs(dirpath, exist_ok=False)
def download_file(url, video_id, cache_dir: str):
    """Download *url* into a freshly emptied *cache_dir* as ``<video_id>.mp4``.

    Args:
        url: Address of the file to fetch.
        video_id: Used as the base name of the saved file.
        cache_dir: Directory that is wiped and recreated before downloading.

    Returns:
        Path of the written file, ``f"{cache_dir}/{video_id}.mp4"``.

    Raises:
        ConnectionError: If the server answers with any status other than 200.
    """
    clean_dir(cache_dir)
    local_file = f"{cache_dir}/{video_id}.mp4"

    # NOTE(review): no timeout is passed, so a stalled server blocks forever.
    # The context manager releases the streamed connection on every exit path,
    # including the error raise below.
    with requests.get(url, headers=HEADER, stream=True) as response:
        if response.status_code != 200:
            raise ConnectionError(f"HTTP: {response.status_code}")

        # A missing or zero Content-Length becomes None so tqdm shows a plain
        # byte counter instead of a bar measured against a wrong total.
        content_length = int(response.headers.get("Content-Length", 0))
        time_stamp = timestamp()
        with open(local_file, "wb") as f, tqdm(
            total=content_length or None,
            unit="B",
            unit_scale=True,
            desc=f"[{time_stamp}] {local_file}",
        ) as progress_bar:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # make sure the chunk is not empty
                    f.write(chunk)
                    progress_bar.update(len(chunk))  # advance the progress bar

    return local_file
def extract_fst_url(text):
    """Return the first http(s) URL found in *text*, or ``None`` if there is none.

    A URL starts at ``http://`` or ``https://`` and runs until the next
    whitespace character or double quote.

    Args:
        text: String to search.

    Returns:
        The matched URL string, or ``None`` when nothing matches.
    """
    found = re.search(r'(https?://[^\s"]+)', text)
    return found.group(1) if found else None
|