Spaces:
Running
Running
File size: 4,344 Bytes
8366946 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
"""PlanningAgent coordinates deal scanning and enrichment."""
import json
from typing import List
from rich import print_json
from src.agents.base_agent import Agent
from src.agents.deal_scanner_agent import DealScannerAgent
from src.agents.ensemble_price_agent import EnsemblePriceAgent
from src.config.constants import CURRENCY, DEAL_THRESHOLD
from src.deals.structured_deals import OpportunitiesCollection, Opportunity
from src.utils.logger import console
from src.utils.memory_utils import save_opportunities_to_memory
class PlanningAgent(Agent):
    """Coordinate deal scanning, price enrichment, filtering and saving.

    The planner owns a ``DealScannerAgent`` (asked for deals in a list of
    categories) and an ``EnsemblePriceAgent`` (asked to price a product
    description). Opportunities whose discount reaches ``DEAL_THRESHOLD``
    are kept, saved to memory and reported.
    """

    # Class-level identity attributes; presumably consumed by the ``Agent``
    # base class when formatting log output -- confirm against base_agent.
    name = "Planning Agent"
    color = "cyan"

    def __init__(self) -> None:
        """Create the scanner and pricing agents this planner coordinates."""
        self.log("🧠 Let’s wake up the agents — time to sniff out some sweet deals!")
        self.log("is ready")
        self.scanner = DealScannerAgent()
        self.ensemble = EnsemblePriceAgent()
        self.log("🚀 All AI Agents are caffeinated, calibrated, and ready to hustle..")

    @staticmethod
    def _is_accepted(opportunity: Opportunity) -> bool:
        """Return True when the opportunity's discount reaches DEAL_THRESHOLD.

        Single source of truth for the accept/reject rule, shared by the
        per-deal log message and the filtering step in ``plan``.
        """
        return opportunity.discount >= DEAL_THRESHOLD

    def scan_deals(self, categories: List[str]) -> List[Opportunity]:
        """Scan the given categories and return the opportunities found.

        Args:
            categories: Category names forwarded to the scanner agent.

        Returns:
            The scanner result's ``opportunities``, or an empty list when
            the scanner returns ``None``.
        """
        result = self.scanner.scan(categories)
        if result is None:
            self.log("❌ No valid deals found.")
            return []
        return result.opportunities

    def enrich(self, opportunity: Opportunity) -> Opportunity:
        """Attach an estimated price and the resulting discount.

        The discount is ``estimate - price`` rounded to two decimals, so a
        positive value means the deal is priced below the estimate.

        Args:
            opportunity: The deal to enrich; it is modified in place.

        Returns:
            The same ``opportunity`` object, with ``estimate`` and
            ``discount`` set.
        """
        estimate = self.ensemble.price(opportunity.product_description)
        opportunity.estimate = estimate
        opportunity.discount = round(estimate - opportunity.price, 2)
        return opportunity

    def _log_result(self, idx: int, opportunity: Opportunity) -> None:
        """Log whether a deal was accepted or rejected.

        Args:
            idx: Position of the deal in the scan results, used in the message.
            opportunity: An already enriched opportunity (``discount`` set).
        """
        if self._is_accepted(opportunity):
            self.log(
                f"✅ Deal #{idx} accepted — discount: "
                f"{CURRENCY}{opportunity.discount:.2f}"
            )
        else:
            self.log(
                f"❌ Deal #{idx} rejected — discount below threshold: "
                f"{CURRENCY}{opportunity.discount:.2f}"
            )

    def _report_summary(self, enriched: List[Opportunity]) -> None:
        """Print a summary of the accepted opportunities.

        Args:
            enriched: Opportunities that passed the discount threshold. When
                empty, a single log line is emitted instead of a listing.
        """
        if not enriched:
            self.log("❌ No opportunities met the discount threshold.")
            return

        for opp in enriched:
            console.print(
                f"- {opp.product_description}\n"
                f" Price: {CURRENCY}{opp.price:.2f} | "
                f"AI Estimate: {CURRENCY}{opp.estimate:.2f} | "
                f"Discount: {CURRENCY}{opp.discount:.2f}\n"
                f" URL: {opp.url}\n"
            )

    def plan(self, categories: List[str]) -> List[Opportunity]:
        """Run the full pipeline: scan → enrich → filter → save.

        Args:
            categories: Category names to scan for deals.

        Returns:
            The enriched opportunities whose discount reaches
            ``DEAL_THRESHOLD``; an empty list when the scanner finds nothing.
            When the scanner finds nothing, nothing is saved either.
        """
        self.log(
            "************** SCANNING INITIATED — HUNTING JUICY DEALS...**************"
        )
        deals = self.scan_deals(categories)
        if not deals:
            self.log("❌ No deals found from scanner.")
            return []

        print_json(
            data=json.loads(
                OpportunitiesCollection(opportunities=deals).model_dump_json()
            )
        )  # For debugging/inspection

        self.log(
            "************** SCANNING COMPLETE — STARTING ENRICHMENT **************"
        )
        enriched: List[Opportunity] = []
        # A plain loop rather than a comprehension: each iteration also logs.
        for idx, deal in enumerate(deals, start=1):
            opportunity = self.enrich(deal)
            self._log_result(idx, opportunity)
            if self._is_accepted(opportunity):
                enriched.append(opportunity)

        self.log(
            "************** ENRICHMENT COMPLETE — SAVING OPPORTUNITIES **************"
        )
        save_opportunities_to_memory([opp.model_dump() for opp in enriched])
        self.log(f"💾 {len(enriched)} top deals saved to memory.")
        self._report_summary(enriched)
        self.log(
            "************** ✅ MISSION COMPLETE — BEST DEALS LOCKED IN **************"
        )
        return enriched
|