Merge pull request 'fix: resolve tech debt from issue #32 review (#122)' (#131) from feature/issue-122-tech-debt-32-review into main
This commit was merged in pull request #131.
This commit is contained in:
@@ -35,6 +35,7 @@
|
||||
| #116 | Tech debt: minor findings from issue #29 review | Phase 4 | `COMPLETED` | Rust | [issue-116.md](issue-116.md) |
|
||||
| #118 | Tech debt: minor findings from issue #30 review | Phase 4 | `COMPLETED` | Rust | [issue-118.md](issue-118.md) |
|
||||
| #120 | Tech debt: minor findings from issue #31 review | Phase 4 | `COMPLETED` | Rust | [issue-120.md](issue-120.md) |
|
||||
| #122 | Tech debt: minor findings from issue #32 review | Phase 4 | `COMPLETED` | Rust | [issue-122.md](issue-122.md) |
|
||||
|
||||
## Status Legend
|
||||
|
||||
|
||||
66
implementation-plans/issue-122.md
Normal file
66
implementation-plans/issue-122.md
Normal file
@@ -0,0 +1,66 @@
|
||||
# Issue #122: Tech debt: minor findings from issue #32 review
|
||||
|
||||
## Summary
|
||||
|
||||
Address five tech debt items from the semantic cache review:
|
||||
|
||||
1. **Return best match, not first match** - `lookup()` returns the first entry above the similarity threshold via linear scan. Track and return the highest-similarity match instead of short-circuiting.
|
||||
|
||||
2. **Deduplicate extraction segment building** - The logic for building `(extracted_segment, extraction_confidence)` tuples is duplicated between cache population and streaming response paths. Extract into a helper.
|
||||
|
||||
3. **Reuse params.tag_filter** - The `tag_filter_ref` derivation duplicates the same empty-string-to-None conversion already done for `RetrievalParams`. Derive from the already-constructed params instead.
|
||||
|
||||
4. **Prevent duplicate cache entries** - `insert()` doesn't check if a sufficiently similar entry already exists, so repeated identical queries can accumulate duplicates.
|
||||
|
||||
5. **Proactive expired entry eviction** - `evict_expired()` only runs during insert. Also evict during the periodic metrics logging.
|
||||
|
||||
## Item 1: Return best match
|
||||
|
||||
### Approach
|
||||
|
||||
Replace the early-returning `for` loop in `lookup()` with a scan that tracks the best (highest-similarity) match. After scanning all non-expired, tag-matching entries, return the best match if it exceeds the threshold.
|
||||
|
||||
### Files changed
|
||||
|
||||
- `services/memory/src/cache/mod.rs` - Rewrite `lookup()` loop
|
||||
|
||||
## Item 2: Deduplicate extraction segment building
|
||||
|
||||
### Approach
|
||||
|
||||
Extract a helper function `get_extraction_pair(extraction_results, rank)` that returns `(Option<String>, Option<f32>)`. Use it in both the cache population and streaming response paths.
|
||||
|
||||
### Files changed
|
||||
|
||||
- `services/memory/src/service.rs` - Add helper, use in both places
|
||||
|
||||
## Item 3: Reuse params.tag_filter
|
||||
|
||||
### Approach
|
||||
|
||||
The `RetrievalParams` already converts empty `memory_type` to `None` via the `from_config` call. Use `params.tag_filter.as_deref()` instead of re-deriving `tag_filter_ref` from `req.memory_type`.
|
||||
|
||||
### Files changed
|
||||
|
||||
- `services/memory/src/service.rs` - Replace `tag_filter_ref` derivation
|
||||
|
||||
## Item 4: Prevent duplicate cache entries
|
||||
|
||||
### Approach
|
||||
|
||||
In `insert()`, before pushing the new entry, check if any existing non-expired entry has similarity >= threshold and the same tag filter. If so, replace it instead of adding a duplicate.
|
||||
|
||||
### Files changed
|
||||
|
||||
- `services/memory/src/cache/mod.rs` - Add duplicate check in `insert()`
|
||||
|
||||
## Item 5: Proactive expired entry eviction
|
||||
|
||||
### Approach
|
||||
|
||||
Add a public `evict_expired_entries()` method that acquires the write lock and calls the internal `evict_expired()`. Call it from the periodic metrics logging task in `main.rs`.
|
||||
|
||||
### Files changed
|
||||
|
||||
- `services/memory/src/cache/mod.rs` - Add public `evict_expired_entries()` method
|
||||
- `services/memory/src/main.rs` - Call eviction before metrics logging
|
||||
143
services/memory/src/cache/mod.rs
vendored
143
services/memory/src/cache/mod.rs
vendored
@@ -118,7 +118,7 @@ impl SemanticCache {
|
||||
/// Look up a cache entry by query embedding similarity.
|
||||
///
|
||||
/// Computes cosine similarity between the `query_embedding` and each
|
||||
/// cached entry's embedding. Returns the first entry whose similarity
|
||||
/// cached entry's embedding. Returns the highest-similarity entry that
|
||||
/// exceeds `config.similarity_threshold` and whose TTL has not expired.
|
||||
///
|
||||
/// Also filters by `tag_filter` — a cached entry is only a hit if its
|
||||
@@ -140,6 +140,8 @@ impl SemanticCache {
|
||||
let now = Instant::now();
|
||||
let ttl = Duration::from_secs(self.config.ttl_secs);
|
||||
|
||||
let mut best_match: Option<(f32, &CacheEntry)> = None;
|
||||
|
||||
for entry in entries.iter() {
|
||||
// Skip expired entries
|
||||
if now.duration_since(entry.created_at) >= ttl {
|
||||
@@ -151,20 +153,28 @@ impl SemanticCache {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check embedding similarity
|
||||
// Check embedding similarity — track best match
|
||||
let sim = cosine_similarity(query_embedding, &entry.query_embedding);
|
||||
if sim >= self.config.similarity_threshold {
|
||||
self.metrics.hits.fetch_add(1, Ordering::Relaxed);
|
||||
return Some(entry.results.clone());
|
||||
if sim >= self.config.similarity_threshold
|
||||
&& best_match.as_ref().is_none_or(|(best_sim, _)| sim > *best_sim)
|
||||
{
|
||||
best_match = Some((sim, entry));
|
||||
}
|
||||
}
|
||||
|
||||
self.metrics.misses.fetch_add(1, Ordering::Relaxed);
|
||||
None
|
||||
if let Some((_, entry)) = best_match {
|
||||
self.metrics.hits.fetch_add(1, Ordering::Relaxed);
|
||||
Some(entry.results.clone())
|
||||
} else {
|
||||
self.metrics.misses.fetch_add(1, Ordering::Relaxed);
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Insert a new cache entry.
|
||||
///
|
||||
/// If a sufficiently similar entry (same tag filter, similarity above
|
||||
/// threshold) already exists, it is replaced instead of adding a duplicate.
|
||||
/// If the cache is at `max_entries` capacity, evicts the oldest entry
|
||||
/// (by `created_at`). Also removes any expired entries during insertion.
|
||||
pub async fn insert(&self, entry: CacheEntry) {
|
||||
@@ -177,6 +187,17 @@ impl SemanticCache {
|
||||
// Evict expired entries first
|
||||
self.evict_expired(&mut entries);
|
||||
|
||||
// Check for an existing sufficiently similar entry (prevent duplicates)
|
||||
if let Some(existing_idx) = entries.iter().position(|existing| {
|
||||
existing.tag_filter == entry.tag_filter
|
||||
&& cosine_similarity(&existing.query_embedding, &entry.query_embedding)
|
||||
>= self.config.similarity_threshold
|
||||
}) {
|
||||
// Replace the existing entry with the new one
|
||||
entries[existing_idx] = entry;
|
||||
return;
|
||||
}
|
||||
|
||||
// If at capacity, remove the oldest entry
|
||||
if entries.len() >= self.config.max_entries {
|
||||
if let Some(oldest_idx) = entries
|
||||
@@ -223,9 +244,20 @@ impl SemanticCache {
|
||||
}
|
||||
}
|
||||
|
||||
/// Proactively remove expired entries from the cache.
|
||||
///
|
||||
/// Acquires the write lock and evicts all entries whose TTL has elapsed.
|
||||
/// Intended to be called periodically (e.g., during metrics logging) to
|
||||
/// prevent stale entries from accumulating in long-running services with
|
||||
/// infrequent queries.
|
||||
pub async fn evict_expired_entries(&self) {
|
||||
let mut entries = self.entries.write().await;
|
||||
self.evict_expired(&mut entries);
|
||||
}
|
||||
|
||||
/// Remove expired entries (TTL check).
|
||||
///
|
||||
/// Called during `insert` to lazily clean up expired entries.
|
||||
/// Called during `insert` and `evict_expired_entries` to clean up expired entries.
|
||||
fn evict_expired(&self, entries: &mut Vec<CacheEntry>) {
|
||||
let now = Instant::now();
|
||||
let ttl = Duration::from_secs(self.config.ttl_secs);
|
||||
@@ -586,4 +618,99 @@ mod tests {
|
||||
let m = cache.metrics().await;
|
||||
assert!(m.current_size <= 10);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_lookup_returns_best_match_not_first() {
|
||||
let config = CacheConfig {
|
||||
enabled: true,
|
||||
similarity_threshold: 0.90,
|
||||
ttl_secs: 300,
|
||||
max_entries: 10,
|
||||
};
|
||||
let cache = SemanticCache::new(config);
|
||||
|
||||
// Insert two entries with similar embeddings that both exceed the threshold
|
||||
// Entry 1: somewhat similar to [1.0, 0.0, 0.0]
|
||||
let emb1 = vec![0.95, 0.31, 0.0]; // cosine sim with [1,0,0] ~ 0.95
|
||||
let mut entry1 = make_entry(emb1, None, vec!["mem-1"]);
|
||||
entry1.query_text = "first entry".to_string();
|
||||
entry1.results[0].rank = 1;
|
||||
cache.insert(entry1).await;
|
||||
|
||||
// Entry 2: more similar (exact match)
|
||||
let emb2 = vec![1.0, 0.0, 0.0]; // cosine sim with [1,0,0] = 1.0
|
||||
let mut entry2 = make_entry(emb2, None, vec!["mem-2"]);
|
||||
entry2.query_text = "second entry".to_string();
|
||||
entry2.results[0].rank = 2;
|
||||
cache.insert(entry2).await;
|
||||
|
||||
// Lookup should return the best match (entry2, rank=2), not the first (entry1, rank=1)
|
||||
let result = cache.lookup(&[1.0, 0.0, 0.0], None).await;
|
||||
assert!(result.is_some());
|
||||
let results = result.unwrap();
|
||||
assert_eq!(results[0].rank, 2, "should return best match (entry2), not first match");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_insert_prevents_duplicate_entries() {
|
||||
let cache = SemanticCache::new(make_config());
|
||||
let embedding = vec![1.0, 0.0, 0.0];
|
||||
|
||||
// Insert same embedding twice
|
||||
let entry1 = make_entry(embedding.clone(), None, vec!["mem-1"]);
|
||||
cache.insert(entry1).await;
|
||||
|
||||
let entry2 = make_entry(embedding.clone(), None, vec!["mem-2"]);
|
||||
cache.insert(entry2).await;
|
||||
|
||||
let m = cache.metrics().await;
|
||||
assert_eq!(m.current_size, 1, "duplicate entry should replace existing, not accumulate");
|
||||
|
||||
// The second insert should have replaced the first
|
||||
let result = cache.lookup(&embedding, None).await;
|
||||
assert!(result.is_some());
|
||||
let results = result.unwrap();
|
||||
assert_eq!(results[0].entry.id, "mem-2", "should have the updated entry");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_insert_allows_different_tag_filter_entries() {
|
||||
let cache = SemanticCache::new(make_config());
|
||||
let embedding = vec![1.0, 0.0, 0.0];
|
||||
|
||||
// Insert same embedding with different tags — should NOT be treated as duplicates
|
||||
let entry1 = make_entry(embedding.clone(), Some("tag-A"), vec!["mem-1"]);
|
||||
cache.insert(entry1).await;
|
||||
|
||||
let entry2 = make_entry(embedding.clone(), Some("tag-B"), vec!["mem-2"]);
|
||||
cache.insert(entry2).await;
|
||||
|
||||
let m = cache.metrics().await;
|
||||
assert_eq!(m.current_size, 2, "different tags should be separate entries");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_evict_expired_entries_public() {
|
||||
let config = CacheConfig {
|
||||
enabled: true,
|
||||
similarity_threshold: 0.95,
|
||||
ttl_secs: 1,
|
||||
max_entries: 10,
|
||||
};
|
||||
let cache = SemanticCache::new(config);
|
||||
let embedding = vec![1.0, 0.0, 0.0];
|
||||
cache.insert(make_entry(embedding, None, vec!["mem-1"])).await;
|
||||
|
||||
assert_eq!(cache.metrics().await.current_size, 1);
|
||||
|
||||
// Wait for TTL to expire
|
||||
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||
|
||||
// Call the public eviction method
|
||||
cache.evict_expired_entries().await;
|
||||
|
||||
let m = cache.metrics().await;
|
||||
assert_eq!(m.current_size, 0, "expired entry should be evicted");
|
||||
assert!(m.evictions >= 1, "should record eviction");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,12 +78,13 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
// Log cache metrics periodically
|
||||
// Log cache metrics periodically (and proactively evict expired entries)
|
||||
let cache_ref = memory_service.cache().clone();
|
||||
tokio::spawn(async move {
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
cache_ref.evict_expired_entries().await;
|
||||
let m = cache_ref.metrics().await;
|
||||
tracing::info!(
|
||||
hits = m.hits,
|
||||
|
||||
@@ -14,11 +14,29 @@ use llm_multiverse_proto::llm_multiverse::v1::{
|
||||
MemoryEntry, ProvenanceMetadata, QueryMemoryRequest, QueryMemoryResponse,
|
||||
RevokeMemoryRequest, RevokeMemoryResponse, WriteMemoryRequest, WriteMemoryResponse,
|
||||
};
|
||||
use crate::extraction::prompt::ExtractionResult;
|
||||
use prost_types::Timestamp;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::{Request, Response, Status};
|
||||
|
||||
/// Extract the `(extracted_segment, extraction_confidence)` pair for a given
|
||||
/// rank from an optional extraction results vector.
|
||||
///
|
||||
/// Returns `(None, None)` when extraction was not performed or the rank has
|
||||
/// no corresponding result.
|
||||
fn get_extraction_pair(
|
||||
extraction_results: Option<&Vec<ExtractionResult>>,
|
||||
rank: usize,
|
||||
) -> (Option<String>, Option<f32>) {
|
||||
if let Some(extractions) = extraction_results {
|
||||
if let Some(result) = extractions.get(rank) {
|
||||
return (Some(result.segment.clone()), Some(result.confidence));
|
||||
}
|
||||
}
|
||||
(None, None)
|
||||
}
|
||||
|
||||
/// Memory service implementation backed by DuckDB with vector similarity search.
|
||||
///
|
||||
/// The `db` field holds a shared reference to the [`DuckDbManager`] which manages
|
||||
@@ -189,11 +207,8 @@ impl MemoryService for MemoryServiceImpl {
|
||||
};
|
||||
|
||||
// Cache lookup: check if a semantically similar query was recently cached
|
||||
let tag_filter_ref = if req.memory_type.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(req.memory_type.as_str())
|
||||
};
|
||||
// Reuse the tag_filter already computed for RetrievalParams
|
||||
let tag_filter_ref = params.tag_filter.as_deref();
|
||||
|
||||
if self.cache.config().enabled {
|
||||
if let Some(cached_results) = self.cache.lookup(&query_vector, tag_filter_ref).await {
|
||||
@@ -270,15 +285,7 @@ impl MemoryService for MemoryServiceImpl {
|
||||
.enumerate()
|
||||
.map(|(rank, candidate)| {
|
||||
let (extracted_segment, extraction_confidence) =
|
||||
if let Some(ref extractions) = extraction_results {
|
||||
if let Some(result) = extractions.get(rank) {
|
||||
(Some(result.segment.clone()), Some(result.confidence))
|
||||
} else {
|
||||
(None, None)
|
||||
}
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
get_extraction_pair(extraction_results.as_ref(), rank);
|
||||
CachedResult {
|
||||
rank: (rank + 1) as u32,
|
||||
entry: candidate_to_memory_entry(candidate),
|
||||
@@ -294,11 +301,7 @@ impl MemoryService for MemoryServiceImpl {
|
||||
let cache_entry = CacheEntry {
|
||||
query_embedding: query_vector,
|
||||
query_text: req.query.clone(),
|
||||
tag_filter: if req.memory_type.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(req.memory_type.clone())
|
||||
},
|
||||
tag_filter: params.tag_filter.clone(),
|
||||
results: cached_results,
|
||||
result_memory_ids,
|
||||
created_at: Instant::now(),
|
||||
@@ -311,15 +314,7 @@ impl MemoryService for MemoryServiceImpl {
|
||||
tokio::spawn(async move {
|
||||
for (rank, candidate) in candidates.into_iter().enumerate() {
|
||||
let (extracted_segment, extraction_confidence) =
|
||||
if let Some(ref extractions) = extraction_results {
|
||||
if let Some(result) = extractions.get(rank) {
|
||||
(Some(result.segment.clone()), Some(result.confidence))
|
||||
} else {
|
||||
(None, None)
|
||||
}
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
get_extraction_pair(extraction_results.as_ref(), rank);
|
||||
|
||||
let response = QueryMemoryResponse {
|
||||
rank: (rank + 1) as u32,
|
||||
|
||||
Reference in New Issue
Block a user