File size: 2,285 Bytes
dff5fe4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import asyncio
import json
import logging
import os
import ssl
import uuid
from typing import Dict, Optional, Callable

import aiohttp
from aiortc import RTCPeerConnection, RTCSessionDescription, MediaStreamTrack, RTCIceCandidate
from aiortc.contrib.media import MediaBlackhole, MediaRelay

logger = logging.getLogger("webrtc_handler")
pcs = set()
relay = MediaRelay()

class AudioTransformTrack(MediaStreamTrack):
    """

    A track that processes audio and sends it to a callback function

    """
    kind = "audio"

    def __init__(self, track, callback):
        super().__init__()
        self.track = track
        self.callback = callback

    async def recv(self):
        frame = await self.track.recv()
        # Process audio frame
        if self.callback:
            self.callback(frame)
        return frame

async def handle_offer(offer, audio_callback=None):
    offer_data = RTCSessionDescription(sdp=offer["sdp"], type=offer["type"])
    
    pc = RTCPeerConnection()
    pcs.add(pc)
    
    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        logger.info(f"Connection state is {pc.connectionState}")
        if pc.connectionState == "failed":
            await pc.close()
            pcs.discard(pc)
    
    @pc.on("track")
    def on_track(track):
        logger.info(f"Track {track.kind} received")
        if track.kind == "audio":
            pc.addTrack(AudioTransformTrack(relay.subscribe(track), audio_callback))
        
        @track.on("ended")
        async def on_ended():
            logger.info(f"Track {track.kind} ended")
    
    # Handle the incoming offer
    await pc.setRemoteDescription(offer_data)
    
    # Create an answer
    answer = await pc.createAnswer()
    await pc.setLocalDescription(answer)
    
    return {
        "sdp": pc.localDescription.sdp,
        "type": pc.localDescription.type
    }

async def add_ice_candidate(candidate, pc):
    if candidate and pc:
        candidate_data = RTCIceCandidate(
            sdpMLineIndex=candidate.get("sdpMLineIndex"),
            sdpMid=candidate.get("sdpMid"),
            candidate=candidate.get("candidate")
        )
        await pc.addIceCandidate(candidate_data)