File size: 4,122 Bytes
fea07c2
 
 
a9fb876
 
fea07c2
 
 
 
 
 
a9fb876
 
fea07c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
94a64b0
 
fea07c2
 
 
 
 
 
 
 
 
 
a9fb876
fea07c2
 
a9fb876
fea07c2
 
 
 
 
 
 
 
 
 
 
a9fb876
 
 
fea07c2
a9fb876
fea07c2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from typing import Any, Dict, List
import json

from any_agent import AgentFramework

from surf_spot_finder.evaluation.telemetry import TelemetryProcessor


class OpenAITelemetryProcessor(TelemetryProcessor):
    """Processor for OpenAI agent telemetry data.

    Knows how to pull the final hypothesis answer and the LLM/tool call
    evidence out of OpenInference-formatted spans produced by the OpenAI
    agent framework.
    """

    def _get_agent_framework(self) -> AgentFramework:
        """Return the framework this processor handles."""
        return AgentFramework.OPENAI

    def extract_hypothesis_answer(self, trace: List[Dict[str, Any]]) -> str:
        """Extract the agent's final answer from the trace.

        Scans the spans in reverse order (latest first) for the last LLM
        span that carries a text output message and returns its content.

        Raises:
            ValueError: if no LLM span with a final answer is found.
        """
        output_key = "llm.output_messages.0.message.contents.0.message_content.text"
        for span in reversed(trace):
            # Looking for the final response that has the summary answer
            attributes = span.get("attributes", {})
            if attributes.get("openinference.span.kind") != "LLM":
                continue
            if output_key in attributes:
                return attributes[output_key]

        raise ValueError("No agent final answer found in trace")

    def _extract_telemetry_data(
        self, telemetry: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Extract LLM calls and tool calls from OpenAI telemetry.

        Returns a list of dicts: LLM spans contribute ``input``/``output``
        entries, TOOL spans contribute ``tool_name``/``input``/``output``.
        """
        calls: List[Dict[str, Any]] = []

        for span in telemetry:
            attributes = span.get("attributes")
            if not attributes:
                continue

            span_kind = attributes.get("openinference.span.kind", "")

            # Collect LLM interactions - look for direct message content first
            if span_kind == "LLM":
                span_info: Dict[str, Any] = {}

                # User message is usually at index 1 (index 0 is the system prompt)
                input_key = "llm.input_messages.1.message.content"
                if input_key in attributes:
                    span_info["input"] = attributes[input_key]

                # Output content may live in either of two attribute layouts
                output_content = None
                for key in (
                    "llm.output_messages.0.message.content",
                    "llm.output_messages.0.message.contents.0.message_content.text",
                ):
                    if key in attributes:
                        output_content = attributes[key]
                        break

                # Only record spans that actually produced output
                if output_content:
                    span_info["output"] = output_content
                    calls.append(span_info)
            elif span_kind == "TOOL":
                raw_input = attributes.get("input.value", "")
                # Tool inputs are normally JSON-encoded, but guard against
                # missing or non-JSON values (json.loads("") raises) and
                # fall back to the raw string in that case.
                try:
                    parsed_input = json.loads(raw_input)
                except (json.JSONDecodeError, TypeError):
                    parsed_input = raw_input

                calls.append(
                    {
                        "tool_name": attributes.get("tool.name", "Unknown tool"),
                        "input": parsed_input,
                        "output": attributes.get("output.value", ""),
                        # Can't add status yet because it isn't being set by openinference
                        # "status": span.get("status", {}).get("status_code"),
                    }
                )

        return calls


# Backward compatibility functions that use the new class structure
def extract_hypothesis_answer(
    trace: List[Dict[str, Any]], agent_framework: AgentFramework
) -> str:
    """Extract the hypothesis agent final answer from the trace.

    Backward-compatibility shim: delegates to the framework-specific
    TelemetryProcessor subclass.
    """
    return TelemetryProcessor.create(agent_framework).extract_hypothesis_answer(trace)


def parse_generic_key_value_string(text: str) -> Dict[str, str]:
    """Parse a key=value formatted string into a dict.

    Splits only on '=' signs and handles quoted strings properly.
    Backward-compatibility shim over the class-level implementation.
    """
    parsed = TelemetryProcessor.parse_generic_key_value_string(text)
    return parsed


def extract_evidence(
    telemetry: List[Dict[str, Any]], agent_framework: AgentFramework
) -> str:
    """Extract relevant telemetry evidence based on the agent type.

    Backward-compatibility shim: builds the processor for the given
    framework and forwards the telemetry to it.
    """
    return TelemetryProcessor.create(agent_framework).extract_evidence(telemetry)