"""eh-search FastAPI application — Phase 1.5 Federated Multi-Source.

Endpoints:

- GET /health -> liveness + dependency checks
- GET /search?q= -> federated search across vehicles + content + static pages
- GET /suggest?q= -> autocomplete
- POST /cache/invalidate -> clear all caches + refresh content index
"""

import asyncio
import logging
import time
from contextlib import asynccontextmanager

from fastapi import FastAPI, Query, Request, Response
from fastapi.middleware.cors import CORSMiddleware

from app import cache, content_index, db
from app.config import settings
from app.intent_router import route as route_intent
from app.schemas import (
    HealthResponse,
    InvalidateResponse,
    SearchResponse,
    SearchResultItem,
    SuggestResponse,
)
from app.search import exact, fts, static_pages, suggest
from app import slug_resolver

logging.basicConfig(
|
||
|
|
level=logging.INFO,
|
||
|
|
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
|
||
|
|
)
|
||
|
|
log = logging.getLogger("eh-search")
|
||
|
|
|
||
|
|
|
||
|
|
@asynccontextmanager
|
||
|
|
async def lifespan(app: FastAPI):
|
||
|
|
log.info("Starting eh-search (env=%s) Phase 1.5", settings.environment)
|
||
|
|
await db.init_pool()
|
||
|
|
await cache.init_redis()
|
||
|
|
await slug_resolver.refresh()
|
||
|
|
await content_index.refresh()
|
||
|
|
refresher_slug = asyncio.create_task(slug_resolver.background_refresher())
|
||
|
|
refresher_content = asyncio.create_task(content_index.background_refresher())
|
||
|
|
log.info("eh-search ready")
|
||
|
|
try:
|
||
|
|
yield
|
||
|
|
finally:
|
||
|
|
refresher_slug.cancel()
|
||
|
|
refresher_content.cancel()
|
||
|
|
await db.close_pool()
|
||
|
|
await cache.close_redis()
|
||
|
|
log.info("eh-search stopped")
|
||
|
|
|
||
|
|
|
||
|
|
app = FastAPI(
|
||
|
|
title="eh-search",
|
||
|
|
description="Federated search service for electric-horses.de",
|
||
|
|
version="0.2.0",
|
||
|
|
lifespan=lifespan,
|
||
|
|
)
|
||
|
|
|
||
|
|
if settings.cors_origin_list:
|
||
|
|
app.add_middleware(
|
||
|
|
CORSMiddleware,
|
||
|
|
allow_origins=settings.cors_origin_list,
|
||
|
|
allow_methods=["GET"],
|
||
|
|
allow_headers=["*"],
|
||
|
|
max_age=600,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
# Health
# ---------------------------------------------------------------------------
@app.get("/health", response_model=HealthResponse)
|
||
|
|
async def health() -> HealthResponse:
|
||
|
|
db_status = "ok"
|
||
|
|
vehicles_count = 0
|
||
|
|
try:
|
||
|
|
vehicles_count = await db.fetchval("SELECT count(*) FROM search_vehicles")
|
||
|
|
except Exception as e:
|
||
|
|
db_status = f"error: {e}"
|
||
|
|
|
||
|
|
redis_status = "ok"
|
||
|
|
try:
|
||
|
|
await cache.get_redis().ping()
|
||
|
|
except Exception as e:
|
||
|
|
redis_status = f"error: {e}"
|
||
|
|
|
||
|
|
overall = "ok" if db_status == "ok" and redis_status == "ok" else "degraded"
|
||
|
|
return HealthResponse(
|
||
|
|
status=overall,
|
||
|
|
environment=settings.environment,
|
||
|
|
db=db_status,
|
||
|
|
redis=redis_status,
|
||
|
|
vehicles_count=vehicles_count or 0,
|
||
|
|
content=content_index.stats(),
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
# Federated Search
# ---------------------------------------------------------------------------
async def _vehicle_search(query: str, intent_type: str, normalized: str, limit: int) -> list[SearchResultItem]:
|
||
|
|
if intent_type == "komm_nr":
|
||
|
|
return await exact.by_komm_nr(normalized)
|
||
|
|
if intent_type == "dvn":
|
||
|
|
return await exact.by_dvn(normalized)
|
||
|
|
if intent_type == "vin":
|
||
|
|
return await exact.by_vin(normalized)
|
||
|
|
if intent_type == "keyword_search":
|
||
|
|
return await fts.hybrid_search(normalized, limit=limit)
|
||
|
|
return []
|
||
|
|
|
||
|
|
|
||
|
|
def _content_to_items(raw: list[dict]) -> list[SearchResultItem]:
|
||
|
|
"""Convert content_index dict results to SearchResultItem."""
|
||
|
|
return [SearchResultItem(**r) for r in raw]
|
||
|
|
|
||
|
|
|
||
|
|
async def _enrich_vehicle_slugs(items: list[SearchResultItem]) -> None:
|
||
|
|
"""Vehicles need their Directus slug resolved."""
|
||
|
|
for item in items:
|
||
|
|
if item.type == "vehicle" and item.directus_product_id:
|
||
|
|
item.slug = await slug_resolver.get_slug(item.directus_product_id)
|
||
|
|
|
||
|
|
|
||
|
|
def _bridge_pages_for_blog(blog_results: list[SearchResultItem]) -> list[SearchResultItem]:
|
||
|
|
"""If matched blog posts have tags that bridge to a static page,
|
||
|
|
surface that page as an additional result.
|
||
|
|
"""
|
||
|
|
found_pages: dict[str, float] = {}
|
||
|
|
for blog in blog_results:
|
||
|
|
if not blog.tags:
|
||
|
|
continue
|
||
|
|
slugs = static_pages.get_pages_for_tags(blog.tags)
|
||
|
|
for slug in slugs:
|
||
|
|
page = static_pages.page_by_slug(slug)
|
||
|
|
if page:
|
||
|
|
# Score: based on top blog match score, capped
|
||
|
|
found_pages[slug] = max(found_pages.get(slug, 0), min(blog.score, 5.0))
|
||
|
|
|
||
|
|
items = []
|
||
|
|
for slug, score in found_pages.items():
|
||
|
|
page = static_pages.page_by_slug(slug)
|
||
|
|
if page is None:
|
||
|
|
continue
|
||
|
|
items.append(SearchResultItem(
|
||
|
|
type="page",
|
||
|
|
title=page.title,
|
||
|
|
slug=page.slug,
|
||
|
|
snippet=page.snippet,
|
||
|
|
score=score,
|
||
|
|
matched_via="tag_bridge",
|
||
|
|
))
|
||
|
|
return items
|
||
|
|
|
||
|
|
|
||
|
|
@app.get("/search", response_model=SearchResponse)
|
||
|
|
async def search(
|
||
|
|
response: Response,
|
||
|
|
q: str = Query(..., min_length=1, max_length=200),
|
||
|
|
limit: int = Query(10, ge=1, le=50),
|
||
|
|
) -> SearchResponse:
|
||
|
|
started = time.perf_counter()
|
||
|
|
intent = route_intent(q)
|
||
|
|
|
||
|
|
cache_key = cache.key_query(f"v2:{intent.normalized_query}:{limit}")
|
||
|
|
cached = await cache.get_json(cache_key)
|
||
|
|
if cached is not None:
|
||
|
|
cached["cache_hit"] = True
|
||
|
|
response.headers["X-Cache-Hit"] = "true"
|
||
|
|
return SearchResponse(**cached)
|
||
|
|
|
||
|
|
norm = intent.normalized_query
|
||
|
|
|
||
|
|
# Run all sources in parallel
|
||
|
|
if intent.type in ("komm_nr", "dvn", "vin"):
|
||
|
|
# Exact lookups skip content/static (they should hit a vehicle)
|
||
|
|
vehicle_task = _vehicle_search(q, intent.type, norm, limit)
|
||
|
|
vehicle_items = await vehicle_task
|
||
|
|
content_blog: list[SearchResultItem] = []
|
||
|
|
content_brands: list[SearchResultItem] = []
|
||
|
|
content_legal: list[SearchResultItem] = []
|
||
|
|
page_items: list[SearchResultItem] = []
|
||
|
|
elif intent.type == "keyword_search":
|
||
|
|
vehicle_task = _vehicle_search(q, intent.type, norm, limit)
|
||
|
|
vehicle_items, content_blog_raw, content_brand_raw, content_legal_raw, page_raw = await asyncio.gather(
|
||
|
|
vehicle_task,
|
||
|
|
asyncio.to_thread(content_index.search_blog, norm, 5),
|
||
|
|
asyncio.to_thread(content_index.search_brands, norm, 5),
|
||
|
|
asyncio.to_thread(content_index.search_legal, norm, 5),
|
||
|
|
asyncio.to_thread(static_pages.search_static_pages, norm, 5),
|
||
|
|
)
|
||
|
|
content_blog = _content_to_items(content_blog_raw)
|
||
|
|
content_brands = _content_to_items(content_brand_raw)
|
||
|
|
content_legal = _content_to_items(content_legal_raw)
|
||
|
|
page_items = _content_to_items(page_raw)
|
||
|
|
else:
|
||
|
|
# autocomplete_only / empty
|
||
|
|
vehicle_items = []
|
||
|
|
content_blog = []
|
||
|
|
content_brands = []
|
||
|
|
content_legal = []
|
||
|
|
page_items = []
|
||
|
|
|
||
|
|
await _enrich_vehicle_slugs(vehicle_items)
|
||
|
|
|
||
|
|
# Tag-bridge: blog matches surface their bridged static pages too
|
||
|
|
bridged_pages = _bridge_pages_for_blog(content_blog)
|
||
|
|
|
||
|
|
# Merge static pages from direct match + bridge (dedup by slug, keep higher score)
|
||
|
|
page_map: dict[str, SearchResultItem] = {}
|
||
|
|
for p in page_items + bridged_pages + content_legal:
|
||
|
|
if p.slug and (p.slug not in page_map or page_map[p.slug].score < p.score):
|
||
|
|
page_map[p.slug] = p
|
||
|
|
pages_final = sorted(page_map.values(), key=lambda x: -x.score)
|
||
|
|
|
||
|
|
# Combined results, ordered by type priority then by score
|
||
|
|
type_priority = {"page": 0, "brand": 1, "vehicle": 2, "blog": 3}
|
||
|
|
all_results: list[SearchResultItem] = (
|
||
|
|
pages_final[:5] + content_brands[:5] + vehicle_items[:limit] + content_blog[:5]
|
||
|
|
)
|
||
|
|
|
||
|
|
# Direct redirect logic:
|
||
|
|
# - commission_number / vin with single hit -> vehicle slug
|
||
|
|
# - exact static page match (score >= 9) when ONLY one hit -> page slug
|
||
|
|
direct_redirect = False
|
||
|
|
if intent.direct_redirect and len(vehicle_items) == 1 and vehicle_items[0].slug:
|
||
|
|
direct_redirect = True
|
||
|
|
elif intent.type == "keyword_search" and len(pages_final) == 1 and pages_final[0].score >= 9 and not vehicle_items and not content_brands and not content_blog:
|
||
|
|
direct_redirect = True
|
||
|
|
|
||
|
|
took_ms = int((time.perf_counter() - started) * 1000)
|
||
|
|
payload = SearchResponse(
|
||
|
|
query=q,
|
||
|
|
intent=intent.type,
|
||
|
|
direct_redirect=direct_redirect,
|
||
|
|
total=len(all_results),
|
||
|
|
results=all_results,
|
||
|
|
took_ms=took_ms,
|
||
|
|
cache_hit=False,
|
||
|
|
)
|
||
|
|
|
||
|
|
ttl = settings.cache_ttl_empty if not all_results else settings.cache_ttl_result
|
||
|
|
await cache.set_json(cache_key, payload.model_dump(), ttl=ttl)
|
||
|
|
response.headers["X-Cache-Hit"] = "false"
|
||
|
|
return payload
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
# Suggest
# ---------------------------------------------------------------------------
@app.get("/suggest", response_model=SuggestResponse)
|
||
|
|
async def suggest_endpoint(
|
||
|
|
response: Response,
|
||
|
|
q: str = Query("", max_length=50),
|
||
|
|
limit: int = Query(8, ge=1, le=20),
|
||
|
|
) -> SuggestResponse:
|
||
|
|
started = time.perf_counter()
|
||
|
|
q_norm = (q or "").strip().lower()
|
||
|
|
|
||
|
|
cache_key = cache.key_suggest(f"{q_norm}:{limit}")
|
||
|
|
cached = await cache.get_json(cache_key)
|
||
|
|
if cached is not None:
|
||
|
|
response.headers["X-Cache-Hit"] = "true"
|
||
|
|
return SuggestResponse(**cached)
|
||
|
|
|
||
|
|
if not q_norm:
|
||
|
|
items = await suggest.top_brands(limit=limit)
|
||
|
|
else:
|
||
|
|
items = await suggest.prefix_suggest(q_norm, limit=limit)
|
||
|
|
|
||
|
|
payload = SuggestResponse(
|
||
|
|
query=q,
|
||
|
|
suggestions=items,
|
||
|
|
took_ms=int((time.perf_counter() - started) * 1000),
|
||
|
|
)
|
||
|
|
await cache.set_json(cache_key, payload.model_dump(), ttl=settings.cache_ttl_suggest)
|
||
|
|
response.headers["X-Cache-Hit"] = "false"
|
||
|
|
return payload
|
||
|
|
|
||
|
|
|
||
|
|
# ---------------------------------------------------------------------------
# Cache invalidation
# ---------------------------------------------------------------------------
@app.post("/cache/invalidate", response_model=InvalidateResponse)
|
||
|
|
async def invalidate(request: Request) -> InvalidateResponse:
|
||
|
|
cleared = await cache.invalidate_all()
|
||
|
|
# Refresh both indexes in the background
|
||
|
|
asyncio.create_task(slug_resolver.refresh())
|
||
|
|
asyncio.create_task(content_index.refresh())
|
||
|
|
log.info(
|
||
|
|
"Cache invalidated: %d keys (from %s)",
|
||
|
|
cleared,
|
||
|
|
request.client.host if request.client else "?",
|
||
|
|
)
|
||
|
|
return InvalidateResponse(cleared=cleared, scope="all")
|