File size: 3,774 Bytes
6831f1f
3d7f69e
0f6f535
6831f1f
 
3d7f69e
 
9254534
3d7f69e
 
 
 
 
 
 
 
 
6831f1f
 
 
3d7f69e
 
 
 
 
 
6831f1f
 
 
0f6f535
9254534
6831f1f
 
0f6f535
457d4b2
9254534
 
 
0f6f535
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9254534
0f6f535
45ab685
9254534
 
457d4b2
9254534
0f6f535
457d4b2
45ab685
457d4b2
6831f1f
 
457d4b2
0f6f535
 
 
5813146
 
0f6f535
 
 
 
 
 
 
 
6831f1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3d7f69e
6831f1f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from threading import Thread
from multiprocessing import Queue
from typing import Dict, Any, List
import json
import re
import logging
import sys
from mistralai import Mistral

# Root logging setup: a single stdout stream handler, INFO threshold, and a
# timestamp / logger-name / level prefix on every record.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Logger named after this module; used by the code below for error reporting.
logger = logging.getLogger(__name__)


class ActionProcessor(Thread):
    def __init__(
        self,
        text_queue: "Queue[str]",
        action_queue: "Queue[str]",
        mistral_api_key: str,
    ):
        super().__init__()
        self.text_queue = text_queue
        self.action_queue = action_queue
        self.text_buffers: List[str] = []
        self.mistral_client = Mistral(api_key=mistral_api_key)
        self.daemon = True  # Thread will exit when main program exits

    def get_sentiment_and_action(self, input_text: str) -> str:
        """Get sentiment analysis for input text."""
        messages = [
            {
                "role": "user",
                "content": f"""
            The following transcription is a broken transmission overheard by a baby through a baby walkie-talkie. The transmission contains **fragments** of negative parenting commands, and your task is to reconstruct the most likely intended message.

            Analyze the provided transcription and follow these steps:

            1. **Reconstruct the most logical full command** by analyzing the given fragments and arranging them into a coherent order.
            2. **Select the closest matching three-word action** from the following predefined options:
                ["drop it", "don't drink", "None", "stay back", "stop touching", "move away"].
                Always choose the most contextually relevant option. If no match is appropriate, select "None."
            3. **Determine the parenting sentiment**, which should always be classified as "negative."

            Your response should strictly follow this JSON format:

            {{
                "RECONSTRUCTED_ORDER": "[reconstructed text]",
                "ACTION": "[chosen action]",
                "SENTIMENT": "negative"
            }}

            Transcription fragments: "{input_text}"
            """,
            }
        ]

        response = self.mistral_client.chat.complete(
            model="mistral-large-latest",
            messages=messages,
        )

        result: str = response.choices[0].message.content

        return result.strip()

    def process_text(self, text: str) -> Dict[str, Any] | None:
        """Convert text into an action if a complete command is detected."""
        # Get sentiment first
        self.text_buffers.append(text)

        if len(self.text_buffers) < 3:
            return None

        if len(self.text_buffers) >= 3:
            _ = self.text_buffers.pop(0)

        candidate = self.text_buffers[-2]

        if len(self.text_buffers[0]) <= len(candidate) <= len(self.text_buffers[-1]):
            sentiment_and_action = self.get_sentiment_and_action(candidate)
            return {"type": "action", "sentiment": sentiment_and_action}

        return None

    def run(self) -> None:
        """Main processing loop."""
        while True:
            try:
                # Get text from queue, blocks until text is available
                text = self.text_queue.get()

                # Process the text into an action
                action = self.process_text(text)

                # If we got a valid action, add it to the action queue
                if action:
                    self.action_queue.put(json.dumps(action))

            except Exception as e:
                logger.error(f"Error processing text: {str(e)}")
                continue