File size: 8,242 Bytes
c87c295
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
from bot_backend import *
import base64
import time
import tiktoken
from notebook_serializer import add_code_cell_error_to_notebook, add_image_to_notebook, add_code_cell_output_to_notebook

# System notice inserted into the conversation when older messages are dropped
# so the remainder fits in the model's context window.
SLICED_CONV_MESSAGE = "[Rest of the conversation has been omitted to fit in the context window]"


def get_conversation_slice(conversation, model, encoding_for_which_model, min_output_tokens_count=500):
    """
    Return a slice of the conversation that fits in the model's context window.

    The slice always keeps the first message (explaining the role of the
    assistant) plus the last x messages that can fit in the context window.
    When older messages must be dropped, a system notice (SLICED_CONV_MESSAGE)
    is inserted right after the first message.

    :param conversation: list of {'role': ..., 'content': ...} message dicts.
    :param model: key used to look up the context window size in config.
    :param encoding_for_which_model: model name handed to tiktoken to select an encoder.
    :param min_output_tokens_count: tokens reserved for the model's reply.
    :return: (sliced_conversation, nb_tokens, sliced) where nb_tokens counts only
             the message contents actually kept and sliced flags truncation.
    """
    encoder = tiktoken.encoding_for_model(encoding_for_which_model)
    count_tokens = lambda txt: len(encoder.encode(txt))
    nb_tokens = count_tokens(conversation[0]['content'])
    sliced_conv = [conversation[0]]
    context_window_limit = int(config['model_context_window'][model])
    # Reserve room for the truncation notice and for the model's answer.
    max_tokens = context_window_limit - count_tokens(SLICED_CONV_MESSAGE) - min_output_tokens_count
    sliced = False
    # Walk backwards from the newest message, skipping the system prompt at index 0.
    for message in conversation[-1:0:-1]:
        message_tokens = count_tokens(message['content'])
        if nb_tokens + message_tokens > max_tokens:
            # This message no longer fits: drop it (and everything older).
            # Count it only if kept — the original added it to nb_tokens before
            # the check, over-reporting the token count of the returned slice.
            sliced_conv.insert(1, {'role': 'system', 'content': SLICED_CONV_MESSAGE})
            sliced = True
            break
        nb_tokens += message_tokens
        sliced_conv.insert(1, message)
    return sliced_conv, nb_tokens, sliced


def chat_completion(bot_backend: BotBackend):
    """
    Send the current conversation to the OpenAI chat-completion endpoint.

    The conversation is first sliced to fit the chosen model's context window;
    the resulting token count and sliced flag are written back into
    *bot_backend* before the request is issued.

    :param bot_backend: backend holding the model choice, config and the
        keyword arguments (including messages) for the API call.
    :return: the raw response from openai.ChatCompletion.create.
    """
    model_choice = bot_backend.gpt_model_choice
    model_name = bot_backend.config['model'][model_choice]['model_name']

    # Validate up front. In the original these assertions ran AFTER
    # get_conversation_slice, which itself indexes
    # config['model_context_window'][model_name] — so a missing entry surfaced
    # as a bare KeyError before the informative assertion message could fire.
    assert config['model'][model_choice]['available'], f"{model_choice} is not available for your API key"
    assert model_name in config['model_context_window'], \
        f"{model_name} lacks context window information. Please check the config.json file."

    kwargs_for_chat_completion = copy.deepcopy(bot_backend.kwargs_for_chat_completion)
    if bot_backend.config['API_TYPE'] == "azure":
        # Azure deployment names are custom, so pick the tiktoken encoding from
        # the logical model family instead of the deployment name.
        encoding_model = 'gpt-3.5-turbo' if model_choice == 'GPT-3.5' else 'gpt-4'
    else:
        encoding_model = model_name
    kwargs_for_chat_completion['messages'], nb_tokens, sliced = get_conversation_slice(
        conversation=kwargs_for_chat_completion['messages'],
        model=model_name,
        encoding_for_which_model=encoding_model
    )

    bot_backend.update_token_count(num_tokens=nb_tokens)
    bot_backend.update_sliced_state(sliced=sliced)

    response = openai.ChatCompletion.create(**kwargs_for_chat_completion)
    return response


def add_code_execution_result_to_bot_history(content_to_display, history, unique_id):
    """
    Append code-execution results (terminal text and images) to the chat
    history and mirror them into the notebook.

    :param content_to_display: iterable of (mark, out_str) pairs, where mark
        identifies the output type ('stdout', 'error', 'display_png', ...) and
        out_str is text or base64-encoded image data.
    :param history: chat history (list of [user, bot] pairs) to append to.
    :param unique_id: session id used to namespace the image cache directory.
    """
    images, text = [], []

    # terminal output
    error_occurred = False

    for mark, out_str in content_to_display:
        if mark in ('stdout', 'execute_result_text', 'display_text'):
            text.append(out_str)
            add_code_cell_output_to_notebook(out_str)
        elif mark in ('execute_result_png', 'execute_result_jpeg', 'display_png', 'display_jpeg'):
            if 'png' in mark:
                images.append(('png', out_str))
                add_image_to_notebook(out_str, 'image/png')
            else:
                add_image_to_notebook(out_str, 'image/jpeg')
                images.append(('jpg', out_str))
        elif mark == 'error':
            # Strip ANSI color control characters so the traceback renders cleanly.
            text.append(delete_color_control_char(out_str))
            error_occurred = True
            add_code_cell_error_to_notebook(out_str)
    text = '\n'.join(text).strip('\n')
    if error_occurred:
        history.append([None, f'❌Terminal output:\n```shell\n\n{text}\n```'])
    else:
        history.append([None, f'✔️Terminal output:\n```shell\n{text}\n```'])

    # image output
    temp_path = f'cache/temp_{unique_id}'
    for img_index, (filetype, img) in enumerate(images):
        image_bytes = base64.b64decode(img)
        # makedirs(exist_ok=True) avoids the check-then-create race of the
        # original os.path.exists + os.mkdir pair (and creates 'cache' too).
        os.makedirs(temp_path, exist_ok=True)
        # Suffix with the loop index: hash(time.time()) alone can repeat within
        # one fast loop, silently overwriting an earlier image file.
        path = f'{temp_path}/{hash(time.time())}_{img_index}.{filetype}'
        with open(path, 'wb') as f:
            f.write(image_bytes)
        width, height = get_image_size(path)
        history.append(
            [
                None,
                f'<img src=\"file={path}\" style=\'{"" if width < 800 else "width: 800px;"} max-width:none; '
                f'max-height:none\'> '
            ]
        )


def add_function_response_to_bot_history(hypertext_to_display, history):
    """
    Attach a function call's hypertext result to the chat history.

    Fills the empty bot slot of the last history entry when it is falsy;
    otherwise appends a fresh [None, hypertext] pair. A None hypertext is
    ignored entirely.
    """
    if hypertext_to_display is None:
        return
    last_entry = history[-1]
    if last_entry[1]:
        history.append([None, hypertext_to_display])
    else:
        last_entry[1] = hypertext_to_display


def parse_json(function_args: str, finished: bool):
    """
    GPT may generate non-standard JSON format string, which contains '\n' in string value, leading to error when using
    `json.loads()`.
    Here we implement a parser to extract code directly from non-standard JSON string.

    :param function_args: JSON argument string of the function call; may be an
        incomplete prefix while the response is still streaming.
    :param finished: True when function_args is the complete argument string,
        False while it is still being streamed.
    :return: code string if successfully parsed otherwise None
    """
    # State of the hand-rolled forward/backward scan over the JSON text.
    parser_log = {
        'met_begin_{': False,  # forward scan: saw the opening '{'
        'begin_"code"': False,  # forward scan: saw the opening quote of the "code" key
        'end_"code"': False,  # forward scan: saw the closing quote of the "code" key
        'met_:': False,  # forward scan: saw the ':' after the "code" key
        'met_end_}': False,  # backward scan: saw the trailing '}'
        'met_end_code_"': False,  # NOTE(review): never set or read — apparently vestigial
        "code_begin_index": 0,  # index of the first character of the code value
        "code_end_index": 0  # index of the last character of the code value
    }
    try:
        # Forward scan: find the quote that opens the "code" value.
        for index, char in enumerate(function_args):
            if char == '{':
                parser_log['met_begin_{'] = True
            elif parser_log['met_begin_{'] and char == '"':
                if parser_log['met_:']:
                    # This quote opens the code value itself.
                    if finished:
                        # Complete input: remember the start, find the end below.
                        parser_log['code_begin_index'] = index + 1
                        break
                    else:
                        # Streaming: the value may be cut off at any point.
                        if index + 1 == len(function_args):
                            # Nothing after the opening quote yet — wait for more.
                            return None
                        else:
                            temp_code_str = function_args[index + 1:]
                            if '\n' in temp_code_str:
                                # Raw newlines => non-standard JSON; try to repair
                                # by appending the likely-missing terminators,
                                # from most to least truncated.
                                try:
                                    return json.loads(function_args + '"}')['code']
                                except json.JSONDecodeError:
                                    try:
                                        return json.loads(function_args + '}')['code']
                                    except json.JSONDecodeError:
                                        try:
                                            return json.loads(function_args)['code']
                                        except json.JSONDecodeError:
                                            if temp_code_str[-1] in ('"', '\n'):
                                                # Ambiguous tail — wait for more input.
                                                return None
                                            else:
                                                return temp_code_str.strip('\n')
                            else:
                                # Single-line value so far: close it and parse normally.
                                return json.loads(function_args + '"}')['code']
                elif parser_log['begin_"code"']:
                    parser_log['end_"code"'] = True
                else:
                    parser_log['begin_"code"'] = True
            elif parser_log['end_"code"'] and char == ':':
                parser_log['met_:'] = True
            else:
                continue
        if finished:
            # Backward scan: find the quote that closes the code value
            # (the last '"' before the trailing '}').
            for index, char in enumerate(function_args[::-1]):
                back_index = -1 - index
                if char == '}':
                    parser_log['met_end_}'] = True
                elif parser_log['met_end_}'] and char == '"':
                    parser_log['code_end_index'] = back_index - 1
                    break
                else:
                    continue
            code_str = function_args[parser_log['code_begin_index']: parser_log['code_end_index'] + 1]
            if '\n' in code_str:
                # Raw newlines: the slice itself is the code, no JSON decode possible.
                return code_str.strip('\n')
            else:
                # Standard JSON: let json.loads resolve escape sequences.
                return json.loads(function_args)['code']

    except Exception as e:
        # Any parsing problem means no code can be extracted (yet).
        return None


def get_image_size(image_path):
    """Return the (width, height) in pixels of the image at *image_path*."""
    with Image.open(image_path) as img:
        dimensions = img.size
    return dimensions