# electric-horses-infra/stacks/eh-search/app/main.py
"""eh-search FastAPI application — Phase 1.5 Federated Multi-Source.
Endpoints:
- GET /health -> liveness + dependency checks
- GET /search?q= -> federated search across vehicles + content + static pages
- GET /suggest?q= -> autocomplete
- POST /cache/invalidate -> clear all caches + refresh content index
"""
import asyncio
import logging
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, Query, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from app import cache, content_index, db
from app.config import settings
from app.intent_router import route as route_intent
from app.schemas import (
HealthResponse,
InvalidateResponse,
SearchResponse,
SearchResultItem,
SuggestResponse,
)
from app.search import exact, fts, static_pages, suggest
from app import slug_resolver
# Root logging config: timestamped, level-tagged lines (INFO and above).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
# Module-wide logger used by the lifespan hooks and endpoints below.
log = logging.getLogger("eh-search")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: warm up dependencies, tear down on shutdown.

    Startup: opens the DB pool and Redis connection, primes the slug and
    content indexes once, then launches the periodic background refreshers.
    Shutdown: cancels the refreshers and *waits* for them to finish before
    closing the pools they may still be using.
    """
    log.info("Starting eh-search (env=%s) Phase 1.5", settings.environment)
    await db.init_pool()
    await cache.init_redis()
    await slug_resolver.refresh()
    await content_index.refresh()
    refresher_slug = asyncio.create_task(slug_resolver.background_refresher())
    refresher_content = asyncio.create_task(content_index.background_refresher())
    log.info("eh-search ready")
    try:
        yield
    finally:
        refreshers = (refresher_slug, refresher_content)
        for task in refreshers:
            task.cancel()
        # cancel() only *requests* cancellation; await each task so the
        # refreshers cannot race against the pool shutdown below.
        for task in refreshers:
            try:
                await task
            except asyncio.CancelledError:
                pass
        await db.close_pool()
        await cache.close_redis()
        log.info("eh-search stopped")
app = FastAPI(
    title="eh-search",
    description="Federated search service for electric-horses.de",
    version="0.2.0",
    lifespan=lifespan,
)
# CORS is attached only when origins are configured; cross-origin browsers
# are limited to GET (the POST invalidation endpoint is not exposed to them).
if settings.cors_origin_list:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.cors_origin_list,
        allow_methods=["GET"],
        allow_headers=["*"],
        max_age=600,  # browsers may cache preflight results for 10 minutes
    )
# ---------------------------------------------------------------------------
# Health
# ---------------------------------------------------------------------------
@app.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
    """Liveness/readiness probe.

    Pings Postgres (via a vehicle count) and Redis; overall status is "ok"
    only when both dependencies respond, otherwise "degraded". Content-index
    statistics are always included.
    """
    vehicles_count = 0
    try:
        vehicles_count = await db.fetchval("SELECT count(*) FROM search_vehicles")
        db_status = "ok"
    except Exception as e:
        db_status = f"error: {e}"
    try:
        await cache.get_redis().ping()
        redis_status = "ok"
    except Exception as e:
        redis_status = f"error: {e}"
    all_ok = db_status == "ok" and redis_status == "ok"
    return HealthResponse(
        status="ok" if all_ok else "degraded",
        environment=settings.environment,
        db=db_status,
        redis=redis_status,
        vehicles_count=vehicles_count or 0,
        content=content_index.stats(),
    )
# ---------------------------------------------------------------------------
# Federated Search
# ---------------------------------------------------------------------------
async def _vehicle_search(query: str, intent_type: str, normalized: str, limit: int) -> list[SearchResultItem]:
    """Dispatch a vehicle lookup based on the routed intent type.

    Exact identifiers (commission number, DVN, VIN) go to the exact-match
    helpers; free-text goes to the hybrid FTS search; anything else yields
    no vehicle results.
    """
    if intent_type == "keyword_search":
        return await fts.hybrid_search(normalized, limit=limit)
    exact_lookups = {
        "komm_nr": exact.by_komm_nr,
        "dvn": exact.by_dvn,
        "vin": exact.by_vin,
    }
    lookup = exact_lookups.get(intent_type)
    if lookup is not None:
        return await lookup(normalized)
    return []
def _content_to_items(raw: list[dict]) -> list[SearchResultItem]:
    """Convert content_index dict results to SearchResultItem models."""
    items: list[SearchResultItem] = []
    for record in raw:
        items.append(SearchResultItem(**record))
    return items
async def _enrich_vehicle_slugs(items: list[SearchResultItem]) -> None:
    """Resolve the Directus slug for each vehicle result, in place."""
    for item in items:
        if item.type != "vehicle" or not item.directus_product_id:
            continue
        item.slug = await slug_resolver.get_slug(item.directus_product_id)
def _bridge_pages_for_blog(blog_results: list[SearchResultItem]) -> list[SearchResultItem]:
    """If matched blog posts have tags that bridge to a static page,
    surface that page as an additional result.

    One result per bridged page; when several blog posts bridge to the same
    page, the highest (capped) blog score wins. Page results are scored off
    the blog match, capped at 5.0 so a bridge never outranks a direct hit.
    """
    # slug -> (page, best score). Keeping the resolved page here avoids the
    # second page_by_slug lookup (and redundant None re-check) per slug that
    # a score-only dict would require when building the result items.
    bridged: dict[str, tuple] = {}
    for blog in blog_results:
        if not blog.tags:
            continue
        for slug in static_pages.get_pages_for_tags(blog.tags):
            page = static_pages.page_by_slug(slug)
            if page is None:
                continue
            capped = min(blog.score, 5.0)
            prev = bridged.get(slug)
            if prev is None or prev[1] < capped:
                bridged[slug] = (page, capped)
    return [
        SearchResultItem(
            type="page",
            title=page.title,
            slug=page.slug,
            snippet=page.snippet,
            score=score,
            matched_via="tag_bridge",
        )
        for page, score in bridged.values()
    ]
@app.get("/search", response_model=SearchResponse)
async def search(
    response: Response,
    q: str = Query(..., min_length=1, max_length=200),
    limit: int = Query(10, ge=1, le=50),
) -> SearchResponse:
    """Federated search across vehicles, content (blog/brands/legal) and static pages.

    Flow: route intent -> try cache -> query sources (in parallel for
    keyword searches) -> merge/dedup pages -> decide direct-redirect ->
    cache and return. An X-Cache-Hit header reports cache status.
    """
    started = time.perf_counter()
    intent = route_intent(q)
    # Cache key is versioned ("v2") so stale Phase-1 entries are never served.
    cache_key = cache.key_query(f"v2:{intent.normalized_query}:{limit}")
    cached = await cache.get_json(cache_key)
    if cached is not None:
        cached["cache_hit"] = True
        response.headers["X-Cache-Hit"] = "true"
        return SearchResponse(**cached)
    norm = intent.normalized_query
    if intent.type in ("komm_nr", "dvn", "vin"):
        # Exact identifier lookups skip content/static (they should hit a vehicle).
        vehicle_items = await _vehicle_search(q, intent.type, norm, limit)
        content_blog: list[SearchResultItem] = []
        content_brands: list[SearchResultItem] = []
        content_legal: list[SearchResultItem] = []
        page_items: list[SearchResultItem] = []
    elif intent.type == "keyword_search":
        # Run all sources in parallel; the in-memory index searches are
        # synchronous, so they go through worker threads.
        vehicle_items, content_blog_raw, content_brand_raw, content_legal_raw, page_raw = await asyncio.gather(
            _vehicle_search(q, intent.type, norm, limit),
            asyncio.to_thread(content_index.search_blog, norm, 5),
            asyncio.to_thread(content_index.search_brands, norm, 5),
            asyncio.to_thread(content_index.search_legal, norm, 5),
            asyncio.to_thread(static_pages.search_static_pages, norm, 5),
        )
        content_blog = _content_to_items(content_blog_raw)
        content_brands = _content_to_items(content_brand_raw)
        content_legal = _content_to_items(content_legal_raw)
        page_items = _content_to_items(page_raw)
    else:
        # autocomplete_only / empty — nothing to search
        vehicle_items = []
        content_blog = []
        content_brands = []
        content_legal = []
        page_items = []
    await _enrich_vehicle_slugs(vehicle_items)
    # Tag-bridge: blog matches surface their bridged static pages too.
    bridged_pages = _bridge_pages_for_blog(content_blog)
    # Merge static pages from direct match + bridge + legal
    # (dedup by slug, keep the higher score).
    page_map: dict[str, SearchResultItem] = {}
    for p in page_items + bridged_pages + content_legal:
        if p.slug and (p.slug not in page_map or page_map[p.slug].score < p.score):
            page_map[p.slug] = p
    pages_final = sorted(page_map.values(), key=lambda x: -x.score)
    # Combined results in fixed type order: pages, brands, vehicles, blog.
    all_results: list[SearchResultItem] = (
        pages_final[:5] + content_brands[:5] + vehicle_items[:limit] + content_blog[:5]
    )
    # Direct redirect logic:
    # - commission_number / vin with single hit -> vehicle slug
    # - exact static page match (score >= 9) when ONLY one hit -> page slug
    direct_redirect = False
    if intent.direct_redirect and len(vehicle_items) == 1 and vehicle_items[0].slug:
        direct_redirect = True
    elif (
        intent.type == "keyword_search"
        and len(pages_final) == 1
        and pages_final[0].score >= 9
        and not vehicle_items
        and not content_brands
        and not content_blog
    ):
        direct_redirect = True
    took_ms = int((time.perf_counter() - started) * 1000)
    payload = SearchResponse(
        query=q,
        intent=intent.type,
        direct_redirect=direct_redirect,
        total=len(all_results),
        results=all_results,
        took_ms=took_ms,
        cache_hit=False,
    )
    # Empty result sets get a (presumably shorter) TTL so new matches surface sooner.
    ttl = settings.cache_ttl_empty if not all_results else settings.cache_ttl_result
    await cache.set_json(cache_key, payload.model_dump(), ttl=ttl)
    response.headers["X-Cache-Hit"] = "false"
    return payload
# ---------------------------------------------------------------------------
# Suggest
# ---------------------------------------------------------------------------
@app.get("/suggest", response_model=SuggestResponse)
async def suggest_endpoint(
    response: Response,
    q: str = Query("", max_length=50),
    limit: int = Query(8, ge=1, le=20),
) -> SuggestResponse:
    """Autocomplete: prefix suggestions for a query, top brands when empty.

    Responses are cached per normalized query + limit; the X-Cache-Hit
    header reports whether the cache was used.
    """
    started = time.perf_counter()
    normalized = (q or "").strip().lower()
    cache_key = cache.key_suggest(f"{normalized}:{limit}")
    cached = await cache.get_json(cache_key)
    if cached is not None:
        response.headers["X-Cache-Hit"] = "true"
        return SuggestResponse(**cached)
    if normalized:
        items = await suggest.prefix_suggest(normalized, limit=limit)
    else:
        # Empty query: fall back to the most popular brands.
        items = await suggest.top_brands(limit=limit)
    elapsed_ms = int((time.perf_counter() - started) * 1000)
    payload = SuggestResponse(
        query=q,
        suggestions=items,
        took_ms=elapsed_ms,
    )
    await cache.set_json(cache_key, payload.model_dump(), ttl=settings.cache_ttl_suggest)
    response.headers["X-Cache-Hit"] = "false"
    return payload
# ---------------------------------------------------------------------------
# Cache invalidation
# ---------------------------------------------------------------------------
# Strong references to fire-and-forget refresh tasks: the event loop keeps
# only weak references to tasks, so an un-referenced task created with
# asyncio.create_task() may be garbage-collected before it completes.
_pending_refreshes: set[asyncio.Task] = set()


@app.post("/cache/invalidate", response_model=InvalidateResponse)
async def invalidate(request: Request) -> InvalidateResponse:
    """Clear all caches and kick off background refreshes of both indexes.

    Returns immediately; the slug and content index refreshes run in the
    background and are not awaited by this request.
    """
    cleared = await cache.invalidate_all()
    # Refresh both indexes in the background, keeping task references alive
    # until each refresh finishes.
    for coro in (slug_resolver.refresh(), content_index.refresh()):
        task = asyncio.create_task(coro)
        _pending_refreshes.add(task)
        task.add_done_callback(_pending_refreshes.discard)
    log.info(
        "Cache invalidated: %d keys (from %s)",
        cleared,
        request.client.host if request.client else "?",
    )
    return InvalidateResponse(cleared=cleared, scope="all")