import { createClient, ConnectError, Code } from '@connectrpc/connect';
import { createGrpcWebTransport } from '@connectrpc/connect-web';
import {
  OrchestratorService,
  ProcessRequestRequestSchema
} from '$lib/proto/llm_multiverse/v1/orchestrator_pb';
import type {
  ProcessRequestResponse,
  SessionConfig
} from '$lib/proto/llm_multiverse/v1/orchestrator_pb';
import { create } from '@bufbuild/protobuf';
import { connectionStore } from '$lib/stores/connection.svelte';
import { toastStore } from '$lib/stores/toast.svelte';
import { logger } from '$lib/utils/logger';

/**
 * Application-level error wrapping gRPC status codes.
 *
 * `code` is the lowercase gRPC status name (e.g. 'unavailable');
 * `details` carries the raw server message / cause for diagnostics.
 */
export class OrchestratorError extends Error {
  constructor(
    message: string,
    public readonly code: string,
    public readonly details?: string
  ) {
    super(message);
    this.name = 'OrchestratorError';
  }
}

/**
 * Map gRPC status codes to user-friendly messages.
 * Keys are lowercase gRPC status names; 'unknown' is the catch-all fallback.
 */
const GRPC_USER_MESSAGES: Record<string, string> = {
  unavailable: 'The server is currently unavailable. Please try again later.',
  deadline_exceeded: 'The request timed out. Please try again.',
  cancelled: 'The request was cancelled.',
  not_found: 'The requested resource was not found.',
  already_exists: 'The resource already exists.',
  permission_denied: 'You do not have permission to perform this action.',
  resource_exhausted: 'Rate limit reached. Please wait a moment and try again.',
  failed_precondition: 'The operation cannot be performed in the current state.',
  aborted: 'The operation was aborted. Please try again.',
  unimplemented: 'This feature is not yet available.',
  internal: 'An internal server error occurred. Please try again.',
  unauthenticated: 'Authentication required. Please log in.',
  data_loss: 'Data loss detected. Please contact support.',
  unknown: 'An unexpected error occurred. Please try again.'
};

/**
 * Codes considered transient / retriable.
*/ const TRANSIENT_CODES = new Set(['unavailable', 'deadline_exceeded', 'aborted', 'internal']); /** * Return a user-friendly message for a gRPC error code. */ export function friendlyMessage(code: string): string { return GRPC_USER_MESSAGES[code.toLowerCase()] ?? GRPC_USER_MESSAGES['unknown']; } /** * Whether the given error code is transient and should be retried. */ function isTransient(code: string): boolean { return TRANSIENT_CODES.has(code.toLowerCase()); } const DEFAULT_ENDPOINT = '/'; let transport: ReturnType | null = null; function getTransport(endpoint?: string) { if (!transport) { transport = createGrpcWebTransport({ baseUrl: endpoint ?? DEFAULT_ENDPOINT }); } return transport; } /** * Reset the transport (useful for reconfiguring the endpoint). */ export function resetTransport(newEndpoint?: string): void { transport = null; if (newEndpoint !== undefined) { transport = createGrpcWebTransport({ baseUrl: newEndpoint }); } } /** * Create a configured orchestrator client. */ function getClient() { return createClient(OrchestratorService, getTransport()); } /** * Sleep for a given number of milliseconds. */ function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } /** * Calculate exponential backoff delay with jitter. * Base delay doubles each attempt: 1s, 2s, 4s (capped at 8s). */ function backoffDelay(attempt: number): number { const base = Math.min(1000 * Math.pow(2, attempt), 8000); const jitter = Math.random() * base * 0.25; return base + jitter; } const MAX_RETRIES = 3; /** * Map numeric Code enum values to lowercase string names matching GRPC_USER_MESSAGES keys. 
*/ const CODE_TO_STRING: Record = { [Code.Canceled]: 'cancelled', [Code.Unknown]: 'unknown', [Code.InvalidArgument]: 'failed_precondition', [Code.DeadlineExceeded]: 'deadline_exceeded', [Code.NotFound]: 'not_found', [Code.AlreadyExists]: 'already_exists', [Code.PermissionDenied]: 'permission_denied', [Code.ResourceExhausted]: 'resource_exhausted', [Code.FailedPrecondition]: 'failed_precondition', [Code.Aborted]: 'aborted', [Code.OutOfRange]: 'failed_precondition', [Code.Unimplemented]: 'unimplemented', [Code.Internal]: 'internal', [Code.Unavailable]: 'unavailable', [Code.DataLoss]: 'data_loss', [Code.Unauthenticated]: 'unauthenticated' }; /** * Extract gRPC error code from an error, normalising to lowercase string. */ function extractCode(err: unknown): string { if (err instanceof ConnectError) { return CODE_TO_STRING[err.code] ?? 'unknown'; } if (err instanceof Error && 'code' in err) { const raw = (err as { code: unknown }).code; if (typeof raw === 'string') return raw.toLowerCase(); if (typeof raw === 'number') return String(raw); } return 'unknown'; } /** * Wrap an error into an OrchestratorError with a friendly message. */ function toOrchestratorError(err: unknown): OrchestratorError { if (err instanceof OrchestratorError) return err; if (err instanceof ConnectError) { const code = extractCode(err); const details = [ err.rawMessage, err.cause ? String(err.cause) : '' ].filter(Boolean).join('; '); return new OrchestratorError(friendlyMessage(code), code, details || undefined); } if (err instanceof Error) { const code = extractCode(err); return new OrchestratorError(friendlyMessage(code), code, err.message); } return new OrchestratorError(friendlyMessage('unknown'), 'unknown'); } /** * Append a diagnostic code suffix to a message, e.g. "(code: unavailable)". */ function diagnosticSuffix(err: OrchestratorError): string { return err.code && err.code !== 'unknown' ? 
` (code: ${err.code})` : ''; } /** * Send a request to the orchestrator and yield streaming responses. * * Includes automatic retry with exponential backoff for transient failures. * Updates the connection status store on success or failure and fires * toast notifications on errors. * * Returns an async iterator of `ProcessRequestResponse` messages, * each containing the current orchestration state, status message, * and optionally intermediate or final results. */ export async function* processRequest( sessionId: string, userMessage: string, sessionConfig?: SessionConfig ): AsyncGenerator { const request = create(ProcessRequestRequestSchema, { sessionId, userMessage, sessionConfig }); logger.debug('orchestrator', 'processRequest', { sessionId, messageLength: userMessage.length }); let lastError: OrchestratorError | null = null; for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { // Skip retries if already known disconnected if (connectionStore.status === 'disconnected' && attempt > 0) { break; } if (attempt > 0) { connectionStore.setReconnecting(); const delay = backoffDelay(attempt - 1); logger.warn('orchestrator', `Retry attempt ${attempt}/${MAX_RETRIES}`, { sessionId, previousCode: lastError?.code, delay }); await sleep(delay); } try { const client = getClient(); for await (const response of client.processRequest(request)) { connectionStore.reportSuccess(); yield response; } logger.debug('orchestrator', 'Stream completed', { sessionId }); // Completed successfully — no retry needed return; } catch (err: unknown) { logger.grpcError('orchestrator', `Request failed (attempt ${attempt + 1}/${MAX_RETRIES + 1})`, err); lastError = toOrchestratorError(err); const code = lastError.code; if (isTransient(code) && attempt < MAX_RETRIES) { // Will retry — continue loop connectionStore.reportFailure(); continue; } // Non-transient or exhausted retries connectionStore.reportFailure(); const suffix = diagnosticSuffix(lastError); logger.error('orchestrator', 'Request 
failed permanently', { code: lastError.code, details: lastError.details }); toastStore.addToast({ message: lastError.message + suffix, type: 'error' }); throw lastError; } } // Should not reach here, but guard against it if (lastError) { throw lastError; } }