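"""Evaluation harness for the KnowLang chat bot.

Runs a curated set of questions about the Hugging Face transformers codebase
through the chat pipeline and scores each answer with an LLM judge on chunk
relevance, answer correctness, and code reference quality.
"""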
import asyncio
import datetime
import json
from enum import Enum
from pathlib import Path
from typing import List

from pydantic import BaseModel, Field, computed_field
from pydantic_ai import Agent

from knowlang.chat_bot.chat_graph import ChatResult, process_chat
from knowlang.configs.config import AppConfig
from knowlang.utils.chunking_util import truncate_chunk
from knowlang.utils.model_provider import create_pydantic_model
class EvalMetric(str, Enum):
CHUNK_RELEVANCE = "chunk_relevance"
ANSWER_CORRECTNESS = "answer_correctness"
CODE_REFERENCE = "code_reference"
class EvalCase(BaseModel):
"""Single evaluation case focused on code understanding"""
question: str
expected_files: List[str] = Field(description="Files that should be in retrieved chunks")
expected_concepts: List[str] = Field(description="Key concepts that should be in answer")
expected_code_refs: List[str] = Field(description="Code references that should be mentioned")
difficulty: int = Field(ge=1, le=3, description="1: Easy, 2: Medium, 3: Hard")
class MetricScores(BaseModel):
chunk_relevance: float = Field(ge=0.0, le=10.0, description="Score for chunk relevance")
answer_correctness: float = Field(ge=0.0, le=10.0, description="Score for answer correctness")
code_reference: float = Field(ge=0.0, le=10.0, description="Score for code reference quality")
@computed_field
def weighted_total(self) -> float:
"""Calculate weighted total score"""
weights = {
"chunk_relevance": 0.4,
"answer_correctness": 0.4,
"code_reference": 0.2
}
return sum(
getattr(self, metric) * weight
for metric, weight in weights.items()
)
class EvalAgentResponse(MetricScores):
"""Raw response from evaluation agent"""
feedback: str
class EvalRound(BaseModel):
"""Single evaluation round results"""
round_id: int
eval_response: EvalAgentResponse
timestamp: datetime.datetime
class EvalResult(BaseModel):
"""Extended evaluation result with multiple rounds"""
evaluator_model: str
case: EvalCase
eval_rounds: List[EvalRound]
@computed_field
def aggregated_scores(self) -> MetricScores:
"""Calculate mean scores across rounds"""
chunk_relevance = EvalMetric.CHUNK_RELEVANCE.value
answer_correctness = EvalMetric.ANSWER_CORRECTNESS.value
code_reference = EvalMetric.CODE_REFERENCE.value
scores = {
chunk_relevance: [],
answer_correctness: [],
code_reference: []
}
        for eval_round in self.eval_rounds:
            for metric in scores:
                scores[metric].append(getattr(eval_round.eval_response, metric))
return MetricScores(
chunk_relevance=sum(scores[chunk_relevance]) / len(self.eval_rounds),
answer_correctness=sum(scores[answer_correctness]) / len(self.eval_rounds),
code_reference=sum(scores[code_reference]) / len(self.eval_rounds)
)
class ChatBotEvaluationContext(EvalCase, ChatResult):
    """Combined evaluation case and chat result, serialized as the judge's input"""
class EvalSummary(EvalResult, ChatResult):
"""Evaluation summary with chat and evaluation results"""
pass
class ChatBotEvaluator:
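    """LLM-as-judge evaluator that scores chat results against curated evaluation cases"""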
def __init__(self, config: AppConfig):
"""Initialize evaluator with app config"""
self.config = config
self.eval_agent = Agent(
create_pydantic_model(
model_provider=config.evaluator.model_provider,
model_name=config.evaluator.model_name
),
system_prompt=self._build_eval_prompt(),
result_type=EvalAgentResponse
)
def _build_eval_prompt(self) -> str:
return """You are an expert evaluator of code understanding systems.
Evaluate the response based on these specific criteria:
1. Chunk Relevance (0-1):
- Are the retrieved code chunks from the expected files?
- Do they contain relevant code sections?
2. Answer Correctness (0-1):
- Does the answer accurately explain the code?
- Are the expected concepts covered?
3. Code Reference Quality (0-1):
- Does it properly cite specific code locations?
- Are code references clear and relevant?
Format your response as JSON:
{
"chunk_relevance": float type score (from 0.0f to 10.0f),
"answer_correctness": float type score (from 0.0f to 10.0f),
"code_reference": float type score (from 0.0f to 10.0f),
"feedback": "Brief explanation of scores"
}
"""
async def evaluate_single(
self,
case: EvalCase,
chat_result: ChatResult,
num_rounds: int = 1,
) -> EvalResult:
"""Evaluate a single case for multiple rounds"""
eval_rounds = []
# Prepare evaluation context
eval_context = ChatBotEvaluationContext(
**case.model_dump(),
**chat_result.model_dump()
)
        # Truncate retrieved chunks once up front to keep the judge input short
        eval_context.retrieved_context.chunks = [
            truncate_chunk(chunk) for chunk in eval_context.retrieved_context.chunks
        ]
        for round_id in range(num_rounds):
# Get evaluation from the model
result = await self.eval_agent.run(
eval_context.model_dump_json(),
)
eval_rounds.append(EvalRound(
round_id=round_id,
eval_response=result.data,
timestamp=datetime.datetime.now()
))
# Add delay between rounds to avoid rate limits
await asyncio.sleep(2)
return EvalResult(
case=case,
eval_rounds=eval_rounds,
evaluator_model=f"{self.config.evaluator.model_provider}:{self.config.evaluator.model_name}"
)
# src/transformers/quantizers/base.py
TRANSFORMER_QUANTIZER_BASE_CASES = [
EvalCase(
question= "How are different quantization methods implemented in the transformers library, and what are the key components required to implement a new quantization method?",
expected_files= ["quantizers/base.py"],
expected_concepts= [
"HfQuantizer abstract base class",
"PreTrainedModel quantization",
"pre/post processing of models",
"quantization configuration",
"requires_calibration flag"
],
        expected_code_refs=[
"class HfQuantizer",
"preprocess_model method",
"postprocess_model method",
"_process_model_before_weight_loading",
"requires_calibration attribute"
],
        difficulty=3
)
]
# src/transformers/quantizers/auto.py
TRANSFORMER_QUANTIZER_AUTO_CASES = [
EvalCase(
question="How does the transformers library automatically select and configure the appropriate quantization method, and what happens when loading a pre-quantized model?",
expected_files=[
"quantizers/auto.py",
"utils/quantization_config.py"
],
expected_concepts=[
"automatic quantizer selection",
"quantization config mapping",
"config merging behavior",
"backwards compatibility for bitsandbytes",
"quantization method resolution"
],
expected_code_refs=[
"AUTO_QUANTIZER_MAPPING",
"AUTO_QUANTIZATION_CONFIG_MAPPING",
"AutoHfQuantizer.from_config",
"AutoQuantizationConfig.from_pretrained",
"merge_quantization_configs method"
],
difficulty=3
)
]
# src/transformers/pipelines/base.py
TRANSFORMER_PIPELINE_BASE_TEST_CASES = [
EvalCase(
question="How does the Pipeline class handle model and device initialization?",
expected_files=["base.py"],
expected_concepts=[
"device placement",
"model initialization",
"framework detection",
"device type detection",
"torch dtype handling"
],
expected_code_refs=[
"def __init__",
"def device_placement",
"infer_framework_load_model",
"self.device = torch.device"
],
difficulty=3
),
EvalCase(
question="How does the Pipeline class implement batched inference and data loading?",
expected_files=["base.py", "pt_utils.py"],
expected_concepts=[
"batch processing",
"data loading",
"collate function",
"padding implementation",
"iterator pattern"
],
expected_code_refs=[
"def get_iterator",
"class PipelineDataset",
"class PipelineIterator",
"_pad",
"pad_collate_fn"
],
difficulty=3
)
]
# src/transformers/pipelines/text_generation.py
TRANSFORMER_PIPELINE_TEXT_GENERATION_TEST_CASES = [
EvalCase(
question="How does the TextGenerationPipeline handle chat-based generation and template processing?",
expected_files=["text_generation.py", "base.py"],
expected_concepts=[
"chat message formatting",
"template application",
"message continuation",
"role handling",
"assistant prefill behavior"
],
expected_code_refs=[
"class Chat",
"tokenizer.apply_chat_template",
"continue_final_message",
"isinstance(prompt_text, Chat)",
"postprocess"
],
difficulty=3
)
]
# src/transformers/generation/logits_process.py
TRANSFORMER_LOGITS_PROCESSOR_TEST_CASES = [
EvalCase(
question="How does TopKLogitsWarper implement top-k filtering for text generation?",
expected_files=["generation/logits_process.py"],
expected_concepts=[
"top-k filtering algorithm",
"probability masking",
"batch processing",
"logits manipulation",
"vocabulary filtering"
],
expected_code_refs=[
"class TopKLogitsWarper(LogitsProcessor)",
"torch.topk(scores, top_k)[0]",
"indices_to_remove = scores < torch.topk",
"scores_processed = scores.masked_fill(indices_to_remove, self.filter_value)",
"top_k = max(top_k, min_tokens_to_keep)"
],
difficulty=3
),
EvalCase(
question="How does TemperatureLogitsProcessor implement temperature sampling for controlling generation randomness?",
expected_files=["generation/logits_process.py"],
expected_concepts=[
"temperature scaling",
"probability distribution shaping",
"logits normalization",
"generation randomness control",
"batch processing with temperature"
],
expected_code_refs=[
"class TemperatureLogitsProcessor(LogitsProcessor)",
"scores_processed = scores / self.temperature",
"if not isinstance(temperature, float) or not (temperature > 0)",
"def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor)",
"raise ValueError(except_msg)"
],
difficulty=3
)
]
# src/transformers/trainer.py
TRANSFORMER_TRAINER_TEST_CASES = [
EvalCase(
question="How does Trainer handle distributed training and gradient accumulation? Explain the implementation details.",
expected_files=["trainer.py"],
expected_concepts=[
"gradient accumulation steps",
"distributed training logic",
"optimizer step scheduling",
"loss scaling",
"device synchronization"
],
expected_code_refs=[
"def training_step",
"def _wrap_model",
"self.accelerator.backward",
"self.args.gradient_accumulation_steps",
"if args.n_gpu > 1",
"model.zero_grad()"
],
difficulty=3
),
EvalCase(
question="How does the Trainer class implement custom optimizer and learning rate scheduler creation? Explain the initialization process and supported configurations.",
expected_files=["trainer.py"],
expected_concepts=[
"optimizer initialization",
"learning rate scheduler",
"weight decay handling",
"optimizer parameter groups",
"AdamW configuration",
"custom optimizer support"
],
expected_code_refs=[
"def create_optimizer",
"def create_scheduler",
"get_decay_parameter_names",
"optimizer_grouped_parameters",
"self.args.learning_rate",
"optimizer_kwargs"
],
difficulty=3
)
]
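# Combined suite of all transformers evaluation cases defined above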
TRANSFORMER_TEST_CASES: List[EvalCase] = [
*TRANSFORMER_QUANTIZER_BASE_CASES,
*TRANSFORMER_QUANTIZER_AUTO_CASES,
*TRANSFORMER_PIPELINE_BASE_TEST_CASES,
*TRANSFORMER_PIPELINE_TEXT_GENERATION_TEST_CASES,
*TRANSFORMER_LOGITS_PROCESSOR_TEST_CASES,
*TRANSFORMER_TRAINER_TEST_CASES,
]
class DateTimeEncoder(json.JSONEncoder):
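    """JSON encoder that serializes datetime objects (e.g. EvalRound.timestamp) as ISO-8601 strings"""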
def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat()
return super().default(obj)
async def main():
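    """Run every test case through the chat bot, judge the answers, and write the results to a dated JSON file."""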
from rich.console import Console
from rich.pretty import Pretty
import chromadb
console = Console()
config = AppConfig()
evaluator = ChatBotEvaluator(config)
collection = chromadb.PersistentClient(path=str(config.db.persist_directory)).get_collection(name=config.db.collection_name)
    summary_list: List[EvalSummary] = []
for case in TRANSFORMER_TEST_CASES:
try:
            chat_result: ChatResult = await process_chat(question=case.question, collection=collection, config=config)
            result: EvalResult = await evaluator.evaluate_single(case, chat_result, config.evaluator.evaluation_rounds)
eval_summary = EvalSummary(
**chat_result.model_dump(),
**result.model_dump()
)
summary_list.append(eval_summary)
            # Pause between cases to avoid rate limiting, without blocking the event loop
            await asyncio.sleep(3)
except Exception:
console.print_exception()
# Write the final JSON array to a file
current_date = datetime.datetime.now().strftime("%Y%m%d")
    file_name = Path("evaluations", f"transformers_{config.evaluator.model_provider}_evaluation_results_{current_date}.json")
    # Ensure the output directory exists before writing
    file_name.parent.mkdir(parents=True, exist_ok=True)
    with open(file_name, "w") as f:
json_list = [summary.model_dump() for summary in summary_list]
json.dump(json_list, f, indent=2, cls=DateTimeEncoder)
console.print(Pretty(summary_list))
if __name__ == "__main__":
asyncio.run(main())