Trisha Tomy
trying fixes for loading
c8e914e
raw
history blame
17.7 kB
import asyncio
import logging
import platform
import re
from contextlib import AsyncExitStack
from pathlib import Path
from typing import Literal, Optional, Self
from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from playwright_stealth import StealthConfig, stealth_async
from pydantic import Field
from tenacity import before_sleep_log, retry, stop_after_delay, wait_exponential
from proxy_lite.browser.bounding_boxes import POI, BoundingBox, Point, annotate_bounding_boxes
from proxy_lite.logger import logger
import base64
SELF_CONTAINED_TAGS = [
    # Tags rendered in self-closing form (``<tag/>``) by element_as_text().
    # Many of these are non-interactive but keeping them anyway.
    "area",
    "base",
    "br",
    "col",
    "embed",
    "hr",
    "img",
    "input",
    "link",
    "meta",
    "param",
    "source",
    "track",
    "wbr",
]

# Attribute values and element text are capped at this many characters
# (including the trailing ellipsis) so one huge element cannot dominate the output.
_MAX_FIELD_LENGTH = 2500


def _clip(value: str, limit: int = _MAX_FIELD_LENGTH) -> str:
    """Return *value* cut to at most *limit* characters, ending in "…" if it was cut."""
    if len(value) > limit:
        return value[: limit - 1] + "…"
    return value


def element_as_text(
    mark_id: int,
    tag: Optional[str] = None,
    text: Optional[str] = None,
    **raw_attributes,
) -> str:
    """Return a one-line, HTML-like text representation of a single element.

    The result has the form ``- [mark_id] <tag attrs>text</tag>``, or
    ``- [mark_id] <tag attrs/>`` for tags listed in ``SELF_CONTAINED_TAGS``
    that carry no text.

    Args:
        mark_id: Index shown in square brackets in front of the element.
        tag: Tag name; lower-cased before rendering. ``None`` is rendered as
            an empty tag name instead of raising.
        text: Text content of the element; ``None`` is treated as empty.
        **raw_attributes: Element attributes. ``None`` values are dropped,
            ``True`` renders the bare attribute name, ``False`` is omitted,
            and any other value is rendered as ``name="value"``.

    Returns:
        The rendered line. Long values are truncated and line breaks are
        replaced by "⏎" so every element stays on one line.
    """
    rendered = []
    for name, value in raw_attributes.items():
        if value is None:
            continue
        if isinstance(value, bool):
            # Boolean attributes: present when True, ignored when False.
            if value:
                rendered.append(name)
            continue
        rendered.append(f'{name}="{_clip(str(value))}"')

    attributes = (" " + " ".join(rendered)).rstrip()

    # The signature allows tag=None; fall back to an empty name rather than
    # failing on None.lower().
    tag = (tag or "").lower()
    text = _clip(text or "")

    # sub-out line breaks so elements are easier to distinguish
    attributes = re.sub(r"\r\n|\r|\n", "⏎", attributes)
    text = re.sub(r"\r\n|\r|\n", "⏎", text)

    if tag in SELF_CONTAINED_TAGS:
        if not text:
            return f"- [{mark_id}] <{tag}{attributes}/>"
        # Unexpected, but keep the text: fall through to the open/close form.
        logger.warning(
            f"Got self-contained element '{tag}' which contained text '{text}'.",
        )
    return f"- [{mark_id}] <{tag}{attributes}>{text}</{tag}>"
class BrowserSession:
    """Async context manager around one Playwright Chromium browser context.

    Entering the context launches Chromium, opens a first page, applies the
    stealth patches and registers the ``add_custom_select.js`` and
    ``find_pois.js`` init scripts that sit next to this module. The session
    then offers helpers to find "points of interest" (POIs) on the current
    page, take annotated screenshots and drive the mouse/keyboard by POI index
    (``mark_id``).

    NOTE(review): the JavaScript names used below
    (``overwriteDefaultSelectConvergence``, ``findPOIsConvergence``,
    ``marked_elements_convergence``) are presumably defined by those init
    scripts - confirm against the .js files.
    """

    # Selector that update_poi() waits for by default. Kept as the default so
    # existing callers see unchanged behaviour; pass ``ready_selector=None`` to
    # use the session on pages that do not contain this heading.
    DEFAULT_READY_SELECTOR = "h1:has-text('Account Forecasting'), h2:has-text('Account Forecasting')"

    # Timeouts (milliseconds) used while waiting for a page to become usable.
    _LOAD_TIMEOUT_MS = 60_000
    _BODY_TIMEOUT_MS = 30_000

    # At most this many iframes are scanned for POIs on each update.
    _MAX_IFRAMES = 10

    # Script evaluated in a page or frame to collect its points of interest.
    _FIND_POIS_SCRIPT = """() => {
        overwriteDefaultSelectConvergence();
        return findPOIsConvergence();
    }"""

    def __init__(
        self,
        viewport_width: int = 1280,
        viewport_height: int = 720,
        headless: bool = True,
        ready_selector: Optional[str] = DEFAULT_READY_SELECTOR,
    ):
        """Store the configuration; no browser is started until ``__aenter__``.

        Args:
            viewport_width: Viewport width in CSS pixels.
            viewport_height: Viewport height in CSS pixels.
            headless: Whether Chromium is launched without a visible window.
            ready_selector: Selector that must become visible before
                ``update_poi`` scans the page. ``None`` disables this wait.
        """
        self.viewport_width = viewport_width
        self.viewport_height = viewport_height
        self.headless = headless
        self.ready_selector = ready_selector
        self.playwright: Playwright | None = None
        self.browser: Browser | None = None
        self.context: BrowserContext | None = None
        self._exit_stack: AsyncExitStack | None = None
        # Plain lists: this is not a pydantic model, so pydantic's Field(...)
        # would leave FieldInfo objects here instead of empty lists.
        self.poi_elements: list = []
        self.poi_centroids: list[Point] = []
        self.bounding_boxes: list[BoundingBox] = []
        self.pois: list[POI] = []

    async def __aenter__(self) -> Self:
        """Launch Chromium, open the first page and install the init scripts."""
        self._exit_stack = AsyncExitStack()
        try:
            self.playwright = await async_playwright().start()
            self.browser = await self.playwright.chromium.launch(headless=self.headless)
            self.context = await self.browser.new_context(
                viewport={"width": self.viewport_width, "height": self.viewport_height},
            )
            await self.context.new_page()
            self.context.set_default_timeout(60_000)
            self.current_page.set_default_timeout(60_000)
            await stealth_async(self.current_page, StealthConfig(navigator_user_agent=False))
            await self.context.add_init_script(
                path=Path(__file__).with_name("add_custom_select.js"),
            )
            await self.context.add_init_script(
                path=Path(__file__).with_name("find_pois.js"),
            )
        except BaseException:
            # ``async with`` does not call __aexit__ when __aenter__ fails, so
            # release whatever was already started before propagating.
            await self.__aexit__(None, None, None)
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the browser, stop Playwright and close the exit stack.

        Each step runs even if an earlier one raises, so a failing
        ``browser.close()`` cannot leave the Playwright driver running.
        """
        try:
            if self.browser:
                await self.browser.close()
        finally:
            try:
                if self.playwright:
                    await self.playwright.stop()
            finally:
                if self._exit_stack:
                    await self._exit_stack.aclose()

    @property
    def current_page(self) -> Optional[Page]:
        """The most recently opened page of the context, or ``None`` if there is none."""
        if self.context and self.context.pages:
            return self.context.pages[-1]
        return None

    @property
    def current_url(self) -> Optional[str]:
        """URL of the current page, or ``None`` when no page is open."""
        page = self.current_page
        return page.url if page else None

    # re-run for cases of mid-run redirects
    # NOTE(review): every exception is caught and turned into ``None`` inside
    # the method, so as written the retry decorator never sees a failure.
    @retry(
        wait=wait_exponential(multiplier=1, min=1, max=10),
        stop=stop_after_delay(5),
        reraise=True,
        before_sleep=before_sleep_log(logger, logging.ERROR),
    )
    async def process_iframe(self, iframe) -> Optional[tuple[dict, dict]]:
        """Collect the points of interest inside one iframe element.

        Args:
            iframe: Element handle of the ``<iframe>`` to inspect.

        Returns:
            ``(poi, offset)`` where ``poi`` is the result of the POI script run
            inside the frame and ``offset`` is the iframe's rounded top-left
            corner as ``{"x": ..., "y": ...}``. ``None`` when the iframe has no
            bounding box, is smaller than 50px in either dimension, has no
            content frame, yields no POIs, or any error occurs (logged).
        """
        try:
            # Check iframe visibility and size
            bounding_box = await iframe.bounding_box()
            if not bounding_box:
                return None  # Skip if iframe is not visible
            if bounding_box["width"] < 50 or bounding_box["height"] < 50:
                return None

            frame = await iframe.content_frame()
            if not frame:
                return None

            poi = await frame.evaluate(self._FIND_POIS_SCRIPT)
            if not poi:
                return None

            iframe_offset = {"x": round(bounding_box["x"]), "y": round(bounding_box["y"])}
            return poi, iframe_offset
        except Exception as e:
            logger.error(f"Error processing iframe: {e}")
            return None

    async def _wait_for_page_ready(self) -> None:
        """Wait until the current page is loaded far enough to be scanned.

        Raises:
            PlaywrightTimeoutError: If ``domcontentloaded`` or the configured
                ``ready_selector`` is not reached in time. A timeout on the
                ``body`` visibility check is only logged.
        """
        try:
            await self.current_page.wait_for_load_state("domcontentloaded", timeout=self._LOAD_TIMEOUT_MS)
        except PlaywrightTimeoutError:
            logger.error(f"Timeout waiting for website load state (domcontentloaded): {self.current_url}")
            raise
        logger.debug("wait_for_load_state('domcontentloaded') completed.")

        if self.ready_selector:
            # Page-specific readiness marker; failing to find it is fatal.
            try:
                await self.current_page.wait_for_selector(
                    self.ready_selector,
                    timeout=self._LOAD_TIMEOUT_MS,
                    state="visible",
                )
            except PlaywrightTimeoutError:
                logger.error(
                    f"Timeout waiting for ready selector {self.ready_selector!r} on URL: {self.current_url}"
                )
                raise
            logger.debug(f"Successfully waited for ready selector {self.ready_selector!r}.")

        try:
            await self.current_page.wait_for_selector("body", timeout=self._BODY_TIMEOUT_MS, state="visible")
            logger.debug("wait_for_selector('body', state='visible') completed.")
        except PlaywrightTimeoutError:
            # Deliberately best-effort: the earlier checks already passed, so
            # log and carry on instead of failing the whole update.
            logger.warning("Playwright timeout (30s) waiting for a visible body; continuing anyway.")

    # re-run for cases of mid-run redirects
    # NOTE: retries stop 5 seconds after the first attempt started, so a wait
    # that runs into one of the long timeouts above is not retried.
    @retry(
        wait=wait_exponential(multiplier=1, min=1, max=10),
        stop=stop_after_delay(5),
        reraise=True,
        before_sleep=before_sleep_log(logger, logging.ERROR),
    )
    async def update_poi(self) -> None:
        """Refresh ``poi_elements``, ``poi_centroids``, ``bounding_boxes`` and ``pois``.

        Waits for the page to be ready, runs the POI script on the main page
        and on up to ``_MAX_IFRAMES`` iframes, shifts iframe coordinates by the
        iframe's position on the page and rebuilds the POI lists.

        Raises:
            PlaywrightTimeoutError: If the page does not become ready in time.
            Exception: Any other error from the readiness check or from the
                main-page script. Errors while handling iframes are logged
                and do not abort the update.
        """
        try:
            await self._wait_for_page_ready()
        except PlaywrightTimeoutError:
            raise  # already logged with a specific message
        except Exception as e:
            logger.error(f"An unexpected error occurred during page readiness check: {e}")
            raise

        # Run the bounding box javascript code to highlight the points of interest on the page
        page_info = await self.current_page.evaluate(self._FIND_POIS_SCRIPT)

        # Get the points of interest on the page
        self.poi_elements = page_info["element_descriptions"]
        element_centroids = page_info["element_centroids"]

        try:
            iframes = await self.current_page.query_selector_all("iframe")
            tasks = [asyncio.create_task(self.process_iframe(iframe)) for iframe in iframes[: self._MAX_IFRAMES]]
            results = await asyncio.gather(*tasks)

            for result in results:
                if result is None:
                    continue
                iframe_poi, offset = result
                descriptions = iframe_poi["element_descriptions"]
                centroids = iframe_poi["element_centroids"]
                # Translate frame-relative coordinates into page coordinates.
                for centroid in centroids:
                    for key in ("x", "left", "right"):
                        centroid[key] += offset["x"]
                    for key in ("y", "top", "bottom"):
                        centroid[key] += offset["y"]
                # Extend both lists only after all centroids were shifted, so a
                # malformed iframe result cannot leave them misaligned.
                self.poi_elements.extend(descriptions)
                element_centroids.extend(centroids)
        except Exception as e:
            logger.error(f"Error in finding iframes: {e}")

        # Get the centroids of the points of interest
        self.poi_centroids = [Point(x=xy["x"], y=xy["y"]) for xy in element_centroids]
        self.bounding_boxes = [BoundingBox(**xy, label=str(i)) for i, xy in enumerate(element_centroids)]
        self.pois = [
            POI(info=info, element_centroid=centroid, bounding_box=bbox)
            for info, centroid, bbox in zip(
                self.poi_elements,
                self.poi_centroids,
                self.bounding_boxes,
                strict=False,
            )
        ]

    @property
    def poi_text(self) -> str:
        """All current points of interest as text, one ``element_as_text`` line per element."""
        lines = (element_as_text(mark_id=i, **element) for i, element in enumerate(self.poi_elements))
        return "\n".join(line for line in lines if line)

    async def _capture(self, quality: int, type: str, scale: str) -> bytes:
        """Take a screenshot of the current page with the given options."""
        options = {"type": type, "scale": scale}
        # Playwright rejects ``quality`` for PNG screenshots, so only pass it for JPEG.
        if type == "jpeg":
            options["quality"] = quality
        return await self.current_page.screenshot(**options)

    async def screenshot(
        self,
        delay: float = 0.0,
        quality: int = 70,
        type: str = "jpeg",
        scale: str = "css",
    ) -> tuple[bytes, bytes]:
        """Take a screenshot and an annotated copy with the POI bounding boxes.

        Args:
            delay: Seconds to sleep before capturing.
            quality: JPEG quality; ignored for other image types.
            type: Image type passed to Playwright ("jpeg" or "png").
            scale: Screenshot scale passed to Playwright.

        Returns:
            ``(image, annotated_image)``. If the POI positions changed between
            the scan before and the scan after the capture, the screenshot is
            taken once more and annotated with freshly scanned boxes.
        """
        if delay > 0.0:
            await asyncio.sleep(delay)

        await self.update_poi()
        positions_before = [tuple(point) for point in self.poi_centroids]
        img = await self._capture(quality=quality, type=type, scale=scale)
        annotated_img = annotate_bounding_boxes(image=img, bounding_boxes=self.bounding_boxes)

        # check page has not changed since the screenshot was taken
        await self.update_poi()
        positions_after = [tuple(point) for point in self.poi_centroids]
        if positions_after != positions_before:
            # if it has changed, take another
            img = await self._capture(quality=quality, type=type, scale=scale)
            await self.update_poi()
            annotated_img = annotate_bounding_boxes(image=img, bounding_boxes=self.bounding_boxes)
        return img, annotated_img

    async def goto(self, url: str) -> None:
        """Navigate the current page to *url*, waiting for ``domcontentloaded``."""
        await self.current_page.goto(url, wait_until="domcontentloaded")

    async def reload(self) -> None:
        """Reload the current page, waiting for ``domcontentloaded``."""
        await self.current_page.reload(wait_until="domcontentloaded")

    async def click_tab(self, mark_id: int) -> None:
        """Middle-click the POI with index *mark_id* after hovering over it."""
        point: Point = self.poi_centroids[mark_id]
        await self.hover(point)
        await self.current_page.mouse.click(*point, button="middle")

    async def click(self, mark_id: int) -> None:
        """Click the POI with index *mark_id* after hovering over it."""
        point: Point = self.poi_centroids[mark_id]
        await self.hover(point)
        await self.current_page.mouse.click(*point)

    async def enter_text(self, mark_id: int, text: str, submit: bool = False) -> None:
        """Replace the content of POI *mark_id* with *text*; press Enter if *submit*."""
        await self.clear_text_field(mark_id)
        await self.click(mark_id)
        await self.current_page.keyboard.type(text)
        if submit:
            await self.current_page.keyboard.press("Enter")

    async def scroll(
        self,
        direction: Literal["up", "down", "left", "right"],
        mark_id: Optional[int] = None,
    ) -> None:
        """Scroll by 80% of the viewport, or of one POI's bounding box.

        Args:
            direction: Direction to scroll in.
            mark_id: When given, the mouse is moved to that POI's centroid and
                the scroll distance is derived from its bounding box.
                Otherwise the mouse is moved to (-1, -1) and the viewport size
                is used.
        """
        if mark_id is None:
            point = Point(x=-1, y=-1)
            max_scroll_x = self.viewport_width
            max_scroll_y = self.viewport_height
        else:
            point = self.poi_centroids[mark_id]
            bbox: BoundingBox = self.bounding_boxes[mark_id]
            max_scroll_x = bbox.right - bbox.left
            max_scroll_y = bbox.bottom - bbox.top

        await self.hover(point=point)

        sign = -1 if direction in ("up", "left") else 1
        if direction in ("up", "down"):
            delta_x, delta_y = 0, sign * int(max_scroll_y * 0.8)
        else:
            delta_x, delta_y = sign * int(max_scroll_x * 0.8), 0
        await self.current_page.mouse.wheel(delta_x, delta_y)

    async def go_back(self) -> None:
        """Go back one history entry, closing the tab if it has no history.

        Raises:
            Exception: If going back lands on ``about:blank`` and this is the
                only open page; the page is moved forward again first.
        """
        # If there is no tab open then return
        if not self.current_page:
            return

        await self.current_page.go_back(wait_until="domcontentloaded")
        if self.current_page.url == "about:blank":
            if len(self.context.pages) <= 1:
                await self.current_page.go_forward(wait_until="domcontentloaded")
                raise Exception("There is no previous page to go back to.")
            await self.current_page.close()

    async def hover(self, point: Point) -> None:
        """Move the mouse to *point*."""
        await self.current_page.mouse.move(*point)

    async def focus(self, point: Point) -> None:
        """Focus the element located at *point* (x, y), if it can take focus."""
        await self.current_page.evaluate(
            """
            ([x, y]) => {
                const element = document.elementFromPoint(x, y);
                if (element && element.focus) {
                    element.focus();
                }
            }""",
            # A list is the argument form that maps onto the JS array the
            # script destructures.
            list(point),
        )

    async def get_text(self, mark_id: int) -> str:
        """Return the value or text content of POI *mark_id*, or "" if it has none."""
        return await self.current_page.evaluate(
            """
            (mark_id) => {
                const element = marked_elements_convergence[mark_id];
                if (element && (element.value !== undefined || element.textContent !== undefined)) {
                    return element.value || element.textContent || '';
                }
                return '';
            }
            """,
            # Pass the index itself rather than a 1-tuple, so the script
            # indexes with a number instead of a coerced array.
            mark_id,
        )

    async def clear_text_field(self, mark_id: int) -> None:
        """Delete the existing text of POI *mark_id*, if it has any."""
        existing_text = await self.get_text(mark_id)
        if not (existing_text and existing_text.strip()):
            return

        # Clear existing text only if it exists
        await self.click(mark_id)
        if platform.system() == "Darwin":  # selecting all text is OS-specific
            await self.click(mark_id)
            await self.current_page.keyboard.press("Meta+a")
            await self.current_page.keyboard.press("Backspace")
        else:
            await self.current_page.keyboard.press("Control+Home")
            await self.current_page.keyboard.press("Control+Shift+End")
            await self.current_page.keyboard.press("Backspace")
if __name__ == "__main__":

    async def dummy_test():
        """Manual smoke test: open a page in a visible browser and save two images.

        Writes a plain screenshot to ``example.png`` and the annotated
        screenshot produced by the session to ``output.png``.
        """
        async with BrowserSession(headless=False) as session:
            tab = await session.context.new_page()
            await tab.goto("http://google.co.uk")
            # Give the page a few seconds to settle before capturing it.
            await asyncio.sleep(5)
            await tab.screenshot(path="example.png")

            await session.update_poi()
            _plain_image, annotated_image = await session.screenshot()

            with open("output.png", "wb") as output_file:
                output_file.write(annotated_image)

    asyncio.run(dummy_test())