from bot_backend import *
import base64
import time
import tiktoken
from notebook_serializer import add_code_cell_error_to_notebook, add_image_to_notebook, add_code_cell_output_to_notebook

SLICED_CONV_MESSAGE = "[Rest of the conversation has been omitted to fit in the context window]"


def get_conversation_slice(conversation, model, encoding_for_which_model, min_output_tokens_count=500):
    """
    Function to get a slice of the conversation that fits in the model's context window. returns: The conversation
    with the first message(explaining the role of the assistant) + the last x messages that can fit in the context
    window.
    """
    encoder = tiktoken.encoding_for_model(encoding_for_which_model)
    count_tokens = lambda txt: len(encoder.encode(txt))
    nb_tokens = count_tokens(conversation[0]['content'])
    sliced_conv = [conversation[0]]
    context_window_limit = int(config['model_context_window'][model])
    max_tokens = context_window_limit - count_tokens(SLICED_CONV_MESSAGE) - min_output_tokens_count
    sliced = False
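    # Walk the messages from newest to oldest; the first message (explaining the assistant's role) is always kept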
    for message in conversation[-1:0:-1]:
        nb_tokens += count_tokens(message['content'])
        if nb_tokens > max_tokens:
            sliced_conv.insert(1, {'role': 'system', 'content': SLICED_CONV_MESSAGE})
            sliced = True
            break
        sliced_conv.insert(1, message)
    return sliced_conv, nb_tokens, sliced


def chat_completion(bot_backend: BotBackend):
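    """
    Send the chat completion request for the backend's current conversation: slice the messages to fit the model's
    context window, record the token count and sliced state on the backend, and return the raw API response.
    """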
    model_choice = bot_backend.gpt_model_choice
    model_name = bot_backend.config['model'][model_choice]['model_name']
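    # Work on a deep copy so that replacing 'messages' below does not touch the backend's stored kwargs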
    kwargs_for_chat_completion = copy.deepcopy(bot_backend.kwargs_for_chat_completion)
    if bot_backend.config['API_TYPE'] == "azure":
        kwargs_for_chat_completion['messages'], nb_tokens, sliced = \
            get_conversation_slice(
                conversation=kwargs_for_chat_completion['messages'],
                model=model_name,
                encoding_for_which_model='gpt-3.5-turbo' if model_choice == 'GPT-3.5' else 'gpt-4'
            )
    else:
        kwargs_for_chat_completion['messages'], nb_tokens, sliced = \
            get_conversation_slice(
                conversation=kwargs_for_chat_completion['messages'],
                model=model_name,
                encoding_for_which_model=model_name
            )
    bot_backend.update_token_count(num_tokens=nb_tokens)
    bot_backend.update_sliced_state(sliced=sliced)
    assert config['model'][model_choice]['available'], f"{model_choice} is not available for your API key"
    assert model_name in config['model_context_window'], \
        f"{model_name} lacks context window information. Please check the config.json file."
    response = openai.ChatCompletion.create(**kwargs_for_chat_completion)
    return response


def add_code_execution_result_to_bot_history(content_to_display, history, unique_id):
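    """
    Append the terminal output and any images produced by code execution to the chatbot history, mirroring each
    output into the saved notebook. Images are decoded into a cache directory keyed by unique_id and embedded as
    <img> tags.
    """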
    images, text = [], []
    # terminal output
    error_occurred = False
    for mark, out_str in content_to_display:
        if mark in ('stdout', 'execute_result_text', 'display_text'):
            text.append(out_str)
            add_code_cell_output_to_notebook(out_str)
        elif mark in ('execute_result_png', 'execute_result_jpeg', 'display_png', 'display_jpeg'):
            if 'png' in mark:
                images.append(('png', out_str))
                add_image_to_notebook(out_str, 'image/png')
            else:
                add_image_to_notebook(out_str, 'image/jpeg')
                images.append(('jpg', out_str))
        elif mark == 'error':
            # Set output type to error
            text.append(delete_color_control_char(out_str))
            error_occurred = True
            add_code_cell_error_to_notebook(out_str)
    text = '\n'.join(text).strip('\n')
    if error_occurred:
        history.append([None, f'❌Terminal output:\n```shell\n\n{text}\n```'])
    else:
        history.append([None, f'✔️Terminal output:\n```shell\n{text}\n```'])
    # image output
    for filetype, img in images:
        image_bytes = base64.b64decode(img)
        temp_path = f'cache/temp_{unique_id}'
        os.makedirs(temp_path, exist_ok=True)  # create the cache directory (and parents) if it does not exist yet
        path = f'{temp_path}/{hash(time.time())}.{filetype}'
        with open(path, 'wb') as f:
            f.write(image_bytes)
        width, height = get_image_size(path)
        history.append(
            [
                None,
                f'<img src=\"file={path}\" style=\'{"" if width < 800 else "width: 800px;"} max-width:none; '
                f'max-height:none\'> '
            ]
        )


def add_function_response_to_bot_history(hypertext_to_display, history):
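    """
    Display the function's hypertext response in the chatbot history: append a new turn if the last turn already
    has a bot message, otherwise fill the empty bot slot of the last turn.
    """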
    if hypertext_to_display is not None:
        if history[-1][1]:
            history.append([None, hypertext_to_display])
        else:
            history[-1][1] = hypertext_to_display


def parse_json(function_args: str, finished: bool):
    """
    GPT may generate non-standard JSON format string, which contains '\n' in string value, leading to error when using
    `json.loads()`.
    Here we implement a parser to extract code directly from non-standard JSON string.
    :return: code string if successfully parsed otherwise None
    """
    parser_log = {
        'met_begin_{': False,
        'begin_"code"': False,
        'end_"code"': False,
        'met_:': False,
        'met_end_}': False,
        'met_end_code_"': False,
        "code_begin_index": 0,
        "code_end_index": 0
    }
    try:
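        # Forward scan: locate the opening quote of the "code" value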
        for index, char in enumerate(function_args):
            if char == '{':
                parser_log['met_begin_{'] = True
            elif parser_log['met_begin_{'] and char == '"':
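                # This quote either delimits the "code" key or, once ':' has been seen, opens the code value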
                if parser_log['met_:']:
                    if finished:
                        parser_log['code_begin_index'] = index + 1
                        break
                    else:
                        if index + 1 == len(function_args):
                            return None
                        else:
                            temp_code_str = function_args[index + 1:]
                            if '\n' in temp_code_str:
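                                # Raw newlines make the JSON non-standard: try completing the string step by step
                                # before falling back to the raw text after the opening quote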
                                try:
                                    return json.loads(function_args + '"}')['code']
                                except json.JSONDecodeError:
                                    try:
                                        return json.loads(function_args + '}')['code']
                                    except json.JSONDecodeError:
                                        try:
                                            return json.loads(function_args)['code']
                                        except json.JSONDecodeError:
                                            if temp_code_str[-1] in ('"', '\n'):
                                                return None
                                            else:
                                                return temp_code_str.strip('\n')
                            else:
                                return json.loads(function_args + '"}')['code']
                elif parser_log['begin_"code"']:
                    parser_log['end_"code"'] = True
                else:
                    parser_log['begin_"code"'] = True
            elif parser_log['end_"code"'] and char == ':':
                parser_log['met_:'] = True
            else:
                continue
        if finished:
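            # Backward scan: locate the closing quote of the code value so the code can be sliced out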
            for index, char in enumerate(function_args[::-1]):
                back_index = -1 - index
                if char == '}':
                    parser_log['met_end_}'] = True
                elif parser_log['met_end_}'] and char == '"':
                    parser_log['code_end_index'] = back_index - 1
                    break
                else:
                    continue
            code_str = function_args[parser_log['code_begin_index']: parser_log['code_end_index'] + 1]
            if '\n' in code_str:
                return code_str.strip('\n')
            else:
                return json.loads(function_args)['code']
    except Exception:
        # Any parsing failure means the code could not be extracted
        return None


def get_image_size(image_path):
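    """Return the (width, height) of the image at image_path, in pixels."""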
    with Image.open(image_path) as img:
        width, height = img.size
    return width, height
