snapr / src /agents /deal_scanner_agent.py
lisekarimi
Deploy version 0.1.0
8366946
"""Scan deals and return top 5 via OpenAI.
1. Fetch deals from RSS feeds.
2. Prompt OpenAI with the deal list.
3. Return top 5 detailed, clearly priced deals as structured JSON.
"""
import json
import os
from typing import Any, Dict, List, Optional
from src.agents.base_agent import Agent
from src.config.constants import DEALS_FILE
from src.deals.raw_deals import ScrapedDeal
from src.deals.structured_deals import OpportunitiesCollection
from src.models.frontier_model import OPENAI_MODEL, openai
class DealScannerAgent(Agent):
"""Agent for scanning and filtering deals.
Processes them via OpenAI.
"""
name = "Deal Scanner Agent"
color = "green"
SYSTEM_PROMPT = """
You are a deal filtering assistant.
Your task is to identify the 5 deals with the most detailed product descriptions
and clearly stated prices. Focus only on the product itself β€” not the deal terms,
discounts, or promotions.
Only include deals where the price is explicitly mentioned and easy to extract.
Avoid entries with phrases like "$XXX off" or "reduced by $XXX" β€” those are not
valid prices. Only include deals when you are confident about the actual
product price.
Respond strictly in JSON with no explanation, using the following format:
{
"deals": [
{
"product_description": "A clear, 4–5 sentence summary of the product.",
"price": 99.99,
"url": "..."
},
...
]
}"""
def __init__(self, memory_path: str = DEALS_FILE) -> None:
"""Initialize OpenAI client."""
self.openai = openai
self.memory_path = memory_path
self.log("is ready")
def _load_memory(self) -> Dict[str, List[Dict[str, Any]]]:
"""Load memory from file, returning seen URLs and the full memory."""
if os.path.exists(self.memory_path):
try:
with open(self.memory_path, "r") as f:
memory_json = json.load(f)
seen_urls = [op["url"] for op in memory_json.get("opportunities", [])]
return {
"seen_urls": seen_urls,
"memory": memory_json.get("opportunities", []),
}
except (json.JSONDecodeError, KeyError, IOError) as e:
self.log(f"Error loading memory: {e}. Creating new memory.")
return {"seen_urls": [], "memory": []}
else:
self.log("No memory file found. Assuming first run")
return {"seen_urls": [], "memory": []}
def fetch_deals(self, categories: List[str]) -> List[ScrapedDeal]:
"""Fetch new RSS deals not present in memory."""
self.log("is fetching deals from RSS feed")
# Load memory to get seen URLs
memory_data = self._load_memory()
seen_urls = set(memory_data["seen_urls"])
# Fetch all deals and filter out seen ones
try:
scraped = ScrapedDeal.fetch(categories)
result = [deal for deal in scraped if deal.url not in seen_urls]
overlap = [deal for deal in scraped if deal.url in seen_urls]
self.log(f"{len(overlap)} deals skipped")
self.log(f"{len(result)} new deals fetched")
return result
except Exception as e:
self.log(f"Error fetching deals: {e}")
return []
def make_user_prompt(self, scraped: List[ScrapedDeal]) -> str:
"""Build the full user prompt for OpenAI."""
return (
"Select the 5 best deals with the clearest product descriptions "
"and exact prices. Here is the list:\n\n"
+ "\n\n".join(deal.describe() for deal in scraped)
)
def scan(self, categories: List[str]) -> Optional[OpportunitiesCollection]:
"""Return top 5 new deals."""
# Step 1: Fetch new deals not already in memory
scraped = self.fetch_deals(categories)
if not scraped:
self.log("❌ found no new deals to process ")
return None
# Step 2: Construct prompt with all new deals
user_prompt = self.make_user_prompt(scraped)
# Step 3: Call OpenAI - allow RuntimeError to propagate
result = self._call_openai(user_prompt)
# Step 4: Filter out invalid deals
filtered_result = self._filter_invalid_deals(result)
return filtered_result if filtered_result.opportunities else None
def _call_openai(self, user_prompt: str) -> OpportunitiesCollection:
"""Call OpenAI API to get the processed deals."""
self.log("πŸ“ž is calling OpenAI")
try:
result = self.openai.beta.chat.completions.parse(
model=OPENAI_MODEL,
messages=[
{"role": "system", "content": self.SYSTEM_PROMPT},
{"role": "user", "content": user_prompt},
],
response_format=OpportunitiesCollection,
)
except Exception as e:
self.log(f"[ERROR] OpenAI call failed: {e}")
raise RuntimeError(
"DealScannerAgent failed to get response from OpenAI."
) from e
return result
def _filter_invalid_deals(
self, result: OpportunitiesCollection
) -> OpportunitiesCollection:
"""Filter out deals with invalid prices."""
result = result.choices[0].message.parsed
result.opportunities = [op for op in result.opportunities if op.price > 0]
self.log(f"βœ… received {len(result.opportunities)} valid opportunities ")
return result