Spaces:
Runtime error
Runtime error
| import os | |
| from datetime import datetime | |
| from queue import Queue | |
| import argilla as rg | |
| import gradio as gr | |
| client = rg.Argilla() | |
| server = rg.get_webhook_server() | |
| incoming_events = Queue() | |
| # The default webhook server URL is set to http://127.0.0.1:8000 | |
| # This value is important since the webhook server URL is used to create the webhook URLs | |
| # When the SPACE_HOST environment variable is set, the default webhook server URL is set to https://SPACE_HOST value | |
| # If the FastAPI app is running on a different URL, the WEBHOOK_SERVER_URL environment variable should be set | |
| default_webhook_server_url = os.getenv("WEBHOOK_SERVER_URL") | |
| if default_webhook_server_url is None and "SPACE_HOST" in os.environ: | |
| # We set it to the default gradio server URL | |
| os.environ["WEBHOOK_SERVER_URL"] = "http://127.0.0.1:7860" | |
| # Set up the webhook listeners | |
| # Create a webhook for record events | |
| async def record_events(record: rg.Record, type: str, timestamp: datetime): | |
| print(f"Received event type {type} at {timestamp}: ", record) | |
| incoming_events.put(record) | |
| # Create a webhook for dataset events | |
| async def dataset_events(dataset: rg.Dataset, type: str, timestamp: datetime): | |
| print(f"Received event type {type} at {timestamp}: ", dataset) | |
| incoming_events.put((type, dataset)) | |
| # Create a webhook for response events | |
| async def response_events(response: rg.UserResponse, type: str, timestamp: datetime): | |
| print(f"Received event type {type} at {timestamp}: ", response) | |
| incoming_events.put(response) | |
| def read_next_event(): | |
| event = incoming_events.get() | |
| return event | |
| with gr.Blocks() as demo: | |
| argilla_server = client.http_client.base_url | |
| gr.Markdown("## Argilla Events") | |
| gr.Markdown(f"This demo shows the incoming events from the [Argilla Server]({argilla_server}).") | |
| json_component = gr.JSON(label="Incoming argilla events:", value={}) | |
| gr.Timer(1, active=True).tick(read_next_event, outputs=json_component) | |
| gr.mount_gradio_app(server, demo, path="/") | |