Processer

BaseProcesser

Base class for processers in evaluation tasks.

Each processer implements the following evaluation phases:
  • load: load and process data (if necessary)
  • judge: judge the correctness of a batch of predictions
  • stat: get metrics.
Source code in utu/eval/processer/base_processor.py
class BaseProcesser:
    """Base class for processers in evaluation tasks.

    Each processer implements the following evaluation phases:
      - load: load and process data (if necessary)
      - judge: judge the correctness of a batch of predictions
      - stat: get metrics.
    """

    name: str = None
    config: EvalConfig = None

    def __init__(self, config: EvalConfig) -> None:
        self.config = config

    @abc.abstractmethod
    def preprocess_one(self, sample: EvaluationSample) -> EvaluationSample:
        """Preprocess a single sample."""
        raise NotImplementedError

    @abc.abstractmethod
    async def judge_one(self, sample: EvaluationSample) -> EvaluationSample:
        """Judge a single sample."""
        raise NotImplementedError

    @abc.abstractmethod
    def calculate_metrics(self, samples: list[EvaluationSample]) -> dict:
        """Calculate metrics from the judged data."""
        raise NotImplementedError

    async def stat(self, samples: list[EvaluationSample]) -> dict:
        metrics = self.calculate_metrics(samples)
        return {"benchmark": self.name, "metrics": metrics}

preprocess_one abstractmethod

preprocess_one(
    sample: EvaluationSample,
) -> EvaluationSample

Preprocess a single sample.

Source code in utu/eval/processer/base_processor.py
@abc.abstractmethod
def preprocess_one(self, sample: EvaluationSample) -> EvaluationSample:
    """Preprocess a single sample."""
    raise NotImplementedError

judge_one abstractmethod async

judge_one(sample: EvaluationSample) -> EvaluationSample

Judge a single sample.

Source code in utu/eval/processer/base_processor.py
@abc.abstractmethod
async def judge_one(self, sample: EvaluationSample) -> EvaluationSample:
    """Judge a single sample."""
    raise NotImplementedError

calculate_metrics abstractmethod

calculate_metrics(samples: list[EvaluationSample]) -> dict

Calculate metrics from the judged data.

Source code in utu/eval/processer/base_processor.py
@abc.abstractmethod
def calculate_metrics(self, samples: list[EvaluationSample]) -> dict:
    """Calculate metrics from the judged data."""
    raise NotImplementedError

BaseLLMJudgeProcesser

Bases: BaseProcesser

Base class for processers that use an LLM for judging.

Source code in utu/eval/processer/base_llm_processor.py
class BaseLLMJudgeProcesser(BaseProcesser):
    """Base class for processers that use LLM for judging."""

    name = "default"

    def __init__(self, config: EvalConfig) -> None:
        super().__init__(config)
        self.judge_client = SimplifiedAsyncOpenAI(**config.judge_model.model_provider.model_dump())

    def preprocess_one(self, sample: EvaluationSample) -> EvaluationSample:
        """Preprocess a single sample."""
        question = sample.raw_question
        template = AUGMENTATION_PROMPTS.get(self.name, AUGMENTATION_PROMPTS["default"])
        augmented_question = template.format(question=question)
        sample.update(
            augmented_question=augmented_question,
        )
        return sample

    async def judge_one(self, data: EvaluationSample) -> EvaluationSample:
        """Judge a single sample."""
        question = data.raw_question
        response = data.response
        correct_answer = data.correct_answer or "unknown"

        if correct_answer == "unknown":
            # if correct answer is unknown, we cannot judge
            data.update(judged_response="invalid", correct=False)
            return data

        # if exact match, return directly (optionally extracting the exact answer from the response first)
        if self._extract_exact_answer(response) == correct_answer:
            data.update(judged_response="Exact match", correct=True)
            return data

        messages = self._get_judge_messages(question=question, response=response, correct_answer=correct_answer)
        content = await self.judge_client.query_one(
            messages=messages, **self.config.judge_model.model_params.model_dump()
        )
        parsed_content = self._parse_judge_response(content)

        data.judged_response = content
        # update the return data with parsed content
        data.update(**parsed_content)
        return data

    def calculate_metrics(self, samples: list[EvaluationSample]) -> dict:
        """Caculate metrics from the judged data."""
        return {
            **MetricsUtils.calculate_overall_metrics(samples),
            **MetricsUtils.calculate_level_metrics(samples),
        }

    def _get_judge_messages(self, question: str, response: str, correct_answer: str) -> list:
        if self.name not in JUDGE_PROMPT_MAP:
            logger.warning(f"Judge prompt for {self.name} is not implemented! Using default judge prompt.")
        template = JUDGE_PROMPT_MAP.get(self.name, JUDGE_PROMPT_MAP["default"])
        input = template.format(question=question, response=response, correct_answer=correct_answer)
        return [{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": input}]

    def _parse_judge_response(self, response: str) -> dict:
        """Parse the judge response into a structured format."""
        pattern = re.compile(
            r"(?=.*?extracted_final_answer:\s*(?P<extracted_final_answer>.*?)(?=\n\s*\w+:|$))?"
            r"(?=.*?reasoning:\s*(?P<reasoning>.*?)(?=\n\s*\w+:|$))?"
            r"(?=.*?correct:\s*(?P<correct>.*?)(?=\n\s*\w+:|$))?"
            r"(?=.*?confidence:\s*(?P<confidence>\d+)\s*%?(?=\n\s*\w+:|$))?",
            re.DOTALL,
        )
        # remove the bold formatting
        response = response.replace("**", "")
        # search for the pattern in the response
        match = pattern.search(response)
        if not match:
            raise ValueError("Invalid judge response format.")

        return {
            "extracted_final_answer": match.group("extracted_final_answer").strip()
            if match.group("extracted_final_answer")
            else "",
            "reasoning": match.group("reasoning").strip() if match.group("reasoning") else "",
            "correct": match.group("correct").strip().lower() == "yes" if match.group("correct") else False,
            "confidence": int(match.group("confidence")) if match.group("confidence") else None,
        }

    def _extract_exact_answer(self, response: str) -> str:
        """Extract the exact answer from the response."""
        return response.strip() if response else ""
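
A rough end-to-end usage sketch, tying the three phases together: preprocess each sample, judge the batch concurrently, then aggregate. The EvalConfig construction and the utu imports are assumed to happen elsewhere; only the method calls come from the source above.

# Hypothetical driver; utu imports and EvalConfig construction are omitted.
import asyncio

async def run_eval(config: EvalConfig, samples: list[EvaluationSample]) -> dict:
    processer = BaseLLMJudgeProcesser(config)
    # Phase 1: augment each raw question with the benchmark's prompt template.
    samples = [processer.preprocess_one(s) for s in samples]
    # (The agent under evaluation would fill in sample.response between these phases.)
    # Phase 2: judge every prediction concurrently with the LLM judge.
    judged = await asyncio.gather(*(processer.judge_one(s) for s in samples))
    # Phase 3: aggregate judged samples into {"benchmark": ..., "metrics": ...}.
    return await processer.stat(list(judged))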

preprocess_one

preprocess_one(
    sample: EvaluationSample,
) -> EvaluationSample

Preprocess a single sample.

Source code in utu/eval/processer/base_llm_processor.py
def preprocess_one(self, sample: EvaluationSample) -> EvaluationSample:
    """Preprocess a single sample."""
    question = sample.raw_question
    template = AUGMENTATION_PROMPTS.get(self.name, AUGMENTATION_PROMPTS["default"])
    augmented_question = template.format(question=question)
    sample.update(
        augmented_question=augmented_question,
    )
    return sample
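
Note that the template lookup keys off self.name, falling back to the "default" entry, so a benchmark-specific subclass usually only needs to override name. A hedged sketch (the subclass and benchmark key are made up):

# Hypothetical subclass: setting `name` makes preprocess_one (and the judge
# prompt lookup) use AUGMENTATION_PROMPTS["my_benchmark"] if such an entry
# exists, otherwise the "default" template.
class MyBenchmarkProcesser(BaseLLMJudgeProcesser):
    name = "my_benchmark"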

judge_one async

judge_one(data: EvaluationSample) -> EvaluationSample

Judge a single sample.

Source code in utu/eval/processer/base_llm_processor.py
async def judge_one(self, data: EvaluationSample) -> EvaluationSample:
    """Judge a single sample."""
    question = data.raw_question
    response = data.response
    correct_answer = data.correct_answer or "unknown"

    if correct_answer == "unknown":
        # if correct answer is unknown, we cannot judge
        data.update(judged_response="invalid", correct=False)
        return data

    # if exact match, return directly (optionally extracting the exact answer from the response first)
    if self._extract_exact_answer(response) == correct_answer:
        data.update(judged_response="Exact match", correct=True)
        return data

    messages = self._get_judge_messages(question=question, response=response, correct_answer=correct_answer)
    content = await self.judge_client.query_one(
        messages=messages, **self.config.judge_model.model_params.model_dump()
    )
    parsed_content = self._parse_judge_response(content)

    data.judged_response = content
    # update the return data with parsed content
    data.update(**parsed_content)
    return data
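
The three branches can be seen by feeding judge_one samples with different reference answers. A sketch in which the EvaluationSample constructor arguments are assumptions about its field names:

# Hypothetical samples exercising the three branches of judge_one above.
async def demo(processer: BaseLLMJudgeProcesser) -> None:
    # 1) No reference answer: marked "invalid" and incorrect without any model call.
    no_gold = EvaluationSample(raw_question="Q?", response="A", correct_answer=None)
    # 2) Exact string match: short-circuits with judged_response="Exact match".
    exact = EvaluationSample(raw_question="2 + 2 = ?", response="4", correct_answer="4")
    # 3) Otherwise: the LLM judge is queried and its reply is parsed by the regex.
    fuzzy = EvaluationSample(raw_question="Capital of France?", response="It is Paris.", correct_answer="Paris")
    for sample in (no_gold, exact, fuzzy):
        judged = await processer.judge_one(sample)
        print(judged.correct, judged.judged_response)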

calculate_metrics

calculate_metrics(samples: list[EvaluationSample]) -> dict

Calculate metrics from the judged data.

Source code in utu/eval/processer/base_llm_processor.py
def calculate_metrics(self, samples: list[EvaluationSample]) -> dict:
    """Caculate metrics from the judged data."""
    return {
        **MetricsUtils.calculate_overall_metrics(samples),
        **MetricsUtils.calculate_level_metrics(samples),
    }
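
The returned dict is simply the two MetricsUtils results merged by dict unpacking; the keys below are placeholders, since the exact metric names come from MetricsUtils and are not shown here:

# Placeholder values only; the real keys are produced by MetricsUtils.
overall = {"accuracy": 0.62, "n_samples": 100}                    # calculate_overall_metrics(samples)
by_level = {"level_1_accuracy": 0.80, "level_2_accuracy": 0.55}   # calculate_level_metrics(samples)

metrics = {**overall, **by_level}
# -> one flat dict; if the two helpers ever shared a key, the later one would win.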