Spaces:
Running
Running
File size: 3,631 Bytes
fea07c2 a9fb876 fea07c2 a9fb876 fea07c2 a9fb876 fea07c2 94a64b0 fea07c2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
from typing import Any, Dict, List
import json
from any_agent import AgentFramework
from langchain_core.messages import BaseMessage
from surf_spot_finder.evaluation.telemetry import TelemetryProcessor
class LangchainTelemetryProcessor(TelemetryProcessor):
"""Processor for Langchain agent telemetry data."""
def _get_agent_framework(self) -> AgentFramework:
return AgentFramework.LANGCHAIN
def extract_hypothesis_answer(self, trace: List[Dict[str, Any]]) -> str:
for span in reversed(trace):
if span["attributes"]["openinference.span.kind"] == "AGENT":
content = span["attributes"]["output.value"]
# Extract content from serialized langchain message
message = json.loads(content)["messages"][0]
message = self.parse_generic_key_value_string(message)
base_message = BaseMessage(content=message["content"], type="AGENT")
# Use the interpreted string for printing
final_text = base_message.text()
# Either decode escape sequences if they're present
try:
final_text = final_text.encode().decode("unicode_escape")
except UnicodeDecodeError:
# If that fails, the escape sequences might already be interpreted
pass
return final_text
raise ValueError("No agent final answer found in trace")
def _extract_telemetry_data(self, telemetry: List[Dict[str, Any]]) -> List[Dict]:
"""Extract LLM calls and tool calls from LangChain telemetry."""
calls = []
for span in telemetry:
if "attributes" not in span:
continue
attributes = span.get("attributes", {})
span_kind = attributes.get("openinference.span.kind", "")
# Collect LLM calls
if (
span_kind == "LLM"
and "llm.output_messages.0.message.content" in attributes
):
llm_info = {
"model": attributes.get("llm.model_name", "Unknown model"),
"input": attributes.get("llm.input_messages.0.message.content", ""),
"output": attributes.get(
"llm.output_messages.0.message.content", ""
),
"type": "reasoning",
}
calls.append(llm_info)
# Try to find tool calls
if "tool.name" in attributes or span.get("name", "").endswith("Tool"):
tool_info = {
"tool_name": attributes.get(
"tool.name", span.get("name", "Unknown tool")
),
"status": "success"
if span.get("status", {}).get("status_code") == "OK"
else "error",
"error": span.get("status", {}).get("description", None),
}
if "input.value" in attributes:
try:
input_value = json.loads(attributes["input.value"])
tool_info["input"] = input_value
except Exception:
tool_info["input"] = attributes["input.value"]
if "output.value" in attributes:
tool_info["output"] = self.parse_generic_key_value_string(
json.loads(attributes["output.value"])["output"]
)["content"]
calls.append(tool_info)
return calls
|