Compare commits

...

2 Commits

Author SHA1 Message Date
ad5d7f6769 Merge pull request 'refactor: extract shared BaseAgent class from 4 agents (#188)' (#221) from refactor/issue-188-agent-base-class into main 2026-03-11 11:53:33 +01:00
Pi Agent
573ddf8fc6 refactor: extract shared BaseAgent class from 4 agent types (issue #188)
Create base_agent.py with the shared run() loop parameterized by
agent_type, id_prefix, summary_prefix, system_prompt, and
tool_output_sources. Reduce each agent module from ~210 lines to ~35.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 11:53:12 +01:00
7 changed files with 323 additions and 784 deletions

View File

@@ -110,6 +110,7 @@
| #201 | Refactor: extract duplicated _CONFIDENCE_MAP and _JSON_BLOCK_RE | — | `COMPLETED` | Python | [issue-201.md](issue-201.md) |
| #202 | Bug: Docker network missing internal:true and edge separation | — | `COMPLETED` | Docker / YAML | [issue-202.md](issue-202.md) |
| #181 | Tech debt: deduplicate agent type name strings across manifest and config | — | `COMPLETED` | Rust / Python | [issue-181.md](issue-181.md) |
| #188 | Tech debt: extract shared agent loop base class | — | `COMPLETED` | Python | [issue-188.md](issue-188.md) |
| #182 | Tech debt: extract ToolOutput error helper to reduce boilerplate | — | `COMPLETED` | Rust | [issue-182.md](issue-182.md) |
| #203 | Refactor: deduplicate Rust Dockerfile builder stage boilerplate | — | `COMPLETED` | Docker | [issue-203.md](issue-203.md) |

View File

@@ -0,0 +1,50 @@
# Implementation Plan — Issue #188: Extract shared agent loop base class
## Metadata
| Field | Value |
|---|---|
| Issue | [#188](https://git.shahondin1624.de/llm-multiverse/llm-multiverse/issues/188) |
| Title | Tech debt: extract shared agent loop base class from researcher/coder/assistant |
| Milestone | — (tech debt) |
| Labels | `type:refactor`, `priority:low`, `service:orchestrator` |
| Status | `COMPLETED` |
| Language | Python |
| Related Plans | [issue-192.md](issue-192.md) |
| Blocked by | — |
## Acceptance Criteria
- [x] Shared `BaseAgent` class with parameterized `run()` loop
- [x] All 4 agent files reduced to ~35 lines (class attrs + factory function)
- [x] All existing tests pass (482 tests)
- [x] Lint clean
## Implementation Steps
### 1. Create `base_agent.py` with `BaseAgent` class
Extracts the shared `run()` method parameterized by class attributes:
- `agent_type`: proto enum value
- `id_prefix`: agent ID prefix string
- `summary_prefix`: human-readable prefix for results
- `system_prompt`: system prompt string or None for default
- `tool_output_sources`: frozenset of tool names indicating RESULT_SOURCE_TOOL_OUTPUT
### 2. Simplify 4 agent files
Each agent module reduced to: class with 5 class attributes + factory function.
## Files Created/Modified
| File | Action | Purpose |
|---|---|---|
| `services/orchestrator/src/orchestrator/base_agent.py` | Create | Shared base class with run() loop |
| `services/orchestrator/src/orchestrator/researcher.py` | Modify | Extend BaseAgent |
| `services/orchestrator/src/orchestrator/coder.py` | Modify | Extend BaseAgent |
| `services/orchestrator/src/orchestrator/assistant.py` | Modify | Extend BaseAgent |
| `services/orchestrator/src/orchestrator/sysadmin.py` | Modify | Extend BaseAgent |
## Deviation Log
| Deviation | Reason |
|---|---|
| "No tools available" message uses summary_prefix instead of agent type name | Minor wording change; tests only check for "No tools" substring |

View File

@@ -1,204 +1,23 @@
"""Assistant agent — conversational Q&A, summarization, and information retrieval."""
"""Assistant agent — general-purpose help and Q&A."""
from __future__ import annotations
import asyncio
import logging
import uuid
import grpc
from llm_multiverse.v1 import common_pb2, orchestrator_pb2
from llm_multiverse.v1 import common_pb2
from .agent_utils import (
build_failed_result,
build_partial_result,
build_success_result,
format_memory,
)
from .base_agent import BaseAgent
from .clients import MemoryClient, ModelGatewayClient, ToolBrokerClient
from .config import AgentConfig
from .parser import DoneSignalParsed, ParseError, ToolCallParsed, parse_agent_response
from .prompt import ASSISTANT_SYSTEM_PROMPT, PromptBuilder
logger = logging.getLogger("orchestrator.assistant")
_SUMMARY_PREFIX = "Assistance"
_MAX_CONSECUTIVE_FAILURES = 3
from .prompt import ASSISTANT_SYSTEM_PROMPT
class AssistantAgent:
"""Executes the assistant agent loop: infer -> tool call -> observe -> repeat."""
class AssistantAgent(BaseAgent):
"""Assistant agent: provides general-purpose help and answers questions."""
def __init__(
self,
model_gateway: ModelGatewayClient,
tool_broker: ToolBrokerClient,
memory: MemoryClient,
config: AgentConfig,
) -> None:
self._gateway = model_gateway
self._broker = tool_broker
self._memory = memory
self._config = config
async def run(
self, request: orchestrator_pb2.SubagentRequest
) -> common_pb2.SubagentResult:
"""Execute the assistant loop for a given task."""
session_ctx = request.context
agent_id = request.agent_id or f"ast-{uuid.uuid4().hex[:8]}"
# Step 1: Discover available tools
try:
tools = await self._broker.discover_tools(
session_ctx, common_pb2.AGENT_TYPE_ASSISTANT, request.task
)
except grpc.aio.AioRpcError as e:
return build_failed_result(f"Tool discovery failed: {e.code()}", _SUMMARY_PREFIX)
if not tools:
return build_failed_result("No tools available for assistant", _SUMMARY_PREFIX)
tool_names = {t.name for t in tools}
# Step 2: Memory context enrichment
memory_context = list(request.relevant_memory_context)
if not memory_context:
try:
mem_results = await self._memory.query_memory(
session_ctx, request.task, limit=3
)
memory_context = [format_memory(r) for r in mem_results]
except grpc.aio.AioRpcError:
logger.warning("Memory service unavailable, proceeding without context")
# Step 3: Initialize prompt builder with assistant system prompt
max_tokens = request.max_tokens or self._config.max_tokens
prompt_builder = PromptBuilder(
max_tokens=max_tokens,
system_prompt=ASSISTANT_SYSTEM_PROMPT,
)
prompt_builder.set_task(request.task)
prompt_builder.set_tool_definitions(tools)
prompt_builder.set_memory_context(memory_context)
# Step 4: Agent loop
iteration = 0
consecutive_failures = 0
used_web_search = False
start_time = asyncio.get_event_loop().time()
while True:
# Termination checks
if iteration >= self._config.max_iterations:
return build_partial_result(
"Max iterations reached", _SUMMARY_PREFIX, False, used_web_search
)
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed >= self._config.timeout_seconds:
return build_partial_result("Timeout", _SUMMARY_PREFIX, False, used_web_search)
if prompt_builder.needs_compaction():
compacted = await prompt_builder.compact(
self._gateway, session_ctx
)
if not compacted:
return build_partial_result(
"Context overflow after compaction",
_SUMMARY_PREFIX,
False,
used_web_search,
)
logger.info(
"Context compacted for agent %s (iteration %d)",
agent_id,
iteration,
)
if consecutive_failures >= _MAX_CONSECUTIVE_FAILURES:
return build_failed_result(
f"All tools failed after {_MAX_CONSECUTIVE_FAILURES} consecutive failures",
_SUMMARY_PREFIX,
)
# Build prompt and call model
prompt = prompt_builder.build()
try:
response_text = await self._gateway.stream_inference(
session_ctx, prompt, max_tokens=max_tokens
)
except grpc.aio.AioRpcError as e:
return build_failed_result(f"Model gateway error: {e.code()}", _SUMMARY_PREFIX)
# Parse model output
try:
parsed = parse_agent_response(response_text)
except ParseError as e:
prompt_builder.add_reasoning(
f"[Parse error: {e}. Please output valid JSON.]"
)
iteration += 1
continue
if parsed is None:
# Plain reasoning text — append and continue
prompt_builder.add_reasoning(response_text)
iteration += 1
continue
if isinstance(parsed, DoneSignalParsed):
return build_success_result(parsed, False, used_web_search)
if isinstance(parsed, ToolCallParsed):
# Validate tool name
if parsed.tool not in tool_names:
prompt_builder.add_tool_result(
parsed.tool,
f"Error: tool '{parsed.tool}' is not available. "
f"Available tools: {', '.join(sorted(tool_names))}",
False,
agent_id,
session_ctx.session_id,
)
iteration += 1
consecutive_failures += 1
continue
# Execute tool
prompt_builder.add_tool_call(parsed.tool, parsed.parameters)
try:
output, success = await self._broker.execute_tool(
session_ctx,
common_pb2.AGENT_TYPE_ASSISTANT,
parsed.tool,
parsed.parameters,
)
except grpc.aio.AioRpcError as e:
output = f"Tool execution error: {e.code()}"
success = False
prompt_builder.add_tool_result(
parsed.tool,
output,
success,
agent_id,
session_ctx.session_id,
)
if parsed.tool == "web_search" and success:
used_web_search = True
if success:
consecutive_failures = 0
else:
consecutive_failures += 1
iteration += 1
# Unreachable, but satisfies type checker
return build_failed_result("Unexpected loop exit", _SUMMARY_PREFIX) # pragma: no cover
agent_type = common_pb2.AGENT_TYPE_ASSISTANT
id_prefix = "ast-"
summary_prefix = "Assistance"
system_prompt = ASSISTANT_SYSTEM_PROMPT
def create_assistant_agent(
@@ -214,5 +33,3 @@ def create_assistant_agent(
memory=MemoryClient(memory_channel),
config=config,
)

View File

@@ -0,0 +1,231 @@
"""Base agent loop shared by all agent types."""
from __future__ import annotations
import asyncio
import logging
import uuid
import grpc
from llm_multiverse.v1 import common_pb2, orchestrator_pb2
from .agent_utils import (
build_failed_result,
build_partial_result,
build_success_result,
format_memory,
)
from .clients import MemoryClient, ModelGatewayClient, ToolBrokerClient
from .config import AgentConfig
from .parser import DoneSignalParsed, ParseError, ToolCallParsed, parse_agent_response
from .prompt import PromptBuilder
# Abort the loop after this many back-to-back tool failures (hallucinated
# tool names and failed executions both count; any success resets the streak).
_MAX_CONSECUTIVE_FAILURES = 3
class BaseAgent:
    """Shared agent loop: infer -> tool call -> observe -> repeat.

    Subclasses configure the loop entirely via class attributes:
        agent_type: Proto AgentType enum value (e.g. AGENT_TYPE_RESEARCHER).
        id_prefix: Short prefix for agent IDs (e.g. "res-").
        summary_prefix: Human-readable prefix for result summaries.
        system_prompt: System prompt string, or None for default.
        tool_output_sources: Set of tool names that indicate RESULT_SOURCE_TOOL_OUTPUT.
    """

    agent_type: int
    id_prefix: str
    summary_prefix: str
    system_prompt: str | None = None
    tool_output_sources: frozenset[str] = frozenset()

    def __init__(
        self,
        model_gateway: ModelGatewayClient,
        tool_broker: ToolBrokerClient,
        memory: MemoryClient,
        config: AgentConfig,
    ) -> None:
        self._gateway = model_gateway
        self._broker = tool_broker
        self._memory = memory
        self._config = config
        # One logger per concrete agent class, e.g. "orchestrator.coderagent".
        self._logger = logging.getLogger(
            f"orchestrator.{self.__class__.__name__.lower()}"
        )

    async def run(
        self, request: orchestrator_pb2.SubagentRequest
    ) -> common_pb2.SubagentResult:
        """Execute the agent loop for a given task.

        Args:
            request: Subagent request carrying the session context, task text,
                optional pre-fetched memory context, and optional token budget.

        Returns:
            A SubagentResult: success when the model emits a done signal;
            partial on max-iterations, timeout, or context overflow; failed on
            tool-discovery errors, gateway errors, or too many consecutive
            tool failures.
        """
        session_ctx = request.context
        agent_id = request.agent_id or f"{self.id_prefix}{uuid.uuid4().hex[:8]}"

        # Step 1: Discover available tools
        try:
            tools = await self._broker.discover_tools(
                session_ctx, self.agent_type, request.task
            )
        except grpc.aio.AioRpcError as e:
            return build_failed_result(
                f"Tool discovery failed: {e.code()}", self.summary_prefix
            )
        if not tools:
            return build_failed_result(
                f"No tools available for {self.summary_prefix.lower()}",
                self.summary_prefix,
            )
        tool_names = {t.name for t in tools}

        # Step 2: Memory context enrichment (best-effort: an unavailable
        # memory service degrades to an empty context, never fails the task).
        memory_context = list(request.relevant_memory_context)
        if not memory_context:
            try:
                mem_results = await self._memory.query_memory(
                    session_ctx, request.task, limit=3
                )
                memory_context = [format_memory(r) for r in mem_results]
            except grpc.aio.AioRpcError:
                self._logger.warning(
                    "Memory service unavailable, proceeding without context"
                )

        # Step 3: Initialize prompt builder. Only pass system_prompt when the
        # subclass sets one, so PromptBuilder's own default applies otherwise.
        max_tokens = request.max_tokens or self._config.max_tokens
        kwargs: dict = {"max_tokens": max_tokens}
        if self.system_prompt is not None:
            kwargs["system_prompt"] = self.system_prompt
        prompt_builder = PromptBuilder(**kwargs)
        prompt_builder.set_task(request.task)
        prompt_builder.set_tool_definitions(tools)
        prompt_builder.set_memory_context(memory_context)

        # Step 4: Agent loop
        iteration = 0
        consecutive_failures = 0
        used_tool_output = False
        used_web_search = False
        # get_running_loop() is the modern API (get_event_loop() is deprecated
        # inside coroutines since 3.10); hoisted once for the wall-clock timer.
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        while True:
            # Termination checks
            if iteration >= self._config.max_iterations:
                return build_partial_result(
                    "Max iterations reached",
                    self.summary_prefix,
                    used_tool_output,
                    used_web_search,
                )
            elapsed = loop.time() - start_time
            if elapsed >= self._config.timeout_seconds:
                return build_partial_result(
                    "Timeout",
                    self.summary_prefix,
                    used_tool_output,
                    used_web_search,
                )
            if prompt_builder.needs_compaction():
                compacted = await prompt_builder.compact(
                    self._gateway, session_ctx
                )
                if not compacted:
                    return build_partial_result(
                        "Context overflow after compaction",
                        self.summary_prefix,
                        used_tool_output,
                        used_web_search,
                    )
                self._logger.info(
                    "Context compacted for agent %s (iteration %d)",
                    agent_id,
                    iteration,
                )
            if consecutive_failures >= _MAX_CONSECUTIVE_FAILURES:
                return build_failed_result(
                    f"All tools failed after {_MAX_CONSECUTIVE_FAILURES} consecutive failures",
                    self.summary_prefix,
                )

            # Build prompt and call model
            prompt = prompt_builder.build()
            try:
                response_text = await self._gateway.stream_inference(
                    session_ctx, prompt, max_tokens=max_tokens
                )
            except grpc.aio.AioRpcError as e:
                return build_failed_result(
                    f"Model gateway error: {e.code()}", self.summary_prefix
                )

            # Parse model output
            try:
                parsed = parse_agent_response(response_text)
            except ParseError as e:
                # Feed the parse error back so the model can self-correct.
                prompt_builder.add_reasoning(
                    f"[Parse error: {e}. Please output valid JSON.]"
                )
                iteration += 1
                continue
            if parsed is None:
                # Plain reasoning text — append and continue.
                prompt_builder.add_reasoning(response_text)
                iteration += 1
                continue
            if isinstance(parsed, DoneSignalParsed):
                return build_success_result(
                    parsed, used_tool_output, used_web_search
                )
            if isinstance(parsed, ToolCallParsed):
                # Reject hallucinated tool names; counts toward the failure streak.
                if parsed.tool not in tool_names:
                    prompt_builder.add_tool_result(
                        parsed.tool,
                        f"Error: tool '{parsed.tool}' is not available. "
                        f"Available tools: {', '.join(sorted(tool_names))}",
                        False,
                        agent_id,
                        session_ctx.session_id,
                    )
                    iteration += 1
                    consecutive_failures += 1
                    continue
                # Execute the tool and record its result in the transcript.
                prompt_builder.add_tool_call(parsed.tool, parsed.parameters)
                try:
                    output, success = await self._broker.execute_tool(
                        session_ctx,
                        self.agent_type,
                        parsed.tool,
                        parsed.parameters,
                    )
                except grpc.aio.AioRpcError as e:
                    output = f"Tool execution error: {e.code()}"
                    success = False
                prompt_builder.add_tool_result(
                    parsed.tool,
                    output,
                    success,
                    agent_id,
                    session_ctx.session_id,
                )
                if success:
                    if parsed.tool in self.tool_output_sources:
                        used_tool_output = True
                    if parsed.tool == "web_search":
                        used_web_search = True
                    consecutive_failures = 0
                else:
                    consecutive_failures += 1
            iteration += 1
        # Unreachable, but satisfies type checker
        return build_failed_result("Unexpected loop exit", self.summary_prefix)  # pragma: no cover

View File

@@ -1,213 +1,24 @@
"""Coder agent — code generation, debugging, and review loop."""
"""Coder agent — code generation, debugging, and review."""
from __future__ import annotations
import asyncio
import logging
import uuid
import grpc
from llm_multiverse.v1 import common_pb2, orchestrator_pb2
from llm_multiverse.v1 import common_pb2
from .agent_utils import (
build_failed_result,
build_partial_result,
build_success_result,
format_memory,
)
from .base_agent import BaseAgent
from .clients import MemoryClient, ModelGatewayClient, ToolBrokerClient
from .config import AgentConfig
from .parser import DoneSignalParsed, ParseError, ToolCallParsed, parse_agent_response
from .prompt import CODER_SYSTEM_PROMPT, PromptBuilder
logger = logging.getLogger("orchestrator.coder")
_SUMMARY_PREFIX = "Coding task"
_MAX_CONSECUTIVE_FAILURES = 3
# Tools whose successful use indicates RESULT_SOURCE_TOOL_OUTPUT.
_TOOL_OUTPUT_SOURCES = {"fs_read", "fs_write", "run_code"}
from .prompt import CODER_SYSTEM_PROMPT
class CoderAgent:
"""Executes the coder agent loop: infer -> tool call -> observe -> repeat."""
class CoderAgent(BaseAgent):
"""Coder agent: generates, debugs, and reviews code."""
def __init__(
self,
model_gateway: ModelGatewayClient,
tool_broker: ToolBrokerClient,
memory: MemoryClient,
config: AgentConfig,
) -> None:
self._gateway = model_gateway
self._broker = tool_broker
self._memory = memory
self._config = config
async def run(
self, request: orchestrator_pb2.SubagentRequest
) -> common_pb2.SubagentResult:
"""Execute the coder loop for a given task."""
session_ctx = request.context
agent_id = request.agent_id or f"cod-{uuid.uuid4().hex[:8]}"
# Step 1: Discover available tools
try:
tools = await self._broker.discover_tools(
session_ctx, common_pb2.AGENT_TYPE_CODER, request.task
)
except grpc.aio.AioRpcError as e:
return build_failed_result(f"Tool discovery failed: {e.code()}", _SUMMARY_PREFIX)
if not tools:
return build_failed_result("No tools available for coder", _SUMMARY_PREFIX)
tool_names = {t.name for t in tools}
# Step 2: Memory context enrichment
memory_context = list(request.relevant_memory_context)
if not memory_context:
try:
mem_results = await self._memory.query_memory(
session_ctx, request.task, limit=3
)
memory_context = [format_memory(r) for r in mem_results]
except grpc.aio.AioRpcError:
logger.warning("Memory service unavailable, proceeding without context")
# Step 3: Initialize prompt builder with coder system prompt
max_tokens = request.max_tokens or self._config.max_tokens
prompt_builder = PromptBuilder(
max_tokens=max_tokens,
system_prompt=CODER_SYSTEM_PROMPT,
)
prompt_builder.set_task(request.task)
prompt_builder.set_tool_definitions(tools)
prompt_builder.set_memory_context(memory_context)
# Step 4: Agent loop
iteration = 0
consecutive_failures = 0
used_tool_output = False
used_web_search = False
start_time = asyncio.get_event_loop().time()
while True:
# Termination checks
if iteration >= self._config.max_iterations:
return build_partial_result(
"Max iterations reached", _SUMMARY_PREFIX, used_tool_output, used_web_search
)
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed >= self._config.timeout_seconds:
return build_partial_result(
"Timeout", _SUMMARY_PREFIX, used_tool_output, used_web_search
)
if prompt_builder.needs_compaction():
compacted = await prompt_builder.compact(
self._gateway, session_ctx
)
if not compacted:
return build_partial_result(
"Context overflow after compaction",
_SUMMARY_PREFIX,
used_tool_output,
used_web_search,
)
logger.info(
"Context compacted for agent %s (iteration %d)",
agent_id,
iteration,
)
if consecutive_failures >= _MAX_CONSECUTIVE_FAILURES:
return build_failed_result(
f"All tools failed after {_MAX_CONSECUTIVE_FAILURES} consecutive failures",
_SUMMARY_PREFIX,
)
# Build prompt and call model
prompt = prompt_builder.build()
try:
response_text = await self._gateway.stream_inference(
session_ctx, prompt, max_tokens=max_tokens
)
except grpc.aio.AioRpcError as e:
return build_failed_result(f"Model gateway error: {e.code()}", _SUMMARY_PREFIX)
# Parse model output
try:
parsed = parse_agent_response(response_text)
except ParseError as e:
prompt_builder.add_reasoning(
f"[Parse error: {e}. Please output valid JSON.]"
)
iteration += 1
continue
if parsed is None:
# Plain reasoning text — append and continue
prompt_builder.add_reasoning(response_text)
iteration += 1
continue
if isinstance(parsed, DoneSignalParsed):
return build_success_result(
parsed, used_tool_output, used_web_search
)
if isinstance(parsed, ToolCallParsed):
# Validate tool name
if parsed.tool not in tool_names:
prompt_builder.add_tool_result(
parsed.tool,
f"Error: tool '{parsed.tool}' is not available. "
f"Available tools: {', '.join(sorted(tool_names))}",
False,
agent_id,
session_ctx.session_id,
)
iteration += 1
consecutive_failures += 1
continue
# Execute tool
prompt_builder.add_tool_call(parsed.tool, parsed.parameters)
try:
output, success = await self._broker.execute_tool(
session_ctx,
common_pb2.AGENT_TYPE_CODER,
parsed.tool,
parsed.parameters,
)
except grpc.aio.AioRpcError as e:
output = f"Tool execution error: {e.code()}"
success = False
prompt_builder.add_tool_result(
parsed.tool,
output,
success,
agent_id,
session_ctx.session_id,
)
if success:
if parsed.tool in _TOOL_OUTPUT_SOURCES:
used_tool_output = True
if parsed.tool == "web_search":
used_web_search = True
consecutive_failures = 0
else:
consecutive_failures += 1
iteration += 1
# Unreachable, but satisfies type checker
return build_failed_result("Unexpected loop exit", _SUMMARY_PREFIX) # pragma: no cover
agent_type = common_pb2.AGENT_TYPE_CODER
id_prefix = "cod-"
summary_prefix = "Coding task"
system_prompt = CODER_SYSTEM_PROMPT
tool_output_sources = frozenset({"fs_read", "fs_write", "run_code"})
def create_coder_agent(
@@ -223,5 +34,3 @@ def create_coder_agent(
memory=MemoryClient(memory_channel),
config=config,
)

View File

@@ -1,198 +1,22 @@
"""Researcher agent — core tool-use loop."""
"""Researcher agent — web search and knowledge synthesis."""
from __future__ import annotations
import asyncio
import logging
import uuid
import grpc
from llm_multiverse.v1 import common_pb2, orchestrator_pb2
from llm_multiverse.v1 import common_pb2
from .agent_utils import (
build_failed_result,
build_partial_result,
build_success_result,
format_memory,
)
from .base_agent import BaseAgent
from .clients import MemoryClient, ModelGatewayClient, ToolBrokerClient
from .config import AgentConfig
from .parser import DoneSignalParsed, ParseError, ToolCallParsed, parse_agent_response
from .prompt import PromptBuilder
logger = logging.getLogger("orchestrator.researcher")
_SUMMARY_PREFIX = "Research"
_MAX_CONSECUTIVE_FAILURES = 3
class ResearcherAgent:
"""Executes the researcher agent loop: infer → tool call → observe → repeat."""
class ResearcherAgent(BaseAgent):
"""Researcher agent: searches the web and synthesizes knowledge."""
def __init__(
self,
model_gateway: ModelGatewayClient,
tool_broker: ToolBrokerClient,
memory: MemoryClient,
config: AgentConfig,
) -> None:
self._gateway = model_gateway
self._broker = tool_broker
self._memory = memory
self._config = config
async def run(
self, request: orchestrator_pb2.SubagentRequest
) -> common_pb2.SubagentResult:
"""Execute the researcher loop for a given task."""
session_ctx = request.context
agent_id = request.agent_id or f"res-{uuid.uuid4().hex[:8]}"
# Step 1: Discover available tools
try:
tools = await self._broker.discover_tools(
session_ctx, common_pb2.AGENT_TYPE_RESEARCHER, request.task
)
except grpc.aio.AioRpcError as e:
return build_failed_result(f"Tool discovery failed: {e.code()}", _SUMMARY_PREFIX)
if not tools:
return build_failed_result("No tools available for researcher", _SUMMARY_PREFIX)
tool_names = {t.name for t in tools}
# Step 2: Memory context enrichment
memory_context = list(request.relevant_memory_context)
if not memory_context:
try:
mem_results = await self._memory.query_memory(
session_ctx, request.task, limit=3
)
memory_context = [format_memory(r) for r in mem_results]
except grpc.aio.AioRpcError:
logger.warning("Memory service unavailable, proceeding without context")
# Step 3: Initialize prompt builder
max_tokens = request.max_tokens or self._config.max_tokens
prompt_builder = PromptBuilder(max_tokens=max_tokens)
prompt_builder.set_task(request.task)
prompt_builder.set_tool_definitions(tools)
prompt_builder.set_memory_context(memory_context)
# Step 4: Agent loop
iteration = 0
consecutive_failures = 0
used_web_search = False
start_time = asyncio.get_event_loop().time()
while True:
# Termination checks
if iteration >= self._config.max_iterations:
return build_partial_result(
"Max iterations reached", _SUMMARY_PREFIX, False, used_web_search
)
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed >= self._config.timeout_seconds:
return build_partial_result("Timeout", _SUMMARY_PREFIX, False, used_web_search)
if prompt_builder.needs_compaction():
compacted = await prompt_builder.compact(
self._gateway, session_ctx
)
if not compacted:
return build_partial_result(
"Context overflow after compaction", _SUMMARY_PREFIX, False, used_web_search
)
logger.info(
"Context compacted for agent %s (iteration %d)",
agent_id,
iteration,
)
if consecutive_failures >= _MAX_CONSECUTIVE_FAILURES:
return build_failed_result(
f"All tools failed after {_MAX_CONSECUTIVE_FAILURES} consecutive failures",
_SUMMARY_PREFIX,
)
# Build prompt and call model
prompt = prompt_builder.build()
try:
response_text = await self._gateway.stream_inference(
session_ctx, prompt, max_tokens=max_tokens
)
except grpc.aio.AioRpcError as e:
return build_failed_result(f"Model gateway error: {e.code()}", _SUMMARY_PREFIX)
# Parse model output
try:
parsed = parse_agent_response(response_text)
except ParseError as e:
prompt_builder.add_reasoning(
f"[Parse error: {e}. Please output valid JSON.]"
)
iteration += 1
continue
if parsed is None:
# Plain reasoning text — append and continue
prompt_builder.add_reasoning(response_text)
iteration += 1
continue
if isinstance(parsed, DoneSignalParsed):
return build_success_result(parsed, False, used_web_search)
if isinstance(parsed, ToolCallParsed):
# Validate tool name
if parsed.tool not in tool_names:
prompt_builder.add_tool_result(
parsed.tool,
f"Error: tool '{parsed.tool}' is not available. "
f"Available tools: {', '.join(sorted(tool_names))}",
False,
agent_id,
session_ctx.session_id,
)
iteration += 1
consecutive_failures += 1
continue
# Execute tool
prompt_builder.add_tool_call(parsed.tool, parsed.parameters)
try:
output, success = await self._broker.execute_tool(
session_ctx,
common_pb2.AGENT_TYPE_RESEARCHER,
parsed.tool,
parsed.parameters,
)
except grpc.aio.AioRpcError as e:
output = f"Tool execution error: {e.code()}"
success = False
prompt_builder.add_tool_result(
parsed.tool,
output,
success,
agent_id,
session_ctx.session_id,
)
if parsed.tool == "web_search" and success:
used_web_search = True
if success:
consecutive_failures = 0
else:
consecutive_failures += 1
iteration += 1
# Unreachable, but satisfies type checker
return build_failed_result("Unexpected loop exit", _SUMMARY_PREFIX) # pragma: no cover
agent_type = common_pb2.AGENT_TYPE_RESEARCHER
id_prefix = "res-"
summary_prefix = "Research"
system_prompt = None # uses PromptBuilder default
def create_researcher_agent(
@@ -208,5 +32,3 @@ def create_researcher_agent(
memory=MemoryClient(memory_channel),
config=config,
)

View File

@@ -1,213 +1,24 @@
"""Sysadmin agent — system configuration, troubleshooting, and deployment."""
"""Sysadmin agent — system administration and infrastructure tasks."""
from __future__ import annotations
import asyncio
import logging
import uuid
import grpc
from llm_multiverse.v1 import common_pb2, orchestrator_pb2
from llm_multiverse.v1 import common_pb2
from .agent_utils import (
build_failed_result,
build_partial_result,
build_success_result,
format_memory,
)
from .base_agent import BaseAgent
from .clients import MemoryClient, ModelGatewayClient, ToolBrokerClient
from .config import AgentConfig
from .parser import DoneSignalParsed, ParseError, ToolCallParsed, parse_agent_response
from .prompt import SYSADMIN_SYSTEM_PROMPT, PromptBuilder
logger = logging.getLogger("orchestrator.sysadmin")
_SUMMARY_PREFIX = "System administration"
_MAX_CONSECUTIVE_FAILURES = 3
# Tools whose successful use indicates RESULT_SOURCE_TOOL_OUTPUT.
_TOOL_OUTPUT_SOURCES = {"fs_read", "fs_write", "run_shell", "package_install"}
from .prompt import SYSADMIN_SYSTEM_PROMPT
class SysadminAgent:
"""Executes the sysadmin agent loop: infer -> tool call -> observe -> repeat."""
class SysadminAgent(BaseAgent):
"""Sysadmin agent: manages systems and infrastructure."""
def __init__(
self,
model_gateway: ModelGatewayClient,
tool_broker: ToolBrokerClient,
memory: MemoryClient,
config: AgentConfig,
) -> None:
self._gateway = model_gateway
self._broker = tool_broker
self._memory = memory
self._config = config
async def run(
self, request: orchestrator_pb2.SubagentRequest
) -> common_pb2.SubagentResult:
"""Execute the sysadmin loop for a given task."""
session_ctx = request.context
agent_id = request.agent_id or f"sys-{uuid.uuid4().hex[:8]}"
# Step 1: Discover available tools
try:
tools = await self._broker.discover_tools(
session_ctx, common_pb2.AGENT_TYPE_SYSADMIN, request.task
)
except grpc.aio.AioRpcError as e:
return build_failed_result(f"Tool discovery failed: {e.code()}", _SUMMARY_PREFIX)
if not tools:
return build_failed_result("No tools available for sysadmin", _SUMMARY_PREFIX)
tool_names = {t.name for t in tools}
# Step 2: Memory context enrichment
memory_context = list(request.relevant_memory_context)
if not memory_context:
try:
mem_results = await self._memory.query_memory(
session_ctx, request.task, limit=3
)
memory_context = [format_memory(r) for r in mem_results]
except grpc.aio.AioRpcError:
logger.warning("Memory service unavailable, proceeding without context")
# Step 3: Initialize prompt builder with sysadmin system prompt
max_tokens = request.max_tokens or self._config.max_tokens
prompt_builder = PromptBuilder(
max_tokens=max_tokens,
system_prompt=SYSADMIN_SYSTEM_PROMPT,
)
prompt_builder.set_task(request.task)
prompt_builder.set_tool_definitions(tools)
prompt_builder.set_memory_context(memory_context)
# Step 4: Agent loop
iteration = 0
consecutive_failures = 0
used_tool_output = False
used_web_search = False
start_time = asyncio.get_event_loop().time()
while True:
# Termination checks
if iteration >= self._config.max_iterations:
return build_partial_result(
"Max iterations reached", _SUMMARY_PREFIX, used_tool_output, used_web_search
)
elapsed = asyncio.get_event_loop().time() - start_time
if elapsed >= self._config.timeout_seconds:
return build_partial_result(
"Timeout", _SUMMARY_PREFIX, used_tool_output, used_web_search
)
if prompt_builder.needs_compaction():
compacted = await prompt_builder.compact(
self._gateway, session_ctx
)
if not compacted:
return build_partial_result(
"Context overflow after compaction",
_SUMMARY_PREFIX,
used_tool_output,
used_web_search,
)
logger.info(
"Context compacted for agent %s (iteration %d)",
agent_id,
iteration,
)
if consecutive_failures >= _MAX_CONSECUTIVE_FAILURES:
return build_failed_result(
f"All tools failed after {_MAX_CONSECUTIVE_FAILURES} consecutive failures",
_SUMMARY_PREFIX,
)
# Build prompt and call model
prompt = prompt_builder.build()
try:
response_text = await self._gateway.stream_inference(
session_ctx, prompt, max_tokens=max_tokens
)
except grpc.aio.AioRpcError as e:
return build_failed_result(f"Model gateway error: {e.code()}", _SUMMARY_PREFIX)
# Parse model output
try:
parsed = parse_agent_response(response_text)
except ParseError as e:
prompt_builder.add_reasoning(
f"[Parse error: {e}. Please output valid JSON.]"
)
iteration += 1
continue
if parsed is None:
# Plain reasoning text — append and continue
prompt_builder.add_reasoning(response_text)
iteration += 1
continue
if isinstance(parsed, DoneSignalParsed):
return build_success_result(
parsed, used_tool_output, used_web_search
)
if isinstance(parsed, ToolCallParsed):
# Validate tool name
if parsed.tool not in tool_names:
prompt_builder.add_tool_result(
parsed.tool,
f"Error: tool '{parsed.tool}' is not available. "
f"Available tools: {', '.join(sorted(tool_names))}",
False,
agent_id,
session_ctx.session_id,
)
iteration += 1
consecutive_failures += 1
continue
# Execute tool
prompt_builder.add_tool_call(parsed.tool, parsed.parameters)
try:
output, success = await self._broker.execute_tool(
session_ctx,
common_pb2.AGENT_TYPE_SYSADMIN,
parsed.tool,
parsed.parameters,
)
except grpc.aio.AioRpcError as e:
output = f"Tool execution error: {e.code()}"
success = False
prompt_builder.add_tool_result(
parsed.tool,
output,
success,
agent_id,
session_ctx.session_id,
)
if success:
if parsed.tool in _TOOL_OUTPUT_SOURCES:
used_tool_output = True
if parsed.tool == "web_search":
used_web_search = True
consecutive_failures = 0
else:
consecutive_failures += 1
iteration += 1
# Unreachable, but satisfies type checker
return build_failed_result("Unexpected loop exit", _SUMMARY_PREFIX) # pragma: no cover
agent_type = common_pb2.AGENT_TYPE_SYSADMIN
id_prefix = "sys-"
summary_prefix = "System administration"
system_prompt = SYSADMIN_SYSTEM_PROMPT
tool_output_sources = frozenset({"fs_read", "fs_write", "run_shell", "package_install"})
def create_sysadmin_agent(
@@ -223,5 +34,3 @@ def create_sysadmin_agent(
memory=MemoryClient(memory_channel),
config=config,
)