"""Background worker that turns queued text fragments into action messages.

An ``ActionProcessor`` thread reads text from an input queue, decides which
fragments are worth classifying, asks a Mistral chat model for an
``[action, sentiment]`` pair, and publishes the result as a JSON string on an
output queue.
"""

import json
import logging
import re
import sys
from multiprocessing import Queue
from threading import Thread
from typing import Any, Dict, List

from mistralai import Mistral

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)


class ActionProcessor(Thread):
    """Daemon thread that converts incoming text into JSON action messages.

    Attributes:
        text_queue: Queue the thread reads raw text fragments from.
        action_queue: Queue the thread writes JSON-encoded actions to.
        text_buffers: The most recent text fragments (at most three are kept
            once processing has started).
        mistral_client: Mistral API client used for classification.
    """

    # Number of most recent fragments kept for the length-peak check.
    _WINDOW_SIZE = 3

    def __init__(
        self,
        text_queue: "Queue[str]",
        action_queue: "Queue[str]",
        mistral_api_key: str,
    ):
        """Create the worker; call ``start()`` to begin processing.

        Args:
            text_queue: Source of text fragments.
            action_queue: Destination for JSON-encoded action messages.
            mistral_api_key: API key passed to the Mistral client.
        """
        # daemon=True: the thread will exit when the main program exits.
        super().__init__(daemon=True)
        self.text_queue = text_queue
        self.action_queue = action_queue
        self.text_buffers: List[str] = []
        self.mistral_client = Mistral(api_key=mistral_api_key)

    def get_sentiment_and_action(self, input_text: str) -> str:
        """Ask the model for the action and sentiment of ``input_text``.

        The prompt instructs the model to answer with a JSON array of the
        form ``[action, sentiment]``; the reply is returned as the raw,
        whitespace-stripped string without being parsed here.

        Args:
            input_text: Transcription fragments to classify.

        Returns:
            The model's reply text with surrounding whitespace removed.

        Raises:
            ValueError: If the response has no choices or its message content
                is not a string.
        """
        # The prompt is assembled from adjacent string literals so that its
        # runtime value (including the single embedded newline) is unchanged
        # while the source stays readable.
        # NOTE(review): the prompt asks for a "three-word action" although
        # the listed options are two words, lists both "None" and "none", and
        # has an unbalanced "**" in step 3. Left as-is because changing the
        # prompt changes model behaviour - confirm intent before editing.
        prompt = (
            " The following transcription is a broken transmission overheard by a "
            "baby through a baby walkie-talkie. The transmission contains "
            "**fragments** of negative parenting commands, and your task is to "
            "reconstruct the most likely intended message. Analyze the provided "
            "transcription and follow these steps: "
            "1. **Reconstruct the most logical full command** by analyzing the "
            "given fragments and arranging them into a coherent order. "
            "2. **Select the closest matching three-word action** from the "
            "following predefined options: "
            '["drop it", "don\'t drink", "None", "stay back", "stop touching", '
            '"move away", "none"]. Always choose the most contextually relevant '
            'option. If no match is appropriate, select "None." '
            "3. Determine whether the parenting sentiment behind the command is "
            "positive or negative**, based on the following criteria: "
            '- If the statement includes the baby’s name ("Harold"), it is '
            "considered **positive parenting**. "
            "- If the statement consists only of direct commands without "
            'affectionate or encouraging words (e.g., "good boy," "better"), it '
            "is considered **negative parenting**. \n"
            "Your response should strictly follow this JSON format as an array of "
            "strings: ```json [action,sentiment] ``` for example: "
            '["drop it", "negative"] ["none", "positive"] '
            f'Transcription fragments: "{input_text}" '
        )
        messages = [
            {
                "role": "user",
                "content": prompt,
            }
        ]

        response = self.mistral_client.chat.complete(
            model="mistral-large-latest",
            messages=messages
            + [
                # Assistant prefix: nudges the model to start its reply with
                # the opening bracket of the JSON array.
                {
                    "role": "assistant",
                    "content": "[",
                    "prefix": True,
                }
            ],
            response_format={"type": "json_object"},
            temperature=0.0,
        )

        # Fail with a clear message instead of an opaque IndexError /
        # AttributeError when the API returns nothing usable.
        choices = response.choices
        if not choices:
            raise ValueError("Mistral response contained no choices")
        content = choices[0].message.content
        if not isinstance(content, str):
            raise ValueError(
                f"Mistral response content is not text: {type(content).__name__}"
            )
        return content.strip()

    def process_text(self, text: str) -> Dict[str, Any] | None:
        """Buffer ``text`` and classify the middle fragment if it is a peak.

        The three most recent fragments are kept. The middle one is sent to
        the model only when it is strictly longer than the fragment before it
        and at least as long as the fragment after it.
        NOTE(review): presumably this picks the most complete version of a
        transcription that grows and is then replaced - confirm with caller.

        Args:
            text: Newly received text fragment.

        Returns:
            ``{"type": "action", "sentiment": <raw model reply>}`` when the
            middle fragment was classified, otherwise ``None``.
        """
        self.text_buffers.append(text)
        # Keep only the newest fragments (no-op while the buffer is short).
        del self.text_buffers[: -self._WINDOW_SIZE]
        if len(self.text_buffers) < self._WINDOW_SIZE:
            return None

        previous, candidate, following = self.text_buffers
        if len(previous) < len(candidate) >= len(following):
            sentiment_and_action = self.get_sentiment_and_action(candidate)
            return {"type": "action", "sentiment": sentiment_and_action}
        return None

    def run(self) -> None:
        """Main processing loop.

        Runs forever: blocks until text is available on ``text_queue``,
        processes it, and puts the JSON-encoded action on ``action_queue``
        when one is produced. Any exception is logged with its traceback and
        the loop carries on with the next item.
        """
        while True:
            try:
                # Get text from queue, blocks until text is available
                text = self.text_queue.get()

                # Process the text into an action
                action = self.process_text(text)

                # If we got a valid action, add it to the action queue
                if action is not None:
                    self.action_queue.put(json.dumps(action))
            except Exception as e:
                # Top-level boundary of the worker: log and keep the thread
                # alive rather than letting one bad item stop processing.
                logger.exception("Error processing text: %s", e)