|
import json |
|
import uuid |
|
import re |
|
from typing import List |
|
import subprocess |
|
import sys |
|
|
|
def install(package):
    """Install *package* with pip, using the interpreter running this module.

    The command is ``<sys.executable> -m pip install <package>``.

    Args:
        package: Requirement string handed to ``pip install``.

    Raises:
        subprocess.CalledProcessError: If pip exits with a non-zero status.
    """
    pip_command = [sys.executable, "-m", "pip", "install", package]
    subprocess.check_call(pip_command)
|
|
|
# Import pythonmonkey; if it is not available, install it with pip through
# install() and retry the import.
try:
    import pythonmonkey
except ImportError:
    install('pythonmonkey')
    import pythonmonkey

# Bind the `jsonrepair` function exposed by the module that pythonmonkey's
# require() loads. parse_json_safely() calls it when json.loads() rejects the
# input.
# NOTE(review): presumably this is the npm `jsonrepair` package and it must be
# resolvable by pythonmonkey at import time -- confirm.
jsonrepair = pythonmonkey.require('jsonrepair').jsonrepair
|
|
|
def clean_command_string(command_str):
    r"""Strip stray escaping and wrapping quotes from a string value.

    The steps run in this order:

    1. Remove every backslash that does not begin a JSON escape sequence
       (``\"``, ``\\``, ``\/``, ``\b``, ``\f``, ``\n``, ``\r``, ``\t`` or
       ``\uXXXX``).
    2. Replace each escaped quote ``\"`` with a plain ``"``.
    3. If the result is wrapped in a pair of double quotes, drop that pair.

    Args:
        command_str: The text to clean.

    Returns:
        The cleaned text.
    """
    cleaned_command = re.sub(r'\\(?!["\\/bfnrt]|u[a-fA-F0-9]{4})', '', command_str)
    cleaned_command = cleaned_command.replace('\\"', '"')
    # A lone '"' both starts and ends with a quote but is not a wrapped value;
    # requiring at least two characters keeps it from being reduced to ''.
    if (
        len(cleaned_command) >= 2
        and cleaned_command.startswith('"')
        and cleaned_command.endswith('"')
    ):
        cleaned_command = cleaned_command[1:-1]
    return cleaned_command
|
|
|
def parse_json_safely(json_str):
    """Decode *json_str* as JSON, falling back to repair, then to the input.

    The text is first given to ``json.loads``. If that raises
    ``json.JSONDecodeError``, the text is passed through the module-level
    ``jsonrepair`` callable and the repaired text is decoded instead. If the
    repair attempt fails for any reason, the original string is returned
    unchanged.

    Args:
        json_str: Text that may contain (possibly malformed) JSON.

    Returns:
        The decoded JSON value, or *json_str* itself when nothing could be
        decoded.
    """
    try:
        decoded = json.loads(json_str)
    except json.JSONDecodeError:
        pass
    else:
        return decoded

    # Strict parsing failed: best-effort repair, never raising to the caller.
    try:
        return json.loads(jsonrepair(json_str))
    except Exception:
        return json_str
|
|
|
def clean_json_object(obj):
    """Recursively clean every string value inside a decoded JSON structure.

    Dict values and list items are visited recursively; dict keys are left
    untouched. Each string is passed through ``clean_command_string``; when
    the cleaned text starts with ``{`` or ``[`` it is handed to
    ``parse_json_safely`` and that result is returned in its place. Values of
    any other type are returned as they are.

    Args:
        obj: A decoded JSON value (dict, list, str, number, bool or None).

    Returns:
        A cleaned copy of *obj* for containers, the cleaned (and possibly
        decoded) value for strings, or *obj* itself for other types.
    """
    if isinstance(obj, str):
        text = clean_command_string(obj)
        if text.startswith(('{', '[')):
            # Looks like embedded JSON: try to decode it.
            return parse_json_safely(text)
        return text
    if isinstance(obj, dict):
        return {key: clean_json_object(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [clean_json_object(entry) for entry in obj]
    return obj
|
|
|
def extract_tool_calls(output_str):
    """Return the raw payload of every tool call found in *output_str*.

    A tool call starts at the marker ``starttoolcall`` and runs up to the
    next ``endtoolcall`` marker, or to the end of the text when the closing
    marker is missing. Payloads may span several lines.

    Args:
        output_str: Text to scan.

    Returns:
        A list with the text found between the markers, in order of
        appearance (markers excluded, whitespace not stripped).
    """
    return re.findall(
        r'starttoolcall(.*?)(?:endtoolcall|$)',
        output_str,
        flags=re.DOTALL,
    )
|
|
|
def extract_tool_calls_and_text(output_str):
    """Split *output_str* into an ordered list of text and tool-call segments.

    A tool call starts at ``starttoolcall`` and ends at the next
    ``endtoolcall`` or, when that marker is missing, at the end of the text.
    Everything outside tool calls is reported as text; text that is empty
    after stripping whitespace is dropped.

    Args:
        output_str: Text to split.

    Returns:
        A list of dicts in order of appearance. Text segments look like
        ``{"text": <stripped text>, "type": "text"}`` and tool calls look like
        ``{"tool_call": <stripped payload>, "type": "function"}``.
    """
    segments = []

    def add_text(chunk):
        # Whitespace-only gaps between tool calls do not become segments.
        stripped = chunk.strip()
        if stripped:
            segments.append({"text": stripped, "type": "text"})

    # Group 1 spans the whole call including markers; group 2 is the payload.
    tool_call_re = re.compile(r'(starttoolcall(.*?)(?:endtoolcall|$))', re.DOTALL)

    cursor = 0  # index just past the previously consumed tool call
    for found in tool_call_re.finditer(output_str):
        begin, finish = found.span(1)
        add_text(output_str[cursor:begin])
        segments.append({"tool_call": found.group(2).strip(), "type": "function"})
        cursor = finish

    # Whatever follows the last tool call is trailing text.
    add_text(output_str[cursor:])
    return segments
|
|
|
def postprocess_output(output_str: str):
    """Turn raw model output into a list of text and function-call entries.

    The text is split with ``extract_tool_calls_and_text``. Each tool-call
    payload is decoded with ``parse_json_safely`` and cleaned with
    ``clean_json_object``. A payload that ends up as a dict containing both
    ``name`` and ``arguments`` becomes a function entry, with dict-valued
    ``arguments`` re-serialised to a JSON string. Any other payload, or one
    whose processing raises, is emitted as a text entry holding the raw
    payload. Every segment is printed to stdout as it is processed.

    Args:
        output_str: Raw text that may contain tool-call markers.

    Returns:
        A list of dicts, each with an 8-character hex ``id`` and either
        ``{"text": ..., "type": "text"}`` or
        ``{"function": ..., "type": "function"}``.
    """

    def as_text(content):
        # Shared shape for plain text and for tool calls that failed to parse.
        return {
            "id": uuid.uuid4().hex[:8],
            "text": content,
            "type": "text",
        }

    results = []
    for segment in extract_tool_calls_and_text(output_str):
        print("processing segment")
        print(segment)

        if segment['type'] != 'function':
            results.append(as_text(segment['text']))
            continue

        call = segment['tool_call']
        try:
            cleaned_call = clean_json_object(parse_json_safely(call))
            is_tool_call = (
                isinstance(cleaned_call, dict)
                and 'name' in cleaned_call
                and 'arguments' in cleaned_call
            )
            if is_tool_call:
                if isinstance(cleaned_call.get('arguments'), dict):
                    # Dict-valued arguments are stored as a JSON string.
                    cleaned_call['arguments'] = json.dumps(cleaned_call['arguments'])
                entry = {
                    "id": uuid.uuid4().hex[:8],
                    "function": cleaned_call,
                    "type": "function",
                }
            else:
                entry = as_text(call)
        except Exception:
            # Best effort: keep the raw payload rather than losing it.
            entry = as_text(call)
        results.append(entry)

    return results
|
|
|
def json_to_markdown(json_obj):
    """Render a sequence of entry dicts as a Markdown string.

    Entries whose ``type`` is ``"text"`` contribute their ``text`` value as a
    paragraph. Entries whose ``type`` is ``"function"`` contribute their
    ``function`` value as a fenced ``json`` code block, pretty-printed with a
    two-space indent. Entries of any other type are skipped.

    Args:
        json_obj: Iterable of dicts carrying a ``type`` key.

    Returns:
        The concatenated Markdown with leading and trailing whitespace
        stripped.
    """
    chunks = []
    for entry in json_obj:
        kind = entry.get("type")
        if kind == "text":
            chunks.append(entry.get("text", "") + "\n\n")
        elif kind == "function":
            payload = json.dumps(entry.get("function", {}), indent=2)
            chunks.append("```json\n" + payload + "\n```\n\n")
    return "".join(chunks).strip()
|
|
|
if __name__ == "__main__":
    # Demo input: two complete tool calls followed by a third one that is cut
    # off mid-payload (no closing marker).
    output_str = (
        'starttoolcall{"name": "get_current_weather", "arguments": {"location": "San Francisco", "unit": "celsius"}}endtoolcall '
        'starttoolcall{"name": "get_current_weather", "arguments": {"location": "Tokyo", "unit": "celsius"}}endtoolcall '
        'starttoolcall{"name": "get_current_weather", "arguments": {"location": "Paris", "unit": '
    )

    parsed_json = postprocess_output(output_str)

    # Show the structured result first, then its Markdown rendering.
    print(json.dumps(parsed_json, indent=2))
    print("-----")
    print(json_to_markdown(parsed_json))