File size: 16,259 Bytes
27aa048
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
import json
import os
import re
from typing import List, Optional, Union, Dict
from sentencepiece import SentencePieceProcessor
from transformers import PreTrainedTokenizer
from transformers.utils import logging, PaddingStrategy
from transformers.tokenization_utils_base import EncodedInput, BatchEncoding


logger = logging.get_logger(__name__)


class SPTokenizer:
    def __init__(self, model_path: str):
        # reload tokenizer
        assert os.path.isfile(model_path), model_path
        self.sp_model = SentencePieceProcessor(model_file=model_path)

        # BOS / EOS token IDs
        self.n_words: int = self.sp_model.vocab_size()
        self.bos_id: int = self.sp_model.bos_id()
        self.eos_id: int = self.sp_model.eos_id()
        self.pad_id: int = self.sp_model.unk_id()
        # 确保vocab_size与piece数量一致
        assert self.sp_model.vocab_size() == self.sp_model.get_piece_size()

        # 定义聊天角色相关的特殊token
        role_special_tokens = ["<|system|>", "<|user|>", "<|assistant|>", "<|observation|>"]
        # 添加额外的通用特殊token
        special_tokens = ["[MASK]", "[gMASK]", "[sMASK]", "sop", "eop"] + role_special_tokens
        # 创建特殊token与ID之间的映射关系
        self.special_tokens = {}
        self.index_special_tokens = {}
        for token in special_tokens:
            # 分配新的词汇表ID给特殊token
            self.special_tokens[token] = self.n_words
            self.index_special_tokens[self.n_words] = token
            self.n_words += 1
        # 生成正则表达式,用于在apply_chat_template方法中查找特殊token
        self.role_special_token_expression = "|".join([re.escape(token) for token in special_tokens]) # for apply_chat_template

    def tokenize(self, s: str, encode_special_tokens=False):
        """ 对输入字符串进行分词操作,可选择是否编码特殊token
        """
        if encode_special_tokens:
            # 对特殊字符进行处理
            last_index = 0
            t = []
            for match in re.finditer(self.role_special_token_expression, s):
                # 查找并保留非特殊token部分的分词结果
                if last_index < match.start():
                    t.extend(self.sp_model.EncodeAsPieces(s[last_index:match.start()]))
                # 直接添加特殊token
                t.append(s[match.start():match.end()])
                last_index = match.end()
            # 处理剩余非特殊token部分
            if last_index < len(s):
                t.extend(self.sp_model.EncodeAsPieces(s[last_index:]))
            return t
        else:
            # 当encode_special_tokens为False时,直接调用SentencePiece模型进行分词
            return self.sp_model.EncodeAsPieces(s)

    def encode(self, s: str, bos: bool = False, eos: bool = False) -> List[int]:
        """ 将字符串转化为ID列表,可选择是否添加BOS/EOS token
        """
        assert type(s) is str
        t = self.sp_model.encode(s)
        if bos:
            t = [self.bos_id] + t
        if eos:
            t = t + [self.eos_id]
        return t

    def decode(self, t: List[int]) -> str:
        """ 将ID列表解码为字符串
        """
        text, buffer = "", []
        for token in t:
            # 处理特殊tokenID转字符串
            if token in self.index_special_tokens:
                if buffer:
                    text += self.sp_model.decode(buffer)
                    buffer = []
                text += self.index_special_tokens[token]
            else:
                buffer.append(token)
        # 解码剩余普通tokenID
        if buffer:
            text += self.sp_model.decode(buffer)
        return text

    def decode_tokens(self, tokens: List[str]) -> str:
        """ 将分词结果(List[str])解码为字符串
        """
        text = self.sp_model.DecodePieces(tokens)
        return text

    def convert_token_to_id(self, token):
        """ 将给定的token字符串转化为对应的ID
        """
        if token in self.special_tokens:
            return self.special_tokens[token]
        return self.sp_model.PieceToId(token)

    def convert_id_to_token(self, index):
        """ 将给定的ID转化为对应的token字符串
        """
        # 处理特殊tokenID
        if index in self.index_special_tokens:
            return self.index_special_tokens[index]
        # 处理边界情况和其他特殊ID
        if index in [self.eos_id, self.bos_id, self.pad_id] or index < 0 or index > self.sp_model.vocab_size():
            return ""
        # 将普通ID转换为token
        return self.sp_model.IdToPiece(index)


class ChatGLMTokenizer(PreTrainedTokenizer):
    # 预训练模型所需的文件名配置,这里指向tokenizer的model文件
    vocab_files_names = {"vocab_file": "tokenizer.model"}
    # 模型输入的特征名称列表
    model_input_names = ["input_ids", "attention_mask", "position_ids"]

    def __init__(
        self,
        vocab_file,
        padding_side="left",
        clean_up_tokenization_spaces=False,
        encode_special_tokens=False,
        **kwargs
    ):
        # 设置tokenizer的名称
        self.name = "GLMTokenizer"
        # 存储vocab文件路径
        self.vocab_file = vocab_file
        # 使用SPTokenizer作为基础分词器
        self.tokenizer = SPTokenizer(vocab_file)
        # 定义特殊token及其对应的ID
        self.special_tokens = {
            "<bos>": self.tokenizer.bos_id,
            "<eos>": self.tokenizer.eos_id,
            "<unk>": self.tokenizer.pad_id,
            "<pad>": self.tokenizer.pad_id
        }
        self.encode_special_tokens = encode_special_tokens

        super().__init__(
            padding_side=padding_side,
            clean_up_tokenization_spaces=clean_up_tokenization_spaces,
            **kwargs
        )

    def get_command(self, token):
        """ 获取指定特殊 token 对应的 id
        """
        if token in self.special_tokens:
            return self.special_tokens[token]
        # 如果不在自定义特殊 token 中,则从基础SPTokenizer的特殊 token 中查找
        assert token in self.tokenizer.special_tokens, f"{token} is not a special token for {self.name}"
        return self.tokenizer.special_tokens[token]

    @property
    def unk_token(self) -> str:
        """ 通过ID获取未登录词、填充符和结束符的字符串形式
        """
        return self.tokenizer.sp_model.IdToPiece(self.get_command("<unk>"))

    @property
    def pad_token(self) -> str:
        return self.tokenizer.sp_model.IdToPiece(self.get_command("<pad>"))

    @property
    def eos_token(self) -> str:
        return self.tokenizer.sp_model.IdToPiece(self.get_command("<eos>"))

    @property
    def unk_token_id(self) -> int:
        """ 获取未登录词、填充符和结束符的ID形式
        """
        return self.get_command("<unk>")

    @property
    def pad_token_id(self) -> int:
        return self.get_command("<pad>")

    @property
    def eos_token_id(self):
        return self.get_command("<eos>")

    @unk_token.setter
    def unk_token(self, value):
        """ 不支持设置未登录词、填充符和结束符,输出警告信息
        """
        logger.warning("Setting unk_token is not supported, use the default one.")

    @pad_token.setter
    def pad_token(self, value):
        logger.warning("Setting pad_token is not supported, use the default one.")

    @eos_token.setter
    def eos_token(self, value):
        logger.warning("Setting eos_token is not supported, use the default one.")

    @property
    def vocab_size(self):
        """ 返回整个词汇表的大小
        """
        return self.tokenizer.n_words

    def get_vocab(self):
        """ 获取词汇表字典,其中键是token,值是其对应的ID
        """
        vocab = {self._convert_id_to_token(i): i for i in range(self.vocab_size)}
        vocab.update(self.added_tokens_encoder)
        return vocab

    def _tokenize(self, text, **kwargs):
        """ 实现分词功能,利用SPTokenizer进行分词操作
        """
        return self.tokenizer.tokenize(text, encode_special_tokens=self.encode_special_tokens)

    def _convert_token_to_id(self, token):
        """ 将token字符串转化为ID
        """
        return self.tokenizer.convert_token_to_id(token)

    def _convert_id_to_token(self, index):
        """ 将ID转化为token字符串
        """
        return self.tokenizer.convert_id_to_token(index)

    def convert_tokens_to_string(self, tokens: List[str]) -> str:
        """ 将分词结果的tokens列表还原为字符串
        """
        return self.tokenizer.decode_tokens(tokens)

    def save_vocabulary(self, save_directory, filename_prefix=None):
        """ 将词汇表和特殊令牌token保存到指定目录。

        Args:
            save_directory (`str`): 将词汇表和特殊令牌文件保存到指定目录。
            filename_prefix (`str`, *optional*): 可选添加到保存文件名前的前缀。

        Returns:
            `Tuple(str)`: 保存文件的路径
        """
        if os.path.isdir(save_directory):
            vocab_file = os.path.join(
                save_directory, self.vocab_files_names["vocab_file"]
            )
        else:
            vocab_file = save_directory

        with open(self.vocab_file, 'rb') as fin:
            proto_str = fin.read()

        with open(vocab_file, "wb") as writer:
            writer.write(proto_str)

        return (vocab_file,)

    def get_prefix_tokens(self):
        """ 获取用于模型输入的前缀 token 
        """
        prefix_tokens = [self.get_command("[gMASK]"), self.get_command("sop")]
        return prefix_tokens

    def build_single_message(self, role, metadata, message):
        """ 构建单条消息的 token 序列
        """
        assert role in ["system", "user", "assistant", "observation"], role
        # 构建角色标识Token序列
        role_tokens = [self.get_command(f"<|{role}|>")] + self.tokenizer.encode(f"{metadata}\n")
        # 构建消息正文Token序列
        message_tokens = self.tokenizer.encode(message)
        # 合并角色标识Token与消息正文Token
        tokens = role_tokens + message_tokens
        return tokens

    def build_chat_input(self, query, history=None, role="user"):
        """ 根据对话历史及当前query构建模型输入
        """
        if history is None:
            history = []
        input_ids = []
        # 遍历对话历史
        for item in history:
            # 获取内容
            content = item["content"]
            # 若为系统消息且包含工具信息,将其加入内容
            if item["role"] == "system" and "tools" in item:
                content = content + "\n" + json.dumps(item["tools"], indent=4, ensure_ascii=False)
            # 构建单条历史消息的Token序列并加入到模型输入ID列表
            input_ids.extend(self.build_single_message(item["role"], item.get("metadata", ""), content))
        # 构建当前query的Token序列并加入到模型输入ID列表
        input_ids.extend(self.build_single_message(role, "", query))
        # 添加表示回复的assistant标记
        input_ids.extend([self.get_command("<|assistant|>")])
        # 调用tokenizer批量编码方法,返回PyTorch张量形式的模型输入
        return self.batch_encode_plus([input_ids], return_tensors="pt", is_split_into_words=True)

    def build_inputs_with_special_tokens(
        self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None
    ) -> List[int]:
        """ 通过拼接和添加特殊标记,从一个或两个序列构建用于序列分类任务的模型输入。
        
        BERT序列格式如下:
        - 单一序列:`[CLS] X [SEP]`
        - 序列对:`[CLS] A [SEP] B [SEP]`

        Args:
            token_ids_0 (`List[int]`): 将添加特殊token的IDs列表
            token_ids_1 (`List[int]`, *optional*): 可选的第二个序列的IDs列表,用于序列对。

        Returns:
            `List[int]`: 包含适当特殊标记的[输入IDs](../glossary#input-ids)列表。
        """
        # 获取前缀标记
        prefix_tokens = self.get_prefix_tokens()
        # 在token_ids_0前添加前缀标记
        token_ids_0 = prefix_tokens + token_ids_0
        # 若存在token_ids_1,将token_ids_0、token_ids_1连接,并添加结束标记,然后返回
        if token_ids_1 is not None:
            token_ids_0 = token_ids_0 + token_ids_1 + [self.get_command("<eos>")]
        return token_ids_0

    def _pad(
        self,
        encoded_inputs: Union[Dict[str, EncodedInput], BatchEncoding],
        max_length: Optional[int] = None,
        padding_strategy: PaddingStrategy = PaddingStrategy.DO_NOT_PAD,
        pad_to_multiple_of: Optional[int] = None,
        return_attention_mask: Optional[bool] = None,
    ) -> dict:
        """ 此方法用于对编码后的输入进行填充(左右两侧填充,直至达到预设长度或批次中的最大长度)

        Args:
            encoded_inputs: 字典形式的编码后输入,键为特征名称,值为整数列表(例如,`List[int]`),或者一批编码后的输入(例如,`List[List[int]]`)。
            max_length: 返回列表的最大长度,也可作为填充长度
            padding_strategy: 填充策略,有以下选项:
                - PaddingStrategy.LONGEST : 根据批次中最长序列进行填充
                - PaddingStrategy.MAX_LENGTH: 默认策略,填充至最大长度
                - PaddingStrategy.DO_NOT_PAD: 不进行填充
                本tokenizer的填充方向由self.padding_side属性决定:
                    - 'left': 在序列左侧填充
                    - 'right': 在序列右侧填充
            pad_to_multiple_of: (可选)若设置,则将序列填充至给定值的倍数。这对于在NVIDIA硬件上启用具有计算能力`>= 7.5`(Volta及以上)的Tensor Core非常有用。
            return_attention_mask:(可选)若设置为False,则避免返回注意力掩码(默认:根据模型特性设置
        """
        # 从模型默认设置中加载填充侧信息
        assert self.padding_side == "left"

        # 获取必要的输入特征,这里假设第一个特征为主要输入特征
        required_input = encoded_inputs[self.model_input_names[0]]
        seq_length = len(required_input)

        # 如果填充策略为最长序列,则将最大长度设置为当前序列长度
        if padding_strategy == PaddingStrategy.LONGEST:
            max_length = len(required_input)

        # 计算实际最大长度,确保满足pad_to_multiple_of的要求
        if max_length is not None and pad_to_multiple_of is not None and (max_length % pad_to_multiple_of != 0):
            max_length = ((max_length // pad_to_multiple_of) + 1) * pad_to_multiple_of

        # 判断是否需要填充
        needs_to_be_padded = padding_strategy != PaddingStrategy.DO_NOT_PAD and len(required_input) != max_length

        # 若不存在注意力掩码,则初始化
        if "attention_mask" not in encoded_inputs:
            encoded_inputs["attention_mask"] = [1] * seq_length

        if "position_ids" not in encoded_inputs:
            encoded_inputs["position_ids"] = list(range(seq_length))

        # 若需要填充,则执行填充操作
        if needs_to_be_padded:
            difference = max_length - len(required_input)
            # 对注意力掩码进行填充
            if "attention_mask" in encoded_inputs:
                encoded_inputs["attention_mask"] = [0] * difference + encoded_inputs["attention_mask"]
            # 对位置标识进行填充
            if "position_ids" in encoded_inputs:
                encoded_inputs["position_ids"] = [0] * difference + encoded_inputs["position_ids"]
            # 对主要输入特征进行填充
            encoded_inputs[self.model_input_names[0]] = [self.pad_token_id] * difference + required_input

        return encoded_inputs