File size: 5,990 Bytes
8366946
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
"""Executes planning agent and streams logs/results.

Integrates with the Gradio UI.
"""

import queue
import threading
import time
import traceback
from typing import Any, Generator, List, Optional, Tuple, Union

import gradio as gr

from src.agents.planning_agent import PlanningAgent
from src.config.constants import (
    DEALS_FILE,
    IS_DEMO_VERSION,
    MAX_CATEGORY_SELECTION,
)
from src.config.logging_queue import log_queue
from src.ui.formatting import format_deals_table, html_for
from src.utils.cleanup import delete_if_old
from src.utils.state_manager import can_run_app, get_state, update_state


def run_pipeline(log_queue: queue.Queue, selected_categories: List[str]) -> None:
    """Runs the planning agent pipeline and stores accepted deals."""
    try:
        delete_if_old(DEALS_FILE)
        agent = PlanningAgent()
        results = agent.plan(selected_categories)

        global accepted_deals
        accepted_deals = [
            [
                opp.product_description,
                f"${opp.price:.2f}",
                f"${opp.estimate:.2f}",
                f"${opp.discount:.2f}",
                opp.url,
            ]
            for opp in results
        ]

    except Exception as e:
        log_queue.put(
            f"<span style='color:red'>❌ Error during pipeline execution: "
            f"{str(e)}</span>"
        )
        log_queue.put(f"<pre>{traceback.format_exc()}</pre>")


def validate_categories(
    selected_categories: Union[str, List[str]],
) -> Tuple[bool, Optional[str]]:
    """Validates the selected categories."""
    if isinstance(selected_categories, str):
        selected_categories = [selected_categories]

    if not selected_categories:
        return False, "⚠️ Please select at least one category before running."

    if len(selected_categories) > MAX_CATEGORY_SELECTION:
        return (
            False,
            f"⚠️ You can select up to {MAX_CATEGORY_SELECTION} categories only.",
        )

    return True, None


def check_demo_restrictions() -> Tuple[bool, Optional[str], Optional[str]]:
    """Checks if the app can run under demo restrictions."""
    can_run, message = can_run_app()

    if not can_run:
        return False, f"⚠️ {message}", None

    if IS_DEMO_VERSION:
        # Just update the run count, but use the message from can_run_app
        update_state({"run_count": get_state()["run_count"] + 1})
        # Get fresh message after updating the state
        _, status_msg = can_run_app()
    else:
        status_msg = ""

    return True, None, status_msg


def initial_ui_update(
    log_data: List[str], status_msg: str
) -> Tuple[str, str, Any, str]:
    """Returns initial UI state for the app."""
    disable_btn = gr.update(
        interactive=False, elem_classes=["run-button", "btn-disabled"]
    )
    return html_for(log_data), format_deals_table([]), disable_btn, status_msg


def run_pipeline_threaded(
    selected_categories: List[str],
    log_data: List[str],
    status_msg: str,
    enable_btn: Any,  # noqa: ANN401
) -> Generator[Tuple[str, str, Any, str], None, None]:
    """Runs pipeline in background thread with log streaming.

    Yields UI updates until completion.

    """
    thread = threading.Thread(
        target=run_pipeline, args=(log_queue, selected_categories)
    )
    thread.start()

    disable_btn = gr.update(
        interactive=False, elem_classes=["run-button", "btn-disabled"]
    )

    while thread.is_alive() or not log_queue.empty():
        while not log_queue.empty():
            log_msg = log_queue.get()
            log_data.append(log_msg)
            yield (
                html_for(log_data),
                format_deals_table(accepted_deals),
                disable_btn,
                status_msg,
            )

        if thread.is_alive():
            time.sleep(0.2)
            yield (
                html_for(log_data),
                format_deals_table(accepted_deals),
                disable_btn,
                status_msg,
            )

    # Final UI update after thread finishes
    yield html_for(log_data), format_deals_table(accepted_deals), enable_btn, status_msg


def handle_pipeline_error(
    e: Exception,
    log_data: List[str],
    enable_btn: gr.components.Component,
    status_msg: str,
) -> Tuple[str, str, gr.components.Component, str]:
    """Handles exceptions and appends error logs.

    Returns the final UI update tuple.
    """
    log_data.append(f"<span style='color:red'>❌ Unexpected error: {str(e)}</span>")
    log_data.append(f"<pre>{traceback.format_exc()}</pre>")
    return html_for(log_data), format_deals_table([]), enable_btn, status_msg


def run_and_stream_logs(
    selected_categories: Union[str, List[str]],
) -> Generator[Tuple[str, str, bool, str], None, None]:
    """Runs pipeline in a thread, streaming logs and results to the UI.

    Returns HTML logs, deal table, button state, and status message.
    """
    global accepted_deals
    accepted_deals = []
    log_data = []

    # Step 1: Validate categories
    is_valid, error_msg = validate_categories(selected_categories)
    if not is_valid:
        yield None, None, gr.update(interactive=True), error_msg
        return

    # Step 2: Check demo restrictions
    can_run, error_msg, status_msg = check_demo_restrictions()
    if not can_run:
        yield (
            html_for([error_msg]),
            format_deals_table([]),
            gr.update(interactive=True),
            error_msg,
        )
        return

    # Step 3: Initial UI update showing we're starting
    enable_btn = gr.update(interactive=True, elem_classes=["run-button"])
    yield initial_ui_update(log_data, status_msg)

    try:
        # Step 4: Run the pipeline in a thread
        yield from run_pipeline_threaded(
            selected_categories, log_data, status_msg, enable_btn
        )

    except Exception as e:
        yield handle_pipeline_error(e, log_data, enable_btn, status_msg)