Compare commits
2 Commits
45d9ebe3b0
...
ad5d7f6769
| Author | SHA1 | Date | |
|---|---|---|---|
| ad5d7f6769 | |||
|
|
573ddf8fc6 |
@@ -110,6 +110,7 @@
|
||||
| #201 | Refactor: extract duplicated _CONFIDENCE_MAP and _JSON_BLOCK_RE | — | `COMPLETED` | Python | [issue-201.md](issue-201.md) |
|
||||
| #202 | Bug: Docker network missing internal:true and edge separation | — | `COMPLETED` | Docker / YAML | [issue-202.md](issue-202.md) |
|
||||
| #181 | Tech debt: deduplicate agent type name strings across manifest and config | — | `COMPLETED` | Rust / Python | [issue-181.md](issue-181.md) |
|
||||
| #188 | Tech debt: extract shared agent loop base class | — | `COMPLETED` | Python | [issue-188.md](issue-188.md) |
|
||||
| #182 | Tech debt: extract ToolOutput error helper to reduce boilerplate | — | `COMPLETED` | Rust | [issue-182.md](issue-182.md) |
|
||||
| #203 | Refactor: deduplicate Rust Dockerfile builder stage boilerplate | — | `COMPLETED` | Docker | [issue-203.md](issue-203.md) |
|
||||
|
||||
|
||||
50
implementation-plans/issue-188.md
Normal file
50
implementation-plans/issue-188.md
Normal file
@@ -0,0 +1,50 @@
|
||||
# Implementation Plan — Issue #188: Extract shared agent loop base class
|
||||
|
||||
## Metadata
|
||||
|
||||
| Field | Value |
|
||||
|---|---|
|
||||
| Issue | [#188](https://git.shahondin1624.de/llm-multiverse/llm-multiverse/issues/188) |
|
||||
| Title | Tech debt: extract shared agent loop base class from researcher/coder/assistant |
|
||||
| Milestone | — (tech debt) |
|
||||
| Labels | `type:refactor`, `priority:low`, `service:orchestrator` |
|
||||
| Status | `COMPLETED` |
|
||||
| Language | Python |
|
||||
| Related Plans | [issue-192.md](issue-192.md) |
|
||||
| Blocked by | — |
|
||||
|
||||
## Acceptance Criteria
|
||||
|
||||
- [x] Shared `BaseAgent` class with parameterized `run()` loop
|
||||
- [x] All 4 agent files reduced to ~20 lines (class attrs + factory function)
|
||||
- [x] All existing tests pass (482 tests)
|
||||
- [x] Lint clean
|
||||
|
||||
## Implementation Steps
|
||||
|
||||
### 1. Create `base_agent.py` with `BaseAgent` class
|
||||
Extracts the shared `run()` method parameterized by class attributes:
|
||||
- `agent_type`: proto enum value
|
||||
- `id_prefix`: agent ID prefix string
|
||||
- `summary_prefix`: human-readable prefix for results
|
||||
- `system_prompt`: system prompt string or None for default
|
||||
- `tool_output_sources`: frozenset of tool names indicating RESULT_SOURCE_TOOL_OUTPUT
|
||||
|
||||
### 2. Simplify 4 agent files
|
||||
Each agent module reduced to: class with 5 class attributes + factory function.
|
||||
|
||||
## Files Created/Modified
|
||||
|
||||
| File | Action | Purpose |
|
||||
|---|---|---|
|
||||
| `services/orchestrator/src/orchestrator/base_agent.py` | Create | Shared base class with run() loop |
|
||||
| `services/orchestrator/src/orchestrator/researcher.py` | Modify | Extend BaseAgent |
|
||||
| `services/orchestrator/src/orchestrator/coder.py` | Modify | Extend BaseAgent |
|
||||
| `services/orchestrator/src/orchestrator/assistant.py` | Modify | Extend BaseAgent |
|
||||
| `services/orchestrator/src/orchestrator/sysadmin.py` | Modify | Extend BaseAgent |
|
||||
|
||||
## Deviation Log
|
||||
|
||||
| Deviation | Reason |
|
||||
|---|---|
|
||||
| "No tools available" message uses summary_prefix instead of agent type name | Minor wording change; tests only check for "No tools" substring |
|
||||
@@ -1,204 +1,23 @@
|
||||
"""Assistant agent — conversational Q&A, summarization, and information retrieval."""
|
||||
"""Assistant agent — general-purpose help and Q&A."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import grpc
|
||||
from llm_multiverse.v1 import common_pb2, orchestrator_pb2
|
||||
from llm_multiverse.v1 import common_pb2
|
||||
|
||||
from .agent_utils import (
|
||||
build_failed_result,
|
||||
build_partial_result,
|
||||
build_success_result,
|
||||
format_memory,
|
||||
)
|
||||
from .base_agent import BaseAgent
|
||||
from .clients import MemoryClient, ModelGatewayClient, ToolBrokerClient
|
||||
from .config import AgentConfig
|
||||
from .parser import DoneSignalParsed, ParseError, ToolCallParsed, parse_agent_response
|
||||
from .prompt import ASSISTANT_SYSTEM_PROMPT, PromptBuilder
|
||||
|
||||
logger = logging.getLogger("orchestrator.assistant")
|
||||
|
||||
_SUMMARY_PREFIX = "Assistance"
|
||||
|
||||
_MAX_CONSECUTIVE_FAILURES = 3
|
||||
from .prompt import ASSISTANT_SYSTEM_PROMPT
|
||||
|
||||
|
||||
class AssistantAgent:
|
||||
"""Executes the assistant agent loop: infer -> tool call -> observe -> repeat."""
|
||||
class AssistantAgent(BaseAgent):
|
||||
"""Assistant agent: provides general-purpose help and answers questions."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model_gateway: ModelGatewayClient,
|
||||
tool_broker: ToolBrokerClient,
|
||||
memory: MemoryClient,
|
||||
config: AgentConfig,
|
||||
) -> None:
|
||||
self._gateway = model_gateway
|
||||
self._broker = tool_broker
|
||||
self._memory = memory
|
||||
self._config = config
|
||||
|
||||
async def run(
|
||||
self, request: orchestrator_pb2.SubagentRequest
|
||||
) -> common_pb2.SubagentResult:
|
||||
"""Execute the assistant loop for a given task."""
|
||||
session_ctx = request.context
|
||||
agent_id = request.agent_id or f"ast-{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# Step 1: Discover available tools
|
||||
try:
|
||||
tools = await self._broker.discover_tools(
|
||||
session_ctx, common_pb2.AGENT_TYPE_ASSISTANT, request.task
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
return build_failed_result(f"Tool discovery failed: {e.code()}", _SUMMARY_PREFIX)
|
||||
|
||||
if not tools:
|
||||
return build_failed_result("No tools available for assistant", _SUMMARY_PREFIX)
|
||||
|
||||
tool_names = {t.name for t in tools}
|
||||
|
||||
# Step 2: Memory context enrichment
|
||||
memory_context = list(request.relevant_memory_context)
|
||||
if not memory_context:
|
||||
try:
|
||||
mem_results = await self._memory.query_memory(
|
||||
session_ctx, request.task, limit=3
|
||||
)
|
||||
memory_context = [format_memory(r) for r in mem_results]
|
||||
except grpc.aio.AioRpcError:
|
||||
logger.warning("Memory service unavailable, proceeding without context")
|
||||
|
||||
# Step 3: Initialize prompt builder with assistant system prompt
|
||||
max_tokens = request.max_tokens or self._config.max_tokens
|
||||
prompt_builder = PromptBuilder(
|
||||
max_tokens=max_tokens,
|
||||
system_prompt=ASSISTANT_SYSTEM_PROMPT,
|
||||
)
|
||||
prompt_builder.set_task(request.task)
|
||||
prompt_builder.set_tool_definitions(tools)
|
||||
prompt_builder.set_memory_context(memory_context)
|
||||
|
||||
# Step 4: Agent loop
|
||||
iteration = 0
|
||||
consecutive_failures = 0
|
||||
used_web_search = False
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
|
||||
while True:
|
||||
# Termination checks
|
||||
if iteration >= self._config.max_iterations:
|
||||
return build_partial_result(
|
||||
"Max iterations reached", _SUMMARY_PREFIX, False, used_web_search
|
||||
)
|
||||
|
||||
elapsed = asyncio.get_event_loop().time() - start_time
|
||||
if elapsed >= self._config.timeout_seconds:
|
||||
return build_partial_result("Timeout", _SUMMARY_PREFIX, False, used_web_search)
|
||||
|
||||
if prompt_builder.needs_compaction():
|
||||
compacted = await prompt_builder.compact(
|
||||
self._gateway, session_ctx
|
||||
)
|
||||
if not compacted:
|
||||
return build_partial_result(
|
||||
"Context overflow after compaction",
|
||||
_SUMMARY_PREFIX,
|
||||
False,
|
||||
used_web_search,
|
||||
)
|
||||
logger.info(
|
||||
"Context compacted for agent %s (iteration %d)",
|
||||
agent_id,
|
||||
iteration,
|
||||
)
|
||||
|
||||
if consecutive_failures >= _MAX_CONSECUTIVE_FAILURES:
|
||||
return build_failed_result(
|
||||
f"All tools failed after {_MAX_CONSECUTIVE_FAILURES} consecutive failures",
|
||||
_SUMMARY_PREFIX,
|
||||
)
|
||||
|
||||
# Build prompt and call model
|
||||
prompt = prompt_builder.build()
|
||||
try:
|
||||
response_text = await self._gateway.stream_inference(
|
||||
session_ctx, prompt, max_tokens=max_tokens
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
return build_failed_result(f"Model gateway error: {e.code()}", _SUMMARY_PREFIX)
|
||||
|
||||
# Parse model output
|
||||
try:
|
||||
parsed = parse_agent_response(response_text)
|
||||
except ParseError as e:
|
||||
prompt_builder.add_reasoning(
|
||||
f"[Parse error: {e}. Please output valid JSON.]"
|
||||
)
|
||||
iteration += 1
|
||||
continue
|
||||
|
||||
if parsed is None:
|
||||
# Plain reasoning text — append and continue
|
||||
prompt_builder.add_reasoning(response_text)
|
||||
iteration += 1
|
||||
continue
|
||||
|
||||
if isinstance(parsed, DoneSignalParsed):
|
||||
return build_success_result(parsed, False, used_web_search)
|
||||
|
||||
if isinstance(parsed, ToolCallParsed):
|
||||
# Validate tool name
|
||||
if parsed.tool not in tool_names:
|
||||
prompt_builder.add_tool_result(
|
||||
parsed.tool,
|
||||
f"Error: tool '{parsed.tool}' is not available. "
|
||||
f"Available tools: {', '.join(sorted(tool_names))}",
|
||||
False,
|
||||
agent_id,
|
||||
session_ctx.session_id,
|
||||
)
|
||||
iteration += 1
|
||||
consecutive_failures += 1
|
||||
continue
|
||||
|
||||
# Execute tool
|
||||
prompt_builder.add_tool_call(parsed.tool, parsed.parameters)
|
||||
try:
|
||||
output, success = await self._broker.execute_tool(
|
||||
session_ctx,
|
||||
common_pb2.AGENT_TYPE_ASSISTANT,
|
||||
parsed.tool,
|
||||
parsed.parameters,
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
output = f"Tool execution error: {e.code()}"
|
||||
success = False
|
||||
|
||||
prompt_builder.add_tool_result(
|
||||
parsed.tool,
|
||||
output,
|
||||
success,
|
||||
agent_id,
|
||||
session_ctx.session_id,
|
||||
)
|
||||
|
||||
if parsed.tool == "web_search" and success:
|
||||
used_web_search = True
|
||||
|
||||
if success:
|
||||
consecutive_failures = 0
|
||||
else:
|
||||
consecutive_failures += 1
|
||||
|
||||
iteration += 1
|
||||
|
||||
# Unreachable, but satisfies type checker
|
||||
return build_failed_result("Unexpected loop exit", _SUMMARY_PREFIX) # pragma: no cover
|
||||
agent_type = common_pb2.AGENT_TYPE_ASSISTANT
|
||||
id_prefix = "ast-"
|
||||
summary_prefix = "Assistance"
|
||||
system_prompt = ASSISTANT_SYSTEM_PROMPT
|
||||
|
||||
|
||||
def create_assistant_agent(
|
||||
@@ -214,5 +33,3 @@ def create_assistant_agent(
|
||||
memory=MemoryClient(memory_channel),
|
||||
config=config,
|
||||
)
|
||||
|
||||
|
||||
|
||||
231
services/orchestrator/src/orchestrator/base_agent.py
Normal file
231
services/orchestrator/src/orchestrator/base_agent.py
Normal file
@@ -0,0 +1,231 @@
|
||||
"""Base agent loop shared by all agent types."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import grpc
|
||||
from llm_multiverse.v1 import common_pb2, orchestrator_pb2
|
||||
|
||||
from .agent_utils import (
|
||||
build_failed_result,
|
||||
build_partial_result,
|
||||
build_success_result,
|
||||
format_memory,
|
||||
)
|
||||
from .clients import MemoryClient, ModelGatewayClient, ToolBrokerClient
|
||||
from .config import AgentConfig
|
||||
from .parser import DoneSignalParsed, ParseError, ToolCallParsed, parse_agent_response
|
||||
from .prompt import PromptBuilder
|
||||
|
||||
_MAX_CONSECUTIVE_FAILURES = 3
|
||||
|
||||
|
||||
class BaseAgent:
|
||||
"""Shared agent loop: infer -> tool call -> observe -> repeat.
|
||||
|
||||
Subclasses configure via class attributes:
|
||||
agent_type: Proto AgentType enum value (e.g. AGENT_TYPE_RESEARCHER).
|
||||
id_prefix: Short prefix for agent IDs (e.g. "res-").
|
||||
summary_prefix: Human-readable prefix for result summaries.
|
||||
system_prompt: System prompt string, or None for default.
|
||||
tool_output_sources: Set of tool names that indicate RESULT_SOURCE_TOOL_OUTPUT.
|
||||
"""
|
||||
|
||||
agent_type: int
|
||||
id_prefix: str
|
||||
summary_prefix: str
|
||||
system_prompt: str | None = None
|
||||
tool_output_sources: frozenset[str] = frozenset()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model_gateway: ModelGatewayClient,
|
||||
tool_broker: ToolBrokerClient,
|
||||
memory: MemoryClient,
|
||||
config: AgentConfig,
|
||||
) -> None:
|
||||
self._gateway = model_gateway
|
||||
self._broker = tool_broker
|
||||
self._memory = memory
|
||||
self._config = config
|
||||
self._logger = logging.getLogger(
|
||||
f"orchestrator.{self.__class__.__name__.lower()}"
|
||||
)
|
||||
|
||||
async def run(
|
||||
self, request: orchestrator_pb2.SubagentRequest
|
||||
) -> common_pb2.SubagentResult:
|
||||
"""Execute the agent loop for a given task."""
|
||||
session_ctx = request.context
|
||||
agent_id = request.agent_id or f"{self.id_prefix}{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# Step 1: Discover available tools
|
||||
try:
|
||||
tools = await self._broker.discover_tools(
|
||||
session_ctx, self.agent_type, request.task
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
return build_failed_result(
|
||||
f"Tool discovery failed: {e.code()}", self.summary_prefix
|
||||
)
|
||||
|
||||
if not tools:
|
||||
return build_failed_result(
|
||||
f"No tools available for {self.summary_prefix.lower()}",
|
||||
self.summary_prefix,
|
||||
)
|
||||
|
||||
tool_names = {t.name for t in tools}
|
||||
|
||||
# Step 2: Memory context enrichment
|
||||
memory_context = list(request.relevant_memory_context)
|
||||
if not memory_context:
|
||||
try:
|
||||
mem_results = await self._memory.query_memory(
|
||||
session_ctx, request.task, limit=3
|
||||
)
|
||||
memory_context = [format_memory(r) for r in mem_results]
|
||||
except grpc.aio.AioRpcError:
|
||||
self._logger.warning(
|
||||
"Memory service unavailable, proceeding without context"
|
||||
)
|
||||
|
||||
# Step 3: Initialize prompt builder
|
||||
max_tokens = request.max_tokens or self._config.max_tokens
|
||||
kwargs: dict = {"max_tokens": max_tokens}
|
||||
if self.system_prompt is not None:
|
||||
kwargs["system_prompt"] = self.system_prompt
|
||||
prompt_builder = PromptBuilder(**kwargs)
|
||||
prompt_builder.set_task(request.task)
|
||||
prompt_builder.set_tool_definitions(tools)
|
||||
prompt_builder.set_memory_context(memory_context)
|
||||
|
||||
# Step 4: Agent loop
|
||||
iteration = 0
|
||||
consecutive_failures = 0
|
||||
used_tool_output = False
|
||||
used_web_search = False
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
|
||||
while True:
|
||||
# Termination checks
|
||||
if iteration >= self._config.max_iterations:
|
||||
return build_partial_result(
|
||||
"Max iterations reached",
|
||||
self.summary_prefix,
|
||||
used_tool_output,
|
||||
used_web_search,
|
||||
)
|
||||
|
||||
elapsed = asyncio.get_event_loop().time() - start_time
|
||||
if elapsed >= self._config.timeout_seconds:
|
||||
return build_partial_result(
|
||||
"Timeout",
|
||||
self.summary_prefix,
|
||||
used_tool_output,
|
||||
used_web_search,
|
||||
)
|
||||
|
||||
if prompt_builder.needs_compaction():
|
||||
compacted = await prompt_builder.compact(
|
||||
self._gateway, session_ctx
|
||||
)
|
||||
if not compacted:
|
||||
return build_partial_result(
|
||||
"Context overflow after compaction",
|
||||
self.summary_prefix,
|
||||
used_tool_output,
|
||||
used_web_search,
|
||||
)
|
||||
self._logger.info(
|
||||
"Context compacted for agent %s (iteration %d)",
|
||||
agent_id,
|
||||
iteration,
|
||||
)
|
||||
|
||||
if consecutive_failures >= _MAX_CONSECUTIVE_FAILURES:
|
||||
return build_failed_result(
|
||||
f"All tools failed after {_MAX_CONSECUTIVE_FAILURES} consecutive failures",
|
||||
self.summary_prefix,
|
||||
)
|
||||
|
||||
# Build prompt and call model
|
||||
prompt = prompt_builder.build()
|
||||
try:
|
||||
response_text = await self._gateway.stream_inference(
|
||||
session_ctx, prompt, max_tokens=max_tokens
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
return build_failed_result(
|
||||
f"Model gateway error: {e.code()}", self.summary_prefix
|
||||
)
|
||||
|
||||
# Parse model output
|
||||
try:
|
||||
parsed = parse_agent_response(response_text)
|
||||
except ParseError as e:
|
||||
prompt_builder.add_reasoning(
|
||||
f"[Parse error: {e}. Please output valid JSON.]"
|
||||
)
|
||||
iteration += 1
|
||||
continue
|
||||
|
||||
if parsed is None:
|
||||
prompt_builder.add_reasoning(response_text)
|
||||
iteration += 1
|
||||
continue
|
||||
|
||||
if isinstance(parsed, DoneSignalParsed):
|
||||
return build_success_result(
|
||||
parsed, used_tool_output, used_web_search
|
||||
)
|
||||
|
||||
if isinstance(parsed, ToolCallParsed):
|
||||
if parsed.tool not in tool_names:
|
||||
prompt_builder.add_tool_result(
|
||||
parsed.tool,
|
||||
f"Error: tool '{parsed.tool}' is not available. "
|
||||
f"Available tools: {', '.join(sorted(tool_names))}",
|
||||
False,
|
||||
agent_id,
|
||||
session_ctx.session_id,
|
||||
)
|
||||
iteration += 1
|
||||
consecutive_failures += 1
|
||||
continue
|
||||
|
||||
prompt_builder.add_tool_call(parsed.tool, parsed.parameters)
|
||||
try:
|
||||
output, success = await self._broker.execute_tool(
|
||||
session_ctx,
|
||||
self.agent_type,
|
||||
parsed.tool,
|
||||
parsed.parameters,
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
output = f"Tool execution error: {e.code()}"
|
||||
success = False
|
||||
|
||||
prompt_builder.add_tool_result(
|
||||
parsed.tool,
|
||||
output,
|
||||
success,
|
||||
agent_id,
|
||||
session_ctx.session_id,
|
||||
)
|
||||
|
||||
if success:
|
||||
if parsed.tool in self.tool_output_sources:
|
||||
used_tool_output = True
|
||||
if parsed.tool == "web_search":
|
||||
used_web_search = True
|
||||
consecutive_failures = 0
|
||||
else:
|
||||
consecutive_failures += 1
|
||||
|
||||
iteration += 1
|
||||
|
||||
return build_failed_result("Unexpected loop exit", self.summary_prefix) # pragma: no cover
|
||||
@@ -1,213 +1,24 @@
|
||||
"""Coder agent — code generation, debugging, and review loop."""
|
||||
"""Coder agent — code generation, debugging, and review."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import grpc
|
||||
from llm_multiverse.v1 import common_pb2, orchestrator_pb2
|
||||
from llm_multiverse.v1 import common_pb2
|
||||
|
||||
from .agent_utils import (
|
||||
build_failed_result,
|
||||
build_partial_result,
|
||||
build_success_result,
|
||||
format_memory,
|
||||
)
|
||||
from .base_agent import BaseAgent
|
||||
from .clients import MemoryClient, ModelGatewayClient, ToolBrokerClient
|
||||
from .config import AgentConfig
|
||||
from .parser import DoneSignalParsed, ParseError, ToolCallParsed, parse_agent_response
|
||||
from .prompt import CODER_SYSTEM_PROMPT, PromptBuilder
|
||||
|
||||
logger = logging.getLogger("orchestrator.coder")
|
||||
|
||||
_SUMMARY_PREFIX = "Coding task"
|
||||
|
||||
_MAX_CONSECUTIVE_FAILURES = 3
|
||||
|
||||
# Tools whose successful use indicates RESULT_SOURCE_TOOL_OUTPUT.
|
||||
_TOOL_OUTPUT_SOURCES = {"fs_read", "fs_write", "run_code"}
|
||||
from .prompt import CODER_SYSTEM_PROMPT
|
||||
|
||||
|
||||
class CoderAgent:
|
||||
"""Executes the coder agent loop: infer -> tool call -> observe -> repeat."""
|
||||
class CoderAgent(BaseAgent):
|
||||
"""Coder agent: generates, debugs, and reviews code."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model_gateway: ModelGatewayClient,
|
||||
tool_broker: ToolBrokerClient,
|
||||
memory: MemoryClient,
|
||||
config: AgentConfig,
|
||||
) -> None:
|
||||
self._gateway = model_gateway
|
||||
self._broker = tool_broker
|
||||
self._memory = memory
|
||||
self._config = config
|
||||
|
||||
async def run(
|
||||
self, request: orchestrator_pb2.SubagentRequest
|
||||
) -> common_pb2.SubagentResult:
|
||||
"""Execute the coder loop for a given task."""
|
||||
session_ctx = request.context
|
||||
agent_id = request.agent_id or f"cod-{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# Step 1: Discover available tools
|
||||
try:
|
||||
tools = await self._broker.discover_tools(
|
||||
session_ctx, common_pb2.AGENT_TYPE_CODER, request.task
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
return build_failed_result(f"Tool discovery failed: {e.code()}", _SUMMARY_PREFIX)
|
||||
|
||||
if not tools:
|
||||
return build_failed_result("No tools available for coder", _SUMMARY_PREFIX)
|
||||
|
||||
tool_names = {t.name for t in tools}
|
||||
|
||||
# Step 2: Memory context enrichment
|
||||
memory_context = list(request.relevant_memory_context)
|
||||
if not memory_context:
|
||||
try:
|
||||
mem_results = await self._memory.query_memory(
|
||||
session_ctx, request.task, limit=3
|
||||
)
|
||||
memory_context = [format_memory(r) for r in mem_results]
|
||||
except grpc.aio.AioRpcError:
|
||||
logger.warning("Memory service unavailable, proceeding without context")
|
||||
|
||||
# Step 3: Initialize prompt builder with coder system prompt
|
||||
max_tokens = request.max_tokens or self._config.max_tokens
|
||||
prompt_builder = PromptBuilder(
|
||||
max_tokens=max_tokens,
|
||||
system_prompt=CODER_SYSTEM_PROMPT,
|
||||
)
|
||||
prompt_builder.set_task(request.task)
|
||||
prompt_builder.set_tool_definitions(tools)
|
||||
prompt_builder.set_memory_context(memory_context)
|
||||
|
||||
# Step 4: Agent loop
|
||||
iteration = 0
|
||||
consecutive_failures = 0
|
||||
used_tool_output = False
|
||||
used_web_search = False
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
|
||||
while True:
|
||||
# Termination checks
|
||||
if iteration >= self._config.max_iterations:
|
||||
return build_partial_result(
|
||||
"Max iterations reached", _SUMMARY_PREFIX, used_tool_output, used_web_search
|
||||
)
|
||||
|
||||
elapsed = asyncio.get_event_loop().time() - start_time
|
||||
if elapsed >= self._config.timeout_seconds:
|
||||
return build_partial_result(
|
||||
"Timeout", _SUMMARY_PREFIX, used_tool_output, used_web_search
|
||||
)
|
||||
|
||||
if prompt_builder.needs_compaction():
|
||||
compacted = await prompt_builder.compact(
|
||||
self._gateway, session_ctx
|
||||
)
|
||||
if not compacted:
|
||||
return build_partial_result(
|
||||
"Context overflow after compaction",
|
||||
_SUMMARY_PREFIX,
|
||||
used_tool_output,
|
||||
used_web_search,
|
||||
)
|
||||
logger.info(
|
||||
"Context compacted for agent %s (iteration %d)",
|
||||
agent_id,
|
||||
iteration,
|
||||
)
|
||||
|
||||
if consecutive_failures >= _MAX_CONSECUTIVE_FAILURES:
|
||||
return build_failed_result(
|
||||
f"All tools failed after {_MAX_CONSECUTIVE_FAILURES} consecutive failures",
|
||||
_SUMMARY_PREFIX,
|
||||
)
|
||||
|
||||
# Build prompt and call model
|
||||
prompt = prompt_builder.build()
|
||||
try:
|
||||
response_text = await self._gateway.stream_inference(
|
||||
session_ctx, prompt, max_tokens=max_tokens
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
return build_failed_result(f"Model gateway error: {e.code()}", _SUMMARY_PREFIX)
|
||||
|
||||
# Parse model output
|
||||
try:
|
||||
parsed = parse_agent_response(response_text)
|
||||
except ParseError as e:
|
||||
prompt_builder.add_reasoning(
|
||||
f"[Parse error: {e}. Please output valid JSON.]"
|
||||
)
|
||||
iteration += 1
|
||||
continue
|
||||
|
||||
if parsed is None:
|
||||
# Plain reasoning text — append and continue
|
||||
prompt_builder.add_reasoning(response_text)
|
||||
iteration += 1
|
||||
continue
|
||||
|
||||
if isinstance(parsed, DoneSignalParsed):
|
||||
return build_success_result(
|
||||
parsed, used_tool_output, used_web_search
|
||||
)
|
||||
|
||||
if isinstance(parsed, ToolCallParsed):
|
||||
# Validate tool name
|
||||
if parsed.tool not in tool_names:
|
||||
prompt_builder.add_tool_result(
|
||||
parsed.tool,
|
||||
f"Error: tool '{parsed.tool}' is not available. "
|
||||
f"Available tools: {', '.join(sorted(tool_names))}",
|
||||
False,
|
||||
agent_id,
|
||||
session_ctx.session_id,
|
||||
)
|
||||
iteration += 1
|
||||
consecutive_failures += 1
|
||||
continue
|
||||
|
||||
# Execute tool
|
||||
prompt_builder.add_tool_call(parsed.tool, parsed.parameters)
|
||||
try:
|
||||
output, success = await self._broker.execute_tool(
|
||||
session_ctx,
|
||||
common_pb2.AGENT_TYPE_CODER,
|
||||
parsed.tool,
|
||||
parsed.parameters,
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
output = f"Tool execution error: {e.code()}"
|
||||
success = False
|
||||
|
||||
prompt_builder.add_tool_result(
|
||||
parsed.tool,
|
||||
output,
|
||||
success,
|
||||
agent_id,
|
||||
session_ctx.session_id,
|
||||
)
|
||||
|
||||
if success:
|
||||
if parsed.tool in _TOOL_OUTPUT_SOURCES:
|
||||
used_tool_output = True
|
||||
if parsed.tool == "web_search":
|
||||
used_web_search = True
|
||||
consecutive_failures = 0
|
||||
else:
|
||||
consecutive_failures += 1
|
||||
|
||||
iteration += 1
|
||||
|
||||
# Unreachable, but satisfies type checker
|
||||
return build_failed_result("Unexpected loop exit", _SUMMARY_PREFIX) # pragma: no cover
|
||||
agent_type = common_pb2.AGENT_TYPE_CODER
|
||||
id_prefix = "cod-"
|
||||
summary_prefix = "Coding task"
|
||||
system_prompt = CODER_SYSTEM_PROMPT
|
||||
tool_output_sources = frozenset({"fs_read", "fs_write", "run_code"})
|
||||
|
||||
|
||||
def create_coder_agent(
|
||||
@@ -223,5 +34,3 @@ def create_coder_agent(
|
||||
memory=MemoryClient(memory_channel),
|
||||
config=config,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -1,198 +1,22 @@
|
||||
"""Researcher agent — core tool-use loop."""
|
||||
"""Researcher agent — web search and knowledge synthesis."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import grpc
|
||||
from llm_multiverse.v1 import common_pb2, orchestrator_pb2
|
||||
from llm_multiverse.v1 import common_pb2
|
||||
|
||||
from .agent_utils import (
|
||||
build_failed_result,
|
||||
build_partial_result,
|
||||
build_success_result,
|
||||
format_memory,
|
||||
)
|
||||
from .base_agent import BaseAgent
|
||||
from .clients import MemoryClient, ModelGatewayClient, ToolBrokerClient
|
||||
from .config import AgentConfig
|
||||
from .parser import DoneSignalParsed, ParseError, ToolCallParsed, parse_agent_response
|
||||
from .prompt import PromptBuilder
|
||||
|
||||
logger = logging.getLogger("orchestrator.researcher")
|
||||
|
||||
_SUMMARY_PREFIX = "Research"
|
||||
|
||||
_MAX_CONSECUTIVE_FAILURES = 3
|
||||
|
||||
|
||||
class ResearcherAgent:
|
||||
"""Executes the researcher agent loop: infer → tool call → observe → repeat."""
|
||||
class ResearcherAgent(BaseAgent):
|
||||
"""Researcher agent: searches the web and synthesizes knowledge."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model_gateway: ModelGatewayClient,
|
||||
tool_broker: ToolBrokerClient,
|
||||
memory: MemoryClient,
|
||||
config: AgentConfig,
|
||||
) -> None:
|
||||
self._gateway = model_gateway
|
||||
self._broker = tool_broker
|
||||
self._memory = memory
|
||||
self._config = config
|
||||
|
||||
async def run(
|
||||
self, request: orchestrator_pb2.SubagentRequest
|
||||
) -> common_pb2.SubagentResult:
|
||||
"""Execute the researcher loop for a given task."""
|
||||
session_ctx = request.context
|
||||
agent_id = request.agent_id or f"res-{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# Step 1: Discover available tools
|
||||
try:
|
||||
tools = await self._broker.discover_tools(
|
||||
session_ctx, common_pb2.AGENT_TYPE_RESEARCHER, request.task
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
return build_failed_result(f"Tool discovery failed: {e.code()}", _SUMMARY_PREFIX)
|
||||
|
||||
if not tools:
|
||||
return build_failed_result("No tools available for researcher", _SUMMARY_PREFIX)
|
||||
|
||||
tool_names = {t.name for t in tools}
|
||||
|
||||
# Step 2: Memory context enrichment
|
||||
memory_context = list(request.relevant_memory_context)
|
||||
if not memory_context:
|
||||
try:
|
||||
mem_results = await self._memory.query_memory(
|
||||
session_ctx, request.task, limit=3
|
||||
)
|
||||
memory_context = [format_memory(r) for r in mem_results]
|
||||
except grpc.aio.AioRpcError:
|
||||
logger.warning("Memory service unavailable, proceeding without context")
|
||||
|
||||
# Step 3: Initialize prompt builder
|
||||
max_tokens = request.max_tokens or self._config.max_tokens
|
||||
prompt_builder = PromptBuilder(max_tokens=max_tokens)
|
||||
prompt_builder.set_task(request.task)
|
||||
prompt_builder.set_tool_definitions(tools)
|
||||
prompt_builder.set_memory_context(memory_context)
|
||||
|
||||
# Step 4: Agent loop
|
||||
iteration = 0
|
||||
consecutive_failures = 0
|
||||
used_web_search = False
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
|
||||
while True:
|
||||
# Termination checks
|
||||
if iteration >= self._config.max_iterations:
|
||||
return build_partial_result(
|
||||
"Max iterations reached", _SUMMARY_PREFIX, False, used_web_search
|
||||
)
|
||||
|
||||
elapsed = asyncio.get_event_loop().time() - start_time
|
||||
if elapsed >= self._config.timeout_seconds:
|
||||
return build_partial_result("Timeout", _SUMMARY_PREFIX, False, used_web_search)
|
||||
|
||||
if prompt_builder.needs_compaction():
|
||||
compacted = await prompt_builder.compact(
|
||||
self._gateway, session_ctx
|
||||
)
|
||||
if not compacted:
|
||||
return build_partial_result(
|
||||
"Context overflow after compaction", _SUMMARY_PREFIX, False, used_web_search
|
||||
)
|
||||
logger.info(
|
||||
"Context compacted for agent %s (iteration %d)",
|
||||
agent_id,
|
||||
iteration,
|
||||
)
|
||||
|
||||
if consecutive_failures >= _MAX_CONSECUTIVE_FAILURES:
|
||||
return build_failed_result(
|
||||
f"All tools failed after {_MAX_CONSECUTIVE_FAILURES} consecutive failures",
|
||||
_SUMMARY_PREFIX,
|
||||
)
|
||||
|
||||
# Build prompt and call model
|
||||
prompt = prompt_builder.build()
|
||||
try:
|
||||
response_text = await self._gateway.stream_inference(
|
||||
session_ctx, prompt, max_tokens=max_tokens
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
return build_failed_result(f"Model gateway error: {e.code()}", _SUMMARY_PREFIX)
|
||||
|
||||
# Parse model output
|
||||
try:
|
||||
parsed = parse_agent_response(response_text)
|
||||
except ParseError as e:
|
||||
prompt_builder.add_reasoning(
|
||||
f"[Parse error: {e}. Please output valid JSON.]"
|
||||
)
|
||||
iteration += 1
|
||||
continue
|
||||
|
||||
if parsed is None:
|
||||
# Plain reasoning text — append and continue
|
||||
prompt_builder.add_reasoning(response_text)
|
||||
iteration += 1
|
||||
continue
|
||||
|
||||
if isinstance(parsed, DoneSignalParsed):
|
||||
return build_success_result(parsed, False, used_web_search)
|
||||
|
||||
if isinstance(parsed, ToolCallParsed):
|
||||
# Validate tool name
|
||||
if parsed.tool not in tool_names:
|
||||
prompt_builder.add_tool_result(
|
||||
parsed.tool,
|
||||
f"Error: tool '{parsed.tool}' is not available. "
|
||||
f"Available tools: {', '.join(sorted(tool_names))}",
|
||||
False,
|
||||
agent_id,
|
||||
session_ctx.session_id,
|
||||
)
|
||||
iteration += 1
|
||||
consecutive_failures += 1
|
||||
continue
|
||||
|
||||
# Execute tool
|
||||
prompt_builder.add_tool_call(parsed.tool, parsed.parameters)
|
||||
try:
|
||||
output, success = await self._broker.execute_tool(
|
||||
session_ctx,
|
||||
common_pb2.AGENT_TYPE_RESEARCHER,
|
||||
parsed.tool,
|
||||
parsed.parameters,
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
output = f"Tool execution error: {e.code()}"
|
||||
success = False
|
||||
|
||||
prompt_builder.add_tool_result(
|
||||
parsed.tool,
|
||||
output,
|
||||
success,
|
||||
agent_id,
|
||||
session_ctx.session_id,
|
||||
)
|
||||
|
||||
if parsed.tool == "web_search" and success:
|
||||
used_web_search = True
|
||||
|
||||
if success:
|
||||
consecutive_failures = 0
|
||||
else:
|
||||
consecutive_failures += 1
|
||||
|
||||
iteration += 1
|
||||
|
||||
# Unreachable, but satisfies type checker
|
||||
return build_failed_result("Unexpected loop exit", _SUMMARY_PREFIX) # pragma: no cover
|
||||
agent_type = common_pb2.AGENT_TYPE_RESEARCHER
|
||||
id_prefix = "res-"
|
||||
summary_prefix = "Research"
|
||||
system_prompt = None # uses PromptBuilder default
|
||||
|
||||
|
||||
def create_researcher_agent(
|
||||
@@ -208,5 +32,3 @@ def create_researcher_agent(
|
||||
memory=MemoryClient(memory_channel),
|
||||
config=config,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -1,213 +1,24 @@
|
||||
"""Sysadmin agent — system configuration, troubleshooting, and deployment."""
|
||||
"""Sysadmin agent — system administration and infrastructure tasks."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
import grpc
|
||||
from llm_multiverse.v1 import common_pb2, orchestrator_pb2
|
||||
from llm_multiverse.v1 import common_pb2
|
||||
|
||||
from .agent_utils import (
|
||||
build_failed_result,
|
||||
build_partial_result,
|
||||
build_success_result,
|
||||
format_memory,
|
||||
)
|
||||
from .base_agent import BaseAgent
|
||||
from .clients import MemoryClient, ModelGatewayClient, ToolBrokerClient
|
||||
from .config import AgentConfig
|
||||
from .parser import DoneSignalParsed, ParseError, ToolCallParsed, parse_agent_response
|
||||
from .prompt import SYSADMIN_SYSTEM_PROMPT, PromptBuilder
|
||||
|
||||
logger = logging.getLogger("orchestrator.sysadmin")
|
||||
|
||||
_SUMMARY_PREFIX = "System administration"
|
||||
|
||||
_MAX_CONSECUTIVE_FAILURES = 3
|
||||
|
||||
# Tools whose successful use indicates RESULT_SOURCE_TOOL_OUTPUT.
|
||||
_TOOL_OUTPUT_SOURCES = {"fs_read", "fs_write", "run_shell", "package_install"}
|
||||
from .prompt import SYSADMIN_SYSTEM_PROMPT
|
||||
|
||||
|
||||
class SysadminAgent:
|
||||
"""Executes the sysadmin agent loop: infer -> tool call -> observe -> repeat."""
|
||||
class SysadminAgent(BaseAgent):
|
||||
"""Sysadmin agent: manages systems and infrastructure."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model_gateway: ModelGatewayClient,
|
||||
tool_broker: ToolBrokerClient,
|
||||
memory: MemoryClient,
|
||||
config: AgentConfig,
|
||||
) -> None:
|
||||
self._gateway = model_gateway
|
||||
self._broker = tool_broker
|
||||
self._memory = memory
|
||||
self._config = config
|
||||
|
||||
async def run(
|
||||
self, request: orchestrator_pb2.SubagentRequest
|
||||
) -> common_pb2.SubagentResult:
|
||||
"""Execute the sysadmin loop for a given task."""
|
||||
session_ctx = request.context
|
||||
agent_id = request.agent_id or f"sys-{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# Step 1: Discover available tools
|
||||
try:
|
||||
tools = await self._broker.discover_tools(
|
||||
session_ctx, common_pb2.AGENT_TYPE_SYSADMIN, request.task
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
return build_failed_result(f"Tool discovery failed: {e.code()}", _SUMMARY_PREFIX)
|
||||
|
||||
if not tools:
|
||||
return build_failed_result("No tools available for sysadmin", _SUMMARY_PREFIX)
|
||||
|
||||
tool_names = {t.name for t in tools}
|
||||
|
||||
# Step 2: Memory context enrichment
|
||||
memory_context = list(request.relevant_memory_context)
|
||||
if not memory_context:
|
||||
try:
|
||||
mem_results = await self._memory.query_memory(
|
||||
session_ctx, request.task, limit=3
|
||||
)
|
||||
memory_context = [format_memory(r) for r in mem_results]
|
||||
except grpc.aio.AioRpcError:
|
||||
logger.warning("Memory service unavailable, proceeding without context")
|
||||
|
||||
# Step 3: Initialize prompt builder with sysadmin system prompt
|
||||
max_tokens = request.max_tokens or self._config.max_tokens
|
||||
prompt_builder = PromptBuilder(
|
||||
max_tokens=max_tokens,
|
||||
system_prompt=SYSADMIN_SYSTEM_PROMPT,
|
||||
)
|
||||
prompt_builder.set_task(request.task)
|
||||
prompt_builder.set_tool_definitions(tools)
|
||||
prompt_builder.set_memory_context(memory_context)
|
||||
|
||||
# Step 4: Agent loop
|
||||
iteration = 0
|
||||
consecutive_failures = 0
|
||||
used_tool_output = False
|
||||
used_web_search = False
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
|
||||
while True:
|
||||
# Termination checks
|
||||
if iteration >= self._config.max_iterations:
|
||||
return build_partial_result(
|
||||
"Max iterations reached", _SUMMARY_PREFIX, used_tool_output, used_web_search
|
||||
)
|
||||
|
||||
elapsed = asyncio.get_event_loop().time() - start_time
|
||||
if elapsed >= self._config.timeout_seconds:
|
||||
return build_partial_result(
|
||||
"Timeout", _SUMMARY_PREFIX, used_tool_output, used_web_search
|
||||
)
|
||||
|
||||
if prompt_builder.needs_compaction():
|
||||
compacted = await prompt_builder.compact(
|
||||
self._gateway, session_ctx
|
||||
)
|
||||
if not compacted:
|
||||
return build_partial_result(
|
||||
"Context overflow after compaction",
|
||||
_SUMMARY_PREFIX,
|
||||
used_tool_output,
|
||||
used_web_search,
|
||||
)
|
||||
logger.info(
|
||||
"Context compacted for agent %s (iteration %d)",
|
||||
agent_id,
|
||||
iteration,
|
||||
)
|
||||
|
||||
if consecutive_failures >= _MAX_CONSECUTIVE_FAILURES:
|
||||
return build_failed_result(
|
||||
f"All tools failed after {_MAX_CONSECUTIVE_FAILURES} consecutive failures",
|
||||
_SUMMARY_PREFIX,
|
||||
)
|
||||
|
||||
# Build prompt and call model
|
||||
prompt = prompt_builder.build()
|
||||
try:
|
||||
response_text = await self._gateway.stream_inference(
|
||||
session_ctx, prompt, max_tokens=max_tokens
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
return build_failed_result(f"Model gateway error: {e.code()}", _SUMMARY_PREFIX)
|
||||
|
||||
# Parse model output
|
||||
try:
|
||||
parsed = parse_agent_response(response_text)
|
||||
except ParseError as e:
|
||||
prompt_builder.add_reasoning(
|
||||
f"[Parse error: {e}. Please output valid JSON.]"
|
||||
)
|
||||
iteration += 1
|
||||
continue
|
||||
|
||||
if parsed is None:
|
||||
# Plain reasoning text — append and continue
|
||||
prompt_builder.add_reasoning(response_text)
|
||||
iteration += 1
|
||||
continue
|
||||
|
||||
if isinstance(parsed, DoneSignalParsed):
|
||||
return build_success_result(
|
||||
parsed, used_tool_output, used_web_search
|
||||
)
|
||||
|
||||
if isinstance(parsed, ToolCallParsed):
|
||||
# Validate tool name
|
||||
if parsed.tool not in tool_names:
|
||||
prompt_builder.add_tool_result(
|
||||
parsed.tool,
|
||||
f"Error: tool '{parsed.tool}' is not available. "
|
||||
f"Available tools: {', '.join(sorted(tool_names))}",
|
||||
False,
|
||||
agent_id,
|
||||
session_ctx.session_id,
|
||||
)
|
||||
iteration += 1
|
||||
consecutive_failures += 1
|
||||
continue
|
||||
|
||||
# Execute tool
|
||||
prompt_builder.add_tool_call(parsed.tool, parsed.parameters)
|
||||
try:
|
||||
output, success = await self._broker.execute_tool(
|
||||
session_ctx,
|
||||
common_pb2.AGENT_TYPE_SYSADMIN,
|
||||
parsed.tool,
|
||||
parsed.parameters,
|
||||
)
|
||||
except grpc.aio.AioRpcError as e:
|
||||
output = f"Tool execution error: {e.code()}"
|
||||
success = False
|
||||
|
||||
prompt_builder.add_tool_result(
|
||||
parsed.tool,
|
||||
output,
|
||||
success,
|
||||
agent_id,
|
||||
session_ctx.session_id,
|
||||
)
|
||||
|
||||
if success:
|
||||
if parsed.tool in _TOOL_OUTPUT_SOURCES:
|
||||
used_tool_output = True
|
||||
if parsed.tool == "web_search":
|
||||
used_web_search = True
|
||||
consecutive_failures = 0
|
||||
else:
|
||||
consecutive_failures += 1
|
||||
|
||||
iteration += 1
|
||||
|
||||
# Unreachable, but satisfies type checker
|
||||
return build_failed_result("Unexpected loop exit", _SUMMARY_PREFIX) # pragma: no cover
|
||||
agent_type = common_pb2.AGENT_TYPE_SYSADMIN
|
||||
id_prefix = "sys-"
|
||||
summary_prefix = "System administration"
|
||||
system_prompt = SYSADMIN_SYSTEM_PROMPT
|
||||
tool_output_sources = frozenset({"fs_read", "fs_write", "run_shell", "package_install"})
|
||||
|
||||
|
||||
def create_sysadmin_agent(
|
||||
@@ -223,5 +34,3 @@ def create_sysadmin_agent(
|
||||
memory=MemoryClient(memory_channel),
|
||||
config=config,
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user