Spaces:
Configuration error
Configuration error
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import atexit | |
import json | |
import logging | |
import subprocess | |
import time | |
from functools import wraps | |
from pathlib import Path | |
from subprocess import Popen | |
from typing import Any, Dict, List, Optional, Union | |
import requests | |
from pydantic import BaseModel | |
from camel.runtime import BaseRuntime | |
from camel.toolkits.function_tool import FunctionTool | |
logger = logging.getLogger(__name__) | |
class RemoteHttpRuntime(BaseRuntime):
    r"""A runtime that runs functions in a remote HTTP server.

    You need to run the API server in the remote server first.

    Args:
        host (str): The host of the remote server.
        port (int): The port of the remote server. (default: :obj:`8000`)
        python_exec (str): The python executable to run the API server.
            (default: :obj:`python3`)
    """

    def __init__(
        self, host: str, port: int = 8000, python_exec: str = "python3"
    ):
        super().__init__()
        self.host = host
        self.port = port
        self.python_exec = python_exec
        # The server script is expected to sit next to this module.
        self.api_path = Path(__file__).parent / "api.py"
        # Maps function name -> entrypoint string passed to the API server.
        self.entrypoint: Dict[str, str] = dict()
        # Handle of the server child process started by `build`, if any.
        self.process: Optional[Popen] = None

    def build(self) -> "RemoteHttpRuntime":
        r"""Build the API server.

        Launches ``api.py`` as a child process with the configured port and
        all registered entrypoints as command-line arguments, and registers
        a cleanup hook that terminates it at interpreter exit.

        Returns:
            RemoteHttpRuntime: The current runtime.
        """
        # Stop a server started by an earlier `build` call; otherwise its
        # handle would be overwritten and the child process leaked.
        self._cleanup()
        self.process = subprocess.Popen(
            [
                self.python_exec,
                str(self.api_path),
                str(self.port),
                *self.entrypoint.values(),
            ]
        )
        atexit.register(self._cleanup)
        return self

    def _cleanup(self):
        r"""Clean up the API server when exiting."""
        # `__del__` may run on a partially initialised instance (e.g. when
        # `__init__` raised before `process` was assigned), so look the
        # attribute up defensively instead of assuming it exists.
        process = getattr(self, "process", None)
        if process is None:
            return
        if process.poll() is None:
            # Still running: ask it to stop and reap it.
            process.terminate()
            process.wait()
            self.process = None

    def add(  # type: ignore[override]
        self,
        funcs: Union[FunctionTool, List[FunctionTool]],
        entrypoint: str,
        redirect_stdout: bool = False,
        arguments: Optional[Dict[str, Any]] = None,
    ) -> "RemoteHttpRuntime":
        r"""Add a function or list of functions to the runtime.

        Each tool's callable is replaced by a wrapper that POSTs the call
        to ``http://{host}:{port}/{function_name}`` and returns the decoded
        ``output`` field of the JSON response.

        Args:
            funcs (Union[FunctionTool, List[FunctionTool]]): The function or
                list of functions to add.
            entrypoint (str): The entrypoint for the function.
            redirect_stdout (bool): Whether to return the stdout of
                the function. (default: :obj:`False`)
            arguments (Optional[Dict[str, Any]]): The arguments for the
                function. (default: :obj:`None`)

        Returns:
            RemoteHttpRuntime: The current runtime.
        """
        if not isinstance(funcs, list):
            funcs = [funcs]

        if arguments is not None:
            # The JSON-encoded arguments are appended directly to the
            # entrypoint string handed to the API server.
            entrypoint += json.dumps(arguments)

        for func in funcs:
            inner_func = func.func

            # Create a wrapper that explicitly binds `func` (default
            # arguments avoid the late-binding closure pitfall in this
            # loop) and keeps the wrapped function's metadata.
            @wraps(inner_func)
            def wrapper(
                *args, func=func, redirect_stdout=redirect_stdout, **kwargs
            ):
                # Build a JSON-serialisable copy instead of rewriting the
                # caller's kwargs while iterating over them.
                payload_kwargs = {
                    key: (
                        value.model_dump()
                        if isinstance(value, BaseModel)
                        else value
                    )
                    for key, value in kwargs.items()
                }
                func_name = func.get_function_name()
                resp = requests.post(
                    f"http://{self.host}:{self.port}/{func_name}",
                    json=dict(
                        args=args,
                        kwargs=payload_kwargs,
                        redirect_stdout=redirect_stdout,
                    ),
                )
                if resp.status_code != 200:
                    logger.error(
                        f"Failed to execute function:\n"
                        f"{func_name},\n"
                        f"status code: {resp.status_code},\n"
                        f"response: {resp.text}"
                    )
                    # Errors are reported to the caller as a dict rather
                    # than raised.
                    return {
                        "error": f"Failed to execute function:\n"
                        f"{func_name},\n"
                        f"response: {resp.text}"
                    }
                data = resp.json()
                if redirect_stdout:
                    print(data["stdout"])
                # The server returns the function result JSON-encoded in
                # the `output` field.
                return json.loads(data["output"])

            func.func = wrapper
            registered_name = func.get_function_name()
            self.tools_map[registered_name] = func
            self.entrypoint[registered_name] = entrypoint

        return self

    def ok(self) -> bool:
        r"""Check if the API Server is running.

        Returns:
            bool: Whether the API Server is running, i.e. whether an HTTP
                GET to ``http://{host}:{port}`` did not fail with a
                connection error.
        """
        try:
            requests.get(f"http://{self.host}:{self.port}")
        except requests.exceptions.ConnectionError:
            return False
        return True

    def wait(self, timeout: int = 10) -> bool:
        r"""Wait for the API Server to be ready.

        Probes the server with :meth:`ok`, sleeping one second after each
        failed attempt, for at most ``timeout`` attempts.

        Args:
            timeout (int): The number of seconds to wait. (default: :obj:`10`)

        Returns:
            bool: Whether the API Server is ready.
        """
        for _ in range(timeout):
            # `ok` must be *called*: the bare bound method is always truthy
            # and would report readiness without probing the server.
            if self.ok():
                return True
            time.sleep(1)
        return False

    def __del__(self):
        r"""Clean up the API server when the object is deleted."""
        self._cleanup()

    def stop(self) -> "RemoteHttpRuntime":
        r"""Stop the API server.

        Returns:
            RemoteHttpRuntime: The current runtime.
        """
        self._cleanup()
        return self

    def reset(self) -> "RemoteHttpRuntime":
        r"""Reset the API server.

        Stops the running server (if any) and builds a new one.

        Returns:
            RemoteHttpRuntime: The current runtime.
        """
        return self.stop().build()

    def docs(self) -> str:
        r"""Get the URL for the API documentation.

        Returns:
            str: The URL for the API documentation.
        """
        return f"http://{self.host}:{self.port}/docs"