"""Executes planning agent and streams logs/results. Integrates with the Gradio UI. """ import queue import threading import time import traceback from typing import Any, Generator, List, Optional, Tuple, Union import gradio as gr from src.agents.planning_agent import PlanningAgent from src.config.constants import ( DEALS_FILE, IS_DEMO_VERSION, MAX_CATEGORY_SELECTION, ) from src.config.logging_queue import log_queue from src.ui.formatting import format_deals_table, html_for from src.utils.cleanup import delete_if_old from src.utils.state_manager import can_run_app, get_state, update_state def run_pipeline(log_queue: queue.Queue, selected_categories: List[str]) -> None: """Runs the planning agent pipeline and stores accepted deals.""" try: delete_if_old(DEALS_FILE) agent = PlanningAgent() results = agent.plan(selected_categories) global accepted_deals accepted_deals = [ [ opp.product_description, f"${opp.price:.2f}", f"${opp.estimate:.2f}", f"${opp.discount:.2f}", opp.url, ] for opp in results ] except Exception as e: log_queue.put( f"❌ Error during pipeline execution: " f"{str(e)}" ) log_queue.put(f"
{traceback.format_exc()}
") def validate_categories( selected_categories: Union[str, List[str]], ) -> Tuple[bool, Optional[str]]: """Validates the selected categories.""" if isinstance(selected_categories, str): selected_categories = [selected_categories] if not selected_categories: return False, "⚠️ Please select at least one category before running." if len(selected_categories) > MAX_CATEGORY_SELECTION: return ( False, f"⚠️ You can select up to {MAX_CATEGORY_SELECTION} categories only.", ) return True, None def check_demo_restrictions() -> Tuple[bool, Optional[str], Optional[str]]: """Checks if the app can run under demo restrictions.""" can_run, message = can_run_app() if not can_run: return False, f"⚠️ {message}", None if IS_DEMO_VERSION: # Just update the run count, but use the message from can_run_app update_state({"run_count": get_state()["run_count"] + 1}) # Get fresh message after updating the state _, status_msg = can_run_app() else: status_msg = "" return True, None, status_msg def initial_ui_update( log_data: List[str], status_msg: str ) -> Tuple[str, str, Any, str]: """Returns initial UI state for the app.""" disable_btn = gr.update( interactive=False, elem_classes=["run-button", "btn-disabled"] ) return html_for(log_data), format_deals_table([]), disable_btn, status_msg def run_pipeline_threaded( selected_categories: List[str], log_data: List[str], status_msg: str, enable_btn: Any, # noqa: ANN401 ) -> Generator[Tuple[str, str, Any, str], None, None]: """Runs pipeline in background thread with log streaming. Yields UI updates until completion. """ thread = threading.Thread( target=run_pipeline, args=(log_queue, selected_categories) ) thread.start() disable_btn = gr.update( interactive=False, elem_classes=["run-button", "btn-disabled"] ) while thread.is_alive() or not log_queue.empty(): while not log_queue.empty(): log_msg = log_queue.get() log_data.append(log_msg) yield ( html_for(log_data), format_deals_table(accepted_deals), disable_btn, status_msg, ) if thread.is_alive(): time.sleep(0.2) yield ( html_for(log_data), format_deals_table(accepted_deals), disable_btn, status_msg, ) # Final UI update after thread finishes yield html_for(log_data), format_deals_table(accepted_deals), enable_btn, status_msg def handle_pipeline_error( e: Exception, log_data: List[str], enable_btn: gr.components.Component, status_msg: str, ) -> Tuple[str, str, gr.components.Component, str]: """Handles exceptions and appends error logs. Returns the final UI update tuple. """ log_data.append(f"❌ Unexpected error: {str(e)}") log_data.append(f"
{traceback.format_exc()}
") return html_for(log_data), format_deals_table([]), enable_btn, status_msg def run_and_stream_logs( selected_categories: Union[str, List[str]], ) -> Generator[Tuple[str, str, bool, str], None, None]: """Runs pipeline in a thread, streaming logs and results to the UI. Returns HTML logs, deal table, button state, and status message. """ global accepted_deals accepted_deals = [] log_data = [] # Step 1: Validate categories is_valid, error_msg = validate_categories(selected_categories) if not is_valid: yield None, None, gr.update(interactive=True), error_msg return # Step 2: Check demo restrictions can_run, error_msg, status_msg = check_demo_restrictions() if not can_run: yield ( html_for([error_msg]), format_deals_table([]), gr.update(interactive=True), error_msg, ) return # Step 3: Initial UI update showing we're starting enable_btn = gr.update(interactive=True, elem_classes=["run-button"]) yield initial_ui_update(log_data, status_msg) try: # Step 4: Run the pipeline in a thread yield from run_pipeline_threaded( selected_categories, log_data, status_msg, enable_btn ) except Exception as e: yield handle_pipeline_error(e, log_data, enable_btn, status_msg)