L3Score / L3Score.py
Niklas Hoepner
Added cost of API calls to output
d9007fb
# Copyright 2020 The HuggingFace Datasets Authors and the current dataset script contributor.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
L3Score metric to score the quality of a free-form answer given a question and a ground-truth answer.
The metric is based on the log-probability of the Yes/No token of an LLM judge.
Metric is based on the paper: https://arxiv.org/pdf/2407.09413
"""
import os
import evaluate
import datasets
import numpy as np
import openai
from langchain.chat_models.base import init_chat_model
from litellm import model_cost
_CITATION = """\
@article{pramanick2024spiqa,
title={Spiqa: A dataset for multimodal question answering on scientific papers},
author={Pramanick, Shraman and Chellappa, Rama and Venugopalan, Subhashini},
journal={arXiv preprint arXiv:2407.09413},
year={2024}
}
"""
_DESCRIPTION = """\
Implements the L3Score metric to score the quality of a free-form answer given a question and a ground-truth answer.
The metric is based on the log-probability of the Yes/No token of an LLM judge.
Metric is based on the paper: https://arxiv.org/pdf/2407.09413
"""
_KWARGS_DESCRIPTION = """
Implements the L3Score metric to score the quality of a free-form answer given a question and a ground-truth answer.
Args:
questions: list of questions to score. Each question should be a string.
predictions: list of predictions to score. Each predictions
should be a string.
references: list of reference for each prediction. Each
reference should be a string.
Returns:
L3Score: mean L3Score for all (question, prediction, reference) triplets.
Cost: total cost of the LLM calls.
Examples:
Example 1: High certainty the prediction is the same as the ground-truth.
>>> L3Score = evaluate.load("L3Score")
>>> L3Score.compute(questions=["What is the capital of France?"], predictions=["Paris"], references=["Paris"], api_key="your-openai-api-key", provider="openai", model="gpt-4o-mini")
{'L3Score': 0.99..., 'Cost': ...}
Example 2: High certainty the prediction is not the same as the ground-truth.
>>> L3Score = evaluate.load("L3Score")
>>> L3Score.compute(questions=["What is the capital of Germany?"], predictions=["Moscow"], references=["Berlin"], api_key="your-openai-api-key", provider="openai", model="gpt-4o-mini")
{'L3Score': 0.00..., 'Cost': ...}
"""
PROVIDER_WITH_TOP_LOGPROBS = ["openai", "deepseek", "xai"]
_PROMPT = "You are given a question, ground-truth answer, and a candidate answer. Question: {question} \nGround-truth answer: {gt} \nCandidate answer: {answer} \n\
Is the semantic meaning of the ground-truth and candidate answers similar? Answer in one word - Yes or No."
_SUFFIXES_TO_SCORE = [" yes", " yeah"]
_COMPLEMENT_SUFFIXES = [" no"]
NEGATIVE_INF = -1000.0
@evaluate.utils.file_utils.add_start_docstrings(_DESCRIPTION, _KWARGS_DESCRIPTION)
class L3Score(evaluate.Metric):
"""
L3Score metric to score the quality of a free-form answer given a question and a ground-truth answer.
The metric is based on the log-probability of the Yes/No token of an LLM judge.
Metric is from the paper: https://arxiv.org/pdf/2407.09413
"""
def _info(self):
return evaluate.MetricInfo(
module_type="metric",
description=_DESCRIPTION,
citation=_CITATION,
inputs_description=_KWARGS_DESCRIPTION,
features=datasets.Features(
{
"questions": datasets.Value("string"),
"predictions": datasets.Value("string"),
"references": datasets.Value("string"),
}
),
homepage="https://github.com/google/spiqa",
codebase_urls=[
"https://github.com/google/spiqa/blob/main/metrics/llmlogscore/llmlogscore.py"
],
reference_urls=[
"https://arxiv.org/pdf/2407.09413",
"https://github.com/google/spiqa",
"https://huggingface.co/datasets/google/spiqa",
],
)
def _download_and_prepare(self, dl_manager):
"""Optional: download external resources useful to compute the scores"""
pass
def _verify_input(
self, questions, predictions, references, provider, api_key, model
):
"""Verify the input parameters"""
if provider not in PROVIDER_WITH_TOP_LOGPROBS:
raise ValueError(
"Provider must offer top_logprobs to use this metric, pick from {}".format(
PROVIDER_WITH_TOP_LOGPROBS
)
)
# Check whether the model is available
if provider == "openai":
client = openai.OpenAI(api_key=api_key)
model_names = set([model.id for model in client.models.list()])
if model not in model_names:
raise ValueError(
f"Model {model} not found for provider {provider}, available models: {model_names}"
)
elif provider == "deepseek":
client = openai.OpenAI(api_key=api_key, base_url="https://api.deepseek.com")
model_names = [model.id for model in client.models.list()]
if model not in model_names:
raise ValueError(
f"Model {model} not found for provider {provider}, available models: {model_names}"
)
elif provider == "xai":
client = openai.OpenAI(api_key=api_key, base_url="https://api.xai.com")
model_names = [model.id for model in client.models.list()]
if model not in model_names:
raise ValueError(
f"Model {model} not found for provider {provider}, available models: {model_names}"
)
assert (
len(questions) == len(predictions) == len(references)
), "Questions, predictions and references must have the same length"
def _get_llm(self, model, api_key):
"""Get the LLM"""
llm = init_chat_model(model=model, api_key=api_key)
llm = llm.bind(logprobs=True, top_logprobs=5)
self._model_cost = model_cost[llm.model_name]
return llm
def _compute(
self,
questions,
predictions,
references,
api_key,
provider="openai",
model="gpt-4o-mini",
):
"""Returns the scores"""
# Check whether llm can be initialized
try:
self._verify_input(
questions, predictions, references, provider, api_key, model
)
except ValueError as e:
return {"error": str(e)}
except openai.AuthenticationError as e:
message = e.body["message"]
return {"error": f"Authentication failed: {message}"}
except Exception as e:
return {
"error": f"An error occurred when verifying the provider/model match: {e}"
}
# Initialize the LLM
llm = self._get_llm(model, api_key)
L3Score = 0
count = 0
total_cost = 0
for question, prediction, reference in zip(questions, predictions, references):
try:
response = llm.invoke(
(
"human",
_PROMPT.format(
question=question, gt=reference, answer=prediction
),
)
)
cost = self._get_cost(response)
total_cost += cost
except openai.AuthenticationError as e:
message = e.body["message"]
return {"error": f"Authentication failed: {message}"}
except openai.RateLimitError as e:
message = e.body["message"]
return {"error": "Rate limit exceeded: {}".format(e)}
except openai.BadRequestError as e:
message = e.body["message"]
return {"error": "Bad request: {}".format(e)}
except Exception as e:
message = e.body["message"]
return {"error": "An error occurred: {}".format(e)}
score = self._calculate_L3Score(
response.response_metadata["logprobs"]["content"][0]["top_logprobs"]
)
L3Score += score.item()
count += 1
if count > 0:
L3Score = L3Score / count
return {
"L3Score": L3Score,
"Cost": total_cost,
}
def _calculate_L3Score(self, top_logprobs):
"""
Calculates the L3 score for a given response.
"""
normalized_suffixes = [self._normalize(suffix) for suffix in _SUFFIXES_TO_SCORE]
normalized_complement_suffixes = [
self._normalize(complement_suffix)
for complement_suffix in _COMPLEMENT_SUFFIXES
]
suffix_logprob = NEGATIVE_INF
complement_logprob = NEGATIVE_INF
suffix_index = -1
complement_suffix_index = -1
for i, token_logprob in enumerate(top_logprobs):
if self._normalize(token_logprob["token"]) in normalized_suffixes:
suffix_logprob = token_logprob["logprob"]
suffix_index = i
break
for i, token_logprob in enumerate(top_logprobs):
if (
self._normalize(token_logprob["token"])
in normalized_complement_suffixes
):
complement_suffix_index = i
complement_logprob = token_logprob["logprob"]
break
if suffix_index == -1 and complement_suffix_index == -1:
return 0.0
if suffix_index != -1 and complement_suffix_index != -1:
return self._renormalize_score(
yes_score=suffix_logprob, no_score=complement_logprob
)
lowest_logprob = top_logprobs[-1]["logprob"]
lowest_token_prob = np.exp(lowest_logprob)
sum_probs = sum(
[np.exp(token_logprob["logprob"]) for token_logprob in top_logprobs]
)
remaining_prob = 1 - sum_probs
min_prob = min(lowest_token_prob, remaining_prob)
if min_prob < 1e-8:
min_prob = 1e-8
reciprocal_logprob = np.log(min_prob)
if suffix_index != -1:
exclude_score = suffix_logprob
include_score = reciprocal_logprob
elif complement_suffix_index != -1:
exclude_score = reciprocal_logprob
include_score = complement_logprob
return self._renormalize_score(yes_score=exclude_score, no_score=include_score)
def _renormalize_score(self, yes_score: float, no_score: float) -> float:
"""Renormalize the scores to be between 0 and 1."""
return 1 / (1 + np.exp(-(yes_score - no_score)))
def _normalize(self, text: str) -> str:
"""Remove white space and lower case for normalized comparisons."""
return text.strip().lower()
def _get_cost(self, response):
"""Get the cost of the response"""
return (
self._model_cost["input_cost_per_token"]
* response.usage_metadata["input_tokens"]
+ self._model_cost["output_cost_per_token"]
* response.usage_metadata["output_tokens"]
)