File size: 8,528 Bytes
e6d07cd
4d6e8c2
4477f42
4d6e8c2
 
e6d07cd
 
 
ece5856
45a2367
21262c6
 
4d6e8c2
 
1a885c6
4d6e8c2
21262c6
 
 
 
bc4f464
21262c6
 
 
e6d07cd
 
e33fed0
4d6e8c2
 
45a2367
1c33274
de4e4d7
70f5f26
e6d07cd
 
4477f42
e6d07cd
7abed63
e6d07cd
 
45a2367
30f3a06
45a2367
 
0c9dbe5
822db29
30f3a06
 
45a2367
 
 
 
 
 
 
 
 
 
 
a036e74
 
45a2367
a036e74
 
 
 
45a2367
822db29
 
e6d07cd
 
7abed63
e6d07cd
 
 
7abed63
e6d07cd
 
 
 
45a2367
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e6d07cd
7eb6153
 
45a2367
 
a036e74
 
7eb6153
 
 
4477f42
4d6e8c2
4477f42
e6d07cd
 
 
 
 
 
 
 
 
 
 
85c5204
e6d07cd
 
 
 
6f0e9af
08f1c39
dc058e1
6f0e9af
85c5204
 
 
 
 
 
 
 
 
 
ada5a12
bc4f464
6f0e9af
 
 
 
 
f3f30d7
6f0e9af
 
 
 
 
08f1c39
dc058e1
6f0e9af
 
 
 
 
 
 
 
 
 
08f1c39
dc058e1
6f0e9af
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e6d07cd
 
6f0e9af
 
 
 
 
85c5204
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
from fastapi import APIRouter
from datetime import datetime
import time
from datasets import load_dataset
from sklearn.metrics import accuracy_score
import os
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict, Tuple
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
from huggingface_hub import login
from dotenv import load_dotenv

from .utils.evaluation import TextEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info

# Load environment variables
load_dotenv()

# Authenticate with Hugging Face
HF_TOKEN = os.getenv('HUGGINGFACE_TOKEN')
if HF_TOKEN:
    login(token=HF_TOKEN)

# Disable torch compile
os.environ["TORCH_COMPILE_DISABLE"] = "1"

router = APIRouter()

DESCRIPTION = "ModernBERT fine-tuned for climate disinformation detection"
ROUTE = "/text"
MODEL_NAME = "Tonic/climate-guard-toxic-agent"

class TextClassifier:
    def __init__(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        max_retries = 3
        
        for attempt in range(max_retries):
            try:
                # Initialize tokenizer
                self.tokenizer = AutoTokenizer.from_pretrained(
                    MODEL_NAME,
                    model_max_length=512,
                    padding_side='right',
                    truncation_side='right'
                )
                
                # Initialize model with specific configuration
                self.model = AutoModelForSequenceClassification.from_pretrained(
                    MODEL_NAME,
                    num_labels=8,
                    problem_type="single_label_classification"
                )
                
                # Move model to appropriate device
                self.model = self.model.to(self.device)
                
                # Initialize pipeline with the model and tokenizer
                self.classifier = pipeline(
                    "text-classification",
                    model=self.model,
                    tokenizer=self.tokenizer,
                    device=self.device,
                    max_length=512,
                    truncation=True,
                    batch_size=16
                )
                
                print("Model initialized successfully")
                break
                
            except Exception as e:
                if attempt == max_retries - 1:
                    raise Exception(f"Failed to initialize model after {max_retries} attempts: {str(e)}")
                print(f"Attempt {attempt + 1} failed, retrying... Error: {str(e)}")
                time.sleep(1)

    def process_batch(self, batch: List[str], batch_idx: int) -> Tuple[List[int], int]:
        """Process a batch of texts and return their predictions"""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                print(f"Processing batch {batch_idx} with {len(batch)} items")
                
                # Process texts with error handling
                predictions = []
                for text in batch:
                    try:
                        result = self.classifier(text)
                        pred_label = int(result[0]['label'].split('_')[0])
                        predictions.append(pred_label)
                    except Exception as e:
                        print(f"Error processing text in batch {batch_idx}: {str(e)}")
                        predictions.append(0)  # Default prediction
                
                print(f"Completed batch {batch_idx} with {len(predictions)} predictions")
                return predictions, batch_idx
                
            except Exception as e:
                if attempt == max_retries - 1:
                    print(f"Final error in batch {batch_idx}: {str(e)}")
                    return [0] * len(batch), batch_idx
                print(f"Error in batch {batch_idx} (attempt {attempt + 1}): {str(e)}")
                time.sleep(1)

    def __del__(self):
        # Clean up CUDA memory
        if hasattr(self, 'model'):
            del self.model
        if hasattr(self, 'classifier'):
            del self.classifier
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

@router.post(ROUTE, tags=["Text Task"], description=DESCRIPTION)
async def evaluate_text(request: TextEvaluationRequest):
    """Evaluate text classification for climate disinformation detection."""
    
    # Get space info
    username, space_url = get_space_info()

    # Define the label mapping
    LABEL_MAPPING = {
        "0_not_relevant": 0,
        "1_not_happening": 1,
        "2_not_human": 2,
        "3_not_bad": 3,
        "4_solutions_harmful_unnecessary": 4,
        "5_science_unreliable": 5,
        "6_proponents_biased": 6,
        "7_fossil_fuels_needed": 7
    }

    try:
        # Load and prepare the dataset
        dataset = load_dataset("QuotaClimat/frugalaichallenge-text-train", token=HF_TOKEN)
        
        # Convert string labels to integers with error handling
        def convert_label(example):
            try:
                return {"label": LABEL_MAPPING[example["label"]]}
            except KeyError as e:
                print(f"Warning: Unknown label {example['label']}")
                # Return default label or raise exception
                return {"label": 0}  # or raise e if you want to fail on unknown labels
                
        dataset = dataset.map(convert_label)

        # Split dataset
        test_dataset = dataset["test"]
        
        # Start tracking emissions
        tracker.start()
        tracker.start_task("inference")

        true_labels = test_dataset["label"]
        
        # Initialize the model once
        classifier = TextClassifier()

        # Prepare batches
        batch_size = 24
        quotes = test_dataset["quote"]
        num_batches = len(quotes) // batch_size + (1 if len(quotes) % batch_size != 0 else 0)
        batches = [
            quotes[i * batch_size:(i + 1) * batch_size]
            for i in range(num_batches)
        ]

        # Initialize batch_results
        batch_results = [[] for _ in range(num_batches)]
        
        # Process batches in parallel
        max_workers = min(os.cpu_count(), 4)
        print(f"Processing with {max_workers} workers")
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_batch = {
                executor.submit(classifier.process_batch, batch, idx): idx 
                for idx, batch in enumerate(batches)
            }

            for future in future_to_batch:
                batch_idx = future_to_batch[future]
                try:
                    predictions, idx = future.result()
                    if predictions:
                        batch_results[idx] = predictions
                        print(f"Stored results for batch {idx} ({len(predictions)} predictions)")
                except Exception as e:
                    print(f"Failed to get results for batch {batch_idx}: {e}")
                    batch_results[batch_idx] = [0] * len(batches[batch_idx])

        # Flatten predictions
        predictions = []
        for batch_preds in batch_results:
            if batch_preds is not None:
                predictions.extend(batch_preds)
        
        # Stop tracking emissions
        emissions_data = tracker.stop_task()
        
        # Calculate accuracy
        accuracy = accuracy_score(true_labels, predictions)
        print("accuracy:", accuracy)
        
        # Prepare results
        results = {
            "username": username,
            "space_url": space_url,
            "submission_timestamp": datetime.now().isoformat(),
            "model_description": DESCRIPTION,
            "accuracy": float(accuracy),
            "energy_consumed_wh": emissions_data.energy_consumed * 1000,
            "emissions_gco2eq": emissions_data.emissions * 1000,
            "emissions_data": clean_emissions_data(emissions_data),
            "api_route": ROUTE,
            "dataset_config": {
                "dataset_name": request.dataset_name,
                "test_size": request.test_size,
                "test_seed": request.test_seed
            }
        }

        print("results:", results)
        return results

    except Exception as e:
        print(f"Error in evaluate_text: {str(e)}")
        raise Exception(f"Failed to process request: {str(e)}")