# Spaces: Running, Running  (apparently page-status residue from the hosting site; kept as a comment so the file parses)
import os
import re
import shutil
from datetime import datetime
from zoneinfo import ZoneInfo

import requests
from tqdm import tqdm
from tzlocal import get_localzone
# Runtime configuration, read once from environment variables at import time.
# EN_US is True unless LANG is exactly "zh_CN.UTF-8" (so also when LANG is unset).
EN_US = os.getenv("LANG") != "zh_CN.UTF-8"
API_TIKTOK = os.getenv("api_tiktok")
API_BILI = os.getenv("api_bili")
API_BILI_1 = os.getenv("api_bili_1")
API_BILI_2 = os.getenv("api_bili_2")

# Refuse to start when a required setting is missing or empty.
# NOTE(review): API_BILI is read above but is not part of this check -
# confirm that it is really optional.
if not (API_TIKTOK and API_BILI_1 and API_BILI_2):
    print("请检查环境变量")
    # Raise SystemExit directly instead of calling exit(): exit() is a helper
    # installed by the `site` module for interactive use, and it reported
    # success (status 0) for what is a configuration failure.
    raise SystemExit(1)
# Presumably the timeout for outgoing HTTP requests (None would mean "wait
# indefinitely") - confirm against the code that consumes it.
TIMEOUT = None

# Presumably the scratch directory for temporary files - confirm against callers.
TMP_DIR = "./__pycache__"

# Headers sent with outgoing HTTP requests (used by download_file): a desktop
# Chrome User-Agent string.
HEADER = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.116 Safari/537.36",
}
def timestamp(naive_time: datetime = None, target_tz=ZoneInfo("Asia/Shanghai")):
    """Format a moment as ``YYYY-MM-DD HH:MM:SS`` expressed in ``target_tz``.

    Args:
        naive_time: The moment to format. A naive datetime is interpreted as
            wall-clock time in the machine's local zone (as reported by
            ``get_localzone()``). A timezone-aware datetime is converted
            as-is. ``None`` means "now".
        target_tz: ``tzinfo`` to express the result in; defaults to
            Asia/Shanghai.

    Returns:
        The formatted timestamp string.
    """
    # Compare against None explicitly: the argument is optional, and relying
    # on truthiness hides that intent.
    if naive_time is None:
        naive_time = datetime.now()

    # Only attach the local zone when the value carries no offset of its own.
    # Overwriting the tzinfo of an aware datetime would silently shift it to
    # a different instant.
    if naive_time.utcoffset() is None:
        naive_time = naive_time.replace(tzinfo=get_localzone())

    return naive_time.astimezone(target_tz).strftime("%Y-%m-%d %H:%M:%S")
def mk_dir(dirpath: str):
    """Create ``dirpath`` (and any missing parents) unless it already exists.

    Args:
        dirpath: Directory path to create.

    A path that already exists is left untouched, whatever it is.
    """
    # Attempt the creation and tolerate "already there" instead of checking
    # first: a separate exists() check can race with another process that
    # creates the path in between.
    try:
        os.makedirs(dirpath)
    except FileExistsError:
        pass
def rm_dir(dirpath: str):
    """Recursively delete the directory ``dirpath``; do nothing if it is absent.

    Args:
        dirpath: Directory path to remove together with its contents.

    Raises:
        OSError: If the path exists but ``shutil.rmtree`` cannot remove it
            (for example because it is a regular file or a symbolic link).
    """
    # Attempt the removal and tolerate "not there" instead of checking first:
    # a separate exists() check can race with another process that deletes
    # the directory in between.
    try:
        shutil.rmtree(dirpath)
    except FileNotFoundError:
        pass
def clean_dir(dirpath: str):
    """Reset ``dirpath`` to an empty directory.

    Removes the directory with everything in it (via ``rm_dir``) and then
    creates it again, including any missing parents.

    Args:
        dirpath: Directory path to empty.
    """
    rm_dir(dirpath)
    # exist_ok guards against another process re-creating the directory
    # between the removal above and this call.
    os.makedirs(dirpath, exist_ok=True)
def download_file(url, video_id, cache_dir: str):
    """Download ``url`` into ``{cache_dir}/{video_id}.mp4`` and return that path.

    ``cache_dir`` is emptied (via ``clean_dir``) before the request is sent,
    so its previous contents are gone even if the download then fails.

    Args:
        url: Address to fetch with an HTTP GET.
        video_id: Used as the file name (without the ``.mp4`` suffix).
        cache_dir: Directory that will hold the downloaded file.

    Returns:
        The path of the written file, as a string.

    Raises:
        ConnectionError: If the server responds with a status other than 200.
    """
    clean_dir(cache_dir)
    local_file = f"{cache_dir}/{video_id}.mp4"

    # The context manager releases the streamed connection on every exit
    # path; TIMEOUT is the module-level request timeout setting.
    with requests.get(url, headers=HEADER, stream=True, timeout=TIMEOUT) as response:
        if response.status_code != 200:
            raise ConnectionError(f"HTTP: {response.status_code}")

        # Use the real byte count so the bar can reach 100%. When the header
        # is missing (or zero) pass None, which makes tqdm show a plain
        # counter instead of a percentage.
        total_size = int(response.headers.get("Content-Length", 0)) or None
        time_stamp = timestamp()
        with tqdm(
            total=total_size,
            unit="B",
            unit_scale=True,
            desc=f"[{time_stamp}] {local_file}",
        ) as progress_bar:
            with open(local_file, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty chunks
                        f.write(chunk)
                        progress_bar.update(len(chunk))

    return local_file
def extract_fst_url(text):
    """Return the first http(s) URL found in ``text``, or ``None``.

    A URL here is ``http://`` or ``https://`` followed by the longest run of
    characters that are neither whitespace nor a double quote.

    Args:
        text: String to search. ``None`` or an empty string yields ``None``.

    Returns:
        The matched URL string, or ``None`` when there is no match.
    """
    # Nothing to search: avoid the TypeError re.search raises on None.
    if not text:
        return None

    match = re.search(r'(https?://[^\s"]+)', text)
    return match.group(1) if match else None