File size: 4,753 Bytes
ad67495
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import ray
import time
import asyncio
import os

@ray.remote
class CharlesActor:
    """Ray actor that drives the audio/video -> speech-to-text -> chat loop.

    Resources (AV queue, speech-to-text engine, chat pipeline) are created on
    the first call to ``start()``. While ``start()`` runs, callers can poll
    ``get_state()`` and ``get_system_one_audio_history_output()`` for status.
    """

    # Number of finished utterances kept in the System 1 audio history table.
    _AUDIO_HISTORY_LENGTH = 10

    def __init__(self):
        # True until _initalize_resources() has built the runtime resources.
        self._needs_init = True
        # Markdown table of recent prompts; empty until the first prompt.
        self._system_one_audio_history_output = ""
        # Human-readable status string returned by get_state().
        self._state = "Initializing"

    def get_state(self):
        """Return the current human-readable status string."""
        return self._state

    def get_system_one_audio_history_output(self):
        """Return the Markdown table of recent prompts ("" before the first one)."""
        return self._system_one_audio_history_output

    async def _initalize_resources(self):
        """Create the AV queue, speech-to-text engine and chat pipeline.

        The project modules are imported here rather than at module level;
        presumably so they are only loaded inside the actor process
        (NOTE(review): confirm intent).
        """
        print("000")
        from streamlit_av_queue import StreamlitAVQueue
        self._streamlit_av_queue = StreamlitAVQueue()

        print("002")
        from speech_to_text_vosk import SpeechToTextVosk
        self._speech_to_text_vosk = SpeechToTextVosk()

        from chat_pipeline import ChatPipeline
        self._chat_pipeline = ChatPipeline()
        await self._chat_pipeline.start()

        # Prompts that start() sends straight to the chat pipeline,
        # bypassing speech-to-text (debug aid).
        self._debug_queue = [
            # "hello, how are you today?",
            # "hmm, interesting, tell me more about that.",
        ]
        print("010")
        # Resources now exist; later start() calls must not rebuild them.
        self._needs_init = False
        self._state = "Initialized"

    async def start(self):
        """Run the main processing loop; it never returns on its own.

        On every iteration (roughly every 0.1 s) this:
          * sends one queued debug prompt to the chat pipeline, if any;
          * feeds newly received audio frames to speech-to-text;
          * when the speaker has finished an utterance, records it in the
            System 1 history table and enqueues it on the chat pipeline;
          * counts (but otherwise ignores) received video frames;
          * refreshes the status string returned by ``get_state()``.
        """
        if self._needs_init:
            await self._initalize_resources()

        system_one_audio_history = []

        self._state = "Waiting for input"
        total_video_frames = 0
        total_audio_frames = 0
        loops = 0

        while True:
            if len(self._debug_queue) > 0:
                prompt = self._debug_queue.pop(0)
                await self._chat_pipeline.enqueue(prompt)

            audio_frames = self._streamlit_av_queue.get_audio_frames()
            if len(audio_frames) > 0:
                total_audio_frames += len(audio_frames)
                # Concatenate all audio frames into a single buffer
                audio_buffer = b"".join(frame.tobytes() for frame in audio_frames)
                self._speech_to_text_vosk.add_speech_bytes(audio_buffer)

            prompt, speaker_finished = self._speech_to_text_vosk.get_text()
            if speaker_finished and len(prompt) > 0:
                print(f"Prompt: {prompt}")
                system_one_audio_history.append(prompt)
                # Keep only the most recent entries.
                system_one_audio_history = system_one_audio_history[-self._AUDIO_HISTORY_LENGTH:]
                # One-column Markdown table, newest prompt first.
                table_content = "| System 1 Audio History |\n| --- |\n"
                table_content += "\n".join(f"| {item} |" for item in reversed(system_one_audio_history))
                self._system_one_audio_history_output = table_content
                await self._chat_pipeline.enqueue(prompt)

            video_frames = self._streamlit_av_queue.get_video_frames()
            if len(video_frames) > 0:
                total_video_frames += len(video_frames)

            # Yield to the event loop between polls.
            await asyncio.sleep(0.1)
            loops += 1
            # Single status update per iteration, so the format is always the same.
            self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}"

if __name__ == "__main__":
    if not ray.is_initialized():
        # Connect to the Ray cluster named by RAY_ADDRESS when it is set;
        # otherwise fall back to ray.init()'s default behaviour.
        ray_address = os.getenv('RAY_ADDRESS')
        if ray_address:
            ray.init(ray_address, namespace="project_charles")
        else:
            ray.init(namespace="project_charles")

    # Reuse the named actor if one already exists in this namespace.
    charles_actor = CharlesActor.options(
        name="CharlesActor",
        get_if_exists=True,
    ).remote()
    future = charles_actor.start.remote()

    try:
        while True:
            # Wait up to 1 s for start() to finish. This returns as soon as it
            # finishes, rather than noticing only after a fixed sleep.
            ready, _ = ray.wait([future], timeout=1.0)
            if ready:
                # start() has terminated. ray.get() returns its result, or
                # re-raises the exception it raised.
                try:
                    result = ray.get(future)
                    print(f"The start method has terminated with result: {result}")
                except Exception as e:
                    print(f"The start method raised an exception: {e}")
                break
            # start() is still running; poll the actor's status for display.
            state = ray.get(charles_actor.get_state.remote())
            print(f"Charles is in state: {state}")
    except KeyboardInterrupt:
        print("Script was manually terminated")