import asyncio
import datetime
import json
from enum import Enum
from pathlib import Path
from typing import List

from pydantic import BaseModel, Field, computed_field
from pydantic_ai import Agent

from knowlang.chat_bot.chat_graph import ChatResult, process_chat
from knowlang.configs.config import AppConfig
from knowlang.utils.chunking_util import truncate_chunk
from knowlang.utils.model_provider import create_pydantic_model

class EvalMetric(str, Enum):
    CHUNK_RELEVANCE = "chunk_relevance"
    ANSWER_CORRECTNESS = "answer_correctness"
    CODE_REFERENCE = "code_reference"

class EvalCase(BaseModel):
    """Single evaluation case focused on code understanding"""
    question: str
    expected_files: List[str] = Field(description="Files that should be in retrieved chunks")
    expected_concepts: List[str] = Field(description="Key concepts that should be in answer")
    expected_code_refs: List[str] = Field(description="Code references that should be mentioned")
    difficulty: int = Field(ge=1, le=3, description="1: Easy, 2: Medium, 3: Hard")

class MetricScores(BaseModel):
    chunk_relevance: float = Field(ge=0.0, le=10.0, description="Score for chunk relevance")
    answer_correctness: float = Field(ge=0.0, le=10.0, description="Score for answer correctness")
    code_reference: float = Field(ge=0.0, le=10.0, description="Score for code reference quality")

    def weighted_total(self) -> float:
        """Calculate weighted total score"""
        weights = {
            "chunk_relevance": 0.4,
            "answer_correctness": 0.4,
            "code_reference": 0.2,
        }
        return sum(
            getattr(self, metric) * weight
            for metric, weight in weights.items()
        )
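
# Quick sanity check of the weighting above (hypothetical scores, not from a real run):
#   MetricScores(chunk_relevance=8.0, answer_correctness=9.0, code_reference=7.0).weighted_total()
#   == 0.4 * 8.0 + 0.4 * 9.0 + 0.2 * 7.0 == 8.2 (up to floating-point rounding)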

class EvalAgentResponse(MetricScores):
    """Raw response from evaluation agent"""
    feedback: str

class EvalRound(BaseModel):
    """Single evaluation round results"""
    round_id: int
    eval_response: EvalAgentResponse
    timestamp: datetime.datetime

class EvalResult(BaseModel):
    """Extended evaluation result with multiple rounds"""
    evaluator_model: str
    case: EvalCase
    eval_rounds: List[EvalRound]

    def aggregated_scores(self) -> MetricScores:
        """Calculate mean scores across rounds"""
        chunk_relevance = EvalMetric.CHUNK_RELEVANCE.value
        answer_correctness = EvalMetric.ANSWER_CORRECTNESS.value
        code_reference = EvalMetric.CODE_REFERENCE.value
        scores = {
            chunk_relevance: [],
            answer_correctness: [],
            code_reference: [],
        }
        for eval_round in self.eval_rounds:
            for metric in scores:
                scores[metric].append(getattr(eval_round.eval_response, metric))
        return MetricScores(
            chunk_relevance=sum(scores[chunk_relevance]) / len(self.eval_rounds),
            answer_correctness=sum(scores[answer_correctness]) / len(self.eval_rounds),
            code_reference=sum(scores[code_reference]) / len(self.eval_rounds),
        )
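
# For example (hypothetical rounds): two EvalRounds whose chunk_relevance scores are
# 8.0 and 9.0 aggregate to chunk_relevance == 8.5; the other metrics average the same way.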

class ChatBotEvaluationContext(EvalCase, ChatResult):
    pass


class EvalSummary(EvalResult, ChatResult):
    """Evaluation summary with chat and evaluation results"""
    pass

class ChatBotEvaluator:
    def __init__(self, config: AppConfig):
        """Initialize evaluator with app config"""
        self.config = config
        # The agent is constrained to return structured EvalAgentResponse objects
        # (per-metric scores plus free-form feedback).
        self.eval_agent = Agent(
            create_pydantic_model(
                model_provider=config.evaluator.model_provider,
                model_name=config.evaluator.model_name
            ),
            system_prompt=self._build_eval_prompt(),
            result_type=EvalAgentResponse
        )
    def _build_eval_prompt(self) -> str:
        return """You are an expert evaluator of code understanding systems.
Evaluate the response based on these specific criteria:

1. Chunk Relevance (0-10):
   - Are the retrieved code chunks from the expected files?
   - Do they contain relevant code sections?

2. Answer Correctness (0-10):
   - Does the answer accurately explain the code?
   - Are the expected concepts covered?

3. Code Reference Quality (0-10):
   - Does it properly cite specific code locations?
   - Are code references clear and relevant?

Format your response as JSON:
{
    "chunk_relevance": <float score from 0.0 to 10.0>,
    "answer_correctness": <float score from 0.0 to 10.0>,
    "code_reference": <float score from 0.0 to 10.0>,
    "feedback": "Brief explanation of scores"
}
"""
    async def evaluate_single(
        self,
        case: EvalCase,
        chat_result: ChatResult,
        num_rounds: int = 1,
    ) -> EvalResult:
        """Evaluate a single case for multiple rounds"""
        eval_rounds = []

        # Prepare evaluation context
        eval_context = ChatBotEvaluationContext(
            **case.model_dump(),
            **chat_result.model_dump()
        )

        # Truncate chunks to avoid long text; write the truncated chunk back by index,
        # since rebinding the loop variable would not modify the list.
        for i, chunk in enumerate(eval_context.retrieved_context.chunks):
            eval_context.retrieved_context.chunks[i] = truncate_chunk(chunk)

        for round_id in range(num_rounds):
            # Get evaluation from the model
            result = await self.eval_agent.run(
                eval_context.model_dump_json(),
            )
            eval_rounds.append(EvalRound(
                round_id=round_id,
                eval_response=result.data,
                timestamp=datetime.datetime.now()
            ))
            # Add delay between rounds to avoid rate limits
            await asyncio.sleep(2)

        return EvalResult(
            case=case,
            eval_rounds=eval_rounds,
            evaluator_model=f"{self.config.evaluator.model_provider}:{self.config.evaluator.model_name}"
        )
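
# Usage sketch (hypothetical values; assumes an AppConfig with a populated `evaluator`
# section, as wired up in main() below):
#   evaluator = ChatBotEvaluator(AppConfig())
#   eval_result = await evaluator.evaluate_single(case, chat_result, num_rounds=3)
#   print(eval_result.aggregated_scores().weighted_total())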

# src/transformers/quantizers/base.py
TRANSFORMER_QUANTIZER_BASE_CASES = [
    EvalCase(
        question="How are different quantization methods implemented in the transformers library, and what are the key components required to implement a new quantization method?",
        expected_files=["quantizers/base.py"],
        expected_concepts=[
            "HfQuantizer abstract base class",
            "PreTrainedModel quantization",
            "pre/post processing of models",
            "quantization configuration",
            "requires_calibration flag"
        ],
        expected_code_refs=[
            "class HfQuantizer",
            "preprocess_model method",
            "postprocess_model method",
            "_process_model_before_weight_loading",
            "requires_calibration attribute"
        ],
        difficulty=3
    )
]

# src/transformers/quantizers/auto.py
TRANSFORMER_QUANTIZER_AUTO_CASES = [
    EvalCase(
        question="How does the transformers library automatically select and configure the appropriate quantization method, and what happens when loading a pre-quantized model?",
        expected_files=[
            "quantizers/auto.py",
            "utils/quantization_config.py"
        ],
        expected_concepts=[
            "automatic quantizer selection",
            "quantization config mapping",
            "config merging behavior",
            "backwards compatibility for bitsandbytes",
            "quantization method resolution"
        ],
        expected_code_refs=[
            "AUTO_QUANTIZER_MAPPING",
            "AUTO_QUANTIZATION_CONFIG_MAPPING",
            "AutoHfQuantizer.from_config",
            "AutoQuantizationConfig.from_pretrained",
            "merge_quantization_configs method"
        ],
        difficulty=3
    )
]

# src/transformers/pipelines/base.py
TRANSFORMER_PIPELINE_BASE_TEST_CASES = [
    EvalCase(
        question="How does the Pipeline class handle model and device initialization?",
        expected_files=["base.py"],
        expected_concepts=[
            "device placement",
            "model initialization",
            "framework detection",
            "device type detection",
            "torch dtype handling"
        ],
        expected_code_refs=[
            "def __init__",
            "def device_placement",
            "infer_framework_load_model",
            "self.device = torch.device"
        ],
        difficulty=3
    ),
    EvalCase(
        question="How does the Pipeline class implement batched inference and data loading?",
        expected_files=["base.py", "pt_utils.py"],
        expected_concepts=[
            "batch processing",
            "data loading",
            "collate function",
            "padding implementation",
            "iterator pattern"
        ],
        expected_code_refs=[
            "def get_iterator",
            "class PipelineDataset",
            "class PipelineIterator",
            "_pad",
            "pad_collate_fn"
        ],
        difficulty=3
    )
]

# src/transformers/pipelines/text_generation.py
TRANSFORMER_PIPELINE_TEXT_GENERATION_TEST_CASES = [
    EvalCase(
        question="How does the TextGenerationPipeline handle chat-based generation and template processing?",
        expected_files=["text_generation.py", "base.py"],
        expected_concepts=[
            "chat message formatting",
            "template application",
            "message continuation",
            "role handling",
            "assistant prefill behavior"
        ],
        expected_code_refs=[
            "class Chat",
            "tokenizer.apply_chat_template",
            "continue_final_message",
            "isinstance(prompt_text, Chat)",
            "postprocess"
        ],
        difficulty=3
    )
]

# src/transformers/generation/logits_process.py
TRANSFORMER_LOGITS_PROCESSOR_TEST_CASES = [
    EvalCase(
        question="How does TopKLogitsWarper implement top-k filtering for text generation?",
        expected_files=["generation/logits_process.py"],
        expected_concepts=[
            "top-k filtering algorithm",
            "probability masking",
            "batch processing",
            "logits manipulation",
            "vocabulary filtering"
        ],
        expected_code_refs=[
            "class TopKLogitsWarper(LogitsProcessor)",
            "torch.topk(scores, top_k)[0]",
            "indices_to_remove = scores < torch.topk",
            "scores_processed = scores.masked_fill(indices_to_remove, self.filter_value)",
            "top_k = max(top_k, min_tokens_to_keep)"
        ],
        difficulty=3
    ),
    EvalCase(
        question="How does TemperatureLogitsProcessor implement temperature sampling for controlling generation randomness?",
        expected_files=["generation/logits_process.py"],
        expected_concepts=[
            "temperature scaling",
            "probability distribution shaping",
            "logits normalization",
            "generation randomness control",
            "batch processing with temperature"
        ],
        expected_code_refs=[
            "class TemperatureLogitsProcessor(LogitsProcessor)",
            "scores_processed = scores / self.temperature",
            "if not isinstance(temperature, float) or not (temperature > 0)",
            "def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor)",
            "raise ValueError(except_msg)"
        ],
        difficulty=3
    )
]

# src/transformers/trainer.py
TRANSFORMER_TRAINER_TEST_CASES = [
    EvalCase(
        question="How does Trainer handle distributed training and gradient accumulation? Explain the implementation details.",
        expected_files=["trainer.py"],
        expected_concepts=[
            "gradient accumulation steps",
            "distributed training logic",
            "optimizer step scheduling",
            "loss scaling",
            "device synchronization"
        ],
        expected_code_refs=[
            "def training_step",
            "def _wrap_model",
            "self.accelerator.backward",
            "self.args.gradient_accumulation_steps",
            "if args.n_gpu > 1",
            "model.zero_grad()"
        ],
        difficulty=3
    ),
    EvalCase(
        question="How does the Trainer class implement custom optimizer and learning rate scheduler creation? Explain the initialization process and supported configurations.",
        expected_files=["trainer.py"],
        expected_concepts=[
            "optimizer initialization",
            "learning rate scheduler",
            "weight decay handling",
            "optimizer parameter groups",
            "AdamW configuration",
            "custom optimizer support"
        ],
        expected_code_refs=[
            "def create_optimizer",
            "def create_scheduler",
            "get_decay_parameter_names",
            "optimizer_grouped_parameters",
            "self.args.learning_rate",
            "optimizer_kwargs"
        ],
        difficulty=3
    )
]

TRANSFORMER_TEST_CASES: List[EvalCase] = [
    *TRANSFORMER_QUANTIZER_BASE_CASES,
    *TRANSFORMER_QUANTIZER_AUTO_CASES,
    *TRANSFORMER_PIPELINE_BASE_TEST_CASES,
    *TRANSFORMER_PIPELINE_TEXT_GENERATION_TEST_CASES,
    *TRANSFORMER_LOGITS_PROCESSOR_TEST_CASES,
    *TRANSFORMER_TRAINER_TEST_CASES,
]

class DateTimeEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        return super().default(obj)
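
# DateTimeEncoder lets json.dump serialize the EvalRound timestamps. Example
# (hypothetical timestamp):
#   json.dumps({"timestamp": datetime.datetime(2025, 1, 1, 12, 0)}, cls=DateTimeEncoder)
#   -> '{"timestamp": "2025-01-01T12:00:00"}'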

async def main():
    from rich.console import Console
    from rich.pretty import Pretty
    import chromadb

    console = Console()
    config = AppConfig()
    evaluator = ChatBotEvaluator(config)
    collection = chromadb.PersistentClient(
        path=str(config.db.persist_directory)
    ).get_collection(name=config.db.collection_name)

    summary_list: List[EvalSummary] = []
    for case in TRANSFORMER_TEST_CASES:
        try:
            chat_result: ChatResult = await process_chat(question=case.question, collection=collection, config=config)
            result: EvalResult = await evaluator.evaluate_single(case, chat_result, config.evaluator.evaluation_rounds)
            eval_summary = EvalSummary(
                **chat_result.model_dump(),
                **result.model_dump()
            )
            summary_list.append(eval_summary)
            # Sleep between cases to avoid rate limiting; use asyncio.sleep so the event loop is not blocked
            await asyncio.sleep(3)
        except Exception:
            console.print_exception()

    # Write the final JSON array to a file
    current_date = datetime.datetime.now().strftime("%Y%m%d")
    file_name = Path("evaluations", f"transformers_{config.evaluator.model_provider}_evaluation_results_{current_date}.json")
    with open(file_name, "w") as f:
        json_list = [summary.model_dump() for summary in summary_list]
        json.dump(json_list, f, indent=2, cls=DateTimeEncoder)
    console.print(Pretty(summary_list))

if __name__ == "__main__":
    asyncio.run(main())