Spaces:
Running
Running
File size: 5,637 Bytes
8366946 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
"""Scan deals and return top 5 via OpenAI.
1. Fetch deals from RSS feeds.
2. Prompt OpenAI with the deal list.
3. Return top 5 detailed, clearly priced deals as structured JSON.
"""
import json
import os
from typing import Any, Dict, List, Optional
from src.agents.base_agent import Agent
from src.config.constants import DEALS_FILE
from src.deals.raw_deals import ScrapedDeal
from src.deals.structured_deals import OpportunitiesCollection
from src.models.frontier_model import OPENAI_MODEL, openai
class DealScannerAgent(Agent):
"""Agent for scanning and filtering deals.
Processes them via OpenAI.
"""
name = "Deal Scanner Agent"
color = "green"
SYSTEM_PROMPT = """
You are a deal filtering assistant.
Your task is to identify the 5 deals with the most detailed product descriptions
and clearly stated prices. Focus only on the product itself β not the deal terms,
discounts, or promotions.
Only include deals where the price is explicitly mentioned and easy to extract.
Avoid entries with phrases like "$XXX off" or "reduced by $XXX" β those are not
valid prices. Only include deals when you are confident about the actual
product price.
Respond strictly in JSON with no explanation, using the following format:
{
"deals": [
{
"product_description": "A clear, 4β5 sentence summary of the product.",
"price": 99.99,
"url": "..."
},
...
]
}"""
def __init__(self, memory_path: str = DEALS_FILE) -> None:
"""Initialize OpenAI client."""
self.openai = openai
self.memory_path = memory_path
self.log("is ready")
def _load_memory(self) -> Dict[str, List[Dict[str, Any]]]:
"""Load memory from file, returning seen URLs and the full memory."""
if os.path.exists(self.memory_path):
try:
with open(self.memory_path, "r") as f:
memory_json = json.load(f)
seen_urls = [op["url"] for op in memory_json.get("opportunities", [])]
return {
"seen_urls": seen_urls,
"memory": memory_json.get("opportunities", []),
}
except (json.JSONDecodeError, KeyError, IOError) as e:
self.log(f"Error loading memory: {e}. Creating new memory.")
return {"seen_urls": [], "memory": []}
else:
self.log("No memory file found. Assuming first run")
return {"seen_urls": [], "memory": []}
def fetch_deals(self, categories: List[str]) -> List[ScrapedDeal]:
"""Fetch new RSS deals not present in memory."""
self.log("is fetching deals from RSS feed")
# Load memory to get seen URLs
memory_data = self._load_memory()
seen_urls = set(memory_data["seen_urls"])
# Fetch all deals and filter out seen ones
try:
scraped = ScrapedDeal.fetch(categories)
result = [deal for deal in scraped if deal.url not in seen_urls]
overlap = [deal for deal in scraped if deal.url in seen_urls]
self.log(f"{len(overlap)} deals skipped")
self.log(f"{len(result)} new deals fetched")
return result
except Exception as e:
self.log(f"Error fetching deals: {e}")
return []
def make_user_prompt(self, scraped: List[ScrapedDeal]) -> str:
"""Build the full user prompt for OpenAI."""
return (
"Select the 5 best deals with the clearest product descriptions "
"and exact prices. Here is the list:\n\n"
+ "\n\n".join(deal.describe() for deal in scraped)
)
def scan(self, categories: List[str]) -> Optional[OpportunitiesCollection]:
"""Return top 5 new deals."""
# Step 1: Fetch new deals not already in memory
scraped = self.fetch_deals(categories)
if not scraped:
self.log("β found no new deals to process ")
return None
# Step 2: Construct prompt with all new deals
user_prompt = self.make_user_prompt(scraped)
# Step 3: Call OpenAI - allow RuntimeError to propagate
result = self._call_openai(user_prompt)
# Step 4: Filter out invalid deals
filtered_result = self._filter_invalid_deals(result)
return filtered_result if filtered_result.opportunities else None
def _call_openai(self, user_prompt: str) -> OpportunitiesCollection:
"""Call OpenAI API to get the processed deals."""
self.log("π is calling OpenAI")
try:
result = self.openai.beta.chat.completions.parse(
model=OPENAI_MODEL,
messages=[
{"role": "system", "content": self.SYSTEM_PROMPT},
{"role": "user", "content": user_prompt},
],
response_format=OpportunitiesCollection,
)
except Exception as e:
self.log(f"[ERROR] OpenAI call failed: {e}")
raise RuntimeError(
"DealScannerAgent failed to get response from OpenAI."
) from e
return result
def _filter_invalid_deals(
self, result: OpportunitiesCollection
) -> OpportunitiesCollection:
"""Filter out deals with invalid prices."""
result = result.choices[0].message.parsed
result.opportunities = [op for op in result.opportunities if op.price > 0]
self.log(f"β
received {len(result.opportunities)} valid opportunities ")
return result
|