File size: 4,344 Bytes
8366946
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""PlanningAgent coordinates deal scanning and enrichment."""

import json
from typing import List

from rich import print_json

from src.agents.base_agent import Agent
from src.agents.deal_scanner_agent import DealScannerAgent
from src.agents.ensemble_price_agent import EnsemblePriceAgent
from src.config.constants import CURRENCY, DEAL_THRESHOLD
from src.deals.structured_deals import OpportunitiesCollection, Opportunity
from src.utils.logger import console
from src.utils.memory_utils import save_opportunities_to_memory


class PlanningAgent(Agent):
    """Create instances of the Agents that this planner coordinates across."""

    name = "Planning Agent"
    color = "cyan"

    def __init__(self) -> None:
        """Initialize agents."""
        self.log("🧠 Let’s wake up the agents β€” time to sniff out some sweet deals!")
        self.log("is ready")
        self.scanner = DealScannerAgent()
        self.ensemble = EnsemblePriceAgent()
        self.log("πŸš€ All AI Agents are caffeinated, calibrated, and ready to hustle..")

    def scan_deals(self, categories: List[str]) -> List[Opportunity]:
        """Scans deals and returns GPT-processed opportunities."""
        result = self.scanner.scan(categories)
        if result is None:
            self.log("❌ No valid deals found.")
            return []
        return result.opportunities

    def enrich(self, opportunity: Opportunity) -> Opportunity:
        """Add estimated market price and discount to an opportunity."""
        estimate = self.ensemble.price(opportunity.product_description)
        discount = round(estimate - opportunity.price, 2)
        opportunity.estimate = estimate
        opportunity.discount = discount
        return opportunity

    def _log_result(self, idx: int, opportunity: Opportunity) -> None:
        """Logs if a deal was accepted or rejected.

        Decision is based on discount vs. threshold.

        """
        if opportunity.discount >= DEAL_THRESHOLD:
            self.log(
                f"βœ… Deal #{idx} accepted β€” discount: "
                f"{CURRENCY}{opportunity.discount:.2f}"
            )
        else:
            self.log(
                f"❌ Deal #{idx} rejected β€” discount below threshold: "
                f"{CURRENCY}{opportunity.discount:.2f}"
            )

    def _report_summary(self, enriched: List[Opportunity]) -> None:
        """Display a summary of accepted opportunities after enrichment."""
        if not enriched:
            self.log("❌ No opportunities met the discount threshold.")
        else:
            for opp in enriched:
                console.print(
                    f"- {opp.product_description}\n"
                    f"  Price: {CURRENCY}{opp.price:.2f} | "
                    f"AI Estimate: {CURRENCY}{opp.estimate:.2f} | "
                    f"Discount: {CURRENCY}{opp.discount:.2f}\n"
                    f"  URL: {opp.url}\n"
                )

    def plan(self, categories: List[str]) -> List[Opportunity]:
        """Full pipeline: scan β†’ enrich β†’ filter β†’ save."""
        self.log(
            "************** SCANNING INITIATED β€” HUNTING JUICY DEALS...**************"
        )

        deals = self.scan_deals(categories)
        if not deals:
            self.log("❌ No deals found from scanner.")
            return []
        print_json(
            data=json.loads(
                OpportunitiesCollection(opportunities=deals).model_dump_json()
            )
        )  # For debugging/inspection

        self.log(
            "************** SCANNING COMPLETE β€” STARTING ENRICHMENT **************"
        )

        enriched = []
        for idx, deal in enumerate(deals, start=1):
            opportunity = self.enrich(deal)
            self._log_result(idx, opportunity)
            if opportunity.discount >= DEAL_THRESHOLD:
                enriched.append(opportunity)

        self.log(
            "************** ENRICHMENT COMPLETE β€” SAVING OPPORTUNITIES **************"
        )
        save_opportunities_to_memory([opp.model_dump() for opp in enriched])
        self.log(f"πŸ’Ύ {len(enriched)} top deals saved to memory.")

        self._report_summary(enriched)
        self.log(
            "************** βœ… MISSION COMPLETE β€” BEST DEALS LOCKED IN **************"
        )

        return enriched