File size: 22,524 Bytes
6a0e448
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
import asyncio
import logging
import platform
import re
from contextlib import AsyncExitStack
from pathlib import Path
from typing import Literal, Optional, Self

from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from playwright_stealth import StealthConfig, stealth_async
from pydantic import Field
from tenacity import before_sleep_log, retry, stop_after_delay, wait_exponential

from proxy_lite.browser.bounding_boxes import POI, BoundingBox, Point, annotate_bounding_boxes
from proxy_lite.logger import logger

import base64

SELF_CONTAINED_TAGS = [
    # many of these are non-interactive but keeping them anyway
    "area",
    "base",
    "br",
    "col",
    "embed",
    "hr",
    "img",
    "input",
    "link",
    "meta",
    "param",
    "source",
    "track",
    "wbr",
]


def element_as_text(
    mark_id: int,
    tag: Optional[str] = None,
    text: Optional[str] = None,
    **raw_attributes,
) -> str:
    """Return a text representation of all elements on the page."""
    attributes = []
    for k, v in raw_attributes.items():
        if v is None:
            continue
        if isinstance(v, bool):
            if v:
                attributes.append(k)
            # we ignore False bool attributes
        else:
            v = str(v)
            if len(v) > 2500:
                v = v[: 2500 - 1] + "…"
            attributes.append(f'{k}="{v}"')
    attributes = " ".join(attributes)
    attributes = (" " + attributes).rstrip()
    tag = tag.lower()
    if text is None:
        text = ""
    if len(text) > 2500:
        text = text[: 2500 - 1] + "…"

    # sub-out line breaks so elements are easier to distinguish
    attributes = re.sub(r"\r\n|\r|\n", "⏎", attributes)
    text = re.sub(r"\r\n|\r|\n", "⏎", text)

    if tag in SELF_CONTAINED_TAGS:
        if text:
            logger.warning(
                f"Got self-contained element '{tag}' which contained text '{text}'.",
            )
        else:
            return f"- [{mark_id}] <{tag}{attributes}/>"
    return f"- [{mark_id}] <{tag}{attributes}>{text}</{tag}>"


class BrowserSession:
    def __init__(
        self,
        viewport_width: int = 1280,
        viewport_height: int = 720,
        headless: bool = True,
    ):
        self.viewport_width = viewport_width
        self.viewport_height = viewport_height
        self.headless = headless
        self.playwright: Playwright | None = None
        self.browser: Browser | None = None
        self.context: BrowserContext | None = None
        self._exit_stack: AsyncExitStack | None = None

        self.poi_elements: list = Field(default_factory=list)
        self.poi_centroids: list[Point] = Field(default_factory=list)
        self.bounding_boxes: list[BoundingBox] = Field(default_factory=list)
        self.pois: list[POI] = Field(default_factory=list)

    async def __aenter__(self) -> Self:
        self._exit_stack = AsyncExitStack()
        self.playwright = await async_playwright().start()

        self.browser = await self.playwright.chromium.launch(headless=self.headless)
        self.context = await self.browser.new_context(
            viewport={"width": self.viewport_width, "height": self.viewport_height},
        )
        # Ensure there's at least one page open
        if not self.context.pages:
            await self.context.new_page()

        self.context.set_default_timeout(60_000)
        self.current_page.set_default_timeout(60_000)
        await stealth_async(self.current_page, StealthConfig(navigator_user_agent=False))
        await self.context.add_init_script(
            path=Path(__file__).with_name("add_custom_select.js"),
        )
        await self.context.add_init_script(
            path=Path(__file__).with_name("find_pois.js"),
        )

        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()
        if self._exit_stack:
            await self._exit_stack.aclose()

    @property
    def current_page(self) -> Optional[Page]:
        if self.context and self.context.pages:
            return self.context.pages[-1] # Return the most recently opened page
        return None

    @property 
    def current_url(self) -> Optional[str]:
        if self.current_page:
            return self.current_page.url
        return None

    # re-run for cases of mid-run redirects
    @retry(
        wait=wait_exponential(multiplier=1, min=1, max=10),
        stop=stop_after_delay(5),
        reraise=True,
        before_sleep=before_sleep_log(logger, logging.ERROR),
    )
    async def process_iframe(self, iframe) -> Optional[tuple[dict, dict]]:
        try:
            # Check iframe visibility and size
            bounding_box = await iframe.bounding_box()
            if not bounding_box:
                return None  # Skip if iframe is not visible

            width, height = bounding_box["width"], bounding_box["height"]
            if width < 50 or height < 50:
                return None

            frame = await iframe.content_frame()
            if not frame:
                return None

            poi = await frame.evaluate(
                """() => {
                    overwriteDefaultSelectConvergence();
                    return findPOIsConvergence();
                }""",
            )
            if not poi:
                return None

            iframe_offset = {"x": round(bounding_box["x"]), "y": round(bounding_box["y"])}
            return poi, iframe_offset
        except Exception as e:
            logger.error(f"Error processing iframe: {e}")
            return None

    @retry(
        wait=wait_exponential(multiplier=1, min=1, max=10),
        stop=stop_after_delay(5),
        reraise=True,
        before_sleep=before_sleep_log(logger, logging.ERROR),
    )
    async def update_poi(self) -> None:
        try:
            # Wait for basic page load states to ensure the DOM is ready.
            # This is a fundamental wait that should always apply.
            await self.current_page.wait_for_load_state("domcontentloaded", timeout=60000)
            logger.debug(f"DEBUG: wait_for_load_state('domcontentloaded') completed for {self.current_page.url}.")

            current_url = self.current_page.url
            
            # Define common Salesforce URL patterns for different states
            login_url_patterns = [
                "login.salesforce.com",
                "identity.force.com",
                "auth.lightning.force.com",
                "setup.salesforce.com", # Sometimes a setup login redirects here temporarily
                "my.salesforce.com" # Your specific custom domain login redirects here
            ]
            
            # This is the main Salesforce Lightning application base URL, typically seen after login.
            # We treat this as an intermediate loading state before the specific target page.
            intermediate_app_url_pattern = "/one/one.app" 
            
            # Check the current state of the page based on its URL
            is_on_login_page = any(pattern in current_url for pattern in login_url_patterns)
            is_on_intermediate_app_page = intermediate_app_url_pattern in current_url 
            # Note: is_on_target_forecast_page checks if the specific target path is in the URL
            is_on_target_forecast_page = "/AccountForecastSettings/home" in current_url

            # --- CONDITIONAL WAITING LOGIC BASED ON URL ---
            if is_on_target_forecast_page:
                logger.info(f"INFO: Detected target Account Forecast Settings page: {current_url}. Waiting for content.")
                # When on the specific target page, wait for its content and spinners
                spinner_selectors = [
                    "div.slds-spinner_container",
                    "div.auraLoadingBox",
                    "div.dxp_axb_container", # Main overlay from your inspect screenshot
                    "div.slds-sprite-astro-x-large" # Specific animated element itself
                ]
                for selector in spinner_selectors:
                    try:
                        await self.current_page.wait_for_selector(selector, state="hidden", timeout=5000) # Reduced timeout
                        logger.debug(f"DEBUG: Spinner element '{selector}' became hidden for {self.current_page.url}.")
                    except PlaywrightTimeoutError:
                        logger.warning(f"DEBUGGING: Spinner element '{selector}' not detected or did not disappear on {self.current_page.url} within 5s.")
                
                # Wait for a known element on the Account Forecast Settings page to ensure content is there.
                try:
                    # Added 'h2' for section headers, and a more generic 'div[data-aura-rendered-by]' for Lightning components
                    await self.current_page.wait_for_selector("h1.slds-page-header__title, h2, .account-forecast-settings-component, div[data-aura-rendered-by]", state="visible", timeout=15000) # Increased timeout slightly for robust content load
                    logger.debug(f"DEBUG: Confirmed main page element visible for {self.current_page.url}.")
                except PlaywrightTimeoutError:
                    logger.warning(f"DEBUGGING: Main page element not visible on {self.current_page.url} within 15s. This might indicate incomplete page load despite no spinner.")
            
            elif is_on_login_page:
                logger.info(f"INFO: Detected Salesforce login page: {current_url}. Waiting for login elements.")
                # When on a login page, just wait for the login form elements to be visible
                try:
                    await self.current_page.wait_for_selector("input[type='email'], input[type='password'], input[type='submit'], #username, #password, #Login", state="visible", timeout=10000)
                    logger.debug(f"DEBUG: Login page elements visible on {self.current_page.url}.")
                except PlaywrightTimeoutError:
                    logger.warning(f"DEBUGGING: Login page elements not visible on {self.current_page.url} within 10s. This may happen if elements are in an iframe or if page is extremely slow.")
            
            elif is_on_intermediate_app_page:
                logger.info(f"INFO: Detected intermediate Salesforce Lightning app loading page: {current_url}. Waiting for network idle and app spinner.")
                # This is the /one/one.app page or similar. Don't wait for specific content, just general load.
                try:
                    await self.current_page.wait_for_load_state("networkidle", timeout=30000) # Give it more time for network to settle
                    logger.debug(f"DEBUG: Network idle detected on intermediate app page: {current_url}.")
                except PlaywrightTimeoutError:
                    logger.warning(f"DEBUGGING: Network idle timeout on intermediate app page: {current_url}. Proceeding anyway.")
                
                # Also try to wait for a common full-app spinner to disappear, if present
                try:
                    await self.current_page.wait_for_selector('div.app-spinner, div.auraLoadingBox', state='hidden', timeout=15000) # Added auraLoadingBox as it might reappear
                    logger.debug(f"DEBUG: App spinner on intermediate page became hidden.")
                except PlaywrightTimeoutError:
                    logger.warning(f"DEBUGGING: App spinner on intermediate page not found or did not disappear.")
                
            else:
                logger.info(f"INFO: Detected unhandled URL type: {current_url}. Performing generic body wait.")
                # Fallback for any other page, just wait for body to be visible
                try:
                    await self.current_page.wait_for_selector("body", timeout=5000, state="visible")
                    logger.debug(f"DEBUG: wait_for_selector('body', state='visible') completed for {self.current_page.url}.")
                except PlaywrightTimeoutError:
                    logger.warning(f"DEBUGGING: Playwright Timeout (5s) on body selector for {self.current_page.url}. Continuing anyway.")
                    pass

        except PlaywrightTimeoutError as e:
            logger.error(f"ERROR: Timeout waiting for page readiness for {self.current_page.url}: {e}")
            raise # Re-raise if essential waits fail (e.g., initial domcontentloaded)
        except Exception as e:
            logger.error(f"ERROR: An unexpected error occurred during page readiness check for {self.current_page.url}: {e}")
            raise

        # Rest of update_poi: Run the bounding box javascript code to highlight the points of interest on the page
        page_info = await self.current_page.evaluate(
            """() => {
                overwriteDefaultSelectConvergence();
                return findPOIsConvergence();
            }""",
        )
        # Get the points of interest on the page
        self.poi_elements = page_info["element_descriptions"]
        element_centroids = page_info["element_centroids"]
        try:
            # Select all iframes on the page
            iframes = await self.current_page.query_selector_all("iframe")

            max_iframes = 10

            # Define an asynchronous function to process and filter each iframe
            tasks = [asyncio.create_task(self.process_iframe(iframe)) for iframe in iframes[:max_iframes]]

            results = await asyncio.gather(*tasks)

            filtered_results = [result for result in results if result is not None]

            iframes_pois = []
            iframe_offsets = []

            for poi, offset in filtered_results:
                iframes_pois.append(poi)
                iframe_offsets.append(offset)

            # Combine the points of interest from the iframes with the main page and adjust the centroids
            for index, iframe_poi in enumerate(iframes_pois):
                self.poi_elements.extend(iframe_poi["element_descriptions"])
                for centroid in iframe_poi["element_centroids"]:
                    centroid["x"] += iframe_offsets[index]["x"]
                    centroid["y"] += iframe_offsets[index]["y"]
                    centroid["left"] += iframe_offsets[index]["x"]
                    centroid["top"] += iframe_offsets[index]["y"]
                    centroid["right"] += iframe_offsets[index]["x"]
                    # Fix: Removed duplicate 'centroid["y"] += iframe_offsets[index]["y"]'
                    centroid["bottom"] += iframe_offsets[index]["y"]
                element_centroids.extend(iframe_poi["element_centroids"])

        except Exception as e:
            logger.error(f"Error in finding iframes: {e}")

        # Get the centroids of the points of interest
        self.poi_centroids = [Point(x=xy["x"], y=xy["y"]) for xy in element_centroids]
        self.bounding_boxes = [BoundingBox(**xy, label=str(i)) for i, xy in enumerate(element_centroids)]
        self.pois = [
            POI(info=info, element_centroid=centroid, bounding_box=bbox)
            for info, centroid, bbox in zip(
                self.poi_elements,
                self.poi_centroids,
                self.bounding_boxes,
                strict=False,
            )
        ]

    @property
    def poi_text(self) -> str:
        # Get all points of interest on the page as text
        texts = [element_as_text(mark_id=i, **element) for i, element in enumerate(self.poi_elements)]
        # Return formatted text of points of interest on page
        return "\n".join([txt for txt in texts if txt])

    async def screenshot(
        self,
        delay: float = 0.0,
        quality: int = 70,
        type: str = "jpeg",
        scale: str = "css",
    ) -> tuple[bytes, bytes]:
        if delay > 0.0:
            await asyncio.sleep(delay)
        await self.update_poi()
        # Keep original logic if page is highly dynamic, but for static shots, simpler is faster
        # old_poi_positions = [tuple(point) for point in self.poi_centroids]
        img = await self.current_page.screenshot(type=type, quality=quality, scale=scale)
        annotated_img = annotate_bounding_boxes(image=img, bounding_boxes=self.bounding_boxes)
        # Re-evaluating this block for performance. Removed redundant update_poi and conditional screenshot.
        # If precise screenshot timing is needed, the caller should manage delays and updates.
        return img, annotated_img

    async def goto(self, url: str) -> None:
        await self.current_page.goto(url, wait_until="domcontentloaded")

    async def reload(self) -> None:
        await self.current_page.reload(wait_until="domcontentloaded")

    async def click_tab(self, mark_id: int) -> None:
        point: Point = self.poi_centroids[mark_id]
        await self.hover(point)
        await self.current_page.mouse.click(*point, button="middle")

    async def click(self, mark_id: int) -> None:
        point: Point = self.poi_centroids[mark_id]
        await self.hover(point)
        await self.current_page.mouse.click(*point)

    async def enter_text(self, mark_id: int, text: str, submit: bool = False) -> None:
        await self.clear_text_field(mark_id)
        await self.click(mark_id)
        await self.current_page.keyboard.type(text)

        if submit:
            await self.current_page.keyboard.press("Enter")

    async def scroll(
        self,
        direction: Literal["up", "down", "left", "right"],
        mark_id: Optional[int] = None,
    ) -> None:
        if mark_id is None:
            point = Point(x=-1, y=-1)
            max_scroll_x = self.viewport_width
            max_scroll_y = self.viewport_height
        else:
            point: Point = self.poi_centroids[mark_id]
            bbox: BoundingBox = self.bounding_boxes[mark_id]
            max_scroll_x = bbox.right - bbox.left
            max_scroll_y = bbox.bottom - bbox.top

        await self.hover(point=point)
        scroll_x = int(max_scroll_x * 0.8)
        scroll_y = int(max_scroll_y * 0.8)
        is_vertical = direction in ("up", "down")
        reverse_scroll = direction in ("up", "left")
        await self.current_page.mouse.wheel(
            scroll_x * (-1 if reverse_scroll else 1) * (not is_vertical),
            scroll_y * (-1 if reverse_scroll else 1) * is_vertical,
        )

    async def go_back(self) -> None:
        # If there is no tab open then return
        if not self.current_page:
            return

        await self.current_page.go_back(wait_until="domcontentloaded")
        if self.current_page.url == "about:blank":
            if not len(self.context.pages) > 1:
                await self.current_page.go_forward(wait_until="domcontentloaded")
                raise Exception("There is no previous page to go back to.")
            await self.current_page.close()

    async def hover(self, point: Point) -> None:
        await self.current_page.mouse.move(*point)

    async def focus(self, point: Point) -> None:
        # Focus on the element on the page at point (x, y)
        await self.current_page.evaluate(
            """
            ([x, y]) => {
                const element = document.elementFromPoint(x, y);
                if (element && element.focus) {
                    element.focus();
                }
            }""",
            tuple(point),
        )

    async def get_text(self, mark_id: int) -> str:
        return await self.current_page.evaluate(
            """
            (mark_id) => {
                const element = marked_elements_convergence[mark_id];
                if (element && (element.value !== undefined || element.textContent !== undefined)) {
                    return element.value || element.textContent;
                }
                return '';
            }
            """,
            (mark_id,),
        )

    async def clear_text_field(self, mark_id: int) -> None:
        existing_text = await self.get_text(mark_id)
        if existing_text.strip():
            # Clear existing text only if it exists
            await self.click(mark_id)
            if platform.system() == "Darwin":  # selecting all text is OS-specific
                await self.click(mark_id)
                await self.current_page.keyboard.press("Meta+a")
                await self.current_page.keyboard.press("Backspace")
            else:
                await self.current_page.keyboard.press("Control+Home")
                await self.current_page.keyboard.press("Control+Shift+End")
            await self.current_page.keyboard.press("Backspace")
            
    async def open_new_tab_and_go_to(self, url: str) -> None:
        """
        Opens a new browser tab/page and navigates to the specified URL.
        Closes the old page if it's not the last one remaining.
        """
        logger.info(f"Attempting to open a new tab and navigate to: {url}")
        new_page = await self.context.new_page()
        
        # Close the previous page if it's not the only one left in the context
        if len(self.context.pages) > 1 and self.current_page and self.current_page != new_page:
            try:
                await self.current_page.close()
                logger.debug("Closed previous page.")
            except Exception as e:
                logger.warning(f"Could not close previous page (might already be closed or detached): {e}")

        # After navigation, trigger POI update to reflect the new page's state
        await new_page.goto(url, wait_until="domcontentloaded")
        logger.info(f"Successfully navigated to {url} in a new tab.")
        # Crucial: update_poi uses self.current_page, which is now new_page implicitly
        await self.update_poi() 


if __name__ == "__main__":

    async def dummy_test():
        async with BrowserSession(headless=False) as s:
            page = await s.context.new_page()
            await page.goto("http://google.co.uk")
            await asyncio.sleep(5)
            await page.screenshot(path="example.png")
            await s.update_poi()
            _, annotated_image = await s.screenshot()
            with open("output.png", "wb") as f:
                f.write(annotated_image)

    asyncio.run(dummy_test())