from fastapi import APIRouter
from datetime import datetime
from datasets import load_dataset
from sklearn.metrics import accuracy_score
import os

import numpy as np
import librosa
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.utils import to_categorical

from .utils.evaluation import AudioEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info

from dotenv import load_dotenv

load_dotenv()

router = APIRouter()

DESCRIPTION = "YAMNet embeddings + dense classifier (transfer learning)"
ROUTE = "/audio"


@router.post(ROUTE, tags=["Audio Task"], description=DESCRIPTION)
async def evaluate_audio(request: AudioEvaluationRequest):
    # Get the username and Space URL for the results payload
    username, space_url = get_space_info()

    # Load and prepare the dataset.
    # Because the dataset is gated, we need the HF_TOKEN environment variable to authenticate.
    dataset = load_dataset(request.dataset_name, token=os.getenv("HF_TOKEN"))

    # Split dataset
    train_dataset = dataset["train"]
    test_dataset = dataset["test"]

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    # --------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE CODE HERE
    # Update the code below to replace the random baseline with your model inference, inside the
    # inference pass where the energy consumption and emissions are tracked.
    # --------------------------------------------------------------------------------------------

    # Load the YAMNet model
    yamnet_model_url = "https://tfhub.dev/google/yamnet/1"
    yamnet_model = hub.load(yamnet_model_url)

    def extract_embedding(audio_example):
        """Extract YAMNet embeddings from a waveform."""
        waveform = audio_example["audio"]["array"]
        sample_rate = audio_example["audio"]["sampling_rate"]
        # Resample to 16 kHz (YAMNet's expected sample rate) if necessary,
        # matching the resampling done at prediction time below
        if sample_rate != 16000:
            waveform = librosa.resample(waveform, orig_sr=sample_rate, target_sr=16000)
        waveform = tf.convert_to_tensor(waveform, dtype=tf.float32)
        scores, embeddings, spectrogram = yamnet_model(waveform)
        return {"embedding": embeddings.numpy()}

    # Apply embedding extraction to the training and test data
    train_embeddings = train_dataset.map(extract_embedding)
    test_embeddings = test_dataset.map(extract_embedding)

    X_train, y_train = [], []
    X_test, y_test = [], []

    # YAMNet returns one embedding per audio frame, so the classifier is trained
    # on frame-level embeddings, each labeled with its clip's label
    for example in train_embeddings:
        for embedding in example["embedding"]:
            X_train.append(embedding)
            y_train.append(example["label"])

    for example in test_embeddings:
        for embedding in example["embedding"]:
            X_test.append(embedding)
            y_test.append(example["label"])

    # Convert to NumPy arrays
    X_train = np.array(X_train)
    y_train = np.array(y_train)
    X_test = np.array(X_test)
    y_test = np.array(y_test)

    # Convert labels to categorical (one-hot encoding)
    y_train_cat = to_categorical(y_train, num_classes=2)
    y_test_cat = to_categorical(y_test, num_classes=2)

    print(f"Training samples: {X_train.shape}, Test samples: {X_test.shape}")

    # Define the classification head on top of the YAMNet embeddings
    model = Sequential([
        Dense(128, activation='relu', input_shape=(X_train.shape[1],)),
        Dropout(0.3),
        Dense(64, activation='relu'),
        Dropout(0.3),
        Dense(2, activation='softmax')  # 2 classes: chainsaw (0) vs. environment (1)
    ])
    model.summary()

    # Compile the model
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    # Train the model on YAMNet embeddings
    model.fit(X_train, y_train_cat, epochs=20, batch_size=16,
              validation_data=(X_test, y_test_cat))

    # Evaluate the classifier on frame-level embeddings
    y_pred = model.predict(X_test)
    y_pred_labels = np.argmax(y_pred, axis=1)
    accuracy = accuracy_score(y_test, y_pred_labels)
    print("Transfer Learning Model Accuracy:", accuracy)

    # Predict labels for the test dataset: run YAMNet on the raw audio, average the
    # per-frame embeddings, and classify each clip with the trained model
    predictions = []
    for audio_data in test_dataset["audio"]:
        # Extract waveform and sampling rate
        waveform = audio_data["array"]
        sample_rate = audio_data["sampling_rate"]

        # Resample the waveform to 16 kHz (YAMNet's expected sample rate) if necessary
        if sample_rate != 16000:
            waveform = librosa.resample(waveform, orig_sr=sample_rate, target_sr=16000)

        # Convert waveform to a 1D tensor
        waveform = tf.convert_to_tensor(waveform, dtype=tf.float32)
        waveform = tf.squeeze(waveform)

        # Get YAMNet embeddings (yamnet_model is used only for feature extraction)
        _, embeddings, _ = yamnet_model(waveform)

        # Average the embeddings across the time dimension, then reshape to
        # (1, embedding_dimension) for prediction
        embeddings = tf.reduce_mean(embeddings, axis=0)
        embeddings = embeddings.numpy().reshape(1, -1)

        # Predict with the trained classification head
        scores = model.predict(embeddings)
        predicted_class_index = np.argmax(scores)  # 0 = 'chainsaw', 1 = 'environment'

        # Get the top class name from the predicted label
        top_class = "chainsaw" if predicted_class_index == 0 else "environment"
        predictions.append(top_class)

    print("Predictions:", predictions)

    def map_predictions_to_labels(predictions):
        """
        Maps string predictions to numeric labels:
        - "chainsaw" -> 0
        - any other class -> 1

        Args:
            predictions (list of str): List of class name predictions.

        Returns:
            list of int: Mapped numeric labels.
        """
        return [0 if pred == "chainsaw" else 1 for pred in predictions]

    # Map string predictions to numeric labels
    numeric_predictions = map_predictions_to_labels(predictions)

    # Extract true labels (already numeric)
    true_labels = test_dataset["label"]

    # Calculate accuracy on the test set
    accuracy = accuracy_score(true_labels, numeric_predictions)
    print("Accuracy:", accuracy)

    # --------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE STOPS HERE
    # --------------------------------------------------------------------------------------------

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Prepare results dictionary
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed,
        },
    }

    print(results)
    return results
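
# --------------------------------------------------------------------------------------------
# Usage sketch (not part of the original template; a minimal local test, assuming the
# AudioEvaluationRequest fields match the dataset_config echoed in the results above).
# FastAPI's TestClient can exercise the route without deploying; the dataset id below
# is a hypothetical placeholder.
#
#     from fastapi import FastAPI
#     from fastapi.testclient import TestClient
#
#     app = FastAPI()
#     app.include_router(router)
#     client = TestClient(app)
#
#     response = client.post("/audio", json={
#         "dataset_name": "your-org/your-audio-dataset",  # hypothetical gated dataset
#         "test_size": 0.2,
#         "test_seed": 42,
#     })
#     print(response.json())
# --------------------------------------------------------------------------------------------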