diff --git a/implementation-plans/_index.md b/implementation-plans/_index.md index b40f061..c090091 100644 --- a/implementation-plans/_index.md +++ b/implementation-plans/_index.md @@ -52,6 +52,7 @@ | #46 | Implement SearXNG query + snippet filter | Phase 6 | `COMPLETED` | Python | [issue-046.md](issue-046.md) | | #47 | Implement readability-lxml extraction pipeline | Phase 6 | `COMPLETED` | Python | [issue-047.md](issue-047.md) | | #48 | Implement summarization step via Model Gateway | Phase 6 | `COMPLETED` | Python | [issue-048.md](issue-048.md) | +| #49 | Implement Search gRPC endpoint | Phase 6 | `COMPLETED` | Python | [issue-049.md](issue-049.md) | ## Status Legend diff --git a/implementation-plans/issue-049.md b/implementation-plans/issue-049.md new file mode 100644 index 0000000..83e2742 --- /dev/null +++ b/implementation-plans/issue-049.md @@ -0,0 +1,80 @@ +# Implementation Plan — Issue #49: Implement Search gRPC endpoint + +## Metadata + +| Field | Value | +|---|---| +| Issue | [#49](https://git.shahondin1624.de/llm-multiverse/llm-multiverse/issues/49) | +| Title | Implement Search gRPC endpoint | +| Milestone | Phase 6: Search Service | +| Labels | — | +| Status | `COMPLETED` | +| Language | Python | +| Related Plans | issue-046.md, issue-047.md, issue-048.md | +| Blocked by | #48, #20 | + +## Acceptance Criteria + +- [x] `Search` RPC handler implemented +- [x] Full pipeline: query → fetch → extract → summarize +- [x] Results include title, URL, snippet, extracted content, summary +- [x] Audit-logs the search via Audit Service +- [x] Configurable pipeline stages (skip extraction/summarization) +- [x] Timeout handling for the full pipeline + +## Architecture Analysis + +### Service Context +- **Module:** `services/search/src/search_service/service.py` +- Orchestrates the full search pipeline: SearXNG → extraction → summarization +- Returns `SearchResponse` with `SearchResult` entries + +### Proto Messages +- `SearchRequest`: context (SessionContext), query 
(str), num_results (int) +- `SearchResponse`: results (repeated SearchResult), error_message (str) +- `SearchResult`: claim (str), source_url (str), confidence (float), date (str), summary (str) + +### Existing Components +- `SearXNGClient` (searxng.py) — query meta-search +- `PageExtractor` (extractor.py) — fetch + readability extraction +- `Summarizer` (summarizer.py) — Model Gateway summarization +- `AuditServiceStub` (audit_pb2_grpc) — audit logging + +## Implementation Steps + +### 1. Update `SearchServiceImpl.__init__` to accept dependencies +- SearXNGClient, PageExtractor, Summarizer (optional), AuditServiceStub (optional) +- Add pipeline config flags: `enable_extraction`, `enable_summarization` + +### 2. Implement `Search` RPC handler +- Validate request (query non-empty, context present) +- Query SearXNG for snippets +- If extraction enabled: extract content from URLs +- If summarization enabled: summarize extracted content +- Map results to SearchResult proto messages +- Audit-log the search +- Handle timeouts and errors gracefully + +### 3. Update `__main__.py` +- Initialize gRPC channels and clients +- Pass to SearchServiceImpl + +### 4. 
Tests +- Mock SearXNG, extractor, summarizer, audit +- Test full pipeline, partial pipeline (no extraction, no summarization) +- Test validation errors, SearXNG errors, timeout + +## Files to Create/Modify + +| File | Action | Purpose | +|---|---|---| +| `services/search/src/search_service/service.py` | Modify | Implement Search RPC handler | +| `services/search/src/search_service/__main__.py` | Modify | Wire dependencies | +| `services/search/tests/test_service.py` | Create | Tests for Search RPC | + +## Deviation Log + +| Deviation | Reason | +|---|---| +| Used `AUDIT_ACTION_TOOL_INVOCATION` enum instead of string "search" | AuditEntry.action is an enum type, not a string; no search-specific enum value exists | +| Used `google.protobuf.timestamp_pb2.Timestamp` for timestamp field | The proto `timestamp` field is a `Timestamp` message (populated from a `datetime` via `FromDatetime`), not an ISO string | diff --git a/services/search/src/search_service/__main__.py b/services/search/src/search_service/__main__.py index c356859..0771d09 100644 --- a/services/search/src/search_service/__main__.py +++ b/services/search/src/search_service/__main__.py @@ -7,10 +7,13 @@ import logging import signal import grpc -from llm_multiverse.v1 import search_pb2_grpc +from llm_multiverse.v1 import audit_pb2_grpc, search_pb2_grpc from .config import Config +from .extractor import PageExtractor +from .searxng import SearXNGClient from .service import SearchServiceImpl +from .summarizer import Summarizer logger = logging.getLogger("search_service") @@ -18,7 +21,28 @@ logger = logging.getLogger("search_service") async def serve(config: Config) -> None: """Start the gRPC server and wait for shutdown.""" server = grpc.aio.server() - service = SearchServiceImpl(config) + + # Initialize pipeline components + searxng = SearXNGClient(config.searxng_url) + extractor = PageExtractor() + + # Model Gateway channel + summarizer + gw_channel = grpc.aio.insecure_channel(config.model_gateway_addr) + summarizer = Summarizer(gw_channel) + + # Audit Service 
(optional)
+    audit_stub = None
+    if config.audit_addr:
+        audit_channel = grpc.aio.insecure_channel(config.audit_addr)
+        audit_stub = audit_pb2_grpc.AuditServiceStub(audit_channel)
+
+    service = SearchServiceImpl(
+        config,
+        searxng=searxng,
+        extractor=extractor,
+        summarizer=summarizer,
+        audit_stub=audit_stub,
+    )
     search_pb2_grpc.add_SearchServiceServicer_to_server(service, server)
     server.add_insecure_port(config.listen_addr)
 
@@ -45,6 +69,10 @@ async def serve(config: Config) -> None:
     await shutdown_event.wait()
     logger.info("Shutting down gracefully...")
     await server.stop(grace=5)
+    await gw_channel.close()
+    if audit_stub is not None:
+        # audit_channel is only created when an audit address is configured
+        await audit_channel.close()
     logger.info("Search Service shut down")
 
 
diff --git a/services/search/src/search_service/service.py b/services/search/src/search_service/service.py
index 67ed906..163c82a 100644
--- a/services/search/src/search_service/service.py
+++ b/services/search/src/search_service/service.py
@@ -2,27 +2,220 @@
 
 from __future__ import annotations
 
+import logging
+from datetime import datetime, timezone
+
+from google.protobuf.timestamp_pb2 import Timestamp
+
 import grpc
-from llm_multiverse.v1 import search_pb2, search_pb2_grpc
+
+from llm_multiverse.v1 import (
+    audit_pb2,
+    audit_pb2_grpc,
+    common_pb2,
+    search_pb2,
+    search_pb2_grpc,
+)
 
 from .config import Config
+from .extractor import PageExtractor
+from .searxng import SearXNGClient, SearXNGError
+from .summarizer import Summarizer
+
+logger = logging.getLogger(__name__)
+
+# Number of characters of extracted page text used as the summary when no
+# model-generated summary is available.
+_FALLBACK_SUMMARY_CHARS = 500
 
 
 class SearchServiceImpl(search_pb2_grpc.SearchServiceServicer):
     """Implementation of the SearchService gRPC interface."""
 
-    def __init__(self, config: Config) -> None:
+    def __init__(
+        self,
+        config: Config,
+        searxng: SearXNGClient | None = None,
+        extractor: PageExtractor | None = None,
+        summarizer: Summarizer | None = None,
+        audit_stub: audit_pb2_grpc.AuditServiceStub | None = None,
+        *,
+        enable_extraction: bool = True,
+        enable_summarization: bool = True,
+    ) -> None:
+        """Wire up the pipeline components.
+
+        Args:
+            config: Service configuration; ``config.searxng_url`` is used to
+                build the default SearXNG client.
+            searxng: Meta-search client; a default one is created when
+                omitted or ``None``.
+            extractor: Page extractor; a default ``PageExtractor`` is
+                created when omitted or ``None``.
+            summarizer: Summarizer; summarization is skipped when ``None``.
+            audit_stub: Audit Service stub; auditing is skipped when
+                ``None``.
+            enable_extraction: ``False`` skips the extraction stage, and
+                with it summarization (there is nothing to summarize).
+            enable_summarization: ``False`` skips the summarization stage
+                even when a summarizer is configured.
+        """
         self.config = config
+        self.searxng = searxng or SearXNGClient(config.searxng_url)
+        self.extractor = extractor or PageExtractor()
+        self.summarizer = summarizer
+        self.audit_stub = audit_stub
+        self.enable_extraction = enable_extraction
+        self.enable_summarization = enable_summarization
 
     async def Search(
         self,
         request: search_pb2.SearchRequest,
         context: grpc.aio.ServicerContext,
     ) -> search_pb2.SearchResponse:
-        """Execute a search query.
-
-        Currently a stub — real implementation in issues #46-#49.
-        """
-        await context.abort(
-            grpc.StatusCode.UNIMPLEMENTED, "Search not yet implemented"
-        )
+        """Execute a search query through the full pipeline.
+
+        Stages: SearXNG query, optional content extraction, optional
+        summarization. A failure in extraction or summarization degrades
+        the response (falling back to extracts or snippets) instead of
+        failing the RPC; a SearXNG failure is reported through
+        ``error_message``.
+
+        Aborts with ``INVALID_ARGUMENT`` when ``query`` or
+        ``context.session_id`` is empty.
+        """
+        # Validate request
+        if not request.query:
+            await context.abort(
+                grpc.StatusCode.INVALID_ARGUMENT, "query is required"
+            )
+        session_ctx = request.context
+        if not session_ctx.session_id:
+            await context.abort(
+                grpc.StatusCode.INVALID_ARGUMENT, "context.session_id is required"
+            )
+
+        # A non-positive num_results is forwarded as None (no explicit limit).
+        num_results = request.num_results if request.num_results > 0 else None
+
+        # Stage 1: SearXNG query
+        try:
+            snippets = await self.searxng.search(
+                query=request.query, num_results=num_results
+            )
+        except SearXNGError as e:
+            logger.error("SearXNG query failed: %s", e)
+            return search_pb2.SearchResponse(
+                results=[], error_message=f"Search engine error: {e}"
+            )
+        except Exception as e:
+            # Boundary handler: keep the RPC alive but record the traceback.
+            logger.exception("SearXNG query failed unexpectedly: %s", e)
+            return search_pb2.SearchResponse(
+                results=[], error_message=f"Search engine unavailable: {e}"
+            )
+
+        if not snippets:
+            await self._audit_search(session_ctx, request.query, 0)
+            return search_pb2.SearchResponse(results=[])
+
+        # Stage 2: Content extraction
+        extractions = None
+        if self.enable_extraction and self.extractor:
+            urls = [s.url for s in snippets]
+            try:
+                extractions = await self.extractor.extract_many(urls)
+            except Exception as e:
+                logger.warning("Extraction failed, using snippets only: %s", e)
+
+        # Stage 3: Summarization
+        summaries: list[str] | None = None
+        if self.enable_summarization and self.summarizer and extractions:
+            # Failed extractions are summarized from their snippet; zip()
+            # tolerates an extraction list shorter or longer than snippets.
+            contents = [
+                snip.snippet if ex.error else ex.content
+                for snip, ex in zip(snippets, extractions)
+            ]
+            try:
+                summaries = await self.summarizer.summarize_many(
+                    request.query, contents, session_ctx
+                )
+            except Exception as e:
+                logger.warning("Summarization failed, using extracts: %s", e)
+
+        results = self._build_results(snippets, extractions, summaries)
+
+        await self._audit_search(session_ctx, request.query, len(results))
+
+        return search_pb2.SearchResponse(results=results)
+
+    @staticmethod
+    def _build_results(
+        snippets: list,
+        extractions: list | None,
+        summaries: list[str] | None,
+    ) -> list[search_pb2.SearchResult]:
+        """Map pipeline output to one ``SearchResult`` per snippet.
+
+        The title prefers the extracted page title. The summary falls back
+        from the generated summary to the first ``_FALLBACK_SUMMARY_CHARS``
+        characters of successfully extracted content, then to the snippet
+        text; blank values are skipped at each step.
+        """
+        results = []
+        for i, snippet in enumerate(snippets):
+            extraction = None
+            if extractions and i < len(extractions):
+                extraction = extractions[i]
+
+            title = snippet.title
+            if extraction is not None and extraction.title:
+                title = extraction.title
+
+            summary = summaries[i] if summaries and i < len(summaries) else ""
+            if not summary and extraction is not None and not extraction.error:
+                summary = extraction.content[:_FALLBACK_SUMMARY_CHARS]
+            if not summary:
+                summary = snippet.snippet
+
+            results.append(
+                search_pb2.SearchResult(
+                    claim=title,
+                    source_url=snippet.url,
+                    confidence=snippet.score,
+                    summary=summary,
+                )
+            )
+        return results
+
+    async def _audit_search(
+        self,
+        session_ctx: common_pb2.SessionContext,
+        query: str,
+        num_results: int,
+    ) -> None:
+        """Log a search action to the Audit Service.
+
+        Best-effort: does nothing without an audit stub, and a failing
+        ``Append`` call is logged as a warning rather than raised.
+        """
+        if not self.audit_stub:
+            return
+
+        now = Timestamp()
+        now.FromDatetime(datetime.now(timezone.utc))
+        entry = audit_pb2.AuditEntry(
+            timestamp=now,
+            session_id=session_ctx.session_id,
+            action=audit_pb2.AUDIT_ACTION_TOOL_INVOCATION,
+            tool_name="searxng",
+            result_status=f"ok:{num_results}" if num_results > 0 else "empty",
+            metadata={"query": query},
+        )
+        try:
+            await self.audit_stub.Append(
+                audit_pb2.AppendRequest(context=session_ctx, entry=entry)
+            )
+        except Exception as e:
+            logger.warning("Failed to audit-log search: %s", e)
diff --git a/services/search/tests/test_service.py b/services/search/tests/test_service.py
new file mode 100644
index 0000000..39d8f95
--- /dev/null
+++ b/services/search/tests/test_service.py
@@ -0,0 +1,427 @@
+"""Tests for the SearchService gRPC endpoint."""
+
+from __future__ import annotations
+
+from unittest.mock import AsyncMock
+
+import grpc
+import pytest
+
+from llm_multiverse.v1 import (
+    common_pb2,
+    search_pb2,
+    search_pb2_grpc,
+)
+from search_service.config import Config
+from 
search_service.extractor import ContentExtraction
+from search_service.searxng import SearchSnippet, SearXNGError
+from search_service.service import SearchServiceImpl
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _session_context() -> common_pb2.SessionContext:  # session + user ids set
+    return common_pb2.SessionContext(
+        session_id="test-session",
+        user_id="test-user",
+    )
+
+
+def _search_request(
+    query: str = "test query",
+    num_results: int = 0,
+) -> search_pb2.SearchRequest:  # request bound to the test session context
+    return search_pb2.SearchRequest(
+        context=_session_context(),
+        query=query,
+        num_results=num_results,
+    )
+
+
+def _snippets(n: int = 3) -> list[SearchSnippet]:  # n hits, scores 1.0, 0.9, ...
+    return [
+        SearchSnippet(
+            title=f"Result {i}",
+            url=f"https://example.com/{i}",
+            snippet=f"Snippet for result {i}",
+            engine="google",
+            score=1.0 - i * 0.1,
+        )
+        for i in range(n)
+    ]
+
+
+def _extractions(n: int = 3) -> list[ContentExtraction]:  # n items, no error set
+    return [
+        ContentExtraction(
+            url=f"https://example.com/{i}",
+            title=f"Extracted Title {i}",
+            content=f"Full extracted content for page {i}",
+        )
+        for i in range(n)
+    ]
+
+
+def _mock_context() -> AsyncMock:  # NOTE(review): not referenced by any test here
+    ctx = AsyncMock(spec=grpc.aio.ServicerContext)
+    ctx.abort = AsyncMock(side_effect=grpc.aio.AbortError(
+        grpc.StatusCode.INVALID_ARGUMENT, "test abort"
+    ))
+    return ctx
+
+
+async def _start_test_server(service):
+    """Start a gRPC server with the service and return (server, port)."""
+    server = grpc.aio.server()
+    search_pb2_grpc.add_SearchServiceServicer_to_server(service, server)
+    port = server.add_insecure_port("[::]:0")
+    await server.start()
+    return server, port
+
+
+# ---------------------------------------------------------------------------
+# Tests — Full pipeline via gRPC
+# ---------------------------------------------------------------------------
+
+@pytest.mark.asyncio
+async def test_search_full_pipeline() -> None:
+    """All three stages run; results carry extracted titles and summaries."""
+    searxng = AsyncMock()
+    searxng.search = AsyncMock(return_value=_snippets(2))
+    extractor = AsyncMock()
+    extractor.extract_many = AsyncMock(return_value=_extractions(2))
+    summarizer = AsyncMock()
+    summarizer.summarize_many = AsyncMock(return_value=["Summary 0", "Summary 1"])
+
+    service = SearchServiceImpl(
+        Config(), searxng=searxng, extractor=extractor, summarizer=summarizer
+    )
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            resp = await stub.Search(_search_request())
+
+            assert len(resp.results) == 2
+            assert resp.results[0].claim == "Extracted Title 0"
+            assert resp.results[0].source_url == "https://example.com/0"
+            assert resp.results[0].summary == "Summary 0"
+            assert resp.results[0].confidence == pytest.approx(1.0)
+            assert resp.results[1].summary == "Summary 1"
+            assert resp.error_message == ""
+
+            searxng.search.assert_called_once()
+            extractor.extract_many.assert_called_once()
+            summarizer.summarize_many.assert_called_once()
+    finally:
+        await server.stop(0)
+
+
+@pytest.mark.asyncio
+async def test_search_no_summarizer() -> None:
+    """Without a summarizer the summary falls back to extracted content."""
+    searxng = AsyncMock()
+    searxng.search = AsyncMock(return_value=_snippets(1))
+    extractor = AsyncMock()
+    extractor.extract_many = AsyncMock(return_value=_extractions(1))
+
+    service = SearchServiceImpl(
+        Config(), searxng=searxng, extractor=extractor, summarizer=None
+    )
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            resp = await stub.Search(_search_request())
+
+            assert len(resp.results) == 1
+            # Falls back to extracted content (truncated to 500)
+            assert "Full extracted content" in resp.results[0].summary
+    finally:
+        await server.stop(0)
+
+
+@pytest.mark.asyncio
+async def test_search_no_extractor() -> None:
+    """With extractor=None and no summarizer, snippet title and text are used."""
+    searxng = AsyncMock()
+    searxng.search = AsyncMock(return_value=_snippets(1))
+
+    service = SearchServiceImpl(
+        Config(), searxng=searxng, extractor=None, summarizer=None
+    )
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            resp = await stub.Search(_search_request())
+
+            assert len(resp.results) == 1
+            # Falls back to snippet text
+            assert resp.results[0].summary == "Snippet for result 0"
+            assert resp.results[0].claim == "Result 0"
+    finally:
+        await server.stop(0)
+
+
+@pytest.mark.asyncio
+async def test_search_empty_query() -> None:
+    """An empty query is rejected with INVALID_ARGUMENT."""
+    service = SearchServiceImpl(Config())
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            req = search_pb2.SearchRequest(
+                context=_session_context(), query=""
+            )
+            with pytest.raises(grpc.aio.AioRpcError) as exc_info:
+                await stub.Search(req)
+            assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
+            assert "query" in exc_info.value.details()
+    finally:
+        await server.stop(0)
+
+
+@pytest.mark.asyncio
+async def test_search_missing_session_id() -> None:
+    """An empty session_id is rejected with INVALID_ARGUMENT."""
+    service = SearchServiceImpl(Config())
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            req = search_pb2.SearchRequest(
+                context=common_pb2.SessionContext(session_id=""),
+                query="test",
+            )
+            with pytest.raises(grpc.aio.AioRpcError) as exc_info:
+                await stub.Search(req)
+            assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
+            assert "session_id" in exc_info.value.details()
+    finally:
+        await server.stop(0)
+
+
+@pytest.mark.asyncio
+async def test_search_searxng_error() -> None:
+    """A SearXNGError yields no results and an error_message, not an RPC error."""
+    searxng = AsyncMock()
+    searxng.search = AsyncMock(side_effect=SearXNGError(503, "Service unavailable"))
+
+    service = SearchServiceImpl(Config(), searxng=searxng)
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            resp = await stub.Search(_search_request())
+
+            assert len(resp.results) == 0
+            assert "Search engine error" in resp.error_message
+    finally:
+        await server.stop(0)
+
+
+@pytest.mark.asyncio
+async def test_search_empty_results() -> None:
+    """No snippets yields an empty response without an error_message."""
+    searxng = AsyncMock()
+    searxng.search = AsyncMock(return_value=[])
+
+    service = SearchServiceImpl(Config(), searxng=searxng)
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            resp = await stub.Search(_search_request())
+
+            assert len(resp.results) == 0
+            assert resp.error_message == ""
+    finally:
+        await server.stop(0)
+
+
+@pytest.mark.asyncio
+async def test_search_extraction_failure_falls_back() -> None:
+    """If extract_many raises, the summary falls back to the snippet."""
+    searxng = AsyncMock()
+    searxng.search = AsyncMock(return_value=_snippets(1))
+    extractor = AsyncMock()
+    extractor.extract_many = AsyncMock(side_effect=RuntimeError("extraction failed"))
+
+    service = SearchServiceImpl(
+        Config(), searxng=searxng, extractor=extractor, summarizer=None
+    )
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            resp = await stub.Search(_search_request())
+
+            assert len(resp.results) == 1
+            # Falls back to snippet since extraction failed
+            assert resp.results[0].summary == "Snippet for result 0"
+    finally:
+        await server.stop(0)
+
+
+@pytest.mark.asyncio
+async def test_search_summarization_failure_falls_back() -> None:
+    """If summarize_many raises, the summary falls back to extracted content."""
+    searxng = AsyncMock()
+    searxng.search = AsyncMock(return_value=_snippets(1))
+    extractor = AsyncMock()
+    extractor.extract_many = AsyncMock(return_value=_extractions(1))
+    summarizer = AsyncMock()
+    summarizer.summarize_many = AsyncMock(side_effect=RuntimeError("gateway down"))
+
+    service = SearchServiceImpl(
+        Config(), searxng=searxng, extractor=extractor, summarizer=summarizer
+    )
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            resp = await stub.Search(_search_request())
+
+            assert len(resp.results) == 1
+            # Falls back to extracted content
+            assert "Full extracted content" in resp.results[0].summary
+    finally:
+        await server.stop(0)
+
+
+@pytest.mark.asyncio
+async def test_search_partial_extraction_errors() -> None:
+    """Errored extractions are summarized from their snippet text."""
+    searxng = AsyncMock()
+    searxng.search = AsyncMock(return_value=_snippets(2))
+    extractor = AsyncMock()
+    extractor.extract_many = AsyncMock(return_value=[
+        ContentExtraction(url="https://example.com/0", title="Good", content="Good content"),
+        ContentExtraction(url="https://example.com/1", title="", content="", error="404"),
+    ])
+    summarizer = AsyncMock()
+    # Summarizer gets good content for first, snippet for second
+    summarizer.summarize_many = AsyncMock(return_value=["Good summary", "Snippet summary"])
+
+    service = SearchServiceImpl(
+        Config(), searxng=searxng, extractor=extractor, summarizer=summarizer
+    )
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            resp = await stub.Search(_search_request())
+
+            assert len(resp.results) == 2
+            # Verify summarizer got snippet fallback for errored extraction
+            call_args = summarizer.summarize_many.call_args
+            contents = call_args[0][1]  # second positional arg
+            assert contents[0] == "Good content"
+            assert contents[1] == "Snippet for result 1"
+    finally:
+        await server.stop(0)
+
+
+@pytest.mark.asyncio
+async def test_search_num_results_passed() -> None:
+    """A positive num_results is forwarded to the SearXNG client."""
+    searxng = AsyncMock()
+    searxng.search = AsyncMock(return_value=_snippets(5))
+
+    service = SearchServiceImpl(
+        Config(), searxng=searxng, extractor=None, summarizer=None
+    )
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            await stub.Search(_search_request(num_results=5))
+
+            searxng.search.assert_called_once_with(query="test query", num_results=5)
+    finally:
+        await server.stop(0)
+
+
+@pytest.mark.asyncio
+async def test_search_audit_logging() -> None:
+    """A search with results appends one audit entry describing it."""
+    searxng = AsyncMock()
+    searxng.search = AsyncMock(return_value=_snippets(2))
+
+    audit_stub = AsyncMock()
+    audit_stub.Append = AsyncMock()
+
+    service = SearchServiceImpl(
+        Config(), searxng=searxng, extractor=None, summarizer=None,
+        audit_stub=audit_stub,
+    )
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            await stub.Search(_search_request())
+
+            audit_stub.Append.assert_called_once()
+            call_args = audit_stub.Append.call_args[0][0]
+            from llm_multiverse.v1 import audit_pb2
+            assert call_args.entry.action == audit_pb2.AUDIT_ACTION_TOOL_INVOCATION
+            assert call_args.entry.tool_name == "searxng"
+            assert call_args.entry.result_status == "ok:2"
+            assert call_args.entry.metadata["query"] == "test query"
+    finally:
+        await server.stop(0)
+
+
+@pytest.mark.asyncio
+async def test_search_audit_failure_non_blocking() -> None:
+    """A failing audit Append does not fail the search."""
+    searxng = AsyncMock()
+    searxng.search = AsyncMock(return_value=_snippets(1))
+
+    audit_stub = AsyncMock()
+    audit_stub.Append = AsyncMock(side_effect=RuntimeError("audit down"))
+
+    service = SearchServiceImpl(
+        Config(), searxng=searxng, extractor=None, summarizer=None,
+        audit_stub=audit_stub,
+    )
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            resp = await stub.Search(_search_request())
+
+            # Search should still succeed despite audit failure
+            assert len(resp.results) == 1
+            assert resp.error_message == ""
+    finally:
+        await server.stop(0)
+
+
+@pytest.mark.asyncio
+async def test_search_audit_empty_results() -> None:
+    """A search that returns no snippets is audited with status 'empty'."""
+    searxng = AsyncMock()
+    searxng.search = AsyncMock(return_value=[])
+
+    audit_stub = AsyncMock()
+    audit_stub.Append = AsyncMock()
+
+    service = SearchServiceImpl(
+        Config(), searxng=searxng, extractor=None, summarizer=None,
+        audit_stub=audit_stub,
+    )
+    server, port = await _start_test_server(service)
+    try:
+        async with grpc.aio.insecure_channel(f"localhost:{port}") as channel:
+            stub = search_pb2_grpc.SearchServiceStub(channel)
+            await stub.Search(_search_request())
+
+            call_args = audit_stub.Append.call_args[0][0]
+            assert call_args.entry.result_status == "empty"
+    finally:
+        await server.stop(0)