Spaces:
Runtime error
Runtime error
import subprocess | |
import base64 | |
from huggingface_hub import hf_hub_download | |
import fasttext | |
import os | |
import json | |
import pandas as pd | |
from sklearn.metrics import ( | |
precision_score, | |
recall_score, | |
f1_score, | |
confusion_matrix, | |
balanced_accuracy_score, | |
matthews_corrcoef | |
) | |
import numpy as np | |
from constants import * | |
def predict_label(text, model, language_mapping_dict, use_mapping=False): | |
""" | |
Runs predictions for a fasttext model. | |
Args: | |
text (str): The input text to classify. | |
model (fasttext.FastText._FastText): The fasttext model to use for prediction. | |
language_mapping_dict (dict): A dictionary mapping fasttext labels to human-readable language names. | |
use_mapping (bool): Whether to use the language mapping dictionary. | |
Returns: | |
str: The predicted label for the input text. | |
""" | |
# Remove any newline characters and strip whitespace | |
text = str(text).strip().replace('\n', ' ') | |
if text == '': | |
# if empty text, return EMPTY | |
return 'EMPTY' | |
try: | |
# Get top prediction | |
prediction = model.predict(text, 1) | |
# Extract label and remove __label__ prefix | |
label = prediction[0][0].replace('__label__', '') | |
# Extract confidence score | |
confidence = prediction[1][0] | |
# map label to language using language_mapping_dict | |
if use_mapping: | |
# if label not found in mapping dict, set it to other as we are not taking them into account | |
label = language_mapping_dict.get(label, 'Other') | |
return label | |
except Exception as e: | |
print(f"Error processing text: {text}") | |
print(f"Exception: {e}") | |
return {'prediction_label': 'Error', 'prediction_confidence': 0.0} | |
def compute_classification_metrics(eval_dataset): | |
""" | |
Compute comprehensive classification metrics for each class. | |
Args: | |
data (pd.DataFrame): DataFrame containing 'dialect' as true labels and 'preds' as predicted labels. | |
Returns: | |
pd.DataFrame: DataFrame with detailed metrics for each class. | |
""" | |
# transform the dataset object into a pandas DataFrame object | |
data = pd.DataFrame(eval_dataset) | |
# Extract true labels and predictions | |
true_labels = list(data['dialect']) | |
predicted_labels = list(data['preds']) | |
# Handle all unique labels | |
labels = sorted(list(set(true_labels + predicted_labels))) | |
label_to_index = {label: index for index, label in enumerate(labels)} | |
# Convert labels to indices | |
true_indices = [label_to_index[label] for label in true_labels] | |
pred_indices = [label_to_index[label] for label in predicted_labels] | |
# Compute basic metrics | |
f1_scores = f1_score(true_indices, pred_indices, average=None, labels=range(len(labels))) | |
precision_scores = precision_score(true_indices, pred_indices, average=None, labels=range(len(labels))) | |
recall_scores = recall_score(true_indices, pred_indices, average=None, labels=range(len(labels))) | |
# Compute macro, weighted and micro f1 score | |
macro_f1_score = f1_score(true_indices, pred_indices, average='macro') | |
weighted_f1_score = f1_score(true_indices, pred_indices, average='weighted') | |
micro_f1_score = f1_score(true_indices, pred_indices, average='micro') | |
# Compute confusion matrix | |
conf_mat = confusion_matrix(true_indices, pred_indices, labels=range(len(labels))) | |
# Calculate various metrics per class | |
FP = conf_mat.sum(axis=0) - np.diag(conf_mat) # False Positives | |
FN = conf_mat.sum(axis=1) - np.diag(conf_mat) # False Negatives | |
TP = np.diag(conf_mat) # True Positives | |
TN = conf_mat.sum() - (FP + FN + TP) # True Negatives | |
# Calculate sample counts per class | |
samples_per_class = np.bincount(true_indices, minlength=len(labels)) | |
# Calculate additional metrics | |
with np.errstate(divide='ignore', invalid='ignore'): | |
fp_rate = FP / (FP + TN) # False Positive Rate | |
fn_rate = FN / (FN + TP) # False Negative Rate | |
specificity = TN / (TN + FP) # True Negative Rate | |
npv = TN / (TN + FN) # Negative Predictive Value | |
# Replace NaN/inf with 0 | |
metrics = [fp_rate, fn_rate, specificity, npv] | |
metrics = [np.nan_to_num(m, nan=0.0, posinf=0.0, neginf=0.0) for m in metrics] | |
fp_rate, fn_rate, specificity, npv = metrics | |
# Calculate overall metrics | |
balanced_acc = balanced_accuracy_score(true_indices, pred_indices) | |
mcc = matthews_corrcoef(true_indices, pred_indices) | |
# Compile results into a DataFrame | |
result_df = pd.DataFrame({ | |
'country': labels, | |
'samples': samples_per_class, | |
'f1_score': f1_scores, | |
'macro_f1_score': macro_f1_score, | |
'weighted_f1_score': weighted_f1_score, | |
'micro_f1_score': micro_f1_score, | |
'precision': precision_scores, | |
'recall': recall_scores, | |
'specificity': specificity, | |
'false_positive_rate': fp_rate, | |
'false_negative_rate': fn_rate, | |
'true_positives': TP, | |
'false_positives': FP, | |
'true_negatives': TN, | |
'false_negatives': FN, | |
'negative_predictive_value': npv, | |
'balanced_accuracy': balanced_acc, | |
'matthews_correlation': mcc, | |
}) | |
# Sort by number of samples (descending) | |
result_df = result_df.sort_values('samples', ascending=False) | |
# Format all numeric columns to 4 decimal places | |
numeric_cols = result_df.select_dtypes(include=[np.number]).columns | |
result_df[numeric_cols] = result_df[numeric_cols].round(4) | |
print(f'[INFO] result_df \n: {result_df}') | |
return result_df | |
def make_binary(dialect, target): | |
if dialect != target: | |
return 'Other' | |
return target | |
def run_eval_one_vs_all(data_test, TARGET_LANG='Morocco'): | |
# map to binary | |
df_test_preds = data_test.copy() | |
df_test_preds.loc[df_test_preds['dialect'] == TARGET_LANG, 'dialect'] = TARGET_LANG | |
df_test_preds.loc[df_test_preds['dialect'] != TARGET_LANG, 'dialect'] = 'Other' | |
# compute the fpr per dialect | |
dialect_counts = data_test.groupby('dialect')['dialect'].count().reset_index(name='size') | |
result_df = pd.merge(dialect_counts, data_test, on='dialect') | |
result_df = result_df.groupby(['dialect', 'size', 'preds'])['preds'].count()/result_df.groupby(['dialect', 'size'])['preds'].count() | |
result_df.sort_index(ascending=False, level='size', inplace=True) | |
# group by dialect and get the false positive rate | |
out = result_df.copy() | |
out.name = 'false_positive_rate' | |
out = out.reset_index() | |
out = out[out['preds']==TARGET_LANG].drop(columns=['preds', 'size']) | |
print(f'[INFO] out for TARGET_LANG={TARGET_LANG} \n: {out}') | |
return out | |
def update_darija_one_vs_all_leaderboard(result_df, model_name, target_lang, DIALECT_CONFUSION_LEADERBOARD_FILE="darija_leaderboard_binary.json"): | |
# use base path to ensure correct saving | |
base_path = os.path.dirname(__file__) | |
json_file_path = os.path.join(base_path, DIALECT_CONFUSION_LEADERBOARD_FILE) | |
print(f"[INFO] Loading leaderboard data (json file) from: {json_file_path}") | |
# Load leaderboard data | |
try: | |
with open(json_file_path, "r") as f: | |
data = json.load(f) | |
except FileNotFoundError: | |
data = [] | |
# Process the results for each dialect/country | |
for _, row in result_df.iterrows(): | |
dialect = row['dialect'] | |
# Skip 'Other' class, it is considered as the null space | |
if dialect == 'Other': | |
continue | |
# Find existing target_lang entry or create a new one | |
target_entry = next((item for item in data if target_lang in item), None) | |
if target_entry is None: | |
target_entry = {target_lang: {}} | |
data.append(target_entry) | |
# Get the country-specific data for this target language | |
country_data = target_entry[target_lang] | |
# Initialize the dialect/country entry if it doesn't exist | |
if dialect not in country_data: | |
country_data[dialect] = {} | |
# Update the model metrics under the model name for the given dialect | |
country_data[dialect][model_name] = float(row['false_positive_rate']) | |
# Save updated leaderboard data | |
with open(json_file_path, "w") as f: | |
json.dump(data, f, indent=4) | |
# save_leaderboard_file(DIALECT_CONFUSION_LEADERBOARD_FILE) | |
def handle_evaluation(model_path, model_path_bin, use_mapping=False): | |
# download model and get the model path | |
model_path_hub = hf_hub_download(repo_id=model_path, filename=model_path_bin, cache_dir=None) | |
# Load the trained model | |
print(f"[INFO] Loading model from Path: {model_path_hub}, using version {model_path_bin}...") | |
model = fasttext.load_model(model_path_hub) | |
# Transform to pandas DataFrame | |
print(f"[INFO] Converting evaluation dataset to Pandas DataFrame...") | |
df_eval = pd.DataFrame(eval_dataset) | |
# Predict labels using the model | |
print(f"[INFO] Running predictions...") | |
df_eval['preds'] = df_eval['text'].apply(lambda text: predict_label(text, model, language_mapping_dict, use_mapping=use_mapping)) | |
# run the evaluation | |
result_df = run_eval(df_eval) | |
# set the model name | |
model_name = model_path + '/' + model_path_bin | |
# update the multilingual leaderboard | |
update_darija_multilingual_leaderboard(result_df, model_name, MULTI_DIALECTS_LEADERBOARD_FILE) | |
for target_lang in all_target_languages: | |
result_df_one_vs_all =run_eval_one_vs_all(df_eval, TARGET_LANG=target_lang) | |
update_darija_one_vs_all_leaderboard(result_df_one_vs_all, model_name, target_lang, DIALECT_CONFUSION_LEADERBOARD_FILE) | |
# load the updated leaderboard tables | |
df_multilingual = load_leaderboard_multilingual(MULTI_DIALECTS_LEADERBOARD_FILE) | |
df_one_vs_all = load_leaderboard_one_vs_all(DIALECT_CONFUSION_LEADERBOARD_FILE) | |
status_message = "**Evaluation now ended! 🤗**" | |
return create_leaderboard_display_multilingual(df_multilingual, target_label, default_metrics), status_message | |
def run_eval(df_eval): | |
"""Run evaluation on a dataset and compute metrics. | |
Args: | |
model: The model to evaluate. | |
DATA_PATH (str): Path to the dataset. | |
is_binary (bool): If True, evaluate as binary classification. | |
If False, evaluate as multi-class classification. | |
target_label (str): The target class label in binary mode. | |
Returns: | |
pd.DataFrame: A DataFrame containing evaluation metrics. | |
""" | |
# make a copy as the original one is used later | |
df_eval_multilingual = df_eval.copy() | |
# now drop the columns that are not needed, i.e. 'text' | |
df_eval_multilingual = df_eval_multilingual.drop(columns=['text', 'metadata', 'dataset_source']) | |
# Compute evaluation metrics | |
print(f"[INFO] Computing metrics...") | |
result_df = compute_classification_metrics(df_eval_multilingual) | |
# update_darija_multilingual_leaderboard(result_df, model_path, MULTI_DIALECTS_LEADERBOARD_FILE) | |
return result_df | |
def process_results_file(file, uploaded_model_name, base_path_save="./atlasia/submissions/", default_language='Morocco'): | |
try: | |
if file is None: | |
return "Please upload a file." | |
# Clean the model name to be safe for file paths | |
uploaded_model_name = uploaded_model_name.strip().replace(" ", "_") | |
print(f"[INFO] Uploaded model name: {uploaded_model_name}") | |
# Create the directory for saving submissions | |
path_saving = os.path.join(base_path_save, uploaded_model_name) | |
os.makedirs(path_saving, exist_ok=True) | |
# Define the full path to save the file | |
saved_file_path = os.path.join(path_saving, 'submission.csv') | |
# Read the uploaded file as DataFrame | |
print(f"[INFO] Loading csv results file...") | |
df_eval = pd.read_csv(file.name) | |
# Save the DataFrame | |
print(f"[INFO] Saving the file locally in: {saved_file_path}") | |
df_eval.to_csv(saved_file_path, index=False) | |
except Exception as e: | |
return f"Error processing file: {str(e)}" | |
# Compute evaluation metrics | |
print(f"[INFO] Computing metrics...") | |
result_df = compute_classification_metrics(df_eval) | |
# Update the leaderboards | |
update_darija_multilingual_leaderboard(result_df, uploaded_model_name, MULTI_DIALECTS_LEADERBOARD_FILE) | |
# TODO: implement this ove_vs_all differently for people only submitting csv file. They need to submit two files, one for multi-lang and the other for one-vs-all | |
# result_df_one_vs_all = run_eval_one_vs_all(...) | |
# update_darija_one_vs_all_leaderboard(...) | |
for target_lang in all_target_languages: | |
result_df_one_vs_all =run_eval_one_vs_all(df_eval, TARGET_LANG=target_lang) | |
update_darija_one_vs_all_leaderboard(result_df_one_vs_all, uploaded_model_name, target_lang, DIALECT_CONFUSION_LEADERBOARD_FILE) | |
# load the updated leaderboard tables | |
df_multilingual = load_leaderboard_multilingual(MULTI_DIALECTS_LEADERBOARD_FILE) | |
df_one_vs_all = load_leaderboard_one_vs_all(DIALECT_CONFUSION_LEADERBOARD_FILE) | |
status_message = "**Evaluation now ended! 🤗**" | |
return create_leaderboard_display_multilingual(df_multilingual, target_label, default_metrics), status_message | |
def update_darija_multilingual_leaderboard(result_df, model_name, MULTI_DIALECTS_LEADERBOARD_FILE): | |
# use base path to ensure correct saving | |
base_path = os.path.dirname(__file__) | |
json_file_path = os.path.join(base_path, MULTI_DIALECTS_LEADERBOARD_FILE) | |
# Load leaderboard data | |
try: | |
with open(json_file_path, "r") as f: | |
data = json.load(f) | |
except FileNotFoundError: | |
data = [] | |
# Process the results for each dialect/country | |
for _, row in result_df.iterrows(): | |
country = row['country'] | |
# skip 'Other' class, it is considered as the null space | |
if country == 'Other': | |
continue | |
# Create metrics dictionary directly | |
metrics = { | |
'f1_score': float(row['f1_score']), | |
'precision': float(row['precision']), | |
'recall': float(row['recall']), | |
'macro_f1_score': float(row['macro_f1_score']), | |
'micro_f1_score': float(row['micro_f1_score']), | |
'weighted_f1_score': float(row['weighted_f1_score']), | |
'specificity': float(row['specificity']), | |
'false_positive_rate': float(row['false_positive_rate']), | |
'false_negative_rate': float(row['false_negative_rate']), | |
'negative_predictive_value': float(row['negative_predictive_value']), | |
'balanced_accuracy': float(row['balanced_accuracy']), | |
'matthews_correlation': float(row['matthews_correlation']), | |
'n_test_samples': int(row['samples']) | |
} | |
# Find existing country entry or create new one | |
country_entry = next((item for item in data if country in item), None) | |
if country_entry is None: | |
country_entry = {country: {}} | |
data.append(country_entry) | |
# Update the model metrics directly under the model name | |
if country not in country_entry: | |
country_entry[country] = {} | |
country_entry[country][model_name] = metrics | |
# Save updated leaderboard data | |
with open(json_file_path, "w") as f: | |
json.dump(data, f, indent=4) | |
# save_leaderboard_file(MULTI_DIALECTS_LEADERBOARD_FILE) | |
def load_leaderboard_one_vs_all(DIALECT_CONFUSION_LEADERBOARD_FILE): | |
current_dir = os.path.dirname(os.path.abspath(__file__)) | |
DIALECT_CONFUSION_LEADERBOARD_FILE = os.path.join(current_dir, DIALECT_CONFUSION_LEADERBOARD_FILE) | |
with open(DIALECT_CONFUSION_LEADERBOARD_FILE, "r") as f: | |
data = json.load(f) | |
# Initialize lists to store the flattened data | |
rows = [] | |
# Process each target language's data | |
for leaderboard_data in data: | |
for target_language, results in leaderboard_data.items(): | |
for language, models in results.items(): | |
for model_name, false_positive_rate in models.items(): | |
row = { | |
'target_language': target_language, | |
'language': language, | |
'model': model_name, | |
'false_positive_rate': false_positive_rate, | |
} | |
# Add all metrics to the row | |
rows.append(row) | |
# Convert to DataFrame | |
df = pd.DataFrame(rows) | |
# Pivot the DataFrame to create the desired structure: all languages in columns and models in rows, and each (model, target_language, language) = false_positive_rate | |
df_pivot = df.pivot(index=['model', 'target_language'], columns='language', values='false_positive_rate').reset_index() | |
return df_pivot | |
def load_leaderboard_multilingual(MULTI_DIALECTS_LEADERBOARD_FILE): | |
current_dir = os.path.dirname(os.path.abspath(__file__)) | |
MULTI_DIALECTS_LEADERBOARD_FILE = os.path.join(current_dir, MULTI_DIALECTS_LEADERBOARD_FILE) | |
with open(MULTI_DIALECTS_LEADERBOARD_FILE, "r") as f: | |
data = json.load(f) | |
# Initialize lists to store the flattened data | |
rows = [] | |
# Process each country's data | |
for country_data in data: | |
for country, models in country_data.items(): | |
for model_name, metrics in models.items(): | |
row = { | |
'country': country, | |
'model': model_name, | |
} | |
# Add all metrics to the row | |
row.update(metrics) | |
rows.append(row) | |
# Convert to DataFrame | |
df = pd.DataFrame(rows) | |
return df | |
def create_leaderboard_display_one_vs_all(df, target_language, selected_languages): | |
# Filter by target_language if specified | |
if target_language: | |
df = df[df['target_language'] == target_language] | |
# Remove the target_language from selected_languages | |
if target_language in selected_languages: | |
selected_languages = [lang for lang in selected_languages if lang != target_language] | |
# Select only the chosen languages (plus 'model' column) | |
columns_to_show = ['model'] + [language for language in selected_languages if language in df.columns] | |
# Sort by first selected metric by default | |
if selected_languages: | |
df = df.sort_values(by=selected_languages[0], ascending=False) | |
df = df[columns_to_show] | |
# Format numeric columns to 4 decimal places | |
numeric_cols = df.select_dtypes(include=['float64']).columns | |
df[numeric_cols] = df[numeric_cols].round(4) | |
return df, selected_languages | |
def create_leaderboard_display_multilingual(df, selected_country, selected_metrics): | |
# Filter by country if specified | |
if selected_country and selected_country.upper() != 'ALL': | |
print(f"Filtering leaderboard by country: {selected_country}") | |
print(df.columns) | |
df = df[df['country'] == selected_country] | |
df = df.drop(columns=['country']) | |
# Select only the chosen metrics (plus 'model' column) | |
columns_to_show = ['model'] + [metric for metric in selected_metrics if metric in df.columns] | |
else: | |
# Select all metrics (plus 'country' and 'model' columns), if no country is selected or 'All' is selected for ease of comparison | |
columns_to_show = ['model', 'country'] + selected_metrics | |
# Sort by first selected metric by default | |
if selected_metrics: | |
df = df.sort_values(by=selected_metrics[0], ascending=False) | |
df = df[columns_to_show] | |
# Format numeric columns to 4 decimal places | |
numeric_cols = df.select_dtypes(include=['float64']).columns | |
df[numeric_cols] = df[numeric_cols].round(4) | |
return df | |
def update_leaderboard_multilingual(country, selected_metrics): | |
if not selected_metrics: # If no metrics selected, show all | |
selected_metrics = metrics | |
df = load_leaderboard_multilingual(MULTI_DIALECTS_LEADERBOARD_FILE) | |
display_df = create_leaderboard_display_multilingual(df, country, selected_metrics) | |
return display_df | |
def update_leaderboard_one_vs_all(target_language, selected_languages): | |
if not selected_languages: # If no language selected, show all defaults | |
selected_languages = default_languages | |
df = load_leaderboard_one_vs_all(DIALECT_CONFUSION_LEADERBOARD_FILE) | |
display_df, selected_languages = create_leaderboard_display_one_vs_all(df, target_language, selected_languages) | |
# to improve visibility in case the user chooses multiple language leading to many columns, the `model` column must remain fixed | |
# display_df = render_fixed_columns(display_df) # needs to be implemented | |
return display_df, selected_languages | |
def encode_image_to_base64(image_path): | |
""" encodes the image to base64""" | |
with open(image_path, "rb") as image_file: | |
encoded_string = base64.b64encode(image_file.read()).decode() | |
return encoded_string | |
def create_html_image(image_path): | |
""" Creates the html of the logo image from the image path input """ | |
# Get base64 string of image | |
img_base64 = encode_image_to_base64(image_path) | |
# Create HTML string with embedded image and centering styles | |
html_string = f""" | |
<div style="display: flex; justify-content: center; align-items: center; width: 100%; text-align: center;"> | |
<div style="max-width: 800px; margin: auto;"> | |
<img src="data:image/jpeg;base64,{img_base64}" | |
style="max-width: 75%; height: auto; display: block; margin: 0 auto; margin-top: 50px;" | |
alt="Displayed Image"> | |
</div> | |
</div> | |
""" | |
return html_string | |
def render_fixed_columns(df): | |
""" A function to render HTML table with fixed 'model' column for better visibility """ | |
return NotImplementedError | |
# Function to save and commit leaderboard files | |
def save_leaderboard_file(FILE_PATH): | |
# Example data to save (replace with actual leaderboard data) | |
data = {"status": "updated", "data": []} | |
# Save data in json | |
with open(FILE_PATH, "w") as f: | |
json.dump(data, f, indent=4) | |
print(f"[INFO] Saved {FILE_PATH}") | |
# Commit changes to the repository | |
try: | |
subprocess.run(["git", "add", FILE_PATH], check=True) | |
subprocess.run(["git", "commit", "-m", "Update leaderboard file"], check=True) | |
subprocess.run(["git", "push"], check=True) | |
print("[INFO] Leaderboard file committed and pushed to the repository.") | |
except subprocess.CalledProcessError as e: | |
print(f"[ERROR] Failed to commit or push changes: {e}") |