|
import asyncio |
|
from contextlib import AsyncExitStack |
|
from typing import List, Literal, Optional, Any |
|
|
|
from pydantic import BaseModel, Field |
|
|
|
from proxy_lite.browser.browser import BrowserSession |
|
from proxy_lite.logger import logger |
|
|
|
from .tool_base import Tool, ToolExecutionResponse, attach_param_schema |
|
|
|
# Tags that `element_as_text` renders in self-closing form (`<tag id=N .../>`)
# whenever the element carries no text.
SELF_CONTAINED_TAGS = [
    "area", "base", "br", "col", "embed", "hr", "img",
    "input", "link", "meta", "param", "source", "track", "wbr",
]
|
|
|
|
|
def element_as_text(
    mark_id: int,
    tag: Optional[str] = None,
    text: Optional[str] = None,
    **raw_attributes,
) -> str:
    """Render a single page element as a compact HTML-like string.

    Args:
        mark_id: Mark ID of the element; emitted as the ``id`` of the tag.
        tag: Tag name of the element. It is lower-cased before rendering.
        text: Inner text of the element; ``None`` is treated as empty.
        **raw_attributes: Further attributes of the element. ``None`` and
            ``False`` values are dropped, ``True`` renders as a bare attribute
            name and every other value as ``name="value"``.

    Returns:
        ``<tag id=N attrs/>`` for a self-contained tag without text,
        ``<tag id=N attrs>text</tag>`` otherwise, or ``""`` when ``tag`` is
        ``None``. Attribute values and text longer than 2500 characters are
        cut to 2500 characters, the last one being an ellipsis.
    """
    if tag is None:
        # Nothing sensible can be rendered without a tag name, and calling
        # `.lower()` on None would raise. `BrowserTool.poi_text` drops empty
        # strings, so an empty result simply omits the element.
        return ""

    max_length = 2500  # longest attribute value / text kept verbatim

    def _clip(value: str) -> str:
        # Keep the result at `max_length` characters including the ellipsis.
        if len(value) > max_length:
            return value[: max_length - 1] + "…"
        return value

    rendered = []
    for name, value in raw_attributes.items():
        if value is None:
            continue
        if isinstance(value, bool):
            # Boolean attributes: True -> bare name, False -> omitted.
            if value:
                rendered.append(name)
            continue
        rendered.append(f'{name}="{_clip(str(value))}"')

    # Leading space separates the attributes from `id=...`; with no attributes
    # the lone space is stripped again.
    attributes = (" " + " ".join(rendered)).rstrip()

    tag = tag.lower()
    text = _clip(text if text is not None else "")

    if tag in SELF_CONTAINED_TAGS:
        if not text:
            return f"<{tag} id={mark_id}{attributes}/>"
        # Unexpected, but keep the text: fall through to the paired form.
        logger.warning(
            f"Got self-contained element '{tag}' which contained text '{text}'.",
        )
    return f"<{tag} id={mark_id}{attributes}>{text}</{tag}>"
|
|
|
|
|
class GotoParams(BaseModel):
    # Arguments of `BrowserTool.goto`.
    url: str = Field(
        ...,
        description="The web address to visit. Must be a valid URL.",
    )
|
|
|
|
|
class GoogleSearchParams(BaseModel):
    # Arguments of `BrowserTool.google_search`.

    # Free-text planning field; `google_search` accepts it but only `query`
    # ends up in the search URL.
    query_plan: str = Field(
        ..., description="Plan out the query you will make. Re-write queries in a way that will yield the best results."
    )
    query: str = Field(
        ...,
        description="The Google search to perform.",
    )
|
|
|
|
|
class ClickParams(BaseModel):
    # Arguments of `BrowserTool.click`.
    mark_id: int = Field(
        ...,
        description="Element Mark ID.",
    )
|
|
|
|
|
class TypeEntry(BaseModel):
    # One (element, text) pair; `TypeParams.entries` is a list of these.
    mark_id: int = Field(
        ...,
        description="Element Mark ID.",
    )
    content: str = Field(
        ...,
        description="The text to type into the element.",
    )
|
|
|
|
|
class TypeParams(BaseModel):
    # Arguments of `BrowserTool.type`.
    entries: List[TypeEntry] = Field(..., description="A list of elements and contents to type.")
    # Only applied to the final entry: `BrowserTool.type` passes
    # `submit and last_entry` to the browser.
    submit: bool = Field(..., description='Whether to press the "Enter" key after typing in the last entry.')
|
|
|
|
|
class ScrollParams(BaseModel):
    # Arguments of `BrowserTool.scroll`.
    direction: Literal["up", "down", "left", "right"] = Field(
        ..., description='Direction to scroll. Must be one of "up", "down", "left" or "right".'
    )
    # -1 is a sentinel: `BrowserTool.scroll` translates it to `mark_id=None`
    # before calling the browser.
    mark_id: int = Field(
        ...,
        description="What to scroll. Use -1 to scroll the whole page otherwise give the mark ID of an element that is `scrollable`.",  # noqa: E501
    )
|
|
|
|
|
class BackParams(BaseModel):
    # Schema for `BrowserTool.back`, which takes no arguments — hence no fields.
    pass
|
|
|
|
|
class WaitParams(BaseModel):
    # Schema for `BrowserTool.wait`, which takes no arguments — hence no fields.
    pass
|
|
|
|
|
class ReloadParams(BaseModel):
    # Schema for `BrowserTool.reload`, which takes no arguments — hence no fields.
    pass
|
|
|
|
|
class DoNothingParams(BaseModel):
    # Schema for `BrowserTool.do_nothing_tool`, which takes no arguments —
    # hence no fields.
    pass
|
|
|
|
|
class OpenNewTabAndGoToParams(BaseModel):
    # Arguments of `BrowserTool.open_new_tab_and_go_to`.
    url: str = Field(
        ...,
        description="The URL to navigate to in the new tab.",
    )
|
|
|
|
|
class SelectOptionByTextParams(BaseModel):
    # Arguments of `BrowserTool.select_option_by_text`.
    mark_id: int = Field(
        ...,
        description="The mark ID of the select element.",
    )
    option_text: str = Field(
        ...,
        description="The text content of the option to select.",
    )
|
|
|
|
|
class BrowserTool(Tool):
    # Browser actions offered to the agent, backed by a `BrowserSession`.
    #
    # Every method decorated with `attach_param_schema` is one action and
    # returns a `ToolExecutionResponse` whose `content` describes the outcome.
    #
    # NOTE(review): the docstrings of the decorated methods are kept verbatim
    # because they are presumably surfaced to the model as tool descriptions —
    # confirm against `tool_base`. Maintainer notes therefore use `#` comments.

    def __init__(self, session: BrowserSession) -> None:
        """Keep a reference to ``session``; it is entered in ``__aenter__``."""
        super().__init__()
        self.browser = session

    async def __aenter__(self):
        """Enter the browser session and return this tool."""
        self._exit_stack = AsyncExitStack()
        await self._exit_stack.enter_async_context(self.browser)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit whatever ``__aenter__`` entered. Exceptions are not suppressed."""
        await self._exit_stack.aclose()

    @property
    def poi_text(self) -> str:
        """Text rendering of the browser's current POI elements, one per line.

        The position of an element in ``self.browser.poi_elements`` is used as
        its mark ID. Elements that render to an empty string are left out.
        """
        rendered = (
            element_as_text(mark_id=index, **element)
            for index, element in enumerate(self.browser.poi_elements)
        )
        return "\n".join(line for line in rendered if line)

    @attach_param_schema(GotoParams)
    async def goto(self, url: str) -> ToolExecutionResponse:
        """Go directly to a specific web url. Specify the exact URL."""
        # Unlike most other actions, errors from the browser propagate to the caller.
        await self.browser.goto(url)
        return ToolExecutionResponse(content=f"Successfully navigated to URL: {url}")

    @attach_param_schema(GoogleSearchParams)
    async def google_search(self, query_plan: str, query: str) -> ToolExecutionResponse:
        """Perform a generic web search using Google.
        Results may not be relevant. If you see poor results, you can try another query.
        """
        # Imported here so the method does not depend on a module-level import.
        from urllib.parse import quote_plus

        # `query_plan` is accepted for the schema but not used for the request.
        # The query is percent-encoded so that characters such as `&`, `#`,
        # `+` or `%` are searched for literally instead of being read as URL
        # syntax (which would truncate or alter the search).
        url = f"https://www.google.com/search?q={quote_plus(query)}"
        await self.browser.goto(url)
        return ToolExecutionResponse(content=f"Performed Google search for: {query}")

    @attach_param_schema(ClickParams)
    async def click(self, mark_id: int) -> ToolExecutionResponse:
        """Click on an element of the page."""
        # Failures are reported back as text rather than raised.
        try:
            await self.browser.click(mark_id=mark_id)
            return ToolExecutionResponse(content=f"Clicked element with mark ID: {mark_id}")
        except IndexError as e:
            # Presumably raised when `mark_id` is outside the current POI list.
            logger.error(f"Click failed: Mark ID {mark_id} not found or POI list empty. Error: {e}")
            return ToolExecutionResponse(
                content=f"Failed to click element with mark ID {mark_id}. Element not found or POI list invalid."
            )
        except Exception as e:
            logger.error(f"Click failed with unexpected error for mark ID {mark_id}: {e}")
            return ToolExecutionResponse(
                content=f"An unexpected error occurred while trying to click element {mark_id}: {e}"
            )

    @attach_param_schema(TypeParams)
    async def type(self, entries: List[dict], submit: bool) -> ToolExecutionResponse:
        """Type text.
        You can type into one or more elements.
        Note that the text inside an element is cleared before typing.
        """
        typed_ids = []
        skipped = 0  # entries abandoned because the page changed under us
        for i, entry_dict in enumerate(entries):
            # Label for error messages. It is taken from the raw input first so
            # that the handlers below never touch an unbound (or stale) `entry`
            # when validation of this very entry is what failed.
            mark_id = entry_dict.get("mark_id") if isinstance(entry_dict, dict) else None
            try:
                entry = TypeEntry(**entry_dict)
                mark_id = entry.mark_id
                last_entry = i == len(entries) - 1
                old_poi_positions = [tuple(point) for point in self.browser.poi_centroids]
                await self.browser.enter_text(
                    mark_id=entry.mark_id,
                    text=entry.content,
                    # "Enter" is only pressed after the final entry.
                    submit=submit and last_entry,
                )
                typed_ids.append(entry.mark_id)
                await self.browser.update_poi()
                new_poi_positions = [tuple(point) for point in self.browser.poi_centroids]
                if not last_entry and old_poi_positions != new_poi_positions:
                    # The remaining mark IDs may now point at other elements.
                    logger.error(
                        "POI positions changed mid-typing, cancelling future type entries.",
                    )
                    skipped = len(entries) - (i + 1)
                    break
            except IndexError as e:
                logger.error(f"Type failed: Mark ID {mark_id} not found or POI list empty. Error: {e}")
                return ToolExecutionResponse(
                    content=(
                        f"Failed to type into element with mark ID {mark_id}. "
                        f"Element not found or POI list invalid. Typed into: {typed_ids if typed_ids else 'none'}."
                    )
                )
            except Exception as e:
                logger.error(f"Type failed with unexpected error for mark ID {mark_id}: {e}")
                return ToolExecutionResponse(
                    content=(
                        f"An unexpected error occurred while trying to type into element {mark_id}: {e}. "
                        f"Typed into: {typed_ids if typed_ids else 'none'}."
                    )
                )

        content = f"Typed text into elements with mark IDs: {typed_ids}"
        if skipped:
            # Make the partial result visible instead of implying full success.
            content += (
                f". Skipped the remaining {skipped} entries because the page elements moved while typing;"
                " re-check the page before typing them."
            )
        return ToolExecutionResponse(content=content)

    @attach_param_schema(ScrollParams)
    async def scroll(self, direction: str, mark_id: int) -> ToolExecutionResponse:
        """Scroll the page (or a scrollable element) up, down, left or right."""
        try:
            # -1 is the schema's sentinel for "the whole page"; the browser
            # receives `None` in that case.
            mark_id_for_browser = None if mark_id == -1 else mark_id
            await self.browser.scroll(direction=direction, mark_id=mark_id_for_browser)
            return ToolExecutionResponse(
                content=f"Scrolled {direction} on element with mark ID: {mark_id if mark_id != -1 else 'page'}"
            )
        except IndexError as e:
            logger.error(f"Scroll failed: Mark ID {mark_id} not found or POI list empty. Error: {e}")
            return ToolExecutionResponse(
                content=f"Failed to scroll element with mark ID {mark_id}. Element not found or POI list invalid."
            )
        except Exception as e:
            logger.error(f"Scroll failed with unexpected error for mark ID {mark_id}: {e}")
            return ToolExecutionResponse(
                content=f"An unexpected error occurred while trying to scroll element {mark_id}: {e}"
            )

    @attach_param_schema(BackParams)
    async def back(self) -> ToolExecutionResponse:
        """Go back to the previous page."""
        try:
            await self.browser.go_back()
            return ToolExecutionResponse(content="Went back to the previous page.")
        except Exception as e:
            logger.error(f"Go back failed: {e}")
            return ToolExecutionResponse(content=f"Failed to go back: {e}")

    @attach_param_schema(WaitParams)
    async def wait(self) -> ToolExecutionResponse:
        """Wait three seconds. Useful when the page appears to still be loading, or if there are any unfinished webpage processes."""
        await asyncio.sleep(3)
        return ToolExecutionResponse(content="Waited for a few seconds.")

    @attach_param_schema(ReloadParams)
    async def reload(self) -> ToolExecutionResponse:
        """Reload the current page. Useful when the page seems unresponsive, broken, outdated, or if you want to reset the page to its initial state."""
        try:
            await self.browser.reload()
            return ToolExecutionResponse(content="Reloaded the current page.")
        except Exception as e:
            logger.error(f"Reload failed: {e}")
            return ToolExecutionResponse(content=f"Failed to reload the page: {e}")

    @attach_param_schema(DoNothingParams)
    async def do_nothing_tool(self) -> ToolExecutionResponse:
        """Do nothing. Use this if you have no need for the browser at this time."""
        return ToolExecutionResponse(content="Did nothing in the browser.")

    @attach_param_schema(OpenNewTabAndGoToParams)
    async def open_new_tab_and_go_to(self, url: str) -> ToolExecutionResponse:
        """
        Opens a new browser tab/page and navigates to the specified URL.
        Closes the old page if it's not the last one remaining.
        Use this to bypass loading issues by forcing a new navigation.
        """
        try:
            await self.browser.open_new_tab_and_go_to(url)
            return ToolExecutionResponse(
                content=f"Successfully opened new tab and navigated to: {url}",
            )
        except Exception as e:
            logger.error(f"Error opening new tab and navigating to {url}: {e}")
            return ToolExecutionResponse(content=f"Failed to open new tab and navigate to {url}: {e}")

    @attach_param_schema(SelectOptionByTextParams)
    async def select_option_by_text(self, mark_id: int, option_text: str) -> ToolExecutionResponse:
        """
        Selects an option from a select element (including dual select picklists) by finding the option with matching text.
        This is especially useful for Salesforce dual select picklists where you need to find and select a specific option.
        Uses Playwright's native iframe handling to bypass CORS restrictions.
        """
        try:
            logger.info(f"Attempting to select option '{option_text}' from element {mark_id}")

            # Click the marked element first, then give the page a moment to react.
            await self.browser.click(mark_id=mark_id)
            await asyncio.sleep(0.5)

            # NOTE(review): `mark_id` is only used for the click above. The
            # search below looks at every <select> in the main frame and in
            # `main_frame.child_frames` (presumably direct children only, so
            # deeper nested iframes would be missed — confirm).
            main_frame = self.browser.current_page.main_frame
            all_frames = [main_frame] + main_frame.child_frames

            logger.info(f"Searching for element {mark_id} across {len(all_frames)} frames")

            wanted = option_text.lower().strip()
            failed_clicks: List[str] = []  # matching options whose click raised
            for frame_idx, frame in enumerate(all_frames):
                try:
                    clicked = await self._click_option_in_frame(frame, frame_idx, wanted, failed_clicks)
                except Exception as frame_error:
                    logger.info(f"Could not access frame {frame_idx}: {frame_error}")
                    continue
                if clicked is not None:
                    return ToolExecutionResponse(
                        content=f"[ACTION COMPLETED] Successfully selected '{clicked}' from dual select picklist"
                    )

            # Nothing was clicked successfully: report what is on offer.
            all_options = await self._sample_option_texts(all_frames)
            available_options_str = ', '.join(all_options[:10]) if all_options else 'None found'
            if failed_clicks:
                # The option exists; saying "not found" would mislead the agent.
                return ToolExecutionResponse(
                    content=(
                        f"Found option '{option_text}' but clicking it failed, so it may or may not be selected. "
                        f"Available options (first 10): {available_options_str}"
                    )
                )
            return ToolExecutionResponse(
                content=f"Failed to find option '{option_text}' in any select element. Available options (first 10): {available_options_str}"
            )

        except Exception as e:
            logger.error(f"Error selecting option '{option_text}' from element {mark_id}: {e}")
            return ToolExecutionResponse(content=f"An unexpected error occurred while selecting option '{option_text}': {e}")

    async def _click_option_in_frame(
        self,
        frame,
        frame_idx: int,
        wanted: str,
        failed_clicks: List[str],
    ) -> Optional[str]:
        """Click the first option in ``frame`` whose text equals ``wanted``.

        ``wanted`` must already be lower-cased and stripped; option texts are
        normalised the same way before comparing. Returns the stripped text of
        the option that was clicked, or ``None`` if none was. The text of every
        matching option whose click raised is appended to ``failed_clicks``.
        Errors from querying the frame propagate to the caller.
        """
        select_elements = await frame.query_selector_all('select')
        logger.info(f"Frame {frame_idx}: Found {len(select_elements)} select elements")

        for select_elem in select_elements:
            options = await select_elem.query_selector_all('option')
            for opt_idx, option in enumerate(options):
                option_text_content = await option.text_content()
                option_value = await option.get_attribute('value')
                logger.info(
                    f"Frame {frame_idx}, Select {select_elem}, Option {opt_idx}: text='{option_text_content}', value='{option_value}'"
                )

                if not option_text_content or option_text_content.lower().strip() != wanted:
                    continue

                try:
                    await option.click(force=True, timeout=5000)
                except Exception as select_error:
                    logger.info(
                        f"Click timed out in frame {frame_idx}, but option may have been selected: {select_error}"
                    )
                    failed_clicks.append(option_text_content.strip())
                    continue  # keep looking for another matching option

                logger.info(f"Successfully clicked option '{option_text_content.strip()}' in frame {frame_idx}")
                return option_text_content.strip()
        return None

    async def _sample_option_texts(self, frames, per_select: int = 5) -> List[str]:
        """Collect the stripped text of the first ``per_select`` options of each ``<select>``.

        Best-effort data for an error message: a frame that raises while being
        queried is skipped, keeping whatever was collected before.
        """
        texts: List[str] = []
        for frame in frames:
            try:
                for select_elem in await frame.query_selector_all('select'):
                    options = await select_elem.query_selector_all('option')
                    for option in options[:per_select]:
                        text = await option.text_content()
                        if text:
                            texts.append(text.strip())
            except Exception:
                # Deliberately not a bare `except`: task cancellation and
                # KeyboardInterrupt must not be swallowed here.
                continue
        return texts
|
|
|
|