File size: 5,637 Bytes
8366946
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""Scan deals and return top 5 via OpenAI.

1. Fetch deals from RSS feeds.
2. Prompt OpenAI with the deal list.
3. Return top 5 detailed, clearly priced deals as structured JSON.
"""

import json
import os
from typing import Any, Dict, List, Optional

from src.agents.base_agent import Agent
from src.config.constants import DEALS_FILE
from src.deals.raw_deals import ScrapedDeal
from src.deals.structured_deals import OpportunitiesCollection
from src.models.frontier_model import OPENAI_MODEL, openai


class DealScannerAgent(Agent):
    """Agent that scans RSS deals and asks OpenAI to pick the best ones.

    Workflow (see ``scan``):

    1. Fetch deals via ``ScrapedDeal.fetch`` and drop any whose URL is
       already recorded in the memory file.
    2. Send the remaining deals to OpenAI together with ``SYSTEM_PROMPT``.
    3. Keep only the returned opportunities that have a positive price.
    """

    name = "Deal Scanner Agent"
    color = "green"

    # Instructions sent as the "system" message on every OpenAI call.
    SYSTEM_PROMPT = """
    You are a deal filtering assistant.

    Your task is to identify the 5 deals with the most detailed product descriptions 
    and clearly stated prices. Focus only on the product itself — not the deal terms, 
    discounts, or promotions.

    Only include deals where the price is explicitly mentioned and easy to extract. 
    Avoid entries with phrases like "$XXX off" or "reduced by $XXX" — those are not 
    valid prices. Only include deals when you are confident about the actual 
    product price.

    Respond strictly in JSON with no explanation, using the following format:

    {
    "deals": [
        {
        "product_description": "A clear, 4–5 sentence summary of the product.",
        "price": 99.99,
        "url": "..."
        },
        ...
    ]
    }"""

    def __init__(self, memory_path: str = DEALS_FILE) -> None:
        """Store the OpenAI client and the path of the memory file.

        Args:
            memory_path: JSON file holding previously surfaced opportunities;
                defaults to ``DEALS_FILE``.
        """
        self.openai = openai
        self.memory_path = memory_path
        self.log("is ready")

    def _load_memory(self) -> Dict[str, List[Any]]:
        """Load previously stored opportunities from ``self.memory_path``.

        Returns:
            A dict with two keys:

            * ``"seen_urls"``: the ``"url"`` value of every stored opportunity.
            * ``"memory"``: the stored opportunity records themselves.

            Both lists are empty when the file is missing, unreadable, or
            does not have the expected structure.
        """
        if not os.path.exists(self.memory_path):
            self.log("No memory file found. Assuming first run")
            return {"seen_urls": [], "memory": []}

        try:
            with open(self.memory_path, "r") as f:
                memory_json = json.load(f)
            opportunities = memory_json.get("opportunities", [])
            seen_urls = [op["url"] for op in opportunities]
        except (ValueError, KeyError, TypeError, AttributeError, OSError) as e:
            # ValueError covers json.JSONDecodeError and decoding errors;
            # TypeError/AttributeError cover well-formed JSON with the wrong
            # shape (e.g. a top-level list or "opportunities": null).
            self.log(f"Error loading memory: {e}. Creating new memory.")
            return {"seen_urls": [], "memory": []}

        return {"seen_urls": seen_urls, "memory": opportunities}

    def fetch_deals(self, categories: List[str]) -> List[ScrapedDeal]:
        """Fetch RSS deals whose URLs are not already present in memory.

        Args:
            categories: Passed through unchanged to ``ScrapedDeal.fetch``.

        Returns:
            The deals with unseen URLs, or an empty list if fetching fails
            (the error is logged rather than raised).
        """
        self.log("is fetching deals from RSS feed")

        # A set gives O(1) membership checks while filtering.
        memory_data = self._load_memory()
        seen_urls = set(memory_data["seen_urls"])

        try:
            scraped = ScrapedDeal.fetch(categories)
            result: List[ScrapedDeal] = []
            skipped = 0
            # Single pass: works for any iterable and avoids a second scan
            # done only to count the skipped deals.
            for deal in scraped:
                if deal.url in seen_urls:
                    skipped += 1
                else:
                    result.append(deal)
            self.log(f"{skipped} deals skipped")
            self.log(f"{len(result)} new deals fetched")
            return result
        except Exception as e:
            # Best effort by design: a feed failure yields "no deals".
            self.log(f"Error fetching deals: {e}")
            return []

    def make_user_prompt(self, scraped: List[ScrapedDeal]) -> str:
        """Build the user message: a fixed instruction plus each deal's
        ``describe()`` text, separated by blank lines.
        """
        return (
            "Select the 5 best deals with the clearest product descriptions "
            "and exact prices. Here is the list:\n\n"
            + "\n\n".join(deal.describe() for deal in scraped)
        )

    def scan(self, categories: List[str]) -> Optional[OpportunitiesCollection]:
        """Return the model-selected new deals, or ``None`` if there are none.

        Args:
            categories: Feed categories to fetch deals for.

        Returns:
            The parsed collection restricted to positively priced
            opportunities, or ``None`` when no new deals were fetched or
            none survived filtering.

        Raises:
            RuntimeError: If the OpenAI call fails or yields no parsed result.
        """
        # Step 1: Fetch new deals not already in memory
        scraped = self.fetch_deals(categories)
        if not scraped:
            self.log("❌ found no new deals to process ")
            return None

        # Step 2: Construct prompt with all new deals
        user_prompt = self.make_user_prompt(scraped)

        # Step 3: Call OpenAI - allow RuntimeError to propagate
        result = self._call_openai(user_prompt)

        # Step 4: Filter out invalid deals
        filtered_result = self._filter_invalid_deals(result)

        return filtered_result if filtered_result.opportunities else None

    def _call_openai(self, user_prompt: str) -> Any:
        """Send the prompts to OpenAI and return the raw completion object.

        The response is requested in ``OpportunitiesCollection`` format; the
        parsed model itself is extracted later by ``_filter_invalid_deals``.

        Raises:
            RuntimeError: If the API call raises for any reason (the original
                exception is chained as the cause).
        """
        self.log("📞 is calling OpenAI")
        try:
            result = self.openai.beta.chat.completions.parse(
                model=OPENAI_MODEL,
                messages=[
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    {"role": "user", "content": user_prompt},
                ],
                response_format=OpportunitiesCollection,
            )
        except Exception as e:
            self.log(f"[ERROR] OpenAI call failed: {e}")
            raise RuntimeError(
                "DealScannerAgent failed to get response from OpenAI."
            ) from e
        return result

    def _filter_invalid_deals(self, result: Any) -> OpportunitiesCollection:
        """Extract the parsed collection and drop non-positive prices.

        Args:
            result: The completion object returned by ``_call_openai``.

        Returns:
            The parsed collection, mutated in place so that ``opportunities``
            only holds entries whose price is greater than zero.

        Raises:
            RuntimeError: If the response carries no parsed payload.
        """
        parsed = result.choices[0].message.parsed
        if parsed is None:
            # NOTE(review): presumably this happens when the model refuses
            # or the output could not be parsed - confirm against the SDK.
            self.log("[ERROR] OpenAI response contained no parsed deals")
            raise RuntimeError(
                "DealScannerAgent received no parsed deals from OpenAI."
            )

        parsed.opportunities = [
            op
            for op in parsed.opportunities
            if op.price is not None and op.price > 0
        ]
        self.log(f"✅ received {len(parsed.opportunities)} valid opportunities ")
        return parsed