Spaces:
Running
Running
import asyncio
from contextlib import AsyncExitStack
from typing import List, Literal, Optional, Any
from urllib.parse import quote_plus

from pydantic import BaseModel, Field

from proxy_lite.browser.browser import BrowserSession
from proxy_lite.logger import logger

from .tool_base import Tool, ToolExecutionResponse, attach_param_schema
# HTML void elements: tags that take no closing tag and no inner text, so they
# are rendered in self-closing form. Several are non-interactive but are kept
# in the list anyway.
SELF_CONTAINED_TAGS = [
    "area", "base", "br", "col", "embed",
    "hr", "img", "input", "link", "meta",
    "param", "source", "track", "wbr",
]
def element_as_text(
    mark_id: int,
    tag: Optional[str] = None,
    text: Optional[str] = None,
    **raw_attributes,
) -> str:
    """Return an HTML-like text representation of a single page element.

    Args:
        mark_id: Mark ID of the element, rendered as its ``id``.
        tag: Tag name; lower-cased before rendering. When missing or empty
            the element cannot be rendered and ``""`` is returned.
        text: Inner text of the element, truncated to 2500 characters.
            ``None`` is treated as no text.
        **raw_attributes: Extra attributes to render. ``None`` values are
            skipped, ``True`` renders the bare attribute name, ``False`` is
            skipped, and any other value is stringified, truncated to 2500
            characters and rendered as ``name="value"``.

    Returns:
        ``<tag id=N attrs/>`` for a text-less tag listed in
        ``SELF_CONTAINED_TAGS``, ``<tag id=N attrs>text</tag>`` otherwise,
        or ``""`` when ``tag`` is missing.
    """
    # `tag` defaults to None, so guard before calling `.lower()` on it; an
    # empty result is a valid "nothing to show" value for callers that drop
    # empty strings.
    if not tag:
        return ""

    max_len = 2500

    def _clip(value: str) -> str:
        # Keep at most `max_len` characters, the last one being an ellipsis.
        if len(value) > max_len:
            return value[: max_len - 1] + "…"
        return value

    attributes = []
    for k, v in raw_attributes.items():
        if v is None:
            continue
        if isinstance(v, bool):
            # Boolean attributes: bare name when True, omitted when False.
            if v:
                attributes.append(k)
        else:
            attributes.append(f'{k}="{_clip(str(v))}"')
    # Leading space separates the attributes from the id; collapses to ""
    # when there are no attributes.
    attributes = (" " + " ".join(attributes)).rstrip()
    tag = tag.lower()
    text = _clip(text or "")
    if tag in SELF_CONTAINED_TAGS:
        if text:
            # Unexpected, but keep the text: fall through to the open/close form.
            logger.warning(
                f"Got self-contained element '{tag}' which contained text '{text}'.",
            )
        else:
            return f"<{tag} id={mark_id}{attributes}/>"
    return f"<{tag} id={mark_id}{attributes}>{text}</{tag}>"
# Arguments for a direct navigation; the single field matches the `url`
# keyword accepted by `BrowserTool.goto`.
class GotoParams(BaseModel):
    url: str = Field(..., description="The web address to visit. Must be a valid URL.")
# Arguments for a Google search; field names match the keywords accepted by
# `BrowserTool.google_search`. Only `query` ends up in the search URL —
# `query_plan` is accepted by that method but not otherwise used there.
class GoogleSearchParams(BaseModel):
    query_plan: str = Field(
        ...,
        description="Plan out the query you will make. Re-write queries in a way that will yield the best results.",
    )
    query: str = Field(..., description="The Google search to perform.")
# Arguments for a click; the field matches the `mark_id` keyword accepted by
# `BrowserTool.click`.
class ClickParams(BaseModel):
    mark_id: int = Field(..., description="Element Mark ID.")
# One (element, text) pair to type. `BrowserTool.type` builds a TypeEntry from
# each dict in its `entries` argument before typing.
class TypeEntry(BaseModel):
    mark_id: int = Field(..., description="Element Mark ID.")
    content: str = Field(..., description="The text to type into the element.")
# Arguments for typing; field names match the keywords accepted by
# `BrowserTool.type`, which only forwards `submit` for the last entry.
class TypeParams(BaseModel):
    entries: List[TypeEntry] = Field(
        ...,
        description="A list of elements and contents to type.",
    )
    submit: bool = Field(
        ...,
        description='Whether to press the "Enter" key after typing in the last entry.',
    )
# Arguments for scrolling; field names match the keywords accepted by
# `BrowserTool.scroll`, which translates a `mark_id` of -1 into a whole-page
# scroll.
class ScrollParams(BaseModel):
    direction: Literal["up", "down", "left", "right"] = Field(
        ...,
        description='Direction to scroll. Must be one of "up", "down", "left" or "right".',
    )
    mark_id: int = Field(
        ...,
        description="What to scroll. Use -1 to scroll the whole page otherwise give the mark ID of an element that is `scrollable`.",  # noqa: E501
    )
# Empty parameter model: `BrowserTool.back` takes no arguments.
class BackParams(BaseModel):
    pass
# Empty parameter model: `BrowserTool.wait` takes no arguments.
class WaitParams(BaseModel):
    pass
# Empty parameter model: `BrowserTool.reload` takes no arguments.
class ReloadParams(BaseModel):
    pass
# Empty parameter model: `BrowserTool.do_nothing_tool` takes no arguments.
class DoNothingParams(BaseModel):
    pass
# --- NEW: Parameters for open_new_tab_and_go_to tool ---
# The single field matches the `url` keyword accepted by
# `BrowserTool.open_new_tab_and_go_to`.
class OpenNewTabAndGoToParams(BaseModel):
    url: str = Field(..., description="The URL to navigate to in the new tab.")
# Browser actions built on top of a BrowserSession. Every action method
# returns a ToolExecutionResponse whose observation describes the outcome.
#
# NOTE(review): the action-method docstrings are kept exactly as they were;
# presumably the tool framework surfaces them as tool descriptions — confirm
# in tool_base before rewording any of them.
# NOTE(review): `attach_param_schema` and the *Params models are not
# referenced anywhere inside this class as shown. If decorators binding them
# to the action methods were dropped, they need restoring — verify against
# tool_base.
class BrowserTool(Tool):
    def __init__(self, session: BrowserSession) -> None:
        super().__init__()
        self.browser = session

    async def __aenter__(self):
        # The exit stack owns the browser session's async context so that
        # __aexit__ can unwind it.
        self._exit_stack = AsyncExitStack()
        await self._exit_stack.enter_async_context(self.browser)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._exit_stack.aclose()

    def poi_text(self) -> str:
        # Render every point of interest; its index in poi_elements is its mark ID.
        texts = [element_as_text(mark_id=i, **element) for i, element in enumerate(self.browser.poi_elements)]
        # One element per line, skipping any that rendered to an empty string.
        return "\n".join(txt for txt in texts if txt)

    async def goto(self, url: str) -> ToolExecutionResponse:
        """Go directly to a specific web url. Specify the exact URL."""
        await self.browser.goto(url)
        return ToolExecutionResponse(observation=f"Successfully navigated to URL: {url}")

    async def google_search(self, query_plan: str, query: str) -> ToolExecutionResponse:
        """Perform a generic web search using Google.
        Results may not be relevant. If you see poor results, you can try another query.
        """
        # Percent-encode the query so characters such as '&', '#', '+' or '?'
        # stay part of the search term instead of altering the URL structure.
        # `query_plan` is intentionally unused here.
        url = f"https://www.google.com/search?q={quote_plus(query)}"
        await self.browser.goto(url)
        return ToolExecutionResponse(observation=f"Performed Google search for: {query}")

    async def click(self, mark_id: int) -> ToolExecutionResponse:
        """Click on an element of the page."""
        try:
            await self.browser.click(mark_id=mark_id)
            return ToolExecutionResponse(observation=f"Clicked element with mark ID: {mark_id}")
        except IndexError as e:
            # This happens if mark_id is out of bounds for browser.poi_centroids
            logger.error(f"Click failed: Mark ID {mark_id} not found or POI list empty. Error: {e}")
            return ToolExecutionResponse(observation=f"Failed to click element with mark ID {mark_id}. Element not found or POI list invalid.")
        except Exception as e:
            # Failures are reported back as an observation rather than raised.
            logger.error(f"Click failed with unexpected error for mark ID {mark_id}: {e}")
            return ToolExecutionResponse(observation=f"An unexpected error occurred while trying to click element {mark_id}: {e}")

    async def type(self, entries: List[dict], submit: bool) -> ToolExecutionResponse:
        """Type text.
        You can type into one or more elements.
        Note that the text inside an element is cleared before typing.
        """
        typed_ids = []
        last_index = len(entries) - 1
        for i, entry_dict in enumerate(entries):
            # Resolve an ID for error reporting *before* validation: if
            # TypeEntry(...) itself raises, `entry` would otherwise be unbound
            # (first entry) or still refer to the previous entry.
            mark_id = entry_dict.get("mark_id") if isinstance(entry_dict, dict) else None
            try:
                entry = TypeEntry(**entry_dict)
                mark_id = entry.mark_id
                last_entry = i == last_index
                old_poi_positions = [tuple(point) for point in self.browser.poi_centroids]
                await self.browser.enter_text(
                    mark_id=entry.mark_id,
                    text=entry.content,
                    # Only the final entry may trigger the submit.
                    submit=submit and last_entry,
                )
                typed_ids.append(entry.mark_id)
                await self.browser.update_poi()
                new_poi_positions = [tuple(point) for point in self.browser.poi_centroids]
                # If typing moved the points of interest, the remaining mark
                # IDs may no longer refer to the intended elements.
                if not last_entry and old_poi_positions != new_poi_positions:
                    logger.error(
                        "POI positions changed mid-typing, cancelling future type entries.",
                    )
                    break
            except IndexError as e:
                logger.error(f"Type failed: Mark ID {mark_id} not found or POI list empty. Error: {e}")
                return ToolExecutionResponse(observation=f"Failed to type into element with mark ID {mark_id}. Element not found or POI list invalid. Typed into: {typed_ids if typed_ids else 'none'}.")
            except Exception as e:
                logger.error(f"Type failed with unexpected error for mark ID {mark_id}: {e}")
                return ToolExecutionResponse(observation=f"An unexpected error occurred while trying to type into element {mark_id}: {e}. Typed into: {typed_ids if typed_ids else 'none'}.")
        return ToolExecutionResponse(
            observation=f"Typed text into elements with mark IDs: {typed_ids}",
        )

    async def scroll(self, direction: str, mark_id: int) -> ToolExecutionResponse:
        """Scroll the page (or a scrollable element) up, down, left or right."""
        try:
            # -1 is the caller-facing sentinel for "the whole page"; the
            # browser session is given None in that case.
            mark_id_for_browser = None if mark_id == -1 else mark_id
            await self.browser.scroll(direction=direction, mark_id=mark_id_for_browser)
            return ToolExecutionResponse(observation=f"Scrolled {direction} on element with mark ID: {mark_id if mark_id != -1 else 'page'}")
        except IndexError as e:
            logger.error(f"Scroll failed: Mark ID {mark_id} not found or POI list empty. Error: {e}")
            return ToolExecutionResponse(observation=f"Failed to scroll element with mark ID {mark_id}. Element not found or POI list invalid.")
        except Exception as e:
            logger.error(f"Scroll failed with unexpected error for mark ID {mark_id}: {e}")
            return ToolExecutionResponse(observation=f"An unexpected error occurred while trying to scroll element {mark_id}: {e}")

    async def back(self) -> ToolExecutionResponse:
        """Go back to the previous page."""
        try:
            await self.browser.go_back()
            return ToolExecutionResponse(observation="Went back to the previous page.")
        except Exception as e:
            logger.error(f"Go back failed: {e}")
            return ToolExecutionResponse(observation=f"Failed to go back: {e}")

    async def wait(self) -> ToolExecutionResponse:
        """Wait three seconds. Useful when the page appears to still be loading, or if there are any unfinished webpage processes."""  # noqa: E501
        await asyncio.sleep(3)
        return ToolExecutionResponse(observation="Waited for a few seconds.")

    async def reload(self) -> ToolExecutionResponse:
        """Reload the current page. Useful when the page seems unresponsive, broken, outdated, or if you want to reset the page to its initial state."""  # noqa: E501
        try:
            await self.browser.reload()
            return ToolExecutionResponse(observation="Reloaded the current page.")
        except Exception as e:
            logger.error(f"Reload failed: {e}")
            return ToolExecutionResponse(observation=f"Failed to reload the page: {e}")

    async def do_nothing_tool(self) -> ToolExecutionResponse:
        """Do nothing. Use this if you have no need for the browser at this time."""
        return ToolExecutionResponse(observation="Did nothing in the browser.")

    # --- NEW: Expose the open_new_tab_and_go_to method as a tool ---
    async def open_new_tab_and_go_to(self, url: str) -> ToolExecutionResponse:
        """
        Opens a new browser tab/page and navigates to the specified URL.
        Closes the old page if it's not the last one remaining.
        Use this to bypass loading issues by forcing a new navigation.
        """
        try:
            await self.browser.open_new_tab_and_go_to(url)
            return ToolExecutionResponse(
                observation=f"Successfully opened new tab and navigated to: {url}",
            )
        except Exception as e:
            logger.error(f"Error opening new tab and navigating to {url}: {e}")
            return ToolExecutionResponse(observation=f"Failed to open new tab and navigate to {url}: {e}")