feat(stacks/eh-search): add site-search FastAPI service

Mirrors /opt/ai-apps/eh-search/ on the server, including the full
FastAPI app (intent routing, FTS+fuzzy+substring hybrid, multi-source
federation across vehicles + blog + brands + pages + static + tag
bridge), SQL schema (Postgres materialized view with german_unaccent
text search, pg_trgm for fuzzy), Dockerfile and compose.

Sanitized the hardcoded password in sql/01_init.sql — replaced with
REPLACE_ME_BEFORE_APPLYING placeholder since this repo is public.

The eh-search service binds only on the private network (10.0.0.8:8200)
and is reachable only via Pegasus nginx proxy at /api/search.

Refs OP#1094 OP#1105 OP#1112 OP#1116 OP#1117
Author: Benjamin Weinlich, 2026-04-11 22:19:39 +02:00
parent: 8ba375caaa
commit: b3813ed6ac
20 changed files with 1717 additions and 0 deletions


@@ -0,0 +1,28 @@
# eh-search — Environment template
# Copy to .env and fill in actual values. NEVER commit .env to Git.
ENVIRONMENT=dev
# Postgres (eh_vehicles via loco-replica-db on host private network)
DB_HOST=10.0.0.8
DB_PORT=5433
DB_NAME=eh_vehicles
DB_USER=search_read
DB_PASSWORD=<see password manager / note in sql/01_init.sql>
# Redis (intra-stack, no password)
REDIS_HOST=eh-search-redis
REDIS_PORT=6379
REDIS_DB=0
# Cache TTLs (seconds)
CACHE_TTL_RESULT=60
CACHE_TTL_SUGGEST=600
CACHE_TTL_EMPTY=300
# Directus (for slug resolution)
DIRECTUS_URL=http://10.0.0.10:8055
DIRECTUS_SLUG_REFRESH_SECONDS=300
# CORS (leave empty when same-origin via nginx on Pegasus)
CORS_ORIGINS=

stacks/eh-search/Agent.md

@@ -0,0 +1,65 @@
# Agent Briefing — eh-search Stack
You are working on the site-search service of electric-horses.de. First read `../../Agent.md` at the repo root for global conventions.
## What is eh-search?
A FastAPI-based search service for the Electric Horses website. It supports:
- **Exact lookup** via commission number (e.g. `D9094`) and DVN (e.g. `9094`)
- **Full-text search** (Postgres `tsvector` with the `german_unaccent` dictionary)
- **Fuzzy search** (`pg_trgm` trigram matching)
- **Multi-source federation:** vehicles + blog posts + brands + pages + static pages + tag bridge
- **Cmd+K command palette** in the Astro frontend, silent-SSO style ("opens on keypress")
## Live deployment
- **Server:** ai-apps (Hetzner cx22, 10.0.0.8 private)
- **Path:** `/opt/ai-apps/eh-search/`
- **Binding:** ONLY `10.0.0.8:8200` (private), NOT public. Reached through the Pegasus nginx proxy via `/api/search`.
- **Frontend integration:** Astro command palette on `electric-horses.de` + `dev.electric-horses.de`
- **Content hosting:** the search index uses the Postgres DB `eh_vehicles` (schema in `sql/`), an in-memory content index (pulled from Directus), and hardcoded static pages + tag bridge.
## Architecture references
- **Data source for vehicles:** PostgreSQL `eh_vehicles` on loco-replica-db (private Postgres 17 container)
- **Data source for blog/brands/pages:** Directus REST API on Pegasus (`10.0.0.10:8055`)
- **Cache:** Redis (dedicated to this stack)
- **Intent routing:** regex-based, see `app/intent_router.py`
- **Two commission-number concepts in Loco-Soft:** (a) the customer-facing `{type_letter}{dvn}`, e.g. `D9094`; (b) the internal vehicle number. **Customers must only ever see (a).** See memory `project_eh_search_phase1.md`.
## Critical files
- `app/main.py` — FastAPI entry point + endpoint definitions
- `app/intent_router.py` — query → intent (komm_nr, dvn, keyword_search, ...)
- `app/search/exact.py` — exact lookup (Komm-Nr, DVN, VIN)
- `app/search/fts.py` — FTS + fuzzy + substring fallback + hybrid
- `app/search/static_pages.py` — static-page registry + tag→page bridge (~60 mappings)
- `app/content_index.py` — in-memory index built from Directus (blog, brands, pages)
- `app/schemas.py` — Pydantic response models
- `sql/01_init.sql` — initial Postgres setup (pg_trgm, MView, search_read user)
- `sql/02_komm_nr_fix.sql` — Komm-Nr data model fix
## Ops commands
```bash
ssh ai-apps
cd /opt/ai-apps/eh-search
# Rebuild & restart after a code change
docker compose up -d --build eh-search
# Invalidate the cache
curl -X POST http://10.0.0.8:8200/cache/invalidate
# Health
curl -s http://10.0.0.8:8200/health | jq .
# Logs
docker logs eh-search --tail 30
```
## OpenProject
- **M6** — Site Search Phase 1 (completed)
- **M6.2** — Federated multi-source (completed)
- **M6.3** — UX refinement (completed)
- **M6.4** — Komm-Nr data model fix (completed)
- **M6.5** — Sync fix for deactivations (completed)
## Related
- Memory: `project_eh_search_phase1.md` — full history and gotchas
- `../../Agent.md` — repo-wide briefing


@@ -0,0 +1,23 @@
FROM python:3.12-slim

WORKDIR /app

# System deps (minimal)
RUN apt-get update && apt-get install -y --no-install-recommends \
        curl \
    && rm -rf /var/lib/apt/lists/*

# Python deps
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# App code
COPY app/ ./app/

# Healthcheck
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD curl -fsS http://127.0.0.1:8200/health || exit 1

EXPOSE 8200

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8200", "--workers", "2", "--access-log"]


@@ -0,0 +1,63 @@
# eh-search — Site search for electric-horses.de
**Live:** via nginx proxy at `https://www.electric-horses.de/api/search?q=...`
**Internal address:** `http://10.0.0.8:8200` (private, not public!)
**Stack path:** `/opt/ai-apps/eh-search/` on ai-apps
## What does it do?
A FastAPI service that feeds a **Cmd+K command palette** on the Electric Horses website with search results. It searches:
- 75 vehicles (Postgres FTS + fuzzy + Komm-Nr/DVN exact lookup)
- 28 blog posts (with tags + new SEO descriptions)
- 24 brands
- 6 legal pages (Impressum, Datenschutz, ...)
- 8 static pages (Werkstatt, Ersatzteile, Kontakt, ...)
- 60 tag → page bridges for semantic connections
**Killer feature:** typing a 4-digit Komm-Nr (or `D9094` etc.) triggers a direct redirect to the vehicle detail page. Latency <50 ms. Ideal for staff on the phone.
## Architecture
```
Browser Cmd+K
  ↓
www.electric-horses.de/api/search
  ↓ (nginx proxy on Pegasus)
10.0.0.8:8200 (eh-search on ai-apps, private)
  ├─ Postgres eh_vehicles (loco-replica-db container, port 5433)
  ├─ Redis cache (stack-internal)
  ├─ Directus REST API (in-memory content index)
  └─ Static page registry (hardcoded in Python)
```
## Security
- **Binds ONLY to the private network** (10.0.0.8:8200), not reachable via the public IP
- **Read-only DB user** (`search_read`), can only SELECT from the materialized view
- **Same-origin via Pegasus nginx**: no CORS needed, no new cert, no new DNS
- **Rate limit** via nginx (30 req/s per IP)
## Files
- `docker-compose.yml` — stack definition
- `Dockerfile` — Python 3.12 + FastAPI + asyncpg + redis-py
- `requirements.txt` — Python dependencies
- `.env.example` — template for `.env` (never commit a real `.env`!)
- `app/` — FastAPI application
- `sql/` — Postgres schema and migrations (search_vehicles materialized view, pg_trgm, unaccent)
- `Agent.md` — AI briefing
- `README.md` — this file
## Live update workflow
When vehicle data changes (vehicle_sync.py runs nightly on Pegasus), the sync automatically refreshes the materialized view AND invalidates the Redis cache at the end of its run. Directus edits to blog/brands/pages fire a webhook to eh-search, triggering immediate cache invalidation and a content-index refresh.
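The post-sync steps could be wired up roughly like this (a hedged sketch; vehicle_sync.py's actual implementation is not shown in this commit, and `invalidate_url` / `after_vehicle_sync` are illustrative names):

```python
import urllib.request

SEARCH_BASE = "http://10.0.0.8:8200"  # private binding, see above

def invalidate_url(base: str) -> str:
    """Endpoint that clears the Redis cache and refreshes the content index."""
    return base.rstrip("/") + "/cache/invalidate"

def after_vehicle_sync(db_conn) -> None:
    # 1) Refresh the search view so new/changed vehicles become searchable
    db_conn.execute("REFRESH MATERIALIZED VIEW search_vehicles")
    # 2) Drop stale cached search results
    req = urllib.request.Request(invalidate_url(SEARCH_BASE), method="POST")
    urllib.request.urlopen(req, timeout=5)
```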
## OpenProject phases (all completed)
- **M6** — Postgres FTS + fuzzy + exact + command palette (Phase 1)
- **M6.2** — Federated multi-source (Phase 1.5)
- **M6.3** — UX refinement (sleek, mobile-first, smart group ordering)
- **M6.4** — Komm-Nr data model fix (correct D9094 instead of the wrong internal number)
- **M6.5** — Sync fix for deactivated vehicles
## Out of scope (possible Phase 2)
- Semantic vector search via Qdrant + bge embeddings
- Voice input via Whisper
- LLM filter extraction via a local Mistral/Gemma
- Image search / OCR
- Cross-encoder reranking


@@ -0,0 +1,80 @@
"""Redis cache wrapper with namespaced keys."""
import hashlib
import json
from typing import Any

import redis.asyncio as redis_async

from app.config import settings

_redis: redis_async.Redis | None = None
NAMESPACE = "search:"


async def init_redis() -> None:
    global _redis
    _redis = redis_async.Redis(
        host=settings.redis_host,
        port=settings.redis_port,
        db=settings.redis_db,
        decode_responses=True,
        socket_timeout=2,
    )
    await _redis.ping()


async def close_redis() -> None:
    global _redis
    if _redis is not None:
        await _redis.close()
    _redis = None


def get_redis() -> redis_async.Redis:
    if _redis is None:
        raise RuntimeError("Redis not initialized")
    return _redis


def _hash(value: str) -> str:
    return hashlib.sha1(value.encode("utf-8")).hexdigest()[:16]


def key_query(query: str) -> str:
    return f"{NAMESPACE}q:{_hash(query.lower())}"


def key_suggest(prefix: str) -> str:
    return f"{NAMESPACE}suggest:{_hash(prefix.lower())}"


def key_empty(query: str) -> str:
    return f"{NAMESPACE}empty:{_hash(query.lower())}"


def key_top_brands() -> str:
    return f"{NAMESPACE}top:brands"


async def get_json(key: str) -> Any | None:
    data = await get_redis().get(key)
    if data is None:
        return None
    try:
        return json.loads(data)
    except (json.JSONDecodeError, TypeError):
        return None


async def set_json(key: str, value: Any, ttl: int) -> None:
    await get_redis().set(key, json.dumps(value, default=str), ex=ttl)


async def invalidate_all() -> int:
    """Delete all keys in our namespace. Returns count deleted."""
    r = get_redis()
    count = 0
    async for key in r.scan_iter(match=f"{NAMESPACE}*", count=200):
        await r.delete(key)
        count += 1
    return count


@@ -0,0 +1,46 @@
"""Configuration via environment variables (Pydantic Settings)."""
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", extra="ignore")

    environment: str = "dev"

    # Postgres
    db_host: str
    db_port: int = 5433
    db_name: str = "eh_vehicles"
    db_user: str = "search_read"
    db_password: str

    # Redis
    redis_host: str = "eh-search-redis"
    redis_port: int = 6379
    redis_db: int = 0

    # Cache TTLs
    cache_ttl_result: int = 60
    cache_ttl_suggest: int = 600
    cache_ttl_empty: int = 300

    # Directus
    directus_url: str = "http://10.0.0.10:8055"
    directus_slug_refresh_seconds: int = 300

    # CORS
    cors_origins: str = ""

    @property
    def dsn(self) -> str:
        return (
            f"postgresql://{self.db_user}:{self.db_password}"
            f"@{self.db_host}:{self.db_port}/{self.db_name}"
        )

    @property
    def cors_origin_list(self) -> list[str]:
        return [o.strip() for o in self.cors_origins.split(",") if o.strip()]


settings = Settings()


@@ -0,0 +1,305 @@
"""In-memory content index for blog posts, brands, legal pages.

Loaded from Directus on startup, refreshed every 5 min in background,
and on POST /cache/invalidate. Total dataset is small (~90 items),
fits comfortably in RAM.
"""
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any

import httpx

from app.config import settings

log = logging.getLogger(__name__)


@dataclass
class BlogPost:
    id: int
    title: str
    slug: str
    excerpt: str = ""
    seo_description: str = ""
    tags: list[str] = field(default_factory=list)
    category: str = ""
    published_at: str | None = None
    image_id: str | None = None


@dataclass
class Brand:
    id: int
    name: str
    slug: str
    short_description: str = ""
    description: str = ""
    logo_id: str | None = None
    vehicle_count: int = 0


@dataclass
class LegalPage:
    id: int
    title: str
    slug: str
    seo_description: str = ""


# Mutable global state — protected by _lock for refresh atomicity
_blog_posts: list[BlogPost] = []
_brands: list[Brand] = []
_legal_pages: list[LegalPage] = []
_last_refresh: float = 0.0
_lock = asyncio.Lock()

# Slugs of pages that are NOT really top-level static pages but live in
# the Directus pages collection. We treat them as legal/info pages.
LEGAL_PAGE_SLUGS = {
    "impressum", "datenschutz", "agb",
    "barrierefreiheit", "batterie-entsorgung", "widerruf",
}


async def _fetch(
    client: httpx.AsyncClient,
    collection: str,
    fields: str,
    filter_field: str = "status",
    filter_value: str = "published",
) -> list[dict]:
    url = f"{settings.directus_url}/items/{collection}"
    params = {
        "fields": fields,
        "limit": -1,
    }
    if filter_field:
        params[f"filter[{filter_field}][_eq]"] = filter_value
    try:
        resp = await client.get(url, params=params)
        resp.raise_for_status()
        return resp.json().get("data", [])
    except Exception as e:
        log.warning("Failed to fetch %s: %s", collection, e)
        return []


async def refresh() -> dict:
    """Pull all collections from Directus and rebuild the in-memory index."""
    global _blog_posts, _brands, _legal_pages, _last_refresh
    async with httpx.AsyncClient(timeout=15) as client:
        blog_data, brand_data, page_data = await asyncio.gather(
            _fetch(client, "blog_posts",
                   "id,title,slug,excerpt,seo_description,tags,category,published_at,image"),
            _fetch(client, "brands",
                   "id,name,slug,short_description,description,logo",
                   filter_field="is_active", filter_value="true"),
            _fetch(client, "pages",
                   "id,title,slug,seo_description"),
        )
    new_blog = [
        BlogPost(
            id=int(p["id"]),
            title=p.get("title") or "",
            slug=p.get("slug") or "",
            excerpt=p.get("excerpt") or "",
            seo_description=p.get("seo_description") or "",
            tags=[t for t in (p.get("tags") or []) if isinstance(t, str)],
            category=p.get("category") or "",
            published_at=p.get("published_at"),
            image_id=p.get("image"),
        )
        for p in blog_data
        if p.get("slug")
    ]
    new_brands = [
        Brand(
            id=int(b["id"]),
            name=b.get("name") or "",
            slug=b.get("slug") or "",
            short_description=b.get("short_description") or "",
            description=b.get("description") or "",
            logo_id=b.get("logo"),
        )
        for b in brand_data
        if b.get("slug")
    ]
    new_legal = [
        LegalPage(
            id=int(p["id"]),
            title=p.get("title") or "",
            slug=p.get("slug") or "",
            seo_description=p.get("seo_description") or "",
        )
        for p in page_data
        if p.get("slug") and p["slug"] in LEGAL_PAGE_SLUGS
    ]
    async with _lock:
        _blog_posts = new_blog
        _brands = new_brands
        _legal_pages = new_legal
        _last_refresh = time.time()
    counts = {"blog": len(new_blog), "brands": len(new_brands), "legal": len(new_legal)}
    log.info("Content index refreshed: %s", counts)
    return counts


async def background_refresher() -> None:
    while True:
        await asyncio.sleep(settings.directus_slug_refresh_seconds)
        try:
            await refresh()
        except Exception as e:
            log.warning("Background content refresh error: %s", e)


def _score_blog(post: BlogPost, q: str) -> float:
    """Weighted match score for a blog post against query q (lowercased)."""
    score = 0.0
    title_l = post.title.lower()
    excerpt_l = post.excerpt.lower()
    seo_l = post.seo_description.lower()
    cat_l = post.category.lower()
    # Title weights highest (substring requires len >= 3 to avoid 2-char noise)
    if title_l == q:
        score += 10
    elif title_l.startswith(q):
        score += 6
    elif len(q) >= 3 and q in title_l:
        score += 4
    # Tags (real curated keywords)
    for tag in post.tags:
        tl = tag.lower()
        if tl == q:
            score += 5
            break
        elif tl.startswith(q) or (len(q) >= 3 and q in tl):
            score += 2
            break
    # Category
    if q in cat_l:
        score += 1
    # Excerpt + seo_description (lighter)
    if q in excerpt_l:
        score += 1
    if q in seo_l:
        score += 0.5
    return score


def _score_brand(brand: Brand, q: str) -> float:
    score = 0.0
    name_l = brand.name.lower()
    if name_l == q:
        score += 15  # Brand exact match — dominant signal
    elif name_l.startswith(q):
        score += 10  # Prefix should beat any page substring match
    elif len(q) >= 3 and q in name_l:
        score += 5
    if len(q) >= 3 and q in (brand.short_description or "").lower():
        score += 1
    if len(q) >= 3 and q in (brand.description or "").lower():
        score += 0.5
    return score


def _score_legal(page: LegalPage, q: str) -> float:
    score = 0.0
    title_l = page.title.lower()
    slug_l = page.slug.lower()
    if title_l == q or slug_l == q:
        score += 10
    elif title_l.startswith(q) or slug_l.startswith(q):
        score += 7
    elif len(q) >= 3 and (q in title_l or q in slug_l):
        score += 4
    if len(q) >= 3 and q in (page.seo_description or "").lower():
        score += 1
    return score


def search_blog(query: str, limit: int = 5) -> list[dict]:
    q = query.strip().lower()
    if not q:
        return []
    scored = [(p, _score_blog(p, q)) for p in _blog_posts]
    scored = [(p, s) for p, s in scored if s > 0]
    scored.sort(key=lambda x: -x[1])
    return [
        {
            "type": "blog",
            "title": p.title,
            "slug": f"/blog/{p.slug}",
            "snippet": (p.seo_description or p.excerpt or "")[:180],
            "tags": p.tags,
            "category": p.category,
            "published_at": p.published_at,
            "image_id": p.image_id,
            "score": s,
            "matched_via": "blog",
        }
        for p, s in scored[:limit]
    ]


def search_brands(query: str, limit: int = 5) -> list[dict]:
    q = query.strip().lower()
    if not q:
        return []
    scored = [(b, _score_brand(b, q)) for b in _brands]
    scored = [(b, s) for b, s in scored if s > 0]
    scored.sort(key=lambda x: -x[1])
    return [
        {
            "type": "brand",
            "title": b.name,
            "slug": f"/marken/{b.slug}",
            "snippet": (b.short_description or "")[:180],
            "logo_id": b.logo_id,
            "score": s,
            "matched_via": "brand",
        }
        for b, s in scored[:limit]
    ]


def search_legal(query: str, limit: int = 5) -> list[dict]:
    q = query.strip().lower()
    if not q:
        return []
    scored = [(p, _score_legal(p, q)) for p in _legal_pages]
    scored = [(p, s) for p, s in scored if s > 0]
    scored.sort(key=lambda x: -x[1])
    return [
        {
            "type": "page",
            "title": p.title,
            "slug": f"/{p.slug}",
            "snippet": (p.seo_description or "")[:180],
            "score": s,
            "matched_via": "legal",
        }
        for p, s in scored[:limit]
    ]


def get_blog_posts_with_tag(tag: str) -> list[BlogPost]:
    tag_l = tag.lower()
    return [p for p in _blog_posts if any(t.lower() == tag_l for t in p.tags)]


def stats() -> dict:
    return {
        "blog": len(_blog_posts),
        "brands": len(_brands),
        "legal": len(_legal_pages),
        "last_refresh_age_s": int(time.time() - _last_refresh) if _last_refresh else None,
    }


@@ -0,0 +1,43 @@
"""Async Postgres pool."""
import asyncpg

from app.config import settings

_pool: asyncpg.Pool | None = None


async def init_pool() -> None:
    global _pool
    _pool = await asyncpg.create_pool(
        dsn=settings.dsn,
        min_size=2,
        max_size=10,
        command_timeout=5,
    )


async def close_pool() -> None:
    global _pool
    if _pool is not None:
        await _pool.close()
    _pool = None


def get_pool() -> asyncpg.Pool:
    if _pool is None:
        raise RuntimeError("DB pool not initialized")
    return _pool


async def fetch(query: str, *args) -> list[asyncpg.Record]:
    async with get_pool().acquire() as conn:
        return await conn.fetch(query, *args)


async def fetchrow(query: str, *args) -> asyncpg.Record | None:
    async with get_pool().acquire() as conn:
        return await conn.fetchrow(query, *args)


async def fetchval(query: str, *args):
    async with get_pool().acquire() as conn:
        return await conn.fetchval(query, *args)


@@ -0,0 +1,82 @@
"""Intent routing — pure regex, no AI, sub-millisecond.

Loco-Soft Komm-Nr format:
    type letter + 4-6 digit number, optionally separated by a space.
    Type letters: N=Neu (new), T=Tageszul., V=Vorfuehr (demonstrator),
    D=Differenzbest., G=Gebraucht (used), L=Leihgabe (loaner).
    Examples: 'D9094', 'd9094', 'D 9094', 'n8093', 'L9083'

Plain 4-6 digit numbers are treated as a DVN lookup (the DVN is unique).
"""
import re
from dataclasses import dataclass
from typing import Literal

# Komm-Nr: type letter + DVN (with or without a space)
KOMM_NR_RE = re.compile(r"^[NTVDGLntvdgl]\s*\d{4,6}$")
# DVN alone: plain 4-6 digit number
DVN_RE = re.compile(r"^\d{4,6}$")
# VIN: 17 chars, no I/O/Q
VIN_RE = re.compile(r"^[A-HJ-NPR-Z0-9]{17}$")


@dataclass
class Intent:
    type: Literal[
        "komm_nr",
        "dvn",
        "vin",
        "autocomplete_only",
        "keyword_search",
        "empty",
    ]
    direct_redirect: bool = False
    normalized_query: str = ""


def route(raw_query: str) -> Intent:
    q = (raw_query or "").strip()
    if not q:
        return Intent(type="empty", normalized_query="")
    # Rule 1: Komm-Nr (type letter + DVN, e.g. 'D9094', 'D 9094', 'd9094')
    if KOMM_NR_RE.match(q):
        # Normalize: remove spaces, uppercase the type letter
        cleaned = q.replace(" ", "")
        normalized = cleaned[0].upper() + cleaned[1:]
        return Intent(
            type="komm_nr",
            direct_redirect=True,
            normalized_query=normalized,
        )
    # Rule 2: plain 4-6 digit number = DVN lookup (unique)
    if DVN_RE.match(q):
        return Intent(
            type="dvn",
            direct_redirect=True,
            normalized_query=q,
        )
    # Rule 3: 17-char alphanumeric (no I/O/Q) = VIN
    upper = q.upper().replace(" ", "")
    if VIN_RE.match(upper):
        return Intent(
            type="vin",
            direct_redirect=True,
            normalized_query=upper,
        )
    # Rule 4: single character only -> autocomplete (FTS is useless at 1 char)
    if len(q) < 2:
        return Intent(
            type="autocomplete_only",
            normalized_query=q.lower(),
        )
    # Rule 5: default — keyword + fuzzy search
    return Intent(
        type="keyword_search",
        normalized_query=q.lower(),
    )


@@ -0,0 +1,300 @@
"""eh-search FastAPI application — Phase 1.5 Federated Multi-Source.

Endpoints:
- GET  /health            -> liveness + dependency checks
- GET  /search?q=         -> federated search across vehicles + content + static pages
- GET  /suggest?q=        -> autocomplete
- POST /cache/invalidate  -> clear all caches + refresh content index
"""
import asyncio
import logging
import time
from contextlib import asynccontextmanager

from fastapi import FastAPI, Query, Request, Response
from fastapi.middleware.cors import CORSMiddleware

from app import cache, content_index, db
from app import slug_resolver
from app.config import settings
from app.intent_router import route as route_intent
from app.schemas import (
    HealthResponse,
    InvalidateResponse,
    SearchResponse,
    SearchResultItem,
    SuggestResponse,
)
from app.search import exact, fts, static_pages, suggest

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
log = logging.getLogger("eh-search")


@asynccontextmanager
async def lifespan(app: FastAPI):
    log.info("Starting eh-search (env=%s) Phase 1.5", settings.environment)
    await db.init_pool()
    await cache.init_redis()
    await slug_resolver.refresh()
    await content_index.refresh()
    refresher_slug = asyncio.create_task(slug_resolver.background_refresher())
    refresher_content = asyncio.create_task(content_index.background_refresher())
    log.info("eh-search ready")
    try:
        yield
    finally:
        refresher_slug.cancel()
        refresher_content.cancel()
        await db.close_pool()
        await cache.close_redis()
        log.info("eh-search stopped")


app = FastAPI(
    title="eh-search",
    description="Federated search service for electric-horses.de",
    version="0.2.0",
    lifespan=lifespan,
)

if settings.cors_origin_list:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origin_list,
        allow_methods=["GET"],
        allow_headers=["*"],
        max_age=600,
    )


# ---------------------------------------------------------------------------
# Health
# ---------------------------------------------------------------------------
@app.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
    db_status = "ok"
    vehicles_count = 0
    try:
        vehicles_count = await db.fetchval("SELECT count(*) FROM search_vehicles")
    except Exception as e:
        db_status = f"error: {e}"
    redis_status = "ok"
    try:
        await cache.get_redis().ping()
    except Exception as e:
        redis_status = f"error: {e}"
    overall = "ok" if db_status == "ok" and redis_status == "ok" else "degraded"
    return HealthResponse(
        status=overall,
        environment=settings.environment,
        db=db_status,
        redis=redis_status,
        vehicles_count=vehicles_count or 0,
        content=content_index.stats(),
    )


# ---------------------------------------------------------------------------
# Federated Search
# ---------------------------------------------------------------------------
async def _vehicle_search(
    query: str, intent_type: str, normalized: str, limit: int
) -> list[SearchResultItem]:
    if intent_type == "komm_nr":
        return await exact.by_komm_nr(normalized)
    if intent_type == "dvn":
        return await exact.by_dvn(normalized)
    if intent_type == "vin":
        return await exact.by_vin(normalized)
    if intent_type == "keyword_search":
        return await fts.hybrid_search(normalized, limit=limit)
    return []


def _content_to_items(raw: list[dict]) -> list[SearchResultItem]:
    """Convert content_index dict results to SearchResultItem."""
    return [SearchResultItem(**r) for r in raw]


async def _enrich_vehicle_slugs(items: list[SearchResultItem]) -> None:
    """Vehicles need their Directus slug resolved."""
    for item in items:
        if item.type == "vehicle" and item.directus_product_id:
            item.slug = await slug_resolver.get_slug(item.directus_product_id)


def _bridge_pages_for_blog(blog_results: list[SearchResultItem]) -> list[SearchResultItem]:
    """If matched blog posts have tags that bridge to a static page,
    surface that page as an additional result.
    """
    found_pages: dict[str, float] = {}
    for blog in blog_results:
        if not blog.tags:
            continue
        slugs = static_pages.get_pages_for_tags(blog.tags)
        for slug in slugs:
            page = static_pages.page_by_slug(slug)
            if page:
                # Score: based on top blog match score, capped
                found_pages[slug] = max(found_pages.get(slug, 0), min(blog.score, 5.0))
    items = []
    for slug, score in found_pages.items():
        page = static_pages.page_by_slug(slug)
        if page is None:
            continue
        items.append(SearchResultItem(
            type="page",
            title=page.title,
            slug=page.slug,
            snippet=page.snippet,
            score=score,
            matched_via="tag_bridge",
        ))
    return items


@app.get("/search", response_model=SearchResponse)
async def search(
    response: Response,
    q: str = Query(..., min_length=1, max_length=200),
    limit: int = Query(10, ge=1, le=50),
) -> SearchResponse:
    started = time.perf_counter()
    intent = route_intent(q)
    cache_key = cache.key_query(f"v2:{intent.normalized_query}:{limit}")
    cached = await cache.get_json(cache_key)
    if cached is not None:
        cached["cache_hit"] = True
        response.headers["X-Cache-Hit"] = "true"
        return SearchResponse(**cached)
    norm = intent.normalized_query
    # Run all sources in parallel
    if intent.type in ("komm_nr", "dvn", "vin"):
        # Exact lookups skip content/static (they should hit a vehicle)
        vehicle_task = _vehicle_search(q, intent.type, norm, limit)
        vehicle_items = await vehicle_task
        content_blog: list[SearchResultItem] = []
        content_brands: list[SearchResultItem] = []
        content_legal: list[SearchResultItem] = []
        page_items: list[SearchResultItem] = []
    elif intent.type == "keyword_search":
        vehicle_task = _vehicle_search(q, intent.type, norm, limit)
        (
            vehicle_items,
            content_blog_raw,
            content_brand_raw,
            content_legal_raw,
            page_raw,
        ) = await asyncio.gather(
            vehicle_task,
            asyncio.to_thread(content_index.search_blog, norm, 5),
            asyncio.to_thread(content_index.search_brands, norm, 5),
            asyncio.to_thread(content_index.search_legal, norm, 5),
            asyncio.to_thread(static_pages.search_static_pages, norm, 5),
        )
        content_blog = _content_to_items(content_blog_raw)
        content_brands = _content_to_items(content_brand_raw)
        content_legal = _content_to_items(content_legal_raw)
        page_items = _content_to_items(page_raw)
    else:
        # autocomplete_only / empty
        vehicle_items = []
        content_blog = []
        content_brands = []
        content_legal = []
        page_items = []
    await _enrich_vehicle_slugs(vehicle_items)
    # Tag-bridge: blog matches surface their bridged static pages too
    bridged_pages = _bridge_pages_for_blog(content_blog)
    # Merge static pages from direct match + bridge (dedup by slug, keep higher score)
    page_map: dict[str, SearchResultItem] = {}
    for p in page_items + bridged_pages + content_legal:
        if p.slug and (p.slug not in page_map or page_map[p.slug].score < p.score):
            page_map[p.slug] = p
    pages_final = sorted(page_map.values(), key=lambda x: -x.score)
    # Combined results, ordered by type priority then by score
    type_priority = {"page": 0, "brand": 1, "vehicle": 2, "blog": 3}
    all_results: list[SearchResultItem] = (
        pages_final[:5] + content_brands[:5] + vehicle_items[:limit] + content_blog[:5]
    )
    # Direct redirect logic:
    # - commission_number / vin with single hit -> vehicle slug
    # - exact static page match (score >= 9) when ONLY one hit -> page slug
    direct_redirect = False
    if intent.direct_redirect and len(vehicle_items) == 1 and vehicle_items[0].slug:
        direct_redirect = True
    elif (
        intent.type == "keyword_search"
        and len(pages_final) == 1
        and pages_final[0].score >= 9
        and not vehicle_items
        and not content_brands
        and not content_blog
    ):
        direct_redirect = True
    took_ms = int((time.perf_counter() - started) * 1000)
    payload = SearchResponse(
        query=q,
        intent=intent.type,
        direct_redirect=direct_redirect,
        total=len(all_results),
        results=all_results,
        took_ms=took_ms,
        cache_hit=False,
    )
    ttl = settings.cache_ttl_empty if not all_results else settings.cache_ttl_result
    await cache.set_json(cache_key, payload.model_dump(), ttl=ttl)
    response.headers["X-Cache-Hit"] = "false"
    return payload


# ---------------------------------------------------------------------------
# Suggest
# ---------------------------------------------------------------------------
@app.get("/suggest", response_model=SuggestResponse)
async def suggest_endpoint(
    response: Response,
    q: str = Query("", max_length=50),
    limit: int = Query(8, ge=1, le=20),
) -> SuggestResponse:
    started = time.perf_counter()
    q_norm = (q or "").strip().lower()
    cache_key = cache.key_suggest(f"{q_norm}:{limit}")
    cached = await cache.get_json(cache_key)
    if cached is not None:
        response.headers["X-Cache-Hit"] = "true"
        return SuggestResponse(**cached)
    if not q_norm:
        items = await suggest.top_brands(limit=limit)
    else:
        items = await suggest.prefix_suggest(q_norm, limit=limit)
    payload = SuggestResponse(
        query=q,
        suggestions=items,
        took_ms=int((time.perf_counter() - started) * 1000),
    )
    await cache.set_json(cache_key, payload.model_dump(), ttl=settings.cache_ttl_suggest)
    response.headers["X-Cache-Hit"] = "false"
    return payload


# ---------------------------------------------------------------------------
# Cache invalidation
# ---------------------------------------------------------------------------
@app.post("/cache/invalidate", response_model=InvalidateResponse)
async def invalidate(request: Request) -> InvalidateResponse:
    cleared = await cache.invalidate_all()
    # Refresh both indexes in the background
    asyncio.create_task(slug_resolver.refresh())
    asyncio.create_task(content_index.refresh())
    log.info(
        "Cache invalidated: %d keys (from %s)",
        cleared,
        request.client.host if request.client else "?",
    )
    return InvalidateResponse(cleared=cleared, scope="all")


@@ -0,0 +1,82 @@
"""Pydantic response models."""
from pydantic import BaseModel, Field
from typing import Literal, Any

IntentType = Literal[
    "komm_nr",
    "dvn",
    "vin",
    "autocomplete_only",
    "keyword_search",
    "empty",
]
ResultType = Literal["vehicle", "page", "blog", "brand"]


class SearchResultItem(BaseModel):
    """Unified result item — fields are optional depending on type."""
    type: ResultType = "vehicle"
    title: str
    slug: str | None = None
    snippet: str | None = None
    score: float = 0.0
    matched_via: str = ""
    # Vehicle-specific
    vehicle_id: int | None = None
    commission_number: str | None = None
    vin: str | None = None
    brand: str | None = None
    model: str | None = None
    price: float | None = None
    primary_image_id: str | None = None
    directus_product_id: int | None = None
    # Blog-specific
    tags: list[str] | None = None
    category: str | None = None
    published_at: str | None = None
    image_id: str | None = None
    # Brand-specific
    logo_id: str | None = None

    model_config = {"extra": "allow"}


class SearchResponse(BaseModel):
    query: str
    intent: IntentType
    direct_redirect: bool = False
    total: int
    results: list[SearchResultItem]
    took_ms: int
    cache_hit: bool = False


class SuggestItem(BaseModel):
    text: str
    type: Literal["brand", "model", "category"]
    count: int = 0


class SuggestResponse(BaseModel):
    query: str
    suggestions: list[SuggestItem]
    took_ms: int


class HealthResponse(BaseModel):
    status: str
    environment: str
    db: str
    redis: str
    vehicles_count: int
    content: dict[str, Any] = Field(default_factory=dict)


class InvalidateResponse(BaseModel):
    cleared: int
    scope: str = "all"


@@ -0,0 +1,78 @@
"""Exact lookups: Komm-Nr, DVN, VIN.

Loco-Soft Komm-Nr schema:
    type letter (N/T/V/D/G/L) + DVN number (4-6 digits)
    Examples: 'D9094' (Nissan Leaf), 'N8093' (Askoll XKP45), 'L9083' (Opel)
    N = Neu, T = Tageszul., V = Vorfuehr,
    D = Differenzbest., G = Gebraucht, L = Leihgabe

A DVN on its own is unique per vehicle, so it also works as a search entry point.
"""
import re

from app import db
from app.schemas import SearchResultItem

COLUMNS = """
    vehicle_id, dvn, commission_number, vin, brand, model, title,
    price::float8 as price, primary_image_id::text as primary_image_id,
    directus_product_id, dealer_vehicle_type
"""

KOMM_NR_RE = re.compile(r"^([NTVDGLntvdgl])\s*(\d{4,6})$")


def normalize_komm_nr(raw: str) -> str | None:
    """'D 9094' / 'd9094' / ' D9094 ' -> 'D9094'. Returns None if not a valid pattern."""
    if not raw:
        return None
    m = KOMM_NR_RE.match(raw.strip())
    if not m:
        return None
    return m.group(1).upper() + m.group(2)


async def by_komm_nr(raw: str) -> list[SearchResultItem]:
    """Lookup by full Komm-Nr (type letter + DVN). Unique if present."""
    normalized = normalize_komm_nr(raw)
    if not normalized:
        return []
    sql = f"SELECT {COLUMNS} FROM search_vehicles WHERE commission_number = $1 LIMIT 5"
    rows = await db.fetch(sql, normalized)
    return [_row_to_item(r, matched_via="exact_komm") for r in rows]


async def by_dvn(number: str) -> list[SearchResultItem]:
    """Lookup by DVN alone (4-6 digit number without the type letter).

    The DVN is unique per vehicle, so this always yields 0 or 1 hits."""
    try:
        dvn_int = int(number)
    except (ValueError, TypeError):
        return []
    sql = f"SELECT {COLUMNS} FROM search_vehicles WHERE dvn = $1 LIMIT 5"
    rows = await db.fetch(sql, dvn_int)
    return [_row_to_item(r, matched_via="exact_dvn") for r in rows]


async def by_vin(vin: str) -> list[SearchResultItem]:
    sql = f"SELECT {COLUMNS} FROM search_vehicles WHERE vin = $1 LIMIT 5"
    rows = await db.fetch(sql, vin)
    return [_row_to_item(r, matched_via="exact_vin") for r in rows]


def _row_to_item(row, matched_via: str) -> SearchResultItem:
    return SearchResultItem(
        vehicle_id=row["vehicle_id"],
        commission_number=row["commission_number"],
        vin=row["vin"],
        brand=row["brand"],
        model=row["model"],
        title=row["title"] or "",
        price=row["price"],
        primary_image_id=row["primary_image_id"],
        directus_product_id=row["directus_product_id"],
        score=1.0,
        matched_via=matched_via,
    )

@@ -0,0 +1,148 @@
"""Postgres Full-Text Search (Typ 2) + pg_trgm Fuzzy (Typ 3) + Substring fallback."""
import re
from app import db
from app.schemas import SearchResultItem
COLUMNS = """
vehicle_id, commission_number, vin, brand, model, title,
price::float8 as price, primary_image_id::text as primary_image_id,
directus_product_id
"""
# Sanitize tsquery input — keep word chars, spaces, dots, hyphens
_SAFE_TOKEN = re.compile(r"[^\w\s\.\-]", re.UNICODE)
def _build_prefix_tsquery(query: str) -> str:
"""Convert free-text query into a safe tsquery with prefix match.
'ray' -> 'ray:*'
'zero motor' -> 'zero:* & motor:*'
"""
cleaned = _SAFE_TOKEN.sub(" ", query)
tokens = [t for t in cleaned.split() if t]
if not tokens:
return ""
return " & ".join(f"{t}:*" for t in tokens)
async def fts_search(query: str, limit: int = 10) -> list[SearchResultItem]:
"""Full-text search with German + unaccent dictionary, prefix-enabled."""
tsquery_str = _build_prefix_tsquery(query)
if not tsquery_str:
return []
sql = f"""
SELECT {COLUMNS},
ts_rank_cd(search_tsv, q)::float8 AS rank
FROM search_vehicles, to_tsquery('german_unaccent', $1) q
WHERE search_tsv @@ q
ORDER BY rank DESC, brand, model
LIMIT $2
"""
try:
rows = await db.fetch(sql, tsquery_str, limit)
    except Exception:
        # to_tsquery may still reject edge-case input; retry with the
        # forgiving plainto_tsquery parser on the raw query.
sql_fallback = f"""
SELECT {COLUMNS},
ts_rank_cd(search_tsv, q)::float8 AS rank
FROM search_vehicles, plainto_tsquery('german_unaccent', $1) q
WHERE search_tsv @@ q
ORDER BY rank DESC, brand, model
LIMIT $2
"""
rows = await db.fetch(sql_fallback, query, limit)
return [_to_item(r, "fts", float(r["rank"] or 0)) for r in rows]
async def fuzzy_search(query: str, limit: int = 10) -> list[SearchResultItem]:
"""Trigram similarity for typo tolerance."""
sql = f"""
WITH scored AS (
SELECT {COLUMNS},
GREATEST(
similarity(title, $1),
similarity(COALESCE(commission_number, ''), $1)
)::float8 AS sim
FROM search_vehicles
WHERE title % $1
OR commission_number % $1
)
SELECT * FROM scored
WHERE sim > 0.25
ORDER BY sim DESC
LIMIT $2
"""
rows = await db.fetch(sql, query, limit)
return [_to_item(r, "fuzzy", float(r["sim"])) for r in rows]
async def substring_search(query: str, limit: int = 10) -> list[SearchResultItem]:
"""Last-resort: ILIKE substring on title/brand/model.
Catches stop-words (e.g. German 'es') that FTS strips, and very short
queries that don't survive stemming. Uses the existing trgm GIN index.
"""
pattern = f"%{query.lower()}%"
sql = f"""
SELECT {COLUMNS}, 0.5::float8 AS rank
FROM search_vehicles
WHERE LOWER(title) LIKE $1
OR LOWER(brand) LIKE $1
OR LOWER(COALESCE(model, '')) LIKE $1
ORDER BY brand, model
LIMIT $2
"""
rows = await db.fetch(sql, pattern, limit)
return [_to_item(r, "substring", 0.5) for r in rows]
async def hybrid_search(query: str, limit: int = 10) -> list[SearchResultItem]:
"""FTS first; if too few hits, add fuzzy; if still empty, add substring.
Strategy:
1. Always run FTS (cheap, indexed, prefix-enabled)
2. If FTS has >= 3 results, return them
3. Otherwise also run fuzzy and merge
4. If still nothing, run substring fallback (handles stop-words)
"""
fts_results = await fts_search(query, limit=limit)
if len(fts_results) >= 3:
return fts_results
fuzzy_results = await fuzzy_search(query, limit=limit)
seen: dict[int, SearchResultItem] = {r.vehicle_id: r for r in fts_results}
for r in fuzzy_results:
if r.vehicle_id not in seen:
seen[r.vehicle_id] = r
if not seen:
# Last resort: substring (catches stop-words like 'es')
substring_results = await substring_search(query, limit=limit)
for r in substring_results:
seen[r.vehicle_id] = r
merged = list(seen.values())
merged.sort(key=lambda x: (
0 if x.matched_via == "fts" else (1 if x.matched_via == "fuzzy" else 2),
-x.score,
))
return merged[:limit]
def _to_item(r, matched_via: str, score: float) -> SearchResultItem:
return SearchResultItem(
vehicle_id=r["vehicle_id"],
commission_number=r["commission_number"],
vin=r["vin"],
brand=r["brand"],
model=r["model"],
title=r["title"] or "",
price=r["price"],
primary_image_id=r["primary_image_id"],
directus_product_id=r["directus_product_id"],
score=score,
matched_via=matched_via,
)
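The tsquery sanitizer above is self-contained; a standalone sketch of its behavior (same regex and join logic as this module, illustrative inputs):

```python
import re

# Keep word chars, whitespace, dots, hyphens; everything else becomes a space
# so it can never break to_tsquery syntax.
_SAFE_TOKEN = re.compile(r"[^\w\s\.\-]", re.UNICODE)

def build_prefix_tsquery(query: str) -> str:
    cleaned = _SAFE_TOKEN.sub(" ", query)
    tokens = [t for t in cleaned.split() if t]
    # AND all tokens, each with :* prefix matching
    return " & ".join(f"{t}:*" for t in tokens)

print(build_prefix_tsquery("zero motor"))     # zero:* & motor:*
print(build_prefix_tsquery("o'neill (ray)"))  # o:* & neill:* & ray:*
```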

@@ -0,0 +1,194 @@
"""Static page registry + tag-to-page bridge.
Hardcoded list of top-level Astro pages that aren't in Directus.
The tag bridge maps blog tags (real, curated by the team) to these pages,
turning the blog content taxonomy into a search vocabulary for the site.
"""
from dataclasses import dataclass, field
from typing import Iterable
@dataclass
class StaticPage:
slug: str # URL path, e.g. "/werkstatt"
title: str # Display title
snippet: str # Description shown in search results
keywords: list[str] = field(default_factory=list) # Direct synonyms
# Top-level pages of electric-horses.de that live as Astro routes,
# not in Directus. Order matters for tie-breaking (earlier = preferred).
STATIC_PAGES: list[StaticPage] = [
StaticPage(
slug="/werkstatt",
title="Werkstatt",
snippet="Spezialwerkstatt fuer E-Roller und E-Motorraeder. Inspektion, Reparatur, Akku-Service.",
keywords=["werkstatt", "reparatur", "service", "inspektion", "wartung"],
),
StaticPage(
slug="/ersatzteile",
title="Ersatzteile",
snippet="Original-Ersatzteile und Zubehoer fuer E-Roller, E-Motorraeder und E-Scooter.",
keywords=["ersatzteile", "teile", "zubehoer", "originalteile"],
),
StaticPage(
slug="/fahrzeuge",
title="Fahrzeuge",
snippet="Unser kompletter Bestand an E-Rollern, E-Motorraedern und E-Autos.",
keywords=["fahrzeuge", "bestand", "angebot", "uebersicht", "alle"],
),
StaticPage(
slug="/marken",
title="Marken",
snippet="Alle Marken im Ueberblick: Askoll, RAY, ZERO, Energica, Nissan und mehr.",
keywords=["marken", "hersteller", "brands"],
),
StaticPage(
slug="/vermietung",
title="Vermietung",
snippet="E-Motorrad-Vermietung in Wendelstein bei Nuernberg. Tagestouren, Wochenmiete.",
keywords=["vermietung", "mieten", "leihen", "verleih", "rental"],
),
StaticPage(
slug="/kontakt",
title="Kontakt",
snippet="Kontakt zu Electric Horses: Telefon, E-Mail, Anfahrt, Oeffnungszeiten.",
keywords=["kontakt", "anfahrt", "adresse", "telefon", "email", "oeffnungszeiten"],
),
StaticPage(
slug="/ueber-uns",
title="Ueber uns",
snippet="Electric Horses ist die E-Mobilitaets-Sparte des Autohaus Richter & Zech.",
keywords=["ueber uns", "team", "geschichte", "richter zech", "wendelstein"],
),
StaticPage(
slug="/blog",
title="Blog",
snippet="News, Ratgeber und Erfahrungsberichte rund um Elektromobilitaet.",
keywords=["blog", "news", "artikel", "ratgeber"],
),
]
# Tag → Page bridge.
# These tags exist on real blog posts; matching one of them surfaces the
# linked page (in addition to the blog posts that carry the tag).
# Source: actual tag pool extracted from blog_posts.tags.
TAG_TO_PAGE_BRIDGE: dict[str, str] = {
    # ─── Werkstatt topics (tech, service, tyres, battery) ────────
"akku": "/werkstatt",
"akkuladung": "/werkstatt",
"ladedauer": "/werkstatt",
"reichweite": "/werkstatt",
"verbrauch": "/werkstatt",
"bremsen": "/werkstatt",
"reifen": "/werkstatt",
"metzeler": "/werkstatt",
"pirelli": "/werkstatt",
"heidenau": "/werkstatt",
"wartung": "/werkstatt",
"service": "/werkstatt",
"installation": "/werkstatt",
"montage": "/werkstatt",
"werkzeug": "/werkstatt",
"trittbrettverbreiterung": "/werkstatt",
"radnabenmotor": "/werkstatt",
"propilot": "/werkstatt",
"effizienz": "/werkstatt",
"ledersitze": "/werkstatt",
    # ─── Advice & subsidies (Kontakt) ────────────────────────────
"versicherung": "/kontakt",
"foerderung": "/kontakt",
"foerderungen": "/kontakt",
"praemie": "/kontakt",
"preisvorteil": "/kontakt",
"innovationspraemie": "/kontakt",
"umweltpraemie": "/kontakt",
"kosten": "/kontakt",
"vergleich": "/kontakt",
"vorfuehrer": "/kontakt",
"preis": "/kontakt",
"goelectric": "/kontakt",
    # ─── Driving licence & legal (Werkstatt section "Recht") ─────
"abe": "/werkstatt",
"ekfv": "/werkstatt",
"klassea1": "/werkstatt",
"klasseb": "/werkstatt",
"schluesselzahl196": "/werkstatt",
"fuehrerschein": "/werkstatt",
"strassenzulassung": "/werkstatt",
"verkehrsregeln": "/werkstatt",
"verordnung": "/werkstatt",
"legal": "/werkstatt",
"ekickroller": "/werkstatt",
"ekfv": "/werkstatt",
    # ─── Rental topics (Vermietung) ──────────────────────────────
"touristik": "/vermietung",
"freizeit": "/vermietung",
    # ─── Brand indicators (link to /marken) ──────────────────────
"marknunabhaengig": "/marken",
}
def search_static_pages(query: str, limit: int = 5) -> list[dict]:
"""Search static pages by title, slug, keywords, snippet substring.
Returns dicts in the same shape as content search results.
"""
q = (query or "").strip().lower()
if not q:
return []
results = []
for page in STATIC_PAGES:
score = 0.0
        # Exact title match -> very high
        if page.title.lower() == q:
            score = 10.0
        # Slug match (without leading slash) -- checked before the title
        # prefix so its higher score cannot be preempted
        elif page.slug.lstrip("/").lower() == q:
            score = 9.0
        # Title prefix
        elif page.title.lower().startswith(q):
            score = 7.0
# Title substring (only for queries >= 3 chars to avoid noise like 'ze' in 'fahrzeuge')
elif len(q) >= 3 and q in page.title.lower():
score = 5.0
# Keyword exact / prefix
elif any(kw == q for kw in page.keywords):
score = 6.0
elif any(kw.startswith(q) for kw in page.keywords):
score = 4.0
# Snippet substring (weakest, also requires >= 3 chars)
elif len(q) >= 3 and q in page.snippet.lower():
score = 2.0
if score > 0:
results.append({
"type": "page",
"title": page.title,
"slug": page.slug,
"snippet": page.snippet,
"score": score,
"matched_via": "static_page",
})
results.sort(key=lambda r: -r["score"])
return results[:limit]
def get_pages_for_tags(tags: Iterable[str]) -> set[str]:
"""Given an iterable of blog tags, return the set of page slugs they map to."""
if not tags:
return set()
return {TAG_TO_PAGE_BRIDGE[t.lower()] for t in tags if t and t.lower() in TAG_TO_PAGE_BRIDGE}
def page_by_slug(slug: str) -> StaticPage | None:
for p in STATIC_PAGES:
if p.slug == slug:
return p
return None
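The tag bridge is a plain dict lookup over lowercased tags; a trimmed standalone sketch (a small subset of the real bridge, illustrative tags):

```python
# Subset of TAG_TO_PAGE_BRIDGE, enough to show the shape
TAG_TO_PAGE_BRIDGE = {
    "akku": "/werkstatt",
    "reifen": "/werkstatt",
    "touristik": "/vermietung",
}

def get_pages_for_tags(tags):
    """Map an iterable of blog tags to the set of page slugs they bridge to."""
    return {
        TAG_TO_PAGE_BRIDGE[t.lower()]
        for t in tags
        if t and t.lower() in TAG_TO_PAGE_BRIDGE
    }

print(sorted(get_pages_for_tags(["Akku", "touristik", "unknown"])))
# ['/vermietung', '/werkstatt']
```

Unknown tags drop out silently, which is the desired behavior: the bridge only ever adds pages, it never blocks a result.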

@@ -0,0 +1,50 @@
"""Autocomplete / Suggest endpoint (Typ 11)."""
from app import db
from app.schemas import SuggestItem
async def top_brands(limit: int = 10) -> list[SuggestItem]:
rows = await db.fetch(
"SELECT brand, count(*) AS cnt FROM search_vehicles "
"WHERE brand IS NOT NULL GROUP BY brand ORDER BY cnt DESC LIMIT $1",
limit,
)
return [
SuggestItem(text=r["brand"], type="brand", count=r["cnt"])
for r in rows
]
async def prefix_suggest(prefix: str, limit: int = 8) -> list[SuggestItem]:
"""Brand + model prefix matching, case-insensitive."""
pattern = f"{prefix.lower()}%"
# Brands first
brand_rows = await db.fetch(
"SELECT brand, count(*) AS cnt FROM search_vehicles "
"WHERE LOWER(brand) LIKE $1 GROUP BY brand ORDER BY cnt DESC LIMIT $2",
pattern,
limit,
)
items = [
SuggestItem(text=r["brand"], type="brand", count=r["cnt"])
for r in brand_rows
]
# Then models if room
remaining = limit - len(items)
if remaining > 0:
model_rows = await db.fetch(
"SELECT DISTINCT brand || ' ' || model AS text, count(*) AS cnt "
"FROM search_vehicles "
"WHERE model IS NOT NULL AND LOWER(model) LIKE $1 "
"GROUP BY brand, model ORDER BY cnt DESC LIMIT $2",
pattern,
remaining,
)
items.extend(
SuggestItem(text=r["text"], type="model", count=r["cnt"])
for r in model_rows
)
return items

@@ -0,0 +1,66 @@
"""Resolves directus_product_id -> slug, with periodic refresh.
Uses Directus REST API. Slug map is small (~75 entries) so we keep it
fully in-memory and refresh on a schedule (or on cache invalidation).
"""
import asyncio
import logging
import time
import httpx
from app.config import settings
log = logging.getLogger(__name__)
_slug_map: dict[int, str] = {}
_last_refresh: float = 0.0
_lock = asyncio.Lock()
async def refresh() -> int:
"""Pull product_id -> slug from Directus. Returns count."""
global _slug_map, _last_refresh
url = f"{settings.directus_url}/items/products"
params = {
"fields": "id,slug",
"limit": -1,
"filter[status][_eq]": "published",
}
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url, params=params)
resp.raise_for_status()
data = resp.json().get("data", [])
new_map = {
int(item["id"]): item["slug"]
for item in data
if item.get("id") is not None and item.get("slug")
}
async with _lock:
_slug_map = new_map
_last_refresh = time.time()
log.info("Slug map refreshed: %d entries", len(new_map))
return len(new_map)
except Exception as e:
log.warning("Slug refresh failed: %s", e)
return 0
async def get_slug(directus_product_id: int | None) -> str | None:
if directus_product_id is None:
return None
# Lazy refresh if stale
if time.time() - _last_refresh > settings.directus_slug_refresh_seconds:
await refresh()
return _slug_map.get(int(directus_product_id))
async def background_refresher():
"""Periodic background task — refresh slug map every N seconds."""
while True:
await asyncio.sleep(settings.directus_slug_refresh_seconds)
try:
await refresh()
except Exception as e:
log.warning("Background slug refresh error: %s", e)
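The lazy-refresh logic in get_slug amounts to a timestamped cache: reload the whole map when it is older than the TTL, otherwise serve from memory. A minimal synchronous sketch (class name, loader, and values are illustrative, not part of the service):

```python
import time

class TTLMap:
    """Refresh-on-read map: re-pull the full mapping once it exceeds ttl."""

    def __init__(self, loader, ttl: float):
        self._loader = loader  # callable returning a fresh dict
        self._ttl = ttl
        self._data: dict = {}
        self._last = 0.0       # epoch of last successful load

    def get(self, key):
        if time.time() - self._last > self._ttl:
            self._data = self._loader()
            self._last = time.time()
        return self._data.get(key)

calls = []
m = TTLMap(lambda: calls.append(1) or {42: "nissan-leaf-d9094"}, ttl=300)
print(m.get(42))   # nissan-leaf-d9094 (first get triggers the load)
print(len(calls))  # 1 -- further gets within the TTL do not reload
```

The real module adds an asyncio lock around the swap and a background task on top of this, so a slow Directus call never blocks a read for long.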

@@ -0,0 +1,57 @@
# eh-search Stack on ai-apps
# Service is INTERNAL ONLY - bound to private network 10.0.0.8
# Reached by Pegasus via nginx proxy_pass on 10.0.0.10
services:
eh-search:
build: .
image: eh-search:dev
container_name: eh-search
restart: unless-stopped
env_file: .env
ports:
# Bind ONLY to private network IP, not 0.0.0.0
- "10.0.0.8:8200:8200"
networks:
- eh-search-internal
depends_on:
eh-search-redis:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-fsS", "http://127.0.0.1:8200/health"]
interval: 30s
timeout: 5s
retries: 3
start_period: 10s
deploy:
resources:
limits:
memory: 512M
cpus: "0.5"
logging:
driver: json-file
options:
max-size: "10m"
max-file: "3"
eh-search-redis:
image: redis:7-alpine
container_name: eh-search-redis
restart: unless-stopped
command: redis-server --maxmemory 64mb --maxmemory-policy allkeys-lru --save ""
networks:
- eh-search-internal
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 3s
retries: 5
deploy:
resources:
limits:
memory: 96M
cpus: "0.2"
networks:
eh-search-internal:
driver: bridge

@@ -0,0 +1,7 @@
fastapi==0.115.0
uvicorn[standard]==0.30.6
asyncpg==0.29.0
redis==5.0.8
pydantic==2.9.2
pydantic-settings==2.5.2
httpx==0.27.2