|
from typing import List |
|
import json |
|
|
|
|
|
# System-prompt template that advertises the available tools to the model.
# It is rendered with ``str.format(tool_text=...)``, which is why every literal
# brace of the example tool call is doubled.
TOOL_SYSTEM_PROMPT_RUBRA = "".join(
    [
        "You have access to the following tools: {tool_text}\n",
        "You can choose to respond with one or more tool calls at once, "
        "or with a chat message back to the user. ",
        "Ensure you have all necessary details before making tool calls. ",
        "If additional information is needed, ask the user appropriately. ",
        "Any tool call you make must correspond to the functions listed above.\n",
        "If you decide to call a tool, format it like this: ",
        'starttoolcall{{"name": "<function_name>", "arguments": '
        '{{"<arg1_name>": "<arg1_value>", "<arg2_name>": "<arg2_value>", ...}}}}'
        "endtoolcall ",
        "where the JSON wrapped between starttoolcall and endtoolcall "
        "represents the function call.\n",
    ]
)
|
|
|
def json_schema_to_typescript_type(schema, param_name):
    """Translate one JSON-schema fragment into a TypeScript type and JSDoc lines.

    Args:
        schema: JSON-schema fragment for a single parameter or property. Any
            value that is not a dict (``None``, a boolean schema, ...) is
            treated as an unknown schema instead of raising.
        param_name: Parameter name used inside the generated comment lines.

    Returns:
        A 4-tuple ``(ts_type, enum_comment, integer_comment,
        description_comment)``. Comment entries are empty strings when they do
        not apply. For ``"type": "object"`` schemas the three comment entries
        are always empty, and ``ts_type`` is the interface definition text
        produced by ``generate_typescript_interface`` (or ``"string"`` when the
        schema also carries an ``enum``).
    """
    # The membership tests below only make sense on a mapping; without this
    # guard a ``None``/boolean schema raised TypeError on ``"enum" in schema``.
    if not isinstance(schema, dict):
        return "any", "", "", ""

    ts_type = "any"  # fallback for missing or unrecognised "type"
    enum_comment = ""
    integer_comment = ""
    description_comment = ""

    json_type = schema.get("type")
    if json_type == "array":
        if "items" in schema:
            item_type = json_schema_to_typescript_type(schema["items"], param_name)[0]
        else:
            item_type = "any"
        ts_type = f"{item_type}[]"
    elif json_type == "integer":
        # Integers are emitted as ``number``; the distinction is kept in a comment.
        ts_type = "number"
        integer_comment = f" * @param {param_name} - Integer"
    elif json_type == "object":
        ts_type, _ = generate_typescript_interface(schema, param_name)
    elif json_type in ("number", "boolean", "null", "string"):
        # These JSON type names are spelled identically in TypeScript.
        ts_type = json_type

    if "enum" in schema:
        enum_comment = f" * @enum {param_name} - Possible values: " + ", ".join(
            f'"{enum_value}"' for enum_value in schema["enum"]
        )
        # An enum always overrides the declared type with ``string``.
        ts_type = "string"
    if "description" in schema:
        description_comment = f' * @param {param_name} - {schema["description"]}'

    # Object schemas report only their type; their own comments are dropped.
    if json_type == "object":
        return ts_type, "", "", ""

    return ts_type, enum_comment, integer_comment, description_comment
|
|
|
|
|
def generate_typescript_interface(schema, interface_name):
    """Render an object schema as a TypeScript ``interface`` declaration.

    Args:
        schema: Mapping that may contain ``properties`` and ``required``.
        interface_name: Identifier placed after the ``interface`` keyword.

    Returns:
        A pair ``(interface_definition, comments)``: the interface source text
        and the newline-joined comment lines collected from the properties.
    """
    property_schemas = schema.get("properties", {})
    required_names = schema.get("required", [])

    member_lines = []
    doc_lines = []
    for name, sub_schema in property_schemas.items():
        member_type, enum_doc, integer_doc, description_doc = (
            json_schema_to_typescript_type(sub_schema, name)
        )
        # Properties missing from ``required`` get the optional marker.
        optional_marker = "" if name in required_names else "?"
        member_lines.append(f" {name}{optional_marker}: {member_type};")
        # Comment order per property: description, enum, integer.
        doc_lines.extend(
            doc for doc in (description_doc, enum_doc, integer_doc) if doc
        )

    members = "\n".join(member_lines)
    return f"interface {interface_name} {{\n{members}\n}}", "\n".join(doc_lines)
|
|
|
|
|
def convert_parameters_list_to_dict(parameters):
    """Convert a list of parameter specs into ``properties``/``required`` form.

    Args:
        parameters: Iterable of dicts, each carrying at least a ``name`` key.

    Returns:
        ``{"properties": {name: spec}, "required": [names]}`` where a parameter
        counts as required when its spec has no ``default`` key.
    """
    specs = list(parameters)
    return {
        "properties": {spec["name"]: spec for spec in specs},
        "required": [spec["name"] for spec in specs if "default" not in spec],
    }
|
|
|
|
|
def generate_typescript_function(function_schema) -> str:
    """Render a tool/function schema as a documented TypeScript function stub.

    Args:
        function_schema: Mapping with a ``name`` and optionally ``description``
            and ``parameters``. ``parameters`` may be a dict with
            ``properties``/``required``, a list of parameter specs, or ``None``.

    Returns:
        Source text made of a ``/** ... */`` comment, any interfaces generated
        for object-typed parameters, and a ``function name(args): any {}`` stub.
    """
    func_name = function_schema["name"]
    description = function_schema.get("description", "")

    # Normalise the accepted ``parameters`` shapes to a plain dict.
    params = function_schema.get("parameters", {})
    if isinstance(params, list):
        params = convert_parameters_list_to_dict(params)
    if params is None:
        params = {}

    properties = params.get("properties", {})
    required_params = params.get("required", [])

    signature_parts = []
    doc_lines = []
    interface_blocks = []
    for param_name, param_schema in properties.items():
        ts_type, enum_doc, integer_doc, description_doc = (
            json_schema_to_typescript_type(param_schema, param_name)
        )
        if ts_type.startswith("interface"):
            # Object parameter: emit a named interface and reference it by name.
            alias = f"{func_name}_{param_name.capitalize()}Params"
            interface_def, nested_docs = generate_typescript_interface(
                param_schema, alias
            )
            interface_blocks.append(interface_def)
            doc_lines.append(nested_docs)
            ts_type = alias
        else:
            doc_lines.extend(
                doc for doc in (description_doc, enum_doc, integer_doc) if doc
            )
        optional_marker = "" if param_name in required_params else "?"
        signature_parts.append(f"{param_name}{optional_marker}: {ts_type}")

    args_text = ", ".join(signature_parts)
    docs_text = "\n".join(doc_lines)
    interfaces_text = "\n\n".join(interface_blocks)

    pieces = ["/**\n"]
    if description:
        pieces.append(f" * {description}\n")
    if docs_text:
        pieces.append(docs_text + "\n")
    pieces.append(" */\n")
    if interfaces_text:
        pieces.append(interfaces_text + "\n\n")
    pieces.append(f"function {func_name}({args_text}): any {{}}")
    return "".join(pieces)
|
|
|
|
|
|
|
def format_tools(tools: List[dict]) -> str:
    """Build the tool system prompt for a list of tool definitions.

    Args:
        tools: Tool entries; each is either a wrapper dict holding the schema
            under ``"function"`` or the function schema itself.

    Returns:
        ``TOOL_SYSTEM_PROMPT_RUBRA`` with ``{tool_text}`` replaced by the
        generated TypeScript declarations, separated by blank lines.
    """
    declarations = [
        generate_typescript_function(tool["function"] if "function" in tool else tool)
        for tool in tools
    ]
    return TOOL_SYSTEM_PROMPT_RUBRA.format(tool_text="\n\n".join(declarations))
|
|
|
|
|
|
|
def preprocess_input(msgs: List[dict], tools: List[dict]):
    """Rewrite a chat history so the tool definitions and calls are inlined.

    Args:
        msgs: Chat messages (dicts with at least a ``role`` key).
        tools: Tool definitions forwarded to ``format_tools``.

    Returns:
        The message list produced by ``process_messages`` using the tool
        system prompt built from ``tools``.
    """
    return process_messages(msgs, format_tools(tools))
|
|
|
|
|
def process_messages(messages: List[dict], function_str: str):
    """Convert tool-call style chat messages into plain role/content messages.

    Args:
        messages: Chat history; entries are dicts with a ``role`` key and, by
            role, ``content``, ``tool_calls`` or ``tool_call_id``.
        function_str: Tool description text added to the system message.

    Returns:
        A new list in which the first message carries ``function_str`` in a
        system message, assistant tool calls are rendered as text, and tool
        results are grouped into ``start observation ... end observation``
        user messages.
    """
    pending = {}  # tool_call_id -> observation text ("" until a result arrives)
    result = []

    def flush_observations():
        """Emit all pending tool results as one user message, then reset."""
        if not pending:
            return
        observations = [
            f'{value if value != "" else "done"}' for value in pending.values()
        ]
        result.append(
            {
                "role": "user",
                "content": "start observation "
                + json.dumps(observations)
                + " end observation",
            }
        )
        pending.clear()

    for index, message in enumerate(messages):
        role = message["role"]

        # Tool results are flushed right before the next non-tool message.
        if role != "tool":
            flush_observations()

        if index == 0:
            if role == "system":
                # Extend the existing system message with the tool description.
                result.append(
                    {
                        "role": "system",
                        "content": message["content"] + "\n" + function_str,
                    }
                )
            else:
                # No system message supplied: prepend a default one.
                result.append(
                    {
                        "role": "system",
                        "content": "You are a helpful assistant.\n" + function_str,
                    }
                )
                result.append(message)
            continue

        if role == "assistant" and "tool_calls" in message:
            # Registers each call id in ``pending`` and renders the calls as text.
            rendered = construct_tool_call_str(message["tool_calls"], pending)
            result.append({"role": "assistant", "content": rendered})
            continue

        if role == "tool":
            tool_call_id = message["tool_call_id"]
            if tool_call_id in pending:
                pending[tool_call_id] = message["content"]
            else:
                # Result for an unknown call: reported on stdout and skipped.
                print(pending)
                print(f"Tool call id not found in the map: {tool_call_id}")
            continue

        result.append(message)

    # Tool results at the very end of the history still need to be emitted.
    flush_observations()

    return result
|
|
|
|
|
def construct_tool_call_str(tool_calls, func_observation_map) -> str:
    """Render assistant tool calls as ``starttoolcall...endtoolcall`` text.

    Args:
        tool_calls: Iterable of tool-call dicts, each with an ``id`` and a
            ``function`` dict containing ``arguments`` (a JSON string or an
            already-parsed value).
        func_observation_map: Dict updated in place; every call id is
            registered with an empty string as its pending observation.

    Returns:
        The concatenation of one ``starttoolcall<payload>endtoolcall`` segment
        per call. The payload is ``str()`` of the function dict, i.e. Python
        dict formatting, with string arguments JSON-decoded first.

    Raises:
        json.JSONDecodeError: If string ``arguments`` are not valid JSON.
    """
    rendered_calls = []
    for tool_call in tool_calls:
        # Register the call so a later tool message can attach its result.
        func_observation_map[tool_call["id"]] = ""

        function = tool_call["function"]
        arguments = function["arguments"]
        if isinstance(arguments, str):
            # Decode into a shallow copy so the caller's message is not
            # mutated; replacing an existing key keeps the key order, so the
            # rendered text is unchanged.
            function = {**function, "arguments": json.loads(arguments)}
        rendered_calls.append("starttoolcall" + str(function) + "endtoolcall")

    return "".join(rendered_calls)
|
|
|
|
|
if __name__ == "__main__":
    # Manual demo: preprocess a sample tool-calling conversation and print the
    # resulting message list as indented JSON.
    tools = [
        {
            "type": "function",
            "function": {
                "name": "dummy",
                "description": "just to say hi",
                "parameters": None,
            },
        },
        {
            "type": "function",
            "function": {
                "name": "calculate_distance",
                "description": "Calculate the distance between two locations",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "origin": {
                            "type": "string",
                            "description": "The starting location",
                        },
                        "destination": {
                            "type": "string",
                            "description": "The destination location",
                        },
                        "mode": {
                            "type": "string",
                            "description": "The mode of transportation",
                        },
                    },
                    "required": ["origin", "destination", "mode"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "generate_password",
                "description": "Generate a random password",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "length": {
                            "type": "integer",
                            "description": "The length of the password",
                        },
                    },
                    "required": ["length"],
                },
            },
        },
    ]

    msgs = [
        {
            "role": "user",
            "content": "\nYou are task oriented system.\nYou receive input from a user, process the input from the given instructions, and then output the result.\nYour objective is to provide consistent and correct results.\nYou do not need to explain the steps taken, only provide the result to the given instructions.\nYou are referred to as a tool.\nYou don't move to the next step until you have a result.\n\nDownload https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip to a\nrandom file. Then expand the archive to a temporary location as there is a sqlite\ndatabase in it.\n\nFirst inspect the schema of the database to understand the table structure.\n\nForm and run a SQL query to find the artist with the most number of albums and output\nthe result of that.\n\nWhen done remove the database file and the downloaded content.",
        },
        {
            "role": "assistant",
            "content": "",
            "tool_calls": [
                {
                    "id": "cdf7eb2e",
                    "type": "function",
                    "function": {
                        "name": "download",
                        "arguments": '{"url":"https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip"}',
                    },
                }
            ],
        },
        {
            "role": "tool",
            "content": "/tmp/gpt-download2815959664.zip",
            "name": "download",
            "tool_call_id": "cdf7eb2e",
        },
        {
            "role": "assistant",
            "content": "",
            "tool_calls": [
                {
                    "id": "07cdee86",
                    "type": "function",
                    "function": {
                        "name": "exec",
                        "arguments": '{"command":"unzip /tmp/gpt-download2815959664.zip","directory":"."}',
                    },
                }
            ],
        },
        {
            "role": "tool",
            "content": "Archive: /tmp/gpt-download2815959664.zip\n inflating: chinook.db \n",
            "name": "exec",
            "tool_call_id": "07cdee86",
        },
        {
            "role": "assistant",
            "content": "",
            "tool_calls": [
                {
                    "id": "e8042027",
                    "type": "function",
                    "function": {
                        "name": "download",
                        "arguments": '{"url":"https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip"}',
                    },
                }
            ],
        },
        {
            "role": "tool",
            "content": "/tmp/gpt-download3736408077.zip",
            "name": "download",
            "tool_call_id": "e8042027",
        },
    ]

    new_msgs = preprocess_input(msgs, tools)
    print(json.dumps(new_msgs, indent=2))
|
|