Files
llm-multiverse/services/orchestrator/tests/test_sysadmin.py
shahondin1624 abb61a4248 feat: structured artifact passthrough for agent results
Sub-agents now harvest concrete tool outputs (code written via fs_write,
command output from run_code, search results from web_search) during
execution and propagate them as structured Artifact protos through the
response pipeline to the frontend.

- Add ArtifactType enum and Artifact message to common.proto
- Add OutputCollector to capture tool outputs during agent loop
- Update agent_utils to build Artifact protos from collected outputs
- Wire collector into base_agent.py and all result builders
- Update service.py artifact type annotation
- Add mandatory tool-use instructions to coder system prompt
- Create /workspace/output in tool-broker container for code execution
- Install python3 in tool-broker container for run_code support
- Fix path allowlist for /tmp and /workspace exact paths
- Add SearXNG and Search Service to external network for web access
- Use browser User-Agent for page content extraction
- Increase agent and dispatcher timeouts from 120s to 300s
- Update README for timeout defaults and network configuration
- Add tests for OutputCollector and artifact building (21 new tests)
- Update 9 existing tests for proto schema change

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 23:13:12 +01:00

473 lines
16 KiB
Python

"""Tests for the sysadmin agent loop."""
from __future__ import annotations

from unittest.mock import AsyncMock

import grpc

from llm_multiverse.v1 import common_pb2, memory_pb2, orchestrator_pb2, tool_broker_pb2

from orchestrator.agent_utils import format_memory
from orchestrator.config import AgentConfig
from orchestrator.sysadmin import SysadminAgent
def _make_request(
    task: str = "Configure nginx reverse proxy",
    memory_context: list[str] | None = None,
    max_tokens: int = 4096,
) -> orchestrator_pb2.SubagentRequest:
    """Build a sysadmin SubagentRequest with sensible test defaults."""
    request = orchestrator_pb2.SubagentRequest(
        context=common_pb2.SessionContext(session_id="sess-1", user_id="user-1"),
        agent_id="sys-test",
        agent_type=common_pb2.AGENT_TYPE_SYSADMIN,
        task=task,
        max_tokens=max_tokens,
    )
    for item in memory_context or []:
        request.relevant_memory_context.append(item)
    return request
def _make_tool(name: str) -> tool_broker_pb2.ToolDefinition:
    """Build a minimal ToolDefinition with one required string parameter."""
    query_schema = tool_broker_pb2.ParameterSchema(type="string", description="Query")
    return tool_broker_pb2.ToolDefinition(
        name=name,
        description=f"{name} tool",
        parameters={"query": query_schema},
        required_params=["query"],
    )
def _make_agent(
    gateway_responses: list[str] | None = None,
    tools: list[tool_broker_pb2.ToolDefinition] | None = None,
    exec_output: str = "command output",
    exec_success: bool = True,
    memory_results: list | None = None,
    config: AgentConfig | None = None,
) -> SysadminAgent:
    """Construct a SysadminAgent wired to fully mocked collaborators.

    ``gateway_responses`` are consumed one per inference call; when omitted
    the gateway immediately signals a VERIFIED done. ``tools`` overrides the
    default sysadmin tool set, and ``exec_output``/``exec_success`` shape
    every tool execution result.
    """
    gateway = AsyncMock()
    if gateway_responses is None:
        gateway.stream_inference = AsyncMock(
            return_value='{"done": true, "summary": "Done.", "confidence": "VERIFIED"}'
        )
    else:
        gateway.stream_inference = AsyncMock(side_effect=gateway_responses)

    default_tools = [
        _make_tool(tool_name)
        for tool_name in ("run_shell", "package_install", "fs_read", "fs_write")
    ]
    broker = AsyncMock()
    broker.discover_tools = AsyncMock(
        return_value=default_tools if tools is None else tools
    )
    broker.execute_tool = AsyncMock(return_value=(exec_output, exec_success))

    memory = AsyncMock()
    memory.query_memory = AsyncMock(
        return_value=[] if memory_results is None else memory_results
    )

    return SysadminAgent(
        model_gateway=gateway,
        tool_broker=broker,
        memory=memory,
        config=config or AgentConfig(),
    )
async def test_simple_sysadmin_task():
    """Read-config / verify / done flow yields a verified SUCCESS with findings."""
    responses = [
        '{"tool": "fs_read", "parameters": {"file_path": "/etc/nginx/nginx.conf"}}',
        '{"tool": "run_shell", "parameters": {"command": "nginx -t"}}',
        '{"done": true, "summary": "Configured nginx.", "findings": ["Config valid"], "confidence": "VERIFIED"}',
    ]
    agent = _make_agent(
        gateway_responses=responses,
        exec_output="nginx: configuration file syntax is ok",
    )

    result = await agent.run(_make_request())

    assert result.status == common_pb2.RESULT_STATUS_SUCCESS
    assert "nginx" in result.summary
    assert result.result_quality == common_pb2.RESULT_QUALITY_VERIFIED
    findings = [
        artifact.content
        for artifact in result.artifacts
        if artifact.label == "Finding"
    ]
    assert "Config valid" in findings
async def test_tool_discovery_uses_sysadmin_type():
    """DiscoverTools is called exactly once with AGENT_TYPE_SYSADMIN."""
    agent = _make_agent()

    await agent.run(_make_request())

    agent._broker.discover_tools.assert_called_once()
    call = agent._broker.discover_tools.call_args
    passed = list(call.args) + list(call.kwargs.values())
    assert common_pb2.AGENT_TYPE_SYSADMIN in passed
async def test_no_tools_available():
    """An empty discovered tool set makes the agent fail fast."""
    agent = _make_agent(tools=[])

    result = await agent.run(_make_request())

    assert result.status == common_pb2.RESULT_STATUS_FAILED
    assert "No tools" in result.failure_reason
async def test_tool_execution_uses_sysadmin_type():
    """ExecuteTool receives AGENT_TYPE_SYSADMIN."""
    responses = [
        '{"tool": "run_shell", "parameters": {"command": "whoami"}}',
        '{"done": true, "summary": "Done.", "confidence": "VERIFIED"}',
    ]
    agent = _make_agent(gateway_responses=responses)

    await agent.run(_make_request())

    agent._broker.execute_tool.assert_called_once()
    call = agent._broker.execute_tool.call_args
    passed = list(call.args) + list(call.kwargs.values())
    assert common_pb2.AGENT_TYPE_SYSADMIN in passed
async def test_memory_context_from_request():
    """Memory context supplied on the request suppresses the QueryMemory RPC."""
    agent = _make_agent()

    await agent.run(_make_request(memory_context=["Pre-loaded memory"]))

    agent._memory.query_memory.assert_not_called()
async def test_memory_query_enrichment():
    """Without pre-filled context the agent queries the memory service once."""
    hit = memory_pb2.QueryMemoryResponse(
        rank=0,
        entry=memory_pb2.MemoryEntry(name="relevant-mem", description="Some info"),
        cosine_similarity=0.9,
    )
    agent = _make_agent(memory_results=[hit])

    await agent.run(_make_request())

    agent._memory.query_memory.assert_called_once()
async def test_max_iterations_termination():
    """A model that never signals done is cut off at max_iterations as PARTIAL."""
    tool_call = '{"tool": "run_shell", "parameters": {"command": "ls"}}'
    agent = _make_agent(
        gateway_responses=[tool_call] * 15,
        config=AgentConfig(max_iterations=3),
    )

    result = await agent.run(_make_request())

    assert result.status == common_pb2.RESULT_STATUS_PARTIAL
    assert "Max iterations" in result.summary
async def test_timeout_termination():
    """A zero-second budget terminates the loop without a FAILED result."""
    agent = _make_agent(config=AgentConfig(timeout_seconds=0))
    agent._gateway.stream_inference = AsyncMock(
        return_value='{"tool": "run_shell", "parameters": {"command": "ls"}}'
    )

    result = await agent.run(_make_request())

    # Depending on when the deadline is checked, the run may complete zero
    # iterations (PARTIAL) or finish trivially (SUCCESS); both are acceptable.
    acceptable = (common_pb2.RESULT_STATUS_PARTIAL, common_pb2.RESULT_STATUS_SUCCESS)
    assert result.status in acceptable
async def test_consecutive_tool_failures():
    """Repeated tool failures abort the run as FAILED."""
    tool_call = '{"tool": "run_shell", "parameters": {"command": "ls"}}'
    agent = _make_agent(
        gateway_responses=[tool_call] * 5,
        exec_success=False,
        exec_output="error",
        config=AgentConfig(max_iterations=10),
    )

    result = await agent.run(_make_request())

    assert result.status == common_pb2.RESULT_STATUS_FAILED
    assert "consecutive failures" in result.failure_reason
async def test_model_gateway_error():
    """An UNAVAILABLE error from the model gateway yields a FAILED result.

    The ``grpc`` import is taken from module level (PEP 8) instead of a
    redundant function-local import.
    """
    agent = _make_agent()
    agent._gateway.stream_inference = AsyncMock(
        side_effect=grpc.aio.AioRpcError(
            grpc.StatusCode.UNAVAILABLE,
            initial_metadata=grpc.aio.Metadata(),
            trailing_metadata=grpc.aio.Metadata(),
            details="down",
        )
    )
    result = await agent.run(_make_request())
    assert result.status == common_pb2.RESULT_STATUS_FAILED
    assert "gateway" in result.failure_reason.lower()
async def test_confidence_verified_maps_to_success():
    """A VERIFIED done-signal maps to SUCCESS with VERIFIED quality."""
    done = '{"done": true, "summary": "Done.", "confidence": "VERIFIED"}'
    agent = _make_agent(gateway_responses=[done])

    result = await agent.run(_make_request())

    assert result.status == common_pb2.RESULT_STATUS_SUCCESS
    assert result.result_quality == common_pb2.RESULT_QUALITY_VERIFIED
async def test_confidence_uncertain_maps_to_partial():
    """An UNCERTAIN done-signal maps to PARTIAL with UNCERTAIN quality."""
    done = '{"done": true, "summary": "Not sure.", "confidence": "UNCERTAIN"}'
    agent = _make_agent(gateway_responses=[done])

    result = await agent.run(_make_request())

    assert result.status == common_pb2.RESULT_STATUS_PARTIAL
    assert result.result_quality == common_pb2.RESULT_QUALITY_UNCERTAIN
async def test_run_shell_sets_source_tool_output():
    """A run_shell invocation marks the result as tool-sourced."""
    responses = [
        '{"tool": "run_shell", "parameters": {"command": "systemctl status nginx"}}',
        '{"done": true, "summary": "Checked status.", "confidence": "VERIFIED"}',
    ]
    agent = _make_agent(gateway_responses=responses, exec_output="active (running)")

    result = await agent.run(_make_request())

    assert result.source == common_pb2.RESULT_SOURCE_TOOL_OUTPUT
async def test_package_install_sets_source_tool_output():
    """A package_install invocation marks the result as tool-sourced."""
    responses = [
        '{"tool": "package_install", "parameters": {"packages": "nginx"}}',
        '{"done": true, "summary": "Installed nginx.", "confidence": "VERIFIED"}',
    ]
    agent = _make_agent(gateway_responses=responses, exec_output="installed nginx")

    result = await agent.run(_make_request())

    assert result.source == common_pb2.RESULT_SOURCE_TOOL_OUTPUT
async def test_fs_read_sets_source_tool_output():
    """An fs_read invocation marks the result as tool-sourced."""
    responses = [
        '{"tool": "fs_read", "parameters": {"file_path": "/etc/hosts"}}',
        '{"done": true, "summary": "Read file.", "confidence": "VERIFIED"}',
    ]
    agent = _make_agent(gateway_responses=responses, exec_output="127.0.0.1 localhost")

    result = await agent.run(_make_request())

    assert result.source == common_pb2.RESULT_SOURCE_TOOL_OUTPUT
async def test_no_tools_sets_source_model_knowledge():
    """Finishing without any tool call marks the result as model knowledge."""
    done = '{"done": true, "summary": "I know this.", "confidence": "VERIFIED"}'
    agent = _make_agent(gateway_responses=[done])

    result = await agent.run(_make_request())

    assert result.source == common_pb2.RESULT_SOURCE_MODEL_KNOWLEDGE
async def test_unknown_tool_reports_error():
    """A tool outside the discovered set is rejected and never reaches the broker."""
    responses = [
        '{"tool": "web_search", "parameters": {"query": "nginx docs"}}',
        '{"done": true, "summary": "Done.", "confidence": "UNCERTAIN"}',
    ]
    agent = _make_agent(gateway_responses=responses)

    result = await agent.run(_make_request())

    acceptable = (common_pb2.RESULT_STATUS_PARTIAL, common_pb2.RESULT_STATUS_SUCCESS)
    assert result.status in acceptable
    agent._broker.execute_tool.assert_not_called()
async def test_tool_discovery_grpc_error():
    """A gRPC failure during tool discovery produces a FAILED result.

    Uses the module-level ``grpc`` import (PEP 8) rather than a redundant
    function-local import.
    """
    agent = _make_agent()
    agent._broker.discover_tools = AsyncMock(
        side_effect=grpc.aio.AioRpcError(
            grpc.StatusCode.UNAVAILABLE,
            initial_metadata=grpc.aio.Metadata(),
            trailing_metadata=grpc.aio.Metadata(),
            details="down",
        )
    )
    result = await agent.run(_make_request())
    assert result.status == common_pb2.RESULT_STATUS_FAILED
    assert "discovery" in result.failure_reason.lower()
async def test_memory_service_unavailable_continues():
    """A memory-service outage is tolerated; the run still succeeds.

    Uses the module-level ``grpc`` import (PEP 8) rather than a redundant
    function-local import.
    """
    agent = _make_agent(
        gateway_responses=[
            '{"done": true, "summary": "Done without memory.", "confidence": "VERIFIED"}'
        ]
    )
    agent._memory.query_memory = AsyncMock(
        side_effect=grpc.aio.AioRpcError(
            grpc.StatusCode.UNAVAILABLE,
            initial_metadata=grpc.aio.Metadata(),
            trailing_metadata=grpc.aio.Metadata(),
            details="down",
        )
    )
    result = await agent.run(_make_request())
    assert result.status == common_pb2.RESULT_STATUS_SUCCESS
async def test_parse_error_handling():
    """An unparseable tool call is survivable; the model recovers and finishes."""
    responses = [
        '{"tool": "", "parameters": {}}',
        '{"done": true, "summary": "Done after error.", "confidence": "VERIFIED"}',
    ]
    agent = _make_agent(gateway_responses=responses)

    result = await agent.run(_make_request())

    assert result.status == common_pb2.RESULT_STATUS_SUCCESS
    assert "Done after error" in result.summary
async def test_plain_reasoning_continues():
    """Free-form reasoning text keeps the loop going until the done signal."""
    responses = [
        "Let me check the system configuration...",
        '{"done": true, "summary": "Analyzed.", "confidence": "INFERRED"}',
    ]
    agent = _make_agent(gateway_responses=responses)

    result = await agent.run(_make_request())

    assert result.status == common_pb2.RESULT_STATUS_SUCCESS
    assert result.result_quality == common_pb2.RESULT_QUALITY_INFERRED
async def test_tool_execution_grpc_error():
    """gRPC errors during tool execution count as failures and FAIL the run.

    Uses the module-level ``grpc`` import (PEP 8) rather than a redundant
    function-local import.
    """
    agent = _make_agent(
        gateway_responses=['{"tool": "run_shell", "parameters": {"command": "ls"}}']
        * 5,
        config=AgentConfig(max_iterations=10),
    )
    agent._broker.execute_tool = AsyncMock(
        side_effect=grpc.aio.AioRpcError(
            grpc.StatusCode.INTERNAL,
            initial_metadata=grpc.aio.Metadata(),
            trailing_metadata=grpc.aio.Metadata(),
            details="exec failed",
        )
    )
    result = await agent.run(_make_request())
    assert result.status == common_pb2.RESULT_STATUS_FAILED
    assert "consecutive failures" in result.failure_reason
def test_format_memory_with_description():
    """Formatted memory includes the name header and the description text."""
    response = memory_pb2.QueryMemoryResponse(
        rank=0,
        entry=memory_pb2.MemoryEntry(name="test-mem", description="A description"),
        cosine_similarity=0.9,
    )

    formatted = format_memory(response)

    assert "[Memory: test-mem]" in formatted
    assert "A description" in formatted
def test_format_memory_with_cached_segment():
    """A cached extracted segment is preferred over the full corpus."""
    response = memory_pb2.QueryMemoryResponse(
        rank=0,
        entry=memory_pb2.MemoryEntry(
            name="test-mem", description="desc", corpus="full corpus"
        ),
        cosine_similarity=0.9,
        cached_extracted_segment="extracted segment",
    )

    formatted = format_memory(response)

    assert "extracted segment" in formatted
    assert "full corpus" not in formatted
async def test_agent_id_prefix():
    """A blank agent_id is tolerated and the run completes successfully.

    The original test made no assertion at all despite claiming to verify a
    'sys-' ID prefix. The default mocked gateway signals done/VERIFIED, so at
    minimum the run must finish with SUCCESS rather than raising on the blank
    agent_id. TODO(review): also assert the generated ID's "sys-" prefix once
    it is exposed on the result proto.
    """
    agent = _make_agent()
    request = _make_request()
    request.agent_id = ""  # Clear to trigger auto-generation
    result = await agent.run(request)
    assert result.status == common_pb2.RESULT_STATUS_SUCCESS
async def test_compaction_triggers_and_continues():
    """Large tool outputs force a mid-loop compaction; the run still succeeds."""
    commands = ["ls /etc", "cat /etc/hosts", "df -h", "free -m"]
    responses = [
        '{"tool": "run_shell", "parameters": {"command": "' + cmd + '"}}'
        for cmd in commands
    ]
    responses.append(
        '{"done": true, "summary": "Done after compaction.", "confidence": "VERIFIED"}'
    )
    agent = _make_agent(
        gateway_responses=responses,
        exec_output="A" * 600,
        config=AgentConfig(max_iterations=20, max_tokens=1800),
    )
    # Compaction uses the non-streaming inference endpoint for summarization.
    agent._gateway.inference = AsyncMock(return_value="- Short summary")

    result = await agent.run(_make_request(max_tokens=1800))

    assert result.status == common_pb2.RESULT_STATUS_SUCCESS
    assert "Done after compaction" in result.summary
async def test_compaction_failure_gives_partial():
    """When compaction cannot free enough budget, the loop ends as PARTIAL."""
    tool_call = '{"tool": "run_shell", "parameters": {"command": "x"}}'
    agent = _make_agent(
        gateway_responses=[tool_call] * 10,
        exec_output="B" * 500,
        config=AgentConfig(max_iterations=20, max_tokens=200),
    )
    # Summarizer output is itself too large to fit under the token budget.
    agent._gateway.inference = AsyncMock(return_value="still big summary " * 50)

    result = await agent.run(_make_request(max_tokens=200))

    assert result.status == common_pb2.RESULT_STATUS_PARTIAL
async def test_partial_result_message():
    """The PARTIAL summary uses the sysadmin-specific wording."""
    tool_call = '{"tool": "run_shell", "parameters": {"command": "ls"}}'
    agent = _make_agent(
        gateway_responses=[tool_call] * 5,
        config=AgentConfig(max_iterations=2),
    )

    result = await agent.run(_make_request())

    assert "System administration incomplete" in result.summary
async def test_failed_result_message():
    """The FAILED summary uses the sysadmin-specific wording."""
    agent = _make_agent(tools=[])

    result = await agent.run(_make_request())

    assert "System administration failed" in result.summary