Spaces:
Running
Running
import asyncio | |
import logging | |
import platform | |
import re | |
from contextlib import AsyncExitStack | |
from pathlib import Path | |
from typing import Literal, Optional, Self | |
from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright | |
from playwright.async_api import TimeoutError as PlaywrightTimeoutError | |
from playwright_stealth import StealthConfig, stealth_async | |
from pydantic import Field | |
from tenacity import before_sleep_log, retry, stop_after_delay, wait_exponential | |
from proxy_lite.browser.bounding_boxes import POI, BoundingBox, Point, annotate_bounding_boxes | |
from proxy_lite.logger import logger | |
import base64 | |
SELF_CONTAINED_TAGS = [ | |
# many of these are non-interactive but keeping them anyway | |
"area", | |
"base", | |
"br", | |
"col", | |
"embed", | |
"hr", | |
"img", | |
"input", | |
"link", | |
"meta", | |
"param", | |
"source", | |
"track", | |
"wbr", | |
] | |
def element_as_text( | |
mark_id: int, | |
tag: Optional[str] = None, | |
text: Optional[str] = None, | |
**raw_attributes, | |
) -> str: | |
"""Return a text representation of all elements on the page.""" | |
attributes = [] | |
for k, v in raw_attributes.items(): | |
if v is None: | |
continue | |
if isinstance(v, bool): | |
if v: | |
attributes.append(k) | |
# we ignore False bool attributes | |
else: | |
v = str(v) | |
if len(v) > 2500: | |
v = v[: 2500 - 1] + "…" | |
attributes.append(f'{k}="{v}"') | |
attributes = " ".join(attributes) | |
attributes = (" " + attributes).rstrip() | |
tag = tag.lower() | |
if text is None: | |
text = "" | |
if len(text) > 2500: | |
text = text[: 2500 - 1] + "…" | |
# sub-out line breaks so elements are easier to distinguish | |
attributes = re.sub(r"\r\n|\r|\n", "⏎", attributes) | |
text = re.sub(r"\r\n|\r|\n", "⏎", text) | |
if tag in SELF_CONTAINED_TAGS: | |
if text: | |
logger.warning( | |
f"Got self-contained element '{tag}' which contained text '{text}'.", | |
) | |
else: | |
return f"- [{mark_id}] <{tag}{attributes}/>" | |
return f"- [{mark_id}] <{tag}{attributes}>{text}</{tag}>" | |
class BrowserSession: | |
def __init__( | |
self, | |
viewport_width: int = 1280, | |
viewport_height: int = 720, | |
headless: bool = True, | |
): | |
self.viewport_width = viewport_width | |
self.viewport_height = viewport_height | |
self.headless = headless | |
self.playwright: Playwright | None = None | |
self.browser: Browser | None = None | |
self.context: BrowserContext | None = None | |
self._exit_stack: AsyncExitStack | None = None | |
self.poi_elements: list = Field(default_factory=list) | |
self.poi_centroids: list[Point] = Field(default_factory=list) | |
self.bounding_boxes: list[BoundingBox] = Field(default_factory=list) | |
self.pois: list[POI] = Field(default_factory=list) | |
async def __aenter__(self) -> Self: | |
self._exit_stack = AsyncExitStack() | |
self.playwright = await async_playwright().start() | |
self.browser = await self.playwright.chromium.launch(headless=self.headless) | |
self.context = await self.browser.new_context( | |
viewport={"width": self.viewport_width, "height": self.viewport_height}, | |
) | |
await self.context.new_page() | |
self.context.set_default_timeout(60_000) | |
self.current_page.set_default_timeout(60_000) | |
await stealth_async(self.current_page, StealthConfig(navigator_user_agent=False)) | |
await self.context.add_init_script( | |
path=Path(__file__).with_name("add_custom_select.js"), | |
) | |
await self.context.add_init_script( | |
path=Path(__file__).with_name("find_pois.js"), | |
) | |
return self | |
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: | |
if self.browser: | |
await self.browser.close() | |
if self.playwright: | |
await self.playwright.stop() | |
if self._exit_stack: | |
await self._exit_stack.aclose() | |
def current_page(self) -> Optional[Page]: | |
if self.context.pages: | |
return self.context.pages[-1] | |
return None | |
def current_url(self) -> Optional[str]: | |
if self.current_page: | |
return self.current_page.url | |
return None | |
# re-run for cases of mid-run redirects | |
async def process_iframe(self, iframe) -> Optional[tuple[dict, dict]]: | |
try: | |
# Check iframe visibility and size | |
bounding_box = await iframe.bounding_box() | |
if not bounding_box: | |
return None # Skip if iframe is not visible | |
width, height = bounding_box["width"], bounding_box["height"] | |
if width < 50 or height < 50: | |
return None | |
frame = await iframe.content_frame() | |
if not frame: | |
return None | |
poi = await frame.evaluate( | |
"""() => { | |
overwriteDefaultSelectConvergence(); | |
return findPOIsConvergence(); | |
}""", | |
) | |
if not poi: | |
return None | |
iframe_offset = {"x": round(bounding_box["x"]), "y": round(bounding_box["y"])} | |
return poi, iframe_offset | |
except Exception as e: | |
logger.error(f"Error processing iframe: {e}") | |
return None | |
# re-run for cases of mid-run redirects | |
async def update_poi(self) -> None: | |
try: | |
# Step 1: Wait for network to be idle. This indicates that initial requests have settled. | |
logger.debug("Attempting wait_for_load_state('networkidle')...") | |
await self.current_page.wait_for_load_state("networkidle", timeout=180000) # Increased timeout | |
logger.debug("wait_for_load_state('networkidle') completed.") | |
# Step 2: Wait for the 'loading' class to disappear from the body. | |
# This is a common and effective way to detect when SPAs like Salesforce are visually ready. | |
logger.debug("Attempting wait_for_selector('body:not(.loading)')...") | |
# Removed state="visible" as it's often too strict for 'body' in SPAs, | |
# and 'not(.loading)' implies it should become visible eventually. | |
await self.current_page.wait_for_selector("body:not(.loading)", timeout=180000) | |
logger.debug("wait_for_selector('body:not(.loading)') completed.") | |
# Optional Step 3 (Highly Recommended): If the above still times out, | |
# uncomment and replace with a reliable selector for an interactive element | |
# that only appears after the Salesforce UI is fully loaded and ready for user input. | |
# Example: await self.current_page.wait_for_selector("#some_salesforce_specific_id", timeout=180000, state="visible") | |
# Example: await self.current_page.wait_for_selector("text=App Launcher", timeout=180000, state="visible") | |
# For now, we'll rely on the 'body:not(.loading)' as the primary indicator. | |
except PlaywrightTimeoutError as e: | |
# --- START TEMPORARY DEBUGGING CODE --- | |
# This block captures state specifically when a Playwright timeout occurs | |
current_url = self.current_page.url if self.current_page else "N/A" | |
logger.error(f"DEBUGGING: Playwright Timeout (180s) during page readiness check at URL: {current_url}") | |
html_content = None | |
try: | |
if self.current_page: | |
html_content = await self.current_page.content() | |
logger.error(f"DEBUGGING: HTML Content (first 1000 chars) when timeout occurred:\n{html_content[:1000]}...") | |
except Exception as html_e: | |
logger.error(f"DEBUGGING: Could not get HTML content for debug: {html_e}") | |
screenshot_b64 = "N/A" | |
try: | |
if self.current_page: | |
# Capture screenshot at lower quality (e.g., 50) to keep log size manageable. | |
# Higher quality might make logs too large for some platforms. | |
screenshot_bytes = await self.current_page.screenshot(type="jpeg", quality=50) | |
screenshot_b64 = base64.b64encode(screenshot_bytes).decode("utf-8") | |
# Log only a very short snippet of base64 string to confirm it's there | |
logger.error(f"DEBUGGING: Base64 Screenshot (truncated) when timeout occurred:\ndata:image/jpeg;base64,{screenshot_b64[:100]}... (full string is much longer)") | |
# If you want to view the full screenshot locally during development, you can save it: | |
# with open("debug_timeout_full_screenshot.jpeg", "wb") as f: | |
# f.write(screenshot_bytes) | |
# logger.error("DEBUGGING: Full screenshot saved to debug_timeout | |
def poi_text(self) -> str: | |
# Get all points of interest on the page as text | |
texts = [element_as_text(mark_id=i, **element) for i, element in enumerate(self.poi_elements)] | |
# Return formatted text of points of interest on page | |
return "\n".join([txt for txt in texts if txt]) | |
async def screenshot( | |
self, | |
delay: float = 0.0, | |
quality: int = 70, | |
type: str = "jpeg", | |
scale: str = "css", | |
) -> tuple[bytes, bytes]: | |
if delay > 0.0: | |
await asyncio.sleep(delay) | |
await self.update_poi() | |
old_poi_positions = [tuple(point) for point in self.poi_centroids] | |
img = await self.current_page.screenshot(type=type, quality=quality, scale=scale) | |
annotated_img = annotate_bounding_boxes(image=img, bounding_boxes=self.bounding_boxes) | |
# check page has not changed since the screenshot was taken | |
await self.update_poi() | |
new_poi_positions = [tuple(point) for point in self.poi_centroids] | |
if new_poi_positions != old_poi_positions: | |
# if it has changed, take another | |
img = await self.current_page.screenshot(type=type, quality=quality, scale=scale) | |
await self.update_poi() | |
annotated_img = annotate_bounding_boxes(image=img, bounding_boxes=self.bounding_boxes) | |
return img, annotated_img | |
async def goto(self, url: str) -> None: | |
await self.current_page.goto(url, wait_until="domcontentloaded") | |
async def reload(self) -> None: | |
await self.current_page.reload(wait_until="domcontentloaded") | |
async def click_tab(self, mark_id: int) -> None: | |
point: Point = self.poi_centroids[mark_id] | |
await self.hover(point) | |
await self.current_page.mouse.click(*point, button="middle") | |
async def click(self, mark_id: int) -> None: | |
point: Point = self.poi_centroids[mark_id] | |
await self.hover(point) | |
await self.current_page.mouse.click(*point) | |
async def enter_text(self, mark_id: int, text: str, submit: bool = False) -> None: | |
await self.clear_text_field(mark_id) | |
await self.click(mark_id) | |
await self.current_page.keyboard.type(text) | |
if submit: | |
await self.current_page.keyboard.press("Enter") | |
async def scroll( | |
self, | |
direction: Literal["up", "down", "left", "right"], | |
mark_id: Optional[int] = None, | |
) -> None: | |
if mark_id is None: | |
point = Point(x=-1, y=-1) | |
max_scroll_x = self.viewport_width | |
max_scroll_y = self.viewport_height | |
else: | |
point: Point = self.poi_centroids[mark_id] | |
bbox: BoundingBox = self.bounding_boxes[mark_id] | |
max_scroll_x = bbox.right - bbox.left | |
max_scroll_y = bbox.bottom - bbox.top | |
await self.hover(point=point) | |
scroll_x = int(max_scroll_x * 0.8) | |
scroll_y = int(max_scroll_y * 0.8) | |
is_vertical = direction in ("up", "down") | |
reverse_scroll = direction in ("up", "left") | |
await self.current_page.mouse.wheel( | |
scroll_x * (-1 if reverse_scroll else 1) * (not is_vertical), | |
scroll_y * (-1 if reverse_scroll else 1) * is_vertical, | |
) | |
async def go_back(self) -> None: | |
# If there is no tab open then return | |
if not self.current_page: | |
return | |
await self.current_page.go_back(wait_until="domcontentloaded") | |
if self.current_page.url == "about:blank": | |
if not len(self.context.pages) > 1: | |
await self.current_page.go_forward(wait_until="domcontentloaded") | |
raise Exception("There is no previous page to go back to.") | |
await self.current_page.close() | |
async def hover(self, point: Point) -> None: | |
await self.current_page.mouse.move(*point) | |
async def focus(self, point: Point) -> None: | |
# Focus on the element on the page at point (x, y) | |
await self.current_page.evaluate( | |
""" | |
([x, y]) => { | |
const element = document.elementFromPoint(x, y); | |
if (element && element.focus) { | |
element.focus(); | |
} | |
}""", | |
tuple(point), | |
) | |
async def get_text(self, mark_id: int) -> str: | |
return await self.current_page.evaluate( | |
""" | |
(mark_id) => { | |
const element = marked_elements_convergence[mark_id]; | |
if (element && (element.value !== undefined || element.textContent !== undefined)) { | |
return element.value || element.textContent; | |
} | |
return ''; | |
} | |
""", | |
(mark_id,), | |
) | |
async def clear_text_field(self, mark_id: int) -> None: | |
existing_text = await self.get_text(mark_id) | |
if existing_text.strip(): | |
# Clear existing text only if it exists | |
await self.click(mark_id) | |
if platform.system() == "Darwin": # selecting all text is OS-specific | |
await self.click(mark_id) | |
await self.current_page.keyboard.press("Meta+a") | |
await self.current_page.keyboard.press("Backspace") | |
else: | |
await self.current_page.keyboard.press("Control+Home") | |
await self.current_page.keyboard.press("Control+Shift+End") | |
await self.current_page.keyboard.press("Backspace") | |
if __name__ == "__main__": | |
async def dummy_test(): | |
async with BrowserSession(headless=False) as s: | |
page = await s.context.new_page() | |
await page.goto("http://google.co.uk") | |
await asyncio.sleep(5) | |
await page.screenshot(path="example.png") | |
await s.update_poi() | |
_, annotated_image = await s.screenshot() | |
with open("output.png", "wb") as f: | |
f.write(annotated_image) | |
asyncio.run(dummy_test()) |