import asyncio
import json
import os
import time
from datetime import datetime, timedelta

import pandas as pd
from tqdm.asyncio import tqdm_asyncio

from languages import languages
from models import models
from tasks import tasks

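# Optional dependency: results can be uploaded to Google Cloud Storage if the
# google-cloud-storage client library is installed.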
try:
    from google.cloud import storage

    GCS_AVAILABLE = True
    print("✅ Google Cloud Storage available")
except ImportError:
    GCS_AVAILABLE = False
    print("❌ Google Cloud Storage not available - install with: pip install google-cloud-storage")


async def save_results_to_gcs(results, bucket_name="ai-language-eval-results"):
    """Save results to Google Cloud Storage"""
    if not GCS_AVAILABLE:
        print("❌ Google Cloud Storage not available")
        return

    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        if not bucket.exists():
            bucket = storage_client.create_bucket(bucket_name, location="us-central1")
            print(f"📦 Created bucket: {bucket_name}")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        blob_name = f"results_{timestamp}.json"
        blob = bucket.blob(blob_name)

        results_json = json.dumps(results, indent=2)
        blob.upload_from_string(results_json, content_type="application/json")

        print(f"💾 Results saved to GCS: gs://{bucket_name}/{blob_name}")
        print(f"📥 Download with: gsutil cp gs://{bucket_name}/{blob_name} ./")

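        # Also overwrite a fixed-name copy so the most recent results are always
        # available at a stable path.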
        latest_blob = bucket.blob("results_latest.json")
        latest_blob.upload_from_string(results_json, content_type="application/json")
        print(f"💾 Latest results: gs://{bucket_name}/results_latest.json")

    except Exception as e:
        print(f"❌ Failed to save to GCS: {e}")
        print("💾 Results saved locally to results.json")


# Placeholder; overwritten with the output of evaluate() when run as a script.
results = pd.DataFrame()


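# Main entry point: builds all (model, language, task) combinations, runs the
# evaluation tasks in asynchronous batches, aggregates the scores, and writes
# the results locally and (optionally) to Google Cloud Storage.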
async def evaluate():
    n_sentences = int(os.environ.get("N_SENTENCES", 1))

    models_df = pd.DataFrame(models)
    languages_df = pd.DataFrame(languages)

    print(f"🚀 Running full evaluation with {len(models_df)} models.")
    start_time = time.time()
    print(f"🕐 Starting full evaluation at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"📊 Evaluating {n_sentences} sentences per task")

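    # MAX_LANGUAGES caps how many languages are evaluated; the language list is
    # expected to be sorted by number of speakers, so head() keeps the most
    # widely spoken ones.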
    max_languages = int(os.environ.get("MAX_LANGUAGES", 2))
    top_languages = languages_df.head(max_languages)
    print(f"🌍 Evaluating top {len(top_languages)} languages by speakers (max: {max_languages})")

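    # Load previously stored results so that combinations which already have a
    # score can be skipped instead of being re-evaluated.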
    for n_languages in [min(max_languages, len(top_languages))]:
        print(f"running evaluations for {n_languages} languages")
        old_results = pd.read_json("results.json")
        if old_results.empty:
            old_results = pd.DataFrame(columns=["model", "bcp_47", "task", "metric", "origin", "score"])
        old_models = pd.read_json("models.json")

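        # Build every (model, language, task) combination, keeping only the tasks
        # that the respective model is configured to support.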
        combis = [
            (model, lang.bcp_47, task_name)
            for model in models_df["id"]
            for lang in top_languages.iloc[:n_languages].itertuples()
            for task_name, task in tasks.items()
            if task_name in models_df[models_df["id"] == model]["tasks"].iloc[0]
        ]

        combis = pd.DataFrame(combis, columns=["model", "bcp_47", "task"])
        # Keep only the combinations that have no stored score yet.
        combis = combis.merge(old_results, on=["model", "bcp_47", "task"], how="left")
        combis = combis[combis["metric"].isna()][["model", "bcp_47", "task"]]

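        # Expand each remaining combination into one evaluation task per sentence index.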
        all_tasks = []
        for i in range(n_sentences):
            for model, bcp_47, task_name in combis.itertuples(index=False):
                all_tasks.append((tasks[task_name], model, bcp_47, i))

        print(f"⏳ Processing {len(all_tasks)} evaluation tasks in batches...")

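        # Run the evaluation coroutines in fixed-size batches so a single run does
        # not overwhelm the model APIs; exceptions are collected instead of
        # aborting the whole run.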
        batch_size = 50
        all_results = []

        for i in range(0, len(all_tasks), batch_size):
            batch = all_tasks[i:i + batch_size]
            print(f"📦 Processing batch {i // batch_size + 1}/{(len(all_tasks) + batch_size - 1) // batch_size} ({len(batch)} tasks)")

            # Summarize which tasks and languages this batch covers.
            batch_summary = {}
            for task_data in batch:
                task_func, model, bcp_47, sentence_nr = task_data
                # functools.partial objects expose the wrapped function via .func
                if hasattr(task_func, "func"):
                    task_name = task_func.func.__name__.replace("_and_evaluate", "")
                else:
                    task_name = task_func.__name__.replace("_and_evaluate", "")
                if task_name not in batch_summary:
                    batch_summary[task_name] = set()
                batch_summary[task_name].add(bcp_47)

            for task_name, languages_set in batch_summary.items():
                lang_list = ", ".join(sorted(languages_set))
                print(f"  📋 {task_name}: {lang_list}")

            batch_coroutines = []
            for task_data in batch:
                task_func, model, bcp_47, sentence_nr = task_data
                batch_coroutines.append(task_func(model, bcp_47, sentence_nr))
            batch_results = await asyncio.gather(*batch_coroutines, return_exceptions=True)
            all_results.extend(batch_results)

            # Brief pause between batches to ease pressure on rate limits.
            await asyncio.sleep(1)

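        # gather(..., return_exceptions=True) mixes scores and exceptions; separate
        # them and flatten list-valued results into individual records.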
        results = all_results

        valid_results = []
        exception_count = 0
        for r in results:
            if isinstance(r, Exception):
                exception_count += 1
                continue
            if isinstance(r, list):
                valid_results.extend(r)
            else:
                valid_results.append(r)

        print(f"⚠️ Encountered {exception_count} API errors (model unavailable/rate limits)")
        print(f"✅ Successfully processed {len(valid_results)} evaluations")

        # Shared to_json options; defined before the conditional so the exports
        # further down also work when no valid results came back.
        args = dict(orient="records", indent=2, force_ascii=False)

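        # Aggregate scores (mean per model, language, task, metric and origin),
        # merge them with the previously stored results, and write results.json.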
        if valid_results:
            results = valid_results

            results_df = pd.DataFrame(results)
            if len(results_df) > 0:
                results_df = (
                    results_df.groupby(["model", "bcp_47", "task", "metric", "origin"])
                    .agg({"score": "mean"})
                    .reset_index()
                )

                old_results = pd.read_json("results.json")
                results_df = pd.concat([old_results, results_df])
                results_df = results_df.sort_values(by=["model", "bcp_47", "task", "metric"])
                results_df.to_json("results.json", **args)
                print(f"💾 Saved {len(results_df)} aggregated results to results.json")
            else:
                print("⚠️ No valid results to aggregate")
        else:
            print("⚠️ No valid results to save - all API calls failed")

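        # Keep models.json and languages.json in sync with the current configuration.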
        all_models = pd.concat([pd.DataFrame(models), old_models])
        all_models = all_models.drop_duplicates(subset=["id"]).sort_values(by=["id"])
        all_models.to_json("models.json", **args)
        pd.DataFrame(languages).to_json("languages.json", **args)

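        # Report elapsed time and, if only part of the languages were covered, a
        # rough ETA for a full run.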
        elapsed = time.time() - start_time
        elapsed_str = str(timedelta(seconds=int(elapsed)))
        if n_languages < max_languages:
            remaining_batches = (max_languages - n_languages) // 10
            batch_count = max(1, n_languages // 10)
            estimated_remaining = elapsed * remaining_batches / batch_count
            eta = datetime.now() + timedelta(seconds=estimated_remaining)
            print(f"⏱️ Batch completed in {elapsed_str}. ETA for full run: {eta.strftime('%H:%M:%S')}")
        else:
            print(f"✅ Full evaluation completed in {elapsed_str}")
            print(f"🕐 Finished at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

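    # Persist the results of this run locally and upload them to Google Cloud Storage.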
with open("results.json", "w") as f: |
|
json.dump(results, f, indent=2) |
|
print(f"πΎ Results saved to results.json") |
|
|
|
|
|
await save_results_to_gcs(results) |
|
|
|
return results |
|
|
|
|
|
if __name__ == "__main__":
    results = asyncio.run(evaluate())