|
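"""Evaluation harness for the knowlang chat bot.

Defines evaluation metrics and cases, an LLM-based evaluation agent, and a suite of
questions about the HuggingFace transformers code base. Running the module sends each
case through `process_chat`, scores the result over one or more rounds, and writes the
aggregated results to a JSON report.
"""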
import asyncio
import datetime
import json
from enum import Enum
from pathlib import Path
from typing import List

from pydantic import BaseModel, Field, computed_field
from pydantic_ai import Agent

from knowlang.chat_bot.chat_graph import ChatResult, process_chat
from knowlang.configs.config import AppConfig
from knowlang.utils.chunking_util import truncate_chunk
from knowlang.utils.model_provider import create_pydantic_model
|
|
|
class EvalMetric(str, Enum): |
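    """Evaluation dimensions scored by the evaluation agent."""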
|
CHUNK_RELEVANCE = "chunk_relevance" |
|
ANSWER_CORRECTNESS = "answer_correctness" |
|
CODE_REFERENCE = "code_reference" |
|
|
|
class EvalCase(BaseModel): |
|
"""Single evaluation case focused on code understanding""" |
|
question: str |
|
expected_files: List[str] = Field(description="Files that should be in retrieved chunks") |
|
expected_concepts: List[str] = Field(description="Key concepts that should be in answer") |
|
expected_code_refs: List[str] = Field(description="Code references that should be mentioned") |
|
difficulty: int = Field(ge=1, le=3, description="1: Easy, 2: Medium, 3: Hard") |
|
|
|
|
|
class MetricScores(BaseModel): |
|
chunk_relevance: float = Field(ge=0.0, le=10.0, description="Score for chunk relevance") |
|
answer_correctness: float = Field(ge=0.0, le=10.0, description="Score for answer correctness") |
|
code_reference: float = Field(ge=0.0, le=10.0, description="Score for code reference quality") |
|
|
|
    @computed_field
    @property
    def weighted_total(self) -> float:
|
"""Calculate weighted total score""" |
|
weights = { |
|
"chunk_relevance": 0.4, |
|
"answer_correctness": 0.4, |
|
"code_reference": 0.2 |
|
} |
|
return sum( |
|
getattr(self, metric) * weight |
|
for metric, weight in weights.items() |
|
) |
|
|
|
class EvalAgentResponse(MetricScores): |
|
"""Raw response from evaluation agent""" |
|
feedback: str |
|
|
|
class EvalRound(BaseModel): |
|
"""Single evaluation round results""" |
|
round_id: int |
|
eval_response: EvalAgentResponse |
|
timestamp: datetime.datetime |
|
|
|
class EvalResult(BaseModel): |
|
"""Extended evaluation result with multiple rounds""" |
|
evaluator_model: str |
|
case: EvalCase |
|
eval_rounds: List[EvalRound] |
|
|
|
    @computed_field
    @property
    def aggregated_scores(self) -> MetricScores:
|
"""Calculate mean scores across rounds""" |
|
chunk_relevance = EvalMetric.CHUNK_RELEVANCE.value |
|
answer_correctness = EvalMetric.ANSWER_CORRECTNESS.value |
|
code_reference = EvalMetric.CODE_REFERENCE.value |
|
|
|
scores = { |
|
chunk_relevance: [], |
|
answer_correctness: [], |
|
code_reference: [] |
|
} |
|
|
|
        for eval_round in self.eval_rounds:
            for metric in scores:
                scores[metric].append(getattr(eval_round.eval_response, metric))
|
|
|
return MetricScores( |
|
chunk_relevance=sum(scores[chunk_relevance]) / len(self.eval_rounds), |
|
answer_correctness=sum(scores[answer_correctness]) / len(self.eval_rounds), |
|
code_reference=sum(scores[code_reference]) / len(self.eval_rounds) |
|
) |
|
|
|
class ChatBotEvaluationContext(EvalCase, ChatResult):
    """Combined input for the evaluation agent: the eval case plus the chatbot's result."""
|
|
|
class EvalSummary(EvalResult, ChatResult):
    """Evaluation summary with chat and evaluation results"""
|
|
|
|
|
class ChatBotEvaluator: |
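    """Scores chatbot responses against evaluation cases using an LLM evaluation agent."""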
|
def __init__(self, config: AppConfig): |
|
"""Initialize evaluator with app config""" |
|
self.config = config |
|
self.eval_agent = Agent( |
|
create_pydantic_model( |
|
model_provider=config.evaluator.model_provider, |
|
model_name=config.evaluator.model_name |
|
), |
|
system_prompt=self._build_eval_prompt(), |
|
result_type=EvalAgentResponse |
|
) |
|
|
|
def _build_eval_prompt(self) -> str: |
|
        return """You are an expert evaluator of code understanding systems.
        Evaluate the response based on these specific criteria:

        1. Chunk Relevance (0-10):
           - Are the retrieved code chunks from the expected files?
           - Do they contain relevant code sections?

        2. Answer Correctness (0-10):
           - Does the answer accurately explain the code?
           - Are the expected concepts covered?

        3. Code Reference Quality (0-10):
           - Does it properly cite specific code locations?
           - Are code references clear and relevant?

        Format your response as JSON:
        {
            "chunk_relevance": <float score from 0.0 to 10.0>,
            "answer_correctness": <float score from 0.0 to 10.0>,
            "code_reference": <float score from 0.0 to 10.0>,
            "feedback": "Brief explanation of scores"
        }
        """
|
|
|
async def evaluate_single( |
|
self, |
|
case: EvalCase, |
|
chat_result: ChatResult, |
|
num_rounds: int = 1, |
|
) -> EvalResult: |
|
"""Evaluate a single case for multiple rounds""" |
|
eval_rounds = [] |
|
|
|
eval_context = ChatBotEvaluationContext( |
|
**case.model_dump(), |
|
**chat_result.model_dump() |
|
) |
|
|
|
        # Truncate retrieved chunks in place so the serialized context stays within limits
        eval_context.retrieved_context.chunks = [
            truncate_chunk(chunk) for chunk in eval_context.retrieved_context.chunks
        ]

        for round_id in range(num_rounds):
|
|
|
|
|
result = await self.eval_agent.run( |
|
eval_context.model_dump_json(), |
|
) |
|
|
|
eval_rounds.append(EvalRound( |
|
round_id=round_id, |
|
eval_response=result.data, |
|
timestamp=datetime.datetime.now() |
|
)) |
|
|
|
|
|
            # Brief pause between evaluation rounds, presumably to avoid provider rate limits
            await asyncio.sleep(2)
|
|
|
return EvalResult( |
|
case=case, |
|
eval_rounds=eval_rounds, |
|
evaluator_model=f"{self.config.evaluator.model_provider}:{self.config.evaluator.model_name}" |
|
) |
|
|
|
|
|
|
|
TRANSFORMER_QUANTIZER_BASE_CASES = [ |
|
EvalCase( |
|
        question="How are different quantization methods implemented in the transformers library, and what are the key components required to implement a new quantization method?",
|
        expected_files=["quantizers/base.py"],
|
        expected_concepts=[
|
"HfQuantizer abstract base class", |
|
"PreTrainedModel quantization", |
|
"pre/post processing of models", |
|
"quantization configuration", |
|
"requires_calibration flag" |
|
], |
|
        expected_code_refs=[
|
"class HfQuantizer", |
|
"preprocess_model method", |
|
"postprocess_model method", |
|
"_process_model_before_weight_loading", |
|
"requires_calibration attribute" |
|
], |
|
        difficulty=3
|
) |
|
] |
|
|
|
|
|
TRANSFORMER_QUANTIZER_AUTO_CASES = [ |
|
EvalCase( |
|
question="How does the transformers library automatically select and configure the appropriate quantization method, and what happens when loading a pre-quantized model?", |
|
expected_files=[ |
|
"quantizers/auto.py", |
|
"utils/quantization_config.py" |
|
], |
|
expected_concepts=[ |
|
"automatic quantizer selection", |
|
"quantization config mapping", |
|
"config merging behavior", |
|
"backwards compatibility for bitsandbytes", |
|
"quantization method resolution" |
|
], |
|
expected_code_refs=[ |
|
"AUTO_QUANTIZER_MAPPING", |
|
"AUTO_QUANTIZATION_CONFIG_MAPPING", |
|
"AutoHfQuantizer.from_config", |
|
"AutoQuantizationConfig.from_pretrained", |
|
"merge_quantization_configs method" |
|
], |
|
difficulty=3 |
|
) |
|
] |
|
|
|
|
|
|
|
TRANSFORMER_PIPELINE_BASE_TEST_CASES = [ |
|
EvalCase( |
|
question="How does the Pipeline class handle model and device initialization?", |
|
expected_files=["base.py"], |
|
expected_concepts=[ |
|
"device placement", |
|
"model initialization", |
|
"framework detection", |
|
"device type detection", |
|
"torch dtype handling" |
|
], |
|
expected_code_refs=[ |
|
"def __init__", |
|
"def device_placement", |
|
"infer_framework_load_model", |
|
"self.device = torch.device" |
|
], |
|
difficulty=3 |
|
), |
|
EvalCase( |
|
question="How does the Pipeline class implement batched inference and data loading?", |
|
expected_files=["base.py", "pt_utils.py"], |
|
expected_concepts=[ |
|
"batch processing", |
|
"data loading", |
|
"collate function", |
|
"padding implementation", |
|
"iterator pattern" |
|
], |
|
expected_code_refs=[ |
|
"def get_iterator", |
|
"class PipelineDataset", |
|
"class PipelineIterator", |
|
"_pad", |
|
"pad_collate_fn" |
|
], |
|
difficulty=3 |
|
) |
|
] |
|
|
|
|
|
TRANSFORMER_PIPELINE_TEXT_GENERATION_TEST_CASES = [ |
|
EvalCase( |
|
question="How does the TextGenerationPipeline handle chat-based generation and template processing?", |
|
expected_files=["text_generation.py", "base.py"], |
|
expected_concepts=[ |
|
"chat message formatting", |
|
"template application", |
|
"message continuation", |
|
"role handling", |
|
"assistant prefill behavior" |
|
], |
|
expected_code_refs=[ |
|
"class Chat", |
|
"tokenizer.apply_chat_template", |
|
"continue_final_message", |
|
"isinstance(prompt_text, Chat)", |
|
"postprocess" |
|
], |
|
difficulty=3 |
|
) |
|
] |
|
|
|
|
|
TRANSFORMER_LOGITS_PROCESSOR_TEST_CASES = [ |
|
EvalCase( |
|
question="How does TopKLogitsWarper implement top-k filtering for text generation?", |
|
expected_files=["generation/logits_process.py"], |
|
expected_concepts=[ |
|
"top-k filtering algorithm", |
|
"probability masking", |
|
"batch processing", |
|
"logits manipulation", |
|
"vocabulary filtering" |
|
], |
|
expected_code_refs=[ |
|
"class TopKLogitsWarper(LogitsProcessor)", |
|
"torch.topk(scores, top_k)[0]", |
|
"indices_to_remove = scores < torch.topk", |
|
"scores_processed = scores.masked_fill(indices_to_remove, self.filter_value)", |
|
"top_k = max(top_k, min_tokens_to_keep)" |
|
], |
|
difficulty=3 |
|
), |
|
EvalCase( |
|
question="How does TemperatureLogitsProcessor implement temperature sampling for controlling generation randomness?", |
|
expected_files=["generation/logits_process.py"], |
|
expected_concepts=[ |
|
"temperature scaling", |
|
"probability distribution shaping", |
|
"logits normalization", |
|
"generation randomness control", |
|
"batch processing with temperature" |
|
], |
|
expected_code_refs=[ |
|
"class TemperatureLogitsProcessor(LogitsProcessor)", |
|
"scores_processed = scores / self.temperature", |
|
"if not isinstance(temperature, float) or not (temperature > 0)", |
|
"def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor)", |
|
"raise ValueError(except_msg)" |
|
], |
|
difficulty=3 |
|
) |
|
] |
|
|
|
|
|
TRANSFORMER_TRAINER_TEST_CASES = [ |
|
EvalCase( |
|
question="How does Trainer handle distributed training and gradient accumulation? Explain the implementation details.", |
|
expected_files=["trainer.py"], |
|
expected_concepts=[ |
|
"gradient accumulation steps", |
|
"distributed training logic", |
|
"optimizer step scheduling", |
|
"loss scaling", |
|
"device synchronization" |
|
], |
|
expected_code_refs=[ |
|
"def training_step", |
|
"def _wrap_model", |
|
"self.accelerator.backward", |
|
"self.args.gradient_accumulation_steps", |
|
"if args.n_gpu > 1", |
|
"model.zero_grad()" |
|
], |
|
difficulty=3 |
|
), |
|
EvalCase( |
|
question="How does the Trainer class implement custom optimizer and learning rate scheduler creation? Explain the initialization process and supported configurations.", |
|
expected_files=["trainer.py"], |
|
expected_concepts=[ |
|
"optimizer initialization", |
|
"learning rate scheduler", |
|
"weight decay handling", |
|
"optimizer parameter groups", |
|
"AdamW configuration", |
|
"custom optimizer support" |
|
], |
|
expected_code_refs=[ |
|
"def create_optimizer", |
|
"def create_scheduler", |
|
"get_decay_parameter_names", |
|
"optimizer_grouped_parameters", |
|
"self.args.learning_rate", |
|
"optimizer_kwargs" |
|
], |
|
difficulty=3 |
|
) |
|
] |
|
|
|
TRANSFORMER_TEST_CASES: List[EvalCase] = [
|
*TRANSFORMER_QUANTIZER_BASE_CASES, |
|
*TRANSFORMER_QUANTIZER_AUTO_CASES, |
|
*TRANSFORMER_PIPELINE_BASE_TEST_CASES, |
|
*TRANSFORMER_PIPELINE_TEXT_GENERATION_TEST_CASES, |
|
*TRANSFORMER_LOGITS_PROCESSOR_TEST_CASES, |
|
*TRANSFORMER_TRAINER_TEST_CASES, |
|
] |
|
|
|
class DateTimeEncoder(json.JSONEncoder): |
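    """JSON encoder that serializes datetime objects as ISO-8601 strings."""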
|
def default(self, obj): |
|
if isinstance(obj, datetime.datetime): |
|
return obj.isoformat() |
|
return super().default(obj) |
|
|
|
async def main(): |
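    """Run every transformer eval case through the chatbot and write scored results to a JSON report."""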
|
from rich.console import Console |
|
from rich.pretty import Pretty |
|
import chromadb |
|
console = Console() |
|
config = AppConfig() |
|
evaluator = ChatBotEvaluator(config) |
|
    db_client = chromadb.PersistentClient(path=str(config.db.persist_directory))
    collection = db_client.get_collection(name=config.db.collection_name)
|
|
|
    summary_list: List[EvalSummary] = []
|
|
|
for case in TRANSFORMER_TEST_CASES: |
|
try: |
|
            chat_result: ChatResult = await process_chat(question=case.question, collection=collection, config=config)
            result: EvalResult = await evaluator.evaluate_single(case, chat_result, config.evaluator.evaluation_rounds)
|
|
|
eval_summary = EvalSummary( |
|
**chat_result.model_dump(), |
|
**result.model_dump() |
|
) |
|
summary_list.append(eval_summary) |
|
|
|
            # Pause between cases without blocking the event loop
            await asyncio.sleep(3)
|
|
|
except Exception: |
|
console.print_exception() |
|
|
|
|
|
current_date = datetime.datetime.now().strftime("%Y%m%d") |
|
    file_name = Path("evaluations", f"transformers_{config.evaluator.model_provider}_evaluation_results_{current_date}.json")
    file_name.parent.mkdir(parents=True, exist_ok=True)
    with open(file_name, "w") as f:
|
json_list = [summary.model_dump() for summary in summary_list] |
|
json.dump(json_list, f, indent=2, cls=DateTimeEncoder) |
|
|
|
|
|
console.print(Pretty(summary_list)) |
|
|
|
if __name__ == "__main__": |
|
asyncio.run(main()) |