Compare commits

...

2 Commits

Author SHA1 Message Date
986584b759 Merge pull request 'test: integration tests for Search Service (#50)' (#148) from feature/issue-50-search-integration-tests into main 2026-03-10 15:51:33 +01:00
Pi Agent
cd75318f45 test: add integration tests for Search Service (issue #50)
8 integration tests wiring real service components with mocked external
services (SearXNG via aioresponses, Model Gateway/Audit via mock gRPC
servers). Tests cover: full pipeline with all fields populated, clean
text extraction, summarization, unreachable URL handling, audit logging,
SearXNG unavailability, result ordering, and Model Gateway fallback.

Total: 71 tests passing across the Search Service.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 15:51:13 +01:00
3 changed files with 651 additions and 0 deletions

View File

@@ -53,6 +53,7 @@
| #47 | Implement readability-lxml extraction pipeline | Phase 6 | `COMPLETED` | Python | [issue-047.md](issue-047.md) |
| #48 | Implement summarization step via Model Gateway | Phase 6 | `COMPLETED` | Python | [issue-048.md](issue-048.md) |
| #49 | Implement Search gRPC endpoint | Phase 6 | `COMPLETED` | Python | [issue-049.md](issue-049.md) |
| #50 | Integration tests for Search Service | Phase 6 | `COMPLETED` | Python | [issue-050.md](issue-050.md) |
## Status Legend

View File

@@ -0,0 +1,49 @@
# Implementation Plan — Issue #50: Integration tests for Search Service
## Metadata
| Field | Value |
|---|---|
| Issue | [#50](https://git.shahondin1624.de/llm-multiverse/llm-multiverse/issues/50) |
| Title | Integration tests for Search Service |
| Milestone | Phase 6: Search Service |
| Labels | — |
| Status | `COMPLETED` |
| Language | Python |
| Related Plans | issue-049.md, issue-046.md, issue-047.md, issue-048.md |
| Blocked by | #49 |
## Acceptance Criteria
- [x] Test: Search returns results with all fields populated
- [x] Test: Extraction produces clean text from HTML pages
- [x] Test: Summarization produces concise relevant summaries
- [x] Test: Handles unreachable URLs gracefully
- [x] Test: Audit logging for search operations
- [x] Tests run in CI (uses aioresponses + mock gRPC servers, no containers needed)
## Architecture Analysis
### Approach
Integration tests wire together real service components (SearXNGClient, PageExtractor, Summarizer, SearchServiceImpl) with mocked external services:
- SearXNG HTTP API → mocked via `aioresponses`
- Model Gateway gRPC → mocked via in-process gRPC server
- Audit Service gRPC → mocked via in-process gRPC server
- Web pages for extraction → mocked via `aioresponses`
### Difference from Unit Tests
- `test_service.py` uses `AsyncMock` for all dependencies
- Integration tests use real component instances with only external HTTP/gRPC mocked
## Files to Create/Modify
| File | Action | Purpose |
|---|---|---|
| `services/search/tests/test_integration.py` | Create | Integration tests |
| `implementation-plans/issue-050.md` | Create | Plan |
| `implementation-plans/_index.md` | Modify | Add entry |
## Deviation Log
| Deviation | Reason |
|---|---|

View File

@@ -0,0 +1,601 @@
"""Integration tests for the Search Service.
These tests wire together real service components with mocked external
services (SearXNG HTTP, Model Gateway gRPC, Audit Service gRPC, web pages).
"""
from __future__ import annotations
import re
import grpc
import pytest
from aioresponses import aioresponses
from llm_multiverse.v1 import (
audit_pb2,
audit_pb2_grpc,
common_pb2,
model_gateway_pb2,
model_gateway_pb2_grpc,
search_pb2,
search_pb2_grpc,
)
from search_service.config import Config
from search_service.extractor import PageExtractor
from search_service.searxng import SearXNGClient
from search_service.service import SearchServiceImpl
from search_service.summarizer import Summarizer
# ---------------------------------------------------------------------------
# SearXNG mock helpers
# ---------------------------------------------------------------------------
SEARXNG_URL = "http://searxng-test:8080"
SEARXNG_PATTERN = re.compile(r"^http://searxng-test:8080/search\?.*$")
SIMPLE_PAGE_HTML = """
<!DOCTYPE html>
<html>
<head><title>Test Article</title></head>
<body>
<article>
<h1>Understanding Python Async</h1>
<p>Python's asyncio library enables concurrent programming. It uses
coroutines and event loops to handle I/O-bound operations efficiently.</p>
<p>The async/await syntax was introduced in Python 3.5 and has become
the standard approach for writing asynchronous code.</p>
</article>
</body>
</html>
"""
UNREACHABLE_PAGE_HTML = "" # Won't be served — simulates unreachable URL
def _searxng_response(results: list[dict]) -> dict:
return {"results": results}
def _searxng_result(
title: str, url: str, content: str, score: float = 1.0
) -> dict:
return {
"title": title,
"url": url,
"content": content,
"score": score,
"engine": "google",
}
# ---------------------------------------------------------------------------
# Mock Model Gateway gRPC server
# ---------------------------------------------------------------------------
class MockModelGatewayServicer(model_gateway_pb2_grpc.ModelGatewayServiceServicer):
    """In-process Model Gateway double that records every inference request.

    ``Inference`` answers with a canned summary derived from the prompt so
    tests can assert both the response text and the captured requests.
    Streaming and embedding RPCs are not exercised and report UNIMPLEMENTED.
    """

    def __init__(self) -> None:
        # Every InferenceRequest seen, in arrival order.
        self.requests: list[model_gateway_pb2.InferenceRequest] = []

    async def Inference(self, request, context):  # noqa: N802
        self.requests.append(request)
        prompt_head = request.params.prompt[:50]
        return model_gateway_pb2.InferenceResponse(
            finish_reason="stop",
            tokens_used=15,
            text=f"Summary of content about: {prompt_head}",
        )

    async def StreamInference(self, request, context):  # noqa: N802
        # Not used by these tests.
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)

    async def GenerateEmbedding(self, request, context):  # noqa: N802
        # Not used by these tests.
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
# ---------------------------------------------------------------------------
# Mock Audit Service gRPC server
# ---------------------------------------------------------------------------
class MockAuditServicer(audit_pb2_grpc.AuditServiceServicer):
    """In-process Audit Service double that captures appended log entries."""

    def __init__(self) -> None:
        # AppendRequests in the order they were received.
        self.entries: list[audit_pb2.AppendRequest] = []

    async def Append(self, request, context):  # noqa: N802
        # Record the full request so tests can inspect both entry and context.
        self.entries.append(request)
        return audit_pb2.AppendResponse()
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _session_context() -> common_pb2.SessionContext:
    """Return the fixed SessionContext used by every integration test."""
    ctx = common_pb2.SessionContext()
    ctx.session_id = "integration-session"
    ctx.user_id = "integration-user"
    return ctx
async def _start_grpc_server(servicer, add_fn):
    """Start an in-process aio gRPC server hosting *servicer*.

    *add_fn* is the generated ``add_*Servicer_to_server`` helper for the
    servicer's service. Returns ``(server, port)`` with the server already
    started on an ephemeral port; the caller must stop it.
    """
    srv = grpc.aio.server()
    add_fn(servicer, srv)
    bound_port = srv.add_insecure_port("[::]:0")
    await srv.start()
    return srv, bound_port
# ---------------------------------------------------------------------------
# Integration Tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_full_pipeline_returns_populated_results() -> None:
    """Search returns results with all fields populated through the full pipeline.

    Fix: every server and channel is now released in ``finally`` so a failing
    assertion no longer leaks the in-process gRPC servers/channels (the
    original only cleaned up on the success path).
    """
    gw_servicer = MockModelGatewayServicer()
    audit_servicer = MockAuditServicer()
    gw_server, gw_port = await _start_grpc_server(
        gw_servicer,
        model_gateway_pb2_grpc.add_ModelGatewayServiceServicer_to_server,
    )
    audit_server, audit_port = await _start_grpc_server(
        audit_servicer,
        audit_pb2_grpc.add_AuditServiceServicer_to_server,
    )
    gw_channel = grpc.aio.insecure_channel(f"localhost:{gw_port}")
    audit_channel = grpc.aio.insecure_channel(f"localhost:{audit_port}")
    # Resources created mid-test; tracked so finally can release them.
    server = None
    channel = None
    try:
        config = Config(searxng_url=SEARXNG_URL)
        searxng = SearXNGClient(SEARXNG_URL)
        extractor = PageExtractor()
        summarizer = Summarizer(gw_channel)
        audit_stub = audit_pb2_grpc.AuditServiceStub(audit_channel)
        service = SearchServiceImpl(
            config,
            searxng=searxng,
            extractor=extractor,
            summarizer=summarizer,
            audit_stub=audit_stub,
        )
        # Start the Search Service gRPC server under test.
        server = grpc.aio.server()
        search_pb2_grpc.add_SearchServiceServicer_to_server(service, server)
        port = server.add_insecure_port("[::]:0")
        await server.start()
        with aioresponses() as m:
            # Mock SearXNG
            m.get(
                SEARXNG_PATTERN,
                payload=_searxng_response([
                    _searxng_result(
                        "Python Async Guide",
                        "https://example.com/async",
                        "Learn about Python async.",
                        score=0.9,
                    ),
                ]),
            )
            # Mock the page fetch for extraction
            m.get(
                "https://example.com/async",
                body=SIMPLE_PAGE_HTML,
                content_type="text/html",
            )
            channel = grpc.aio.insecure_channel(f"localhost:{port}")
            stub = search_pb2_grpc.SearchServiceStub(channel)
            resp = await stub.Search(
                search_pb2.SearchRequest(
                    context=_session_context(),
                    query="python async programming",
                )
            )
        assert len(resp.results) == 1
        result = resp.results[0]
        assert result.source_url == "https://example.com/async"
        assert result.claim  # title populated
        assert result.summary  # summary populated
        assert result.confidence == pytest.approx(0.9)
        assert resp.error_message == ""
        # Model Gateway received exactly one summarization request.
        assert len(gw_servicer.requests) == 1
        assert "python async programming" in gw_servicer.requests[0].params.prompt
        # Audit entry recorded for the search operation.
        assert len(audit_servicer.entries) == 1
        assert audit_servicer.entries[0].entry.tool_name == "searxng"
    finally:
        if channel is not None:
            await channel.close()
        if server is not None:
            await server.stop(0)
        await gw_channel.close()
        await audit_channel.close()
        await gw_server.stop(0)
        await audit_server.stop(0)
@pytest.mark.asyncio
async def test_extraction_produces_clean_text() -> None:
    """Extraction pipeline produces clean text from HTML pages.

    Fix: the client channel is now closed in ``finally`` so a failing
    assertion no longer leaks it (the original closed it only on success).
    """
    config = Config(searxng_url=SEARXNG_URL)
    searxng = SearXNGClient(SEARXNG_URL)
    extractor = PageExtractor()
    service = SearchServiceImpl(
        config, searxng=searxng, extractor=extractor, summarizer=None
    )
    server = grpc.aio.server()
    search_pb2_grpc.add_SearchServiceServicer_to_server(service, server)
    port = server.add_insecure_port("[::]:0")
    await server.start()
    channel = None  # closed in finally even when an assertion fails
    try:
        with aioresponses() as m:
            m.get(
                SEARXNG_PATTERN,
                payload=_searxng_response([
                    _searxng_result(
                        "HTML Test",
                        "https://example.com/html",
                        "A test page",
                    ),
                ]),
            )
            m.get(
                "https://example.com/html",
                body=SIMPLE_PAGE_HTML,
                content_type="text/html",
            )
            channel = grpc.aio.insecure_channel(f"localhost:{port}")
            stub = search_pb2_grpc.SearchServiceStub(channel)
            resp = await stub.Search(
                search_pb2.SearchRequest(
                    context=_session_context(),
                    query="async python",
                )
            )
        assert len(resp.results) == 1
        # With no summarizer configured the summary is the raw extracted text.
        summary = resp.results[0].summary
        assert "asyncio" in summary.lower() or "async" in summary.lower()
        # Extraction must strip markup — no HTML tags may survive.
        assert "<p>" not in summary
        assert "<article>" not in summary
    finally:
        if channel is not None:
            await channel.close()
        await server.stop(0)
@pytest.mark.asyncio
async def test_summarization_produces_concise_summaries() -> None:
    """Summarization via Model Gateway produces summaries for every result.

    Fix: the search server and both channels are now released in ``finally``
    so a failing assertion no longer leaks them.
    """
    gw_servicer = MockModelGatewayServicer()
    gw_server, gw_port = await _start_grpc_server(
        gw_servicer,
        model_gateway_pb2_grpc.add_ModelGatewayServiceServicer_to_server,
    )
    gw_channel = grpc.aio.insecure_channel(f"localhost:{gw_port}")
    server = None
    channel = None
    try:
        config = Config(searxng_url=SEARXNG_URL)
        searxng = SearXNGClient(SEARXNG_URL)
        extractor = PageExtractor()
        summarizer = Summarizer(gw_channel)
        service = SearchServiceImpl(
            config, searxng=searxng, extractor=extractor, summarizer=summarizer
        )
        server = grpc.aio.server()
        search_pb2_grpc.add_SearchServiceServicer_to_server(service, server)
        port = server.add_insecure_port("[::]:0")
        await server.start()
        with aioresponses() as m:
            m.get(
                SEARXNG_PATTERN,
                payload=_searxng_response([
                    _searxng_result("Page", "https://example.com/p1", "Snippet"),
                    _searxng_result("Page 2", "https://example.com/p2", "Snippet 2"),
                ]),
            )
            m.get("https://example.com/p1", body=SIMPLE_PAGE_HTML, content_type="text/html")
            m.get("https://example.com/p2", body=SIMPLE_PAGE_HTML, content_type="text/html")
            channel = grpc.aio.insecure_channel(f"localhost:{port}")
            stub = search_pb2_grpc.SearchServiceStub(channel)
            resp = await stub.Search(
                search_pb2.SearchRequest(
                    context=_session_context(),
                    query="async programming",
                )
            )
        assert len(resp.results) == 2
        # Both results should carry Model Gateway summaries.
        for r in resp.results:
            assert r.summary
            assert "Summary of content" in r.summary
        # One summarization request per result.
        assert len(gw_servicer.requests) == 2
    finally:
        if channel is not None:
            await channel.close()
        if server is not None:
            await server.stop(0)
        await gw_channel.close()
        await gw_server.stop(0)
@pytest.mark.asyncio
async def test_unreachable_url_handled_gracefully() -> None:
    """Handles unreachable URLs gracefully — returns results with snippet fallback.

    Fix: the search server and both channels are now released in ``finally``
    so a failing assertion no longer leaks them.
    """
    gw_servicer = MockModelGatewayServicer()
    gw_server, gw_port = await _start_grpc_server(
        gw_servicer,
        model_gateway_pb2_grpc.add_ModelGatewayServiceServicer_to_server,
    )
    gw_channel = grpc.aio.insecure_channel(f"localhost:{gw_port}")
    server = None
    channel = None
    try:
        config = Config(searxng_url=SEARXNG_URL)
        searxng = SearXNGClient(SEARXNG_URL)
        extractor = PageExtractor(timeout=2.0)
        summarizer = Summarizer(gw_channel)
        service = SearchServiceImpl(
            config, searxng=searxng, extractor=extractor, summarizer=summarizer
        )
        server = grpc.aio.server()
        search_pb2_grpc.add_SearchServiceServicer_to_server(service, server)
        port = server.add_insecure_port("[::]:0")
        await server.start()
        with aioresponses() as m:
            m.get(
                SEARXNG_PATTERN,
                payload=_searxng_response([
                    _searxng_result(
                        "Good Page",
                        "https://example.com/good",
                        "Good snippet",
                        score=0.9,
                    ),
                    _searxng_result(
                        "Bad Page",
                        "https://unreachable.example.com",
                        "Bad snippet",
                        score=0.5,
                    ),
                ]),
            )
            # The good page serves HTML; the bad one refuses the connection.
            m.get("https://example.com/good", body=SIMPLE_PAGE_HTML, content_type="text/html")
            m.get(
                "https://unreachable.example.com",
                exception=ConnectionError("Connection refused"),
            )
            channel = grpc.aio.insecure_channel(f"localhost:{port}")
            stub = search_pb2_grpc.SearchServiceStub(channel)
            resp = await stub.Search(
                search_pb2.SearchRequest(
                    context=_session_context(),
                    query="test query",
                )
            )
        assert len(resp.results) == 2
        assert resp.error_message == ""
        # Both results should have summaries (summarizer handles fallback
        # when extraction fails for the unreachable URL).
        assert resp.results[0].summary
        assert resp.results[1].summary
    finally:
        if channel is not None:
            await channel.close()
        if server is not None:
            await server.stop(0)
        await gw_channel.close()
        await gw_server.stop(0)
@pytest.mark.asyncio
async def test_audit_logging_records_search() -> None:
    """Audit Service receives search operation log entries.

    Fix: the search server and both channels are now released in ``finally``
    so a failing assertion no longer leaks them.
    """
    audit_servicer = MockAuditServicer()
    audit_server, audit_port = await _start_grpc_server(
        audit_servicer,
        audit_pb2_grpc.add_AuditServiceServicer_to_server,
    )
    audit_channel = grpc.aio.insecure_channel(f"localhost:{audit_port}")
    server = None
    channel = None
    try:
        audit_stub = audit_pb2_grpc.AuditServiceStub(audit_channel)
        config = Config(searxng_url=SEARXNG_URL)
        searxng = SearXNGClient(SEARXNG_URL)
        service = SearchServiceImpl(
            config, searxng=searxng, extractor=None, summarizer=None,
            audit_stub=audit_stub,
        )
        server = grpc.aio.server()
        search_pb2_grpc.add_SearchServiceServicer_to_server(service, server)
        port = server.add_insecure_port("[::]:0")
        await server.start()
        with aioresponses() as m:
            m.get(
                SEARXNG_PATTERN,
                payload=_searxng_response([
                    _searxng_result("R1", "https://example.com/1", "S1"),
                    _searxng_result("R2", "https://example.com/2", "S2"),
                    _searxng_result("R3", "https://example.com/3", "S3"),
                ]),
            )
            channel = grpc.aio.insecure_channel(f"localhost:{port}")
            stub = search_pb2_grpc.SearchServiceStub(channel)
            await stub.Search(
                search_pb2.SearchRequest(
                    context=_session_context(),
                    query="audit test query",
                )
            )
        assert len(audit_servicer.entries) == 1
        entry = audit_servicer.entries[0].entry
        assert entry.action == audit_pb2.AUDIT_ACTION_TOOL_INVOCATION
        assert entry.tool_name == "searxng"
        assert entry.result_status == "ok:3"
        assert entry.metadata["query"] == "audit test query"
        assert entry.session_id == "integration-session"
        # The SessionContext must be forwarded verbatim to the Audit Service.
        ctx = audit_servicer.entries[0].context
        assert ctx.session_id == "integration-session"
    finally:
        if channel is not None:
            await channel.close()
        if server is not None:
            await server.stop(0)
        await audit_channel.close()
        await audit_server.stop(0)
@pytest.mark.asyncio
async def test_searxng_unavailable_returns_error() -> None:
    """SearXNG unavailability returns an error response, not an exception.

    Fix: the client channel is now closed in ``finally`` so a failing
    assertion no longer leaks it.
    """
    config = Config(searxng_url=SEARXNG_URL)
    searxng = SearXNGClient(SEARXNG_URL)
    service = SearchServiceImpl(config, searxng=searxng, extractor=None, summarizer=None)
    server = grpc.aio.server()
    search_pb2_grpc.add_SearchServiceServicer_to_server(service, server)
    port = server.add_insecure_port("[::]:0")
    await server.start()
    channel = None  # closed in finally even when an assertion fails
    try:
        with aioresponses() as m:
            # SearXNG answers 503 for every search request.
            m.get(SEARXNG_PATTERN, status=503)
            channel = grpc.aio.insecure_channel(f"localhost:{port}")
            stub = search_pb2_grpc.SearchServiceStub(channel)
            resp = await stub.Search(
                search_pb2.SearchRequest(
                    context=_session_context(),
                    query="test",
                )
            )
        assert len(resp.results) == 0
        assert "Search engine error" in resp.error_message
    finally:
        if channel is not None:
            await channel.close()
        await server.stop(0)
@pytest.mark.asyncio
async def test_multiple_results_ordering_preserved() -> None:
    """Results preserve the ordering from SearXNG (by score).

    Fix: the client channel is now closed in ``finally`` so a failing
    assertion no longer leaks it.
    """
    config = Config(searxng_url=SEARXNG_URL)
    searxng = SearXNGClient(SEARXNG_URL)
    service = SearchServiceImpl(config, searxng=searxng, extractor=None, summarizer=None)
    server = grpc.aio.server()
    search_pb2_grpc.add_SearchServiceServicer_to_server(service, server)
    port = server.add_insecure_port("[::]:0")
    await server.start()
    channel = None  # closed in finally even when an assertion fails
    try:
        with aioresponses() as m:
            m.get(
                SEARXNG_PATTERN,
                payload=_searxng_response([
                    _searxng_result("High", "https://example.com/high", "S1", score=0.9),
                    _searxng_result("Medium", "https://example.com/med", "S2", score=0.5),
                    _searxng_result("Low", "https://example.com/low", "S3", score=0.1),
                ]),
            )
            channel = grpc.aio.insecure_channel(f"localhost:{port}")
            stub = search_pb2_grpc.SearchServiceStub(channel)
            resp = await stub.Search(
                search_pb2.SearchRequest(
                    context=_session_context(),
                    query="ordering test",
                )
            )
        assert len(resp.results) == 3
        assert resp.results[0].claim == "High"
        assert resp.results[1].claim == "Medium"
        assert resp.results[2].claim == "Low"
        assert resp.results[0].confidence > resp.results[2].confidence
    finally:
        if channel is not None:
            await channel.close()
        await server.stop(0)
@pytest.mark.asyncio
async def test_model_gateway_down_falls_back_to_truncation() -> None:
    """When Model Gateway is unavailable, summarizer falls back to truncated content.

    Fix: ``dead_channel`` and the client channel are now closed in ``finally``
    so a failing assertion no longer leaks them.
    """
    config = Config(searxng_url=SEARXNG_URL)
    searxng = SearXNGClient(SEARXNG_URL)
    extractor = PageExtractor()
    # Point the summarizer at a port nothing listens on.
    dead_channel = grpc.aio.insecure_channel("localhost:1")
    summarizer = Summarizer(dead_channel, max_summary_length=100)
    service = SearchServiceImpl(
        config, searxng=searxng, extractor=extractor, summarizer=summarizer
    )
    server = grpc.aio.server()
    search_pb2_grpc.add_SearchServiceServicer_to_server(service, server)
    port = server.add_insecure_port("[::]:0")
    await server.start()
    channel = None  # closed in finally even when an assertion fails
    try:
        with aioresponses() as m:
            m.get(
                SEARXNG_PATTERN,
                payload=_searxng_response([
                    _searxng_result("Fallback", "https://example.com/fb", "Snippet"),
                ]),
            )
            m.get("https://example.com/fb", body=SIMPLE_PAGE_HTML, content_type="text/html")
            channel = grpc.aio.insecure_channel(f"localhost:{port}")
            stub = search_pb2_grpc.SearchServiceStub(channel)
            resp = await stub.Search(
                search_pb2.SearchRequest(
                    context=_session_context(),
                    query="fallback test",
                )
            )
        assert len(resp.results) == 1
        # Summary should be truncated extracted content (non-empty, capped
        # by max_summary_length).
        assert resp.results[0].summary
        assert len(resp.results[0].summary) <= 100
    finally:
        if channel is not None:
            await channel.close()
        await dead_channel.close()
        await server.stop(0)