File size: 4,838 Bytes
c490c32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
361f9d4
c490c32
361f9d4
c490c32
 
 
 
 
 
 
361f9d4
c490c32
361f9d4
c490c32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
814feb3
 
c490c32
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from asyncio import Queue, TaskGroup
import asyncio
from contextlib import asynccontextmanager

import ray
from chat_service import ChatService
# from local_speaker_service import LocalSpeakerService
from text_to_speech_service import TextToSpeechService
from environment_state_actor import EnvironmentStateActor
from ffmpeg_converter_actor import FFMpegConverterActor
from agent_response import AgentResponse
import json
from asyncio import Semaphore

class RespondToPromptAsync:
    def __init__(
            self, 
            environment_state_actor:EnvironmentStateActor, 
            audio_output_queue):
        voice_id="2OviOUQc1JsQRQgNkVBj"
        self.prompt_queue = Queue(maxsize=100)
        self.llm_sentence_queue = Queue(maxsize=100)
        self.speech_chunk_queue = Queue(maxsize=100)
        self.voice_id = voice_id
        self.audio_output_queue = audio_output_queue
        self.environment_state_actor = environment_state_actor
        self.processing_semaphore = Semaphore(1)
        self.sentence_queues = []
        self.sentence_tasks = []    
        # self.ffmpeg_converter_actor = FFMpegConverterActor.remote(audio_output_queue)

    async def enqueue_prompt(self, prompt:str, messages:[str]):
        if len(prompt) > 0:  # handles case where we just want to flush
            await self.prompt_queue.put((prompt, messages))
        print("Enqueued prompt")

    async def prompt_to_llm(self):
        chat_service = ChatService()

        async with TaskGroup() as tg:
            while True:
                prompt, messages = await self.prompt_queue.get()
                agent_response = AgentResponse(prompt)
                async for text, is_complete_sentance in chat_service.get_responses_as_sentances_async(messages):
                    if chat_service.ignore_sentence(text):
                        is_complete_sentance = False
                    if not is_complete_sentance:
                        agent_response['llm_preview'] = text
                        await self.environment_state_actor.set_llm_preview.remote(text)
                        continue
                    agent_response['llm_preview'] = ''
                    agent_response['llm_sentence'] = text
                    agent_response['llm_sentences'].append(text)
                    await self.environment_state_actor.add_llm_response_and_clear_llm_preview.remote(text)
                    print(f"{agent_response['llm_sentence']} id: {agent_response['llm_sentence_id']} from prompt: {agent_response['prompt']}")
                    sentence_response = agent_response.make_copy()
                    new_queue = Queue()
                    self.sentence_queues.append(new_queue)
                    task = tg.create_task(self.llm_sentence_to_speech(sentence_response, new_queue))
                    self.sentence_tasks.append(task)
                    agent_response['llm_sentence_id'] += 1


    async def llm_sentence_to_speech(self, sentence_response, output_queue):
        tts_service = TextToSpeechService(self.voice_id)
        
        chunk_count = 0
        async for chunk_response in tts_service.get_speech_chunks_async(sentence_response):
            chunk_response = chunk_response.make_copy()
            # await self.output_queue.put_async(chunk_response)
            await output_queue.put(chunk_response)
            chunk_response = {
                'prompt': sentence_response['prompt'],
                'llm_sentence_id': sentence_response['llm_sentence_id'],
                'chunk_count': chunk_count,
            }
            chunk_id_json = json.dumps(chunk_response)
            await self.environment_state_actor.add_tts_raw_chunk_id.remote(chunk_id_json)
            chunk_count += 1

    async def speech_to_converter(self):
        self.ffmpeg_converter_actor = FFMpegConverterActor.remote(self.audio_output_queue)
        await self.ffmpeg_converter_actor.start_process.remote()
        self.ffmpeg_converter_actor.run.remote()
        
        while True:
            for i, task in enumerate(self.sentence_tasks):
                # Skip this task/queue pair if task completed
                queue = self.sentence_queues[i]
                if task.done() and queue.empty():
                    continue 
                while not queue.empty():
                    chunk_response = await queue.get()
                    audio_chunk_ref = chunk_response['tts_raw_chunk_ref']
                    audio_chunk = ray.get(audio_chunk_ref)
                    await self.ffmpeg_converter_actor.push_chunk.remote(audio_chunk)
                break

            await asyncio.sleep(0.01) 

    async def run(self):
        async with TaskGroup() as tg:  # Use asyncio's built-in TaskGroup
            tg.create_task(self.prompt_to_llm())
            tg.create_task(self.speech_to_converter())