Compare commits

...

2 Commits

Author SHA1 Message Date
400f5601d4 Merge pull request 'feat: add audit logging to Tool Broker service (#191)' (#211) from feature/issue-191-tool-broker-audit into main 2026-03-11 11:10:55 +01:00
Pi Agent
35b159fe5c feat: add audit logging to Tool Broker service (issue #191)
Log broker allow/deny decisions (AUDIT_ACTION_BROKER_DECISION) and tool
invocation results (AUDIT_ACTION_TOOL_INVOCATION) to the Audit Service.
Follows same pattern as Model Gateway. Fixes architecture drift where
Tool Broker was the only service without audit logging.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 11:10:37 +01:00
4 changed files with 157 additions and 5 deletions

View File

@@ -103,6 +103,7 @@
| #97 | Convert docker-compose.yml to Swarm stack | Phase 12 | `COMPLETED` | Docker / YAML | [issue-097.md](issue-097.md) |
| #98 | Validate zero service code changes | Phase 12 | `COMPLETED` | Shell | [issue-098.md](issue-098.md) |
| #99 | Document multi-machine deployment guide | Phase 12 | `COMPLETED` | Markdown | [issue-099.md](issue-099.md) |
| #191 | Refactor: add audit logging to Tool Broker service | — | `COMPLETED` | Rust | [issue-191.md](issue-191.md) |
| #202 | Bug: Docker network missing internal:true and edge separation | — | `COMPLETED` | Docker / YAML | [issue-202.md](issue-202.md) |
## Status Legend

View File

@@ -0,0 +1,48 @@
# Issue #191: Refactor: add audit logging to Tool Broker service
## Metadata
| Field | Value |
|---|---|
| Issue | #191 |
| Title | Refactor: add audit logging to Tool Broker service |
| Milestone | — |
| Status | `COMPLETED` |
| Language | Rust |
| Related Plans | issue-064.md, issue-065.md |
## Problem
The Tool Broker was the only service missing audit logging, violating the architecture requirement that every tool invocation and broker decision be audit-logged.
## Implementation
### Changes to `services/tool-broker/src/service.rs`
1. Added `audit_client: Option<Arc<Mutex<AuditServiceClient<Channel>>>>` field
2. Added `with_audit_client()` builder method
3. Added `audit_log()` helper method (same pattern as Model Gateway)
4. Audit calls at 3 points:
- **ExecuteTool deny**: `AUDIT_ACTION_BROKER_DECISION (2)` with layer and reason metadata
- **ExecuteTool allow**: `AUDIT_ACTION_BROKER_DECISION (2)`
- **ExecuteTool result**: `AUDIT_ACTION_TOOL_INVOCATION (1)` with success/failure
- **ValidateCall**: `AUDIT_ACTION_BROKER_DECISION (2)` with `dry_run=true` metadata
### Changes to `services/tool-broker/src/main.rs`
- Connect to Audit Service if `audit_addr` is configured (graceful fallback)
## Files Modified
| File | Action | Purpose |
|---|---|---|
| `services/tool-broker/src/service.rs` | Modify | Add audit client and logging calls |
| `services/tool-broker/src/main.rs` | Modify | Connect audit client on startup |
| `implementation-plans/issue-191.md` | Create | Plan |
| `implementation-plans/_index.md` | Modify | Index entry |
## Deviation Log
| Deviation | Reason |
|---|---|
| None | — |

View File

@@ -1,6 +1,7 @@
use std::sync::Arc;
use std::time::Duration;
use llm_multiverse_proto::llm_multiverse::v1::audit_service_client::AuditServiceClient;
use llm_multiverse_proto::llm_multiverse::v1::tool_broker_service_server::ToolBrokerServiceServer;
use tokio::sync::Mutex;
use tool_broker::config::Config;
@@ -63,7 +64,7 @@ async fn main() -> anyhow::Result<()> {
let firewall_config = FirewallConfig::default();
let service = ToolBrokerServiceImpl::new(
let mut service = ToolBrokerServiceImpl::new(
manifest_store,
dispatcher,
loop_detector,
@@ -71,6 +72,19 @@ async fn main() -> anyhow::Result<()> {
firewall_config,
);
// Connect to Audit Service if configured.
if let Some(ref audit_addr) = config.audit_addr {
match AuditServiceClient::connect(audit_addr.clone()).await {
Ok(client) => {
tracing::info!(%audit_addr, "Connected to Audit Service");
service = service.with_audit_client(client);
}
Err(e) => {
tracing::warn!(%audit_addr, error = %e, "Failed to connect to Audit Service; audit logging disabled");
}
}
}
let addr = config.listen_addr().parse()?;
tracing::info!(%addr, "Tool Broker listening");

View File

@@ -1,12 +1,15 @@
use std::collections::HashMap;
use std::sync::Arc;
use llm_multiverse_proto::llm_multiverse::v1::{
tool_broker_service_server::ToolBrokerService, DiscoverToolsRequest, DiscoverToolsResponse,
ExecuteToolRequest, ExecuteToolResponse, ExecutionStatus, SessionContext,
ValidateCallRequest, ValidateCallResponse,
audit_service_client::AuditServiceClient, tool_broker_service_server::ToolBrokerService,
AppendRequest, AuditEntry, DiscoverToolsRequest, DiscoverToolsResponse, ExecuteToolRequest,
ExecuteToolResponse, ExecutionStatus, SessionContext, ValidateCallRequest,
ValidateCallResponse,
};
use tokio::sync::Mutex;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;
use tonic::{Request, Response, Status};
use tracing::{info, warn};
@@ -29,6 +32,7 @@ pub struct ToolBrokerServiceImpl {
loop_detector: Arc<Mutex<LoopDetector>>,
credential_injector: Option<Arc<CredentialInjector>>,
firewall_config: FirewallConfig,
audit_client: Option<Arc<std::sync::Mutex<AuditServiceClient<Channel>>>>,
}
impl ToolBrokerServiceImpl {
@@ -45,9 +49,15 @@ impl ToolBrokerServiceImpl {
loop_detector,
credential_injector,
firewall_config,
audit_client: None,
}
}
pub fn with_audit_client(mut self, client: AuditServiceClient<Channel>) -> Self {
self.audit_client = Some(Arc::new(std::sync::Mutex::new(client)));
self
}
/// Run the 5-layer enforcement pipeline.
fn enforce(
&self,
@@ -119,6 +129,60 @@ impl ToolBrokerServiceImpl {
EnforcementResult::allow()
}
async fn audit_log(
&self,
ctx: Option<&SessionContext>,
action: i32,
tool_name: &str,
result_status: &str,
metadata: HashMap<String, String>,
) {
let audit = match &self.audit_client {
Some(c) => c,
None => return,
};
let (session_id, agent_id) = match ctx {
Some(c) => {
let aid = c
.agent_lineage
.as_ref()
.and_then(|l| l.agents.last())
.map(|a| a.agent_id.clone())
.unwrap_or_else(|| c.user_id.clone());
(c.session_id.clone(), aid)
}
None => ("unknown".into(), "unknown".into()),
};
let entry = AuditEntry {
session_id,
agent_id,
action,
tool_name: tool_name.into(),
result_status: result_status.into(),
metadata,
..Default::default()
};
let request = AppendRequest {
entry: Some(entry),
..Default::default()
};
let mut client = match audit.lock() {
Ok(c) => c.clone(),
Err(_) => {
warn!("Audit client lock poisoned, skipping audit log");
return;
}
};
if let Err(e) = client.append(request).await {
warn!(error = %e, "Failed to log to audit service");
}
}
}
#[tonic::async_trait]
@@ -170,6 +234,14 @@ impl ToolBrokerService for ToolBrokerServiceImpl {
"Tool call denied by enforcement"
);
// Audit: broker deny decision
let mut meta = HashMap::new();
meta.insert("layer".into(), format!("{:?}", enforcement.layer));
if let Some(ref reason) = enforcement.denial_reason {
meta.insert("reason".into(), reason.clone());
}
self.audit_log(context, 2, &req.tool_name, "denied", meta).await;
let (tx, rx) = tokio::sync::mpsc::channel(1);
let _ = tx
.send(Ok(ExecuteToolResponse {
@@ -181,6 +253,9 @@ impl ToolBrokerService for ToolBrokerServiceImpl {
return Ok(Response::new(ReceiverStream::new(rx)));
}
// Audit: broker allow decision
self.audit_log(context, 2, &req.tool_name, "allowed", HashMap::new()).await;
// Step 2: Loop/thrash detection.
let detection = {
let mut detector = self.loop_detector.lock().await;
@@ -268,6 +343,10 @@ impl ToolBrokerService for ToolBrokerServiceImpl {
let full_output = format!("{}\n{}", header, output_content);
// Audit: tool invocation result
let exec_status = if tagged.output.success { "success" } else { "failure" };
self.audit_log(context, 1, &req.tool_name, exec_status, HashMap::new()).await;
let (tx, rx) = tokio::sync::mpsc::channel(1);
let _ = tx
.send(Ok(ExecuteToolResponse {
@@ -297,8 +376,9 @@ impl ToolBrokerService for ToolBrokerServiceImpl {
);
// Build an ExecuteToolRequest to reuse the enforce() pipeline.
let ctx = req.context.as_ref();
let exec_req = ExecuteToolRequest {
context: req.context,
context: req.context.clone(),
agent_type: req.agent_type,
tool_name: req.tool_name,
parameters: req.parameters,
@@ -306,6 +386,15 @@ impl ToolBrokerService for ToolBrokerServiceImpl {
let result = self.enforce(&exec_req);
// Audit: broker decision (dry-run)
let decision = if result.allowed { "allowed" } else { "denied" };
let mut meta = HashMap::new();
meta.insert("dry_run".into(), "true".into());
if let Some(ref reason) = result.denial_reason {
meta.insert("reason".into(), reason.clone());
}
self.audit_log(ctx, 2, &exec_req.tool_name, decision, meta).await;
Ok(Response::new(ValidateCallResponse {
is_allowed: result.allowed,
denial_reason: result.denial_reason,