Spaces:
Paused
Paused
# | |
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
import math | |
import json | |
import re | |
import os | |
import numpy as np | |
from rag.nlp import rag_tokenizer | |
from api.utils.file_utils import get_project_base_directory | |
class Dealer: | |
def __init__(self): | |
self.stop_words = set(["请问", | |
"您", | |
"你", | |
"我", | |
"他", | |
"是", | |
"的", | |
"就", | |
"有", | |
"于", | |
"及", | |
"即", | |
"在", | |
"为", | |
"最", | |
"有", | |
"从", | |
"以", | |
"了", | |
"将", | |
"与", | |
"吗", | |
"吧", | |
"中", | |
"#", | |
"什么", | |
"怎么", | |
"哪个", | |
"哪些", | |
"啥", | |
"相关"]) | |
def load_dict(fnm): | |
res = {} | |
f = open(fnm, "r") | |
while True: | |
l = f.readline() | |
if not l: | |
break | |
arr = l.replace("\n", "").split("\t") | |
if len(arr) < 2: | |
res[arr[0]] = 0 | |
else: | |
res[arr[0]] = int(arr[1]) | |
c = 0 | |
for _, v in res.items(): | |
c += v | |
if c == 0: | |
return set(res.keys()) | |
return res | |
fnm = os.path.join(get_project_base_directory(), "rag/res") | |
self.ne, self.df = {}, {} | |
try: | |
self.ne = json.load(open(os.path.join(fnm, "ner.json"), "r")) | |
except Exception as e: | |
print("[WARNING] Load ner.json FAIL!") | |
try: | |
self.df = load_dict(os.path.join(fnm, "term.freq")) | |
except Exception as e: | |
print("[WARNING] Load term.freq FAIL!") | |
def pretoken(self, txt, num=False, stpwd=True): | |
patt = [ | |
r"[~—\t @#%!<>,\.\?\":;'\{\}\[\]_=\(\)\|,。?》•●○↓《;‘’:“”【¥ 】…¥!、·()×`&\\/「」\\]" | |
] | |
rewt = [ | |
] | |
for p, r in rewt: | |
txt = re.sub(p, r, txt) | |
res = [] | |
for t in rag_tokenizer.tokenize(txt).split(" "): | |
tk = t | |
if (stpwd and tk in self.stop_words) or ( | |
re.match(r"[0-9]$", tk) and not num): | |
continue | |
for p in patt: | |
if re.match(p, t): | |
tk = "#" | |
break | |
#tk = re.sub(r"([\+\\-])", r"\\\1", tk) | |
if tk != "#" and tk: | |
res.append(tk) | |
return res | |
def tokenMerge(self, tks): | |
def oneTerm(t): return len(t) == 1 or re.match(r"[0-9a-z]{1,2}$", t) | |
res, i = [], 0 | |
while i < len(tks): | |
j = i | |
if i == 0 and oneTerm(tks[i]) and len( | |
tks) > 1 and (len(tks[i + 1]) > 1 and not re.match(r"[0-9a-zA-Z]", tks[i + 1])): # 多 工位 | |
res.append(" ".join(tks[0:2])) | |
i = 2 | |
continue | |
while j < len( | |
tks) and tks[j] and tks[j] not in self.stop_words and oneTerm(tks[j]): | |
j += 1 | |
if j - i > 1: | |
if j - i < 5: | |
res.append(" ".join(tks[i:j])) | |
i = j | |
else: | |
res.append(" ".join(tks[i:i + 2])) | |
i = i + 2 | |
else: | |
if len(tks[i]) > 0: | |
res.append(tks[i]) | |
i += 1 | |
return [t for t in res if t] | |
def ner(self, t): | |
if not self.ne: | |
return "" | |
res = self.ne.get(t, "") | |
if res: | |
return res | |
def split(self, txt): | |
tks = [] | |
for t in re.sub(r"[ \t]+", " ", txt).split(" "): | |
if tks and re.match(r".*[a-zA-Z]$", tks[-1]) and \ | |
re.match(r".*[a-zA-Z]$", t) and tks and \ | |
self.ne.get(t, "") != "func" and self.ne.get(tks[-1], "") != "func": | |
tks[-1] = tks[-1] + " " + t | |
else: | |
tks.append(t) | |
return tks | |
def weights(self, tks): | |
def skill(t): | |
if t not in self.sk: | |
return 1 | |
return 6 | |
def ner(t): | |
if re.match(r"[0-9,.]{2,}$", t): | |
return 2 | |
if re.match(r"[a-z]{1,2}$", t): | |
return 0.01 | |
if not self.ne or t not in self.ne: | |
return 1 | |
m = {"toxic": 2, "func": 1, "corp": 3, "loca": 3, "sch": 3, "stock": 3, | |
"firstnm": 1} | |
return m[self.ne[t]] | |
def postag(t): | |
t = rag_tokenizer.tag(t) | |
if t in set(["r", "c", "d"]): | |
return 0.3 | |
if t in set(["ns", "nt"]): | |
return 3 | |
if t in set(["n"]): | |
return 2 | |
if re.match(r"[0-9-]+", t): | |
return 2 | |
return 1 | |
def freq(t): | |
if re.match(r"[0-9. -]{2,}$", t): | |
return 3 | |
s = rag_tokenizer.freq(t) | |
if not s and re.match(r"[a-z. -]+$", t): | |
return 300 | |
if not s: | |
s = 0 | |
if not s and len(t) >= 4: | |
s = [tt for tt in rag_tokenizer.fine_grained_tokenize(t).split(" ") if len(tt) > 1] | |
if len(s) > 1: | |
s = np.min([freq(tt) for tt in s]) / 6. | |
else: | |
s = 0 | |
return max(s, 10) | |
def df(t): | |
if re.match(r"[0-9. -]{2,}$", t): | |
return 5 | |
if t in self.df: | |
return self.df[t] + 3 | |
elif re.match(r"[a-z. -]+$", t): | |
return 300 | |
elif len(t) >= 4: | |
s = [tt for tt in rag_tokenizer.fine_grained_tokenize(t).split(" ") if len(tt) > 1] | |
if len(s) > 1: | |
return max(3, np.min([df(tt) for tt in s]) / 6.) | |
return 3 | |
def idf(s, N): return math.log10(10 + ((N - s + 0.5) / (s + 0.5))) | |
tw = [] | |
for tk in tks: | |
tt = self.tokenMerge(self.pretoken(tk, True)) | |
idf1 = np.array([idf(freq(t), 10000000) for t in tt]) | |
idf2 = np.array([idf(df(t), 1000000000) for t in tt]) | |
wts = (0.3 * idf1 + 0.7 * idf2) * \ | |
np.array([ner(t) * postag(t) for t in tt]) | |
tw.extend(zip(tt, wts)) | |
S = np.sum([s for _, s in tw]) | |
return [(t, s / S) for t, s in tw] | |