# NOTE(review): removed non-code extraction artifacts here ("Spaces:" / "Runtime error" lines).
import os
import time
from typing import Optional, Tuple

import requests

from agents.model import load_huggingface_model
from checks.health_check import check_model_endpoint
from ui.statusui import StatusUI
def wake_up_endpoint(
    endpoint_uri: str,
    ui,
    max_wait: int = 300,
    initial_delay: float = 3.0,
    backoff_factor: float = 1.5,
    max_retry_delay: float = 10.0,
) -> Tuple[bool, Optional[str]]:
    """
    Poll the endpoint until it responds OK or the time budget runs out.

    Args:
        endpoint_uri: The endpoint URL to monitor.
        ui: UI object for status updates (must expose ``append(msg)``).
        max_wait: Maximum total wait time in seconds (minimum 60s enforced).
        initial_delay: Initial delay between retries in seconds.
        backoff_factor: Multiplier for exponential backoff.
        max_retry_delay: Maximum delay between retries in seconds.

    Returns:
        Tuple of (success: bool, error_message: Optional[str]).
    """
    # Configuration validation: never give a cold endpoint less than a minute.
    max_wait = max(max_wait, 60)
    current_delay = min(initial_delay, max_retry_delay)

    # Request components are invariant across retries — build them once.
    headers = {}
    if hf_token := os.environ.get("HF_TOKEN"):
        headers["Authorization"] = f"Bearer {hf_token}"
    payload = {"inputs": "ping"}
    timeout = min(5, current_delay * 0.8)  # Ensure timeout is less than delay

    start_time = time.time()
    last_status = None
    while (time.time() - start_time) < max_wait:
        try:
            # Log the target once instead of spamming the UI each retry.
            if endpoint_uri != last_status:
                ui.append(f"Pinging endpoint: {endpoint_uri}")
                last_status = endpoint_uri
            # Make the request
            response = requests.post(
                endpoint_uri,
                headers=headers,
                json=payload,
                timeout=timeout,
            )
            if response.ok:
                ui.append("✅ Endpoint is awake and responsive")
                return True, None
            # 503/504 are the expected "still warming up" answers; anything
            # else is reported but still retried until the deadline.
            if response.status_code in {503, 504}:
                status_msg = f"Endpoint warming up (HTTP {response.status_code})"
            else:
                status_msg = f"Unexpected response (HTTP {response.status_code})"
            ui.append(f"{status_msg}, retrying in {current_delay:.1f}s...")
        except requests.exceptions.RequestException as e:
            ui.append(f"Connection error ({type(e).__name__}), retrying in {current_delay:.1f}s...")

        # Bug fix: cap the sleep to the remaining budget so we never sleep
        # past the max_wait deadline (the original always slept the full
        # current_delay, overshooting the advertised timeout).
        remaining = max_wait - (time.time() - start_time)
        if remaining <= 0:
            break
        time.sleep(min(current_delay, remaining))
        current_delay = min(current_delay * backoff_factor, max_retry_delay)
        timeout = min(5, current_delay * 0.8)

    # Timeout reached
    error_msg = f"❌ Timed out after {max_wait}s waiting for endpoint"
    ui.append(error_msg)
    return False, error_msg
def run_status_checks():
    """Run all status checks; return the endpoint URI on success, else None."""
    ui = StatusUI("Content Agent Status Checks")
    ui.launch()
    ui.append("Starting prechecks...")
    ui.append("Checking endpoint..")

    endpoint_uri = load_huggingface_model()  # Get the URI for the endpoint
    ui.append(endpoint_uri)

    # Wake it up before the health check. Bug fix: the original called
    # wake_up_endpoint twice (pinging the endpoint twice and discarding the
    # first result) and left a stray `i` statement that raised a NameError.
    success, error_msg = wake_up_endpoint(endpoint_uri, ui)
    if not success:
        ui.append("Warning: Could not wake up the endpoint. Exiting.")
        # Bug fix: actually exit here — the original printed "Exiting." but
        # fell through to the health check anyway.
        return None
    ui.append("✅ End point responded OK.")

    is_healthy, status_info = check_model_endpoint(endpoint_uri)  # Test the endpoint
    if not is_healthy:
        from checks.failed_check import create_failed_gradio_ui
        interface = create_failed_gradio_ui(status_info)
        interface.launch(show_error=True, share=True)
        return None
    return endpoint_uri