Nathan Brake committed
Commit eef7dd3 · unverified · 1 Parent(s): 5d76917

Improvement of evaluation functionality (#13)

src/surf_spot_finder/evaluation/evaluate.py CHANGED
@@ -1,4 +1,5 @@
 import json
+import sys
 from textwrap import dedent
 from typing import Any, Dict, List, Optional
 from loguru import logger
@@ -16,6 +17,10 @@ from surf_spot_finder.evaluation.utils import (
 )
 from surf_spot_finder.evaluation.test_case import TestCase
 
+logger.remove()
+logger = logger.opt(ansi=True)
+logger.add(sys.stdout, colorize=True, format="{message}")
+
 
 def run_agent(test_case: TestCase) -> str:
     input_data = test_case.input
@@ -62,63 +67,58 @@ def evaluate_telemetry(test_case: TestCase, telemetry_path: str) -> bool:
     # Extract the final answer from the telemetry
     hypothesis_answer = extract_hypothesis_answer(telemetry)
     logger.info(
-        dedent(f"""
-        Hypothesis Final answer extracted:
-        - {hypothesis_answer}
-        """)
+        f"""<yellow>Hypothesis Final answer extracted: {hypothesis_answer}</yellow>"""
     )
     # Verify agent behavior against checkpoints using llm-as-a-judge
     llm_judge = "openai/gpt-4o"
     checkpoint_results = verify_checkpoints(
-        telemetry,
-        hypothesis_answer,
-        test_case.checkpoints,
-        test_case.ground_truth,
-        llm_judge,
+        telemetry=telemetry,
+        checkpoints=test_case.checkpoints,
+        model=llm_judge,
     )
 
     hypothesis_answer_results = verify_hypothesis_answer(
-        hypothesis_answer,
-        test_case.ground_truth,
-        test_case.final_answer_criteria,
-        llm_judge,
+        hypothesis_final_answer=hypothesis_answer,
+        ground_truth_answer_dict=test_case.ground_truth,
+        ground_truth_checkpoints=test_case.final_answer_criteria,
+        model=llm_judge,
     )
     # Summarize results
 
     verification_results = checkpoint_results + hypothesis_answer_results
-    all_passed = all(result["passed"] for result in verification_results)
-    failed_checks = [r for r in verification_results if not r["passed"]]
-    passed_checks = [r for r in verification_results if r["passed"]]
+    failed_checks = [r for r in verification_results if not r.passed]
+    passed_checks = [r for r in verification_results if r.passed]
+    missed_points = sum([r.points for r in failed_checks])
+    won_points = sum([r.points for r in passed_checks])
     if passed_checks:
-        logger.info(
-            f"Passed checkpoints: {len(passed_checks)}/{len(verification_results)}"
-        )
         for check in passed_checks:
             message = dedent(
                 f"""
-                Passed:
-                - {check["criteria"]}
-                - {check["reason"]}
-                """
+                <green>Passed:
+                - {check.criteria}
+                - {check.reason}</green>"""
             )
             logger.info(message)
     if failed_checks:
-        logger.error(
-            f"Failed checkpoints: {len(failed_checks)}/{len(verification_results)}"
-        )
         for check in failed_checks:
             message = dedent(
                 f"""
-                Failed:
-                - {check["criteria"]}
-                - {check["reason"]}
-                """
+                <red>Failed:
+                - {check.criteria}
+                - {check.reason}</red>"""
            )
             logger.error(message)
-    else:
-        logger.info("All checkpoints passed!")
-
-    return all_passed
+    else:
+        logger.info("<green>All checkpoints passed!</green>")
+    logger.info(
+        f"<green>Passed checkpoints: {len(passed_checks)}/{len(verification_results)}</green>"
+    )
+    logger.info(
+        f"<red>Failed checkpoints: {len(failed_checks)}/{len(verification_results)}</red>"
+    )
+    logger.info("<green>=====================================</green>")
+    logger.info(f"<green>Score: {won_points}/{won_points + missed_points}</green>")
+    logger.info("<green>=====================================</green>")
 
 
 def evaluate(test_case_path: str, telemetry_path: Optional[str] = None) -> None:
@@ -139,7 +139,7 @@ def evaluate(test_case_path: str, telemetry_path: Optional[str] = None) -> None:
     else:
         logger.info(f"Using provided telemetry file: {telemetry_path}")
         logger.info(
-            "For this to work, the telemetry file must align with the test case."
+            "For this to work, the telemetry file must align with the test case.",
        )
 
     evaluate_telemetry(test_case, telemetry_path)
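
The snippet below is a minimal standalone sketch (not part of this commit) of the colorized loguru setup and points-based summary the diff introduces. `DummyResult` is a hypothetical stand-in for the `EvaluationResult` objects returned by the judge functions, and `colors=True` is used in place of the deprecated `ansi=True` alias that the diff itself passes to `logger.opt`.

```python
import sys
from loguru import logger

# Reconfigure loguru the same way the diff does: drop the default sink and
# re-add stdout with color markup enabled.
logger.remove()
logger = logger.opt(colors=True)
logger.add(sys.stdout, colorize=True, format="{message}")


class DummyResult:
    """Hypothetical stand-in for EvaluationResult (passed/points only)."""

    def __init__(self, passed: bool, points: int):
        self.passed = passed
        self.points = points


results = [DummyResult(True, 5), DummyResult(False, 1), DummyResult(True, 1)]
passed = [r for r in results if r.passed]
failed = [r for r in results if not r.passed]
won_points = sum(r.points for r in passed)
missed_points = sum(r.points for r in failed)

logger.info(f"<green>Passed checkpoints: {len(passed)}/{len(results)}</green>")
logger.info(f"<red>Failed checkpoints: {len(failed)}/{len(results)}</red>")
logger.info(f"<green>Score: {won_points}/{won_points + missed_points}</green>")
```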
src/surf_spot_finder/evaluation/test_case.py CHANGED
@@ -18,18 +18,17 @@ class InputModel(BaseModel):
 
 
 class CheckpointCriteria(BaseModel):
-    """Represents a checkpoint criteria with a value and description"""
+    """Represents a checkpoint criteria with a description"""
 
     model_config = ConfigDict(extra="forbid")
-    value: int
     criteria: str
+    points: int
 
 
 class TestCase(BaseModel):
     model_config = ConfigDict(extra="forbid")
-
     input: InputModel
-    ground_truth: Dict[str, Any]
+    ground_truth: List[Dict[str, Any]] = Field(default_factory=list)
     checkpoints: List[CheckpointCriteria] = Field(default_factory=list)
     final_answer_criteria: List[CheckpointCriteria] = Field(default_factory=list)
 
@@ -38,26 +37,27 @@ class TestCase(BaseModel):
         """Load a test case from a YAML file and process it"""
         with open(case_path, "r") as f:
             test_case_dict = yaml.safe_load(f)
-
-        # Generate final_answer_criteria if not explicitly provided
-        if "final_answer_criteria" not in test_case_dict:
-            final_answer_criteria = []
-
-            def add_gt_final_answer_criteria(ground_truth_dict, prefix=""):
-                """Recursively add checkpoints for each value in the ground_truth dictionary"""
-                for key, value in ground_truth_dict.items():
-                    path = f"{prefix}: {key}" if prefix else key
-                    if isinstance(value, dict):
-                        add_gt_final_answer_criteria(value, path)
-                    else:
-                        final_answer_criteria.append(
-                            {
-                                "value": 1,
-                                "criteria": f"Check if {path} is approximately '{value}'.",
-                            }
-                        )
-
-            add_gt_final_answer_criteria(test_case_dict["ground_truth"])
-            test_case_dict["final_answer_criteria"] = final_answer_criteria
+        final_answer_criteria = []
+
+        def add_gt_final_answer_criteria(ground_truth_list):
+            """Add checkpoints for each item in the ground_truth list"""
+            for item in ground_truth_list:
+                if isinstance(item, dict) and "name" in item and "value" in item:
+                    points = item.get(
+                        "points", 1
+                    )  # Default to 1 if points not specified
+                    final_answer_criteria.append(
+                        {
+                            "points": points,
+                            "criteria": f"Check if {item['name']} is approximately '{item['value']}'.",
+                        }
+                    )
+
+        add_gt_final_answer_criteria(test_case_dict["ground_truth"])
+        test_case_dict["final_answer_criteria"] = final_answer_criteria
+        # remove the points from the ground_truth list but keep the name and value
+        test_case_dict["ground_truth"] = [
+            item for item in test_case_dict["ground_truth"] if isinstance(item, dict)
+        ]
 
         return cls.model_validate(test_case_dict)
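
As a rough illustration (not part of the commit), the sketch below runs the same ground_truth-to-criteria conversion shown in `from_yaml` against an inline dictionary instead of a YAML file; the example entries are invented.

```python
# Sketch of the ground_truth -> final_answer_criteria conversion introduced above,
# applied to an inline dict instead of a loaded YAML file.
test_case_dict = {
    "ground_truth": [
        {"name": "Surf location", "points": 5, "value": "Playa de Patos"},
        {"name": "Wave height", "value": "about 1 meter"},  # no points -> defaults to 1
    ]
}

final_answer_criteria = []
for item in test_case_dict["ground_truth"]:
    if isinstance(item, dict) and "name" in item and "value" in item:
        final_answer_criteria.append(
            {
                "points": item.get("points", 1),
                "criteria": f"Check if {item['name']} is approximately '{item['value']}'.",
            }
        )

print(final_answer_criteria)
# [{'points': 5, 'criteria': "Check if Surf location is approximately 'Playa de Patos'."},
#  {'points': 1, 'criteria': "Check if Wave height is approximately 'about 1 meter'."}]
```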
src/surf_spot_finder/evaluation/test_cases/alpha.yaml CHANGED
@@ -3,20 +3,28 @@ input:
   location: "Vigo"
   date: "2025-03-15 22:00"
   max_driving_hours: 3
-  model_id: "openai/gpt-4o"
+  model_id: "openai/o3-mini"
   api_key_var: "OPENAI_API_KEY"
   json_tracer: true
   api_base: null
   agent_type: "smolagents"
 
 ground_truth:
-  "Surf location": "Playa de Patos"
-  "Water temperature": "about 14°C +-5°C"
-  "Wave height": "about 1 meter"
+  - name: "Surf location"
+    points: 5
+    value: "Playa de Patos"
+  - name: "Water temperature"
+    points: 1
+    value: "about 14°C +-5°C"
+  - name: "Wave height"
+    points: 1
+    value: "about 1 meter"
 
 # Base checkpoints for agent behavior
+# The evaluators for these checkpoints
+# will not consider the hypothesis answer or final answer in their decision making
 checkpoints:
-  - value: 1
+  - points: 1
     criteria: "Check if the agent consulted DuckDuckGoSearchTool for locations near Vigo."
-  - value: 1
+  - points: 1
     criteria: "Check if the agent fetched a website for forecasting, not relying on text from a DuckDuckGo search."
src/surf_spot_finder/evaluation/utils.py CHANGED
@@ -3,10 +3,22 @@ from typing import Dict, List, Any, Optional
 import re
 
 from litellm import completion
+from textwrap import dedent
 
+from pydantic import BaseModel, ConfigDict
 from surf_spot_finder.evaluation.test_case import CheckpointCriteria
 
 
+class EvaluationResult(BaseModel):
+    """Represents the result of evaluating a criterion"""
+
+    model_config = ConfigDict(extra="forbid")
+    passed: bool
+    reason: str
+    criteria: str
+    points: int
+
+
 def extract_hypothesis_answer(telemetry: List[Dict[str, Any]]) -> str | None:
     """Extract the hypothesis agent final answer from the telemetry data"""
     for span in reversed(telemetry):
@@ -18,47 +30,48 @@ def extract_hypothesis_answer(telemetry: List[Dict[str, Any]]) -> str | None:
 
 def evaluate_criterion(
     criteria: str,
-    value: int,
-    ground_truth_output: List[CheckpointCriteria] | Dict[str, Any],
-    hypothesis_final_answer: str,
     model: str,
+    points: int,
+    ground_truth_output: Optional[List[CheckpointCriteria] | Dict[str, Any]] = None,
+    hypothesis_final_answer: Optional[str] = None,
     evidence: Optional[str] = None,
-) -> Dict[str, Any]:
+) -> EvaluationResult:
     """Evaluate a single criterion using LLM"""
 
-    prompt = f"""
-    Evaluate if the following {"checkpoint" if evidence else "criterion"} was met {"based on the provided evidence" if evidence else "in the agent's answer"}.
+    prompt = dedent(f"""
+    Evaluate if the following criterion was met {"based on the provided evidence" if evidence else "in the agent's answer"}.
 
-    {"Checkpoint" if evidence else "Criterion"}: {criteria}
-    Value: {value}
+    Criterion: {criteria}
+    """)
 
-    Expected output: {json.dumps(ground_truth_output)}
-
-    Agent's answer: {hypothesis_final_answer}
-    """
+    if ground_truth_output:
+        prompt += dedent(f"""
+        Expected output: {json.dumps(ground_truth_output)}
+        """)
+    if hypothesis_final_answer:
+        prompt += dedent(f"""
+        Agent's answer: {hypothesis_final_answer}
+        """)
 
     if evidence:
-        prompt += f"""
-
+        prompt += dedent(f"""
         Telemetry evidence:
         {evidence}
-        """
+        """)
 
     prompt += f"""
 
     Based on the {"evidence" if evidence else "comparison between the expected output and the actual final answer"},
-    was this {"checkpoint" if evidence else "criterion"} satisfied? Answer with:
+    was this criterion satisfied? Answer with:
     1. "passed": true or false
     2. "reason": Brief explanation for your decision
-    3. "score": A score from 0 to {value} indicating how well the {"checkpoint" if evidence else "criterion"} was met
     """
     prompt += """
     Output valid JSON with these three fields only, in the format:
     ```json
     {
     "passed": true,
-    "reason": "I have them",
-    "score": 1
+    "reason": "I have them"
     }
     ```
     """
@@ -82,38 +95,35 @@ def evaluate_criterion(
         evaluation = json.loads(content)
 
         evaluation["criteria"] = criteria
-        evaluation["value"] = value
-        return evaluation
     except (json.JSONDecodeError, AttributeError, StopIteration) as e:
-        return {
+        evaluation = {
             "passed": False,
             "reason": f"Failed to evaluate due to parsing: {str(e)} \n Response: {content}",
-            "score": 0,
             "criteria": criteria,
-            "value": value,
         }
+    evaluation["points"] = points
+    return EvaluationResult.model_validate(evaluation)
 
 
 def verify_checkpoints(
     telemetry: List[Dict[str, Any]],
-    hypothesis_final_answer: str,
     checkpoints: List[CheckpointCriteria],
-    ground_truth_checkpoints: List[CheckpointCriteria],
     model: str,
-) -> List[Dict[str, Any]]:
-    """Verify each checkpoint against the telemetry data using LLM"""
+) -> List[EvaluationResult]:
+    """Verify each checkpoint against the telemetry data using LLM
+    These checkpoints do not take the ground truth or hypothesis
+    answers into account. They are only concerned with the trace and
+    the specific criteria mentioned.
+    """
     results = []
 
     for checkpoint in checkpoints:
         criteria = checkpoint.criteria
-        value = checkpoint.value
         evidence = extract_relevant_evidence(telemetry, criteria)
 
         evaluation = evaluate_criterion(
             criteria=criteria,
+            points=checkpoint.points,
             model=model,
             evidence=evidence,
         )
@@ -128,19 +138,16 @@ def verify_hypothesis_answer(
     ground_truth_answer_dict: Dict[str, Any],
     ground_truth_checkpoints: List[CheckpointCriteria],
     model: str,
-) -> List[Dict[str, Any]]:
+) -> List[EvaluationResult]:
     """
     Verify if the final answer meets all specified criteria
     """
     results = []
 
     for criterion in ground_truth_checkpoints:
-        criteria = criterion.criteria
-        value = criterion.value
-
         evaluation = evaluate_criterion(
-            criteria=criteria,
-            value=value,
+            criteria=criterion.criteria,
+            points=criterion.points,
             ground_truth_output=ground_truth_answer_dict,
             hypothesis_final_answer=hypothesis_final_answer,
             model=model,
@@ -155,7 +162,8 @@ def extract_relevant_evidence(telemetry: List[Dict[str, Any]], criteria: str) ->
     """Extract relevant telemetry evidence based on the checkpoint criteria
     TODO this is not a very robust implementation, since it requires knowledge about which tools have been
     implemented. We should abstract this so that it can dynamically figure out what tools may have been used
-    and check for them appropriately."""
+    and check for them appropriately. This tool should probably have a better way of abstracting
+    relevant information from the opentelemetry spans."""
     evidence = ""
 
     # Look for evidence of tool usage
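
For a sense of how the new `EvaluationResult` model gets populated, here is a standalone sketch (not part of the commit). The `content` string is an invented LLM reply, and the regex used to pull JSON out of the fenced block is a hypothetical stand-in for however `evaluate_criterion` actually parses the judge's response.

```python
import json
import re

from pydantic import BaseModel, ConfigDict


class EvaluationResult(BaseModel):
    """Mirror of the model added in utils.py above."""

    model_config = ConfigDict(extra="forbid")
    passed: bool
    reason: str
    criteria: str
    points: int


# Invented judge reply, wrapped in a json code fence as the prompt requests.
content = '```json\n{"passed": true, "reason": "DuckDuckGo search span found"}\n```'

# Hypothetical extraction step: grab the JSON object out of the fenced block.
match = re.search(r"\{.*\}", content, re.DOTALL)
evaluation = json.loads(match.group(0))
evaluation["criteria"] = "Check if the agent consulted DuckDuckGoSearchTool."
evaluation["points"] = 1

result = EvaluationResult.model_validate(evaluation)
print(result.passed, result.points)  # True 1
```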