"""Text-task API: evaluate climate disinformation classification and report emissions."""

from datetime import datetime

import numpy as np
import torch
from datasets import load_dataset
from fastapi import APIRouter, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sklearn.metrics import accuracy_score
from transformers import AutoModelForSequenceClassification, AutoTokenizer

from .utils.emissions import clean_emissions_data, get_space_info, tracker
from .utils.evaluation import TextEvaluationRequest

# Initialize FastAPI app and router
app = FastAPI()
router = APIRouter()

# Add CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

DESCRIPTION = "Efficient Climate Disinformation Detection"
ROUTE = "/text"

# Maps the dataset's string labels to the integer class ids used for scoring.
LABEL_MAPPING = {
    "0_not_relevant": 0,
    "1_not_happening": 1,
    "2_not_human": 2,
    "3_not_bad": 3,
    "4_solutions_harmful_unnecessary": 4,
    "5_science_unreliable": 5,
    "6_proponents_biased": 6,
    "7_fossil_fuels_needed": 7,
}

# Model configuration
# NOTE(review): this looks like a base checkpoint, so the classification head
# is presumably freshly initialised rather than fine-tuned -- confirm intended.
MODEL_NAME = "distilbert-base-uncased"
BATCH_SIZE = 64
MAX_LENGTH = 128


def _predict_labels(texts):
    """Return the predicted class id for each text, in input order.

    Loads the tokenizer and sequence-classification model named by
    ``MODEL_NAME``, then runs batched inference without gradients. On CUDA the
    model weights are converted to half precision and autocast is enabled; on
    CPU the model runs unchanged.

    Args:
        texts: Sliceable sequence of strings to classify.

    Returns:
        A list of ``int`` class ids, one per input text.
    """
    # Query CUDA availability once instead of on every batch.
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    # Initialize tokenizer and model
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=len(LABEL_MAPPING),
        problem_type="single_label_classification",
    )

    # Enable half precision if a GPU is available
    if use_cuda:
        model = model.half()

    model = model.to(device)
    model.eval()

    predictions = []
    for start in range(0, len(texts), BATCH_SIZE):
        if use_cuda:
            torch.cuda.empty_cache()

        # Materialise the slice so the tokenizer always receives a plain list,
        # whatever sequence type the caller passed in.
        batch_texts = list(texts[start:start + BATCH_SIZE])

        inputs = tokenizer(
            batch_texts,
            padding=True,
            truncation=True,
            max_length=MAX_LENGTH,
            return_tensors="pt",
        )
        inputs = {key: tensor.to(device) for key, tensor in inputs.items()}

        with torch.no_grad(), torch.cuda.amp.autocast(enabled=use_cuda):
            outputs = model(**inputs)
            batch_preds = torch.argmax(outputs.logits, dim=1)

        # Collect plain Python ints rather than numpy scalars.
        predictions.extend(batch_preds.cpu().tolist())

    return predictions


@router.post(ROUTE, tags=["Text Task"], description=DESCRIPTION)
async def evaluate_text(request: TextEvaluationRequest):
    """
    Evaluate text classification for climate disinformation detection.

    Loads ``request.dataset_name``, maps its string labels through
    ``LABEL_MAPPING``, splits the ``train`` split using ``request.test_size``
    and ``request.test_seed``, runs inference on the ``quote`` column of the
    resulting test split while the emissions tracker's ``"inference"`` task is
    active, and scores the predictions with ``accuracy_score``.

    Returns:
        A dict with the space info, submission timestamp, accuracy, the
        tracked energy and emissions values (each multiplied by 1000), the
        cleaned emissions data, the API route and the dataset configuration.

    Raises:
        Exception: Any error raised during inference is re-raised unchanged
            after the active tracker task has been stopped.
    """
    # NOTE(review): this coroutine runs blocking work (dataset loading, model
    # inference) directly -- confirm that is acceptable for the deployment.

    # Get space info
    username, space_url = get_space_info()

    # Load and prepare the dataset
    dataset = load_dataset(request.dataset_name)
    dataset = dataset.map(lambda x: {"label": LABEL_MAPPING[x["label"]]})
    train_test = dataset["train"].train_test_split(
        test_size=request.test_size,
        seed=request.test_seed,
    )
    test_dataset = train_test["test"]

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    # Only the tracked work lives inside the try block, so the task is stopped
    # exactly once: here on failure, or just below on success. Previously an
    # error raised after the successful stop triggered a second stop_task().
    try:
        predictions = _predict_labels(test_dataset["quote"])
        true_labels = list(test_dataset["label"])
    except Exception:
        tracker.stop_task()
        raise

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Calculate accuracy
    accuracy = accuracy_score(true_labels, predictions)

    # Prepare results
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed,
        },
    }

    return results


# Include the router
app.include_router(router)


# Add a health check endpoint
@app.get("/health")
async def health_check():
    """Return a static payload indicating the service is up."""
    return {"status": "healthy"}