File size: 4,646 Bytes
93161aa
7f7ae8a
6831f1f
3d7f69e
0f6f535
3d7f69e
 
9254534
3d7f69e
 
 
 
 
 
 
 
 
6831f1f
 
 
b0b3162
 
 
 
 
 
38abb63
b0b3162
 
 
 
3d7f69e
 
 
7f7ae8a
3d7f69e
 
6831f1f
 
 
0f6f535
9254534
6831f1f
 
b0b3162
457d4b2
9254534
 
b0b3162
 
7f7ae8a
 
 
 
 
 
 
 
 
38abb63
7f7ae8a
 
b0b3162
8e56e25
 
b0b3162
 
 
 
 
 
8e56e25
b0b3162
 
8e56e25
7f7ae8a
 
8e56e25
7f7ae8a
b0b3162
 
 
 
 
9254534
0f6f535
45ab685
9254534
3256e71
 
 
 
7f7ae8a
3256e71
 
 
 
 
457d4b2
9254534
0f6f535
457d4b2
45ab685
457d4b2
6831f1f
 
457d4b2
0f6f535
 
 
5813146
 
3256e71
0f6f535
 
3256e71
0f6f535
3256e71
7f7ae8a
 
 
 
 
 
 
 
b0b3162
 
 
93161aa
b0b3162
 
 
93161aa
 
6831f1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7f7ae8a
6831f1f
 
3d7f69e
6831f1f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import datetime
import json
from threading import Thread
from multiprocessing import Queue
from typing import Dict, Any, List
import logging
import sys
from mistralai import Mistral

# Root logger setup: INFO and above, timestamped records written to stdout.
logging.basicConfig(
    handlers=[logging.StreamHandler(stream=sys.stdout)],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by the code in this file.
logger = logging.getLogger(__name__)


class ActionProcessor(Thread):
    valid_action: List[str] = [
        "DropBleach",
        "DropSyringe",
        "DropFork",
        "GoToLivingRoom",
        "GoToBedroom",
        "GoToGarage",
        "Come",
        "None",
    ]

    def __init__(
        self,
        text_queue: "Queue[str]",
        action_queue: "Queue[Dict[str, Any]]",
        mistral_api_key: str,
    ):
        super().__init__()
        self.text_queue = text_queue
        self.action_queue = action_queue
        self.text_buffers: List[str] = []
        self.mistral_client = Mistral(api_key=mistral_api_key)
        self.daemon = True  # Thread will exit when main program exits

    def get_action_and_sentiment(self, input_text: str) -> str:
        """Get sentiment analysis for input text."""
        messages = [
            {
                "role": "system",
                "content": """
You are a transcription expert. You're listening to a parent speaking to a baby. Your goal
is to determine what the baby is asked to do and what the parent's sentiment is.

The following interpretations are possible:
- DropBleach: The parent asks to drop the bleach (or 'Javel').
- DropSyringe: The parent asks to drop the syringe.
- DropFork: The parent asks to drop the fork.
- GoToLivingRoom: The parent asks to go to the living room.
- GoToBedroom: The parent asks to go to the bedroom.
- GoToGarage: The parent asks to go to the garage.
- Come: The parent asks to come.
- None: Others instructions are not relevant.

The following sentiments are possible: badSentiment, goodSentiment, neutralSentiment

```json
[action,sentiment]
```

for example:
Input: "Don't put the fork in the socket!"
Output: ["DropFork", "badSentiment"]

Input: "Harold, please don't drink the bleach!"
Output: ["DropBleach", "goodSentiment"]

Input: "I'm so tired of this."
Output: ["None", "neutralSentiment"]
""",
            },
            {
                "role": "user",
                "content": f"Transcription fragments: {input_text}",
            },
        ]

        response = self.mistral_client.chat.complete(
            model="mistral-large-latest",
            messages=messages
            + [
                {
                    "role": "assistant",
                    "content": '["',
                    "prefix": True,
                }
            ],
            response_format={"type": "json_object"},
            temperature=0.0,
        )

        result: str = response.choices[0].message.content

        return result.strip()

    def process_text(self, text: str) -> Dict[str, Any] | None:
        """Convert text into an action if a complete command is detected."""
        # Get sentiment first
        self.text_buffers.append(text)

        if len(self.text_buffers) < 3:
            return None

        if len(self.text_buffers) > 3:
            _ = self.text_buffers.pop(0)

        candidate = self.text_buffers[1]

        if len(self.text_buffers[0]) < len(candidate) >= len(self.text_buffers[2]):
            action_and_sentiment = json.loads(self.get_action_and_sentiment(candidate))
            if (
                not isinstance(action_and_sentiment, list)
                or len(action_and_sentiment) != 2
            ):
                return None

            action, sentiment = action_and_sentiment

            if action not in self.valid_action:
                action = "None"
            return {
                "action": action,
                "sentiment": sentiment,
                "voice": candidate,
                "time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }

        return None

    def run(self) -> None:
        """Main processing loop."""
        while True:
            try:
                # Get text from queue, blocks until text is available
                text = self.text_queue.get()

                # Process the text into an action
                action = self.process_text(text)

                # If we got a valid action, add it to the action queue
                if action:
                    self.action_queue.put(action)

            except Exception as e:
                logger.error(f"Error processing text: {str(e)}")
                continue