"""Scripted demo that drives an MCP server hosted on a Hugging Face Space.

The script connects to the server's ``/mcp/`` endpoint over streamable HTTP
and calls its ``open``, ``move_mouse`` and ``left_click`` tools in a fixed
sequence, pausing between steps.
"""

import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from time import sleep  # noqa: F401  (no longer used here; kept so the module's names are unchanged)

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

# Owner and name of the Space; combined into "https://<owner>-<space>.hf.space".
org_or_user = "a-mahla"
space_name = "test_desktop"


@asynccontextmanager
async def mcp_server() -> AsyncGenerator[ClientSession, None]:
    """Connect to the Space's MCP endpoint and yield an initialized session.

    Yields:
        A ``ClientSession`` on which ``initialize()`` has already been
        awaited. Both the session and the underlying HTTP transport are
        closed when the ``async with`` block using this manager exits.
    """
    base_url = f"https://{org_or_user}-{space_name}.hf.space"
    server_url = f"{base_url}/mcp/"

    print("🎬 MCP Server started:", server_url)

    # Connect to the server. The third element yielded by the transport is
    # not needed by this script, so it is discarded.
    async with streamablehttp_client(server_url) as (
        read_stream,
        write_stream,
        _,
    ):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            yield session


async def main():
    """Run the scripted demo: open a URL, then perform three move-and-click steps.

    All pauses use ``asyncio.sleep`` rather than ``time.sleep``: a blocking
    sleep inside a coroutine stalls the whole event loop, which would also
    freeze the background tasks that keep the MCP connection serviced.
    """
    async with mcp_server() as session:
        print("Opening https://www.huggingface.co for research...")
        await session.call_tool("open", {"file_or_url": "https://www.huggingface.co/"})
        # Fixed pause after opening the URL (presumably to let the page
        # load -- TODO confirm the delay is sufficient on the target Space).
        await asyncio.sleep(7)

        # The coordinates below are hard-coded screen positions; what lies
        # under them depends on the remote display and is not visible here.
        print(await session.call_tool("move_mouse", {"x": 1200, "y": 120}))
        await asyncio.sleep(0.5)
        print(await session.call_tool("left_click", {}))
        await asyncio.sleep(1)

        print(await session.call_tool("move_mouse", {"x": 1200, "y": 160}))
        print(await session.call_tool("left_click", {}))
        await asyncio.sleep(2)

        print(await session.call_tool("move_mouse", {"x": 1600, "y": 320}))
        print(await session.call_tool("left_click", {}))
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())