"""Playwright-backed browser session that tracks points of interest (POIs) on a page."""

import asyncio
import base64  # noqa: F401
import logging
import platform
import re
from contextlib import AsyncExitStack
from pathlib import Path
from typing import Literal, Optional, Self

from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from playwright_stealth import StealthConfig, stealth_async
from pydantic import Field  # noqa: F401
from tenacity import before_sleep_log, retry, stop_after_delay, wait_exponential

from proxy_lite.browser.bounding_boxes import POI, BoundingBox, Point, annotate_bounding_boxes
from proxy_lite.logger import logger

SELF_CONTAINED_TAGS = [
    # many of these are non-interactive but keeping them anyway
    "area",
    "base",
    "br",
    "col",
    "embed",
    "hr",
    "img",
    "input",
    "link",
    "meta",
    "param",
    "source",
    "track",
    "wbr",
]

# Longest attribute value / text content rendered by element_as_text before truncation.
_MAX_FIELD_LENGTH = 2500
_LINE_BREAK_RE = re.compile(r"\r\n|\r|\n")

# Default timeout (milliseconds) applied to the context and the initial page.
_DEFAULT_TIMEOUT_MS = 60_000

# Iframes smaller than this (in either dimension) are skipped, and at most
# _MAX_IFRAMES iframes are inspected per POI update.
_MIN_IFRAME_SIDE_PX = 50
_MAX_IFRAMES = 10

# Script evaluated in the page and in each iframe to collect points of interest.
# NOTE(review): both functions are presumably defined by the init scripts
# add_custom_select.js / find_pois.js registered in __aenter__ - confirm.
_FIND_POIS_SCRIPT = """() => {
    overwriteDefaultSelectConvergence();
    return findPOIsConvergence();
}"""

# Salesforce URL fragments used by BrowserSession._wait_for_page_ready to choose a wait strategy.
_LOGIN_URL_PATTERNS = (
    "login.salesforce.com",
    "identity.force.com",
    "auth.lightning.force.com",
    "setup.salesforce.com",  # Sometimes a setup login redirects here temporarily
    "my.salesforce.com",  # Custom domain login redirects here
)
_INTERMEDIATE_APP_URL_PATTERN = "/one/one.app"
_TARGET_FORECAST_URL_PATTERN = "/AccountForecastSettings/home"
_TARGET_PAGE_SPINNER_SELECTORS = (
    "div.slds-spinner_container",
    "div.auraLoadingBox",
    "div.dxp_axb_container",  # Main overlay
    "div.slds-sprite-astro-x-large",  # Specific animated element itself
)


def _truncate(value: str, limit: int = _MAX_FIELD_LENGTH) -> str:
    """Return *value* cut to at most *limit* characters, ending in an ellipsis if cut."""
    if len(value) > limit:
        return value[: limit - 1] + "…"
    return value


def element_as_text(
    mark_id: int,
    tag: Optional[str] = None,
    text: Optional[str] = None,
    **raw_attributes,
) -> str:
    """Return a one-line text representation of a single page element.

    Args:
        mark_id: Index shown in square brackets in front of the element.
        tag: Element tag name; lower-cased before rendering. ``None`` is
            rendered as an empty tag name instead of raising.
        text: Text content of the element; ``None`` is treated as empty.
        **raw_attributes: Element attributes. ``None`` values and ``False``
            booleans are omitted, ``True`` booleans are rendered as a bare
            attribute name, everything else as ``name="value"``.

    Returns:
        ``- [id] <tag attrs/>`` for a text-less self-contained tag, otherwise
        ``- [id] <tag attrs>text``. Line breaks are replaced by ``⏎`` and long
        values are truncated.
    """
    rendered = []
    for name, value in raw_attributes.items():
        if value is None:
            continue
        if isinstance(value, bool):
            # False boolean attributes are ignored entirely.
            if value:
                rendered.append(name)
            continue
        rendered.append(f'{name}="{_truncate(str(value))}"')

    attributes = (" " + " ".join(rendered)).rstrip()
    tag = (tag or "").lower()
    text = _truncate(text or "")

    # sub-out line breaks so elements are easier to distinguish
    attributes = _LINE_BREAK_RE.sub("⏎", attributes)
    text = _LINE_BREAK_RE.sub("⏎", text)

    if tag in SELF_CONTAINED_TAGS:
        if not text:
            return f"- [{mark_id}] <{tag}{attributes}/>"
        # Unexpected, but keep the text rather than dropping it.
        logger.warning(
            f"Got self-contained element '{tag}' which contained text '{text}'.",
        )
    return f"- [{mark_id}] <{tag}{attributes}>{text}"


class BrowserSession:
    """Async context manager wrapping a Chromium Playwright session.

    After ``update_poi`` runs, the session exposes the page's points of
    interest as ``poi_elements`` (descriptions), ``poi_centroids``,
    ``bounding_boxes`` and ``pois``; the interaction methods address elements
    by their index (``mark_id``) in those lists.
    """

    def __init__(
        self,
        viewport_width: int = 1280,
        viewport_height: int = 720,
        headless: bool = True,
    ):
        self.viewport_width = viewport_width
        self.viewport_height = viewport_height
        self.headless = headless
        self.playwright: Playwright | None = None
        self.browser: Browser | None = None
        self.context: BrowserContext | None = None
        self._exit_stack: AsyncExitStack | None = None

        # Plain lists: this is not a pydantic model, so pydantic.Field(...) would
        # store a FieldInfo object here instead of an empty list.
        self.poi_elements: list = []
        self.poi_centroids: list[Point] = []
        self.bounding_boxes: list[BoundingBox] = []
        self.pois: list[POI] = []

    async def __aenter__(self) -> Self:
        """Start Playwright, launch Chromium and prepare a context with one page."""
        self._exit_stack = AsyncExitStack()
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(headless=self.headless)
        self.context = await self.browser.new_context(
            viewport={"width": self.viewport_width, "height": self.viewport_height},
        )

        # Ensure there's at least one page open
        if not self.context.pages:
            await self.context.new_page()

        self.context.set_default_timeout(_DEFAULT_TIMEOUT_MS)
        self.current_page.set_default_timeout(_DEFAULT_TIMEOUT_MS)
        await stealth_async(self.current_page, StealthConfig(navigator_user_agent=False))

        script_dir = Path(__file__)
        await self.context.add_init_script(path=script_dir.with_name("add_custom_select.js"))
        await self.context.add_init_script(path=script_dir.with_name("find_pois.js"))
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the browser, stop Playwright and unwind the exit stack.

        Each step runs even if an earlier one raises, so a failing
        ``browser.close()`` does not leave Playwright running.
        """
        try:
            if self.browser:
                await self.browser.close()
        finally:
            try:
                if self.playwright:
                    await self.playwright.stop()
            finally:
                if self._exit_stack:
                    await self._exit_stack.aclose()

    @property
    def current_page(self) -> Optional[Page]:
        """The most recently opened page of the context, or ``None`` if there is none."""
        if self.context and self.context.pages:
            return self.context.pages[-1]
        return None

    @property
    def current_url(self) -> Optional[str]:
        """URL of ``current_page``, or ``None`` when no page is open."""
        page = self.current_page
        return page.url if page else None

    # re-run for cases of mid-run redirects
    # NOTE(review): the body catches every Exception and returns None, so this
    # retry only fires for errors that escape that handler.
    @retry(
        wait=wait_exponential(multiplier=1, min=1, max=10),
        stop=stop_after_delay(5),
        reraise=True,
        before_sleep=before_sleep_log(logger, logging.ERROR),
    )
    async def process_iframe(self, iframe) -> Optional[tuple[dict, dict]]:
        """Collect the points of interest inside a single iframe element.

        Args:
            iframe: Element handle of the ``<iframe>`` to inspect.

        Returns:
            ``(poi, offset)`` where ``poi`` is the result of the POI script
            evaluated in the iframe and ``offset`` is the rounded ``x``/``y``
            of the iframe's bounding box, or ``None`` if the iframe has no
            bounding box, is smaller than 50px in either dimension, has no
            content frame, yields no POIs, or processing raised.
        """
        try:
            # Check iframe visibility and size
            bounding_box = await iframe.bounding_box()
            if not bounding_box:
                return None  # Skip if iframe is not visible

            if bounding_box["width"] < _MIN_IFRAME_SIDE_PX or bounding_box["height"] < _MIN_IFRAME_SIDE_PX:
                return None

            frame = await iframe.content_frame()
            if not frame:
                return None

            poi = await frame.evaluate(_FIND_POIS_SCRIPT)
            if not poi:
                return None

            iframe_offset = {"x": round(bounding_box["x"]), "y": round(bounding_box["y"])}
            return poi, iframe_offset
        except Exception as e:
            logger.error(f"Error processing iframe: {e}")
            return None

    async def _wait_for_page_ready(self) -> None:
        """Wait until the current page looks ready, using URL-specific heuristics.

        Always waits for ``domcontentloaded`` first. Then, depending on the URL
        (target forecast page, login page, intermediate Lightning app page, or
        anything else), performs best-effort waits whose timeouts are only
        logged. Any other failure is logged and re-raised.
        """
        try:
            # Wait for basic page load states to ensure the DOM is ready.
            # This is a fundamental wait that should always apply.
            await self.current_page.wait_for_load_state("domcontentloaded", timeout=60000)
            logger.debug(f"DEBUG: wait_for_load_state('domcontentloaded') completed for {self.current_page.url}.")

            current_url = self.current_page.url

            if _TARGET_FORECAST_URL_PATTERN in current_url:
                logger.info(f"INFO: Detected target Account Forecast Settings page: {current_url}. Waiting for content.")
                # When on the specific target page, wait for its content and spinners
                for selector in _TARGET_PAGE_SPINNER_SELECTORS:
                    try:
                        await self.current_page.wait_for_selector(selector, state="hidden", timeout=5000)
                        logger.debug(
                            f"DEBUG: Spinner element '{selector}' became hidden for {self.current_page.url}."
                        )
                    except PlaywrightTimeoutError:
                        logger.warning(
                            f"DEBUGGING: Spinner element '{selector}' not detected or did not disappear "
                            f"on {self.current_page.url} within 5s."
                        )

                # Wait for a known element on the Account Forecast Settings page to ensure content is there.
                try:
                    await self.current_page.wait_for_selector(
                        "h1.slds-page-header__title, h2, .account-forecast-settings-component, "
                        "div[data-aura-rendered-by]",
                        state="visible",
                        timeout=15000,
                    )
                    logger.debug(f"DEBUG: Confirmed main page element visible for {self.current_page.url}.")
                except PlaywrightTimeoutError:
                    logger.warning(
                        f"DEBUGGING: Main page element not visible on {self.current_page.url} within 15s. "
                        "This might indicate incomplete page load despite no spinner."
                    )

            elif any(pattern in current_url for pattern in _LOGIN_URL_PATTERNS):
                logger.info(f"INFO: Detected Salesforce login page: {current_url}. Waiting for login elements.")
                # When on a login page, just wait for the login form elements to be visible
                try:
                    await self.current_page.wait_for_selector(
                        "input[type='email'], input[type='password'], input[type='submit'], "
                        "#username, #password, #Login",
                        state="visible",
                        timeout=10000,
                    )
                    logger.debug(f"DEBUG: Login page elements visible on {self.current_page.url}.")
                except PlaywrightTimeoutError:
                    logger.warning(
                        f"DEBUGGING: Login page elements not visible on {self.current_page.url} within 10s. "
                        "This may happen if elements are in an iframe or if page is extremely slow."
                    )

            elif _INTERMEDIATE_APP_URL_PATTERN in current_url:
                logger.info(
                    f"INFO: Detected intermediate Salesforce Lightning app loading page: {current_url}. "
                    "Waiting for network idle and app spinner."
                )
                # This is the /one/one.app page or similar. Don't wait for specific content, just general load.
                try:
                    await self.current_page.wait_for_load_state("networkidle", timeout=30000)
                    logger.debug(f"DEBUG: Network idle detected on intermediate app page: {current_url}.")
                except PlaywrightTimeoutError:
                    logger.warning(
                        f"DEBUGGING: Network idle timeout on intermediate app page: {current_url}. Proceeding anyway."
                    )

                # Also try to wait for a common full-app spinner to disappear, if present
                try:
                    await self.current_page.wait_for_selector(
                        "div.app-spinner, div.auraLoadingBox",
                        state="hidden",
                        timeout=15000,
                    )
                    logger.debug("DEBUG: App spinner on intermediate page became hidden.")
                except PlaywrightTimeoutError:
                    logger.warning("DEBUGGING: App spinner on intermediate page not found or did not disappear.")

            else:
                logger.info(f"INFO: Detected unhandled URL type: {current_url}. Performing generic body wait.")
                # Fallback for any other page, just wait for body to be visible
                try:
                    await self.current_page.wait_for_selector("body", timeout=5000, state="visible")
                    logger.debug(
                        f"DEBUG: wait_for_selector('body', state='visible') completed for {self.current_page.url}."
                    )
                except PlaywrightTimeoutError:
                    logger.warning(
                        f"DEBUGGING: Playwright Timeout (5s) on body selector for {self.current_page.url}. "
                        "Continuing anyway."
                    )

        except PlaywrightTimeoutError as e:
            # Only un-handled timeouts reach this point (e.g. the initial domcontentloaded wait).
            logger.error(f"ERROR: Timeout waiting for page readiness for {self.current_page.url}: {e}")
            raise
        except Exception as e:
            logger.error(
                f"ERROR: An unexpected error occurred during page readiness check for {self.current_page.url}: {e}"
            )
            raise

    @retry(
        wait=wait_exponential(multiplier=1, min=1, max=10),
        stop=stop_after_delay(5),
        reraise=True,
        before_sleep=before_sleep_log(logger, logging.ERROR),
    )
    async def update_poi(self) -> None:
        """Refresh the session's points of interest from the current page.

        Waits for the page to be ready, runs the POI script in the main page
        and in up to 10 iframes, shifts iframe coordinates by each iframe's
        offset, and rebuilds ``poi_elements``, ``poi_centroids``,
        ``bounding_boxes`` and ``pois``. Errors while handling iframes are
        logged and do not abort the update.

        Raises:
            Exception: Whatever the readiness wait or the main-page evaluation
                raises once the retry policy gives up.
        """
        await self._wait_for_page_ready()

        # Run the bounding box javascript code to highlight the points of interest on the page
        page_info = await self.current_page.evaluate(_FIND_POIS_SCRIPT)
        self.poi_elements = page_info["element_descriptions"]
        element_centroids = page_info["element_centroids"]

        try:
            iframes = await self.current_page.query_selector_all("iframe")
            tasks = [asyncio.create_task(self.process_iframe(iframe)) for iframe in iframes[:_MAX_IFRAMES]]
            results = await asyncio.gather(*tasks)

            for result in results:
                if result is None:
                    continue
                iframe_poi, offset = result
                iframe_centroids = iframe_poi["element_centroids"]
                # Iframe coordinates are relative to the iframe; shift them into page space.
                for centroid in iframe_centroids:
                    for key in ("x", "left", "right"):
                        centroid[key] += offset["x"]
                    for key in ("y", "top", "bottom"):
                        centroid[key] += offset["y"]
                # Extend both lists only after the shift succeeded so they stay aligned.
                self.poi_elements.extend(iframe_poi["element_descriptions"])
                element_centroids.extend(iframe_centroids)
        except Exception as e:
            logger.error(f"Error in finding iframes: {e}")

        # Get the centroids of the points of interest
        self.poi_centroids = [Point(x=xy["x"], y=xy["y"]) for xy in element_centroids]
        self.bounding_boxes = [BoundingBox(**xy, label=str(i)) for i, xy in enumerate(element_centroids)]
        self.pois = [
            POI(info=info, element_centroid=centroid, bounding_box=bbox)
            for info, centroid, bbox in zip(
                self.poi_elements,
                self.poi_centroids,
                self.bounding_boxes,
                strict=False,
            )
        ]

    @property
    def poi_text(self) -> str:
        """All points of interest rendered via ``element_as_text``, one per line."""
        lines = (element_as_text(mark_id=i, **element) for i, element in enumerate(self.poi_elements))
        return "\n".join(line for line in lines if line)

    async def screenshot(
        self,
        delay: float = 0.0,
        quality: int = 70,
        type: str = "jpeg",
        scale: str = "css",
    ) -> tuple[bytes, bytes]:
        """Take a screenshot of the current page and an annotated copy.

        Args:
            delay: Seconds to sleep before refreshing POIs and capturing.
            quality: Passed through to Playwright's ``Page.screenshot``.
            type: Image type passed through to ``Page.screenshot``.
            scale: Scale mode passed through to ``Page.screenshot``.

        Returns:
            ``(image, annotated_image)`` where the second is the result of
            ``annotate_bounding_boxes`` applied to the first with the current
            bounding boxes.
        """
        if delay > 0.0:
            await asyncio.sleep(delay)
        await self.update_poi()
        img = await self.current_page.screenshot(type=type, quality=quality, scale=scale)
        annotated_img = annotate_bounding_boxes(image=img, bounding_boxes=self.bounding_boxes)
        return img, annotated_img

    async def goto(self, url: str) -> None:
        """Navigate the current page to *url*, waiting for ``domcontentloaded``."""
        await self.current_page.goto(url, wait_until="domcontentloaded")

    async def reload(self) -> None:
        """Reload the current page, waiting for ``domcontentloaded``."""
        await self.current_page.reload(wait_until="domcontentloaded")

    async def click_tab(self, mark_id: int) -> None:
        """Middle-click the POI with index *mark_id* after hovering over it."""
        point: Point = self.poi_centroids[mark_id]
        await self.hover(point)
        await self.current_page.mouse.click(*point, button="middle")

    async def click(self, mark_id: int) -> None:
        """Click the POI with index *mark_id* after hovering over it."""
        point: Point = self.poi_centroids[mark_id]
        await self.hover(point)
        await self.current_page.mouse.click(*point)

    async def enter_text(self, mark_id: int, text: str, submit: bool = False) -> None:
        """Replace the content of the POI *mark_id* with *text*; press Enter if *submit*."""
        await self.clear_text_field(mark_id)
        await self.click(mark_id)
        await self.current_page.keyboard.type(text)

        if submit:
            await self.current_page.keyboard.press("Enter")

    async def scroll(
        self,
        direction: Literal["up", "down", "left", "right"],
        mark_id: Optional[int] = None,
    ) -> None:
        """Scroll by 80% of the viewport, or of the POI *mark_id*'s bounding box.

        Args:
            direction: Direction to scroll in.
            mark_id: POI to hover over and size the scroll by; when ``None``
                the mouse is moved to (-1, -1) and the viewport size is used.
        """
        if mark_id is None:
            point = Point(x=-1, y=-1)
            max_scroll_x = self.viewport_width
            max_scroll_y = self.viewport_height
        else:
            point = self.poi_centroids[mark_id]
            bbox: BoundingBox = self.bounding_boxes[mark_id]
            max_scroll_x = bbox.right - bbox.left
            max_scroll_y = bbox.bottom - bbox.top

        await self.hover(point=point)

        sign = -1 if direction in ("up", "left") else 1
        if direction in ("up", "down"):
            delta_x, delta_y = 0, sign * int(max_scroll_y * 0.8)
        else:
            delta_x, delta_y = sign * int(max_scroll_x * 0.8), 0

        await self.current_page.mouse.wheel(delta_x, delta_y)

    async def go_back(self) -> None:
        """Go back in history, closing the tab if it has no earlier page.

        Raises:
            Exception: If going back lands on ``about:blank`` and this is the
                only open page (the navigation is undone first).
        """
        # If there is no tab open then return
        if not self.current_page:
            return

        await self.current_page.go_back(wait_until="domcontentloaded")
        if self.current_page.url != "about:blank":
            return

        if len(self.context.pages) <= 1:
            await self.current_page.go_forward(wait_until="domcontentloaded")
            raise Exception("There is no previous page to go back to.")
        # Closing the blank tab makes the previously opened page current again.
        await self.current_page.close()

    async def hover(self, point: Point) -> None:
        """Move the mouse to *point* on the current page."""
        await self.current_page.mouse.move(*point)

    async def focus(self, point: Point) -> None:
        """Focus the element located at *point* (x, y), if it is focusable."""
        await self.current_page.evaluate(
            """
            ([x, y]) => {
                const element = document.elementFromPoint(x, y);
                if (element && element.focus) {
                    element.focus();
                }
            }""",
            tuple(point),
        )

    async def get_text(self, mark_id: int) -> str:
        """Return the ``value`` or ``textContent`` of the marked element *mark_id*.

        Returns an empty string when the element is missing or has neither.
        """
        return await self.current_page.evaluate(
            """
            (mark_id) => {
                const element = marked_elements_convergence[mark_id];
                if (element && (element.value !== undefined || element.textContent !== undefined)) {
                    return element.value || element.textContent;
                }
                return '';
            }
            """,
            (mark_id,),
        )

    async def clear_text_field(self, mark_id: int) -> None:
        """Delete the existing text of the POI *mark_id*, if it has any."""
        existing_text = await self.get_text(mark_id)
        if not existing_text.strip():
            return

        # Clear existing text only if it exists
        await self.click(mark_id)
        keyboard = self.current_page.keyboard
        if platform.system() == "Darwin":  # selecting all text is OS-specific
            await self.click(mark_id)
            await keyboard.press("Meta+a")
            await keyboard.press("Backspace")
        else:
            await keyboard.press("Control+Home")
            await keyboard.press("Control+Shift+End")
            await keyboard.press("Backspace")

    async def open_new_tab_and_go_to(self, url: str) -> None:
        """Open a new tab, navigate it to *url* and close the previous page.

        The previous page is captured before the new one is created, because
        ``current_page`` always refers to the newest page afterwards. Failing
        to close it is logged and otherwise ignored. Finally the points of
        interest are refreshed for the new page.
        """
        logger.info(f"Attempting to open a new tab and navigate to: {url}")
        previous_page = self.current_page
        new_page = await self.context.new_page()

        await new_page.goto(url, wait_until="domcontentloaded")
        logger.info(f"Successfully navigated to {url} in a new tab.")

        if previous_page is not None and previous_page is not new_page:
            try:
                await previous_page.close()
                logger.debug("Closed previous page.")
            except Exception as e:
                logger.warning(f"Could not close previous page (might already be closed or detached): {e}")

        # update_poi uses self.current_page, which is now the new page.
        await self.update_poi()


if __name__ == "__main__":

    async def dummy_test():
        """Manual smoke test: open a page and write plain and annotated screenshots."""
        async with BrowserSession(headless=False) as s:
            page = await s.context.new_page()
            await page.goto("http://google.co.uk")
            await asyncio.sleep(5)
            await page.screenshot(path="example.png")
            await s.update_poi()
            _, annotated_image = await s.screenshot()
            with open("output.png", "wb") as f:
                f.write(annotated_image)

    asyncio.run(dummy_test())