feat: implement append-only file log backend (#19)
- Add AuditLogWriter with SHA-256 hash chain for tamper evidence - Add fsync after every write for durability guarantee - Add file rotation by size with configurable max files - Add hash chain recovery on service restart - Switch to serde_json for proper JSON serialization - 16 tests pass, clippy clean Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -16,6 +16,7 @@
|
|||||||
| #16 | Generate Rust stubs (prost/tonic) | Phase 1 | `COMPLETED` | Rust | [issue-016.md](issue-016.md) |
|
| #16 | Generate Rust stubs (prost/tonic) | Phase 1 | `COMPLETED` | Rust | [issue-016.md](issue-016.md) |
|
||||||
| #17 | Generate Python stubs (grpcio-tools) | Phase 1 | `COMPLETED` | Python | [issue-017.md](issue-017.md) |
|
| #17 | Generate Python stubs (grpcio-tools) | Phase 1 | `COMPLETED` | Python | [issue-017.md](issue-017.md) |
|
||||||
| #18 | Scaffold Audit Service Rust project | Phase 2 | `COMPLETED` | Rust | [issue-018.md](issue-018.md) |
|
| #18 | Scaffold Audit Service Rust project | Phase 2 | `COMPLETED` | Rust | [issue-018.md](issue-018.md) |
|
||||||
|
| #19 | Implement append-only file log backend | Phase 2 | `COMPLETED` | Rust | [issue-019.md](issue-019.md) |
|
||||||
|
|
||||||
## Status Legend
|
## Status Legend
|
||||||
|
|
||||||
@@ -30,6 +31,7 @@
|
|||||||
### Audit Service
|
### Audit Service
|
||||||
- [issue-009.md](issue-009.md) — audit.proto (AuditService, AuditEntry)
|
- [issue-009.md](issue-009.md) — audit.proto (AuditService, AuditEntry)
|
||||||
- [issue-018.md](issue-018.md) — Scaffold Audit Service Rust project
|
- [issue-018.md](issue-018.md) — Scaffold Audit Service Rust project
|
||||||
|
- [issue-019.md](issue-019.md) — Append-only file log backend
|
||||||
|
|
||||||
### Secrets Service
|
### Secrets Service
|
||||||
- [issue-010.md](issue-010.md) — secrets.proto (SecretsService, GetSecret)
|
- [issue-010.md](issue-010.md) — secrets.proto (SecretsService, GetSecret)
|
||||||
|
|||||||
45
implementation-plans/issue-019.md
Normal file
45
implementation-plans/issue-019.md
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
# Implementation Plan — Issue #19: Implement append-only file log backend
|
||||||
|
|
||||||
|
## Metadata
|
||||||
|
|
||||||
|
| Field | Value |
|
||||||
|
|---|---|
|
||||||
|
| Issue | [#19](https://git.shahondin1624.de/llm-multiverse/llm-multiverse/issues/19) |
|
||||||
|
| Title | Implement append-only file log backend |
|
||||||
|
| Milestone | Phase 2: Audit Service |
|
||||||
|
| Labels | `type:feature`, `priority:critical`, `lang:rust`, `service:audit` |
|
||||||
|
| Status | `COMPLETED` |
|
||||||
|
| Language | Rust |
|
||||||
|
| Related Plans | [issue-018.md](issue-018.md) |
|
||||||
|
| Blocked by | #18 (completed) |
|
||||||
|
|
||||||
|
## Acceptance Criteria
|
||||||
|
|
||||||
|
- [x] Append-only file writer with JSON lines format
|
||||||
|
- [x] Each entry includes timestamp, actor, action, resource, session context
|
||||||
|
- [x] fsync after each write for durability
|
||||||
|
- [x] File rotation support (by size)
|
||||||
|
- [x] Tamper-evident: SHA-256 hash chain (_prev_hash, _hash fields)
|
||||||
|
|
||||||
|
## Architecture Analysis
|
||||||
|
|
||||||
|
Extracted `AuditLogWriter` from service.rs into dedicated module. Key features:
|
||||||
|
- **Hash chain:** Each entry includes SHA-256(_prev_hash + entry_json) for tamper evidence
|
||||||
|
- **fsync:** `sync_data()` called after every write for durability guarantee
|
||||||
|
- **Rotation:** When file exceeds `max_file_size`, rotates .log → .log.1 → .log.2 etc.
|
||||||
|
- **Recovery:** On restart, recovers prev_hash from last line of existing log
|
||||||
|
- **Serialization:** Uses serde_json for proper JSON output with special character handling
|
||||||
|
|
||||||
|
## Files to Create/Modify
|
||||||
|
|
||||||
|
| File | Action | Purpose |
|
||||||
|
|---|---|---|
|
||||||
|
| `services/audit/src/log_writer.rs` | Create | AuditLogWriter with rotation and hash chain |
|
||||||
|
| `services/audit/src/service.rs` | Modify | Use AuditLogWriter, serde_json serialization |
|
||||||
|
| `services/audit/src/main.rs` | Modify | Wire up log writer with config |
|
||||||
|
| `services/audit/src/config.rs` | Modify | Add rotation config fields |
|
||||||
|
| `services/audit/Cargo.toml` | Modify | Add sha2, serde_json dependencies |
|
||||||
|
|
||||||
|
## Deviation Log
|
||||||
|
|
||||||
|
_(No deviations)_
|
||||||
@@ -14,9 +14,11 @@ tokio = { version = "1", features = ["full"] }
|
|||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
serde_json = "1"
|
||||||
toml = "0.8"
|
toml = "0.8"
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
thiserror = "2"
|
thiserror = "2"
|
||||||
|
sha2 = "0.10"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tempfile = "3"
|
tempfile = "3"
|
||||||
|
|||||||
@@ -9,6 +9,12 @@ pub struct Config {
|
|||||||
pub port: u16,
|
pub port: u16,
|
||||||
#[serde(default = "default_log_path")]
|
#[serde(default = "default_log_path")]
|
||||||
pub log_path: PathBuf,
|
pub log_path: PathBuf,
|
||||||
|
/// Max log file size in bytes before rotation (default: 50MB).
|
||||||
|
#[serde(default = "default_max_file_size")]
|
||||||
|
pub max_file_size: u64,
|
||||||
|
/// Max number of rotated log files to keep (default: 10).
|
||||||
|
#[serde(default = "default_max_rotated_files")]
|
||||||
|
pub max_rotated_files: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_host() -> String {
|
fn default_host() -> String {
|
||||||
@@ -23,12 +29,22 @@ fn default_log_path() -> PathBuf {
|
|||||||
PathBuf::from("/tmp/llm-multiverse-audit.log")
|
PathBuf::from("/tmp/llm-multiverse-audit.log")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Default rotation threshold for the audit log file: 50 MB.
fn default_max_file_size() -> u64 {
    const MIB: u64 = 1024 * 1024;
    50 * MIB
}
|
||||||
|
|
||||||
|
/// Default number of rotated audit log files to keep before pruning.
fn default_max_rotated_files() -> u32 {
    10
}
|
||||||
|
|
||||||
impl Default for Config {
|
impl Default for Config {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self {
|
||||||
host: default_host(),
|
host: default_host(),
|
||||||
port: default_port(),
|
port: default_port(),
|
||||||
log_path: default_log_path(),
|
log_path: default_log_path(),
|
||||||
|
max_file_size: default_max_file_size(),
|
||||||
|
max_rotated_files: default_max_rotated_files(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
352
services/audit/src/log_writer.rs
Normal file
352
services/audit/src/log_writer.rs
Normal file
@@ -0,0 +1,352 @@
|
|||||||
|
use sha2::{Digest, Sha256};
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
/// Append-only log writer with rotation and hash chain for tamper evidence.
///
/// Entries are written as JSON lines; each line carries `_prev_hash` and
/// `_hash` fields forming a SHA-256 chain, and every write is fsync'd.
pub struct AuditLogWriter {
    /// Path of the active log file (rotated copies get a `.N` suffix).
    log_path: PathBuf,
    /// Handle to the currently open log file, opened in append mode.
    file: tokio::fs::File,
    /// Bytes in the active file so far; drives the rotation check.
    current_size: u64,
    /// Size threshold in bytes beyond which the active file is rotated.
    max_file_size: u64,
    /// Number of rotated `.N` files to retain.
    max_rotated_files: u32,
    /// SHA-256 hash of the previous entry for tamper-evidence chain.
    prev_hash: String,
}
|
||||||
|
|
||||||
|
impl AuditLogWriter {
|
||||||
|
/// Open or create the audit log file.
|
||||||
|
pub async fn new(
|
||||||
|
log_path: PathBuf,
|
||||||
|
max_file_size: u64,
|
||||||
|
max_rotated_files: u32,
|
||||||
|
) -> anyhow::Result<Self> {
|
||||||
|
let file = tokio::fs::OpenOptions::new()
|
||||||
|
.create(true)
|
||||||
|
.append(true)
|
||||||
|
.open(&log_path)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Restrict permissions on Unix (owner read/write only).
|
||||||
|
#[cfg(unix)]
|
||||||
|
{
|
||||||
|
use std::os::unix::fs::PermissionsExt;
|
||||||
|
let perms = std::fs::Permissions::from_mode(0o600);
|
||||||
|
std::fs::set_permissions(&log_path, perms)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let metadata = file.metadata().await?;
|
||||||
|
let current_size = metadata.len();
|
||||||
|
|
||||||
|
// Recover prev_hash from last line of existing file.
|
||||||
|
let prev_hash = recover_prev_hash(&log_path).await;
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
?log_path,
|
||||||
|
current_size,
|
||||||
|
"Audit log writer opened"
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
log_path,
|
||||||
|
file,
|
||||||
|
current_size,
|
||||||
|
max_file_size,
|
||||||
|
max_rotated_files,
|
||||||
|
prev_hash,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Append a JSON entry to the log with hash chain and fsync.
|
||||||
|
pub async fn append(&mut self, json_entry: &str) -> Result<(), AuditWriteError> {
|
||||||
|
// Check if rotation is needed.
|
||||||
|
if self.current_size > 0 && self.current_size >= self.max_file_size {
|
||||||
|
self.rotate().await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute hash chain: SHA-256(prev_hash + json_entry).
|
||||||
|
let entry_hash = compute_hash(&self.prev_hash, json_entry);
|
||||||
|
|
||||||
|
// Build the final line with hash field appended.
|
||||||
|
let line = format!(
|
||||||
|
"{{\"_prev_hash\":\"{}\",\"_hash\":\"{}\",{}}}\n",
|
||||||
|
self.prev_hash,
|
||||||
|
entry_hash,
|
||||||
|
// Strip outer braces from json_entry to merge fields.
|
||||||
|
&json_entry[1..json_entry.len() - 1]
|
||||||
|
);
|
||||||
|
|
||||||
|
// Write + fsync for durability.
|
||||||
|
self.file
|
||||||
|
.write_all(line.as_bytes())
|
||||||
|
.await
|
||||||
|
.map_err(AuditWriteError::Io)?;
|
||||||
|
self.file.flush().await.map_err(AuditWriteError::Io)?;
|
||||||
|
self.file.sync_data().await.map_err(AuditWriteError::Io)?;
|
||||||
|
|
||||||
|
self.current_size += line.len() as u64;
|
||||||
|
self.prev_hash = entry_hash;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Rotate the log file: rename current to .1, shift existing .N to .N+1.
|
||||||
|
async fn rotate(&mut self) -> Result<(), AuditWriteError> {
|
||||||
|
// Close the current file by dropping it (we'll reopen after rotation).
|
||||||
|
// Flush and sync first.
|
||||||
|
self.file.flush().await.map_err(AuditWriteError::Io)?;
|
||||||
|
self.file.sync_data().await.map_err(AuditWriteError::Io)?;
|
||||||
|
|
||||||
|
// Shift rotated files: .N -> .N+1 (delete oldest if beyond max).
|
||||||
|
for i in (1..self.max_rotated_files).rev() {
|
||||||
|
let from = rotated_path(&self.log_path, i);
|
||||||
|
let to = rotated_path(&self.log_path, i + 1);
|
||||||
|
if tokio::fs::try_exists(&from).await.unwrap_or(false) {
|
||||||
|
let _ = tokio::fs::rename(&from, &to).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the file beyond max_rotated_files.
|
||||||
|
let oldest = rotated_path(&self.log_path, self.max_rotated_files);
|
||||||
|
if tokio::fs::try_exists(&oldest).await.unwrap_or(false) {
|
||||||
|
let _ = tokio::fs::remove_file(&oldest).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rename current log to .1.
|
||||||
|
let rotated = rotated_path(&self.log_path, 1);
|
||||||
|
tokio::fs::rename(&self.log_path, &rotated)
|
||||||
|
.await
|
||||||
|
.map_err(AuditWriteError::Io)?;
|
||||||
|
|
||||||
|
// Open a new file.
|
||||||
|
self.file = tokio::fs::OpenOptions::new()
|
||||||
|
.create(true)
|
||||||
|
.append(true)
|
||||||
|
.open(&self.log_path)
|
||||||
|
.await
|
||||||
|
.map_err(AuditWriteError::Io)?;
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
{
|
||||||
|
use std::os::unix::fs::PermissionsExt;
|
||||||
|
let perms = std::fs::Permissions::from_mode(0o600);
|
||||||
|
std::fs::set_permissions(&self.log_path, perms).map_err(AuditWriteError::Io)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.current_size = 0;
|
||||||
|
|
||||||
|
tracing::info!(rotated_to = ?rotated, "Audit log rotated");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the log file path.
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub fn log_path(&self) -> &Path {
|
||||||
|
&self.log_path
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Compute SHA-256(prev_hash + entry_json) for tamper-evidence.
|
||||||
|
fn compute_hash(prev_hash: &str, entry: &str) -> String {
|
||||||
|
let mut hasher = Sha256::new();
|
||||||
|
hasher.update(prev_hash.as_bytes());
|
||||||
|
hasher.update(entry.as_bytes());
|
||||||
|
format!("{:x}", hasher.finalize())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build rotated file path: /path/to/audit.log.N
fn rotated_path(base: &Path, n: u32) -> PathBuf {
    // Append the numeric suffix to the full file name (".log" stays in
    // place, so "audit.log" becomes "audit.log.1").
    let mut name = base.as_os_str().to_os_string();
    name.push(format!(".{}", n));
    name.into()
}
|
||||||
|
|
||||||
|
/// Recover the _hash from the last line of an existing log file.
|
||||||
|
async fn recover_prev_hash(log_path: &Path) -> String {
|
||||||
|
let contents = match tokio::fs::read_to_string(log_path).await {
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(_) => return "genesis".to_string(),
|
||||||
|
};
|
||||||
|
let last_line = contents.trim().lines().last().unwrap_or("");
|
||||||
|
// Extract _hash field from JSON.
|
||||||
|
if let Some(start) = last_line.find("\"_hash\":\"") {
|
||||||
|
let rest = &last_line[start + 9..];
|
||||||
|
if let Some(end) = rest.find('"') {
|
||||||
|
return rest[..end].to_string();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"genesis".to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Errors produced while appending to or rotating the audit log.
#[derive(Debug, thiserror::Error)]
pub enum AuditWriteError {
    /// Underlying filesystem failure (write, flush, fsync, rename, or open).
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Two appends: the first entry must chain from "genesis", the second
    // from the first entry's hash.
    #[tokio::test]
    async fn test_append_with_hash_chain() {
        let dir = tempdir().unwrap();
        let log_path = dir.path().join("audit.log");
        let mut writer = AuditLogWriter::new(log_path.clone(), 1024 * 1024, 5)
            .await
            .unwrap();

        writer
            .append("{\"session_id\":\"s1\",\"action\":1}")
            .await
            .unwrap();
        writer
            .append("{\"session_id\":\"s2\",\"action\":2}")
            .await
            .unwrap();

        let contents = tokio::fs::read_to_string(&log_path).await.unwrap();
        let lines: Vec<&str> = contents.trim().split('\n').collect();
        assert_eq!(lines.len(), 2);

        // First entry's prev_hash should be "genesis".
        assert!(lines[0].contains("\"_prev_hash\":\"genesis\""));

        // Second entry's prev_hash should be the first entry's hash.
        // Extract first entry's hash.
        let first_hash = extract_hash(lines[0], "_hash");
        let second_prev = extract_hash(lines[1], "_prev_hash");
        assert_eq!(first_hash, second_prev);
    }

    // Reads the file back with *blocking* std::fs right after append — the
    // content must already be on disk because append fsyncs.
    #[tokio::test]
    async fn test_fsync_durability() {
        let dir = tempdir().unwrap();
        let log_path = dir.path().join("audit.log");
        let mut writer = AuditLogWriter::new(log_path.clone(), 1024 * 1024, 5)
            .await
            .unwrap();

        writer
            .append("{\"session_id\":\"s1\"}")
            .await
            .unwrap();

        // File should exist and have content immediately (fsync'd).
        let contents = std::fs::read_to_string(&log_path).unwrap();
        assert!(contents.contains("s1"));
    }

    // With a tiny max size, several appends must produce a rotated .1 file.
    #[tokio::test]
    async fn test_rotation() {
        let dir = tempdir().unwrap();
        let log_path = dir.path().join("audit.log");
        // Small max size to trigger rotation.
        let mut writer = AuditLogWriter::new(log_path.clone(), 100, 3)
            .await
            .unwrap();

        // Write enough entries to trigger rotation.
        for i in 0..5 {
            writer
                .append(&format!("{{\"session_id\":\"sess-{i}\",\"data\":\"padding-to-exceed-limit\"}}"))
                .await
                .unwrap();
        }

        // Rotated file should exist.
        let rotated = rotated_path(&log_path, 1);
        assert!(tokio::fs::try_exists(&rotated).await.unwrap());
    }

    // Dropping and reopening the writer must not break the chain: the
    // reopened writer recovers prev_hash from the last line on disk.
    #[tokio::test]
    async fn test_hash_chain_recovery() {
        let dir = tempdir().unwrap();
        let log_path = dir.path().join("audit.log");

        // Write some entries.
        {
            let mut writer = AuditLogWriter::new(log_path.clone(), 1024 * 1024, 5)
                .await
                .unwrap();
            writer
                .append("{\"session_id\":\"s1\"}")
                .await
                .unwrap();
        }

        // Reopen — should recover hash chain.
        let mut writer = AuditLogWriter::new(log_path.clone(), 1024 * 1024, 5)
            .await
            .unwrap();
        writer
            .append("{\"session_id\":\"s2\"}")
            .await
            .unwrap();

        let contents = tokio::fs::read_to_string(&log_path).await.unwrap();
        let lines: Vec<&str> = contents.trim().split('\n').collect();
        assert_eq!(lines.len(), 2);

        // Chain should be continuous.
        let first_hash = extract_hash(lines[0], "_hash");
        let second_prev = extract_hash(lines[1], "_prev_hash");
        assert_eq!(first_hash, second_prev);
    }

    // Verifies both the on-disk chain linkage and that the stored hash can
    // be reproduced independently with compute_hash.
    #[tokio::test]
    async fn test_tamper_detection() {
        let dir = tempdir().unwrap();
        let log_path = dir.path().join("audit.log");
        let mut writer = AuditLogWriter::new(log_path.clone(), 1024 * 1024, 5)
            .await
            .unwrap();

        writer
            .append("{\"session_id\":\"s1\"}")
            .await
            .unwrap();
        writer
            .append("{\"session_id\":\"s2\"}")
            .await
            .unwrap();

        let contents = tokio::fs::read_to_string(&log_path).await.unwrap();
        let lines: Vec<&str> = contents.trim().split('\n').collect();

        // Verify hash chain integrity.
        let first_hash = extract_hash(lines[0], "_hash");
        let second_prev = extract_hash(lines[1], "_prev_hash");
        assert_eq!(first_hash, second_prev, "Hash chain should be valid");

        // Verify that the hash is deterministic.
        let recomputed = compute_hash("genesis", "{\"session_id\":\"s1\"}");
        assert_eq!(first_hash, recomputed);
    }

    // compute_hash must be deterministic and input-sensitive.
    #[test]
    fn test_compute_hash() {
        let h1 = compute_hash("genesis", "{\"test\":1}");
        let h2 = compute_hash("genesis", "{\"test\":1}");
        assert_eq!(h1, h2, "Same input should produce same hash");

        let h3 = compute_hash("genesis", "{\"test\":2}");
        assert_ne!(h1, h3, "Different input should produce different hash");
    }

    // rotated_path appends the index after the full file name.
    #[test]
    fn test_rotated_path() {
        let base = PathBuf::from("/tmp/audit.log");
        assert_eq!(rotated_path(&base, 1), PathBuf::from("/tmp/audit.log.1"));
        assert_eq!(rotated_path(&base, 10), PathBuf::from("/tmp/audit.log.10"));
    }

    // Test helper: pull the quoted value of `field` out of a JSON line.
    // Panics (via unwrap) if the field is absent — acceptable in tests.
    fn extract_hash<'a>(line: &'a str, field: &str) -> &'a str {
        let pattern = format!("\"{}\":\"", field);
        let start = line.find(&pattern).unwrap() + pattern.len();
        let rest = &line[start..];
        let end = rest.find('"').unwrap();
        &rest[..end]
    }
}
|
||||||
@@ -1,8 +1,10 @@
|
|||||||
mod config;
|
mod config;
|
||||||
|
mod log_writer;
|
||||||
mod service;
|
mod service;
|
||||||
|
|
||||||
use config::Config;
|
use config::Config;
|
||||||
use llm_multiverse_proto::llm_multiverse::v1::audit_service_server::AuditServiceServer;
|
use llm_multiverse_proto::llm_multiverse::v1::audit_service_server::AuditServiceServer;
|
||||||
|
use log_writer::AuditLogWriter;
|
||||||
use service::AuditServiceImpl;
|
use service::AuditServiceImpl;
|
||||||
use tonic::transport::Server;
|
use tonic::transport::Server;
|
||||||
use tracing_subscriber::EnvFilter;
|
use tracing_subscriber::EnvFilter;
|
||||||
@@ -21,14 +23,22 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
tracing::info!(
|
tracing::info!(
|
||||||
addr = %config.listen_addr(),
|
addr = %config.listen_addr(),
|
||||||
log_path = ?config.log_path,
|
log_path = ?config.log_path,
|
||||||
|
max_file_size = config.max_file_size,
|
||||||
|
max_rotated_files = config.max_rotated_files,
|
||||||
"Starting Audit Service"
|
"Starting Audit Service"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Start the gRPC server.
|
// Start the gRPC server.
|
||||||
let addr = config.listen_addr().parse()?;
|
let addr = config.listen_addr().parse()?;
|
||||||
|
|
||||||
// Create the audit service.
|
// Create the log writer and audit service.
|
||||||
let audit_service = AuditServiceImpl::new(config.log_path).await?;
|
let writer = AuditLogWriter::new(
|
||||||
|
config.log_path,
|
||||||
|
config.max_file_size,
|
||||||
|
config.max_rotated_files,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
let audit_service = AuditServiceImpl::new(writer);
|
||||||
|
|
||||||
tracing::info!(%addr, "Audit Service listening");
|
tracing::info!(%addr, "Audit Service listening");
|
||||||
|
|
||||||
|
|||||||
@@ -1,48 +1,22 @@
|
|||||||
|
use crate::log_writer::AuditLogWriter;
|
||||||
use llm_multiverse_proto::llm_multiverse::v1::{
|
use llm_multiverse_proto::llm_multiverse::v1::{
|
||||||
audit_service_server::AuditService, AppendRequest, AppendResponse,
|
audit_service_server::AuditService, AppendRequest, AppendResponse,
|
||||||
};
|
};
|
||||||
use std::path::PathBuf;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::io::AsyncWriteExt;
|
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tonic::{Request, Response, Status};
|
use tonic::{Request, Response, Status};
|
||||||
|
|
||||||
/// Append-only audit log service implementation.
|
/// Append-only audit log service implementation.
|
||||||
pub struct AuditServiceImpl {
|
pub struct AuditServiceImpl {
|
||||||
#[allow(dead_code)]
|
writer: Arc<Mutex<AuditLogWriter>>,
|
||||||
log_path: PathBuf,
|
|
||||||
writer: Arc<Mutex<tokio::fs::File>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AuditServiceImpl {
|
impl AuditServiceImpl {
|
||||||
/// Create a new audit service writing to the given log file.
|
/// Create a new audit service backed by the given log writer.
|
||||||
pub async fn new(log_path: PathBuf) -> anyhow::Result<Self> {
|
pub fn new(writer: AuditLogWriter) -> Self {
|
||||||
let file = tokio::fs::OpenOptions::new()
|
Self {
|
||||||
.create(true)
|
writer: Arc::new(Mutex::new(writer)),
|
||||||
.append(true)
|
|
||||||
.open(&log_path)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
// Restrict permissions on Unix (owner read/write only).
|
|
||||||
#[cfg(unix)]
|
|
||||||
{
|
|
||||||
use std::os::unix::fs::PermissionsExt;
|
|
||||||
let perms = std::fs::Permissions::from_mode(0o600);
|
|
||||||
std::fs::set_permissions(&log_path, perms)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!(?log_path, "Audit log opened");
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
log_path,
|
|
||||||
writer: Arc::new(Mutex::new(file)),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns the log file path for diagnostics.
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub fn log_path(&self) -> &PathBuf {
|
|
||||||
&self.log_path
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -58,23 +32,15 @@ impl AuditService for AuditServiceImpl {
|
|||||||
.entry
|
.entry
|
||||||
.ok_or_else(|| Status::invalid_argument("missing audit entry"))?;
|
.ok_or_else(|| Status::invalid_argument("missing audit entry"))?;
|
||||||
|
|
||||||
// Serialize entry as JSON line.
|
// Serialize entry as JSON.
|
||||||
let json = serialize_entry(&entry)
|
let json = serialize_entry(&entry);
|
||||||
.map_err(|e| Status::internal(format!("serialization failed: {e}")))?;
|
|
||||||
|
|
||||||
let mut line = json;
|
// Write to audit log (append-only, synchronized, fsync'd, hash-chained).
|
||||||
line.push('\n');
|
|
||||||
|
|
||||||
// Write to audit log (append-only, synchronized).
|
|
||||||
let mut writer = self.writer.lock().await;
|
let mut writer = self.writer.lock().await;
|
||||||
writer
|
writer
|
||||||
.write_all(line.as_bytes())
|
.append(&json)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| Status::internal(format!("write failed: {e}")))?;
|
.map_err(|e| Status::internal(format!("audit write failed: {e}")))?;
|
||||||
writer
|
|
||||||
.flush()
|
|
||||||
.await
|
|
||||||
.map_err(|e| Status::internal(format!("flush failed: {e}")))?;
|
|
||||||
|
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
session_id = %entry.session_id,
|
session_id = %entry.session_id,
|
||||||
@@ -90,69 +56,70 @@ impl AuditService for AuditServiceImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Serialize an AuditEntry to a JSON string.
|
/// Serialize an AuditEntry to a JSON object string using serde_json.
|
||||||
fn serialize_entry(
|
fn serialize_entry(
|
||||||
entry: &llm_multiverse_proto::llm_multiverse::v1::AuditEntry,
|
entry: &llm_multiverse_proto::llm_multiverse::v1::AuditEntry,
|
||||||
) -> Result<String, String> {
|
) -> String {
|
||||||
// Manual JSON construction to avoid serde dependency on proto types.
|
|
||||||
let timestamp = entry
|
let timestamp = entry
|
||||||
.timestamp
|
.timestamp
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|t| format!("{}.{}", t.seconds, t.nanos))
|
.map(|t| format!("{}.{}", t.seconds, t.nanos))
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let lineage_agents: Vec<String> = entry
|
let lineage_agents: Vec<serde_json::Value> = entry
|
||||||
.lineage
|
.lineage
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|l| {
|
.map(|l| {
|
||||||
l.agents
|
l.agents
|
||||||
.iter()
|
.iter()
|
||||||
.map(|a| format!("{{\"agent_id\":\"{}\",\"agent_type\":{}}}", a.agent_id, a.agent_type))
|
.map(|a| {
|
||||||
|
serde_json::json!({
|
||||||
|
"agent_id": a.agent_id,
|
||||||
|
"agent_type": a.agent_type,
|
||||||
|
"spawn_depth": a.spawn_depth,
|
||||||
|
})
|
||||||
|
})
|
||||||
.collect()
|
.collect()
|
||||||
})
|
})
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let metadata_entries: Vec<String> = entry
|
let obj = serde_json::json!({
|
||||||
.metadata
|
"timestamp": timestamp,
|
||||||
.iter()
|
"session_id": entry.session_id,
|
||||||
.map(|(k, v)| format!("\"{}\":\"{}\"", escape_json(k), escape_json(v)))
|
"agent_id": entry.agent_id,
|
||||||
.collect();
|
"agent_type": entry.agent_type,
|
||||||
|
"lineage": lineage_agents,
|
||||||
|
"action": entry.action,
|
||||||
|
"tool_name": entry.tool_name,
|
||||||
|
"params_hash": entry.params_hash,
|
||||||
|
"result_status": entry.result_status,
|
||||||
|
"metadata": entry.metadata,
|
||||||
|
});
|
||||||
|
|
||||||
Ok(format!(
|
obj.to_string()
|
||||||
"{{\"timestamp\":\"{}\",\"session_id\":\"{}\",\"agent_id\":\"{}\",\"agent_type\":{},\"lineage\":[{}],\"action\":{},\"tool_name\":\"{}\",\"params_hash\":\"{}\",\"result_status\":\"{}\",\"metadata\":{{{}}}}}",
|
|
||||||
escape_json(×tamp),
|
|
||||||
escape_json(&entry.session_id),
|
|
||||||
escape_json(&entry.agent_id),
|
|
||||||
entry.agent_type,
|
|
||||||
lineage_agents.join(","),
|
|
||||||
entry.action,
|
|
||||||
escape_json(&entry.tool_name),
|
|
||||||
escape_json(&entry.params_hash),
|
|
||||||
escape_json(&entry.result_status),
|
|
||||||
metadata_entries.join(","),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn escape_json(s: &str) -> String {
|
|
||||||
s.replace('\\', "\\\\")
|
|
||||||
.replace('"', "\\\"")
|
|
||||||
.replace('\n', "\\n")
|
|
||||||
.replace('\r', "\\r")
|
|
||||||
.replace('\t', "\\t")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::log_writer::AuditLogWriter;
|
||||||
use llm_multiverse_proto::llm_multiverse::v1::AuditEntry;
|
use llm_multiverse_proto::llm_multiverse::v1::AuditEntry;
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
async fn test_service() -> (AuditServiceImpl, std::path::PathBuf) {
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
#[allow(deprecated)]
|
||||||
|
let log_path = dir.into_path().join("test-audit.log");
|
||||||
|
let writer = AuditLogWriter::new(log_path.clone(), 1024 * 1024, 5)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
(AuditServiceImpl::new(writer), log_path)
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_append_writes_to_file() {
|
async fn test_append_writes_to_file() {
|
||||||
let dir = tempdir().unwrap();
|
let (svc, log_path) = test_service().await;
|
||||||
let log_path = dir.path().join("test-audit.log");
|
|
||||||
let svc = AuditServiceImpl::new(log_path.clone()).await.unwrap();
|
|
||||||
|
|
||||||
let entry = AuditEntry {
|
let entry = AuditEntry {
|
||||||
session_id: "sess-1".into(),
|
session_id: "sess-1".into(),
|
||||||
@@ -183,14 +150,13 @@ mod tests {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
assert!(contents.contains("sess-1"));
|
assert!(contents.contains("sess-1"));
|
||||||
assert!(contents.contains("web_search"));
|
assert!(contents.contains("web_search"));
|
||||||
|
assert!(contents.contains("_hash"));
|
||||||
assert!(contents.ends_with('\n'));
|
assert!(contents.ends_with('\n'));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_append_rejects_missing_entry() {
|
async fn test_append_rejects_missing_entry() {
|
||||||
let dir = tempdir().unwrap();
|
let (svc, _) = test_service().await;
|
||||||
let log_path = dir.path().join("test-audit.log");
|
|
||||||
let svc = AuditServiceImpl::new(log_path).await.unwrap();
|
|
||||||
|
|
||||||
let request = Request::new(AppendRequest {
|
let request = Request::new(AppendRequest {
|
||||||
context: None,
|
context: None,
|
||||||
@@ -204,9 +170,7 @@ mod tests {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_multiple_appends() {
|
async fn test_multiple_appends() {
|
||||||
let dir = tempdir().unwrap();
|
let (svc, log_path) = test_service().await;
|
||||||
let log_path = dir.path().join("test-audit.log");
|
|
||||||
let svc = AuditServiceImpl::new(log_path.clone()).await.unwrap();
|
|
||||||
|
|
||||||
for i in 0..3 {
|
for i in 0..3 {
|
||||||
let entry = AuditEntry {
|
let entry = AuditEntry {
|
||||||
@@ -226,14 +190,6 @@ mod tests {
|
|||||||
assert_eq!(lines.len(), 3);
|
assert_eq!(lines.len(), 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_escape_json() {
|
|
||||||
assert_eq!(escape_json("hello"), "hello");
|
|
||||||
assert_eq!(escape_json("he\"llo"), "he\\\"llo");
|
|
||||||
assert_eq!(escape_json("he\\llo"), "he\\\\llo");
|
|
||||||
assert_eq!(escape_json("line\nbreak"), "line\\nbreak");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_serialize_entry() {
|
fn test_serialize_entry() {
|
||||||
let entry = AuditEntry {
|
let entry = AuditEntry {
|
||||||
@@ -246,8 +202,22 @@ mod tests {
|
|||||||
result_status: "success".into(),
|
result_status: "success".into(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let json = serialize_entry(&entry).unwrap();
|
let json = serialize_entry(&entry);
|
||||||
assert!(json.contains("\"session_id\":\"sess-1\""));
|
assert!(json.contains("\"session_id\":\"sess-1\""));
|
||||||
assert!(json.contains("\"tool_name\":\"test_tool\""));
|
assert!(json.contains("\"tool_name\":\"test_tool\""));
|
||||||
|
// Should be valid JSON.
|
||||||
|
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||||
|
assert_eq!(parsed["session_id"], "sess-1");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_serialize_with_special_chars() {
|
||||||
|
let entry = AuditEntry {
|
||||||
|
session_id: "sess with \"quotes\" and \\backslash".into(),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let json = serialize_entry(&entry);
|
||||||
|
// Should produce valid JSON even with special characters.
|
||||||
|
let _parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user