Zen0's picture
Update tasks/text.py
f63546e verified
raw
history blame
4.66 kB
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from fastapi import FastAPI, APIRouter
from fastapi.middleware.cors import CORSMiddleware
from datetime import datetime
from datasets import load_dataset
from sklearn.metrics import accuracy_score
import torch
import numpy as np
from .utils.evaluation import TextEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info
# Initialize FastAPI app and router
app = FastAPI()
router = APIRouter()
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
DESCRIPTION = "Efficient Climate Disinformation Detection"
ROUTE = "/text"
@router.post("/text", tags=["Text Task"], description=DESCRIPTION)
async def evaluate_text(request: TextEvaluationRequest):
"""
Evaluate text classification for climate disinformation detection.
"""
# Get space info
username, space_url = get_space_info()
# Define the label mapping
LABEL_MAPPING = {
"0_not_relevant": 0,
"1_not_happening": 1,
"2_not_human": 2,
"3_not_bad": 3,
"4_solutions_harmful_unnecessary": 4,
"5_science_unreliable": 5,
"6_proponents_biased": 6,
"7_fossil_fuels_needed": 7
}
# Load and prepare the dataset
dataset = load_dataset(request.dataset_name)
dataset = dataset.map(lambda x: {"label": LABEL_MAPPING[x["label"]]})
train_test = dataset["train"].train_test_split(test_size=request.test_size, seed=request.test_seed)
test_dataset = train_test["test"]
# Start tracking emissions
tracker.start()
tracker.start_task("inference")
try:
# Model configuration
model_name = "distilbert-base-uncased"
BATCH_SIZE = 64
MAX_LENGTH = 128
# Initialize tokenizer and model
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(
model_name,
num_labels=8,
problem_type="single_label_classification"
)
# Enable mixed precision if available
if torch.cuda.is_available():
model = model.half()
# Move model to device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
model.eval()
# Get test texts
test_texts = test_dataset["quote"]
predictions = []
# Process in batches
for i in range(0, len(test_texts), BATCH_SIZE):
if torch.cuda.is_available():
torch.cuda.empty_cache()
batch_texts = test_texts[i:i + BATCH_SIZE]
# Tokenize batch
inputs = tokenizer(
batch_texts,
padding=True,
truncation=True,
max_length=MAX_LENGTH,
return_tensors="pt"
)
# Move inputs to device
inputs = {k: v.to(device) for k, v in inputs.items()}
# Run inference
with torch.no_grad(), torch.cuda.amp.autocast(enabled=torch.cuda.is_available()):
outputs = model(**inputs)
batch_preds = torch.argmax(outputs.logits, dim=1)
predictions.extend(batch_preds.cpu().numpy())
# Get true labels
true_labels = test_dataset['label']
# Stop tracking emissions
emissions_data = tracker.stop_task()
# Calculate accuracy
accuracy = accuracy_score(true_labels, predictions)
# Prepare results
results = {
"username": username,
"space_url": space_url,
"submission_timestamp": datetime.now().isoformat(),
"model_description": DESCRIPTION,
"accuracy": float(accuracy),
"energy_consumed_wh": emissions_data.energy_consumed * 1000,
"emissions_gco2eq": emissions_data.emissions * 1000,
"emissions_data": clean_emissions_data(emissions_data),
"api_route": ROUTE,
"dataset_config": {
"dataset_name": request.dataset_name,
"test_size": request.test_size,
"test_seed": request.test_seed
}
}
return results
except Exception as e:
tracker.stop_task()
raise e
# Include the router
app.include_router(router)
# Add a health check endpoint
@app.get("/health")
async def health_check():
return {"status": "healthy"}