# Last commit 7b40088 by Trisha Tomy: "hopefully working headless remotely"
import asyncio
import logging
import platform
import re
from contextlib import AsyncExitStack
from pathlib import Path
from typing import Literal, Optional, Self
from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from playwright_stealth import StealthConfig, stealth_async
from pydantic import Field
from tenacity import before_sleep_log, retry, stop_after_delay, wait_exponential
from proxy_lite.browser.bounding_boxes import POI, BoundingBox, Point, annotate_bounding_boxes
from proxy_lite.logger import logger
import base64
# Tags that are rendered in self-closing form (``<tag .../>``) by
# ``element_as_text`` when they carry no text.
SELF_CONTAINED_TAGS = [
    # many of these are non-interactive but keeping them anyway
    "area",
    "base",
    "br",
    "col",
    "embed",
    "hr",
    "img",
    "input",
    "link",
    "meta",
    "param",
    "source",
    "track",
    "wbr",
]


def element_as_text(
    mark_id: int,
    tag: Optional[str] = None,
    text: Optional[str] = None,
    **raw_attributes,
) -> str:
    """Render a single marked element as one line of pseudo-HTML.

    Args:
        mark_id: Index shown in square brackets at the start of the line.
        tag: Element tag name; it is lower-cased. When missing or empty the
            placeholder ``"unknown"`` is used instead of raising.
        text: Text content of the element; ``None`` is treated as ``""``.
        **raw_attributes: Attributes to render. ``None`` values and ``False``
            booleans are dropped, ``True`` booleans are rendered as a bare
            attribute name, everything else as ``name="value"``.

    Returns:
        ``- [mark_id] <tag attrs>text</tag>``, or ``- [mark_id] <tag attrs/>``
        for a tag listed in ``SELF_CONTAINED_TAGS`` that has no text.
    """
    max_length = 2500

    def clip(value: str) -> str:
        # Keep the result at most ``max_length`` characters, ellipsis included.
        if len(value) > max_length:
            return value[: max_length - 1] + "…"
        return value

    rendered = []
    for name, value in raw_attributes.items():
        if value is None:
            continue
        if isinstance(value, bool):
            # we ignore False bool attributes
            if value:
                rendered.append(name)
            continue
        rendered.append(f'{name}="{clip(str(value))}"')
    attributes = (" " + " ".join(rendered)).rstrip()

    # The signature allows tag=None, so it must not crash on ``.lower()``.
    tag = (tag or "unknown").lower()
    text = clip(text if text is not None else "")

    # sub-out line breaks so elements are easier to distinguish
    line_break = r"\r\n|\r|\n"
    attributes = re.sub(line_break, "⏎", attributes)
    text = re.sub(line_break, "⏎", text)

    if tag in SELF_CONTAINED_TAGS:
        if not text:
            return f"- [{mark_id}] <{tag}{attributes}/>"
        # Unexpected, but still render the text in the regular open/close form.
        logger.warning(
            f"Got self-contained element '{tag}' which contained text '{text}'.",
        )
    return f"- [{mark_id}] <{tag}{attributes}>{text}</{tag}>"
class BrowserSession:
    """Async context manager around a single Playwright Chromium context.

    Entering the context starts Playwright, launches Chromium, opens a page and
    registers the ``add_custom_select.js`` / ``find_pois.js`` init scripts.
    ``update_poi`` then collects "points of interest" (POIs) from the current
    page and its iframes; the remaining methods act on those POIs by index
    (``mark_id``).
    """

    # Default Playwright timeout (milliseconds) applied to the context and page.
    _DEFAULT_TIMEOUT_MS = 60_000
    # Upper bound on the number of iframes inspected per ``update_poi`` call.
    _MAX_IFRAMES = 10

    # Common Salesforce URL patterns for different page states.
    _LOGIN_URL_PATTERNS = (
        "login.salesforce.com",
        "identity.force.com",
        "auth.lightning.force.com",
        "setup.salesforce.com",  # Sometimes a setup login redirects here temporarily
        "my.salesforce.com",  # Custom domain login redirects here
    )
    # Main Salesforce Lightning application base URL, typically seen after login;
    # treated as an intermediate loading state before the specific target page.
    _INTERMEDIATE_APP_URL_PATTERN = "/one/one.app"
    _FORECAST_SETTINGS_URL_PATTERN = "/AccountForecastSettings/home"
    _SPINNER_SELECTORS = (
        "div.slds-spinner_container",
        "div.auraLoadingBox",
        "div.dxp_axb_container",  # Main overlay
        "div.slds-sprite-astro-x-large",  # Specific animated element itself
    )

    def __init__(
        self,
        viewport_width: int = 1280,
        viewport_height: int = 720,
        headless: bool = True,
    ):
        """Store launch settings; no browser is started until ``__aenter__``."""
        self.viewport_width = viewport_width
        self.viewport_height = viewport_height
        self.headless = headless
        self.playwright: Playwright | None = None
        self.browser: Browser | None = None
        self.context: BrowserContext | None = None
        self._exit_stack: AsyncExitStack | None = None

        # Plain lists: this is not a pydantic model, so ``Field(...)`` would
        # store a FieldInfo object here instead of an empty list.
        self.poi_elements: list = []
        self.poi_centroids: list[Point] = []
        self.bounding_boxes: list[BoundingBox] = []
        self.pois: list[POI] = []

    async def __aenter__(self) -> Self:
        """Start Playwright, launch Chromium and prepare the first page.

        If any setup step fails, whatever was already started is shut down
        before the exception propagates.
        """
        self._exit_stack = AsyncExitStack()
        try:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(headless=self.headless)
            self.context = await self.browser.new_context(
                viewport={"width": self.viewport_width, "height": self.viewport_height},
            )
            # Ensure there's at least one page open
            if not self.context.pages:
                await self.context.new_page()
            self.context.set_default_timeout(self._DEFAULT_TIMEOUT_MS)
            self.current_page.set_default_timeout(self._DEFAULT_TIMEOUT_MS)
            await stealth_async(self.current_page, StealthConfig(navigator_user_agent=False))
            await self.context.add_init_script(
                path=Path(__file__).with_name("add_custom_select.js"),
            )
            await self.context.add_init_script(
                path=Path(__file__).with_name("find_pois.js"),
            )
        except BaseException as exc:
            # ``__aexit__`` is not invoked when ``__aenter__`` raises, so clean
            # up here to avoid leaking the driver / browser process.
            await self.__aexit__(type(exc), exc, exc.__traceback__)
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the browser, stop Playwright and close the exit stack.

        Each step runs even if an earlier one raises.
        """
        try:
            if self.browser:
                await self.browser.close()
        finally:
            try:
                if self.playwright:
                    await self.playwright.stop()
            finally:
                if self._exit_stack:
                    await self._exit_stack.aclose()

    @property
    def current_page(self) -> Optional[Page]:
        """The most recently opened page of the context, or ``None``."""
        if self.context and self.context.pages:
            return self.context.pages[-1]  # Return the most recently opened page
        return None

    @property
    def current_url(self) -> Optional[str]:
        """URL of ``current_page``, or ``None`` when there is no page."""
        page = self.current_page
        if page:
            return page.url
        return None

    # re-run for cases of mid-run redirects
    # NOTE: every ``Exception`` is caught and turned into ``None`` inside the
    # method body, so as written this retry policy has nothing to retry on.
    @retry(
        wait=wait_exponential(multiplier=1, min=1, max=10),
        stop=stop_after_delay(5),
        reraise=True,
        before_sleep=before_sleep_log(logger, logging.ERROR),
    )
    async def process_iframe(self, iframe) -> Optional[tuple[dict, dict]]:
        """Collect the POIs inside one iframe element.

        Args:
            iframe: Element handle of the ``<iframe>`` on the current page.

        Returns:
            ``(poi, iframe_offset)`` where ``poi`` is the result of
            ``findPOIsConvergence()`` evaluated in the frame and
            ``iframe_offset`` is the rounded ``{"x", "y"}`` position of the
            iframe's bounding box; ``None`` when the iframe has no bounding
            box, is smaller than 50px in either dimension, has no content
            frame, yields no POIs, or any error occurs.
        """
        try:
            # Check iframe visibility and size
            bounding_box = await iframe.bounding_box()
            if not bounding_box:
                return None  # Skip if iframe is not visible

            if bounding_box["width"] < 50 or bounding_box["height"] < 50:
                return None

            frame = await iframe.content_frame()
            if not frame:
                return None

            poi = await frame.evaluate(
                """() => {
                    overwriteDefaultSelectConvergence();
                    return findPOIsConvergence();
                }""",
            )
            if not poi:
                return None

            iframe_offset = {"x": round(bounding_box["x"]), "y": round(bounding_box["y"])}
            return poi, iframe_offset
        except Exception as e:
            logger.error(f"Error processing iframe: {e}")
            return None

    @retry(
        wait=wait_exponential(multiplier=1, min=1, max=10),
        stop=stop_after_delay(5),
        reraise=True,
        before_sleep=before_sleep_log(logger, logging.ERROR),
    )
    async def update_poi(self) -> None:
        """Wait for the page to be ready, then refresh all POI state.

        Updates ``poi_elements``, ``poi_centroids``, ``bounding_boxes`` and
        ``pois`` from the current page plus up to ``_MAX_IFRAMES`` iframes.
        The four attributes are replaced together at the end, so a failure
        part-way through leaves the previous, mutually consistent state.

        Raises:
            PlaywrightTimeoutError: if the initial ``domcontentloaded`` wait
                times out.
            Exception: any other error from the readiness check or from
                evaluating the POI script (re-raised once the retry policy
                gives up).
        """
        await self._wait_for_page_ready()

        # Run the bounding box javascript code to highlight the points of interest on the page
        page_info = await self.current_page.evaluate(
            """() => {
                overwriteDefaultSelectConvergence();
                return findPOIsConvergence();
            }""",
        )
        poi_elements = list(page_info["element_descriptions"])
        element_centroids = list(page_info["element_centroids"])

        try:
            iframe_elements, iframe_centroids = await self._collect_iframe_pois()
        except Exception as e:
            logger.error(f"Error in finding iframes: {e}")
        else:
            poi_elements.extend(iframe_elements)
            element_centroids.extend(iframe_centroids)

        # Get the centroids of the points of interest
        poi_centroids = [Point(x=xy["x"], y=xy["y"]) for xy in element_centroids]
        bounding_boxes = [BoundingBox(**xy, label=str(i)) for i, xy in enumerate(element_centroids)]
        pois = [
            POI(info=info, element_centroid=centroid, bounding_box=bbox)
            for info, centroid, bbox in zip(
                poi_elements,
                poi_centroids,
                bounding_boxes,
                strict=False,
            )
        ]

        self.poi_elements = poi_elements
        self.poi_centroids = poi_centroids
        self.bounding_boxes = bounding_boxes
        self.pois = pois

    async def _collect_iframe_pois(self) -> tuple[list, list]:
        """Return (element descriptions, page-relative centroids) of all iframes."""
        iframes = await self.current_page.query_selector_all("iframe")
        results = await asyncio.gather(
            *(self.process_iframe(iframe) for iframe in iframes[: self._MAX_IFRAMES]),
        )

        descriptions: list = []
        centroids: list = []
        for result in results:
            if result is None:
                continue
            poi, offset = result
            try:
                frame_descriptions = list(poi["element_descriptions"])
                frame_centroids = [
                    self._offset_centroid(centroid, offset) for centroid in poi["element_centroids"]
                ]
            except (KeyError, TypeError) as e:
                # Skip only the malformed iframe; descriptions and centroids
                # are appended together so mark ids stay aligned.
                logger.error(f"Error in finding iframes: {e}")
                continue
            descriptions.extend(frame_descriptions)
            centroids.extend(frame_centroids)
        return descriptions, centroids

    @staticmethod
    def _offset_centroid(centroid: dict, offset: dict) -> dict:
        """Return a copy of ``centroid`` translated by the iframe ``offset``."""
        shifted = dict(centroid)
        for key in ("x", "left", "right"):
            shifted[key] += offset["x"]
        for key in ("y", "top", "bottom"):
            shifted[key] += offset["y"]
        return shifted

    async def _wait_for_page_ready(self) -> None:
        """Wait for DOM readiness, then apply URL-specific extra waits."""
        try:
            # Wait for basic page load states to ensure the DOM is ready.
            # This is a fundamental wait that should always apply.
            await self.current_page.wait_for_load_state("domcontentloaded", timeout=60000)
            logger.debug(f"DEBUG: wait_for_load_state('domcontentloaded') completed for {self.current_page.url}.")

            current_url = self.current_page.url
            # Order matters: the target page check wins over the login check.
            if self._FORECAST_SETTINGS_URL_PATTERN in current_url:
                await self._wait_for_forecast_settings_page(current_url)
            elif any(pattern in current_url for pattern in self._LOGIN_URL_PATTERNS):
                await self._wait_for_login_page(current_url)
            elif self._INTERMEDIATE_APP_URL_PATTERN in current_url:
                await self._wait_for_intermediate_app_page(current_url)
            else:
                await self._wait_for_generic_page(current_url)
        except PlaywrightTimeoutError as e:
            logger.error(f"ERROR: Timeout waiting for page readiness for {self.current_page.url}: {e}")
            raise  # Re-raise if essential waits fail (e.g., initial domcontentloaded)
        except Exception as e:
            logger.error(
                f"ERROR: An unexpected error occurred during page readiness check for {self.current_page.url}: {e}"
            )
            raise

    async def _wait_for_forecast_settings_page(self, current_url: str) -> None:
        """Best-effort wait for spinners to hide and main content to show."""
        logger.info(f"INFO: Detected target Account Forecast Settings page: {current_url}. Waiting for content.")
        for selector in self._SPINNER_SELECTORS:
            try:
                await self.current_page.wait_for_selector(selector, state="hidden", timeout=5000)
            except PlaywrightTimeoutError:
                logger.warning(
                    f"DEBUGGING: Spinner element '{selector}' not detected or did not disappear on {self.current_page.url} within 5s."
                )
            else:
                logger.debug(f"DEBUG: Spinner element '{selector}' became hidden for {self.current_page.url}.")

        # Wait for a known element on the Account Forecast Settings page to ensure content is there.
        try:
            await self.current_page.wait_for_selector(
                "h1.slds-page-header__title, h2, .account-forecast-settings-component, div[data-aura-rendered-by]",
                state="visible",
                timeout=15000,
            )
        except PlaywrightTimeoutError:
            logger.warning(
                f"DEBUGGING: Main page element not visible on {self.current_page.url} within 15s. This might indicate incomplete page load despite no spinner."
            )
        else:
            logger.debug(f"DEBUG: Confirmed main page element visible for {self.current_page.url}.")

    async def _wait_for_login_page(self, current_url: str) -> None:
        """Best-effort wait for login form elements to become visible."""
        logger.info(f"INFO: Detected Salesforce login page: {current_url}. Waiting for login elements.")
        try:
            await self.current_page.wait_for_selector(
                "input[type='email'], input[type='password'], input[type='submit'], #username, #password, #Login",
                state="visible",
                timeout=10000,
            )
        except PlaywrightTimeoutError:
            logger.warning(
                f"DEBUGGING: Login page elements not visible on {self.current_page.url} within 10s. This may happen if elements are in an iframe or if page is extremely slow."
            )
        else:
            logger.debug(f"DEBUG: Login page elements visible on {self.current_page.url}.")

    async def _wait_for_intermediate_app_page(self, current_url: str) -> None:
        """Best-effort wait for network idle and the app spinner to hide."""
        logger.info(
            f"INFO: Detected intermediate Salesforce Lightning app loading page: {current_url}. Waiting for network idle and app spinner."
        )
        # This is the /one/one.app page or similar. Don't wait for specific content, just general load.
        try:
            await self.current_page.wait_for_load_state("networkidle", timeout=30000)
        except PlaywrightTimeoutError:
            logger.warning(
                f"DEBUGGING: Network idle timeout on intermediate app page: {current_url}. Proceeding anyway."
            )
        else:
            logger.debug(f"DEBUG: Network idle detected on intermediate app page: {current_url}.")

        # Also try to wait for a common full-app spinner to disappear, if present
        try:
            await self.current_page.wait_for_selector(
                "div.app-spinner, div.auraLoadingBox",
                state="hidden",
                timeout=15000,
            )
        except PlaywrightTimeoutError:
            logger.warning("DEBUGGING: App spinner on intermediate page not found or did not disappear.")
        else:
            logger.debug("DEBUG: App spinner on intermediate page became hidden.")

    async def _wait_for_generic_page(self, current_url: str) -> None:
        """Fallback for any other page: best-effort wait for a visible body."""
        logger.info(f"INFO: Detected unhandled URL type: {current_url}. Performing generic body wait.")
        try:
            await self.current_page.wait_for_selector("body", timeout=5000, state="visible")
        except PlaywrightTimeoutError:
            logger.warning(
                f"DEBUGGING: Playwright Timeout (5s) on body selector for {self.current_page.url}. Continuing anyway."
            )
        else:
            logger.debug(
                f"DEBUG: wait_for_selector('body', state='visible') completed for {self.current_page.url}."
            )

    @property
    def poi_text(self) -> str:
        """All current POIs rendered one per line via ``element_as_text``."""
        # Get all points of interest on the page as text
        texts = [element_as_text(mark_id=i, **element) for i, element in enumerate(self.poi_elements)]
        # Return formatted text of points of interest on page
        return "\n".join(txt for txt in texts if txt)

    async def screenshot(
        self,
        delay: float = 0.0,
        quality: int = 70,
        type: str = "jpeg",
        scale: str = "css",
    ) -> tuple[bytes, bytes]:
        """Refresh POIs and capture the current page.

        Args:
            delay: Seconds to sleep before doing anything, when positive.
            quality: Image quality; only forwarded for ``type="jpeg"``.
            type: Screenshot image format passed to Playwright.
            scale: Screenshot scale mode passed to Playwright.

        Returns:
            ``(img, annotated_img)``: the raw screenshot bytes and the result
            of ``annotate_bounding_boxes`` for the current bounding boxes.
        """
        if delay > 0.0:
            await asyncio.sleep(delay)
        await self.update_poi()

        options = {"type": type, "scale": scale}
        if type == "jpeg":
            # Playwright rejects a ``quality`` option for PNG screenshots.
            options["quality"] = quality
        img = await self.current_page.screenshot(**options)
        annotated_img = annotate_bounding_boxes(image=img, bounding_boxes=self.bounding_boxes)
        # If precise screenshot timing is needed, the caller should manage delays and updates.
        return img, annotated_img

    async def goto(self, url: str) -> None:
        """Navigate the current page to ``url`` (waits for DOMContentLoaded)."""
        await self.current_page.goto(url, wait_until="domcontentloaded")

    async def reload(self) -> None:
        """Reload the current page (waits for DOMContentLoaded)."""
        await self.current_page.reload(wait_until="domcontentloaded")

    async def click_tab(self, mark_id: int) -> None:
        """Hover over POI ``mark_id`` and click it with the middle button."""
        point: Point = self.poi_centroids[mark_id]
        await self.hover(point)
        await self.current_page.mouse.click(*point, button="middle")

    async def click(self, mark_id: int) -> None:
        """Hover over POI ``mark_id`` and click it."""
        point: Point = self.poi_centroids[mark_id]
        await self.hover(point)
        await self.current_page.mouse.click(*point)

    async def enter_text(self, mark_id: int, text: str, submit: bool = False) -> None:
        """Clear POI ``mark_id``, type ``text`` into it, optionally press Enter."""
        await self.clear_text_field(mark_id)
        await self.click(mark_id)
        await self.current_page.keyboard.type(text)
        if submit:
            await self.current_page.keyboard.press("Enter")

    async def scroll(
        self,
        direction: Literal["up", "down", "left", "right"],
        mark_id: Optional[int] = None,
    ) -> None:
        """Scroll by 80% of the viewport, or of POI ``mark_id``'s bounding box.

        The mouse is first moved over the POI (or to ``(-1, -1)`` when no
        ``mark_id`` is given), then a wheel event is sent along one axis only.
        """
        if mark_id is None:
            point = Point(x=-1, y=-1)
            max_scroll_x = self.viewport_width
            max_scroll_y = self.viewport_height
        else:
            point = self.poi_centroids[mark_id]
            bbox: BoundingBox = self.bounding_boxes[mark_id]
            max_scroll_x = bbox.right - bbox.left
            max_scroll_y = bbox.bottom - bbox.top

        await self.hover(point=point)

        sign = -1 if direction in ("up", "left") else 1
        delta_x = delta_y = 0
        if direction in ("up", "down"):
            delta_y = sign * int(max_scroll_y * 0.8)
        else:
            delta_x = sign * int(max_scroll_x * 0.8)
        await self.current_page.mouse.wheel(delta_x, delta_y)

    async def go_back(self) -> None:
        """Go back in history; close the tab if that lands on ``about:blank``.

        Raises:
            Exception: if going back reaches ``about:blank`` on the only open
                page (the navigation is undone first).
        """
        # If there is no tab open then return
        if not self.current_page:
            return

        await self.current_page.go_back(wait_until="domcontentloaded")
        if self.current_page.url != "about:blank":
            return

        if len(self.context.pages) <= 1:
            await self.current_page.go_forward(wait_until="domcontentloaded")
            raise Exception("There is no previous page to go back to.")
        await self.current_page.close()

    async def hover(self, point: Point) -> None:
        """Move the mouse to ``point``."""
        await self.current_page.mouse.move(*point)

    async def focus(self, point: Point) -> None:
        """Focus the element found at ``point`` via ``document.elementFromPoint``."""
        await self.current_page.evaluate(
            """
            ([x, y]) => {
                const element = document.elementFromPoint(x, y);
                if (element && element.focus) {
                    element.focus();
                }
            }""",
            tuple(point),
        )

    async def get_text(self, mark_id: int) -> str:
        """Return the ``value`` (or else ``textContent``) of POI ``mark_id``.

        Always returns a string: a missing element or empty result gives ``""``
        and non-string values (e.g. numeric ``value``) are converted with
        ``str``.
        """
        result = await self.current_page.evaluate(
            """
            (mark_id) => {
                const element = marked_elements_convergence[mark_id];
                if (element && (element.value !== undefined || element.textContent !== undefined)) {
                    return element.value || element.textContent;
                }
                return '';
            }
            """,
            # Pass the index itself; a 1-tuple would arrive as a JS array and
            # only index correctly through implicit string coercion.
            mark_id,
        )
        return "" if result is None else str(result)

    async def clear_text_field(self, mark_id: int) -> None:
        """Select and delete the existing text of POI ``mark_id``, if any."""
        existing_text = await self.get_text(mark_id)
        if not existing_text.strip():
            return

        # Clear existing text only if it exists
        await self.click(mark_id)
        keyboard = self.current_page.keyboard
        if platform.system() == "Darwin":  # selecting all text is OS-specific
            await self.click(mark_id)
            await keyboard.press("Meta+a")
            await keyboard.press("Backspace")
        else:
            await keyboard.press("Control+Home")
            await keyboard.press("Control+Shift+End")
            await keyboard.press("Backspace")

    async def open_new_tab_and_go_to(self, url: str) -> None:
        """
        Opens a new browser tab/page and navigates to the specified URL.
        Closes the previously current page once the navigation has succeeded.
        """
        logger.info(f"Attempting to open a new tab and navigate to: {url}")

        # Remember the old page first: ``current_page`` resolves to the newest
        # page, so after ``new_page()`` it would already be the new tab.
        previous_page = self.current_page
        new_page = await self.context.new_page()

        await new_page.goto(url, wait_until="domcontentloaded")
        logger.info(f"Successfully navigated to {url} in a new tab.")

        if previous_page is not None and previous_page is not new_page:
            try:
                await previous_page.close()
                logger.debug("Closed previous page.")
            except Exception as e:
                logger.warning(f"Could not close previous page (might already be closed or detached): {e}")

        # update_poi uses self.current_page, which is now the new page.
        await self.update_poi()
if __name__ == "__main__":

    async def dummy_test():
        """Manual smoke test: open a page, then save a raw and an annotated screenshot."""
        async with BrowserSession(headless=False) as session:
            tab = await session.context.new_page()
            await tab.goto("http://google.co.uk")
            # Give the page a few seconds to settle before capturing it.
            await asyncio.sleep(5)
            await tab.screenshot(path="example.png")

            await session.update_poi()
            captured = await session.screenshot()
            annotated_image = captured[1]

            # Only the annotated image is written out.
            Path("output.png").write_bytes(annotated_image)

    asyncio.run(dummy_test())