Tonic's picture
fix text classifier bias parameter thing
322840a unverified
raw
history blame
8.55 kB
from fastapi import APIRouter
from datetime import datetime
import time
from datasets import load_dataset
from sklearn.metrics import accuracy_score
import os
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict, Tuple
import torch
import torch.nn as nn
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AutoConfig
from huggingface_hub import login
from dotenv import load_dotenv
from .utils.evaluation import TextEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info
# Load environment variables
load_dotenv()
# Authenticate with Hugging Face
HF_TOKEN = os.getenv('HUGGINGFACE_TOKEN')
if HF_TOKEN:
login(token=HF_TOKEN)
# Disable torch compile
os.environ["TORCH_COMPILE_DISABLE"] = "1"
router = APIRouter()
DESCRIPTION = "ModernBERT Climate Claims Classifier"
ROUTE = "/text"
class TextClassifier:
def __init__(self):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
max_retries = 3
model_name = "answerdotai/ModernBERT-base"
for attempt in range(max_retries):
try:
# Load config with modified settings
self.config = AutoConfig.from_pretrained(
model_name,
num_labels=8,
problem_type="single_label_classification",
trust_remote_code=True
)
# Remove problematic config attributes
if hasattr(self.config, 'norm_bias'):
delattr(self.config, 'norm_bias')
if hasattr(self.config, 'bias'):
delattr(self.config, 'bias')
# Initialize tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(
model_name,
model_max_length=8192,
padding_side='right',
truncation_side='right',
trust_remote_code=True
)
# Initialize model with modified config
self.model = AutoModelForSequenceClassification.from_pretrained(
model_name,
config=self.config,
trust_remote_code=True,
torch_dtype=torch.float32,
ignore_mismatched_sizes=True
)
# Move model to appropriate device
self.model = self.model.to(self.device)
self.model.eval()
print("Model initialized successfully")
break
except Exception as e:
if attempt == max_retries - 1:
raise Exception(f"Failed to initialize model after {max_retries} attempts: {str(e)}")
print(f"Attempt {attempt + 1} failed, retrying... Error: {str(e)}")
time.sleep(1)
def process_batch(self, batch: List[str], batch_idx: int) -> Tuple[List[int], int]:
"""Process a batch of texts and return their predictions"""
try:
print(f"Processing batch {batch_idx} with {len(batch)} items")
# Tokenize with padding and truncation
inputs = self.tokenizer(
batch,
return_tensors="pt",
truncation=True,
max_length=512,
padding=True
)
# Move inputs to device
inputs = {k: v.to(self.device) for k, v in inputs.items()}
# Get predictions
with torch.no_grad():
outputs = self.model(**inputs)
predictions = torch.argmax(outputs.logits, dim=-1).cpu().tolist()
print(f"Completed batch {batch_idx} with {len(predictions)} predictions")
return predictions, batch_idx
except Exception as e:
print(f"Error in batch {batch_idx}: {str(e)}")
return [0] * len(batch), batch_idx
def __del__(self):
# Clean up CUDA memory
if hasattr(self, 'model'):
del self.model
if torch.cuda.is_available():
torch.cuda.empty_cache()
@router.post(ROUTE, tags=["Text Task"], description=DESCRIPTION)
async def evaluate_text(request: TextEvaluationRequest):
"""Evaluate text classification for climate disinformation detection."""
# Get space info
username, space_url = get_space_info()
# Define the label mapping
LABEL_MAPPING = {
"0_not_relevant": 0,
"1_not_happening": 1,
"2_not_human": 2,
"3_not_bad": 3,
"4_solutions_harmful_unnecessary": 4,
"5_science_unreliable": 5,
"6_proponents_biased": 6,
"7_fossil_fuels_needed": 7
}
try:
# Load and prepare the dataset
dataset = load_dataset("QuotaClimat/frugalaichallenge-text-train", token=HF_TOKEN)
# Convert string labels to integers with error handling
def convert_label(example):
try:
return {"label": LABEL_MAPPING[example["label"]]}
except KeyError as e:
print(f"Warning: Unknown label {example['label']}")
# Return default label or raise exception
return {"label": 0} # or raise e if you want to fail on unknown labels
dataset = dataset.map(convert_label)
# Split dataset
test_dataset = dataset["test"]
# Start tracking emissions
tracker.start()
tracker.start_task("inference")
true_labels = test_dataset["label"]
# Initialize the model once
classifier = TextClassifier()
# Prepare batches
batch_size = 24
quotes = test_dataset["quote"]
num_batches = len(quotes) // batch_size + (1 if len(quotes) % batch_size != 0 else 0)
batches = [
quotes[i * batch_size:(i + 1) * batch_size]
for i in range(num_batches)
]
# Initialize batch_results
batch_results = [[] for _ in range(num_batches)]
# Process batches in parallel
max_workers = min(os.cpu_count(), 4)
print(f"Processing with {max_workers} workers")
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_batch = {
executor.submit(classifier.process_batch, batch, idx): idx
for idx, batch in enumerate(batches)
}
for future in future_to_batch:
batch_idx = future_to_batch[future]
try:
predictions, idx = future.result()
if predictions:
batch_results[idx] = predictions
print(f"Stored results for batch {idx} ({len(predictions)} predictions)")
except Exception as e:
print(f"Failed to get results for batch {batch_idx}: {e}")
batch_results[batch_idx] = [0] * len(batches[batch_idx])
# Flatten predictions
predictions = []
for batch_preds in batch_results:
if batch_preds is not None:
predictions.extend(batch_preds)
# Stop tracking emissions
emissions_data = tracker.stop_task()
# Calculate accuracy
accuracy = accuracy_score(true_labels, predictions)
print("accuracy:", accuracy)
# Prepare results
results = {
"username": username,
"space_url": space_url,
"submission_timestamp": datetime.now().isoformat(),
"model_description": DESCRIPTION,
"accuracy": float(accuracy),
"energy_consumed_wh": emissions_data.energy_consumed * 1000,
"emissions_gco2eq": emissions_data.emissions * 1000,
"emissions_data": clean_emissions_data(emissions_data),
"api_route": ROUTE,
"dataset_config": {
"dataset_name": request.dataset_name,
"test_size": request.test_size,
"test_seed": request.test_seed
}
}
print("results:", results)
return results
except Exception as e:
print(f"Error in evaluate_text: {str(e)}")
raise Exception(f"Failed to process request: {str(e)}")