llm-multiverse/implementation-plans/issue-034.md
Pi Agent f9bb5adf94 feat: implement WriteMemory gRPC endpoint (issue #34)
Full write pipeline: validate request, assign/generate memory ID,
determine provenance and trust level, sanitize external content,
generate embeddings via Model Gateway, store all data atomically
in DuckDB (memory + tags + correlations + embeddings + provenance +
derivation links), invalidate semantic cache, and audit-log the
write via Audit Service (best-effort).

- New db/write.rs: transactional write helper with rollback
- Audit client integration following Secrets Service pattern
- Remove #[allow(dead_code)] from provenance_config and sanitizer
- 15 new tests (7 db/write, 8 service-level)
- All 226 tests pass, clippy clean

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 10:54:38 +01:00

Implementation Plan — Issue #34: Implement WriteMemory gRPC endpoint

Metadata

| Field | Value |
|---|---|
| Issue | #34 |
| Title | Implement WriteMemory gRPC endpoint |
| Milestone | Phase 4: Memory Service |
| Labels | |
| Status | COMPLETED |
| Language | Rust |
| Related Plans | issue-020.md, issue-028.md, issue-029.md, issue-033.md |
| Blocked by | #29 (completed), #33 (completed), #20 (completed) |

Acceptance Criteria

  • WriteMemory RPC handler implemented
  • Generates embedding via Model Gateway integration
  • Applies provenance tags from session context
  • Stores memory + embedding + provenance in DuckDB
  • Audit-logs the write via Audit Service

Architecture Analysis

Service Context

This issue belongs to the Memory Service (Rust). It replaces the current stub write_memory handler (which returns Status::unimplemented) with a full implementation that persists memory entries together with their embeddings and provenance metadata, and then audit-logs the operation.

The existing WriteMemory RPC is already defined in proto/llm_multiverse/v1/memory.proto (line 13) as a unary endpoint:

rpc WriteMemory(WriteMemoryRequest) returns (WriteMemoryResponse);

Proto messages involved:

  • WriteMemoryRequest — carries SessionContext context, MemoryEntry entry, MemoryProvenance provenance
  • WriteMemoryResponse — returns bool success, string memory_id, optional string error_message
  • MemoryEntry — the full memory entry with id, name, description, tags, correlating_ids, corpus, embedding bytes, timestamps, provenance
  • ProvenanceMetadata — detailed provenance metadata (source_agent_id, source_session_id, creation_tool, trust_level, parent_memory_ids, etc.)
  • SessionContext — carries session_id, user_id, agent_lineage, override_level
  • AuditEntry / AppendRequest — for audit logging via the Audit Service

Affected gRPC endpoints:

  • WriteMemory — the primary target of this issue (currently unimplemented)

Existing Patterns

The following patterns are already established in the codebase and must be followed:

  • Service struct: MemoryServiceImpl at services/memory/src/service.rs:47-60 already holds provenance_config: ProvenanceConfig and sanitizer: ContentSanitizer (with #[allow(dead_code)] annotations noting they are for issue #34). These annotations should be removed.
  • Embedding generation: EmbeddingClient at services/memory/src/embedding/mod.rs:116-196 provides generate_for_entry() which generates embeddings for name, description, and corpus fields. Returns Vec<EmbeddingResult>.
  • Embedding storage: services/memory/src/embedding/store.rs provides store_embeddings() to persist embeddings in the embeddings table within a DuckDB transaction.
  • Provenance storage: services/memory/src/provenance/store.rs provides insert_provenance() and insert_derivations() for persisting provenance records and derivation links.
  • Content sanitization: services/memory/src/provenance/sanitizer.rs provides ContentSanitizer::sanitize() which strips imperative constructions from external content.
  • Trust level logic: services/memory/src/provenance/mod.rs provides TrustLevel::from_memory_provenance() to map MemoryProvenance enum to internal trust levels.
  • Audit client pattern: The Secrets Service (services/secrets/src/service.rs:14-67) demonstrates the audit client integration: AuditServiceClient<Channel> wrapped in Arc<Mutex<>>, configured via a builder method, and called best-effort (log warning on failure, don't fail the operation).
  • DB access from async handlers: DuckDbManager::with_connection_blocking() at services/memory/src/db/mod.rs:100-109 runs a closure on the blocking thread pool to avoid stalling the tokio runtime.
  • Cache invalidation on write: The SemanticCache::invalidate_by_memory_id() method is already referenced in the TODO comment at service.rs:357.
  • Input validation: The current stub already validates context and entry presence, and session_id non-emptiness. This must be preserved and extended.

Dependencies

  • Embedding generation (issue #29): completed. Provides EmbeddingClient::generate_for_entry().
  • Provenance tagging (issue #33): completed. Provides ContentSanitizer, TrustLevel, provenance store, and the provenance / memory_derivations DuckDB tables.
  • Audit Service Append (issue #20): completed. Provides the AuditService::Append RPC.
  • Proto stubs: AuditServiceClient, AppendRequest, and AuditEntry are already generated in llm-multiverse-proto.
  • Crate dependencies: sha2 is needed for params_hash in audit entries. A simpler approach that avoids the new dependency is possible (e.g., hash the memory name or use a fixed string, since WriteMemory params are not credential-adjacent), but step 1 below adds sha2 to follow the Secrets Service pattern.

Implementation Steps

1. Types & Configuration

Add audit client to MemoryServiceImpl in services/memory/src/service.rs:

Add a new field following the pattern from SecretsServiceImpl:

use llm_multiverse_proto::llm_multiverse::v1::{
    audit_service_client::AuditServiceClient,
    AppendRequest, AuditEntry,
};
use tonic::transport::Channel;

pub struct MemoryServiceImpl {
    db: Arc<DuckDbManager>,
    embedding_client: Option<Arc<Mutex<EmbeddingClient>>>,
    extraction_client: Option<Arc<Mutex<ExtractionClient>>>,
    retrieval_config: RetrievalConfig,
    extraction_config: ExtractionConfig,
    provenance_config: ProvenanceConfig,     // remove #[allow(dead_code)]
    sanitizer: ContentSanitizer,              // remove #[allow(dead_code)]
    cache: Arc<SemanticCache>,
    audit_client: Option<Arc<Mutex<AuditServiceClient<Channel>>>>,
}

Add a builder method:

pub fn with_audit_client(mut self, client: AuditServiceClient<Channel>) -> Self {
    self.audit_client = Some(Arc::new(Mutex::new(client)));
    self
}

Add sha2 dependency to services/memory/Cargo.toml:

sha2 = "0.10"

This is needed for generating params_hash in audit log entries, following the same pattern as the Secrets Service (never log raw parameters).

2. Core Logic

Memory ID generation:

Generate a unique ID for new memory entries. If the request's entry.id is empty, generate a UUID-like ID. If it is provided, use it as-is (for update/upsert semantics). Use a simple approach:

fn generate_memory_id() -> String {
    uuid::Uuid::new_v4().to_string()
}

Alternatively, to avoid adding the uuid crate, generate a timestamp-based ID using only the standard library (note that this variant has no random component and relies on nanosecond resolution alone for uniqueness):

fn generate_memory_id() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    format!("mem-{ts:x}")
}
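If collisions between writes that land in the same clock tick turn out to matter, one stdlib-only option is to append a process-wide sequence number to the timestamp. The sketch below is an illustration, not part of the plan: the function name generate_memory_id_with_seq and the static MEMORY_ID_SEQ are hypothetical, and uniqueness is only guaranteed within a single process.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

/// Process-wide sequence counter (hypothetical, not in the plan).
static MEMORY_ID_SEQ: AtomicU64 = AtomicU64::new(0);

/// Timestamp-based ID with a monotonically increasing suffix, so two calls
/// that observe the same clock value still produce distinct IDs within
/// one process.
fn generate_memory_id_with_seq() -> String {
    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    // fetch_add returns the previous value, so every call sees a unique seq.
    let seq = MEMORY_ID_SEQ.fetch_add(1, Ordering::Relaxed);
    format!("mem-{ts:x}-{seq:x}")
}
```

The ID keeps the mem- prefix used by the timestamp variant, so callers that only treat the ID as an opaque string are unaffected.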

Write pipeline (the core logic inside write_memory):

The write operation must perform the following steps in order:

  1. Validate request: context present, session_id non-empty, entry present, entry.name non-empty.
  2. Assign memory ID: use the provided entry.id if non-empty, otherwise generate one.
  3. Determine provenance: take the session ID from SessionContext.session_id and the agent ID from the last entry of SessionContext.agent_lineage (falling back to user_id when no lineage is available). Determine the trust level from WriteMemoryRequest.provenance via TrustLevel::from_memory_provenance().
  4. Sanitize external content: if provenance is EXTERNAL and provenance_config.sanitization_enabled, run sanitizer.sanitize() on the corpus (and optionally description). If the sanitizer modified the content, upgrade the trust level from Untrusted to Sanitized. Log what was stripped.
  5. Generate embeddings: if embedding_client is available, call generate_for_entry() for name, description, and corpus. If unavailable and required, return Status::failed_precondition.
  6. Store in DuckDB (single transaction), within with_connection_blocking():
     a. Insert row into memories table (id, name, description, corpus, provenance, created_at, last_accessed, access_count).
     b. Insert tags into memory_tags table.
     c. Insert correlating IDs into correlations table.
     d. Store embeddings via store_embeddings().
     e. Insert provenance record via insert_provenance().
     f. Insert derivation links via insert_derivations() if parent_memory_ids are provided.
  7. Invalidate cache: call self.cache.invalidate_by_memory_id(&memory_id).await (in case this is an update to an existing memory).
  8. Audit log: best-effort audit log entry via the audit client (the call is awaited, but a failure only logs a warning).
  9. Return response: WriteMemoryResponse { success: true, memory_id, error_message: None }.
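Steps 3 and 4 above combine into a small decision about the final trust level, which can be sketched as a pure function. This is a minimal illustration under stated assumptions: the TrustLevel enum here is a stand-in for the service's own type, the numeric provenance values (0 = UNSPECIFIED, 1 = INTERNAL, 2 = EXTERNAL) and the INTERNAL to Trusted mapping are assumed, and the function names are hypothetical.

```rust
/// Stand-in for the service's TrustLevel enum (variant names from the plan).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TrustLevel {
    Trusted,
    Sanitized,
    Untrusted,
}

/// Assumed proto enum value for MemoryProvenance::EXTERNAL.
const PROVENANCE_EXTERNAL: i32 = 2;

/// Assumed mapping: external content starts Untrusted, everything else Trusted.
fn base_trust_level(provenance: i32) -> TrustLevel {
    if provenance == PROVENANCE_EXTERNAL {
        TrustLevel::Untrusted
    } else {
        TrustLevel::Trusted
    }
}

/// External content is upgraded to Sanitized only when sanitization is
/// enabled and the sanitizer actually changed the corpus.
fn final_trust_level(
    provenance: i32,
    sanitization_enabled: bool,
    was_modified: bool,
) -> TrustLevel {
    if provenance == PROVENANCE_EXTERNAL && sanitization_enabled && was_modified {
        TrustLevel::Sanitized
    } else {
        base_trust_level(provenance)
    }
}
```

Keeping this decision separate from the handler would let the external-clean and external-sanitized cases be unit-tested without a database or embedding client.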

DuckDB storage helper — create services/memory/src/db/write.rs:

A helper module for the write transaction that groups all DB insertions:

use duckdb::Connection;
use crate::db::DbError;
use crate::embedding::EmbeddingResult;
use crate::provenance::ProvenanceRecord;

/// Parameters for writing a complete memory entry (row + tags + correlations + provenance).
pub struct WriteMemoryParams {
    pub id: String,
    pub name: String,
    pub description: String,
    pub corpus: String,
    pub provenance: i32,
    pub tags: Vec<String>,
    pub correlating_ids: Vec<String>,
}

/// Write a complete memory entry in a single transaction.
///
/// Inserts the memory row, tags, correlations, embeddings, and provenance
/// atomically. On any failure, the transaction is rolled back.
pub fn write_memory_entry(
    conn: &Connection,
    params: &WriteMemoryParams,
    embeddings: &[EmbeddingResult],
    provenance_record: &ProvenanceRecord,
) -> Result<(), DbError>;

Key implementation detail: use BEGIN TRANSACTION; ... COMMIT; with rollback on error, the same pattern as embedding/store.rs:store_embeddings().
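The begin/commit/rollback shape described above can be sketched independently of DuckDB. The example assumes a hypothetical SqlExec trait standing in for the connection's batch-execute call, and a RecordingConn test double; neither exists in the codebase, and the real helper takes a shared &Connection rather than &mut.

```rust
/// Hypothetical minimal interface standing in for a database connection.
trait SqlExec {
    fn execute_batch(&mut self, sql: &str) -> Result<(), String>;
}

/// Run `body` between BEGIN TRANSACTION and COMMIT; roll back if it fails.
fn with_transaction<C, F>(conn: &mut C, body: F) -> Result<(), String>
where
    C: SqlExec,
    F: FnOnce(&mut C) -> Result<(), String>,
{
    conn.execute_batch("BEGIN TRANSACTION;")?;
    match body(conn) {
        Ok(()) => conn.execute_batch("COMMIT;"),
        Err(e) => {
            // Best-effort rollback; the original error is the one reported.
            let _ = conn.execute_batch("ROLLBACK;");
            Err(e)
        }
    }
}

/// Test double that records every statement and fails on a chosen pattern.
struct RecordingConn {
    log: Vec<String>,
    fail_on: Option<&'static str>,
}

impl SqlExec for RecordingConn {
    fn execute_batch(&mut self, sql: &str) -> Result<(), String> {
        self.log.push(sql.to_string());
        match self.fail_on {
            Some(pat) if sql.contains(pat) => Err(format!("simulated failure: {sql}")),
            _ => Ok(()),
        }
    }
}
```

With this shape, a rollback test only needs to make one of the inner statements fail and then assert that ROLLBACK was issued and COMMIT was not.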

Audit logging helper — add to services/memory/src/service.rs:

/// Log a memory write to the audit service (best-effort).
async fn audit_log_write(
    audit_client: &Arc<Mutex<AuditServiceClient<Channel>>>,
    ctx: &SessionContext,
    memory_id: &str,
    result_status: &str,
) {
    let entry = AuditEntry {
        session_id: ctx.session_id.clone(),
        agent_id: ctx.agent_lineage.as_ref()
            .and_then(|l| l.agents.last())
            .map(|a| a.agent_id.clone())
            .unwrap_or_else(|| ctx.user_id.clone()),
        action: 4, // AUDIT_ACTION_MEMORY_WRITE
        tool_name: "WriteMemory".into(),
        params_hash: hash_memory_write(memory_id),
        result_status: result_status.into(),
        ..Default::default()
    };
    let request = AppendRequest {
        context: Some(ctx.clone()),
        entry: Some(entry),
    };
    let mut client = audit_client.lock().await;
    if let Err(e) = client.append(request).await {
        tracing::warn!(error = %e, memory_id = %memory_id, "failed to audit log memory write");
    }
}

fn hash_memory_write(memory_id: &str) -> String {
    use sha2::{Sha256, Digest};
    let mut hasher = Sha256::new();
    hasher.update(b"WriteMemory:");
    hasher.update(memory_id.as_bytes());
    format!("{:x}", hasher.finalize())
}

3. gRPC Handler Wiring

Replace the write_memory stub in services/memory/src/service.rs:

The existing handler at lines 336-361 currently validates context and entry, then returns Status::unimplemented. Replace the TODO/unimplemented section with the full write pipeline:

async fn write_memory(
    &self,
    request: Request<WriteMemoryRequest>,
) -> Result<Response<WriteMemoryResponse>, Status> {
    let req = request.into_inner();

    // 1. Validate
    let ctx = req.context
        .ok_or_else(|| Status::invalid_argument("session context is required"))?;
    if ctx.session_id.is_empty() {
        return Err(Status::invalid_argument("context.session_id is required"));
    }
    let entry = req.entry
        .ok_or_else(|| Status::invalid_argument("entry is required"))?;
    if entry.name.is_empty() {
        return Err(Status::invalid_argument("entry.name is required"));
    }

    // 2. Memory ID
    let memory_id = if entry.id.is_empty() {
        generate_memory_id()
    } else {
        entry.id.clone()
    };

    // 3. Determine provenance and trust level
    let trust_level = TrustLevel::from_memory_provenance(req.provenance);
    let agent_id = ctx.agent_lineage.as_ref()
        .and_then(|l| l.agents.last())
        .map(|a| a.agent_id.clone())
        .unwrap_or_else(|| ctx.user_id.clone());
    let creation_tool = entry.provenance_metadata.as_ref()
        .map(|pm| pm.creation_tool.clone())
        .unwrap_or_default();
    let parent_memory_ids: Vec<String> = entry.provenance_metadata.as_ref()
        .map(|pm| pm.parent_memory_ids.clone())
        .unwrap_or_default();

    // 4. Sanitize external content
    let (corpus, final_trust_level) = if req.provenance == 2 /* EXTERNAL */
        && self.provenance_config.sanitization_enabled
    {
        let result = self.sanitizer.sanitize(&entry.corpus);
        if result.was_modified {
            tracing::info!(
                memory_id = %memory_id,
                patterns_removed = result.patterns_removed,
                "Sanitized external content"
            );
            (result.content, TrustLevel::Sanitized)
        } else {
            (entry.corpus.clone(), trust_level)
        }
    } else {
        (entry.corpus.clone(), trust_level)
    };

    // 5. Generate embeddings
    let embedding_results = if let Some(ref emb_client) = self.embedding_client {
        let client = emb_client.lock().await;
        let session_ctx = SessionContext { /* from ctx */ ..ctx.clone() };
        client.generate_for_entry(&session_ctx, &memory_id, &entry.name, &entry.description, &corpus)
            .await
            .map_err(|e| Status::internal(format!("embedding generation failed: {e}")))?
    } else {
        return Err(Status::failed_precondition("embedding client not configured"));
    };

    // 6. Store in DuckDB
    let write_params = WriteMemoryParams { /* ... */ };
    let provenance_record = ProvenanceRecord {
        memory_id: memory_id.clone(),
        source_agent_id: agent_id.clone(),
        source_session_id: ctx.session_id.clone(),
        creation_tool,
        trust_level: final_trust_level,
        parent_memory_ids: parent_memory_ids.clone(),
        is_revoked: false,
        revocation_reason: None,
        revoked_at: None,
        revoked_by: None,
    };
    self.db.with_connection_blocking(move |conn| {
        write_memory_entry(conn, &write_params, &embedding_results, &provenance_record)
    }).await.map_err(|e| Status::internal(format!("storage failed: {e}")))?;

    // 7. Invalidate cache
    self.cache.invalidate_by_memory_id(&memory_id).await;

    // 8. Audit log (best-effort)
    if let Some(ref audit) = self.audit_client {
        audit_log_write(audit, &ctx, &memory_id, "success").await;
    }

    // 9. Return
    Ok(Response::new(WriteMemoryResponse {
        success: true,
        memory_id,
        error_message: None,
    }))
}
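The agent-ID fallback (last agent in the lineage, else user_id) appears in both the handler and audit_log_write. One way to keep the two in sync is a small shared helper, sketched below. The struct definitions are simplified stand-ins for the generated proto types with only the fields used here, and resolve_agent_id is a hypothetical name.

```rust
/// Simplified stand-ins for the generated proto types.
#[derive(Clone, Default)]
struct AgentIdentifier {
    agent_id: String,
}

#[derive(Clone, Default)]
struct AgentLineage {
    agents: Vec<AgentIdentifier>,
}

#[derive(Clone, Default)]
struct SessionContext {
    user_id: String,
    agent_lineage: Option<AgentLineage>,
}

/// Returns the ID of the last agent in the lineage, falling back to the
/// user ID when the lineage is missing or empty.
fn resolve_agent_id(ctx: &SessionContext) -> String {
    ctx.agent_lineage
        .as_ref()
        .and_then(|l| l.agents.last())
        .map(|a| a.agent_id.clone())
        .unwrap_or_else(|| ctx.user_id.clone())
}
```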

4. Service Integration

Update services/memory/src/main.rs:

Add audit client connection at startup, following the pattern from services/secrets/src/main.rs:42-52:

use llm_multiverse_proto::llm_multiverse::v1::audit_service_client::AuditServiceClient;

// After embedding/extraction client setup:
if let Some(ref audit_addr) = config.audit_addr {
    match AuditServiceClient::connect(audit_addr.clone()).await {
        Ok(client) => {
            tracing::info!(audit_addr = %audit_addr, "Connected to Audit Service");
            memory_service = memory_service.with_audit_client(client);
        }
        Err(e) => {
            tracing::warn!(
                audit_addr = %audit_addr,
                error = %e,
                "Audit Service unavailable -- writes will not be audit-logged"
            );
        }
    }
}

The config.audit_addr field already exists in Config (line 201 of config.rs).

Update services/memory/src/db/mod.rs:

Add pub mod write; to expose the new write module.

Update services/memory/src/lib.rs:

No change needed -- the db module is already public and the new db/write.rs submodule will be exposed through db/mod.rs.

5. Tests

Unit tests in services/memory/src/db/write.rs:

| Test Case | Description |
|---|---|
| test_write_memory_entry_success | Insert a full memory entry (row + tags + correlations + embeddings + provenance), verify all tables populated |
| test_write_memory_entry_with_tags | Insert entry with 3 tags, verify all tags in memory_tags |
| test_write_memory_entry_with_correlations | Insert entry with correlating IDs, verify correlations table |
| test_write_memory_entry_with_derivations | Insert entry with parent_memory_ids, verify memory_derivations table |
| test_write_memory_entry_rollback_on_error | Trigger an error mid-transaction (e.g., FK violation), verify nothing was persisted |
| test_write_memory_entry_no_embeddings | Pass empty embeddings slice, verify memory row and provenance still stored |

Service-level tests in services/memory/src/service.rs:

| Test Case | Description |
|---|---|
| test_write_rejects_missing_context | Returns InvalidArgument (existing test, preserved) |
| test_write_rejects_missing_entry | Returns InvalidArgument (existing test, preserved) |
| test_write_rejects_empty_name | New: entry with empty name returns InvalidArgument |
| test_write_rejects_no_embedding_client | Service without embedding client returns FailedPrecondition |
| test_write_success_internal | Write an internal memory, verify response has success=true and memory_id is non-empty |
| test_write_success_external_sanitized | Write external content with injection patterns, verify corpus is sanitized, trust level is Sanitized |
| test_write_success_external_clean | Write external content without injection patterns, verify trust level is Untrusted |
| test_write_generates_id_when_empty | Write with empty entry.id, verify a non-empty memory_id is returned |
| test_write_uses_provided_id | Write with specific entry.id, verify same ID in response |
| test_write_stores_provenance | After write, read provenance from DB, verify source_agent_id, source_session_id, trust_level |
| test_write_stores_embeddings | After write, verify 3 embeddings (name, desc, corpus) exist in embeddings table |
| test_write_stores_tags | After write, verify tags in memory_tags table |
| test_write_invalidates_cache | Write a memory, then write again with same ID, verify cache was invalidated |
| test_write_audit_logged | Use a mock audit server, verify Append is called with action MEMORY_WRITE |
| test_write_works_without_audit_client | Service without audit client still writes successfully (no audit log, no error) |
| test_write_derivation_links_stored | Write with parent_memory_ids in provenance_metadata, verify memory_derivations populated |

Mocking strategy:

  • Use DuckDbManager::in_memory() for all tests.
  • Use the existing MockEmbeddingGenerator (or the mock tonic server pattern from embedding/mod.rs:299) for embedding generation.
  • For audit client testing, create a mock AuditService server (same pattern as the mock ModelGatewayService in embedding/mod.rs:299-393) that captures AppendRequest calls.
  • Pre-populate parent memories in the DB for derivation link tests.
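The capture-and-assert idea behind the mock audit server can be illustrated with a synchronous, std-only analogue. This is only a sketch of the testing pattern, not the tonic-based mock the plan calls for: AuditSink, CapturingSink, FailingSink, and write_with_audit are all hypothetical names, and the storage step is omitted.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical synchronous stand-in for the audit client.
trait AuditSink {
    fn append(&self, action: &str, memory_id: &str) -> Result<(), String>;
}

/// Records every call so a test can assert on what was logged.
#[derive(Clone, Default)]
struct CapturingSink {
    calls: Arc<Mutex<Vec<(String, String)>>>,
}

impl AuditSink for CapturingSink {
    fn append(&self, action: &str, memory_id: &str) -> Result<(), String> {
        self.calls
            .lock()
            .unwrap()
            .push((action.to_string(), memory_id.to_string()));
        Ok(())
    }
}

/// Always fails, to exercise the best-effort path.
struct FailingSink;

impl AuditSink for FailingSink {
    fn append(&self, _action: &str, _memory_id: &str) -> Result<(), String> {
        Err("audit unavailable".to_string())
    }
}

/// Mirrors the plan's contract: a missing or failing audit sink never
/// fails the write. Returns true to represent a successful write.
fn write_with_audit(sink: Option<&dyn AuditSink>, memory_id: &str) -> bool {
    // (storage step omitted in this sketch)
    if let Some(s) = sink {
        if let Err(e) = s.append("MEMORY_WRITE", memory_id) {
            eprintln!("failed to audit log memory write: {e}");
        }
    }
    true
}
```

The three cases (capturing, failing, absent) correspond to test_write_audit_logged and test_write_works_without_audit_client, plus the unavailable-service behaviour described under Risks.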

Trait Implementations

  • No new traits needed. The EmbeddingGenerator trait is already used for mock testing.

Error Types

  • No new error types needed. Use tonic::Status directly in the handler. DB errors convert via DbError. Provenance errors convert via ProvenanceError -> tonic::Status.

Files to Create/Modify

| File | Action | Purpose |
|---|---|---|
| services/memory/src/service.rs | Modify | Replace write_memory stub with full implementation; add audit_client field and with_audit_client() builder; add audit_log_write() and hash_memory_write() helpers; remove #[allow(dead_code)] from provenance_config and sanitizer |
| services/memory/src/db/write.rs | Create | WriteMemoryParams struct and write_memory_entry() function for transactional DB writes (memory row + tags + correlations + embeddings + provenance) |
| services/memory/src/db/mod.rs | Modify | Add pub mod write; |
| services/memory/src/main.rs | Modify | Connect to Audit Service at startup if audit_addr is configured; pass audit client to MemoryServiceImpl via with_audit_client() |
| services/memory/Cargo.toml | Modify | Add sha2 = "0.10" dependency for audit params hashing |

Risks and Edge Cases

  • Embedding client unavailability at write time: If the Model Gateway is unreachable after startup (connection drops), generate_for_entry() will return EmbeddingError::GenerationFailed. The handler maps this to Status::internal. The caller (orchestrator) can retry. If the embedding client was never connected at startup, return Status::failed_precondition to clearly indicate a configuration issue rather than a transient failure.
  • Upsert vs insert semantics: The proto WriteMemoryRequest does not have an explicit "update" flag. If entry.id is provided and a memory with that ID already exists, the DuckDB INSERT will fail with a primary key violation. The initial implementation should use INSERT OR REPLACE for the memories table to support upsert semantics. For related tables (tags, correlations), delete existing rows for the memory ID before re-inserting within the same transaction.
  • Transaction atomicity across multiple tables: DuckDB does not support ON DELETE CASCADE. The write transaction must insert into all tables (memories, memory_tags, correlations, embeddings, provenance, memory_derivations) atomically. On failure, ROLLBACK ensures no partial writes. The existing pattern in embedding/store.rs:57-83 demonstrates this approach.
  • Audit logging is best-effort: The audit log call must not fail the write operation. If the Audit Service is unavailable, log a warning and return success. This matches the Secrets Service pattern.
  • Sanitization modifies corpus: When external content is sanitized, the stored corpus differs from what was submitted. This is by design (architecture doc: "external-sourced writes pass through a summarization step that strips imperative constructions before storage"). The SanitizeResult metadata (patterns_removed, removed_patterns) is logged for transparency.
  • Empty corpus/description: A memory entry may have an empty corpus or description. The embedding client handles this by generating zero vectors for empty fields (see embedding/mod.rs:153-154). The write should succeed with zero-vector embeddings.
  • Memory ID collisions with timestamp-based generation: The generate_memory_id() function using nanosecond timestamps has extremely low collision probability for single-threaded writes. If collisions are a concern, the uuid crate could be added. For now, the timestamp approach avoids an extra dependency.
  • sha2 crate addition: Adding sha2 for audit params_hash follows the precedent in the Secrets Service. The alternative is to skip hashing and use a descriptive string (e.g., "WriteMemory:mem-xyz"), but hashing is preferred per the architecture doc ("Hash of parameters -- never raw params to avoid logging credential-adjacent data").
  • Concurrent writes to the same memory ID: DuckDbManager uses a std::sync::Mutex so concurrent writes are serialized at the connection level. INSERT OR REPLACE ensures the last writer wins. No additional locking is needed.
  • Provenance config require_provenance: When provenance_config.require_provenance is true and the request has provenance == UNSPECIFIED (0), the handler should still succeed (defaulting to Trusted trust level per TrustLevel::from_memory_provenance(0)). The require_provenance flag could optionally enforce that an explicit provenance value (INTERNAL or EXTERNAL) is provided, returning InvalidArgument if UNSPECIFIED. This provides a safety net against callers forgetting to set the field.
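The optional require_provenance enforcement mentioned in the last bullet could look roughly like the following. It is a sketch under assumptions: the constant value 0 for UNSPECIFIED comes from the text above, check_provenance is a hypothetical helper, and in the real handler the error would be mapped to Status::invalid_argument rather than returned as a String.

```rust
/// Proto enum value for an unset provenance field, as stated in the plan.
const PROVENANCE_UNSPECIFIED: i32 = 0;

/// Rejects requests that leave provenance unset when the config demands
/// an explicit value; otherwise accepts the request unchanged.
fn check_provenance(require_provenance: bool, provenance: i32) -> Result<(), String> {
    if require_provenance && provenance == PROVENANCE_UNSPECIFIED {
        return Err("provenance must be INTERNAL or EXTERNAL".to_string());
    }
    Ok(())
}
```

Placed in the validation step, this check would run before any ID is generated or any embedding is requested, so a rejected request costs nothing downstream.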

Deviation Log

| Deviation | Reason |
|---|---|
| Used AgentIdentifier instead of AgentInfo | Proto type is named AgentIdentifier in common.proto |
| Did not add uuid crate | Used nanosecond-timestamp ID generation to avoid extra dependency |