"""eh-search FastAPI application — Phase 1.5 Federated Multi-Source. Endpoints: - GET /health -> liveness + dependency checks - GET /search?q= -> federated search across vehicles + content + static pages - GET /suggest?q= -> autocomplete - POST /cache/invalidate -> clear all caches + refresh content index """ import asyncio import logging import time from contextlib import asynccontextmanager from fastapi import FastAPI, Query, Request, Response from fastapi.middleware.cors import CORSMiddleware from app import cache, content_index, db from app.config import settings from app.intent_router import route as route_intent from app.schemas import ( HealthResponse, InvalidateResponse, SearchResponse, SearchResultItem, SuggestResponse, ) from app.search import exact, fts, static_pages, suggest from app import slug_resolver logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) log = logging.getLogger("eh-search") @asynccontextmanager async def lifespan(app: FastAPI): log.info("Starting eh-search (env=%s) Phase 1.5", settings.environment) await db.init_pool() await cache.init_redis() await slug_resolver.refresh() await content_index.refresh() refresher_slug = asyncio.create_task(slug_resolver.background_refresher()) refresher_content = asyncio.create_task(content_index.background_refresher()) log.info("eh-search ready") try: yield finally: refresher_slug.cancel() refresher_content.cancel() await db.close_pool() await cache.close_redis() log.info("eh-search stopped") app = FastAPI( title="eh-search", description="Federated search service for electric-horses.de", version="0.2.0", lifespan=lifespan, ) if settings.cors_origin_list: app.add_middleware( CORSMiddleware, allow_origins=settings.cors_origin_list, allow_methods=["GET"], allow_headers=["*"], max_age=600, ) # --------------------------------------------------------------------------- # Health # --------------------------------------------------------------------------- 
@app.get("/health", response_model=HealthResponse) async def health() -> HealthResponse: db_status = "ok" vehicles_count = 0 try: vehicles_count = await db.fetchval("SELECT count(*) FROM search_vehicles") except Exception as e: db_status = f"error: {e}" redis_status = "ok" try: await cache.get_redis().ping() except Exception as e: redis_status = f"error: {e}" overall = "ok" if db_status == "ok" and redis_status == "ok" else "degraded" return HealthResponse( status=overall, environment=settings.environment, db=db_status, redis=redis_status, vehicles_count=vehicles_count or 0, content=content_index.stats(), ) # --------------------------------------------------------------------------- # Federated Search # --------------------------------------------------------------------------- async def _vehicle_search(query: str, intent_type: str, normalized: str, limit: int) -> list[SearchResultItem]: if intent_type == "komm_nr": return await exact.by_komm_nr(normalized) if intent_type == "dvn": return await exact.by_dvn(normalized) if intent_type == "vin": return await exact.by_vin(normalized) if intent_type == "keyword_search": return await fts.hybrid_search(normalized, limit=limit) return [] def _content_to_items(raw: list[dict]) -> list[SearchResultItem]: """Convert content_index dict results to SearchResultItem.""" return [SearchResultItem(**r) for r in raw] async def _enrich_vehicle_slugs(items: list[SearchResultItem]) -> None: """Vehicles need their Directus slug resolved.""" for item in items: if item.type == "vehicle" and item.directus_product_id: item.slug = await slug_resolver.get_slug(item.directus_product_id) def _bridge_pages_for_blog(blog_results: list[SearchResultItem]) -> list[SearchResultItem]: """If matched blog posts have tags that bridge to a static page, surface that page as an additional result. 
""" found_pages: dict[str, float] = {} for blog in blog_results: if not blog.tags: continue slugs = static_pages.get_pages_for_tags(blog.tags) for slug in slugs: page = static_pages.page_by_slug(slug) if page: # Score: based on top blog match score, capped found_pages[slug] = max(found_pages.get(slug, 0), min(blog.score, 5.0)) items = [] for slug, score in found_pages.items(): page = static_pages.page_by_slug(slug) if page is None: continue items.append(SearchResultItem( type="page", title=page.title, slug=page.slug, snippet=page.snippet, score=score, matched_via="tag_bridge", )) return items @app.get("/search", response_model=SearchResponse) async def search( response: Response, q: str = Query(..., min_length=1, max_length=200), limit: int = Query(10, ge=1, le=50), ) -> SearchResponse: started = time.perf_counter() intent = route_intent(q) cache_key = cache.key_query(f"v2:{intent.normalized_query}:{limit}") cached = await cache.get_json(cache_key) if cached is not None: cached["cache_hit"] = True response.headers["X-Cache-Hit"] = "true" return SearchResponse(**cached) norm = intent.normalized_query # Run all sources in parallel if intent.type in ("komm_nr", "dvn", "vin"): # Exact lookups skip content/static (they should hit a vehicle) vehicle_task = _vehicle_search(q, intent.type, norm, limit) vehicle_items = await vehicle_task content_blog: list[SearchResultItem] = [] content_brands: list[SearchResultItem] = [] content_legal: list[SearchResultItem] = [] page_items: list[SearchResultItem] = [] elif intent.type == "keyword_search": vehicle_task = _vehicle_search(q, intent.type, norm, limit) vehicle_items, content_blog_raw, content_brand_raw, content_legal_raw, page_raw = await asyncio.gather( vehicle_task, asyncio.to_thread(content_index.search_blog, norm, 5), asyncio.to_thread(content_index.search_brands, norm, 5), asyncio.to_thread(content_index.search_legal, norm, 5), asyncio.to_thread(static_pages.search_static_pages, norm, 5), ) content_blog = 
_content_to_items(content_blog_raw) content_brands = _content_to_items(content_brand_raw) content_legal = _content_to_items(content_legal_raw) page_items = _content_to_items(page_raw) else: # autocomplete_only / empty vehicle_items = [] content_blog = [] content_brands = [] content_legal = [] page_items = [] await _enrich_vehicle_slugs(vehicle_items) # Tag-bridge: blog matches surface their bridged static pages too bridged_pages = _bridge_pages_for_blog(content_blog) # Merge static pages from direct match + bridge (dedup by slug, keep higher score) page_map: dict[str, SearchResultItem] = {} for p in page_items + bridged_pages + content_legal: if p.slug and (p.slug not in page_map or page_map[p.slug].score < p.score): page_map[p.slug] = p pages_final = sorted(page_map.values(), key=lambda x: -x.score) # Combined results, ordered by type priority then by score type_priority = {"page": 0, "brand": 1, "vehicle": 2, "blog": 3} all_results: list[SearchResultItem] = ( pages_final[:5] + content_brands[:5] + vehicle_items[:limit] + content_blog[:5] ) # Direct redirect logic: # - commission_number / vin with single hit -> vehicle slug # - exact static page match (score >= 9) when ONLY one hit -> page slug direct_redirect = False if intent.direct_redirect and len(vehicle_items) == 1 and vehicle_items[0].slug: direct_redirect = True elif intent.type == "keyword_search" and len(pages_final) == 1 and pages_final[0].score >= 9 and not vehicle_items and not content_brands and not content_blog: direct_redirect = True took_ms = int((time.perf_counter() - started) * 1000) payload = SearchResponse( query=q, intent=intent.type, direct_redirect=direct_redirect, total=len(all_results), results=all_results, took_ms=took_ms, cache_hit=False, ) ttl = settings.cache_ttl_empty if not all_results else settings.cache_ttl_result await cache.set_json(cache_key, payload.model_dump(), ttl=ttl) response.headers["X-Cache-Hit"] = "false" return payload # 
# ---------------------------------------------------------------------------
# Suggest
# ---------------------------------------------------------------------------

@app.get("/suggest", response_model=SuggestResponse)
async def suggest_endpoint(
    response: Response,
    q: str = Query("", max_length=50),
    limit: int = Query(8, ge=1, le=20),
) -> SuggestResponse:
    """Autocomplete: top brands for an empty query, prefix matches otherwise.

    Responses are cached by normalized query + limit; cache hits are
    signalled via the X-Cache-Hit header.
    """
    started = time.perf_counter()
    q_norm = (q or "").strip().lower()
    cache_key = cache.key_suggest(f"{q_norm}:{limit}")
    cached = await cache.get_json(cache_key)
    if cached is not None:
        response.headers["X-Cache-Hit"] = "true"
        return SuggestResponse(**cached)
    if not q_norm:
        items = await suggest.top_brands(limit=limit)
    else:
        items = await suggest.prefix_suggest(q_norm, limit=limit)
    payload = SuggestResponse(
        query=q,
        suggestions=items,
        took_ms=int((time.perf_counter() - started) * 1000),
    )
    await cache.set_json(cache_key, payload.model_dump(), ttl=settings.cache_ttl_suggest)
    response.headers["X-Cache-Hit"] = "false"
    return payload


# ---------------------------------------------------------------------------
# Cache invalidation
# ---------------------------------------------------------------------------

# Strong references to fire-and-forget refresh tasks. The event loop keeps
# only weak references to tasks, so without this set a background refresh
# could be garbage-collected before it finishes.
_refresh_tasks: set[asyncio.Task] = set()


@app.post("/cache/invalidate", response_model=InvalidateResponse)
async def invalidate(request: Request) -> InvalidateResponse:
    """Drop every cache key, then refresh both indexes in the background.

    Returns immediately with the number of cleared keys; the index
    refreshes complete asynchronously.
    """
    cleared = await cache.invalidate_all()
    # Refresh both indexes in the background, holding a strong reference
    # to each task until it completes.
    for coro in (slug_resolver.refresh(), content_index.refresh()):
        task = asyncio.create_task(coro)
        _refresh_tasks.add(task)
        task.add_done_callback(_refresh_tasks.discard)
    log.info(
        "Cache invalidated: %d keys (from %s)",
        cleared,
        request.client.host if request.client else "?",
    )
    return InvalidateResponse(cleared=cleared, scope="all")