""" |
L3Score metric to score the quality of a free-form answer given a question and a ground-truth answer.

The metric is based on the log-probability of the Yes/No token of an LLM judge,
following the SPIQA paper: https://arxiv.org/pdf/2407.09413
"""
|
import os

import datasets
import evaluate
import numpy as np
import openai
from langchain.chat_models.base import init_chat_model
from litellm import model_cost
|
|
_CITATION = """\
@article{pramanick2024spiqa,
  title={SPIQA: A Dataset for Multimodal Question Answering on Scientific Papers},
  author={Pramanick, Shraman and Chellappa, Rama and Venugopalan, Subhashini},
  journal={arXiv preprint arXiv:2407.09413},
  year={2024}
}
"""
|
|
_DESCRIPTION = """\
Implements the L3Score metric to score the quality of a free-form answer given a question and a ground-truth answer.
The metric is based on the log-probability of the Yes/No token of an LLM judge,
following the SPIQA paper: https://arxiv.org/pdf/2407.09413
"""
|
|
_KWARGS_DESCRIPTION = """
Implements the L3Score metric to score the quality of a free-form answer given a question and a ground-truth answer.
Args:
    questions: list of questions to score. Each question should be a string.
    predictions: list of predictions to score. Each prediction should be a string.
    references: list of references, one per prediction. Each reference should be a string.
Returns:
    L3Score: mean L3Score over all (question, prediction, reference) triplets.
    Cost: total cost of the LLM calls.
Examples:
    Example 1: High certainty that the prediction matches the ground truth.
        >>> L3Score = evaluate.load("L3Score")
        >>> L3Score.compute(questions=["What is the capital of France?"], predictions=["Paris"], references=["Paris"], api_key="your-openai-api-key", provider="openai", model="gpt-4o-mini")
        {'L3Score': 0.99..., 'Cost': ...}

    Example 2: High certainty that the prediction does not match the ground truth.
        >>> L3Score = evaluate.load("L3Score")
        >>> L3Score.compute(questions=["What is the capital of Germany?"], predictions=["Moscow"], references=["Berlin"], api_key="your-openai-api-key", provider="openai", model="gpt-4o-mini")
        {'L3Score': 0.00..., 'Cost': ...}
"""
|
|
PROVIDER_WITH_TOP_LOGPROBS = ["openai", "deepseek", "xai"]

_PROMPT = (
    "You are given a question, ground-truth answer, and a candidate answer. "
    "Question: {question} \nGround-truth answer: {gt} \nCandidate answer: {answer} \n"
    "Is the semantic meaning of the ground-truth and candidate answers similar? "
    "Answer in one word - Yes or No."
)

_SUFFIXES_TO_SCORE = [" yes", " yeah"]
_COMPLEMENT_SUFFIXES = [" no"]

NEGATIVE_INF = -1000.0
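
# Informal sketch of how the final score is derived (see _renormalize_score below);
# the numbers in this example are made up. The judge's top logprobs are searched for
# a "yes"-like and a "no"-like token, and the score is p(yes) / (p(yes) + p(no)),
# i.e. a sigmoid of the logprob difference. With logprob(yes) = -0.05 and
# logprob(no) = -3.00:
#     score = 1 / (1 + exp(-((-0.05) - (-3.00)))) = 1 / (1 + exp(-2.95)) ≈ 0.95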
|
|
@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class L3Score(evaluate.Metric):
    """
    L3Score metric to score the quality of a free-form answer given a question and a ground-truth answer.

    The metric is based on the log-probability of the Yes/No token of an LLM judge,
    following the SPIQA paper: https://arxiv.org/pdf/2407.09413
    """
|
    def _info(self):
        return evaluate.MetricInfo(
            module_type="metric",
            description=_DESCRIPTION,
            citation=_CITATION,
            inputs_description=_KWARGS_DESCRIPTION,
            features=datasets.Features(
                {
                    "questions": datasets.Value("string"),
                    "predictions": datasets.Value("string"),
                    "references": datasets.Value("string"),
                }
            ),
            homepage="https://github.com/google/spiqa",
            codebase_urls=[
                "https://github.com/google/spiqa/blob/main/metrics/llmlogscore/llmlogscore.py"
            ],
            reference_urls=[
                "https://arxiv.org/pdf/2407.09413",
                "https://github.com/google/spiqa",
                "https://huggingface.co/datasets/google/spiqa",
            ],
        )
|
    def _download_and_prepare(self, dl_manager):
        """Optional: download external resources useful to compute the scores."""
        pass
|
    def _verify_input(
        self, questions, predictions, references, provider, api_key, model
    ):
        """Verify the input parameters."""

        if provider not in PROVIDER_WITH_TOP_LOGPROBS:
            raise ValueError(
                "Provider must offer top_logprobs to use this metric, pick from {}".format(
                    PROVIDER_WITH_TOP_LOGPROBS
                )
            )

        # All supported providers expose an OpenAI-compatible API and differ only in
        # the base URL. The deepseek and xai base URLs below are assumptions; check
        # the providers' current documentation if model listing fails.
        base_urls = {
            "openai": None,
            "deepseek": "https://api.deepseek.com",
            "xai": "https://api.x.ai/v1",
        }
        client = openai.OpenAI(api_key=api_key, base_url=base_urls[provider])
        model_names = {m.id for m in client.models.list()}
        if model not in model_names:
            raise ValueError(
                f"Model {model} not found for provider {provider}, available models: {model_names}"
            )

        assert (
            len(questions) == len(predictions) == len(references)
        ), "Questions, predictions and references must have the same length"
|
    def _get_llm(self, model, api_key):
        """Get the LLM judge with top log-probabilities enabled."""
        # init_chat_model infers the provider from the model name; per-token prices
        # are looked up in litellm's model_cost table before binding the logprobs
        # parameters.
        llm = init_chat_model(model=model, api_key=api_key)
        self._model_cost = model_cost[llm.model_name]
        return llm.bind(logprobs=True, top_logprobs=5)
|
    def _compute(
        self,
        questions,
        predictions,
        references,
        api_key,
        provider="openai",
        model="gpt-4o-mini",
    ):
        """Return the mean L3Score and the total cost of the LLM calls."""

        try:
            self._verify_input(
                questions, predictions, references, provider, api_key, model
            )
        except ValueError as e:
            return {"error": str(e)}
        except openai.AuthenticationError as e:
            message = e.body["message"]
            return {"error": f"Authentication failed: {message}"}
        except Exception as e:
            return {
                "error": f"An error occurred when verifying the provider/model match: {e}"
            }

        llm = self._get_llm(model, api_key)

        L3Score = 0
        count = 0
        total_cost = 0
        for question, prediction, reference in zip(questions, predictions, references):
            try:
                prompt = _PROMPT.format(
                    question=question, gt=reference, answer=prediction
                )
                response = llm.invoke([("human", prompt)])
                cost = self._get_cost(response)
                total_cost += cost
            except openai.AuthenticationError as e:
                message = e.body["message"]
                return {"error": f"Authentication failed: {message}"}
            except openai.RateLimitError as e:
                return {"error": "Rate limit exceeded: {}".format(e)}
            except openai.BadRequestError as e:
                return {"error": "Bad request: {}".format(e)}
            except Exception as e:
                return {"error": "An error occurred: {}".format(e)}

            score = self._calculate_L3Score(
                response.response_metadata["logprobs"]["content"][0]["top_logprobs"]
            )
            L3Score += float(score)
            count += 1

        if count > 0:
            L3Score = L3Score / count

        return {
            "L3Score": L3Score,
            "Cost": total_cost,
        }
|
    def _calculate_L3Score(self, top_logprobs):
        """
        Calculate the L3Score for a single response from the judge's top logprobs.
        """

        normalized_suffixes = [self._normalize(suffix) for suffix in _SUFFIXES_TO_SCORE]
        normalized_complement_suffixes = [
            self._normalize(complement_suffix)
            for complement_suffix in _COMPLEMENT_SUFFIXES
        ]

        suffix_logprob = NEGATIVE_INF
        complement_logprob = NEGATIVE_INF
        suffix_index = -1
        complement_suffix_index = -1

        # Find the highest-ranked "yes"-like and "no"-like tokens, if present.
        for i, token_logprob in enumerate(top_logprobs):
            if self._normalize(token_logprob["token"]) in normalized_suffixes:
                suffix_logprob = token_logprob["logprob"]
                suffix_index = i
                break

        for i, token_logprob in enumerate(top_logprobs):
            if (
                self._normalize(token_logprob["token"])
                in normalized_complement_suffixes
            ):
                complement_suffix_index = i
                complement_logprob = token_logprob["logprob"]
                break

        # Neither a "yes"-like nor a "no"-like token appears among the top logprobs.
        if suffix_index == -1 and complement_suffix_index == -1:
            return 0.0

        # Both tokens appear: renormalize their probabilities directly.
        if suffix_index != -1 and complement_suffix_index != -1:
            return self._renormalize_score(
                yes_score=suffix_logprob, no_score=complement_logprob
            )

        # Only one of the two tokens appears: estimate an upper bound on the
        # probability of the missing token from the lowest returned logprob and
        # the leftover probability mass.
        lowest_logprob = top_logprobs[-1]["logprob"]
        lowest_token_prob = np.exp(lowest_logprob)
        sum_probs = sum(
            np.exp(token_logprob["logprob"]) for token_logprob in top_logprobs
        )
        remaining_prob = 1 - sum_probs
        min_prob = min(lowest_token_prob, remaining_prob)
        if min_prob < 1e-8:
            min_prob = 1e-8
        reciprocal_logprob = np.log(min_prob)

        if suffix_index != -1:
            yes_logprob = suffix_logprob
            no_logprob = reciprocal_logprob
        elif complement_suffix_index != -1:
            yes_logprob = reciprocal_logprob
            no_logprob = complement_logprob

        return self._renormalize_score(yes_score=yes_logprob, no_score=no_logprob)
|
    def _renormalize_score(self, yes_score: float, no_score: float) -> float:
        """Renormalize the scores to be between 0 and 1: p(yes) / (p(yes) + p(no)) = sigmoid(yes_score - no_score)."""
        return 1 / (1 + np.exp(-(yes_score - no_score)))
|
    def _normalize(self, text: str) -> str:
        """Remove white space and lower case for normalized comparisons."""
        return text.strip().lower()
|
    def _get_cost(self, response):
        """Get the cost of the response in USD, using litellm's per-token prices."""
        return (
            self._model_cost["input_cost_per_token"]
            * response.usage_metadata["input_tokens"]
            + self._model_cost["output_cost_per_token"]
            * response.usage_metadata["output_tokens"]
        )
|
|
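# Minimal usage sketch (not part of the metric itself). The "L3Score" path passed to
# evaluate.load is a placeholder for wherever this module actually lives, and a valid
# OpenAI API key is assumed to be available in the OPENAI_API_KEY environment variable.
if __name__ == "__main__":
    metric = evaluate.load("L3Score")  # placeholder path/name for this module
    result = metric.compute(
        questions=["What is the capital of France?"],
        predictions=["Paris"],
        references=["Paris"],
        api_key=os.environ["OPENAI_API_KEY"],
        provider="openai",
        model="gpt-4o-mini",
    )
    print(result)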