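# NOTE: the text "Spaces: Running Running" preceded this module when it was captured; it appears to be page-status residue and is not part of the program.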
import base64 | |
import json | |
import secrets | |
import string | |
from datetime import timedelta | |
from enum import IntEnum | |
from ipaddress import IPv4Address | |
from random import randint | |
from typing import Annotated, Any, Optional | |
from Cryptodome.Cipher import AES | |
from Cryptodome.Util.Padding import pad | |
from fastapi import Query | |
from hibiapi.api.netease.constants import NeteaseConstants | |
from hibiapi.utils.cache import cache_config | |
from hibiapi.utils.decorators import enum_auto_doc | |
from hibiapi.utils.exceptions import UpstreamAPIException | |
from hibiapi.utils.net import catch_network_error | |
from hibiapi.utils.routing import BaseEndpoint, dont_route | |
class SearchType(IntEnum):
    """Kind of content a search request targets.

    Each member's integer value is sent as the ``type`` field of the search
    request parameters.
    """

    # The bare string after each member is kept verbatim: it is a statement,
    # not a comment, and may be read by documentation tooling such as the
    # imported ``enum_auto_doc`` (TODO confirm).

    # Single track.
    SONG = 1
    """单曲"""
    # Album.
    ALBUM = 10
    """专辑"""
    # Artist.
    ARTIST = 100
    """歌手"""
    # Playlist.
    PLAYLIST = 1000
    """歌单"""
    # User.
    USER = 1002
    """用户"""
    # Music video.
    MV = 1004
    """MV"""
    # Lyrics.
    LYRICS = 1006
    """歌词"""
    # DJ / host radio station.
    DJ = 1009
    """主播电台"""
    # Video.
    VIDEO = 1014
    """视频"""
class BitRateType(IntEnum):
    """Song bit rate.

    The member value is sent as the ``br`` request parameter when asking for
    a song URL (presumably bits per second -- TODO confirm the unit).
    """

    LOW = 64000
    MEDIUM = 128000
    STANDARD = 198000
    HIGH = 320000
class MVResolutionType(IntEnum):
    """MV (music video) resolution.

    The member value is sent as the ``r`` request parameter when asking for
    an MV URL (presumably the vertical pixel count -- TODO confirm).
    """

    QVGA = 240
    VGA = 480
    HD = 720
    FHD = 1080
class RecordPeriodType(IntEnum):
    """Time span covered by a listening-record query.

    The member value is sent as the ``type`` request parameter of the play
    record request.
    """

    # The bare string after each member is kept verbatim: it is a statement,
    # not a comment, and may be read by documentation tooling such as the
    # imported ``enum_auto_doc`` (TODO confirm).

    # This week.
    WEEKLY = 1
    """本周"""
    # All time.
    ALL = 0
    """所有时段"""
class _EncryptUtil:
    """Build the encrypted ``params`` / ``encSecKey`` pair for a request payload.

    The class is used without instantiation: ``encrypt`` is a class method and
    the two primitives it relies on are static methods.
    """

    # Byte values of [a-zA-Z0-9]; the per-request secret is drawn from these.
    alphabets = bytearray(ord(char) for char in string.ascii_letters + string.digits)

    @staticmethod
    def _aes(data: bytes, key: bytes) -> bytes:
        """Encrypt *data* with AES-CBC under *key* and return it base64-encoded.

        The IV is ``NeteaseConstants.AES_IV``. The result comes from
        ``base64.encodebytes``, so it contains a newline after every 76 output
        characters and a trailing newline.
        """
        # NOTE(review): padding is only added when the length is not already a
        # multiple of 16; strict PKCS#7 would append a full block in that
        # case. Left as-is because the upstream expectation cannot be
        # verified from this file -- confirm.
        data = pad(data, 16) if len(data) % 16 else data
        cipher = AES.new(
            key=key,
            mode=AES.MODE_CBC,
            iv=NeteaseConstants.AES_IV,
        )
        return base64.encodebytes(cipher.encrypt(data))

    @staticmethod
    def _rsa(data: bytes) -> str:
        """Return the textbook-RSA encryption of *data* as 256 hex digits.

        *data* is read as a big-endian integer and raised to
        ``NeteaseConstants.RSA_PUBKEY`` modulo ``NeteaseConstants.RSA_MODULUS``;
        the result is left-padded with zeros to a width of 256.
        """
        result = pow(
            base=int(data.hex(), 16),
            exp=NeteaseConstants.RSA_PUBKEY,
            mod=NeteaseConstants.RSA_MODULUS,
        )
        return f"{result:0>256x}"

    @classmethod
    def encrypt(cls, data: dict[str, Any]) -> dict[str, str]:
        """Encrypt a JSON-serialisable payload into the two form fields.

        Args:
            data: Request parameters; serialised with ``json.dumps``.

        Returns:
            A dict with ``"params"`` (the payload AES-encrypted twice, first
            with ``NeteaseConstants.AES_KEY`` and then with a fresh random
            16-byte secret) and ``"encSecKey"`` (the reversed secret,
            RSA-encrypted, as hex).
        """
        # A new secret per call, taken from a CSPRNG.
        secret = bytes(secrets.choice(cls.alphabets) for _ in range(16))
        secure_key = cls._rsa(bytes(reversed(secret)))
        inner = cls._aes(
            data=json.dumps(data).encode(),
            key=NeteaseConstants.AES_KEY,
        )
        return {
            "params": cls._aes(data=inner, key=secret).decode("ascii"),
            "encSecKey": secure_key,
        }
class NeteaseEndpoint(BaseEndpoint):
    """Endpoint methods that POST encrypted requests to the NetEase host.

    Every public coroutine builds a parameter dict and delegates to
    :meth:`request`, returning the decoded JSON body.

    NOTE(review): this file imports ``dont_route``, ``catch_network_error``,
    ``cache_config`` and ``timedelta`` but nothing visible here uses them;
    decorator lines may have been lost from this copy -- verify against the
    upstream file.
    """

    def _construct_headers(self):
        """Return a copy of the client headers with a random ``X-Real-IP``.

        The address is drawn uniformly (both ends inclusive) from the range
        between the network and broadcast addresses of
        ``NeteaseConstants.SOURCE_IP_SEGMENT``.
        """
        headers = self.client.headers.copy()
        headers["X-Real-IP"] = str(
            IPv4Address(
                randint(
                    int(NeteaseConstants.SOURCE_IP_SEGMENT.network_address),
                    int(NeteaseConstants.SOURCE_IP_SEGMENT.broadcast_address),
                )
            )
        )
        return headers

    async def request(
        self, endpoint: str, *, params: Optional[dict[str, Any]] = None
    ) -> dict[str, Any]:
        """POST *params* (encrypted) to *endpoint* and return the JSON body.

        Args:
            endpoint: Path joined onto ``NeteaseConstants.HOST`` via
                ``self._join``. It may contain ``{name}`` placeholders,
                presumably filled from *params* by ``_join`` (TODO confirm).
            params: Request parameters; ``None`` is treated as empty.

        Returns:
            The response body decoded with ``response.json()``.

        Raises:
            UpstreamAPIException: If the response body is empty or whitespace.
            Exception: Whatever ``response.raise_for_status()`` raises for an
                error status.
        """
        # The CSRF token is read from the ``__csrf`` cookie (empty if absent)
        # and always overrides a caller-supplied ``csrf_token``.
        params = {
            **(params or {}),
            "csrf_token": self.client.cookies.get("__csrf", ""),
        }
        response = await self.client.post(
            self._join(
                NeteaseConstants.HOST,
                endpoint=endpoint,
                params=params,
            ),
            headers=self._construct_headers(),
            data=_EncryptUtil.encrypt(params),
        )
        response.raise_for_status()
        if not response.text.strip():
            raise UpstreamAPIException(
                f"Upstream API {endpoint=} returns blank content"
            )
        return response.json()

    async def search(
        self,
        *,
        s: str,
        search_type: SearchType = SearchType.SONG,
        limit: int = 20,
        offset: int = 0,
    ):
        """Search for *s*, restricted to *search_type*, with paging."""
        return await self.request(
            "api/cloudsearch/pc",
            params={
                "s": s,
                "type": search_type,
                "limit": limit,
                "offset": offset,
                "total": True,
            },
        )

    async def artist(self, *, id: int):
        """Request the artist resource for artist *id*."""
        return await self.request(
            "weapi/v1/artist/{artist_id}",
            params={
                "artist_id": id,
            },
        )

    async def album(self, *, id: int):
        """Request the album resource for album *id*."""
        return await self.request(
            "weapi/v1/album/{album_id}",
            params={
                "album_id": id,
            },
        )

    async def detail(
        self,
        *,
        id: Annotated[list[int], Query()],
    ):
        """Request song details for every song id in *id*.

        The ids are sent in the ``c`` parameter as a JSON-encoded list of
        ``{"id": "<id>"}`` objects.
        """
        return await self.request(
            "api/v3/song/detail",
            params={
                "c": json.dumps(
                    [{"id": str(i)} for i in id],
                ),
            },
        )

    async def song(
        self,
        *,
        id: Annotated[list[int], Query()],
        br: BitRateType = BitRateType.STANDARD,
    ):
        """Request player URLs for the song ids in *id* at bit rate *br*."""
        return await self.request(
            "weapi/song/enhance/player/url",
            params={
                "ids": [str(i) for i in id],
                "br": br,
            },
        )

    async def playlist(self, *, id: int):
        """Request playlist *id* with fixed paging (offset 0, limit 1000)."""
        return await self.request(
            "weapi/v6/playlist/detail",
            params={
                "id": id,
                "total": True,
                "offset": 0,
                "limit": 1000,
                "n": 1000,
            },
        )

    async def lyric(self, *, id: int):
        """Request the lyrics of song *id*."""
        return await self.request(
            "weapi/song/lyric",
            params={
                "id": id,
                "os": "pc",
                "lv": -1,
                "kv": -1,
                "tv": -1,
            },
        )

    async def mv(self, *, id: int):
        """Request the detail resource of MV *id*."""
        return await self.request(
            "api/v1/mv/detail",
            params={
                "id": id,
            },
        )

    async def mv_url(
        self,
        *,
        id: int,
        res: MVResolutionType = MVResolutionType.FHD,
    ):
        """Request the play URL of MV *id* at resolution *res*."""
        return await self.request(
            "weapi/song/enhance/play/mv/url",
            params={
                "id": id,
                "r": res,
            },
        )

    async def comments(self, *, id: int, offset: int = 0, limit: int = 1):
        """Request comments on song *id* with paging."""
        return await self.request(
            "weapi/v1/resource/comments/R_SO_4_{song_id}",
            params={
                "song_id": id,
                "offset": offset,
                "total": True,
                "limit": limit,
            },
        )

    async def record(self, *, id: int, period: RecordPeriodType = RecordPeriodType.ALL):
        """Request the play record of user *id* for the given *period*."""
        return await self.request(
            "weapi/v1/play/record",
            params={
                "uid": id,
                "type": period,
            },
        )

    async def djradio(self, *, id: int):
        """Request the DJ radio resource *id*."""
        return await self.request(
            "api/djradio/v2/get",
            params={
                "id": id,
            },
        )

    async def dj(self, *, id: int, offset: int = 0, limit: int = 20, asc: bool = False):
        """Request the programs of DJ radio *id* with paging and sort order."""
        # NOTE: may not behave the same as the original implementation.
        return await self.request(
            "weapi/dj/program/byradio",
            params={
                "radioId": id,
                "offset": offset,
                "limit": limit,
                "asc": asc,
            },
        )

    async def detail_dj(self, *, id: int):
        """Request the detail resource of DJ program *id*."""
        return await self.request(
            "api/dj/program/detail",
            params={
                "id": id,
            },
        )

    async def user(self, *, id: int):
        """Request the detail resource of user *id*."""
        return await self.request(
            "weapi/v1/user/detail/{id}",
            params={"id": id},
        )

    async def user_playlist(self, *, id: int, limit: int = 50, offset: int = 0):
        """Request the playlists of user *id* with paging."""
        return await self.request(
            "weapi/user/playlist",
            params={
                "uid": id,
                "limit": limit,
                "offset": offset,
            },
        )