# https://github.com/maszhongming/UniEval/tree/main
from dataclasses import dataclass, field
from tqdm import tqdm
from graphgen.models.text.text_pair import TextPair
def _add_questions(dimension: str, question: str, answer: str) -> str:
    """Build the boolean-QA prompt UniEval expects for the given dimension."""
    if dimension == "naturalness":
        cur_input = 'question: Is this a natural response in the dialogue? </s> response: ' + answer
    elif dimension == "coherence":
        cur_input = 'question: Is this a coherent response given the dialogue history? </s> response: ' \
                    + answer + ' </s> dialogue history: ' + question
    elif dimension == "understandability":
        cur_input = 'question: Is this an understandable response in the dialogue? </s> response: ' + answer
    else:
        raise NotImplementedError(
            'The input format for this dimension is still undefined. Please customize it first.')
    return cur_input
@dataclass
class UniEvaluator:
    """Multi-GPU UniEval scorer for naturalness, coherence and understandability."""
    model_name: str = "MingZhong/unieval-sum"
    dimensions: list = field(default_factory=lambda: ['naturalness', 'coherence', 'understandability'])
    max_length: int = 2560
    results: dict = None

    def __post_init__(self):
        import torch
        self.num_gpus = torch.cuda.device_count()
        self.results = {}
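    # Scoring follows UniEval's boolean-QA formulation: the T5-based model is asked
    # a Yes/No question about the response, and the score is
    # P("Yes") / (P("Yes") + P("No")) read from the first decoded token.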
    @staticmethod
    def process_chunk(rank, pairs, model_name, max_length, dimension, return_dict):
        import torch
        from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

        device = f'cuda:{rank}'
        torch.cuda.set_device(rank)

        rank_model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        rank_model.to(device)
        rank_model.eval()

        softmax = torch.nn.Softmax(dim=1)
        # Token ids for "Yes" / "No"; their relative probability is the score.
        pos_id = tokenizer("Yes")["input_ids"][0]
        neg_id = tokenizer("No")["input_ids"][0]

        results = []
        with torch.no_grad():
            for pair in tqdm(pairs):
                text = _add_questions(dimension, pair.question, pair.answer)
                tgt = "No"

                encoded_src = tokenizer(
                    text,
                    max_length=max_length,
                    truncation=True,
                    padding=True,
                    return_tensors='pt'
                )
                encoded_tgt = tokenizer(
                    tgt,
                    max_length=max_length,
                    truncation=True,
                    padding=True,
                    return_tensors='pt'
                )

                src_tokens = encoded_src['input_ids'].to(device)
                src_mask = encoded_src['attention_mask'].to(device)
                # Only the first target token matters: its logits yield P(Yes) vs. P(No).
                tgt_tokens = encoded_tgt['input_ids'].to(device)[:, 0].unsqueeze(-1)

                output = rank_model(
                    input_ids=src_tokens,
                    attention_mask=src_mask,
                    labels=tgt_tokens,
                    use_cache=False
                )
                logits = output.logits.view(-1, rank_model.config.vocab_size)

                pos_score = softmax(logits)[:, pos_id]  # P("Yes")
                neg_score = softmax(logits)[:, neg_id]  # P("No")
                score = pos_score / (pos_score + neg_score)
                results.append(score.item())

        return_dict[rank] = results
    def evaluate(self, pairs: list[TextPair]) -> list[dict]:
        """
        Score all pairs on every configured dimension, sharding the work across
        visible GPUs (one process per GPU) and merging results in rank order.
        """
        import torch.multiprocessing as mp

        if self.num_gpus == 0:
            raise RuntimeError("UniEvaluator requires at least one CUDA device.")

        final_results = []
        for dimension in self.dimensions:
            # Split pairs evenly across GPUs; the last chunk takes the remainder.
            chunk_size = len(pairs) // self.num_gpus
            chunks = []
            for i in range(self.num_gpus):
                start = i * chunk_size
                end = start + chunk_size
                if i == self.num_gpus - 1:
                    end = len(pairs)
                chunks.append(pairs[start:end])

            # multi-process
            manager = mp.Manager()
            return_dict = manager.dict()
            processes = []
            for rank, chunk in enumerate(chunks):
                p = mp.Process(
                    target=self.process_chunk,
                    args=(rank, chunk, self.model_name, self.max_length, dimension, return_dict)
                )
                p.start()
                processes.append(p)
            for p in processes:
                p.join()

            # Merge per-GPU results in rank order.
            results = []
            for rank in range(len(chunks)):
                results.extend(return_dict[rank])

            for p in processes:
                if p.is_alive():
                    p.terminate()
                    p.join()

            final_results.append({
                dimension: results
            })
        return final_results
    def get_average_score(self, pairs: list[TextPair]) -> dict:
        """
        Get the average score of a batch of texts for each dimension.
        """
        results = self.evaluate(pairs)
        final_results = {}
        for result in results:
            for key, value in result.items():
                final_results[key] = sum(value) / len(value)
                # Cache the raw per-pair scores for get_min_max_score.
                self.results[key] = value
        return final_results
    def get_min_max_score(self, pairs: list[TextPair]) -> dict:
        """
        Get the min and max score of a batch of texts for each dimension.
        """
        # self.results is initialised to {} in __post_init__, so check for emptiness rather than None.
        if not self.results:
            self.get_average_score(pairs)
        final_results = {}
        for key, value in self.results.items():
            final_results[key] = min(value), max(value)
        return final_results
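
# Minimal usage sketch. It assumes TextPair can be constructed with `question` and
# `answer` keyword arguments (only those attributes are read above), and that at
# least one CUDA device plus the "MingZhong/unieval-sum" checkpoint are available.
if __name__ == "__main__":
    sample_pairs = [
        TextPair(question="What is the capital of France?",
                 answer="The capital of France is Paris."),
        TextPair(question="How do plants make food?",
                 answer="Plants convert sunlight into chemical energy via photosynthesis."),
    ]
    evaluator = UniEvaluator()
    print(evaluator.get_average_score(sample_pairs))  # mean score per dimension
    print(evaluator.get_min_max_score(sample_pairs))  # (min, max) per dimension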