Compare commits
2 Commits
59e1ca6305
...
400f5601d4
| Author | SHA1 | Date | |
|---|---|---|---|
| 400f5601d4 | |||
|
|
35b159fe5c |
@@ -103,6 +103,7 @@
|
||||
| #97 | Convert docker-compose.yml to Swarm stack | Phase 12 | `COMPLETED` | Docker / YAML | [issue-097.md](issue-097.md) |
|
||||
| #98 | Validate zero service code changes | Phase 12 | `COMPLETED` | Shell | [issue-098.md](issue-098.md) |
|
||||
| #99 | Document multi-machine deployment guide | Phase 12 | `COMPLETED` | Markdown | [issue-099.md](issue-099.md) |
|
||||
| #191 | Refactor: add audit logging to Tool Broker service | — | `COMPLETED` | Rust | [issue-191.md](issue-191.md) |
|
||||
| #202 | Bug: Docker network missing internal:true and edge separation | — | `COMPLETED` | Docker / YAML | [issue-202.md](issue-202.md) |
|
||||
|
||||
## Status Legend
|
||||
|
||||
48
implementation-plans/issue-191.md
Normal file
48
implementation-plans/issue-191.md
Normal file
@@ -0,0 +1,48 @@
|
||||
# Issue #191: Refactor: add audit logging to Tool Broker service
|
||||
|
||||
## Metadata
|
||||
|
||||
| Field | Value |
|
||||
|---|---|
|
||||
| Issue | #191 |
|
||||
| Title | Refactor: add audit logging to Tool Broker service |
|
||||
| Milestone | — |
|
||||
| Status | `COMPLETED` |
|
||||
| Language | Rust |
|
||||
| Related Plans | issue-064.md, issue-065.md |
|
||||
|
||||
## Problem
|
||||
|
||||
The Tool Broker was the only service missing audit logging, violating the architecture requirement that every tool invocation and broker decision be audit-logged.
|
||||
|
||||
## Implementation
|
||||
|
||||
### Changes to `services/tool-broker/src/service.rs`
|
||||
|
||||
1. Added `audit_client: Option<Arc<Mutex<AuditServiceClient<Channel>>>>` field
|
||||
2. Added `with_audit_client()` builder method
|
||||
3. Added `audit_log()` helper method (same pattern as Model Gateway)
|
||||
4. Audit calls at 3 points:
|
||||
- **ExecuteTool deny**: `AUDIT_ACTION_BROKER_DECISION (2)` with layer and reason metadata
|
||||
- **ExecuteTool allow**: `AUDIT_ACTION_BROKER_DECISION (2)`
|
||||
- **ExecuteTool result**: `AUDIT_ACTION_TOOL_INVOCATION (1)` with success/failure
|
||||
- **ValidateCall**: `AUDIT_ACTION_BROKER_DECISION (2)` with `dry_run=true` metadata
|
||||
|
||||
### Changes to `services/tool-broker/src/main.rs`
|
||||
|
||||
- Connect to Audit Service if `audit_addr` is configured (graceful fallback)
|
||||
|
||||
## Files Modified
|
||||
|
||||
| File | Action | Purpose |
|
||||
|---|---|---|
|
||||
| `services/tool-broker/src/service.rs` | Modify | Add audit client and logging calls |
|
||||
| `services/tool-broker/src/main.rs` | Modify | Connect audit client on startup |
|
||||
| `implementation-plans/issue-191.md` | Create | Plan |
|
||||
| `implementation-plans/_index.md` | Modify | Index entry |
|
||||
|
||||
## Deviation Log
|
||||
|
||||
| Deviation | Reason |
|
||||
|---|---|
|
||||
| None | — |
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use llm_multiverse_proto::llm_multiverse::v1::audit_service_client::AuditServiceClient;
|
||||
use llm_multiverse_proto::llm_multiverse::v1::tool_broker_service_server::ToolBrokerServiceServer;
|
||||
use tokio::sync::Mutex;
|
||||
use tool_broker::config::Config;
|
||||
@@ -63,7 +64,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
let firewall_config = FirewallConfig::default();
|
||||
|
||||
let service = ToolBrokerServiceImpl::new(
|
||||
let mut service = ToolBrokerServiceImpl::new(
|
||||
manifest_store,
|
||||
dispatcher,
|
||||
loop_detector,
|
||||
@@ -71,6 +72,19 @@ async fn main() -> anyhow::Result<()> {
|
||||
firewall_config,
|
||||
);
|
||||
|
||||
// Connect to Audit Service if configured.
|
||||
if let Some(ref audit_addr) = config.audit_addr {
|
||||
match AuditServiceClient::connect(audit_addr.clone()).await {
|
||||
Ok(client) => {
|
||||
tracing::info!(%audit_addr, "Connected to Audit Service");
|
||||
service = service.with_audit_client(client);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(%audit_addr, error = %e, "Failed to connect to Audit Service; audit logging disabled");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let addr = config.listen_addr().parse()?;
|
||||
tracing::info!(%addr, "Tool Broker listening");
|
||||
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use llm_multiverse_proto::llm_multiverse::v1::{
|
||||
tool_broker_service_server::ToolBrokerService, DiscoverToolsRequest, DiscoverToolsResponse,
|
||||
ExecuteToolRequest, ExecuteToolResponse, ExecutionStatus, SessionContext,
|
||||
ValidateCallRequest, ValidateCallResponse,
|
||||
audit_service_client::AuditServiceClient, tool_broker_service_server::ToolBrokerService,
|
||||
AppendRequest, AuditEntry, DiscoverToolsRequest, DiscoverToolsResponse, ExecuteToolRequest,
|
||||
ExecuteToolResponse, ExecutionStatus, SessionContext, ValidateCallRequest,
|
||||
ValidateCallResponse,
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::transport::Channel;
|
||||
use tonic::{Request, Response, Status};
|
||||
use tracing::{info, warn};
|
||||
|
||||
@@ -29,6 +32,7 @@ pub struct ToolBrokerServiceImpl {
|
||||
loop_detector: Arc<Mutex<LoopDetector>>,
|
||||
credential_injector: Option<Arc<CredentialInjector>>,
|
||||
firewall_config: FirewallConfig,
|
||||
audit_client: Option<Arc<std::sync::Mutex<AuditServiceClient<Channel>>>>,
|
||||
}
|
||||
|
||||
impl ToolBrokerServiceImpl {
|
||||
@@ -45,9 +49,15 @@ impl ToolBrokerServiceImpl {
|
||||
loop_detector,
|
||||
credential_injector,
|
||||
firewall_config,
|
||||
audit_client: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_audit_client(mut self, client: AuditServiceClient<Channel>) -> Self {
|
||||
self.audit_client = Some(Arc::new(std::sync::Mutex::new(client)));
|
||||
self
|
||||
}
|
||||
|
||||
/// Run the 5-layer enforcement pipeline.
|
||||
fn enforce(
|
||||
&self,
|
||||
@@ -119,6 +129,60 @@ impl ToolBrokerServiceImpl {
|
||||
|
||||
EnforcementResult::allow()
|
||||
}
|
||||
|
||||
async fn audit_log(
|
||||
&self,
|
||||
ctx: Option<&SessionContext>,
|
||||
action: i32,
|
||||
tool_name: &str,
|
||||
result_status: &str,
|
||||
metadata: HashMap<String, String>,
|
||||
) {
|
||||
let audit = match &self.audit_client {
|
||||
Some(c) => c,
|
||||
None => return,
|
||||
};
|
||||
|
||||
let (session_id, agent_id) = match ctx {
|
||||
Some(c) => {
|
||||
let aid = c
|
||||
.agent_lineage
|
||||
.as_ref()
|
||||
.and_then(|l| l.agents.last())
|
||||
.map(|a| a.agent_id.clone())
|
||||
.unwrap_or_else(|| c.user_id.clone());
|
||||
(c.session_id.clone(), aid)
|
||||
}
|
||||
None => ("unknown".into(), "unknown".into()),
|
||||
};
|
||||
|
||||
let entry = AuditEntry {
|
||||
session_id,
|
||||
agent_id,
|
||||
action,
|
||||
tool_name: tool_name.into(),
|
||||
result_status: result_status.into(),
|
||||
metadata,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let request = AppendRequest {
|
||||
entry: Some(entry),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut client = match audit.lock() {
|
||||
Ok(c) => c.clone(),
|
||||
Err(_) => {
|
||||
warn!("Audit client lock poisoned, skipping audit log");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = client.append(request).await {
|
||||
warn!(error = %e, "Failed to log to audit service");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
@@ -170,6 +234,14 @@ impl ToolBrokerService for ToolBrokerServiceImpl {
|
||||
"Tool call denied by enforcement"
|
||||
);
|
||||
|
||||
// Audit: broker deny decision
|
||||
let mut meta = HashMap::new();
|
||||
meta.insert("layer".into(), format!("{:?}", enforcement.layer));
|
||||
if let Some(ref reason) = enforcement.denial_reason {
|
||||
meta.insert("reason".into(), reason.clone());
|
||||
}
|
||||
self.audit_log(context, 2, &req.tool_name, "denied", meta).await;
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
let _ = tx
|
||||
.send(Ok(ExecuteToolResponse {
|
||||
@@ -181,6 +253,9 @@ impl ToolBrokerService for ToolBrokerServiceImpl {
|
||||
return Ok(Response::new(ReceiverStream::new(rx)));
|
||||
}
|
||||
|
||||
// Audit: broker allow decision
|
||||
self.audit_log(context, 2, &req.tool_name, "allowed", HashMap::new()).await;
|
||||
|
||||
// Step 2: Loop/thrash detection.
|
||||
let detection = {
|
||||
let mut detector = self.loop_detector.lock().await;
|
||||
@@ -268,6 +343,10 @@ impl ToolBrokerService for ToolBrokerServiceImpl {
|
||||
|
||||
let full_output = format!("{}\n{}", header, output_content);
|
||||
|
||||
// Audit: tool invocation result
|
||||
let exec_status = if tagged.output.success { "success" } else { "failure" };
|
||||
self.audit_log(context, 1, &req.tool_name, exec_status, HashMap::new()).await;
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
let _ = tx
|
||||
.send(Ok(ExecuteToolResponse {
|
||||
@@ -297,8 +376,9 @@ impl ToolBrokerService for ToolBrokerServiceImpl {
|
||||
);
|
||||
|
||||
// Build an ExecuteToolRequest to reuse the enforce() pipeline.
|
||||
let ctx = req.context.as_ref();
|
||||
let exec_req = ExecuteToolRequest {
|
||||
context: req.context,
|
||||
context: req.context.clone(),
|
||||
agent_type: req.agent_type,
|
||||
tool_name: req.tool_name,
|
||||
parameters: req.parameters,
|
||||
@@ -306,6 +386,15 @@ impl ToolBrokerService for ToolBrokerServiceImpl {
|
||||
|
||||
let result = self.enforce(&exec_req);
|
||||
|
||||
// Audit: broker decision (dry-run)
|
||||
let decision = if result.allowed { "allowed" } else { "denied" };
|
||||
let mut meta = HashMap::new();
|
||||
meta.insert("dry_run".into(), "true".into());
|
||||
if let Some(ref reason) = result.denial_reason {
|
||||
meta.insert("reason".into(), reason.clone());
|
||||
}
|
||||
self.audit_log(ctx, 2, &exec_req.tool_name, decision, meta).await;
|
||||
|
||||
Ok(Response::new(ValidateCallResponse {
|
||||
is_allowed: result.allowed,
|
||||
denial_reason: result.denial_reason,
|
||||
|
||||
Reference in New Issue
Block a user