# Spaces: Running on A10G
from threading import Thread | |
from multiprocessing import Queue | |
from typing import Dict, Any, List | |
import json | |
import re | |
import logging | |
import sys | |
from mistralai import Mistral | |
# Configure logging at import time: INFO and above are written to stdout,
# prefixed with timestamp, logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
# Module-level logger used by the classes below.
logger = logging.getLogger(__name__)
class ActionProcessor(Thread): | |
def __init__( | |
self, | |
text_queue: "Queue[str]", | |
action_queue: "Queue[str]", | |
mistral_api_key: str, | |
): | |
super().__init__() | |
self.text_queue = text_queue | |
self.action_queue = action_queue | |
self.text_buffers: List[str] = [] | |
self.mistral_client = Mistral(api_key=mistral_api_key) | |
self.daemon = True # Thread will exit when main program exits | |
def get_sentiment_and_action(self, input_text: str) -> str: | |
"""Get sentiment analysis for input text.""" | |
messages = [ | |
{ | |
"role": "user", | |
"content": f""" | |
The following transcription is a broken transmission overheard by a baby through a baby walkie-talkie. The transmission contains **fragments** of negative parenting commands, and your task is to reconstruct the most likely intended message. | |
Analyze the provided transcription and follow these steps: | |
1. **Reconstruct the most logical full command** by analyzing the given fragments and arranging them into a coherent order. | |
2. **Select the closest matching three-word action** from the following predefined options: | |
["drop it", "don't drink", "None", "stay back", "stop touching", "move away"]. | |
Always choose the most contextually relevant option. If no match is appropriate, select "None." | |
3. **Determine the parenting sentiment**, which should always be classified as "negative." | |
Your response should strictly follow this JSON format: | |
{{ | |
"RECONSTRUCTED_ORDER": "[reconstructed text]", | |
"ACTION": "[chosen action]", | |
"SENTIMENT": "negative" | |
}} | |
Transcription fragments: "{input_text}" | |
""", | |
} | |
] | |
response = self.mistral_client.chat.complete( | |
model="mistral-large-latest", | |
messages=messages, | |
) | |
result: str = response.choices[0].message.content | |
return result.strip() | |
def process_text(self, text: str) -> Dict[str, Any] | None: | |
"""Convert text into an action if a complete command is detected.""" | |
# Get sentiment first | |
self.text_buffers.append(text) | |
if len(self.text_buffers) < 3: | |
return None | |
if len(self.text_buffers) >= 3: | |
_ = self.text_buffers.pop(0) | |
candidate = self.text_buffers[-2] | |
if len(self.text_buffers[0]) <= len(candidate) <= len(self.text_buffers[-1]): | |
sentiment_and_action = self.get_sentiment_and_action(candidate) | |
return {"type": "action", "sentiment": sentiment_and_action} | |
return None | |
def run(self) -> None: | |
"""Main processing loop.""" | |
while True: | |
try: | |
# Get text from queue, blocks until text is available | |
text = self.text_queue.get() | |
# Process the text into an action | |
action = self.process_text(text) | |
# If we got a valid action, add it to the action queue | |
if action: | |
self.action_queue.put(json.dumps(action)) | |
except Exception as e: | |
logger.error(f"Error processing text: {str(e)}") | |
continue | |