# Copyright (c) 2024, MeetKai Inc. All rights reserved.

import datetime
import json
from typing import Any, Dict, List, Optional, Union

from transformers.tokenization_utils_base import BatchEncoding
from transformers.tokenization_utils_fast import PreTrainedTokenizerFast
from transformers.utils import TensorType, logging


logger = logging.get_logger(__name__)


def get_instruction_string(custom_tool_definition) -> str:
    """Return the one-line instruction telling the model when to use a given tool."""
    name, description = (
        custom_tool_definition["name"],
        custom_tool_definition["description"],
    )
    return f"Use the function '{name}' to '{description}'"


def get_parameters_string(custom_tool_definition) -> str:
    """Serialize the complete tool definition (name, description, JSON-schema parameters) as JSON."""
    return json.dumps(custom_tool_definition)


def get_system_prompt_for_custom_tools(custom_tools: List) -> str:
    """Build the system-prompt text that lists every tool and the required function-call format."""
    custom_tool_params = ""
    for t in custom_tools:
        custom_tool_params += get_instruction_string(t) + "\n"
        custom_tool_params += get_parameters_string(t) + "\n\n"

    content = f"""
You have access to the following functions:

{custom_tool_params}
Think very carefully before calling functions.
If you choose to call a function, ONLY reply in the following format:
<{{start_tag}}={{function_name}}>{{parameters}}{{end_tag}}
where

start_tag => `<function`
parameters => a JSON dict with the function argument name as key and function argument value as value.
end_tag => `</function>`

Here is an example,
<function=example_function_name>{{"example_name": "example_value"}}</function>

Reminder:
- If looking for real time information use relevant functions before falling back to brave_search
- Function calls MUST follow the specified format, start with <function= and end with </function>
- Required parameters MUST be specified
- Only call one function at a time
- Put the entire function call reply on one line

"""
    return content
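
# Illustrative example: given a hypothetical tool definition such as
#   {"name": "get_weather", "description": "get the current weather",
#    "parameters": {"type": "object", "properties": {"city": {"type": "string"}}}}
# the prompt above instructs the model to answer a weather question with a single line like
#   <function=get_weather>{"city": "Paris"}</function>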


def get_system_message_for_tools(tools: List[Dict], use_code_interpreter: bool) -> Dict:
    """Assemble the full system message: environment line, knowledge-cutoff/date header, and tool instructions."""
    content = ""
    if use_code_interpreter:
        content += "Environment: ipython\n"

    current_date = datetime.datetime.now()
    formatted_date = current_date.strftime("%d %B %Y")
    date_str = f"""
Cutting Knowledge Date: December 2023
Today Date: {formatted_date}\n\n"""
    content += date_str

    if tools:
        custom_message = get_system_prompt_for_custom_tools(tools)
        content += custom_message

    return {"role": "system", "content": content}
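
# For instance (illustrative values), with use_code_interpreter=True the returned message
# starts with "Environment: ipython", followed by the knowledge-cutoff/date header and,
# if any tools were passed, the formatted tool instructions:
#   {"role": "system", "content": "Environment: ipython\n\nCutting Knowledge Date: ..."}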


class FunctionaryTokenizer(PreTrainedTokenizerFast):
    """PreTrainedTokenizerFast subclass whose `apply_chat_template` injects a tools system message."""

    def apply_chat_template(
        self,
        conversation: Union[List[Dict[str, str]], List[List[Dict[str, str]]], str],
        tools: Optional[List[Dict[str, Any]]] = None,
        chat_template: Optional[str] = None,
        add_generation_prompt: bool = False,
        tokenize: bool = True,
        padding: bool = False,
        truncation: bool = False,
        max_length: Optional[int] = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
        return_dict: bool = False,
        tokenizer_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> Union[str, List[int], List[str], List[List[int]], BatchEncoding]:

        if return_dict and not tokenize:
            raise ValueError(
                "`return_dict=True` is incompatible with `tokenize=False`, because there is no dict "
                "of tokenizer outputs to return."
            )

        if tokenizer_kwargs is None:
            tokenizer_kwargs = {}

        using_default_template = False
        
        # First, handle the cases when the model has a dict of multiple templates
        if isinstance(self.chat_template, dict) or (
            self.chat_template is None and isinstance(self.default_chat_template, dict)
        ):
            if self.chat_template is not None:
                template_dict = self.chat_template
                using_default_dict = False
            else:
                template_dict = self.default_chat_template
                using_default_dict = True
            if chat_template is not None and chat_template in template_dict:
                # The user can pass the name of a template to the chat template argument instead of an entire template
                chat_template = template_dict[chat_template]
                if using_default_dict:
                    using_default_template = True
            elif chat_template is None and "default" in template_dict:
                chat_template = template_dict["default"]
                if using_default_dict:
                    using_default_template = True
            elif chat_template is None:
                raise ValueError(
                    "This model has multiple chat templates with no default specified! Please either pass a chat "
                    "template or the name of the template you wish to use to the `chat_template` argument. Available "
                    f"template names are {sorted(template_dict.keys())}."
                )
        elif chat_template is None:
            # These are the cases when the model has a single template
            # priority: `chat_template` argument > `tokenizer.chat_template` > `tokenizer.default_chat_template`
            if self.chat_template is not None:
                chat_template = self.chat_template
            else:
                chat_template = self.default_chat_template
                using_default_template = True
                
        if using_default_template:
            logger.warning_once(
                "No chat template is set for this tokenizer, falling back to a default class-level template. This is "
                "very error-prone, because models are often trained with templates different from the class default! "
                "Default chat templates are a legacy feature and will be removed in Transformers v4.43, at which "
                "point any code depending on them will stop working. We recommend setting a valid chat template before "
                "then to ensure that this model continues working without issues."
            )
            
        # Prepare tools/functions into schema
        functions_pydantic_to_render = []
        has_code_interpreter = False
        if tools is not None:
            for item in tools:
                if "function" in item and item["function"] is not None:
                    functions_pydantic_to_render.append(item["function"])
                elif "type" in item and item["type"] == "code_interpreter":
                    has_code_interpreter = True
                else:
                    functions_pydantic_to_render.append(item)
        tools_system_message = get_system_message_for_tools(functions_pydantic_to_render, has_code_interpreter)
        # The system message is prepended per-conversation in the render loop below; doing it there
        # (rather than here) handles batched inputs and avoids mutating the caller's message list.

        # Compilation function uses a cache to avoid recompiling the same template
        compiled_template = self._compile_jinja_template(chat_template)
        
        if isinstance(conversation, (list, tuple)) and (
            isinstance(conversation[0], (list, tuple)) or hasattr(conversation[0], "messages")
        ):
            conversations = conversation
            is_batched = True
        else:
            conversations = [conversation]
            is_batched = False

        rendered = []
        template_kwargs = {**self.special_tokens_map, **kwargs}  # kwargs overwrite special tokens if both are present
        for chat in conversations:
            if hasattr(chat, "messages"):
                # Indicates it's a Conversation object
                chat = chat.messages
            # Prepend the tools/functions system message without mutating the caller's input
            chat = [tools_system_message] + list(chat)
            rendered_chat = compiled_template.render(
                messages=chat, add_generation_prompt=add_generation_prompt, **template_kwargs
            )
            rendered.append(rendered_chat)

        if not is_batched:
            rendered = rendered[0]

        if tokenize:
            out = self(
                rendered,
                padding=padding,
                truncation=truncation,
                max_length=max_length,
                add_special_tokens=False,
                return_tensors=return_tensors,
                **tokenizer_kwargs,
            )
            if return_dict:
                return out
            else:
                return out["input_ids"]
        else:
            return rendered
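

# Minimal usage sketch. The checkpoint id below is an assumption for illustration; any
# Functionary repo that ships this tokenizer class via trust_remote_code should behave the same.
if __name__ == "__main__":
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained(
        "meetkai/functionary-small-v2.5", trust_remote_code=True  # assumed repo id
    )
    tools = [
        {
            "type": "function",
            "function": {
                "name": "get_weather",  # hypothetical tool for demonstration
                "description": "get the current weather for a city",
                "parameters": {
                    "type": "object",
                    "properties": {"city": {"type": "string"}},
                    "required": ["city"],
                },
            },
        },
        {"type": "code_interpreter"},
    ]
    messages = [{"role": "user", "content": "What is the weather in Istanbul?"}]
    # Render the prompt as text; use tokenize=True (the default) to get input ids instead.
    prompt = tokenizer.apply_chat_template(
        messages, tools, tokenize=False, add_generation_prompt=True
    )
    print(prompt)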