# NOTE(review): the following banner is Hugging Face Space page scaffolding
# ("Spaces: Running on CPU Upgrade") captured during extraction, not source
# code. Kept as a comment so the module remains importable.
import os
import logging
import json
from huggingface_hub import model_info, InferenceClient
from dotenv import load_dotenv
from config.models_config import PREFERRED_PROVIDERS, DEFAULT_BENCHMARK_MODEL, ALTERNATIVE_BENCHMARK_MODELS

# NOTE(review): `json` appears unused in this file — confirm before removing.

# Load environment variables once at the module level
load_dotenv()

# Configure logging for the whole module; child code uses `logger` below.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
def prioritize_providers(providers):
    """Return all providers reordered so preferred ones come first.

    Preferred providers are ranked by their position in PREFERRED_PROVIDERS
    (previously they kept their arbitrary input order); every non-preferred
    provider follows, keeping its original relative order because sorted()
    is stable. No provider is ever dropped.

    Args:
        providers: Iterable of provider names.

    Returns:
        A new list containing every input provider, preferred ones first.
    """
    # Materialize once: PREFERRED_PROVIDERS may be any container; a list
    # gives us a deterministic rank via index() within this call.
    preferred_order = list(PREFERRED_PROVIDERS)

    def _rank(provider):
        # Non-preferred providers share one rank past the end, so the
        # stable sort preserves their original relative order.
        if provider in preferred_order:
            return preferred_order.index(provider)
        return len(preferred_order)

    return sorted(providers, key=_rank)
def test_provider(model_name: str, provider: str, verbose: bool = False) -> bool:
    """
    Test if a specific provider is available for a model using InferenceClient.

    Args:
        model_name: Name of the model
        provider: Provider to test
        verbose: Whether to log detailed information

    Returns:
        True if the provider is available, False otherwise
    """
    # Keep the timeout short so scanning many providers stays fast.
    timeout = 3
    try:
        # HF token comes from the environment; load_dotenv() already ran at
        # module import, so the redundant per-call load_dotenv() was removed.
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            if verbose:
                logger.warning("No HF_TOKEN found in environment variables. This will likely cause authentication failures.")
                print("WARNING: HF_TOKEN is missing. Most model providers require valid authentication.")
            # Try without a token (some providers accept anonymous requests).
            return _test_provider_without_token(model_name, provider, verbose)

        # Get HF organization from environment (informational only here).
        hf_organization = os.environ.get("HF_ORGANIZATION")
        if not hf_organization and verbose:
            logger.warning("HF_ORGANIZATION not defined in environment")

        if verbose:
            logger.info(f"Testing provider {provider} for model {model_name}")

        # Initialize the InferenceClient with the specific provider.
        try:
            client = InferenceClient(
                model=model_name,
                token=hf_token,
                provider=provider,
                # bill_to=hf_organization if hf_organization else None,
                timeout=timeout
            )
        except Exception as auth_error:
            if "401" in str(auth_error) or "Unauthorized" in str(auth_error):
                # On an authentication error, fall back to an anonymous attempt.
                if verbose:
                    logger.warning(f"Authentication error with {provider}: {str(auth_error)}. Your HF_TOKEN may be invalid.")
                    print(f"Authentication error detected. Please verify your HF_TOKEN is valid and has appropriate permissions.")
                return _test_provider_without_token(model_name, provider, verbose)
            if verbose:
                logger.warning(f"Error creating client for {provider}: {str(auth_error)}")
            return False

        try:
            # A minimal chat completion is enough to prove the provider works.
            client.chat_completion(
                messages=[{"role": "user", "content": "Hello"}],
                max_tokens=5
            )
        except Exception as e:
            error_message = str(e)
            if verbose:
                logger.warning(f"Error with provider {provider}: {error_message}")
            # Classify common failure modes.
            # BUGFIX: this classification (and the anonymous retry on 401/403)
            # used to be nested under `if verbose:`, so it silently never ran
            # in non-verbose mode.
            if "status_code=429" in error_message:
                if verbose:
                    logger.warning(f"Provider {provider} rate limited. You may need to wait or upgrade your plan.")
            elif "status_code=401" in error_message or "status_code=403" in error_message:
                if verbose:
                    logger.warning(f"Authentication failed for provider {provider}. Your HF_TOKEN may be invalid or expired.")
                    print(f"Authentication error with provider {provider}. Please check your HF_TOKEN.")
                    logger.info(f"Trying provider {provider} without authentication")
                # Retry anonymously regardless of verbosity.
                return _test_provider_without_token(model_name, provider, verbose)
            elif "status_code=503" in error_message:
                if verbose:
                    logger.warning(f"Provider {provider} service unavailable. Model may be loading or provider is down.")
            elif "timed out" in error_message.lower():
                if verbose:
                    # BUGFIX: message previously claimed a 10 s timeout while
                    # the client actually uses 3 s; report the real value.
                    logger.warning(f"Timeout error with provider {provider} - request timed out after {timeout} seconds")
            return False

        if verbose:
            logger.info(f"Provider {provider} is available for {model_name}")
        return True
    except Exception as e:
        # Catch-all boundary: availability probing must never raise.
        if verbose:
            logger.warning(f"Error in test_provider: {str(e)}")
        return False
def _test_provider_without_token(model_name: str, provider: str, verbose: bool = False) -> bool:
    """
    Probe a provider anonymously, i.e. without an authentication token.

    Args:
        model_name: Name of the model
        provider: Provider to test
        verbose: Whether to log detailed information

    Returns:
        True if the provider answered the probe, False otherwise
    """
    try:
        if verbose:
            logger.info(f"Testing provider {provider} for model {model_name} without authentication")
        # Deliberately no token: only providers accepting anonymous
        # requests will answer.
        anonymous_client = InferenceClient(
            model=model_name,
            provider=provider,
            timeout=3
        )
    except Exception as e:
        if verbose:
            logger.warning(f"Error in _test_provider_without_token: {str(e)}")
        return False
    try:
        # A tiny chat completion is the cheapest end-to-end availability check.
        anonymous_client.chat_completion(
            messages=[{"role": "user", "content": "Hello"}],
            max_tokens=5
        )
    except Exception as e:
        if verbose:
            logger.warning(f"Error with provider {provider} without authentication: {str(e)}")
        return False
    if verbose:
        logger.info(f"Provider {provider} is available for {model_name} without authentication")
    return True
def get_available_model_provider(model_name, verbose=False):
    """
    Get the first available provider for a given model.

    Args:
        model_name: Name of the model on the Hub
        verbose: Whether to log detailed information

    Returns:
        First available provider or None if none are available
    """
    try:
        # Get HF token from environment. The raise below is caught by the
        # outer handler, so a missing token yields None rather than an error.
        hf_token = os.environ.get("HF_TOKEN")
        if not hf_token:
            if verbose:
                logger.error("HF_TOKEN not defined in environment")
            raise ValueError("HF_TOKEN not defined in environment")

        # Get providers for the model and prioritize them.
        info = None
        try:
            # First try with the token.
            try:
                if verbose:
                    logger.info(f"Trying to get model info for {model_name} with auth token")
                info = model_info(model_name, token=hf_token, expand="inferenceProviderMapping")
            except Exception as auth_error:
                # If authentication fails, retry without a token (public models).
                if "401" in str(auth_error) or "Unauthorized" in str(auth_error):
                    if verbose:
                        logger.warning(f"Authentication failed for {model_name}, trying without token")
                    try:
                        info = model_info(model_name, expand="inferenceProviderMapping")
                    except Exception as e:
                        if verbose:
                            logger.error(f"Failed to get model info without token: {str(e)}")
                            logger.warning(f"Using default providers list as fallback for {model_name}")
                        # Last resort: probe a default list of providers directly.
                        return _test_fallback_providers(model_name, verbose)
                else:
                    # Any other error: re-raise so the enclosing handler falls back.
                    raise auth_error

            if not info or not hasattr(info, "inference_provider_mapping"):
                if verbose:
                    logger.info(f"No inference providers found for {model_name}")
                # Fall back to the default providers list.
                return _test_fallback_providers(model_name, verbose)

            providers = list(info.inference_provider_mapping.keys())
            if not providers:
                if verbose:
                    logger.info(f"Empty list of providers for {model_name}")
                return _test_fallback_providers(model_name, verbose)
        except Exception as e:
            if verbose:
                logger.error(f"Error retrieving model info for {model_name}: {str(e)}")
            return _test_fallback_providers(model_name, verbose)

        # prioritize_providers returns EVERY provider, preferred ones first.
        prioritized_providers = prioritize_providers(providers)
        if verbose:
            logger.info(f"Available providers for {model_name}: {', '.join(providers)}")
            logger.info(f"Prioritized providers: {', '.join(prioritized_providers)}")

        # Try each provider in priority order.
        # BUGFIX: a second pass over "remaining non-prioritized providers" was
        # dead code — prioritize_providers returns all providers, so that list
        # was always empty. Removed; the failure count below is now exact.
        failed_providers = []
        for provider in prioritized_providers:
            if verbose:
                logger.info(f"Testing provider {provider} for {model_name}")
            try:
                if test_provider(model_name, provider, verbose):
                    if verbose:
                        logger.info(f"Provider {provider} is available for {model_name}")
                    return provider
                failed_providers.append(provider)
                if verbose:
                    logger.warning(f"Provider {provider} test failed for {model_name}")
            except Exception as e:
                failed_providers.append(provider)
                if verbose:
                    logger.error(f"Exception while testing provider {provider} for {model_name}: {str(e)}")

        # Every provider failed: log it but don't raise.
        if verbose:
            logger.error(f"No available providers for {model_name}. Tried {len(failed_providers)} providers.")
        return None
    except Exception as e:
        if verbose:
            logger.error(f"Error in get_available_model_provider: {str(e)}")
        return None
def _test_fallback_providers(model_name, verbose=False):
    """
    Fallback helper: probe a hard-coded list of common providers directly,
    without querying the Hub API for the model's provider mapping.

    Args:
        model_name: Name of the model
        verbose: Whether to log detailed information

    Returns:
        The first provider that responds, or None
    """
    # Candidate providers, probed in this order.
    default_providers = ["huggingface", "sambanova", "novita", "fireworks-ai", "together", "openai", "anthropic"]
    if verbose:
        logger.warning(f"Using fallback providers list for {model_name}: {', '.join(default_providers)}")
    for provider in default_providers:
        if verbose:
            logger.info(f"Testing fallback provider {provider} for {model_name}")
        # A failing probe must never abort the scan; move on to the next one.
        try:
            usable = test_provider(model_name, provider, verbose)
        except Exception as e:
            if verbose:
                logger.warning(f"FALLBACK: Error testing provider {provider} for {model_name}: {str(e)}")
            continue
        if usable:
            if verbose:
                logger.info(f"FALLBACK: Provider {provider} is available for {model_name}")
            return provider
    return None
def test_models(verbose=True):
    """
    Test the default model and the alternative models, then return a summary
    of the results.

    Args:
        verbose: Whether to print detailed progress information

    Returns:
        A dict with keys: default_model, working_model, provider, all_models
        (model -> provider or None), available_models (list of (model,
        provider) tuples) and unavailable_models (list of model names)
    """
    results = {
        "default_model": None,
        "working_model": None,
        "provider": None,
        "all_models": {},
        "available_models": [],
        "unavailable_models": []
    }
    print("\n===== Checking HuggingFace Authentication =====")
    # Get the HF token
    hf_token = os.environ.get("HF_TOKEN")
    if hf_token:
        print("✅ HF_TOKEN is available")
        # Check whether the token has a valid-looking format (simple check)
        if not hf_token.startswith("hf_"):
            print("⚠️ WARNING: Your HF_TOKEN does not start with 'hf_' which is unusual. Please verify its format.")
        # Never show any characters of the token, only indicate its presence
        # NOTE(review): masked_token is assigned but never used — confirm
        # whether it was meant to be printed somewhere.
        masked_token = "••••••••••"
        # Validate the token by calling the inference API directly
        import requests
        try:
            # Test with a simple public model (gpt2)
            test_model = "gpt2"
            api_url = f"https://api-inference.huggingface.co/models/{test_model}"
            print(f"Testing token with inference API on public model {test_model}...")
            headers = {"Authorization": f"Bearer {hf_token}"}
            payload = {"inputs": "Hello, how are you?"}
            response = requests.post(api_url, headers=headers, json=payload, timeout=10)
            # 503 = model still loading, but the token itself was accepted
            if response.status_code in [200, 503]:
                print(f"✅ HF_TOKEN validated - Token accepted by the inference API! Status: {response.status_code}")
                if response.status_code == 503:
                    print("ℹ️ Model is loading, but token is valid")
                # Since the token works for the inference API, also try to
                # fetch user info (non-blocking if this fails)
                try:
                    whoami_response = requests.get(
                        "https://huggingface.co/api/whoami",
                        headers={"Authorization": f"Bearer {hf_token}"}
                    )
                    if whoami_response.status_code == 200:
                        user_info = whoami_response.json()
                        print(f"✅ Additional info - Authenticated as: {user_info.get('name', 'Unknown user')}")
                        # Check whether the account can access paid models
                        if user_info.get('canPay', False):
                            print("✅ Your account has payment methods configured - you may have access to premium models")
                        else:
                            print("ℹ️ Your account does not have payment methods configured - access to premium models may be limited")
                except Exception:
                    # Ignore errors while fetching user info
                    pass
            else:
                print(f"❌ HF_TOKEN validation failed with status code: {response.status_code}")
                error_message = "Unknown error"
                try:
                    error_data = response.json()
                    if "error" in error_data:
                        error_message = error_data["error"]
                    print(f"❌ Error message: {error_message}")
                except:
                    print(f"❌ Error message: {response.text}")
                print("⚠️ Most model providers will not work with invalid credentials")
                # Alternative validation via the status endpoint
                try:
                    print("Attempting alternative validation with status endpoint...")
                    status_url = "https://api-inference.huggingface.co/status"
                    status_response = requests.get(status_url, headers=headers, timeout=10)
                    if status_response.status_code == 200:
                        print("✅ Token can access the status endpoint. This is partially good news.")
                    else:
                        print(f"❌ Status endpoint test also failed: {status_response.status_code}")
                except Exception as e:
                    print(f"❌ Alternative validation also failed: {str(e)}")
        except Exception as e:
            print(f"❌ Error validating HF_TOKEN with inference API: {str(e)}")
    else:
        print("❌ HF_TOKEN is missing - authentication to HuggingFace API will fail")
        print("⚠️ Most models and providers require authentication")
    # Get the HF organization
    hf_organization = os.environ.get("HF_ORGANIZATION")
    if hf_organization:
        print(f"✅ HF_ORGANIZATION is available: {hf_organization}")
    else:
        print("ℹ️ HF_ORGANIZATION is not set")
    if verbose:
        print(f"\n===== Testing main default model: {DEFAULT_BENCHMARK_MODEL} =====")
    # Test the default model
    provider = get_available_model_provider(DEFAULT_BENCHMARK_MODEL, verbose=verbose)
    if provider:
        if verbose:
            print(f"\n✅ SUCCESS: Found provider for default model {DEFAULT_BENCHMARK_MODEL}: {provider}")
        results["default_model"] = DEFAULT_BENCHMARK_MODEL
        results["working_model"] = DEFAULT_BENCHMARK_MODEL
        results["provider"] = provider
    else:
        if verbose:
            print(f"\n❌ DEFAULT MODEL FAILED: No provider found for {DEFAULT_BENCHMARK_MODEL}")
            print("Trying alternative models...")
        # Try the alternative models in order
        for alt_model in ALTERNATIVE_BENCHMARK_MODELS:
            if verbose:
                print(f"\nTrying alternative model: {alt_model}")
            alt_provider = get_available_model_provider(alt_model, verbose=verbose)
            if alt_provider:
                if verbose:
                    print(f"\n✅ SUCCESS: Found provider for alternative model {alt_model}: {alt_provider}")
                results["working_model"] = alt_model
                results["provider"] = alt_provider
                break
            elif verbose:
                print(f"❌ Failed to find provider for alternative model: {alt_model}")
        else:
            # for/else: runs only when no alternative model worked (no break)
            if verbose:
                print("\n❌ ALL MODELS FAILED: No provider found for any model")
                print("\n⚠️ This is likely due to authentication issues with your HF_TOKEN")
                print("⚠️ Please check your token or try using models that don't require authentication")
    # Test every model in this list to get an overall availability picture
    models = [
        "Qwen/QwQ-32B",
        "Qwen/Qwen2.5-72B-Instruct",
        "Qwen/Qwen2.5-32B-Instruct",
        "meta-llama/Llama-3.1-8B-Instruct",
        "meta-llama/Llama-3.3-70B-Instruct",
        "deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
        "mistralai/Mistral-Small-24B-Instruct-2501",
    ]
    if verbose:
        print("\n===== Testing all available models =====")
    for model in models:
        provider = get_available_model_provider(model, verbose)
        results["all_models"][model] = provider
        if provider:
            results["available_models"].append((model, provider))
        else:
            results["unavailable_models"].append(model)
    # Print the summary (verbose only)
    if verbose:
        print("\n===== Results Summary =====")
        if results["available_models"]:
            print("Models with available providers:")
            for model, provider in results["available_models"]:
                print(f"✅ Model: {model}, Provider: {provider}")
        else:
            print("❌ No models with available providers found")
            print("⚠️ Please check your HF_TOKEN and permissions")
        if results["unavailable_models"]:
            print("\nModels with no available providers:")
            for model in results["unavailable_models"]:
                print(f"❌ {model}")
        print(f"\nTotal Available Models: {len(results['available_models'])}")
        print(f"Total Unavailable Models: {len(results['unavailable_models'])}")
    return results
if __name__ == "__main__":
    # Run the full test suite when the script is launched directly
    test_results = test_models(verbose=True)