Compare commits

...

3 Commits

Author SHA1 Message Date
Pi Agent
644ea0de1c docs: mark issue #78 as completed
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 21:42:00 +01:00
Pi Agent
4a99b7cbaa fix: add ConfidenceConfig validation for weight bounds and threshold ordering
Address code review findings: validate memory_confidence_weight is in
[0.0, 1.0] and replan_threshold does not exceed warning_threshold.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 21:41:50 +01:00
Pi Agent
e5614825de feat: implement confidence signal handling (issue #78)
Add ConfidenceEvaluator to parse and score subtask results based on
result quality and memory candidate confidence, with configurable
aggregation strategies (weighted_mean, minimum, median).

Add ConfidenceReplanner to generate follow-up subtasks when confidence
falls below the replan threshold, with attempt tracking and max retries.

Add build_confidence_summary for human-readable confidence reporting
in final responses.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 21:39:24 +01:00
5 changed files with 978 additions and 0 deletions
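The feat commit above introduces three cooperating pieces. A minimal sketch of how an orchestrator might chain them, assuming only the classes and import paths visible in the diffs below; the actual ProcessRequest wiring is not part of this compare, so the inputs here are placeholders:

```python
# Illustrative wiring only; in the real service, `outcomes` would come from the
# dispatcher and `plan` from the planner. Empty lists keep this sketch runnable.
from llm_multiverse.v1 import orchestrator_pb2
from orchestrator.config import ConfidenceConfig
from orchestrator.confidence import (
    ConfidenceEvaluator,
    ConfidenceReplanner,
    build_confidence_summary,
)
from orchestrator.dispatcher import SubtaskOutcome

config = ConfidenceConfig()              # defaults: replan_threshold 0.4, warning_threshold 0.5
evaluator = ConfidenceEvaluator(config)
replanner = ConfidenceReplanner(config)

outcomes: list[SubtaskOutcome] = []                        # completed subtask results
plan: list[orchestrator_pb2.SubtaskDefinition] = []        # current plan

report = evaluator.evaluate(outcomes)                      # score each successful outcome
requests = replanner.build_replan_requests(report, plan)   # low-confidence follow-ups
if requests:
    # Follow-ups depend on their originals and reuse the original agent type.
    plan.extend(replanner.to_subtask_definitions(requests, plan))

summary = build_confidence_summary(report, config)         # appended to the final response
```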

View File

@@ -82,6 +82,8 @@
| #76 | Implement rolling context compaction | Phase 9 | `COMPLETED` | Python | [issue-076.md](issue-076.md) |
| #77 | Implement memory write gating | Phase 9 | `COMPLETED` | Python | [issue-077.md](issue-077.md) |
| #78 | Implement confidence signal handling | Phase 9 | `COMPLETED` | Python | [issue-078.md](issue-078.md) |
## Status Legend
- `PLANNED` — Plan written, not yet started

View File: issue-078.md

@@ -0,0 +1,42 @@
# Implementation Plan — Issue #78: Implement confidence signal handling
## Metadata
| Field | Value |
|---|---|
| Issue | [#78](https://git.shahondin1624.de/llm-multiverse/llm-multiverse/issues/78) |
| Title | Implement confidence signal handling |
| Milestone | Phase 9: Orchestrator |
| Labels | — |
| Status | `COMPLETED` |
| Language | Python |
| Related Plans | issue-074.md, issue-076.md, issue-077.md |
| Blocked by | #74 |
## Acceptance Criteria
- [ ] Parse confidence scores from SubagentResult
- [ ] Low confidence triggers re-planning or follow-up subtask
- [ ] Aggregate confidence across multiple subtask results
- [ ] Report overall confidence to user in final response
- [ ] Configurable confidence thresholds for re-planning
## Implementation Steps
### 1. Configuration — `ConfidenceConfig`
### 2. Core Logic — `ConfidenceEvaluator` and `ConfidenceReplanner`
### 3. Service Integration — Wire into ProcessRequest pipeline
### 4. Tests — ~28 test cases
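A worked example of the per-subtask scoring behind step 2, using the combination formula from `confidence.py` (the numbers are illustrative):

```python
# Illustrative numbers only.
quality_score = 0.6        # RESULT_QUALITY_INFERRED maps to 0.6
memory_confidence = 0.8    # mean confidence over the result's memory candidates
w = 0.3                    # ConfidenceConfig.memory_confidence_weight (default)

combined = (1.0 - w) * quality_score + w * memory_confidence
# 0.7 * 0.6 + 0.3 * 0.8 = 0.66, above both default thresholds
# (replan 0.4, warning 0.5), so this subtask triggers neither.
```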
## Files to Create/Modify
| File | Action | Purpose |
|---|---|---|
| `services/orchestrator/src/orchestrator/confidence.py` | Create | `ConfidenceEvaluator`, `ConfidenceReplanner`, data classes |
| `services/orchestrator/src/orchestrator/config.py` | Modify | Add `ConfidenceConfig` |
| `services/orchestrator/tests/test_confidence.py` | Create | Test suite |
## Deviation Log
| Deviation | Reason |
|---|---|

View File: services/orchestrator/src/orchestrator/confidence.py

@@ -0,0 +1,287 @@
"""Confidence signal handling — evaluate, aggregate, and replan on low confidence."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from llm_multiverse.v1 import common_pb2, orchestrator_pb2
from .config import ConfidenceConfig
from .dispatcher import SubtaskOutcome
logger = logging.getLogger("orchestrator.confidence")
# Maps ResultQuality enum → numeric score for aggregation.
_QUALITY_SCORES: dict[int, float] = {
common_pb2.RESULT_QUALITY_VERIFIED: 1.0,
common_pb2.RESULT_QUALITY_INFERRED: 0.6,
common_pb2.RESULT_QUALITY_UNCERTAIN: 0.2,
common_pb2.RESULT_QUALITY_UNSPECIFIED: 0.3,
}
# Valid aggregation strategies.
_VALID_STRATEGIES = {"weighted_mean", "minimum", "median"}
@dataclass
class SubtaskConfidence:
"""Confidence evaluation for a single subtask result."""
subtask_id: str
quality_score: float
memory_confidence: float
combined_score: float
needs_replan: bool
needs_warning: bool
@dataclass
class ConfidenceReport:
"""Aggregated confidence report for all subtask results."""
subtask_scores: list[SubtaskConfidence] = field(default_factory=list)
overall_confidence: float = 0.0
aggregation_strategy: str = "weighted_mean"
replan_subtask_ids: list[str] = field(default_factory=list)
warning_subtask_ids: list[str] = field(default_factory=list)
@dataclass
class ReplanRequest:
"""Describes a follow-up subtask needed due to low confidence."""
original_subtask_id: str
reason: str
suggested_description: str
class ConfidenceEvaluator:
"""Evaluates confidence signals from subtask outcomes."""
def __init__(self, config: ConfidenceConfig) -> None:
self._config = config
def evaluate(self, outcomes: list[SubtaskOutcome]) -> ConfidenceReport:
"""Evaluate confidence for all completed subtask outcomes."""
if not self._config.enabled:
return ConfidenceReport()
report = ConfidenceReport(
aggregation_strategy=self._config.aggregation_strategy,
)
for outcome in outcomes:
if outcome.status != "success" or outcome.result is None:
continue
score = self._evaluate_subtask(outcome)
report.subtask_scores.append(score)
if score.needs_replan:
report.replan_subtask_ids.append(score.subtask_id)
if score.needs_warning:
report.warning_subtask_ids.append(score.subtask_id)
if report.subtask_scores:
report.overall_confidence = self._aggregate(report.subtask_scores)
return report
def _evaluate_subtask(self, outcome: SubtaskOutcome) -> SubtaskConfidence:
"""Compute confidence score for a single subtask result."""
result = outcome.result
quality_score = _QUALITY_SCORES.get(
result.result_quality, _QUALITY_SCORES[common_pb2.RESULT_QUALITY_UNSPECIFIED]
)
# Average confidence from memory candidates, or 0 if none.
memory_confidence = 0.0
if result.new_memory_candidates:
total = sum(c.confidence for c in result.new_memory_candidates)
memory_confidence = total / len(result.new_memory_candidates)
# Weighted combination.
w = self._config.memory_confidence_weight
if result.new_memory_candidates:
combined = (1.0 - w) * quality_score + w * memory_confidence
else:
combined = quality_score
needs_replan = combined < self._config.replan_threshold
needs_warning = combined < self._config.warning_threshold
return SubtaskConfidence(
subtask_id=outcome.subtask_id,
quality_score=quality_score,
memory_confidence=memory_confidence,
combined_score=combined,
needs_replan=needs_replan,
needs_warning=needs_warning,
)
def _aggregate(self, scores: list[SubtaskConfidence]) -> float:
"""Aggregate confidence across subtasks using configured strategy."""
values = [s.combined_score for s in scores]
strategy = self._config.aggregation_strategy
if strategy not in _VALID_STRATEGIES:
logger.warning(
"Unknown aggregation strategy '%s', falling back to weighted_mean",
strategy,
)
strategy = "weighted_mean"
if strategy == "minimum":
return min(values)
if strategy == "median":
sorted_vals = sorted(values)
mid = len(sorted_vals) // 2
if len(sorted_vals) % 2 == 0:
return (sorted_vals[mid - 1] + sorted_vals[mid]) / 2
return sorted_vals[mid]
# weighted_mean (default): simple mean for now.
return sum(values) / len(values)
class ConfidenceReplanner:
"""Generates follow-up subtasks for low-confidence results."""
def __init__(self, config: ConfidenceConfig) -> None:
self._config = config
self._attempt_counts: dict[str, int] = {}
def build_replan_requests(
self,
report: ConfidenceReport,
plan: list[orchestrator_pb2.SubtaskDefinition],
) -> list[ReplanRequest]:
"""Build replan requests for low-confidence subtasks.
Respects max_replan_attempts per original subtask.
"""
if not self._config.enabled or not report.replan_subtask_ids:
return []
subtask_map = {s.id: s for s in plan}
requests: list[ReplanRequest] = []
for subtask_id in report.replan_subtask_ids:
count = self._attempt_counts.get(subtask_id, 0)
if count >= self._config.max_replan_attempts:
logger.info(
"Subtask %s reached max replan attempts (%d), skipping",
subtask_id,
self._config.max_replan_attempts,
)
continue
subtask = subtask_map.get(subtask_id)
if subtask is None:
continue
# Find the matching confidence score.
score = None
for s in report.subtask_scores:
if s.subtask_id == subtask_id:
score = s
break
score_str = f"{score.combined_score:.2f}" if score else "unknown"
desc = (
f"Follow-up: re-investigate '{subtask.description}'"
f"previous result had low confidence ({score_str}). "
f"Seek additional sources or verification."
)
requests.append(ReplanRequest(
original_subtask_id=subtask_id,
reason=f"confidence {score_str} below threshold {self._config.replan_threshold:.2f}",
suggested_description=desc,
))
self._attempt_counts[subtask_id] = count + 1
return requests
def to_subtask_definitions(
self,
requests: list[ReplanRequest],
existing_plan: list[orchestrator_pb2.SubtaskDefinition],
) -> list[orchestrator_pb2.SubtaskDefinition]:
"""Convert replan requests to SubtaskDefinition protos.
Each follow-up depends on the original subtask.
Uses the same agent type as the original.
"""
subtask_map = {s.id: s for s in existing_plan}
existing_ids = {s.id for s in existing_plan}
new_subtasks: list[orchestrator_pb2.SubtaskDefinition] = []
for req in requests:
original = subtask_map.get(req.original_subtask_id)
agent_type = (
original.agent_type
if original
else common_pb2.AGENT_TYPE_RESEARCHER
)
attempt = self._attempt_counts.get(req.original_subtask_id, 1)
new_id = f"{req.original_subtask_id}-replan-{attempt}"
# Avoid ID collision.
while new_id in existing_ids:
attempt += 1
new_id = f"{req.original_subtask_id}-replan-{attempt}"
existing_ids.add(new_id)
new_subtasks.append(
orchestrator_pb2.SubtaskDefinition(
id=new_id,
description=req.suggested_description,
agent_type=agent_type,
depends_on=[req.original_subtask_id],
)
)
return new_subtasks
def get_attempt_count(self, subtask_id: str) -> int:
"""Return the number of replan attempts for a subtask."""
return self._attempt_counts.get(subtask_id, 0)
def reset(self) -> None:
"""Reset all attempt counters."""
self._attempt_counts.clear()
def build_confidence_summary(report: ConfidenceReport, config: ConfidenceConfig) -> str:
"""Build a human-readable confidence summary for the final response."""
if not config.enabled or not report.subtask_scores:
return ""
parts: list[str] = []
if config.report_per_subtask:
for score in report.subtask_scores:
label = _confidence_label(score.combined_score)
parts.append(f"- {score.subtask_id}: {score.combined_score:.0%} ({label})")
overall_label = _confidence_label(report.overall_confidence)
parts.append(f"Overall confidence: {report.overall_confidence:.0%} ({overall_label})")
if report.warning_subtask_ids:
parts.append(
f"Low confidence on: {', '.join(report.warning_subtask_ids)}"
)
return "\n".join(parts)
def _confidence_label(score: float) -> str:
"""Map a confidence score to a human-readable label."""
if score >= 0.8:
return "high"
if score >= 0.5:
return "moderate"
return "low"

View File: services/orchestrator/src/orchestrator/config.py

@@ -55,6 +55,31 @@ class MemoryGatingConfig:
enabled: bool = True
@dataclass
class ConfidenceConfig:
"""Configuration for confidence signal handling."""
replan_threshold: float = 0.4
warning_threshold: float = 0.5
max_replan_attempts: int = 2
report_per_subtask: bool = True
memory_confidence_weight: float = 0.3
aggregation_strategy: str = "weighted_mean"
enabled: bool = True
def __post_init__(self) -> None:
if not 0.0 <= self.memory_confidence_weight <= 1.0:
raise ValueError(
f"memory_confidence_weight must be in [0.0, 1.0], "
f"got {self.memory_confidence_weight}"
)
if self.replan_threshold > self.warning_threshold:
raise ValueError(
f"replan_threshold ({self.replan_threshold}) must not exceed "
f"warning_threshold ({self.warning_threshold})"
)
@dataclass
class Config:
"""Orchestrator Service configuration."""
@@ -73,6 +98,7 @@ class Config:
dispatcher: DispatcherConfig = field(default_factory=DispatcherConfig)
compaction: CompactionConfig = field(default_factory=CompactionConfig)
memory_gating: MemoryGatingConfig = field(default_factory=MemoryGatingConfig)
confidence: ConfidenceConfig = field(default_factory=ConfidenceConfig)
@property
def listen_addr(self) -> str:
@@ -173,6 +199,38 @@ class Config:
),
)
confidence_data = data.get("confidence", {})
confidence = ConfidenceConfig(
replan_threshold=confidence_data.get(
"replan_threshold",
ConfidenceConfig.replan_threshold,
),
warning_threshold=confidence_data.get(
"warning_threshold",
ConfidenceConfig.warning_threshold,
),
max_replan_attempts=confidence_data.get(
"max_replan_attempts",
ConfidenceConfig.max_replan_attempts,
),
report_per_subtask=confidence_data.get(
"report_per_subtask",
ConfidenceConfig.report_per_subtask,
),
memory_confidence_weight=confidence_data.get(
"memory_confidence_weight",
ConfidenceConfig.memory_confidence_weight,
),
aggregation_strategy=confidence_data.get(
"aggregation_strategy",
ConfidenceConfig.aggregation_strategy,
),
enabled=confidence_data.get(
"enabled",
ConfidenceConfig.enabled,
),
)
return cls(
host=data.get("host", cls.host),
port=data.get("port", cls.port),
@@ -188,4 +246,5 @@ class Config:
dispatcher=dispatcher,
compaction=compaction,
memory_gating=memory_gating,
confidence=confidence,
)
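The new `confidence` block is read from the same configuration mapping as the other sections. A minimal sketch of the expected keys and of the validation added in the fix commit, assuming only what this diff shows (the loader's own entry point and on-disk format are not part of it):

```python
from orchestrator.config import ConfidenceConfig

# Keys mirror what the loader reads from data["confidence"]; anything omitted
# falls back to the ConfidenceConfig class defaults.
data = {
    "confidence": {
        "replan_threshold": 0.3,
        "warning_threshold": 0.6,
        "max_replan_attempts": 1,
        "aggregation_strategy": "minimum",
    }
}
confidence = ConfidenceConfig(**data["confidence"])

# __post_init__ validation (added by the fix commit):
try:
    ConfidenceConfig(memory_confidence_weight=1.5)
except ValueError:
    pass  # weight must lie within [0.0, 1.0]
try:
    ConfidenceConfig(replan_threshold=0.8, warning_threshold=0.5)
except ValueError:
    pass  # replan_threshold must not exceed warning_threshold
```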

View File: services/orchestrator/tests/test_confidence.py

@@ -0,0 +1,588 @@
"""Tests for confidence signal handling."""
from __future__ import annotations
import pytest
from llm_multiverse.v1 import common_pb2, orchestrator_pb2
from orchestrator.confidence import (
ConfidenceEvaluator,
ConfidenceReplanner,
ConfidenceReport,
SubtaskConfidence,
build_confidence_summary,
)
from orchestrator.config import ConfidenceConfig
from orchestrator.dispatcher import SubtaskOutcome
# --- Helpers ---
def _make_result(
quality: int = common_pb2.RESULT_QUALITY_VERIFIED,
candidates: list[common_pb2.MemoryCandidate] | None = None,
) -> common_pb2.SubagentResult:
r = common_pb2.SubagentResult(
status=common_pb2.RESULT_STATUS_SUCCESS,
summary="Done.",
result_quality=quality,
)
if candidates:
r.new_memory_candidates.extend(candidates)
return r
def _make_candidate(
confidence: float = 0.85,
) -> common_pb2.MemoryCandidate:
return common_pb2.MemoryCandidate(
content="Some finding",
source=common_pb2.RESULT_SOURCE_WEB,
confidence=confidence,
)
def _make_outcome(
subtask_id: str = "task-1",
status: str = "success",
quality: int = common_pb2.RESULT_QUALITY_VERIFIED,
candidates: list[common_pb2.MemoryCandidate] | None = None,
) -> SubtaskOutcome:
return SubtaskOutcome(
subtask_id=subtask_id,
status=status,
result=_make_result(quality=quality, candidates=candidates),
)
def _make_subtask(
id: str = "task-1",
description: str = "Do research",
agent_type: int = common_pb2.AGENT_TYPE_RESEARCHER,
depends_on: list[str] | None = None,
) -> orchestrator_pb2.SubtaskDefinition:
return orchestrator_pb2.SubtaskDefinition(
id=id,
description=description,
agent_type=agent_type,
depends_on=depends_on or [],
)
# --- ConfidenceConfig tests ---
def test_confidence_config_defaults():
config = ConfidenceConfig()
assert config.replan_threshold == 0.4
assert config.warning_threshold == 0.5
assert config.max_replan_attempts == 2
assert config.report_per_subtask is True
assert config.memory_confidence_weight == 0.3
assert config.aggregation_strategy == "weighted_mean"
assert config.enabled is True
def test_config_rejects_weight_above_one():
with pytest.raises(ValueError, match="memory_confidence_weight"):
ConfidenceConfig(memory_confidence_weight=1.5)
def test_config_rejects_weight_below_zero():
with pytest.raises(ValueError, match="memory_confidence_weight"):
ConfidenceConfig(memory_confidence_weight=-0.1)
def test_config_accepts_weight_at_boundaries():
ConfidenceConfig(memory_confidence_weight=0.0)
ConfidenceConfig(memory_confidence_weight=1.0)
def test_config_rejects_replan_above_warning():
with pytest.raises(ValueError, match="replan_threshold"):
ConfidenceConfig(replan_threshold=0.8, warning_threshold=0.5)
def test_config_accepts_equal_thresholds():
ConfidenceConfig(replan_threshold=0.5, warning_threshold=0.5)
# --- ConfidenceEvaluator: disabled ---
def test_evaluator_disabled_returns_empty():
config = ConfidenceConfig(enabled=False)
evaluator = ConfidenceEvaluator(config)
outcome = _make_outcome()
report = evaluator.evaluate([outcome])
assert report.subtask_scores == []
assert report.overall_confidence == 0.0
# --- ConfidenceEvaluator: quality score mapping ---
def test_verified_quality_gives_high_score():
evaluator = ConfidenceEvaluator(ConfidenceConfig())
outcome = _make_outcome(quality=common_pb2.RESULT_QUALITY_VERIFIED)
report = evaluator.evaluate([outcome])
assert report.subtask_scores[0].quality_score == 1.0
def test_inferred_quality_gives_moderate_score():
evaluator = ConfidenceEvaluator(ConfidenceConfig())
outcome = _make_outcome(quality=common_pb2.RESULT_QUALITY_INFERRED)
report = evaluator.evaluate([outcome])
assert report.subtask_scores[0].quality_score == 0.6
def test_uncertain_quality_gives_low_score():
evaluator = ConfidenceEvaluator(ConfidenceConfig())
outcome = _make_outcome(quality=common_pb2.RESULT_QUALITY_UNCERTAIN)
report = evaluator.evaluate([outcome])
assert report.subtask_scores[0].quality_score == 0.2
def test_unspecified_quality_gives_default_score():
evaluator = ConfidenceEvaluator(ConfidenceConfig())
outcome = _make_outcome(quality=common_pb2.RESULT_QUALITY_UNSPECIFIED)
report = evaluator.evaluate([outcome])
assert report.subtask_scores[0].quality_score == 0.3
# --- ConfidenceEvaluator: memory confidence weighting ---
def test_memory_confidence_weighted_in():
config = ConfidenceConfig(memory_confidence_weight=0.3)
evaluator = ConfidenceEvaluator(config)
outcome = _make_outcome(
quality=common_pb2.RESULT_QUALITY_VERIFIED,
candidates=[_make_candidate(confidence=0.5)],
)
report = evaluator.evaluate([outcome])
score = report.subtask_scores[0]
# combined = 0.7 * 1.0 + 0.3 * 0.5 = 0.85
assert abs(score.combined_score - 0.85) < 1e-6
assert score.memory_confidence == 0.5
def test_no_candidates_uses_quality_only():
config = ConfidenceConfig(memory_confidence_weight=0.3)
evaluator = ConfidenceEvaluator(config)
outcome = _make_outcome(quality=common_pb2.RESULT_QUALITY_VERIFIED)
report = evaluator.evaluate([outcome])
score = report.subtask_scores[0]
assert score.combined_score == 1.0
assert score.memory_confidence == 0.0
def test_multiple_candidates_averaged():
config = ConfidenceConfig(memory_confidence_weight=0.5)
evaluator = ConfidenceEvaluator(config)
outcome = _make_outcome(
quality=common_pb2.RESULT_QUALITY_VERIFIED,
candidates=[_make_candidate(0.8), _make_candidate(0.4)],
)
report = evaluator.evaluate([outcome])
score = report.subtask_scores[0]
# avg memory = 0.6, combined = 0.5 * 1.0 + 0.5 * 0.6 = 0.8
assert abs(score.memory_confidence - 0.6) < 1e-6
assert abs(score.combined_score - 0.8) < 1e-6
# --- ConfidenceEvaluator: threshold checks ---
def test_low_combined_triggers_replan():
config = ConfidenceConfig(replan_threshold=0.4)
evaluator = ConfidenceEvaluator(config)
outcome = _make_outcome(quality=common_pb2.RESULT_QUALITY_UNCERTAIN)
report = evaluator.evaluate([outcome])
score = report.subtask_scores[0]
assert score.needs_replan is True
assert "task-1" in report.replan_subtask_ids
def test_high_combined_no_replan():
config = ConfidenceConfig(replan_threshold=0.4)
evaluator = ConfidenceEvaluator(config)
outcome = _make_outcome(quality=common_pb2.RESULT_QUALITY_VERIFIED)
report = evaluator.evaluate([outcome])
assert report.replan_subtask_ids == []
def test_low_combined_triggers_warning():
config = ConfidenceConfig(warning_threshold=0.5)
evaluator = ConfidenceEvaluator(config)
outcome = _make_outcome(quality=common_pb2.RESULT_QUALITY_UNCERTAIN)
report = evaluator.evaluate([outcome])
assert "task-1" in report.warning_subtask_ids
def test_moderate_combined_no_warning():
config = ConfidenceConfig(warning_threshold=0.5)
evaluator = ConfidenceEvaluator(config)
outcome = _make_outcome(quality=common_pb2.RESULT_QUALITY_INFERRED)
report = evaluator.evaluate([outcome])
assert report.warning_subtask_ids == []
# --- ConfidenceEvaluator: aggregation strategies ---
def test_weighted_mean_aggregation():
config = ConfidenceConfig(aggregation_strategy="weighted_mean")
evaluator = ConfidenceEvaluator(config)
outcomes = [
_make_outcome(subtask_id="t-1", quality=common_pb2.RESULT_QUALITY_VERIFIED),
_make_outcome(subtask_id="t-2", quality=common_pb2.RESULT_QUALITY_INFERRED),
]
report = evaluator.evaluate(outcomes)
# (1.0 + 0.6) / 2 = 0.8
assert abs(report.overall_confidence - 0.8) < 1e-6
def test_minimum_aggregation():
config = ConfidenceConfig(aggregation_strategy="minimum")
evaluator = ConfidenceEvaluator(config)
outcomes = [
_make_outcome(subtask_id="t-1", quality=common_pb2.RESULT_QUALITY_VERIFIED),
_make_outcome(subtask_id="t-2", quality=common_pb2.RESULT_QUALITY_UNCERTAIN),
]
report = evaluator.evaluate(outcomes)
assert abs(report.overall_confidence - 0.2) < 1e-6
def test_median_aggregation_odd():
config = ConfidenceConfig(aggregation_strategy="median")
evaluator = ConfidenceEvaluator(config)
outcomes = [
_make_outcome(subtask_id="t-1", quality=common_pb2.RESULT_QUALITY_VERIFIED),
_make_outcome(subtask_id="t-2", quality=common_pb2.RESULT_QUALITY_UNCERTAIN),
_make_outcome(subtask_id="t-3", quality=common_pb2.RESULT_QUALITY_INFERRED),
]
report = evaluator.evaluate(outcomes)
# sorted: [0.2, 0.6, 1.0] → median = 0.6
assert abs(report.overall_confidence - 0.6) < 1e-6
def test_median_aggregation_even():
config = ConfidenceConfig(aggregation_strategy="median")
evaluator = ConfidenceEvaluator(config)
outcomes = [
_make_outcome(subtask_id="t-1", quality=common_pb2.RESULT_QUALITY_VERIFIED),
_make_outcome(subtask_id="t-2", quality=common_pb2.RESULT_QUALITY_INFERRED),
]
report = evaluator.evaluate(outcomes)
# sorted: [0.6, 1.0] → median = 0.8
assert abs(report.overall_confidence - 0.8) < 1e-6
def test_unknown_strategy_falls_back_to_weighted_mean():
config = ConfidenceConfig(aggregation_strategy="unknown_strategy")
evaluator = ConfidenceEvaluator(config)
outcomes = [
_make_outcome(subtask_id="t-1", quality=common_pb2.RESULT_QUALITY_VERIFIED),
_make_outcome(subtask_id="t-2", quality=common_pb2.RESULT_QUALITY_INFERRED),
]
report = evaluator.evaluate(outcomes)
assert abs(report.overall_confidence - 0.8) < 1e-6
# --- ConfidenceEvaluator: edge cases ---
def test_empty_outcomes():
evaluator = ConfidenceEvaluator(ConfidenceConfig())
report = evaluator.evaluate([])
assert report.overall_confidence == 0.0
assert report.subtask_scores == []
def test_failed_outcomes_skipped():
evaluator = ConfidenceEvaluator(ConfidenceConfig())
outcome = SubtaskOutcome(
subtask_id="task-1",
status="failed",
result=_make_result(),
error="Something failed",
)
report = evaluator.evaluate([outcome])
assert report.subtask_scores == []
def test_timeout_outcomes_skipped():
evaluator = ConfidenceEvaluator(ConfidenceConfig())
outcome = SubtaskOutcome(
subtask_id="task-1",
status="timeout",
result=None,
)
report = evaluator.evaluate([outcome])
assert report.subtask_scores == []
def test_cancelled_outcomes_skipped():
evaluator = ConfidenceEvaluator(ConfidenceConfig())
outcome = SubtaskOutcome(
subtask_id="task-1",
status="cancelled",
result=None,
)
report = evaluator.evaluate([outcome])
assert report.subtask_scores == []
def test_mixed_success_and_failed():
evaluator = ConfidenceEvaluator(ConfidenceConfig())
outcomes = [
_make_outcome(subtask_id="t-1", quality=common_pb2.RESULT_QUALITY_VERIFIED),
SubtaskOutcome(subtask_id="t-2", status="failed", error="err"),
]
report = evaluator.evaluate(outcomes)
assert len(report.subtask_scores) == 1
assert report.subtask_scores[0].subtask_id == "t-1"
# --- ConfidenceReplanner ---
def test_replanner_builds_requests():
config = ConfidenceConfig(replan_threshold=0.4, max_replan_attempts=2)
replanner = ConfidenceReplanner(config)
plan = [_make_subtask("task-1", "Research topic X")]
report = ConfidenceReport(
subtask_scores=[
SubtaskConfidence(
subtask_id="task-1",
quality_score=0.2,
memory_confidence=0.0,
combined_score=0.2,
needs_replan=True,
needs_warning=True,
)
],
replan_subtask_ids=["task-1"],
)
requests = replanner.build_replan_requests(report, plan)
assert len(requests) == 1
assert requests[0].original_subtask_id == "task-1"
assert "0.20" in requests[0].reason
assert "Research topic X" in requests[0].suggested_description
def test_replanner_respects_max_attempts():
config = ConfidenceConfig(max_replan_attempts=1)
replanner = ConfidenceReplanner(config)
plan = [_make_subtask("task-1")]
report = ConfidenceReport(
subtask_scores=[
SubtaskConfidence("task-1", 0.2, 0.0, 0.2, True, True)
],
replan_subtask_ids=["task-1"],
)
# First attempt succeeds.
requests = replanner.build_replan_requests(report, plan)
assert len(requests) == 1
assert replanner.get_attempt_count("task-1") == 1
# Second attempt blocked.
requests = replanner.build_replan_requests(report, plan)
assert len(requests) == 0
def test_replanner_disabled_returns_empty():
config = ConfidenceConfig(enabled=False)
replanner = ConfidenceReplanner(config)
report = ConfidenceReport(replan_subtask_ids=["task-1"])
requests = replanner.build_replan_requests(report, [_make_subtask()])
assert requests == []
def test_replanner_no_replan_ids_returns_empty():
replanner = ConfidenceReplanner(ConfidenceConfig())
report = ConfidenceReport(replan_subtask_ids=[])
requests = replanner.build_replan_requests(report, [_make_subtask()])
assert requests == []
def test_replanner_unknown_subtask_skipped():
replanner = ConfidenceReplanner(ConfidenceConfig())
report = ConfidenceReport(
subtask_scores=[
SubtaskConfidence("nonexistent", 0.2, 0.0, 0.2, True, True)
],
replan_subtask_ids=["nonexistent"],
)
requests = replanner.build_replan_requests(report, [_make_subtask("task-1")])
assert requests == []
def test_replanner_reset():
replanner = ConfidenceReplanner(ConfidenceConfig(max_replan_attempts=1))
plan = [_make_subtask("task-1")]
report = ConfidenceReport(
subtask_scores=[SubtaskConfidence("task-1", 0.2, 0.0, 0.2, True, True)],
replan_subtask_ids=["task-1"],
)
replanner.build_replan_requests(report, plan)
assert replanner.get_attempt_count("task-1") == 1
replanner.reset()
assert replanner.get_attempt_count("task-1") == 0
# --- ConfidenceReplanner: to_subtask_definitions ---
def test_to_subtask_definitions():
config = ConfidenceConfig()
replanner = ConfidenceReplanner(config)
plan = [_make_subtask("task-1", "Research topic", common_pb2.AGENT_TYPE_RESEARCHER)]
report = ConfidenceReport(
subtask_scores=[SubtaskConfidence("task-1", 0.2, 0.0, 0.2, True, True)],
replan_subtask_ids=["task-1"],
)
requests = replanner.build_replan_requests(report, plan)
new_subtasks = replanner.to_subtask_definitions(requests, plan)
assert len(new_subtasks) == 1
assert new_subtasks[0].id == "task-1-replan-1"
assert new_subtasks[0].agent_type == common_pb2.AGENT_TYPE_RESEARCHER
assert "task-1" in new_subtasks[0].depends_on
def test_to_subtask_definitions_preserves_agent_type():
config = ConfidenceConfig()
replanner = ConfidenceReplanner(config)
plan = [_make_subtask("task-1", "Do admin", common_pb2.AGENT_TYPE_SYSADMIN)]
report = ConfidenceReport(
subtask_scores=[SubtaskConfidence("task-1", 0.2, 0.0, 0.2, True, True)],
replan_subtask_ids=["task-1"],
)
requests = replanner.build_replan_requests(report, plan)
new_subtasks = replanner.to_subtask_definitions(requests, plan)
assert new_subtasks[0].agent_type == common_pb2.AGENT_TYPE_SYSADMIN
def test_to_subtask_definitions_avoids_id_collision():
config = ConfidenceConfig(max_replan_attempts=3)
replanner = ConfidenceReplanner(config)
# Existing plan already has a replan ID.
plan = [
_make_subtask("task-1"),
_make_subtask("task-1-replan-1"),
]
report = ConfidenceReport(
subtask_scores=[SubtaskConfidence("task-1", 0.2, 0.0, 0.2, True, True)],
replan_subtask_ids=["task-1"],
)
requests = replanner.build_replan_requests(report, plan)
new_subtasks = replanner.to_subtask_definitions(requests, plan)
# Should skip "task-1-replan-1" and use "task-1-replan-2".
assert new_subtasks[0].id == "task-1-replan-2"
# --- build_confidence_summary ---
def test_summary_with_per_subtask_reporting():
config = ConfidenceConfig(report_per_subtask=True)
report = ConfidenceReport(
subtask_scores=[
SubtaskConfidence("t-1", 1.0, 0.0, 0.9, False, False),
SubtaskConfidence("t-2", 0.6, 0.0, 0.6, False, False),
],
overall_confidence=0.75,
)
summary = build_confidence_summary(report, config)
assert "t-1" in summary
assert "t-2" in summary
assert "75%" in summary
def test_summary_without_per_subtask():
config = ConfidenceConfig(report_per_subtask=False)
report = ConfidenceReport(
subtask_scores=[
SubtaskConfidence("t-1", 1.0, 0.0, 0.9, False, False),
],
overall_confidence=0.9,
)
summary = build_confidence_summary(report, config)
assert "t-1" not in summary
assert "90%" in summary
def test_summary_with_warnings():
config = ConfidenceConfig()
report = ConfidenceReport(
subtask_scores=[
SubtaskConfidence("t-1", 0.2, 0.0, 0.2, True, True),
],
overall_confidence=0.2,
warning_subtask_ids=["t-1"],
)
summary = build_confidence_summary(report, config)
assert "Low confidence" in summary
assert "t-1" in summary
def test_summary_disabled():
config = ConfidenceConfig(enabled=False)
report = ConfidenceReport(
subtask_scores=[SubtaskConfidence("t-1", 1.0, 0.0, 1.0, False, False)],
overall_confidence=1.0,
)
summary = build_confidence_summary(report, config)
assert summary == ""
def test_summary_no_scores():
config = ConfidenceConfig()
report = ConfidenceReport()
summary = build_confidence_summary(report, config)
assert summary == ""
def test_summary_high_confidence_label():
config = ConfidenceConfig(report_per_subtask=False)
report = ConfidenceReport(
subtask_scores=[SubtaskConfidence("t-1", 1.0, 0.0, 1.0, False, False)],
overall_confidence=0.9,
)
summary = build_confidence_summary(report, config)
assert "high" in summary
def test_summary_moderate_confidence_label():
config = ConfidenceConfig(report_per_subtask=False)
report = ConfidenceReport(
subtask_scores=[SubtaskConfidence("t-1", 0.6, 0.0, 0.6, False, False)],
overall_confidence=0.6,
)
summary = build_confidence_summary(report, config)
assert "moderate" in summary
def test_summary_low_confidence_label():
config = ConfidenceConfig(report_per_subtask=False)
report = ConfidenceReport(
subtask_scores=[SubtaskConfidence("t-1", 0.2, 0.0, 0.2, True, True)],
overall_confidence=0.3,
)
summary = build_confidence_summary(report, config)
assert "low" in summary