Spaces:
Running
Running
File size: 4,122 Bytes
fea07c2 a9fb876 fea07c2 a9fb876 fea07c2 94a64b0 fea07c2 a9fb876 fea07c2 a9fb876 fea07c2 a9fb876 fea07c2 a9fb876 fea07c2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
from typing import Any, Dict, List
import json
from any_agent import AgentFramework
from surf_spot_finder.evaluation.telemetry import TelemetryProcessor
class OpenAITelemetryProcessor(TelemetryProcessor):
    """Processor for OpenAI agent telemetry data."""

    def _get_agent_framework(self) -> AgentFramework:
        """Return the agent framework this processor handles."""
        return AgentFramework.OPENAI

    def extract_hypothesis_answer(self, trace: List[Dict[str, Any]]) -> str:
        """Return the agent's final answer from *trace*.

        Scans spans newest-first for an LLM span whose attributes contain the
        final response text.

        Args:
            trace: List of span dicts, each possibly carrying an "attributes" dict.

        Returns:
            The text of the final LLM response.

        Raises:
            ValueError: If no LLM span with the expected output key is found.
        """
        output_key = "llm.output_messages.0.message.contents.0.message_content.text"
        for span in reversed(trace):
            attributes = span.get("attributes", {})
            # Looking for the final response that has the summary answer
            if attributes.get("openinference.span.kind") != "LLM":
                continue
            if output_key in attributes:
                return attributes[output_key]
        raise ValueError("No agent final answer found in trace")

    def _extract_telemetry_data(self, telemetry: List[Dict[str, Any]]) -> list:
        """Extract LLM calls and tool calls from OpenAI telemetry.

        Args:
            telemetry: List of span dicts.

        Returns:
            A list of dicts: LLM spans contribute optional "input"/"output"
            entries; TOOL spans contribute "tool_name", "input", and "output".
        """
        calls = []
        for span in telemetry:
            attributes = span.get("attributes")
            if not attributes:
                continue
            span_kind = attributes.get("openinference.span.kind", "")
            # Collect LLM interactions - look for direct message content first
            if span_kind == "LLM":
                span_info = {}
                # User message is usually at index 1 (index 0 is typically the system prompt)
                input_key = "llm.input_messages.1.message.content"
                if input_key in attributes:
                    span_info["input"] = attributes[input_key]
                # Output content may live in either of these attribute keys
                output_content = None
                for key in (
                    "llm.output_messages.0.message.content",
                    "llm.output_messages.0.message.contents.0.message_content.text",
                ):
                    if key in attributes:
                        output_content = attributes[key]
                        break
                if output_content:
                    span_info["output"] = output_content
                calls.append(span_info)
            elif span_kind == "TOOL":
                raw_input = attributes.get("input.value", "")
                # BUG FIX: the previous code called json.loads unconditionally,
                # which raised JSONDecodeError for TOOL spans missing
                # "input.value" (the default "" is not valid JSON) or carrying
                # non-JSON payloads. Fall back to the raw string in that case.
                try:
                    parsed_input = json.loads(raw_input)
                except (json.JSONDecodeError, TypeError):
                    parsed_input = raw_input
                calls.append(
                    {
                        "tool_name": attributes.get("tool.name", "Unknown tool"),
                        "input": parsed_input,
                        "output": attributes.get("output.value", ""),
                        # Can't add status yet because it isn't being set by openinference
                        # "status": span.get("status", {}).get("status_code"),
                    }
                )
        return calls
# Backward compatibility functions that use the new class structure
def extract_hypothesis_answer(
    trace: List[Dict[str, Any]], agent_framework: AgentFramework
) -> str:
    """Extract the hypothesis agent final answer from the trace.

    Backward-compatible wrapper that delegates to the framework-specific
    :class:`TelemetryProcessor` implementation.
    """
    return TelemetryProcessor.create(agent_framework).extract_hypothesis_answer(trace)
def parse_generic_key_value_string(text: str) -> Dict[str, str]:
    """
    Parse a string that has items of a dict with key-value pairs separated by '='.
    Only splits on '=' signs, handling quoted strings properly.
    """
    # Backward-compatibility shim: the real implementation lives on the class.
    parsed = TelemetryProcessor.parse_generic_key_value_string(text)
    return parsed
def extract_evidence(
    telemetry: List[Dict[str, Any]], agent_framework: AgentFramework
) -> str:
    """Extract relevant telemetry evidence based on the agent type.

    Backward-compatible wrapper around the framework-specific processor.
    """
    return TelemetryProcessor.create(agent_framework).extract_evidence(telemetry)
|