Spaces:
Running
Running
File size: 5,990 Bytes
8366946 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
"""Executes planning agent and streams logs/results.
Integrates with the Gradio UI.
"""
import queue
import threading
import time
import traceback
from typing import Any, Generator, List, Optional, Tuple, Union
import gradio as gr
from src.agents.planning_agent import PlanningAgent
from src.config.constants import (
DEALS_FILE,
IS_DEMO_VERSION,
MAX_CATEGORY_SELECTION,
)
from src.config.logging_queue import log_queue
from src.ui.formatting import format_deals_table, html_for
from src.utils.cleanup import delete_if_old
from src.utils.state_manager import can_run_app, get_state, update_state
def run_pipeline(log_queue: queue.Queue, selected_categories: List[str]) -> None:
"""Runs the planning agent pipeline and stores accepted deals."""
try:
delete_if_old(DEALS_FILE)
agent = PlanningAgent()
results = agent.plan(selected_categories)
global accepted_deals
accepted_deals = [
[
opp.product_description,
f"${opp.price:.2f}",
f"${opp.estimate:.2f}",
f"${opp.discount:.2f}",
opp.url,
]
for opp in results
]
except Exception as e:
log_queue.put(
f"<span style='color:red'>❌ Error during pipeline execution: "
f"{str(e)}</span>"
)
log_queue.put(f"<pre>{traceback.format_exc()}</pre>")
def validate_categories(
selected_categories: Union[str, List[str]],
) -> Tuple[bool, Optional[str]]:
"""Validates the selected categories."""
if isinstance(selected_categories, str):
selected_categories = [selected_categories]
if not selected_categories:
return False, "⚠️ Please select at least one category before running."
if len(selected_categories) > MAX_CATEGORY_SELECTION:
return (
False,
f"⚠️ You can select up to {MAX_CATEGORY_SELECTION} categories only.",
)
return True, None
def check_demo_restrictions() -> Tuple[bool, Optional[str], Optional[str]]:
"""Checks if the app can run under demo restrictions."""
can_run, message = can_run_app()
if not can_run:
return False, f"⚠️ {message}", None
if IS_DEMO_VERSION:
# Just update the run count, but use the message from can_run_app
update_state({"run_count": get_state()["run_count"] + 1})
# Get fresh message after updating the state
_, status_msg = can_run_app()
else:
status_msg = ""
return True, None, status_msg
def initial_ui_update(
log_data: List[str], status_msg: str
) -> Tuple[str, str, Any, str]:
"""Returns initial UI state for the app."""
disable_btn = gr.update(
interactive=False, elem_classes=["run-button", "btn-disabled"]
)
return html_for(log_data), format_deals_table([]), disable_btn, status_msg
def run_pipeline_threaded(
selected_categories: List[str],
log_data: List[str],
status_msg: str,
enable_btn: Any, # noqa: ANN401
) -> Generator[Tuple[str, str, Any, str], None, None]:
"""Runs pipeline in background thread with log streaming.
Yields UI updates until completion.
"""
thread = threading.Thread(
target=run_pipeline, args=(log_queue, selected_categories)
)
thread.start()
disable_btn = gr.update(
interactive=False, elem_classes=["run-button", "btn-disabled"]
)
while thread.is_alive() or not log_queue.empty():
while not log_queue.empty():
log_msg = log_queue.get()
log_data.append(log_msg)
yield (
html_for(log_data),
format_deals_table(accepted_deals),
disable_btn,
status_msg,
)
if thread.is_alive():
time.sleep(0.2)
yield (
html_for(log_data),
format_deals_table(accepted_deals),
disable_btn,
status_msg,
)
# Final UI update after thread finishes
yield html_for(log_data), format_deals_table(accepted_deals), enable_btn, status_msg
def handle_pipeline_error(
e: Exception,
log_data: List[str],
enable_btn: gr.components.Component,
status_msg: str,
) -> Tuple[str, str, gr.components.Component, str]:
"""Handles exceptions and appends error logs.
Returns the final UI update tuple.
"""
log_data.append(f"<span style='color:red'>❌ Unexpected error: {str(e)}</span>")
log_data.append(f"<pre>{traceback.format_exc()}</pre>")
return html_for(log_data), format_deals_table([]), enable_btn, status_msg
def run_and_stream_logs(
selected_categories: Union[str, List[str]],
) -> Generator[Tuple[str, str, bool, str], None, None]:
"""Runs pipeline in a thread, streaming logs and results to the UI.
Returns HTML logs, deal table, button state, and status message.
"""
global accepted_deals
accepted_deals = []
log_data = []
# Step 1: Validate categories
is_valid, error_msg = validate_categories(selected_categories)
if not is_valid:
yield None, None, gr.update(interactive=True), error_msg
return
# Step 2: Check demo restrictions
can_run, error_msg, status_msg = check_demo_restrictions()
if not can_run:
yield (
html_for([error_msg]),
format_deals_table([]),
gr.update(interactive=True),
error_msg,
)
return
# Step 3: Initial UI update showing we're starting
enable_btn = gr.update(interactive=True, elem_classes=["run-button"])
yield initial_ui_update(log_data, status_msg)
try:
# Step 4: Run the pipeline in a thread
yield from run_pipeline_threaded(
selected_categories, log_data, status_msg, enable_btn
)
except Exception as e:
yield handle_pipeline_error(e, log_data, enable_btn, status_msg)
|