Spaces:
Paused
Paused
| import logging | |
| import math | |
| import time | |
| from datetime import datetime, timezone, timedelta | |
| from fastapi import Request, Response, HTTPException | |
| from mediaflow_proxy.configs import settings | |
| from mediaflow_proxy.drm.decrypter import decrypt_segment | |
| from mediaflow_proxy.utils.http_utils import encode_mediaflow_proxy_url | |
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
async def process_manifest(request: Request, mpd_dict: dict, key_id: str = None, key: str = None) -> Response:
    """
    Convert the MPD manifest into an HLS manifest and wrap it in a response.

    Args:
        request (Request): The incoming HTTP request.
        mpd_dict (dict): The MPD manifest data.
        key_id (str, optional): The DRM key ID, passed through to ``build_hls``. Defaults to None.
        key (str, optional): The DRM key, passed through to ``build_hls``. Defaults to None.

    Returns:
        Response: The HLS manifest, served as ``application/vnd.apple.mpegurl``.
    """
    return Response(
        content=build_hls(mpd_dict, request, key_id, key),
        media_type="application/vnd.apple.mpegurl",
    )
async def process_playlist(request: Request, mpd_dict: dict, profile_id: str) -> Response:
    """
    Build the HLS media playlist for one profile of the MPD manifest.

    Args:
        request (Request): The incoming HTTP request.
        mpd_dict (dict): The MPD manifest data; its ``"profiles"`` list is searched.
        profile_id (str): The ID of the profile to generate the playlist for.

    Returns:
        Response: The HLS playlist, served as ``application/vnd.apple.mpegurl``.

    Raises:
        HTTPException: With status 404 when no profile has the requested ID.
    """
    # Keep every profile whose id matches; all matches are handed to the builder.
    selected = [candidate for candidate in mpd_dict["profiles"] if candidate["id"] == profile_id]
    if len(selected) == 0:
        raise HTTPException(status_code=404, detail="Profile not found")

    return Response(
        content=build_hls_playlist(mpd_dict, selected, request),
        media_type="application/vnd.apple.mpegurl",
    )
async def process_segment(
    init_content: bytes,
    segment_content: bytes,
    mimetype: str,
    key_id: str = None,
    key: str = None,
) -> Response:
    """
    Processes a media segment, decrypting it when DRM keys are supplied.

    Args:
        init_content (bytes): The initialization segment content.
        segment_content (bytes): The media segment content.
        mimetype (str): The MIME type of the segment; used as the response media type.
        key_id (str, optional): The DRM key ID. Defaults to None.
        key (str, optional): The DRM key. Defaults to None.

    Returns:
        Response: The decrypted segment (or, without keys, the init segment
        followed by the media segment) as an HTTP response.
    """
    if key_id and key:
        # For DRM protected content.
        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed (or go negative) if the system wall clock is adjusted.
        # NOTE(review): decrypt_segment is called synchronously inside this
        # coroutine; if it is CPU-heavy it will hold the event loop - confirm.
        started = time.perf_counter()
        decrypted_content = decrypt_segment(init_content, segment_content, key_id, key)
        logger.info(
            "Decryption of %s segment took %.4f seconds",
            mimetype,
            time.perf_counter() - started,
        )
    else:
        # For non-DRM protected content, we just concatenate init and segment content
        decrypted_content = init_content + segment_content
    return Response(content=decrypted_content, media_type=mimetype)
def build_hls(mpd_dict: dict, request: Request, key_id: str = None, key: str = None) -> str:
    """
    Builds an HLS master manifest from the MPD manifest.

    Audio profiles are emitted as ``#EXT-X-MEDIA`` renditions in the group
    ``"audio"``; video profiles are emitted as ``#EXT-X-STREAM-INF`` variants
    followed by their playlist URL. Profiles whose ``mimeType`` contains
    neither ``"video"`` nor ``"audio"`` are ignored.

    Args:
        mpd_dict (dict): The MPD manifest data; ``mpd_dict["profiles"]`` is iterated.
        request (Request): The incoming HTTP request. Its query parameters are
            copied into every generated playlist URL.
        key_id (str, optional): The DRM key ID. Defaults to None (sent as "").
        key (str, optional): The DRM key. Defaults to None (sent as "").

    Returns:
        str: The HLS manifest as a string.
    """
    hls = ["#EXTM3U", "#EXT-X-VERSION:6"]
    query_params = dict(request.query_params)
    # The endpoint URL is the same for every profile, so resolve it once.
    playlist_endpoint = str(request.url_for("playlist_endpoint"))

    video_profiles = {}
    audio_profiles = {}
    for profile in mpd_dict["profiles"]:
        query_params.update({"profile_id": profile["id"], "key_id": key_id or "", "key": key or ""})
        playlist_url = encode_mediaflow_proxy_url(
            playlist_endpoint,
            query_params=query_params,
        )
        mime_type = profile["mimeType"]
        if "video" in mime_type:
            video_profiles[profile["id"]] = (profile, playlist_url)
        elif "audio" in mime_type:
            audio_profiles[profile["id"]] = (profile, playlist_url)

    # Add audio streams
    for i, (profile, playlist_url) in enumerate(audio_profiles.values()):
        is_default = "YES" if i == 0 else "NO"  # Set the first audio track as default
        hls.append(
            f'#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID="audio",NAME="{profile["id"]}",DEFAULT={is_default},AUTOSELECT={is_default},LANGUAGE="{profile.get("lang", "und")}",URI="{playlist_url}"'
        )

    # Only reference the audio group when it was actually declared above:
    # an AUDIO attribute must match the GROUP-ID of an EXT-X-MEDIA tag in the
    # same playlist, so a video-only MPD must not carry AUDIO="audio".
    audio_attr = ',AUDIO="audio"' if audio_profiles else ""

    # Add video streams
    for profile, playlist_url in video_profiles.values():
        hls.append(
            f'#EXT-X-STREAM-INF:BANDWIDTH={profile["bandwidth"]},RESOLUTION={profile["width"]}x{profile["height"]},CODECS="{profile["codecs"]}",FRAME-RATE={profile["frameRate"]}{audio_attr}'
        )
        hls.append(playlist_url)

    return "\n".join(hls)
def build_hls_playlist(mpd_dict: dict, profiles: list[dict], request: Request) -> str:
    """
    Builds an HLS media playlist from the MPD manifest for specific profiles.

    The playlist header (target duration, media sequence, playlist type) is
    written once, taken from the first profile that has segments. For live
    manifests, segments ending after ``now - settings.mpd_live_stream_delay``
    are left out and each emitted segment is preceded by its program date-time.

    Args:
        mpd_dict (dict): The MPD manifest data; ``mpd_dict["isLive"]`` selects
            live (EVENT) versus VOD behaviour.
        profiles (list[dict]): The profiles to include in the playlist.
        request (Request): The incoming HTTP request. Its query parameters
            (minus ``profile_id`` and ``d``) are copied into every segment URL.

    Returns:
        str: The HLS playlist as a string.
    """
    hls = ["#EXTM3U", "#EXT-X-VERSION:6"]
    added_segments = 0
    is_live = mpd_dict["isLive"]

    current_time = datetime.now(timezone.utc)
    live_stream_delay = timedelta(seconds=settings.mpd_live_stream_delay)
    target_end_time = current_time - live_stream_delay

    # The endpoint URL is the same for every segment, so resolve it once.
    segment_endpoint = str(request.url_for("segment_endpoint"))

    # Tracks whether the header was emitted. Keying this on the loop index
    # would lose the header whenever the first profile has no segments.
    headers_written = False

    for profile in profiles:
        segments = profile["segments"]
        if not segments:
            logger.warning("No segments found for profile %s", profile["id"])
            continue

        # Add headers only once, for the first profile that has segments
        if not headers_written:
            sequence = segments[0]["number"]
            extinf_values = [f["extinf"] for f in segments if "extinf" in f]
            # Fall back to 3 when no segment carries a duration.
            target_duration = math.ceil(max(extinf_values)) if extinf_values else 3
            hls.extend(
                [
                    f"#EXT-X-TARGETDURATION:{target_duration}",
                    f"#EXT-X-MEDIA-SEQUENCE:{sequence}",
                ]
            )
            hls.append("#EXT-X-PLAYLIST-TYPE:EVENT" if is_live else "#EXT-X-PLAYLIST-TYPE:VOD")
            headers_written = True

        init_url = profile["initUrl"]

        query_params = dict(request.query_params)
        # These belong to the playlist request itself and must not leak into
        # the per-segment URLs.
        query_params.pop("profile_id", None)
        query_params.pop("d", None)

        for segment in segments:
            if is_live:
                # Skip segments that end inside the configured live delay window.
                if segment["end_time"] > target_end_time:
                    continue
                hls.append(f"#EXT-X-PROGRAM-DATE-TIME:{segment['program_date_time']}")
            hls.append(f'#EXTINF:{segment["extinf"]:.3f},')
            query_params.update(
                {"init_url": init_url, "segment_url": segment["media"], "mime_type": profile["mimeType"]}
            )
            hls.append(
                encode_mediaflow_proxy_url(
                    segment_endpoint,
                    query_params=query_params,
                )
            )
            added_segments += 1

    if not is_live:
        hls.append("#EXT-X-ENDLIST")

    logger.info("Added %s segments to HLS playlist", added_segments)
    return "\n".join(hls)