Spaces:
Running
Running
"""Scan deals and return top 5 via OpenAI. | |
1. Fetch deals from RSS feeds. | |
2. Prompt OpenAI with the deal list. | |
3. Return top 5 detailed, clearly priced deals as structured JSON. | |
""" | |
import json | |
import os | |
from typing import Any, Dict, List, Optional | |
from src.agents.base_agent import Agent | |
from src.config.constants import DEALS_FILE | |
from src.deals.raw_deals import ScrapedDeal | |
from src.deals.structured_deals import OpportunitiesCollection | |
from src.models.frontier_model import OPENAI_MODEL, openai | |
class DealScannerAgent(Agent): | |
"""Agent for scanning and filtering deals. | |
Processes them via OpenAI. | |
""" | |
name = "Deal Scanner Agent" | |
color = "green" | |
SYSTEM_PROMPT = """ | |
You are a deal filtering assistant. | |
Your task is to identify the 5 deals with the most detailed product descriptions | |
and clearly stated prices. Focus only on the product itself β not the deal terms, | |
discounts, or promotions. | |
Only include deals where the price is explicitly mentioned and easy to extract. | |
Avoid entries with phrases like "$XXX off" or "reduced by $XXX" β those are not | |
valid prices. Only include deals when you are confident about the actual | |
product price. | |
Respond strictly in JSON with no explanation, using the following format: | |
{ | |
"deals": [ | |
{ | |
"product_description": "A clear, 4β5 sentence summary of the product.", | |
"price": 99.99, | |
"url": "..." | |
}, | |
... | |
] | |
}""" | |
def __init__(self, memory_path: str = DEALS_FILE) -> None: | |
"""Initialize OpenAI client.""" | |
self.openai = openai | |
self.memory_path = memory_path | |
self.log("is ready") | |
def _load_memory(self) -> Dict[str, List[Dict[str, Any]]]: | |
"""Load memory from file, returning seen URLs and the full memory.""" | |
if os.path.exists(self.memory_path): | |
try: | |
with open(self.memory_path, "r") as f: | |
memory_json = json.load(f) | |
seen_urls = [op["url"] for op in memory_json.get("opportunities", [])] | |
return { | |
"seen_urls": seen_urls, | |
"memory": memory_json.get("opportunities", []), | |
} | |
except (json.JSONDecodeError, KeyError, IOError) as e: | |
self.log(f"Error loading memory: {e}. Creating new memory.") | |
return {"seen_urls": [], "memory": []} | |
else: | |
self.log("No memory file found. Assuming first run") | |
return {"seen_urls": [], "memory": []} | |
def fetch_deals(self, categories: List[str]) -> List[ScrapedDeal]: | |
"""Fetch new RSS deals not present in memory.""" | |
self.log("is fetching deals from RSS feed") | |
# Load memory to get seen URLs | |
memory_data = self._load_memory() | |
seen_urls = set(memory_data["seen_urls"]) | |
# Fetch all deals and filter out seen ones | |
try: | |
scraped = ScrapedDeal.fetch(categories) | |
result = [deal for deal in scraped if deal.url not in seen_urls] | |
overlap = [deal for deal in scraped if deal.url in seen_urls] | |
self.log(f"{len(overlap)} deals skipped") | |
self.log(f"{len(result)} new deals fetched") | |
return result | |
except Exception as e: | |
self.log(f"Error fetching deals: {e}") | |
return [] | |
def make_user_prompt(self, scraped: List[ScrapedDeal]) -> str: | |
"""Build the full user prompt for OpenAI.""" | |
return ( | |
"Select the 5 best deals with the clearest product descriptions " | |
"and exact prices. Here is the list:\n\n" | |
+ "\n\n".join(deal.describe() for deal in scraped) | |
) | |
def scan(self, categories: List[str]) -> Optional[OpportunitiesCollection]: | |
"""Return top 5 new deals.""" | |
# Step 1: Fetch new deals not already in memory | |
scraped = self.fetch_deals(categories) | |
if not scraped: | |
self.log("β found no new deals to process ") | |
return None | |
# Step 2: Construct prompt with all new deals | |
user_prompt = self.make_user_prompt(scraped) | |
# Step 3: Call OpenAI - allow RuntimeError to propagate | |
result = self._call_openai(user_prompt) | |
# Step 4: Filter out invalid deals | |
filtered_result = self._filter_invalid_deals(result) | |
return filtered_result if filtered_result.opportunities else None | |
def _call_openai(self, user_prompt: str) -> OpportunitiesCollection: | |
"""Call OpenAI API to get the processed deals.""" | |
self.log("π is calling OpenAI") | |
try: | |
result = self.openai.beta.chat.completions.parse( | |
model=OPENAI_MODEL, | |
messages=[ | |
{"role": "system", "content": self.SYSTEM_PROMPT}, | |
{"role": "user", "content": user_prompt}, | |
], | |
response_format=OpportunitiesCollection, | |
) | |
except Exception as e: | |
self.log(f"[ERROR] OpenAI call failed: {e}") | |
raise RuntimeError( | |
"DealScannerAgent failed to get response from OpenAI." | |
) from e | |
return result | |
def _filter_invalid_deals( | |
self, result: OpportunitiesCollection | |
) -> OpportunitiesCollection: | |
"""Filter out deals with invalid prices.""" | |
result = result.choices[0].message.parsed | |
result.opportunities = [op for op in result.opportunities if op.price > 0] | |
self.log(f"β received {len(result.opportunities)} valid opportunities ") | |
return result | |