import time
import ray
from ray.util.queue import Queue, Empty
from ray.actor import ActorHandle
import numpy as np
import pid_helper

# Ray Queue's take ~.5 seconds to splin up; 
# this class creates a pool of queues to cycle through
class QueueFactory:
    def __init__(self, max_size:int):
        self.queues:[Queue] = []
        self.queue_size = 5
        self.max_size = max_size
        while len(self.queues) < self.queue_size:
            self.queues.append(Queue(maxsize=max_size))

    def get_queue(self)->Queue:
        queue = self.queues.pop(0)
        self.queues.append(Queue(maxsize=self.max_size))
        return queue
        
@ray.remote
class AppInterfaceActor:
    def __init__(self):
        self.audio_input_queue = Queue(maxsize=3000)  # Adjust the size as needed
        self.video_input_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.audio_output_queue_factory = QueueFactory(max_size=50)
        self.audio_output_queue = self.audio_output_queue_factory.get_queue()
        self.video_output_queue = Queue(maxsize=10)  # Adjust the size as needed
        self.debug_str = ""
        self.state = "Initializing"
        self.charles_app_pids = []

    @staticmethod
    def get_singleton():
        return AppInterfaceActor.options(
            name="AppInterfaceActor",
            get_if_exists=True,
        ).remote()

# functions for UI to enqueue input, dequeue output
    async def enqueue_video_input_frame(self, shared_tensor_ref):
        if self.video_input_queue.full():
            evicted_item = await self.video_input_queue.get_async()
            del evicted_item
        await self.video_input_queue.put_async(shared_tensor_ref)

    async def enqueue_audio_input_frame(self, shared_buffer_ref):
        if self.audio_input_queue.full():
            evicted_item = await self.audio_input_queue.get_async()
            del evicted_item
        await self.audio_input_queue.put_async(shared_buffer_ref)

    async def dequeue_audio_output_frame_async(self):
        start_time = time.time()
        try:
            frame = await self.audio_output_queue.get_async(block=False)
        except Empty:
            frame = None
        elapsed_time = time.time() - start_time
        if elapsed_time > 0.1:
            print (f"dequeue_audio_output_frame_async time: {elapsed_time}. was empty: {frame is None}. frame type: {type(frame) if frame else str(0)}")
        return frame

    async def dequeue_video_output_frames_async(self):
        video_frames = []
        if self.video_output_queue.empty():
            return video_frames
        while not self.video_output_queue.empty():
            shared_tensor = await self.video_output_queue.get_async()
            video_frames.append(shared_tensor)
        return video_frames

# functions for application to dequeue input, enqueue output
    def get_audio_output_queue(self)->Queue:
        return self.audio_output_queue

    async def cycle_output_queue(self)->Queue:
        self.audio_output_queue.shutdown(grace_period_s=0.1)
        self.audio_output_queue = self.audio_output_queue_factory.get_queue()
        return self.audio_output_queue
    
    async def enqueue_video_output_frame(self, shared_tensor_ref):
        if self.video_output_queue.full():
            evicted_item = await self.video_output_queue.get_async()
            del evicted_item
        await self.video_output_queue.put_async(shared_tensor_ref)
    
    async def dequeue_audio_input_frames_async(self):
        audio_frames = []
        if self.audio_input_queue.empty():
            return audio_frames
        while not self.audio_input_queue.empty():
            shared_tensor = await self.audio_input_queue.get_async()
            audio_frames.append(shared_tensor)
        return audio_frames

    async def dequeue_video_input_frames_async(self):
        video_frames = []
        if self.video_input_queue.empty():
            return video_frames
        while not self.video_input_queue.empty():
            shared_tensor = await self.video_input_queue.get_async()
            video_frames.append(shared_tensor)
        return video_frames

# pid helpers
    async def add_charles_app_pid(self, pid:int):
        self.charles_app_pids.append(pid)

    async def get_charles_app_pids(self)->[int]:
        # prune dead pids
        running_charles_app_pids = []
        for pid in self.charles_app_pids:
            if pid_helper.is_pid_running(pid):
                running_charles_app_pids.append(pid)
        self.charles_app_pids = running_charles_app_pids
        return self.charles_app_pids

    async def is_charles_app_running(self)->bool:
        # prune dead pids
        running_charles_app_pids = []
        for pid in self.charles_app_pids:
            if pid_helper.is_pid_running(pid):
                running_charles_app_pids.append(pid)
        self.charles_app_pids = running_charles_app_pids
        return len(self.charles_app_pids) > 0

# debug helpers
    async def get_debug_output(self)->str:
        return self.debug_str
    
    async def set_debug_output(self, debug_str:str):
        self.debug_str = debug_str

    async def get_state(self)->str:
        return self.state
    
    async def set_state(self, state:str):
        self.state = state