diff --git a/stacks/eh-search/.env.example b/stacks/eh-search/.env.example
new file mode 100644
index 0000000..09d32aa
--- /dev/null
+++ b/stacks/eh-search/.env.example
@@ -0,0 +1,28 @@
+# eh-search — Environment template
+# Copy to .env and fill in actual values. NEVER commit .env to Git.
+
+ENVIRONMENT=dev
+
+# Postgres (eh_vehicles via loco-replica-db on host private network)
+DB_HOST=10.0.0.8
+DB_PORT=5433
+DB_NAME=eh_vehicles
+DB_USER=search_read
+DB_PASSWORD=
+
+# Redis (intra-stack, no password)
+REDIS_HOST=eh-search-redis
+REDIS_PORT=6379
+REDIS_DB=0
+
+# Cache TTLs (seconds)
+CACHE_TTL_RESULT=60
+CACHE_TTL_SUGGEST=600
+CACHE_TTL_EMPTY=300
+
+# Directus (for slug resolution)
+DIRECTUS_URL=http://10.0.0.10:8055
+DIRECTUS_SLUG_REFRESH_SECONDS=300
+
+# CORS (leer wenn same-origin via Nginx auf Pegasus)
+CORS_ORIGINS=
diff --git a/stacks/eh-search/Agent.md b/stacks/eh-search/Agent.md
new file mode 100644
index 0000000..30b6d3f
--- /dev/null
+++ b/stacks/eh-search/Agent.md
@@ -0,0 +1,65 @@
+# Agent Briefing — eh-search Stack
+
+Du arbeitest am Site-Search-Service von electric-horses.de. Lies erst `../../Agent.md` am Repo-Root für globale Konventionen.
+
+## Was ist eh-search?
+Ein FastAPI-basierter Suchservice für die Electric-Horses-Website. Unterstützt:
+- **Exact Lookup** via Kommissionsnummer (z.B. `D9094`) und DVN (z.B. `9094`)
+- **Full-Text Search** (Postgres `ts_vector` mit `german_unaccent` Dictionary)
+- **Fuzzy Search** (`pg_trgm` Trigramm-Matching)
+- **Multi-Source Federation:** Vehicles + Blog-Posts + Brands + Pages + Static Pages + Tag-Bridge
+- **Cmd+K Command Palette** im Astro-Frontend, mit Silent-SSO-Style "Öffnet sich bei Tastendruck"
+
+## Live-Deployment
+- **Server:** ai-apps (Hetzner cx22, 10.0.0.8 privat)
+- **Pfad:** `/opt/ai-apps/eh-search/`
+- **Binding:** NUR `10.0.0.8:8200` (privat), NICHT public. Wird über Pegasus Nginx via `/api/search` erreicht.
+- **Frontend-Integration:** Astro Command-Palette auf `electric-horses.de` + `dev.electric-horses.de`
+- **Content-Hosting:** Der Suchindex nutzt die Postgres-DB `eh_vehicles` (Schema in `sql/`), ein In-Memory Content-Index (aus Directus gezogen), und hardcoded Static Pages + Tag-Bridge.
+
+## Architektur-Verweis
+- **Datenquelle für Fahrzeuge:** PostgreSQL `eh_vehicles` auf loco-replica-db (privater Postgres 17 Container)
+- **Datenquelle für Blog/Brands/Pages:** Directus REST API auf Pegasus (`10.0.0.10:8055`)
+- **Cache:** Redis (dediziert im Stack)
+- **Intent Routing:** Regex-basiert, siehe `app/intent_router.py`
+- **Zwei Komm-Nr-Konzepte in Loco-Soft:** (a) Kundenseitig `{type_letter}{dvn}` z.B. `D9094`, (b) interne Fahrzeug-Nummer. **Nur (a) darf der Kunde jemals sehen.** Siehe Memory `project_eh_search_phase1.md`.
+
+## Kritische Files
+- `app/main.py` — FastAPI Entry + Endpoint-Definitionen
+- `app/intent_router.py` — Query → Intent (komm_nr, dvn, keyword_search, ...)
+- `app/search/exact.py` — Exact Lookup (Komm-Nr, DVN, VIN)
+- `app/search/fts.py` — FTS + Fuzzy + Substring Fallback + Hybrid
+- `app/search/static_pages.py` — Static Page Registry + Tag→Page Bridge (~60 Mappings)
+- `app/content_index.py` — In-Memory Index aus Directus (Blog, Brands, Pages)
+- `app/schemas.py` — Pydantic-Response-Models
+- `sql/01_init.sql` — Initial Postgres Setup (pg_trgm, MView, search_read user)
+- `sql/02_komm_nr_fix.sql` — Komm-Nr Datenmodell-Korrektur
+
+## Ops-Kommandos
+```bash
+ssh ai-apps
+cd /opt/ai-apps/eh-search
+
+# Rebuild & restart nach Code-Änderung
+docker compose up -d --build eh-search
+
+# Cache invalidieren
+curl -X POST http://10.0.0.8:8200/cache/invalidate
+
+# Health
+curl -s http://10.0.0.8:8200/health | jq .
+
+# Logs
+docker logs eh-search --tail 30
+```
+
+## OpenProject
+- **M6** — Site Search Phase 1 (abgeschlossen)
+- **M6.2** — Federated Multi-Source (abgeschlossen)
+- **M6.3** — UX Refinement (abgeschlossen)
+- **M6.4** — Komm-Nr Datenmodell-Fix (abgeschlossen)
+- **M6.5** — Sync-Fix für Deaktivierungen (abgeschlossen)
+
+## Related
+- Memory: `project_eh_search_phase1.md` — vollständige Historie und Gotchas
+- `../../Agent.md` — Repo-weites Briefing
diff --git a/stacks/eh-search/Dockerfile b/stacks/eh-search/Dockerfile
new file mode 100644
index 0000000..3dadba0
--- /dev/null
+++ b/stacks/eh-search/Dockerfile
@@ -0,0 +1,23 @@
+FROM python:3.12-slim
+
+WORKDIR /app
+
+# System deps (minimal)
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    curl \
+    && rm -rf /var/lib/apt/lists/*
+
+# Python deps
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# App code
+COPY app/ ./app/
+
+# Healthcheck
+HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
+    CMD curl -fsS http://127.0.0.1:8200/health || exit 1
+
+EXPOSE 8200
+
+CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8200", "--workers", "2", "--access-log"]
diff --git a/stacks/eh-search/README.md b/stacks/eh-search/README.md
new file mode 100644
index 0000000..61e1be4
--- /dev/null
+++ b/stacks/eh-search/README.md
@@ -0,0 +1,63 @@
+# eh-search — Site-Search für electric-horses.de
+
+**Live:** Via Nginx-Proxy auf `https://www.electric-horses.de/api/search?q=...`
+**Interne Adresse:** `http://10.0.0.8:8200` (privat, nicht public!)
+**Stack-Pfad:** `/opt/ai-apps/eh-search/` auf ai-apps
+
+## Was macht das?
+Ein FastAPI-Service, der eine **Cmd+K Command Palette** auf der Electric-Horses-Website mit Suchergebnissen versorgt. Durchsucht:
+- 75 Fahrzeuge (Postgres FTS + Fuzzy + Komm-Nr/DVN Exact Lookup)
+- 28 Blog-Posts (mit Tags + neue SEO-Descriptions)
+- 24 Marken
+- 6 Legal-Pages (Impressum, Datenschutz, ...)
+- 8 Static Pages (Werkstatt, Ersatzteile, Kontakt, ...)
+- 60 Tag → Page Bridges für semantische Verbindungen
+
+**Killer-Feature:** Eingabe einer 4-stelligen Komm-Nr (oder `D9094` etc.) → Direct Redirect zur Fahrzeug-Detailseite. Latenz <50ms. Ideal für Mitarbeiter am Telefon.
+
+## Architektur
+```
+Browser Cmd+K
+  ↓
+www.electric-horses.de/api/search
+  ↓ (Nginx Proxy auf Pegasus)
+10.0.0.8:8200 (eh-search auf ai-apps, privat)
+  ↓
+├─ Postgres eh_vehicles (loco-replica-db Container, Port 5433)
+├─ Redis Cache (Stack-intern)
+├─ Directus REST API (in-memory Content Index)
+└─ Static Page Registry (hardcoded in Python)
+```
+
+## Sicherheit
+- **Bind NUR auf privates Netz** (10.0.0.8:8200) — nicht über Public-IP erreichbar
+- **Read-Only DB-User** (`search_read`) — kann nur SELECT auf Materialized View
+- **Same-Origin via Pegasus Nginx** — kein CORS nötig, kein neues Cert, kein neues DNS
+- **Rate Limit** via Nginx (30 req/s pro IP)
+
+## Files
+- `docker-compose.yml` — Stack
+- `Dockerfile` — Python 3.12 + FastAPI + asyncpg + redis-py
+- `requirements.txt` — Python-Deps
+- `.env.example` — Template für `.env` (niemals echte `.env` committen!)
+- `app/` — FastAPI-Anwendung
+- `sql/` — Postgres-Schema und Migrations (search_vehicles Materialized View, pg_trgm, unaccent)
+- `Agent.md` — AI-Briefing
+- `README.md` — diese Datei
+
+## Live-Update-Workflow
+Wenn sich Fahrzeugdaten ändern (vehicle_sync.py nächtlich auf Pegasus), wird am Ende des Syncs automatisch die Materialized View refreshed UND der Redis-Cache invalidiert. Directus-Edits an Blog/Brands/Pages feuern einen Webhook an eh-search → sofortige Cache-Invalidation und Content-Index-Refresh.
+ +## OpenProject Phasen (alle abgeschlossen) +- **M6** — Postgres FTS + Fuzzy + Exact + Command Palette (Phase 1) +- **M6.2** — Federated Multi-Source (Phase 1.5) +- **M6.3** — UX Refinement (sleek, mobile-first, smart group ordering) +- **M6.4** — Komm-Nr Datenmodell-Fix (D9094 korrekt statt falsche interne Nummer) +- **M6.5** — Sync-Fix für deaktivierte Fahrzeuge + +## Out of Scope (mögliche Phase 2) +- Semantic Vector Search via Qdrant + bge-Embeddings +- Voice Input via Whisper +- LLM Filter-Extraktion via lokales Mistral/Gemma +- Image Search / OCR +- Cross-Encoder Reranking diff --git a/stacks/eh-search/app/__init__.py b/stacks/eh-search/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/stacks/eh-search/app/cache.py b/stacks/eh-search/app/cache.py new file mode 100644 index 0000000..85705f0 --- /dev/null +++ b/stacks/eh-search/app/cache.py @@ -0,0 +1,80 @@ +"""Redis cache wrapper with namespaced keys.""" +import hashlib +import json +from typing import Any + +import redis.asyncio as redis_async + +from app.config import settings + +_redis: redis_async.Redis | None = None +NAMESPACE = "search:" + + +async def init_redis() -> None: + global _redis + _redis = redis_async.Redis( + host=settings.redis_host, + port=settings.redis_port, + db=settings.redis_db, + decode_responses=True, + socket_timeout=2, + ) + await _redis.ping() + + +async def close_redis() -> None: + global _redis + if _redis is not None: + await _redis.close() + _redis = None + + +def get_redis() -> redis_async.Redis: + if _redis is None: + raise RuntimeError("Redis not initialized") + return _redis + + +def _hash(value: str) -> str: + return hashlib.sha1(value.encode("utf-8")).hexdigest()[:16] + + +def key_query(query: str) -> str: + return f"{NAMESPACE}q:{_hash(query.lower())}" + + +def key_suggest(prefix: str) -> str: + return f"{NAMESPACE}suggest:{_hash(prefix.lower())}" + + +def key_empty(query: str) -> str: + return f"{NAMESPACE}empty:{_hash(query.lower())}" + + +def 
key_top_brands() -> str: + return f"{NAMESPACE}top:brands" + + +async def get_json(key: str) -> Any | None: + data = await get_redis().get(key) + if data is None: + return None + try: + return json.loads(data) + except (json.JSONDecodeError, TypeError): + return None + + +async def set_json(key: str, value: Any, ttl: int) -> None: + await get_redis().set(key, json.dumps(value, default=str), ex=ttl) + + +async def invalidate_all() -> int: + """Delete all keys in our namespace. Returns count deleted.""" + r = get_redis() + count = 0 + async for key in r.scan_iter(match=f"{NAMESPACE}*", count=200): + await r.delete(key) + count += 1 + return count diff --git a/stacks/eh-search/app/config.py b/stacks/eh-search/app/config.py new file mode 100644 index 0000000..c9a177a --- /dev/null +++ b/stacks/eh-search/app/config.py @@ -0,0 +1,46 @@ +"""Configuration via environment variables (Pydantic Settings).""" +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + model_config = SettingsConfigDict(env_file=".env", extra="ignore") + + environment: str = "dev" + + # Postgres + db_host: str + db_port: int = 5433 + db_name: str = "eh_vehicles" + db_user: str = "search_read" + db_password: str + + # Redis + redis_host: str = "eh-search-redis" + redis_port: int = 6379 + redis_db: int = 0 + + # Cache TTLs + cache_ttl_result: int = 60 + cache_ttl_suggest: int = 600 + cache_ttl_empty: int = 300 + + # Directus + directus_url: str = "http://10.0.0.10:8055" + directus_slug_refresh_seconds: int = 300 + + # CORS + cors_origins: str = "" + + @property + def dsn(self) -> str: + return ( + f"postgresql://{self.db_user}:{self.db_password}" + f"@{self.db_host}:{self.db_port}/{self.db_name}" + ) + + @property + def cors_origin_list(self) -> list[str]: + return [o.strip() for o in self.cors_origins.split(",") if o.strip()] + + +settings = Settings() diff --git a/stacks/eh-search/app/content_index.py b/stacks/eh-search/app/content_index.py new file mode 
100644 index 0000000..6883bd0 --- /dev/null +++ b/stacks/eh-search/app/content_index.py @@ -0,0 +1,305 @@ +"""In-memory content index for blog posts, brands, legal pages. + +Loaded from Directus on startup, refreshed every 5 min in background, +and on POST /cache/invalidate. Total dataset is small (~90 items), +fits comfortably in RAM. +""" +import asyncio +import logging +import time +from dataclasses import dataclass, field +from typing import Any + +import httpx + +from app.config import settings + +log = logging.getLogger(__name__) + + +@dataclass +class BlogPost: + id: int + title: str + slug: str + excerpt: str = "" + seo_description: str = "" + tags: list[str] = field(default_factory=list) + category: str = "" + published_at: str | None = None + image_id: str | None = None + + +@dataclass +class Brand: + id: int + name: str + slug: str + short_description: str = "" + description: str = "" + logo_id: str | None = None + vehicle_count: int = 0 + + +@dataclass +class LegalPage: + id: int + title: str + slug: str + seo_description: str = "" + + +# Mutable global state — protected by _lock for refresh atomicity +_blog_posts: list[BlogPost] = [] +_brands: list[Brand] = [] +_legal_pages: list[LegalPage] = [] +_last_refresh: float = 0.0 +_lock = asyncio.Lock() + +# Slugs of pages that are NOT really top-level static pages but live in +# the Directus pages collection. We treat them as legal/info pages. 
+LEGAL_PAGE_SLUGS = { + "impressum", "datenschutz", "agb", + "barrierefreiheit", "batterie-entsorgung", "widerruf", +} + + +async def _fetch(client: httpx.AsyncClient, collection: str, fields: str, filter_field: str = "status", filter_value: str = "published") -> list[dict]: + url = f"{settings.directus_url}/items/{collection}" + params = { + "fields": fields, + "limit": -1, + } + if filter_field: + params[f"filter[{filter_field}][_eq]"] = filter_value + try: + resp = await client.get(url, params=params) + resp.raise_for_status() + return resp.json().get("data", []) + except Exception as e: + log.warning("Failed to fetch %s: %s", collection, e) + return [] + + +async def refresh() -> dict: + """Pull all collections from Directus and rebuild the in-memory index.""" + global _blog_posts, _brands, _legal_pages, _last_refresh + + async with httpx.AsyncClient(timeout=15) as client: + blog_data, brand_data, page_data = await asyncio.gather( + _fetch(client, "blog_posts", + "id,title,slug,excerpt,seo_description,tags,category,published_at,image"), + _fetch(client, "brands", + "id,name,slug,short_description,description,logo", + filter_field="is_active", filter_value="true"), + _fetch(client, "pages", + "id,title,slug,seo_description"), + ) + + new_blog = [ + BlogPost( + id=int(p["id"]), + title=p.get("title") or "", + slug=p.get("slug") or "", + excerpt=p.get("excerpt") or "", + seo_description=p.get("seo_description") or "", + tags=[t for t in (p.get("tags") or []) if isinstance(t, str)], + category=p.get("category") or "", + published_at=p.get("published_at"), + image_id=p.get("image"), + ) + for p in blog_data + if p.get("slug") + ] + + new_brands = [ + Brand( + id=int(b["id"]), + name=b.get("name") or "", + slug=b.get("slug") or "", + short_description=b.get("short_description") or "", + description=b.get("description") or "", + logo_id=b.get("logo"), + ) + for b in brand_data + if b.get("slug") + ] + + new_legal = [ + LegalPage( + id=int(p["id"]), + 
title=p.get("title") or "", + slug=p.get("slug") or "", + seo_description=p.get("seo_description") or "", + ) + for p in page_data + if p.get("slug") and p["slug"] in LEGAL_PAGE_SLUGS + ] + + async with _lock: + _blog_posts = new_blog + _brands = new_brands + _legal_pages = new_legal + _last_refresh = time.time() + + counts = {"blog": len(new_blog), "brands": len(new_brands), "legal": len(new_legal)} + log.info("Content index refreshed: %s", counts) + return counts + + +async def background_refresher() -> None: + while True: + await asyncio.sleep(settings.directus_slug_refresh_seconds) + try: + await refresh() + except Exception as e: + log.warning("Background content refresh error: %s", e) + + +def _score_blog(post: BlogPost, q: str) -> float: + """Weighted match score for a blog post against query q (lowercased).""" + score = 0.0 + title_l = post.title.lower() + excerpt_l = post.excerpt.lower() + seo_l = post.seo_description.lower() + cat_l = post.category.lower() + + # Title weights highest (substring requires len >= 3 to avoid 2-char noise) + if title_l == q: + score += 10 + elif title_l.startswith(q): + score += 6 + elif len(q) >= 3 and q in title_l: + score += 4 + + # Tags (real curated keywords) + for tag in post.tags: + tl = tag.lower() + if tl == q: + score += 5 + break + elif tl.startswith(q) or (len(q) >= 3 and q in tl): + score += 2 + break + + # Category + if q in cat_l: + score += 1 + + # Excerpt + seo_description (lighter) + if q in excerpt_l: + score += 1 + if q in seo_l: + score += 0.5 + + return score + + +def _score_brand(brand: Brand, q: str) -> float: + score = 0.0 + name_l = brand.name.lower() + if name_l == q: + score += 15 # Brand exact match — dominant signal + elif name_l.startswith(q): + score += 10 # Prefix should beat any page substring match + elif len(q) >= 3 and q in name_l: + score += 5 + if len(q) >= 3 and q in (brand.short_description or "").lower(): + score += 1 + if len(q) >= 3 and q in (brand.description or "").lower(): + score 
+= 0.5 + return score + + +def _score_legal(page: LegalPage, q: str) -> float: + score = 0.0 + title_l = page.title.lower() + slug_l = page.slug.lower() + if title_l == q or slug_l == q: + score += 10 + elif title_l.startswith(q) or slug_l.startswith(q): + score += 7 + elif len(q) >= 3 and (q in title_l or q in slug_l): + score += 4 + if len(q) >= 3 and q in (page.seo_description or "").lower(): + score += 1 + return score + + +def search_blog(query: str, limit: int = 5) -> list[dict]: + q = query.strip().lower() + if not q: + return [] + scored = [(p, _score_blog(p, q)) for p in _blog_posts] + scored = [(p, s) for p, s in scored if s > 0] + scored.sort(key=lambda x: -x[1]) + return [ + { + "type": "blog", + "title": p.title, + "slug": f"/blog/{p.slug}", + "snippet": (p.seo_description or p.excerpt or "")[:180], + "tags": p.tags, + "category": p.category, + "published_at": p.published_at, + "image_id": p.image_id, + "score": s, + "matched_via": "blog", + } + for p, s in scored[:limit] + ] + + +def search_brands(query: str, limit: int = 5) -> list[dict]: + q = query.strip().lower() + if not q: + return [] + scored = [(b, _score_brand(b, q)) for b in _brands] + scored = [(b, s) for b, s in scored if s > 0] + scored.sort(key=lambda x: -x[1]) + return [ + { + "type": "brand", + "title": b.name, + "slug": f"/marken/{b.slug}", + "snippet": (b.short_description or "")[:180], + "logo_id": b.logo_id, + "score": s, + "matched_via": "brand", + } + for b, s in scored[:limit] + ] + + +def search_legal(query: str, limit: int = 5) -> list[dict]: + q = query.strip().lower() + if not q: + return [] + scored = [(p, _score_legal(p, q)) for p in _legal_pages] + scored = [(p, s) for p, s in scored if s > 0] + scored.sort(key=lambda x: -x[1]) + return [ + { + "type": "page", + "title": p.title, + "slug": f"/{p.slug}", + "snippet": (p.seo_description or "")[:180], + "score": s, + "matched_via": "legal", + } + for p, s in scored[:limit] + ] + + +def get_blog_posts_with_tag(tag: str) -> 
list[BlogPost]: + tag_l = tag.lower() + return [p for p in _blog_posts if any(t.lower() == tag_l for t in p.tags)] + + +def stats() -> dict: + return { + "blog": len(_blog_posts), + "brands": len(_brands), + "legal": len(_legal_pages), + "last_refresh_age_s": int(time.time() - _last_refresh) if _last_refresh else None, + } diff --git a/stacks/eh-search/app/db.py b/stacks/eh-search/app/db.py new file mode 100644 index 0000000..42154dc --- /dev/null +++ b/stacks/eh-search/app/db.py @@ -0,0 +1,43 @@ +"""Async Postgres pool.""" +import asyncpg +from app.config import settings + +_pool: asyncpg.Pool | None = None + + +async def init_pool() -> None: + global _pool + _pool = await asyncpg.create_pool( + dsn=settings.dsn, + min_size=2, + max_size=10, + command_timeout=5, + ) + + +async def close_pool() -> None: + global _pool + if _pool is not None: + await _pool.close() + _pool = None + + +def get_pool() -> asyncpg.Pool: + if _pool is None: + raise RuntimeError("DB pool not initialized") + return _pool + + +async def fetch(query: str, *args) -> list[asyncpg.Record]: + async with get_pool().acquire() as conn: + return await conn.fetch(query, *args) + + +async def fetchrow(query: str, *args) -> asyncpg.Record | None: + async with get_pool().acquire() as conn: + return await conn.fetchrow(query, *args) + + +async def fetchval(query: str, *args): + async with get_pool().acquire() as conn: + return await conn.fetchval(query, *args) diff --git a/stacks/eh-search/app/intent_router.py b/stacks/eh-search/app/intent_router.py new file mode 100644 index 0000000..1d157cb --- /dev/null +++ b/stacks/eh-search/app/intent_router.py @@ -0,0 +1,82 @@ +"""Intent routing — pure regex, no AI, sub-millisecond. + +Loco-Soft Komm-Nr Format: + Type-Buchstabe + 4-6 stellige Zahl, optional Space dazwischen. + Type-Letter: N=Neu, T=Tageszul., V=Vorfuehr, D=Differenzbest., G=Gebraucht, L=Leihgabe. 
+ Beispiele: 'D9094', 'd9094', 'D 9094', 'n8093', 'L9083' + +Pure 4-6 stellige Zahlen werden als DVN-Lookup behandelt (DVN ist eindeutig). +""" +import re +from dataclasses import dataclass +from typing import Literal + + +# Komm-Nr: Type + DVN (mit oder ohne Space) +KOMM_NR_RE = re.compile(r"^[NTVDGLntvdgl]\s*\d{4,6}$") +# DVN allein: pure Zahl 4-6 Stellen +DVN_RE = re.compile(r"^\d{4,6}$") +# VIN: 17 chars, no I/O/Q +VIN_RE = re.compile(r"^[A-HJ-NPR-Z0-9]{17}$") + + +@dataclass +class Intent: + type: Literal[ + "komm_nr", + "dvn", + "vin", + "autocomplete_only", + "keyword_search", + "empty", + ] + direct_redirect: bool = False + normalized_query: str = "" + + +def route(raw_query: str) -> Intent: + q = (raw_query or "").strip() + + if not q: + return Intent(type="empty", normalized_query="") + + # Rule 1: Komm-Nr (Type + DVN, e.g. 'D9094', 'D 9094', 'd9094') + if KOMM_NR_RE.match(q): + # Normalize: remove spaces, uppercase the type letter + cleaned = q.replace(" ", "") + normalized = cleaned[0].upper() + cleaned[1:] + return Intent( + type="komm_nr", + direct_redirect=True, + normalized_query=normalized, + ) + + # Rule 2: Pure 4-6 digit number = DVN lookup (eindeutig) + if DVN_RE.match(q): + return Intent( + type="dvn", + direct_redirect=True, + normalized_query=q, + ) + + # Rule 3: 17-char alphanumeric (no IOQ) = VIN + upper = q.upper().replace(" ", "") + if VIN_RE.match(upper): + return Intent( + type="vin", + direct_redirect=True, + normalized_query=upper, + ) + + # Rule 4: Single char only -> autocomplete (FTS unbrauchbar bei 1 Zeichen) + if len(q) < 2: + return Intent( + type="autocomplete_only", + normalized_query=q.lower(), + ) + + # Rule 5: Default — keyword + fuzzy search + return Intent( + type="keyword_search", + normalized_query=q.lower(), + ) diff --git a/stacks/eh-search/app/main.py b/stacks/eh-search/app/main.py new file mode 100644 index 0000000..6e0bf3d --- /dev/null +++ b/stacks/eh-search/app/main.py @@ -0,0 +1,300 @@ +"""eh-search FastAPI 
application — Phase 1.5 Federated Multi-Source. + +Endpoints: +- GET /health -> liveness + dependency checks +- GET /search?q= -> federated search across vehicles + content + static pages +- GET /suggest?q= -> autocomplete +- POST /cache/invalidate -> clear all caches + refresh content index +""" +import asyncio +import logging +import time +from contextlib import asynccontextmanager + +from fastapi import FastAPI, Query, Request, Response +from fastapi.middleware.cors import CORSMiddleware + +from app import cache, content_index, db +from app.config import settings +from app.intent_router import route as route_intent +from app.schemas import ( + HealthResponse, + InvalidateResponse, + SearchResponse, + SearchResultItem, + SuggestResponse, +) +from app.search import exact, fts, static_pages, suggest +from app import slug_resolver + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", +) +log = logging.getLogger("eh-search") + + +@asynccontextmanager +async def lifespan(app: FastAPI): + log.info("Starting eh-search (env=%s) Phase 1.5", settings.environment) + await db.init_pool() + await cache.init_redis() + await slug_resolver.refresh() + await content_index.refresh() + refresher_slug = asyncio.create_task(slug_resolver.background_refresher()) + refresher_content = asyncio.create_task(content_index.background_refresher()) + log.info("eh-search ready") + try: + yield + finally: + refresher_slug.cancel() + refresher_content.cancel() + await db.close_pool() + await cache.close_redis() + log.info("eh-search stopped") + + +app = FastAPI( + title="eh-search", + description="Federated search service for electric-horses.de", + version="0.2.0", + lifespan=lifespan, +) + +if settings.cors_origin_list: + app.add_middleware( + CORSMiddleware, + allow_origins=settings.cors_origin_list, + allow_methods=["GET"], + allow_headers=["*"], + max_age=600, + ) + + +# 
--------------------------------------------------------------------------- +# Health +# --------------------------------------------------------------------------- +@app.get("/health", response_model=HealthResponse) +async def health() -> HealthResponse: + db_status = "ok" + vehicles_count = 0 + try: + vehicles_count = await db.fetchval("SELECT count(*) FROM search_vehicles") + except Exception as e: + db_status = f"error: {e}" + + redis_status = "ok" + try: + await cache.get_redis().ping() + except Exception as e: + redis_status = f"error: {e}" + + overall = "ok" if db_status == "ok" and redis_status == "ok" else "degraded" + return HealthResponse( + status=overall, + environment=settings.environment, + db=db_status, + redis=redis_status, + vehicles_count=vehicles_count or 0, + content=content_index.stats(), + ) + + +# --------------------------------------------------------------------------- +# Federated Search +# --------------------------------------------------------------------------- +async def _vehicle_search(query: str, intent_type: str, normalized: str, limit: int) -> list[SearchResultItem]: + if intent_type == "komm_nr": + return await exact.by_komm_nr(normalized) + if intent_type == "dvn": + return await exact.by_dvn(normalized) + if intent_type == "vin": + return await exact.by_vin(normalized) + if intent_type == "keyword_search": + return await fts.hybrid_search(normalized, limit=limit) + return [] + + +def _content_to_items(raw: list[dict]) -> list[SearchResultItem]: + """Convert content_index dict results to SearchResultItem.""" + return [SearchResultItem(**r) for r in raw] + + +async def _enrich_vehicle_slugs(items: list[SearchResultItem]) -> None: + """Vehicles need their Directus slug resolved.""" + for item in items: + if item.type == "vehicle" and item.directus_product_id: + item.slug = await slug_resolver.get_slug(item.directus_product_id) + + +def _bridge_pages_for_blog(blog_results: list[SearchResultItem]) -> list[SearchResultItem]: + 
"""If matched blog posts have tags that bridge to a static page, + surface that page as an additional result. + """ + found_pages: dict[str, float] = {} + for blog in blog_results: + if not blog.tags: + continue + slugs = static_pages.get_pages_for_tags(blog.tags) + for slug in slugs: + page = static_pages.page_by_slug(slug) + if page: + # Score: based on top blog match score, capped + found_pages[slug] = max(found_pages.get(slug, 0), min(blog.score, 5.0)) + + items = [] + for slug, score in found_pages.items(): + page = static_pages.page_by_slug(slug) + if page is None: + continue + items.append(SearchResultItem( + type="page", + title=page.title, + slug=page.slug, + snippet=page.snippet, + score=score, + matched_via="tag_bridge", + )) + return items + + +@app.get("/search", response_model=SearchResponse) +async def search( + response: Response, + q: str = Query(..., min_length=1, max_length=200), + limit: int = Query(10, ge=1, le=50), +) -> SearchResponse: + started = time.perf_counter() + intent = route_intent(q) + + cache_key = cache.key_query(f"v2:{intent.normalized_query}:{limit}") + cached = await cache.get_json(cache_key) + if cached is not None: + cached["cache_hit"] = True + response.headers["X-Cache-Hit"] = "true" + return SearchResponse(**cached) + + norm = intent.normalized_query + + # Run all sources in parallel + if intent.type in ("komm_nr", "dvn", "vin"): + # Exact lookups skip content/static (they should hit a vehicle) + vehicle_task = _vehicle_search(q, intent.type, norm, limit) + vehicle_items = await vehicle_task + content_blog: list[SearchResultItem] = [] + content_brands: list[SearchResultItem] = [] + content_legal: list[SearchResultItem] = [] + page_items: list[SearchResultItem] = [] + elif intent.type == "keyword_search": + vehicle_task = _vehicle_search(q, intent.type, norm, limit) + vehicle_items, content_blog_raw, content_brand_raw, content_legal_raw, page_raw = await asyncio.gather( + vehicle_task, + 
asyncio.to_thread(content_index.search_blog, norm, 5), + asyncio.to_thread(content_index.search_brands, norm, 5), + asyncio.to_thread(content_index.search_legal, norm, 5), + asyncio.to_thread(static_pages.search_static_pages, norm, 5), + ) + content_blog = _content_to_items(content_blog_raw) + content_brands = _content_to_items(content_brand_raw) + content_legal = _content_to_items(content_legal_raw) + page_items = _content_to_items(page_raw) + else: + # autocomplete_only / empty + vehicle_items = [] + content_blog = [] + content_brands = [] + content_legal = [] + page_items = [] + + await _enrich_vehicle_slugs(vehicle_items) + + # Tag-bridge: blog matches surface their bridged static pages too + bridged_pages = _bridge_pages_for_blog(content_blog) + + # Merge static pages from direct match + bridge (dedup by slug, keep higher score) + page_map: dict[str, SearchResultItem] = {} + for p in page_items + bridged_pages + content_legal: + if p.slug and (p.slug not in page_map or page_map[p.slug].score < p.score): + page_map[p.slug] = p + pages_final = sorted(page_map.values(), key=lambda x: -x.score) + + # Combined results, ordered by type priority then by score + type_priority = {"page": 0, "brand": 1, "vehicle": 2, "blog": 3} + all_results: list[SearchResultItem] = ( + pages_final[:5] + content_brands[:5] + vehicle_items[:limit] + content_blog[:5] + ) + + # Direct redirect logic: + # - commission_number / vin with single hit -> vehicle slug + # - exact static page match (score >= 9) when ONLY one hit -> page slug + direct_redirect = False + if intent.direct_redirect and len(vehicle_items) == 1 and vehicle_items[0].slug: + direct_redirect = True + elif intent.type == "keyword_search" and len(pages_final) == 1 and pages_final[0].score >= 9 and not vehicle_items and not content_brands and not content_blog: + direct_redirect = True + + took_ms = int((time.perf_counter() - started) * 1000) + payload = SearchResponse( + query=q, + intent=intent.type, + 
direct_redirect=direct_redirect, + total=len(all_results), + results=all_results, + took_ms=took_ms, + cache_hit=False, + ) + + ttl = settings.cache_ttl_empty if not all_results else settings.cache_ttl_result + await cache.set_json(cache_key, payload.model_dump(), ttl=ttl) + response.headers["X-Cache-Hit"] = "false" + return payload + + +# --------------------------------------------------------------------------- +# Suggest +# --------------------------------------------------------------------------- +@app.get("/suggest", response_model=SuggestResponse) +async def suggest_endpoint( + response: Response, + q: str = Query("", max_length=50), + limit: int = Query(8, ge=1, le=20), +) -> SuggestResponse: + started = time.perf_counter() + q_norm = (q or "").strip().lower() + + cache_key = cache.key_suggest(f"{q_norm}:{limit}") + cached = await cache.get_json(cache_key) + if cached is not None: + response.headers["X-Cache-Hit"] = "true" + return SuggestResponse(**cached) + + if not q_norm: + items = await suggest.top_brands(limit=limit) + else: + items = await suggest.prefix_suggest(q_norm, limit=limit) + + payload = SuggestResponse( + query=q, + suggestions=items, + took_ms=int((time.perf_counter() - started) * 1000), + ) + await cache.set_json(cache_key, payload.model_dump(), ttl=settings.cache_ttl_suggest) + response.headers["X-Cache-Hit"] = "false" + return payload + + +# --------------------------------------------------------------------------- +# Cache invalidation +# --------------------------------------------------------------------------- +@app.post("/cache/invalidate", response_model=InvalidateResponse) +async def invalidate(request: Request) -> InvalidateResponse: + cleared = await cache.invalidate_all() + # Refresh both indexes in the background + asyncio.create_task(slug_resolver.refresh()) + asyncio.create_task(content_index.refresh()) + log.info( + "Cache invalidated: %d keys (from %s)", + cleared, + request.client.host if request.client else "?", + ) + 
return InvalidateResponse(cleared=cleared, scope="all") diff --git a/stacks/eh-search/app/schemas.py b/stacks/eh-search/app/schemas.py new file mode 100644 index 0000000..8674316 --- /dev/null +++ b/stacks/eh-search/app/schemas.py @@ -0,0 +1,82 @@ +"""Pydantic response models.""" +from pydantic import BaseModel, Field +from typing import Literal, Any + + +IntentType = Literal[ + "komm_nr", + "dvn", + "vin", + "autocomplete_only", + "keyword_search", + "empty", +] + +ResultType = Literal["vehicle", "page", "blog", "brand"] + + +class SearchResultItem(BaseModel): + """Unified result item — fields are optional depending on type.""" + type: ResultType = "vehicle" + title: str + slug: str | None = None + snippet: str | None = None + score: float = 0.0 + matched_via: str = "" + + # Vehicle-specific + vehicle_id: int | None = None + commission_number: str | None = None + vin: str | None = None + brand: str | None = None + model: str | None = None + price: float | None = None + primary_image_id: str | None = None + directus_product_id: int | None = None + + # Blog-specific + tags: list[str] | None = None + category: str | None = None + published_at: str | None = None + image_id: str | None = None + + # Brand-specific + logo_id: str | None = None + + model_config = {"extra": "allow"} + + +class SearchResponse(BaseModel): + query: str + intent: IntentType + direct_redirect: bool = False + total: int + results: list[SearchResultItem] + took_ms: int + cache_hit: bool = False + + +class SuggestItem(BaseModel): + text: str + type: Literal["brand", "model", "category"] + count: int = 0 + + +class SuggestResponse(BaseModel): + query: str + suggestions: list[SuggestItem] + took_ms: int + + +class HealthResponse(BaseModel): + status: str + environment: str + db: str + redis: str + vehicles_count: int + content: dict[str, Any] = Field(default_factory=dict) + + +class InvalidateResponse(BaseModel): + cleared: int + scope: str = "all" diff --git 
a/stacks/eh-search/app/search/__init__.py b/stacks/eh-search/app/search/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/stacks/eh-search/app/search/exact.py b/stacks/eh-search/app/search/exact.py new file mode 100644 index 0000000..9841572 --- /dev/null +++ b/stacks/eh-search/app/search/exact.py @@ -0,0 +1,78 @@ +"""Exact lookups: Komm-Nr, DVN, VIN. + +Loco-Soft Komm-Nr-Schema: + Type-Buchstabe (N/T/V/D/G/L) + DVN-Zahl (4-6 Stellen) + Beispiele: 'D9094' (Nissan Leaf), 'N8093' (Askoll XKP45), 'L9083' (Opel) + + N = Neu, T = Tageszul., V = Vorfuehr, + D = Differenzbest., G = Gebraucht, L = Leihgabe + +DVN allein ist eindeutig pro Fahrzeug, deshalb auch als Such-Einstieg möglich. +""" +import re + +from app import db +from app.schemas import SearchResultItem + + +COLUMNS = """ + vehicle_id, dvn, commission_number, vin, brand, model, title, + price::float8 as price, primary_image_id::text as primary_image_id, + directus_product_id, dealer_vehicle_type +""" + +KOMM_NR_RE = re.compile(r"^([NTVDGLntvdgl])\s*(\d{4,6})$") + + +def normalize_komm_nr(raw: str) -> str | None: + """'D 9094' / 'd9094' / ' D9094 ' -> 'D9094'. Returns None if not a valid pattern.""" + if not raw: + return None + m = KOMM_NR_RE.match(raw.strip()) + if not m: + return None + return m.group(1).upper() + m.group(2) + + +async def by_komm_nr(raw: str) -> list[SearchResultItem]: + """Lookup by full Komm-Nr (Type-Letter + DVN). Eindeutig wenn vorhanden.""" + normalized = normalize_komm_nr(raw) + if not normalized: + return [] + sql = f"SELECT {COLUMNS} FROM search_vehicles WHERE commission_number = $1 LIMIT 5" + rows = await db.fetch(sql, normalized) + return [_row_to_item(r, matched_via="exact_komm") for r in rows] + + +async def by_dvn(number: str) -> list[SearchResultItem]: + """Lookup by DVN allein (4-6 stellige Zahl ohne Type-Buchstabe). 
+ DVN ist eindeutig pro Fahrzeug, also liefert das immer 0 oder 1 Treffer.""" + try: + dvn_int = int(number) + except (ValueError, TypeError): + return [] + sql = f"SELECT {COLUMNS} FROM search_vehicles WHERE dvn = $1 LIMIT 5" + rows = await db.fetch(sql, dvn_int) + return [_row_to_item(r, matched_via="exact_dvn") for r in rows] + + +async def by_vin(vin: str) -> list[SearchResultItem]: + sql = f"SELECT {COLUMNS} FROM search_vehicles WHERE vin = $1 LIMIT 5" + rows = await db.fetch(sql, vin) + return [_row_to_item(r, matched_via="exact_vin") for r in rows] + + +def _row_to_item(row, matched_via: str) -> SearchResultItem: + return SearchResultItem( + vehicle_id=row["vehicle_id"], + commission_number=row["commission_number"], + vin=row["vin"], + brand=row["brand"], + model=row["model"], + title=row["title"] or "", + price=row["price"], + primary_image_id=row["primary_image_id"], + directus_product_id=row["directus_product_id"], + score=1.0, + matched_via=matched_via, + ) diff --git a/stacks/eh-search/app/search/fts.py b/stacks/eh-search/app/search/fts.py new file mode 100644 index 0000000..1ce272e --- /dev/null +++ b/stacks/eh-search/app/search/fts.py @@ -0,0 +1,148 @@ +"""Postgres Full-Text Search (Typ 2) + pg_trgm Fuzzy (Typ 3) + Substring fallback.""" +import re + +from app import db +from app.schemas import SearchResultItem + + +COLUMNS = """ + vehicle_id, commission_number, vin, brand, model, title, + price::float8 as price, primary_image_id::text as primary_image_id, + directus_product_id +""" + +# Sanitize tsquery input — keep word chars, spaces, dots, hyphens +_SAFE_TOKEN = re.compile(r"[^\w\s\.\-]", re.UNICODE) + + +def _build_prefix_tsquery(query: str) -> str: + """Convert free-text query into a safe tsquery with prefix match. 
+ + 'ray' -> 'ray:*' + 'zero motor' -> 'zero:* & motor:*' + """ + cleaned = _SAFE_TOKEN.sub(" ", query) + tokens = [t for t in cleaned.split() if t] + if not tokens: + return "" + return " & ".join(f"{t}:*" for t in tokens) + + +async def fts_search(query: str, limit: int = 10) -> list[SearchResultItem]: + """Full-text search with German + unaccent dictionary, prefix-enabled.""" + tsquery_str = _build_prefix_tsquery(query) + if not tsquery_str: + return [] + sql = f""" + SELECT {COLUMNS}, + ts_rank_cd(search_tsv, q)::float8 AS rank + FROM search_vehicles, to_tsquery('german_unaccent', $1) q + WHERE search_tsv @@ q + ORDER BY rank DESC, brand, model + LIMIT $2 + """ + try: + rows = await db.fetch(sql, tsquery_str, limit) + except Exception: + sql_fallback = f""" + SELECT {COLUMNS}, + ts_rank_cd(search_tsv, q)::float8 AS rank + FROM search_vehicles, plainto_tsquery('german_unaccent', $1) q + WHERE search_tsv @@ q + ORDER BY rank DESC, brand, model + LIMIT $2 + """ + rows = await db.fetch(sql_fallback, query, limit) + return [_to_item(r, "fts", float(r["rank"] or 0)) for r in rows] + + +async def fuzzy_search(query: str, limit: int = 10) -> list[SearchResultItem]: + """Trigram similarity for typo tolerance.""" + sql = f""" + WITH scored AS ( + SELECT {COLUMNS}, + GREATEST( + similarity(title, $1), + similarity(COALESCE(commission_number, ''), $1) + )::float8 AS sim + FROM search_vehicles + WHERE title % $1 + OR commission_number % $1 + ) + SELECT * FROM scored + WHERE sim > 0.25 + ORDER BY sim DESC + LIMIT $2 + """ + rows = await db.fetch(sql, query, limit) + return [_to_item(r, "fuzzy", float(r["sim"])) for r in rows] + + +async def substring_search(query: str, limit: int = 10) -> list[SearchResultItem]: + """Last-resort: ILIKE substring on title/brand/model. + + Catches stop-words (e.g. German 'es') that FTS strips, and very short + queries that don't survive stemming. Uses the existing trgm GIN index. 
+ """ + pattern = f"%{query.lower()}%" + sql = f""" + SELECT {COLUMNS}, 0.5::float8 AS rank + FROM search_vehicles + WHERE LOWER(title) LIKE $1 + OR LOWER(brand) LIKE $1 + OR LOWER(COALESCE(model, '')) LIKE $1 + ORDER BY brand, model + LIMIT $2 + """ + rows = await db.fetch(sql, pattern, limit) + return [_to_item(r, "substring", 0.5) for r in rows] + + +async def hybrid_search(query: str, limit: int = 10) -> list[SearchResultItem]: + """FTS first; if too few hits, add fuzzy; if still empty, add substring. + + Strategy: + 1. Always run FTS (cheap, indexed, prefix-enabled) + 2. If FTS has >= 3 results, return them + 3. Otherwise also run fuzzy and merge + 4. If still nothing, run substring fallback (handles stop-words) + """ + fts_results = await fts_search(query, limit=limit) + if len(fts_results) >= 3: + return fts_results + + fuzzy_results = await fuzzy_search(query, limit=limit) + + seen: dict[int, SearchResultItem] = {r.vehicle_id: r for r in fts_results} + for r in fuzzy_results: + if r.vehicle_id not in seen: + seen[r.vehicle_id] = r + + if not seen: + # Last resort: substring (catches stop-words like 'es') + substring_results = await substring_search(query, limit=limit) + for r in substring_results: + seen[r.vehicle_id] = r + + merged = list(seen.values()) + merged.sort(key=lambda x: ( + 0 if x.matched_via == "fts" else (1 if x.matched_via == "fuzzy" else 2), + -x.score, + )) + return merged[:limit] + + +def _to_item(r, matched_via: str, score: float) -> SearchResultItem: + return SearchResultItem( + vehicle_id=r["vehicle_id"], + commission_number=r["commission_number"], + vin=r["vin"], + brand=r["brand"], + model=r["model"], + title=r["title"] or "", + price=r["price"], + primary_image_id=r["primary_image_id"], + directus_product_id=r["directus_product_id"], + score=score, + matched_via=matched_via, + ) diff --git a/stacks/eh-search/app/search/static_pages.py b/stacks/eh-search/app/search/static_pages.py new file mode 100644 index 0000000..e206993 --- 
# --- app/search/static_pages.py ---
"""Static page registry + tag-to-page bridge.

Hardcoded list of top-level Astro pages that aren't in Directus.
The tag bridge maps blog tags (real, curated by the team) to these pages,
turning the blog content taxonomy into a search vocabulary for the site.
"""


@dataclass
class StaticPage:
    """A hardcoded Astro route that should be reachable through site search."""
    slug: str                                        # URL path, e.g. "/werkstatt"
    title: str                                       # Display title
    snippet: str                                     # Description shown in search results
    keywords: list[str] = field(default_factory=list)  # Direct synonyms


# Top-level pages of electric-horses.de that live as Astro routes,
# not in Directus. Order matters for tie-breaking (earlier = preferred).
STATIC_PAGES: list[StaticPage] = [
    StaticPage(
        slug="/werkstatt",
        title="Werkstatt",
        snippet="Spezialwerkstatt fuer E-Roller und E-Motorraeder. Inspektion, Reparatur, Akku-Service.",
        keywords=["werkstatt", "reparatur", "service", "inspektion", "wartung"],
    ),
    StaticPage(
        slug="/ersatzteile",
        title="Ersatzteile",
        snippet="Original-Ersatzteile und Zubehoer fuer E-Roller, E-Motorraeder und E-Scooter.",
        keywords=["ersatzteile", "teile", "zubehoer", "originalteile"],
    ),
    StaticPage(
        slug="/fahrzeuge",
        title="Fahrzeuge",
        snippet="Unser kompletter Bestand an E-Rollern, E-Motorraedern und E-Autos.",
        keywords=["fahrzeuge", "bestand", "angebot", "uebersicht", "alle"],
    ),
    StaticPage(
        slug="/marken",
        title="Marken",
        snippet="Alle Marken im Ueberblick: Askoll, RAY, ZERO, Energica, Nissan und mehr.",
        keywords=["marken", "hersteller", "brands"],
    ),
    StaticPage(
        slug="/vermietung",
        title="Vermietung",
        snippet="E-Motorrad-Vermietung in Wendelstein bei Nuernberg. Tagestouren, Wochenmiete.",
        keywords=["vermietung", "mieten", "leihen", "verleih", "rental"],
    ),
    StaticPage(
        slug="/kontakt",
        title="Kontakt",
        snippet="Kontakt zu Electric Horses: Telefon, E-Mail, Anfahrt, Oeffnungszeiten.",
        keywords=["kontakt", "anfahrt", "adresse", "telefon", "email", "oeffnungszeiten"],
    ),
    StaticPage(
        slug="/ueber-uns",
        title="Ueber uns",
        snippet="Electric Horses ist die E-Mobilitaets-Sparte des Autohaus Richter & Zech.",
        keywords=["ueber uns", "team", "geschichte", "richter zech", "wendelstein"],
    ),
    StaticPage(
        slug="/blog",
        title="Blog",
        snippet="News, Ratgeber und Erfahrungsberichte rund um Elektromobilitaet.",
        keywords=["blog", "news", "artikel", "ratgeber"],
    ),
]


# Tag → Page bridge.
# These tags exist on real blog posts; matching one of them surfaces the
# linked page (in addition to the blog posts that carry the tag).
# Source: actual tag pool extracted from blog_posts.tags.
TAG_TO_PAGE_BRIDGE: dict[str, str] = {
    # ─── Werkstatt topics (tech, service, tyres, battery) ────────
    "akku": "/werkstatt",
    "akkuladung": "/werkstatt",
    "ladedauer": "/werkstatt",
    "reichweite": "/werkstatt",
    "verbrauch": "/werkstatt",
    "bremsen": "/werkstatt",
    "reifen": "/werkstatt",
    "metzeler": "/werkstatt",
    "pirelli": "/werkstatt",
    "heidenau": "/werkstatt",
    "wartung": "/werkstatt",
    "service": "/werkstatt",
    "installation": "/werkstatt",
    "montage": "/werkstatt",
    "werkzeug": "/werkstatt",
    "trittbrettverbreiterung": "/werkstatt",
    "radnabenmotor": "/werkstatt",
    "propilot": "/werkstatt",
    "effizienz": "/werkstatt",
    "ledersitze": "/werkstatt",

    # ─── Consulting & subsidies (contact) ────────────────────────
    "versicherung": "/kontakt",
    "foerderung": "/kontakt",
    "foerderungen": "/kontakt",
    "praemie": "/kontakt",
    "preisvorteil": "/kontakt",
    "innovationspraemie": "/kontakt",
    "umweltpraemie": "/kontakt",
    "kosten": "/kontakt",
    "vergleich": "/kontakt",
    "vorfuehrer": "/kontakt",
    "preis": "/kontakt",
    "goelectric": "/kontakt",

    # ─── Driving licence & legal (Werkstatt section "Recht") ─────
    # NOTE(review): the original literal listed "ekfv" twice; dict literals
    # silently keep only the last occurrence, so the duplicate was removed.
    "abe": "/werkstatt",
    "ekfv": "/werkstatt",
    "klassea1": "/werkstatt",
    "klasseb": "/werkstatt",
    "schluesselzahl196": "/werkstatt",
    "fuehrerschein": "/werkstatt",
    "strassenzulassung": "/werkstatt",
    "verkehrsregeln": "/werkstatt",
    "verordnung": "/werkstatt",
    "legal": "/werkstatt",
    "ekickroller": "/werkstatt",

    # ─── Rental topics ───────────────────────────────────────────
    "touristik": "/vermietung",
    "freizeit": "/vermietung",

    # ─── Brand indicators (link to /marken) ──────────────────────
    # NOTE(review): looks like a typo for "markenunabhaengig" — confirm
    # against the real tag pool before "fixing", tags may be typo'd at source.
    "marknunabhaengig": "/marken",
}


def _score_page(page: StaticPage, q: str) -> float:
    """Relevance of *page* for the normalized query *q*; 0.0 means no match.

    Rules are checked in priority order and the FIRST match wins (so a title
    prefix deliberately shadows a later slug match):
      exact title 10 > slug 9 > title prefix 7 > keyword exact 6 >
      title substring 5 > keyword prefix 4 > snippet substring 2.
    Substring rules require len(q) >= 3 to avoid noise like 'ze' in 'fahrzeuge'.
    """
    title = page.title.lower()
    if title == q:
        return 10.0
    if title.startswith(q):
        return 7.0
    if page.slug.lstrip("/").lower() == q:
        return 9.0
    if len(q) >= 3 and q in title:
        return 5.0
    if q in page.keywords:
        return 6.0
    if any(kw.startswith(q) for kw in page.keywords):
        return 4.0
    if len(q) >= 3 and q in page.snippet.lower():
        return 2.0
    return 0.0


def search_static_pages(query: str, limit: int = 5) -> list[dict]:
    """Search static pages by title, slug, keywords and snippet substring.

    Returns dicts in the same shape as content search results, sorted by
    score descending; ties keep STATIC_PAGES order (stable sort).
    """
    q = (query or "").strip().lower()
    if not q:
        return []
    results = [
        {
            "type": "page",
            "title": page.title,
            "slug": page.slug,
            "snippet": page.snippet,
            "score": score,
            "matched_via": "static_page",
        }
        for page in STATIC_PAGES
        if (score := _score_page(page, q)) > 0
    ]
    results.sort(key=lambda r: -r["score"])
    return results[:limit]


def get_pages_for_tags(tags: "Iterable[str]") -> set[str]:
    """Given an iterable of blog tags, return the set of page slugs they map to.

    Matching is case-insensitive; unknown/empty tags are ignored.
    """
    if not tags:
        return set()
    return {
        TAG_TO_PAGE_BRIDGE[t.lower()]
        for t in tags
        if t and t.lower() in TAG_TO_PAGE_BRIDGE
    }


def page_by_slug(slug: str) -> "StaticPage | None":
    """Return the StaticPage with the given slug, or None if unknown."""
    return next((p for p in STATIC_PAGES if p.slug == slug), None)


# --- app/search/suggest.py ---
"""Autocomplete / Suggest endpoint (type 11)."""


async def top_brands(limit: int = 10) -> "list[SuggestItem]":
    """Most common brands with their vehicle counts (default suggestions)."""
    rows = await db.fetch(
        "SELECT brand, count(*) AS cnt FROM search_vehicles "
        "WHERE brand IS NOT NULL GROUP BY brand ORDER BY cnt DESC LIMIT $1",
        limit,
    )
    return [
        SuggestItem(text=r["brand"], type="brand", count=r["cnt"])
        for r in rows
    ]


async def prefix_suggest(prefix: str, limit: int = 8) -> "list[SuggestItem]":
    """Brand + model prefix matching, case-insensitive.

    Brands are returned first; models fill any remaining slots up to *limit*.
    """
    pattern = f"{prefix.lower()}%"

    # Brands first
    brand_rows = await db.fetch(
        "SELECT brand, count(*) AS cnt FROM search_vehicles "
        "WHERE LOWER(brand) LIKE $1 GROUP BY brand ORDER BY cnt DESC LIMIT $2",
        pattern,
        limit,
    )
    items = [
        SuggestItem(text=r["brand"], type="brand", count=r["cnt"])
        for r in brand_rows
    ]

    # Then models if room
    remaining = limit - len(items)
    if remaining > 0:
        # NOTE(review): DISTINCT is redundant next to GROUP BY brand, model —
        # left as-is to keep the query byte-identical; safe to drop later.
        model_rows = await db.fetch(
            "SELECT DISTINCT brand || ' ' || model AS text, count(*) AS cnt "
            "FROM search_vehicles "
            "WHERE model IS NOT NULL AND LOWER(model) LIKE $1 "
            "GROUP BY brand, model ORDER BY cnt DESC LIMIT $2",
            pattern,
            remaining,
        )
        items.extend(
            SuggestItem(text=r["text"], type="model", count=r["cnt"])
            for r in model_rows
        )

    return items
# --- app/slug_resolver.py ---
"""Resolve directus_product_id -> slug, with periodic refresh.

Uses the Directus REST API.  The slug map is small (~75 entries), so it is
kept fully in memory and refreshed on a schedule or on cache invalidation.
"""

log = logging.getLogger(__name__)

# Module-level state: product-id -> slug map plus refresh bookkeeping.
_slug_map: dict[int, str] = {}
_last_refresh: float = 0.0
_lock = asyncio.Lock()


async def refresh() -> int:
    """Pull the product_id -> slug map from Directus; return the entry count.

    On any failure the previous map is kept and 0 is returned, so the
    service degrades to stale slugs instead of erroring out.
    """
    global _slug_map, _last_refresh
    url = f"{settings.directus_url}/items/products"
    params = {
        "fields": "id,slug",
        "limit": -1,
        "filter[status][_eq]": "published",
    }
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.get(url, params=params)
            resp.raise_for_status()
            rows = resp.json().get("data", [])
        fresh = {
            int(row["id"]): row["slug"]
            for row in rows
            if row.get("id") is not None and row.get("slug")
        }
        # Swap the map atomically under the lock; readers never see a
        # half-built map.
        async with _lock:
            _slug_map = fresh
            _last_refresh = time.time()
        log.info("Slug map refreshed: %d entries", len(fresh))
        return len(fresh)
    except Exception as exc:
        log.warning("Slug refresh failed: %s", exc)
        return 0


async def get_slug(directus_product_id: int | None) -> str | None:
    """Map a Directus product id to its slug; None when unknown.

    Triggers a lazy refresh when the map is older than the configured
    interval.  NOTE(review): while the map is stale, concurrent callers may
    each start a refresh of their own — harmless but worth confirming.
    """
    if directus_product_id is None:
        return None
    # Lazy refresh if stale
    if time.time() - _last_refresh > settings.directus_slug_refresh_seconds:
        await refresh()
    return _slug_map.get(int(directus_product_id))


async def background_refresher():
    """Periodic background task — refresh the slug map every N seconds."""
    while True:
        await asyncio.sleep(settings.directus_slug_refresh_seconds)
        try:
            await refresh()
        except Exception as exc:
            # refresh() already swallows its own errors; this guards the
            # loop against anything unexpected so the task never dies.
            log.warning("Background slug refresh error: %s", exc)