Spaces:
Running
Running
File size: 1,595 Bytes
e074e5b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from time import sleep
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
org_or_user = "a-mahla"
space_name = "test_desktop"
@asynccontextmanager
async def mcp_server() -> AsyncGenerator[ClientSession, None]:
"""Context manager for recording the demo with MCP server"""
base_url = f"https://{org_or_user}-{space_name}.hf.space"
server_url = f"{base_url}/mcp/"
print("🎬 MCP Server started:", server_url)
# Connect to the server
async with streamablehttp_client(server_url) as (
read_stream,
write_stream,
_,
):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
yield session
async def main():
async with mcp_server() as session:
print("Opening https://www.huggingface.co for research...")
await session.call_tool("open", {"file_or_url": "https://www.huggingface.co/"})
sleep(7)
print(await session.call_tool("move_mouse", {"x": 1200, "y": 120}))
sleep(0.5)
print(await session.call_tool("left_click", {}))
sleep(1)
print(await session.call_tool("move_mouse", {"x": 1200, "y": 160}))
print(await session.call_tool("left_click", {}))
sleep(2)
print(await session.call_tool("move_mouse", {"x": 1600, "y": 320}))
print(await session.call_tool("left_click", {}))
sleep(1)
if __name__ == "__main__":
asyncio.run(main()) |