XanderJC's picture
setup
f0f6e5c
raw
history blame
15 kB
import asyncio
import logging
import re
from contextlib import AsyncExitStack
from pathlib import Path
from typing import Literal, Optional, Self
from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from playwright_stealth import stealth_async
from pydantic import Field
from tenacity import before_sleep_log, retry, stop_after_delay, wait_exponential
from proxy_lite.browser.bounding_boxes import POI, BoundingBox, Point, annotate_bounding_boxes
from proxy_lite.logger import logger
SELF_CONTAINED_TAGS = [
# many of these are non-interactive but keeping them anyway
"area",
"base",
"br",
"col",
"embed",
"hr",
"img",
"input",
"link",
"meta",
"param",
"source",
"track",
"wbr",
]
def element_as_text(
mark_id: int,
tag: Optional[str] = None,
text: Optional[str] = None,
**raw_attributes,
) -> str:
"""Return a text representation of all elements on the page."""
attributes = []
for k, v in raw_attributes.items():
if v is None:
continue
if isinstance(v, bool):
if v:
attributes.append(k)
# we ignore False bool attributes
else:
v = str(v)
if len(v) > 2500:
v = v[: 2500 - 1] + "…"
attributes.append(f'{k}="{v}"')
attributes = " ".join(attributes)
attributes = (" " + attributes).rstrip()
tag = tag.lower()
if text is None:
text = ""
if len(text) > 2500:
text = text[: 2500 - 1] + "…"
# sub-out line breaks so elements are easier to distinguish
attributes = re.sub(r"\r\n|\r|\n", "⏎", attributes)
text = re.sub(r"\r\n|\r|\n", "⏎", text)
if tag in SELF_CONTAINED_TAGS:
if text:
logger.warning(
f"Got self-contained element '{tag}' which contained text '{text}'.",
)
else:
return f"- [{mark_id}] <{tag}{attributes}/>"
return f"- [{mark_id}] <{tag}{attributes}>{text}</{tag}>"
class BrowserSession:
def __init__(
self,
viewport_width: int = 1280,
viewport_height: int = 720,
headless: bool = True,
):
self.viewport_width = viewport_width
self.viewport_height = viewport_height
self.headless = headless
self.playwright: Playwright | None = None
self.browser: Browser | None = None
self.context: BrowserContext | None = None
self._exit_stack: AsyncExitStack | None = None
self.poi_elements: list = Field(default_factory=list)
self.poi_centroids: list[Point] = Field(default_factory=list)
self.bounding_boxes: list[BoundingBox] = Field(default_factory=list)
self.pois: list[POI] = Field(default_factory=list)
async def __aenter__(self) -> Self:
self._exit_stack = AsyncExitStack()
self.playwright = await async_playwright().start()
self.browser = await self.playwright.chromium.launch(headless=self.headless)
self.context = await self.browser.new_context(
viewport={"width": self.viewport_width, "height": self.viewport_height},
)
await self.context.new_page()
self.context.set_default_timeout(60_000)
self.current_page.set_default_timeout(60_000)
await stealth_async(self.current_page)
await self.context.add_init_script(
path=Path(__file__).with_name("add_custom_select.js"),
)
await self.context.add_init_script(
path=Path(__file__).with_name("find_pois.js"),
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
if self.browser:
await self.browser.close()
if self.playwright:
await self.playwright.stop()
if self._exit_stack:
await self._exit_stack.aclose()
@property
def current_page(self) -> Optional[Page]:
if self.context.pages:
return self.context.pages[-1]
return None
@property
def current_url(self) -> Optional[str]:
if self.current_page:
return self.current_page.url
return None
# re-run for cases of mid-run redirects
@retry(
wait=wait_exponential(multiplier=1, min=1, max=10),
stop=stop_after_delay(5),
reraise=True,
before_sleep=before_sleep_log(logger, logging.ERROR),
)
async def process_iframe(self, iframe) -> Optional[tuple[dict, dict]]:
try:
# Check iframe visibility and size
bounding_box = await iframe.bounding_box()
if not bounding_box:
return None # Skip if iframe is not visible
width, height = bounding_box["width"], bounding_box["height"]
if width < 50 or height < 50:
return None
frame = await iframe.content_frame()
if not frame:
return None
poi = await frame.evaluate(
"""() => {
overwriteDefaultSelectConvergence();
return findPOIsConvergence();
}""",
)
if not poi:
return None
iframe_offset = {"x": round(bounding_box["x"]), "y": round(bounding_box["y"])}
return poi, iframe_offset
except Exception as e:
logger.error(f"Error processing iframe: {e}")
return None
# re-run for cases of mid-run redirects
@retry(
wait=wait_exponential(multiplier=1, min=1, max=10),
stop=stop_after_delay(5),
reraise=True,
before_sleep=before_sleep_log(logger, logging.ERROR),
)
async def update_poi(self) -> None:
try:
await self.current_page.wait_for_load_state(timeout=60000)
except PlaywrightTimeoutError:
logger.error(f"Timeout waiting for website load state: {self.current_url}")
await self.current_page.wait_for_selector("body", timeout=60000, state="visible")
# Run the bounding box javascript code to highlight the points of interest on the page
page_info = await self.current_page.evaluate(
"""() => {
overwriteDefaultSelectConvergence();
return findPOIsConvergence();
}""",
)
# Get the points of interest on the page
self.poi_elements = page_info["element_descriptions"]
element_centroids = page_info["element_centroids"]
try:
# Select all iframes on the page
iframes = await self.current_page.query_selector_all("iframe")
max_iframes = 10
# Define an asynchronous function to process and filter each iframe
tasks = [asyncio.create_task(self.process_iframe(iframe)) for iframe in iframes[:max_iframes]]
results = await asyncio.gather(*tasks)
filtered_results = [result for result in results if result is not None]
iframes_pois = []
iframe_offsets = []
for poi, offset in filtered_results:
iframes_pois.append(poi)
iframe_offsets.append(offset)
# Combine the points of interest from the iframes with the main page and adjust the centroids
for index, iframe_poi in enumerate(iframes_pois):
self.poi_elements.extend(iframe_poi["element_descriptions"])
for centroid in iframe_poi["element_centroids"]:
centroid["x"] += iframe_offsets[index]["x"]
centroid["y"] += iframe_offsets[index]["y"]
centroid["left"] += iframe_offsets[index]["x"]
centroid["top"] += iframe_offsets[index]["y"]
centroid["right"] += iframe_offsets[index]["x"]
centroid["bottom"] += iframe_offsets[index]["y"]
element_centroids.extend(iframe_poi["element_centroids"])
except Exception as e:
logger.error(f"Error in finding iframes: {e}")
# Get the centroids of the points of interest
self.poi_centroids = [Point(x=xy["x"], y=xy["y"]) for xy in element_centroids]
self.bounding_boxes = [BoundingBox(**xy, label=str(i)) for i, xy in enumerate(element_centroids)]
self.pois = [
POI(info=info, element_centroid=centroid, bounding_box=bbox)
for info, centroid, bbox in zip(
self.poi_elements,
self.poi_centroids,
self.bounding_boxes,
strict=False,
)
]
@property
def poi_text(self) -> str:
# Get all points of interest on the page as text
texts = [element_as_text(mark_id=i, **element) for i, element in enumerate(self.poi_elements)]
# Return formatted text of points of interest on page
return "\n".join([txt for txt in texts if txt])
async def screenshot(
self,
delay: float = 0.0,
quality: int = 70,
type: str = "jpeg",
scale: str = "css",
) -> tuple[bytes, bytes]:
if delay > 0.0:
await asyncio.sleep(delay)
await self.update_poi()
old_poi_positions = [tuple(point) for point in self.poi_centroids]
img = await self.current_page.screenshot(type=type, quality=quality, scale=scale)
annotated_img = annotate_bounding_boxes(image=img, bounding_boxes=self.bounding_boxes)
# check page has not changed since the screenshot was taken
await self.update_poi()
new_poi_positions = [tuple(point) for point in self.poi_centroids]
if new_poi_positions != old_poi_positions:
# if it has changed, take another
img = await self.current_page.screenshot(type=type, quality=quality, scale=scale)
await self.update_poi()
annotated_img = annotate_bounding_boxes(image=img, bounding_boxes=self.bounding_boxes)
return img, annotated_img
async def goto(self, url: str) -> None:
await self.current_page.goto(url, wait_until="domcontentloaded")
async def reload(self) -> None:
await self.current_page.reload(wait_until="domcontentloaded")
async def click_tab(self, mark_id: int) -> None:
point: Point = self.poi_centroids[mark_id]
await self.hover(point)
await self.current_page.mouse.click(*point, button="middle")
async def click(self, mark_id: int) -> None:
point: Point = self.poi_centroids[mark_id]
await self.hover(point)
await self.current_page.mouse.click(*point)
async def enter_text(self, mark_id: int, text: str, submit: bool = False) -> None:
await self.clear_text_field(mark_id)
await self.click(mark_id)
await self.current_page.keyboard.type(text)
if submit:
await self.current_page.keyboard.press("Enter")
async def scroll(
self,
direction: Literal["up", "down", "left", "right"],
mark_id: Optional[int] = None,
) -> None:
if mark_id is None:
point = Point(x=-1, y=-1)
max_scroll_x = self.viewport_width
max_scroll_y = self.viewport_height
else:
point: Point = self.poi_centroids[mark_id]
bbox: BoundingBox = self.bounding_boxes[mark_id]
max_scroll_x = bbox.right - bbox.left
max_scroll_y = bbox.bottom - bbox.top
await self.hover(point=point)
scroll_x = int(max_scroll_x * 0.8)
scroll_y = int(max_scroll_y * 0.8)
is_vertical = direction in ("up", "down")
reverse_scroll = direction in ("up", "left")
await self.current_page.mouse.wheel(
scroll_x * (-1 if reverse_scroll else 1) * (not is_vertical),
scroll_y * (-1 if reverse_scroll else 1) * is_vertical,
)
async def go_back(self) -> None:
# If there is no tab open then return
if not self.current_page:
return
await self.current_page.go_back(wait_until="domcontentloaded")
if self.current_page.url == "about:blank":
if not len(self.context.pages) > 1:
await self.current_page.go_forward(wait_until="domcontentloaded")
raise Exception("There is no previous page to go back to.")
await self.current_page.close()
async def hover(self, point: Point) -> None:
await self.current_page.mouse.move(*point)
async def focus(self, point: Point) -> None:
# Focus on the element on the page at point (x, y)
await self.current_page.evaluate(
"""
([x, y]) => {
const element = document.elementFromPoint(x, y);
if (element && element.focus) {
element.focus();
}
}""",
tuple(point),
)
async def get_text(self, mark_id: int) -> str:
return await self.current_page.evaluate(
"""
(mark_id) => {
const element = marked_elements_convergence[mark_id];
if (element && (element.value !== undefined || element.textContent !== undefined)) {
return element.value || element.textContent;
}
return '';
}
""",
(mark_id,),
)
async def clear_text_field(self, mark_id: int) -> None:
existing_text = await self.get_text(mark_id)
if existing_text.strip():
# Clear existing text only if it exists
await self.click(mark_id)
await self.current_page.keyboard.press("Control+Home")
await self.current_page.keyboard.press("Control+Shift+End")
await self.current_page.keyboard.press("Backspace")
if __name__ == "__main__":
import json
test = """{"name": "return_value", "arguments": {'value': 'The most downloaded French speech recognition model on Hugging Face is DeepSeek-R1. Here are its evaluation metrics:\n\n- Claude-3.5-1022: MMLU 88.3, MMLU-Redux 88.9\n- GPT-4.0-5013: MMLU 87.2, MMLU-Redux 88.0\n- DeepSeek-01-3013: MMLU 88.5, MMLU-Redux 89.1\n- OpenAI-01-mini: MMLU 91.0, MMLU-Redux 88.7\n\nPlease see the attached screenshot for more details.'}}"""
test = json.loads(test)
print(test)
exit()
async def dummy_test():
async with BrowserSession(headless=False) as s:
page = await s.context.new_page()
await page.goto("http://google.co.uk")
await asyncio.sleep(5)
await page.screenshot(path="example.png")
await s.update_poi()
_, annotated_image = await s.screenshot()
with open("output.png", "wb") as f:
f.write(annotated_image)
asyncio.run(dummy_test())