import json
import os
import re

from sklearn.metrics import accuracy_score, precision_recall_fscore_support

from src.envs import EVAL_RESULTS_PATH

def parse_first_word(answer):
    # Extract the first word and check whether it starts with 'yes' or 'no'
    first_word = re.split(r'[\s,\.]', answer.lower())[0]
    if first_word.startswith('yes'):
        return 'yes'
    elif first_word.startswith('no'):
        return 'no'
    else:
        return None
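
# Quick sanity examples for parse_first_word (illustrative strings, not taken
# from the evaluation data):
#   parse_first_word("Yes, it is.")  -> 'yes'
#   parse_first_word("no.")          -> 'no'
#   parse_first_word("Unclear")      -> None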

def compute_metrics(true_labels, predicted_labels):
    # Filter out invalid (non-binary) answers
    valid_indices = [i for i, label in enumerate(predicted_labels) if label in ['yes', 'no']]
    filtered_true_labels = [true_labels[i] for i in valid_indices]
    filtered_predicted_labels = [predicted_labels[i] for i in valid_indices]

    # Calculate metrics with 'yes' as the positive class
    accuracy = accuracy_score(filtered_true_labels, filtered_predicted_labels)
    precision, recall, f1_score, _ = precision_recall_fscore_support(
        filtered_true_labels, filtered_predicted_labels, average='binary', pos_label='yes')
    yes_ratio = filtered_predicted_labels.count('yes') / len(filtered_predicted_labels) if filtered_predicted_labels else 0

    return {
        "Accuracy": accuracy,
        "Precision": precision,
        "Recall": recall,
        "F1 Score": f1_score,
        "Yes Ratio": yes_ratio
    }
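
# Illustrative check (hypothetical inputs): a non-binary prediction such as
# "maybe" is filtered out before scoring, so it shrinks the sample rather than
# counting as a miss.
#   compute_metrics(['yes', 'no', 'yes'], ['yes', 'yes', 'maybe'])
#   -> Accuracy 0.5, Precision 0.5, Recall 1.0, F1 Score ~0.667, Yes Ratio 1.0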

def aggregate_metrics(directory_path):
    metrics_data = {"random": {"true": [], "pred": [], "invalid": []},
                    "popular": {"true": [], "pred": [], "invalid": []},
                    "adversarial": {"true": [], "pred": [], "invalid": []}}

    # Process each JSON result file in the directory
    for filename in os.listdir(directory_path):
        if filename.endswith(".json"):
            file_path = os.path.join(directory_path, filename)
            with open(file_path, 'r') as f:
                data = json.load(f)

            # The question type ('random', 'popular', 'adversarial') is encoded
            # as the filename prefix before the first underscore
            question_type = filename.split('_')[0]
            if question_type in metrics_data:
                for entry in data[next(iter(data))]:
                    first_word = parse_first_word(entry['predicted_answer'])
                    if first_word is None:
                        metrics_data[question_type]["invalid"].append(entry['predicted_answer'])
                    metrics_data[question_type]["true"].append(entry['ground_truth_answer'].lower())
                    metrics_data[question_type]["pred"].append(first_word if first_word else entry['predicted_answer'].lower())

    results = {}
    for q_type, data in metrics_data.items():
        result = compute_metrics(data["true"], data["pred"])
        result["Non-Binary Responses Count"] = len(data["invalid"])
        result["Non-Binary Responses"] = data["invalid"]
        results[q_type] = result
    return results

def transform_format(data, model_name):
    # Base structure of the leaderboard results format
    transformed_data = {
        "config": {
            "model_name": model_name
        },
        "results": {}
    }

    # Mapping of metric names to the keys used in the results file
    key_mapping = {
        "Accuracy": "accuracy",
        "Precision": "precision",
        "Recall": "recall",
        "F1 Score": "f1_score",
        "Yes Ratio": "yes_percentage"
    }

    # Build one '<question_type>_<metric>' entry per question type and metric,
    # rounding float values to four decimal places
    for question_type, metrics in data.items():
        for old_key, new_suffix in key_mapping.items():
            new_key = f"{question_type}_{new_suffix}"
            transformed_data["results"][new_key] = {
                new_key: round(metrics[old_key], 4) if isinstance(metrics[old_key], float) else metrics[old_key]
            }
    return transformed_data
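
# Shape of the transformed output (illustrative values only): metrics for the
# "random" question type become entries like
#   {"results": {"random_accuracy": {"random_accuracy": 0.8731}, ...}}
# alongside the "config" block carrying the model name.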

def calculate_metrics(json_output_directory, model_name):
    final_metrics = aggregate_metrics(json_output_directory)
    transformed_metrics = transform_format(final_metrics, model_name)

    # Write the transformed metrics to the evaluation results directory
    results_path = os.path.join(EVAL_RESULTS_PATH, '3d-pope', model_name)
    os.makedirs(results_path, exist_ok=True)
    with open(os.path.join(results_path, 'results.json'), 'w') as f:
        json.dump(transformed_metrics, f, indent=4)

    print(json.dumps(final_metrics, indent=4))
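
# Minimal usage sketch. The directory and model name below are hypothetical
# placeholders; the directory is expected to hold files named
# '<question_type>_*.json' whose entries carry 'predicted_answer' and
# 'ground_truth_answer' fields, as consumed by aggregate_metrics above.
if __name__ == "__main__":
    calculate_metrics("path/to/json_outputs", "example-model")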