Implementation Plan — Issue #41: Implement StreamInference gRPC endpoint

Metadata

| Field | Value |
| --- | --- |
| Issue | #41 |
| Title | Implement StreamInference gRPC endpoint |
| Milestone | Phase 5: Model Gateway |
| Labels | |
| Status | COMPLETED |
| Language | Rust |
| Related Plans | issue-038.md, issue-039.md, issue-040.md |
| Blocked by | #40 |

Acceptance Criteria

  • StreamInference RPC handler implemented as server-streaming
  • Routes request through model routing logic
  • Streams tokens from Ollama HTTP streaming response
  • Includes usage metadata (token counts) in final message
  • Proper error handling for model loading failures

Architecture Analysis

Service Context

  • Service: model-gateway (services/model-gateway)
  • gRPC endpoint: ModelGatewayService::StreamInference (server-streaming)
  • Proto messages: StreamInferenceRequest (contains InferenceParams), StreamInferenceResponse (contains token, optional finish_reason)

Existing Patterns

  • Server-streaming with mpsc channel: The Memory service's QueryMemory in services/memory/src/service.rs (lines 268-448) uses tokio::sync::mpsc::channel + ReceiverStream + tokio::spawn to stream results. The StreamInferenceStream type alias is already declared as ReceiverStream<Result<StreamInferenceResponse, Status>> in service.rs (lines 100-101).
  • Ollama streaming: OllamaClient::generate_stream() in services/model-gateway/src/ollama/client.rs (lines 67-92) returns Pin<Box<dyn Stream<Item = Result<GenerateStreamChunk, OllamaError>> + Send>>. Each GenerateStreamChunk has response (token text), done (bool), done_reason (optional), eval_count (optional), and prompt_eval_count (optional). These shapes are restated as Rust signatures after this list.
  • Model routing: ModelRouter::resolve_model() in routing.rs takes task_complexity: i32 and model_hint: Option<&str> and returns the resolved Ollama model name.
  • Audit logging: The audit_log_inference() helper already exists in service.rs (lines 50-90).
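
The pieces above, restated as Rust shapes for quick reference. The GenerateStreamChunk field types and the exact generate_stream signature are assumptions inferred from the descriptions above and from how step 2e awaits the call; check the ollama types and client modules for the real definitions.

use std::pin::Pin;
use futures::Stream;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;

// service.rs (lines 100-101): the stream type used by the trait impl.
type StreamInferenceStream = ReceiverStream<Result<StreamInferenceResponse, Status>>;

// ollama types (assumed shape): one chunk per streamed token; counts appear
// only on the final (done = true) chunk.
pub struct GenerateStreamChunk {
    pub response: String,               // token text
    pub done: bool,                     // true on the final chunk
    pub done_reason: Option<String>,    // e.g. "stop"
    pub eval_count: Option<u64>,        // completion token count (final chunk only)
    pub prompt_eval_count: Option<u64>, // prompt token count (final chunk only)
}

// ollama/client.rs (lines 67-92): async and fallible, judging by the
// .await / map_err usage in step 2e.
impl OllamaClient {
    pub async fn generate_stream(
        &self,
        model: &str,
        prompt: &str,
        options: Option<GenerateOptions>,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<GenerateStreamChunk, OllamaError>> + Send>>, OllamaError> {
        // existing implementation
        unimplemented!()
    }
}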

Dependencies

  • OllamaClient (already initialized in ModelGatewayServiceImpl)
  • ModelRouter (already initialized)
  • Audit service client (optional, already wired)
  • tokio::sync::mpsc, tokio_stream::wrappers::ReceiverStream (already imported)
  • futures::StreamExt (needed for .next() on the Ollama stream)

Implementation Steps

1. Types & Configuration — params_to_options() helper

Create a pub(crate) fn params_to_options(params: &InferenceParams) -> Option<GenerateOptions> helper function in service.rs. This maps proto InferenceParams fields to Ollama's GenerateOptions:

| InferenceParams field | GenerateOptions field | Mapping |
| --- | --- | --- |
| temperature (optional float) | temperature | Direct pass-through |
| top_p (optional float) | top_p | Direct pass-through |
| max_tokens (uint32) | num_predict | Cast u32 to i32; if max_tokens == 0, set to None (Ollama default) |
| stop_sequences (repeated string) | stop | If empty, None; otherwise Some(vec) |

Return None if all fields are at their defaults (no temperature, no top_p, max_tokens=0, no stop_sequences) to avoid sending an empty options object.

This helper is deliberately pub(crate) so issue #42 (Inference endpoint) can reuse it.
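
A minimal sketch of this helper, assuming prost-style generated fields on InferenceParams (temperature and top_p as Option<f32>, max_tokens as u32, stop_sequences as Vec<String>) and a GenerateOptions struct that derives Default with matching optional fields; confirm the exact names and types against the generated proto code and the ollama types module.

pub(crate) fn params_to_options(params: &InferenceParams) -> Option<GenerateOptions> {
    // max_tokens == 0 means "use Ollama's default", i.e. no num_predict at all.
    let num_predict = if params.max_tokens == 0 {
        None
    } else {
        Some(params.max_tokens as i32)
    };
    let stop = if params.stop_sequences.is_empty() {
        None
    } else {
        Some(params.stop_sequences.clone())
    };

    // All defaults: skip the options object entirely.
    if params.temperature.is_none() && params.top_p.is_none() && num_predict.is_none() && stop.is_none() {
        return None;
    }

    Some(GenerateOptions {
        temperature: params.temperature,
        top_p: params.top_p,
        num_predict,
        stop,
        ..Default::default()
    })
}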

2. Core Logic — stream_inference handler

Replace the stub in the #[tonic::async_trait] impl ModelGatewayService block:

a) Extract and validate request:

let req = request.into_inner();
let params = req.params
    .ok_or_else(|| Status::invalid_argument("params is required"))?;
let ctx = params.context.clone()
    .ok_or_else(|| Status::invalid_argument("params.context is required"))?;
if ctx.session_id.is_empty() {
    return Err(Status::invalid_argument("context.session_id is required"));
}
if params.prompt.is_empty() {
    return Err(Status::invalid_argument("prompt is required"));
}

b) Resolve model via router:

let model_name = self.router.resolve_model(
    params.task_complexity,
    params.model_hint.as_deref(),
);

Where params.task_complexity is the i32 enum value from the proto (0=UNSPECIFIED, 1=SIMPLE, 2=COMPLEX).

c) Map params to Ollama options:

let options = params_to_options(&params);

d) Audit log (best-effort, before streaming starts):

if let Some(audit_client) = &self.audit_client {
    audit_log_inference(
        audit_client,
        &ctx,
        &model_name,
        params.prompt.len(),
        params.task_complexity,
        "StreamInference",
        "started",
    ).await;
}

e) Call Ollama streaming API:

let ollama_stream = self.ollama.generate_stream(&model_name, &params.prompt, options)
    .await
    .map_err(|e| match &e {
        OllamaError::Api { status, message } if *status == 404 => {
            Status::not_found(format!("model '{}' not found: {}", model_name, message))
        }
        OllamaError::Api { status, message } => {
            Status::internal(format!("Ollama error ({}): {}", status, message))
        }
        OllamaError::Http(e) => {
            Status::unavailable(format!("Ollama unreachable: {}", e))
        }
        _ => Status::internal(format!("Ollama error: {}", e)),
    })?;

f) Bridge Ollama stream to gRPC stream via mpsc channel:

let (tx, rx) = tokio::sync::mpsc::channel(32);

tokio::spawn(async move {
    let mut stream = ollama_stream;
    while let Some(chunk_result) = stream.next().await {
        match chunk_result {
            Ok(chunk) => {
                let finish_reason = if chunk.done {
                    Some(chunk.done_reason.unwrap_or_else(|| "stop".to_string()))
                } else {
                    None
                };
                let response = StreamInferenceResponse {
                    token: chunk.response,
                    finish_reason,
                };
                if tx.send(Ok(response)).await.is_err() {
                    break; // Client disconnected
                }
            }
            Err(e) => {
                let _ = tx.send(Err(Status::internal(format!("stream error: {}", e)))).await;
                break;
            }
        }
    }
});

Ok(Response::new(ReceiverStream::new(rx)))

Key design decisions:

  • Channel capacity of 32 provides backpressure without blocking the Ollama stream excessively.
  • Each GenerateStreamChunk maps 1:1 to a StreamInferenceResponse.
  • Non-done chunks have finish_reason = None; the final chunk (done=true) carries done_reason as finish_reason (defaulting to "stop" if Ollama omits it).
  • Token counts (eval_count, prompt_eval_count) are present only on the final chunk from Ollama. The current proto StreamInferenceResponse has only token and finish_reason; there is no field for usage metadata. The acceptance criteria mention "usage metadata in final message", but since the proto cannot carry it yet, the implementation will log token counts via tracing on the final chunk (a sketch follows this list). If the proto is later extended with usage fields, they can be populated from the final chunk.
  • If the Ollama stream yields an error mid-stream, send Status::internal through the channel and terminate.
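
A minimal sketch of that final-chunk logging, placed in the Ok(chunk) arm of step 2f before the response is built; the log message and field names are illustrative only:

if chunk.done {
    // Usage metadata has no proto field yet, so record it server-side.
    tracing::info!(
        eval_count = ?chunk.eval_count,
        prompt_eval_count = ?chunk.prompt_eval_count,
        "StreamInference completed"
    );
}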

3. gRPC Handler Wiring

No new wiring needed. The stream_inference method is already declared in the impl ModelGatewayService block with the correct StreamInferenceStream type alias. The stub just needs to be replaced with the real implementation from step 2.
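
For orientation, the declaration being filled in looks roughly like the following; the associated type wiring follows tonic's server-streaming convention and should already match what is in service.rs:

#[tonic::async_trait]
impl ModelGatewayService for ModelGatewayServiceImpl {
    // Already declared per the existing-patterns notes (service.rs lines 100-101).
    type StreamInferenceStream = ReceiverStream<Result<StreamInferenceResponse, Status>>;

    async fn stream_inference(
        &self,
        request: Request<StreamInferenceRequest>,
    ) -> Result<Response<Self::StreamInferenceStream>, Status> {
        // Steps 2a-2f above replace the previous unimplemented stub here.
        todo!()
    }
}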

4. Service Integration

  • Ollama: Already initialized as self.ollama in ModelGatewayServiceImpl.
  • ModelRouter: Already initialized as self.router.
  • Audit client: Already wired via self.audit_client and audit_log_inference().
  • Imports needed: Add futures::StreamExt to the imports in service.rs. Add use crate::ollama::types::GenerateOptions and use crate::ollama::error::OllamaError (or import through crate::ollama::* if re-exported).

Check ollama/mod.rs to see what is re-exported; GenerateOptions may need to be added to the public API if it is not already exported.
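
If anything is missing, the re-export change would be on the order of the following; the module layout is an assumption based on the import paths mentioned above:

// services/model-gateway/src/ollama/mod.rs
pub mod client;
pub mod error;
pub mod types;

pub use client::OllamaClient;
pub use error::OllamaError;
pub use types::{GenerateOptions, GenerateStreamChunk};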

5. Tests

Unit tests in service.rs

a) test_params_to_options_all_defaults — InferenceParams with zero/empty values yields None (this test and test b are sketched after these items).

b) test_params_to_options_temperature_only — Only temperature set, returns Some(GenerateOptions { temperature: Some(0.7), .. }).

c) test_params_to_options_all_fields — All fields populated: temperature, top_p, max_tokens=100, stop_sequences=["STOP"]. Verify num_predict is Some(100), stop is Some(vec!["STOP"]).

d) test_params_to_options_max_tokens_zero_is_none — max_tokens=0 maps to num_predict=None.

e) test_stream_inference_missing_params — Request with params: None returns Status::invalid_argument.

f) test_stream_inference_missing_context — Request with params but no context returns Status::invalid_argument.

g) test_stream_inference_empty_prompt — Empty prompt returns Status::invalid_argument.

h) Remove test_stream_inference_unimplemented — The stub test is no longer valid.
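
As an illustration, the first two helper tests could look like the following, assuming the prost-generated InferenceParams derives Default (same field assumptions as the params_to_options sketch in step 1):

#[cfg(test)]
mod params_to_options_tests {
    use super::*;

    #[test]
    fn test_params_to_options_all_defaults() {
        let params = InferenceParams {
            temperature: None,
            top_p: None,
            max_tokens: 0,
            stop_sequences: vec![],
            ..Default::default()
        };
        assert!(params_to_options(&params).is_none());
    }

    #[test]
    fn test_params_to_options_temperature_only() {
        let params = InferenceParams {
            temperature: Some(0.7),
            ..Default::default()
        };
        let opts = params_to_options(&params).expect("temperature alone should yield options");
        assert_eq!(opts.temperature, Some(0.7));
        assert!(opts.num_predict.is_none());
    }
}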

Integration tests

Full streaming tests require a running Ollama instance or wiremock. These are better suited for issue #43 (integration tests). For this issue, focus on the params_to_options helper and request validation tests that do not require an Ollama connection.

Files to Create/Modify

| File | Action | Purpose |
| --- | --- | --- |
| services/model-gateway/src/service.rs | Modify | Replace stream_inference stub with real implementation; add params_to_options() helper; add unit tests; add futures::StreamExt and Ollama type imports |
| services/model-gateway/src/ollama/mod.rs | Modify (if needed) | Ensure GenerateOptions and OllamaError are re-exported |

Risks and Edge Cases

  • Proto lacks usage fields: StreamInferenceResponse has no tokens_used / prompt_tokens field. Token counts from the final Ollama chunk will be logged via tracing but cannot be sent to the client. If the proto is extended later, this is a small change.
  • Model not loaded in Ollama: Ollama returns 404 if the model is not pulled. The handler maps this to Status::not_found with a descriptive message. Ollama may also auto-pull (depending on config), causing a long delay before streaming starts; this is acceptable and handled by the reqwest timeout.
  • Client disconnects mid-stream: The tx.send().await.is_err() check detects this and breaks the loop, dropping the Ollama stream (which closes the HTTP connection).
  • Ollama stream error mid-way: Send Status::internal through the channel. The gRPC client receives the error as a trailing status after any tokens already sent.
  • Large responses without backpressure: The channel capacity (32) provides natural backpressure. If the client is slow to consume, the spawned task blocks on tx.send(), which in turn stops reading from the Ollama stream.

Deviation Log

(Filled during implementation if deviations from plan occur)

| Deviation | Reason |
| --- | --- |