feat: implement readability-lxml extraction pipeline (issue #47)

- PageExtractor: async HTTP fetcher with timeout, user-agent, redirect handling
- readability-lxml integration for main content extraction
- HTML-to-text conversion preserving headings and list structure
- Content length truncation (configurable max_content_length)
- Parallel fetching via asyncio.gather with semaphore concurrency limit
- Error handling for unreachable/blocked URLs
- 14 unit tests with aioresponses mocking, 36 total tests pass

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pi Agent
2026-03-10 15:34:42 +01:00
parent a578fa3c5b
commit 7d91c5638a
5 changed files with 418 additions and 0 deletions


@@ -50,6 +50,7 @@
| #44 | Set up SearXNG Docker container | Phase 6 | `COMPLETED` | Docker / YAML | [issue-044.md](issue-044.md) |
| #45 | Scaffold Search Service Python project | Phase 6 | `COMPLETED` | Python | [issue-045.md](issue-045.md) |
| #46 | Implement SearXNG query + snippet filter | Phase 6 | `COMPLETED` | Python | [issue-046.md](issue-046.md) |
| #47 | Implement readability-lxml extraction pipeline | Phase 6 | `COMPLETED` | Python | [issue-047.md](issue-047.md) |

## Status Legend


@@ -0,0 +1,66 @@
# Implementation Plan — Issue #47: Implement readability-lxml extraction pipeline
## Metadata
| Field | Value |
|---|---|
| Issue | [#47](https://git.shahondin1624.de/llm-multiverse/llm-multiverse/issues/47) |
| Title | Implement readability-lxml extraction pipeline |
| Milestone | Phase 6: Search Service |
| Labels | — |
| Status | `COMPLETED` |
| Language | Python |
| Related Plans | issue-046.md |
| Blocked by | #46 |
## Acceptance Criteria
- [x] HTTP fetcher with timeout, user-agent, and redirect handling
- [x] readability-lxml extraction of main content
- [x] HTML-to-text conversion preserving structure (headings, lists)
- [x] Content length limiting (truncate very long pages)
- [x] Error handling for unreachable/blocked URLs
- [x] Parallel fetching for multiple URLs
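The structure-preserving HTML-to-text criterion can be illustrated with a stdlib-only sketch. This is not the actual implementation (which uses lxml and readability-lxml, see below); `StructuredTextParser` and `html_to_text` are illustrative names:

```python
from html.parser import HTMLParser


class StructuredTextParser(HTMLParser):
    """Collects text, prefixing headings with '## ' and list items with '- '."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: list[str] = []
        self._prefix = ""

    def handle_starttag(self, tag: str, attrs) -> None:
        if tag in ("h1", "h2", "h3", "h4", "h5", "h6"):
            self._prefix = "## "
        elif tag == "li":
            self._prefix = "- "

    def handle_data(self, data: str) -> None:
        text = data.strip()
        if text:
            self.lines.append(self._prefix + text)
            self._prefix = ""


def html_to_text(html: str) -> str:
    parser = StructuredTextParser()
    parser.feed(html)
    return "\n".join(parser.lines)
```

The same idea — tag-aware prefixes on a flat walk of the tree — carries over to the lxml-based version in `extractor.py`.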
## Architecture Analysis
### Service Context
- **Module:** `services/search/src/search_service/extractor.py`
- Used after SearXNG returns search snippets — fetches each URL and extracts clean text content
- The extracted content is later summarized by the Model Gateway (#48)
### Dependencies
- `readability-lxml` — extracts main content from HTML (Mozilla Readability algorithm)
- `lxml` — HTML parser (required by readability-lxml)
- `aiohttp` — async HTTP fetching (already a dependency)
## Implementation Steps
### 1. Add dependencies to pyproject.toml
- `readability-lxml>=0.8`
- `lxml` (pulled in by readability-lxml but declare explicitly)
### 2. Create `extractor.py` module
**ContentExtraction dataclass:**
- `url: str`, `title: str`, `content: str`, `error: str | None`
**PageExtractor class:**
- `__init__(max_content_length, timeout, user_agent)`
- `async fetch_and_extract(url) -> ContentExtraction` — fetch URL, apply readability, convert to text
- `async extract_many(urls) -> list[ContentExtraction]` — parallel fetch via asyncio.gather
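The `extract_many` design above — `asyncio.gather` bounded by a semaphore, results returned in input order — can be sketched in isolation. `fake_fetch` is a hypothetical stand-in for the real fetcher:

```python
import asyncio


async def gather_bounded(urls: list[str], max_concurrent: int = 5) -> list[str]:
    """Fetch all URLs with at most max_concurrent in flight, keeping input order."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fake_fetch(url: str) -> str:
        # Hypothetical stand-in for PageExtractor.fetch_and_extract
        await asyncio.sleep(0)
        return f"content:{url}"

    async def bounded(url: str) -> str:
        async with semaphore:
            return await fake_fetch(url)

    # asyncio.gather returns results in the order the awaitables were passed,
    # regardless of completion order
    return list(await asyncio.gather(*(bounded(u) for u in urls)))
```

Using a semaphore rather than batching keeps all slots busy: as soon as one fetch finishes, the next URL starts, instead of waiting for the slowest member of a batch.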
### 3. Tests with aioresponses mocking
## Files to Create/Modify
| File | Action | Purpose |
|---|---|---|
| `services/search/src/search_service/extractor.py` | Create | Page fetcher + readability extraction |
| `services/search/pyproject.toml` | Modify | Add readability-lxml, lxml |
| `services/search/tests/test_extractor.py` | Create | Unit tests |
## Deviation Log
| Deviation | Reason |
|---|---|


@@ -9,6 +9,8 @@ dependencies = [
    "protobuf>=7.34",
    "pyyaml>=6.0",
    "aiohttp>=3.10",
    "readability-lxml>=0.8",
    "lxml>=5.0",
]
[project.scripts]


@@ -0,0 +1,172 @@
"""Web page content extraction using readability-lxml."""
from __future__ import annotations

import asyncio
import logging
import re
from dataclasses import dataclass

import aiohttp

try:
    from lxml.html.clean import Cleaner
except ImportError:  # lxml >= 5.2 moved the cleaner into the separate lxml_html_clean package
    from lxml_html_clean import Cleaner
from readability import Document

logger = logging.getLogger(__name__)

DEFAULT_USER_AGENT = (
    "Mozilla/5.0 (compatible; llm-multiverse-search/0.1; +https://localhost)"
)
DEFAULT_TIMEOUT = 10.0
DEFAULT_MAX_CONTENT_LENGTH = 8000  # characters


@dataclass
class ContentExtraction:
    """Result of extracting content from a web page."""

    url: str
    title: str
    content: str
    error: str | None = None


# Collapses runs of three or more newlines down to a single blank line
_WHITESPACE_RE = re.compile(r"\n{3,}")

_CLEANER = Cleaner(
    scripts=True,
    javascript=True,
    comments=True,
    style=True,
    links=False,
    meta=True,
    page_structure=False,
    processing_instructions=True,
    embedded=True,
    frames=True,
    forms=True,
    annoying_tags=True,
    remove_tags=None,
    remove_unknown_tags=True,
    safe_attrs_only=True,
)
def _html_to_text(html_content: str) -> str:
    """Convert HTML to plain text preserving basic structure."""
    from lxml.html import fromstring

    try:
        doc = fromstring(html_content)
    except Exception:
        # If parsing fails, strip tags naively
        return re.sub(r"<[^>]+>", "", html_content).strip()
    _CLEANER(doc)
    # Walk the tree and convert to text with structure
    lines: list[str] = []
    for element in doc.iter():
        tag = element.tag if isinstance(element.tag, str) else ""
        if tag in ("h1", "h2", "h3", "h4", "h5", "h6"):
            # text_content() also captures text inside inline children (<em>, <a>, ...)
            text = element.text_content().strip()
            if text:
                lines.append(f"\n## {text}\n")
        elif tag == "p":
            text = element.text_content().strip()
            if text:
                lines.append(f"\n{text}\n")
        elif tag in ("div", "article", "section"):
            # Direct text only: iter() visits block children separately, so
            # text_content() here would emit their text twice
            text = (element.text or "").strip()
            if text:
                lines.append(f"\n{text}\n")
        elif tag == "li":
            text = element.text_content().strip()
            if text:
                lines.append(f"- {text}")
        elif tag == "br":
            lines.append("\n")
    if not lines:
        # Fallback: just get text content
        return doc.text_content().strip()
    result = "\n".join(lines)
    # Collapse excessive whitespace
    result = _WHITESPACE_RE.sub("\n\n", result)
    return result.strip()
class PageExtractor:
    """Fetches web pages and extracts main content using readability."""

    def __init__(
        self,
        max_content_length: int = DEFAULT_MAX_CONTENT_LENGTH,
        timeout: float = DEFAULT_TIMEOUT,
        user_agent: str = DEFAULT_USER_AGENT,
    ) -> None:
        self.max_content_length = max_content_length
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.user_agent = user_agent

    async def fetch_and_extract(self, url: str) -> ContentExtraction:
        """Fetch a URL and extract its main content.

        Returns a ContentExtraction with error set if fetching/parsing fails.
        """
        try:
            html_content = await self._fetch(url)
        except Exception as e:
            logger.warning("Failed to fetch %s: %s", url, e)
            return ContentExtraction(url=url, title="", content="", error=str(e))
        try:
            return self._extract(url, html_content)
        except Exception as e:
            logger.warning("Failed to extract content from %s: %s", url, e)
            return ContentExtraction(url=url, title="", content="", error=str(e))

    async def extract_many(
        self, urls: list[str], max_concurrent: int = 5
    ) -> list[ContentExtraction]:
        """Fetch and extract content from multiple URLs in parallel.

        Args:
            urls: List of URLs to process.
            max_concurrent: Maximum number of concurrent fetches.

        Returns:
            List of ContentExtraction results (same order as input URLs).
        """
        semaphore = asyncio.Semaphore(max_concurrent)

        async def _bounded_fetch(url: str) -> ContentExtraction:
            async with semaphore:
                return await self.fetch_and_extract(url)

        tasks = [_bounded_fetch(url) for url in urls]
        return list(await asyncio.gather(*tasks))

    async def _fetch(self, url: str) -> str:
        """Fetch HTML content from a URL."""
        headers = {"User-Agent": self.user_agent}
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            async with session.get(
                url, headers=headers, allow_redirects=True, max_redirects=5
            ) as resp:
                if resp.status != 200:
                    raise RuntimeError(f"HTTP {resp.status} fetching {url}")
                return await resp.text(errors="replace")

    def _extract(self, url: str, html_content: str) -> ContentExtraction:
        """Extract main content from HTML using readability-lxml."""
        doc = Document(html_content, url=url)
        title = doc.short_title() or ""
        summary_html = doc.summary()
        content = _html_to_text(summary_html)
        # Truncate to max length
        if len(content) > self.max_content_length:
            content = content[: self.max_content_length] + "..."
        return ContentExtraction(url=url, title=title, content=content)


@@ -0,0 +1,177 @@
"""Tests for the web page content extractor."""
from __future__ import annotations

import pytest
from aioresponses import aioresponses

from search_service.extractor import (
    PageExtractor,
    _html_to_text,
)


@pytest.fixture
def extractor() -> PageExtractor:
    return PageExtractor(max_content_length=5000)


SIMPLE_HTML = """
<!DOCTYPE html>
<html>
<head><title>Test Page</title></head>
<body>
<nav>Navigation links here</nav>
<article>
<h1>Main Article Title</h1>
<p>This is the main content of the article. It has important information.</p>
<p>Second paragraph with more details about the topic.</p>
<ul>
<li>First list item</li>
<li>Second list item</li>
</ul>
</article>
<footer>Footer content</footer>
</body>
</html>
"""

MINIMAL_HTML = """
<html><body><p>Just some text.</p></body></html>
"""
@pytest.mark.asyncio
async def test_fetch_and_extract_success(extractor: PageExtractor) -> None:
    with aioresponses() as m:
        m.get("https://example.com/article", body=SIMPLE_HTML, content_type="text/html")
        result = await extractor.fetch_and_extract("https://example.com/article")
    assert result.error is None
    assert result.url == "https://example.com/article"
    assert "Main Article Title" in result.title or "Test Page" in result.title
    assert "main content" in result.content.lower() or "article" in result.content.lower()


@pytest.mark.asyncio
async def test_fetch_and_extract_minimal(extractor: PageExtractor) -> None:
    with aioresponses() as m:
        m.get("https://example.com/min", body=MINIMAL_HTML, content_type="text/html")
        result = await extractor.fetch_and_extract("https://example.com/min")
    assert result.error is None
    assert "Just some text" in result.content


@pytest.mark.asyncio
async def test_fetch_error_returns_error(extractor: PageExtractor) -> None:
    with aioresponses() as m:
        m.get("https://example.com/404", status=404)
        result = await extractor.fetch_and_extract("https://example.com/404")
    assert result.error is not None
    assert "404" in result.error
    assert result.content == ""


@pytest.mark.asyncio
async def test_connection_error_returns_error(extractor: PageExtractor) -> None:
    with aioresponses() as m:
        m.get(
            "https://unreachable.example.com",
            exception=ConnectionError("Connection refused"),
        )
        result = await extractor.fetch_and_extract("https://unreachable.example.com")
    assert result.error is not None
    assert result.content == ""


@pytest.mark.asyncio
async def test_content_truncation() -> None:
    extractor = PageExtractor(max_content_length=50)
    long_html = f"<html><body><p>{'x' * 200}</p></body></html>"
    with aioresponses() as m:
        m.get("https://example.com/long", body=long_html, content_type="text/html")
        result = await extractor.fetch_and_extract("https://example.com/long")
    assert result.error is None
    assert len(result.content) <= 53  # 50 chars plus the "..." suffix
    assert result.content.endswith("...")
@pytest.mark.asyncio
async def test_extract_many_parallel(extractor: PageExtractor) -> None:
    urls = [f"https://example.com/{i}" for i in range(5)]
    with aioresponses() as m:
        for url in urls:
            m.get(
                url,
                body=f"<html><body><p>Content for {url}</p></body></html>",
                content_type="text/html",
            )
        results = await extractor.extract_many(urls)
    assert len(results) == 5
    for i, result in enumerate(results):
        assert result.url == urls[i]
        assert result.error is None


@pytest.mark.asyncio
async def test_extract_many_partial_failure(extractor: PageExtractor) -> None:
    with aioresponses() as m:
        m.get("https://example.com/ok", body=MINIMAL_HTML, content_type="text/html")
        m.get("https://example.com/fail", status=500)
        results = await extractor.extract_many(
            ["https://example.com/ok", "https://example.com/fail"]
        )
    assert len(results) == 2
    assert results[0].error is None
    assert results[1].error is not None


@pytest.mark.asyncio
async def test_extract_many_concurrency_limit(extractor: PageExtractor) -> None:
    urls = [f"https://example.com/{i}" for i in range(10)]
    with aioresponses() as m:
        for url in urls:
            m.get(url, body=MINIMAL_HTML, content_type="text/html")
        results = await extractor.extract_many(urls, max_concurrent=3)
    assert len(results) == 10
    assert all(r.error is None for r in results)


def test_html_to_text_basic() -> None:
    result = _html_to_text("<p>Hello world</p>")
    assert "Hello world" in result


def test_html_to_text_headings() -> None:
    result = _html_to_text("<h2>Section</h2><p>Content</p>")
    assert "Section" in result
    assert "Content" in result


def test_html_to_text_lists() -> None:
    result = _html_to_text("<ul><li>Item 1</li><li>Item 2</li></ul>")
    assert "Item 1" in result
    assert "Item 2" in result


def test_html_to_text_empty() -> None:
    result = _html_to_text("")
    assert result == ""


def test_html_to_text_malformed() -> None:
    result = _html_to_text("<p>Unclosed paragraph<div>Mixed")
    assert "Unclosed paragraph" in result or "Mixed" in result
@pytest.mark.asyncio
async def test_user_agent_header(extractor: PageExtractor) -> None:
    from yarl import URL

    with aioresponses() as m:
        m.get("https://example.com", body=MINIMAL_HTML, content_type="text/html")
        await extractor.fetch_and_extract("https://example.com")
    # aioresponses records requests keyed by (method, URL); check the
    # configured User-Agent was actually sent
    requests = m.requests[("GET", URL("https://example.com"))]
    assert len(requests) == 1
    assert requests[0].kwargs["headers"]["User-Agent"] == extractor.user_agent