"""In-memory content index for blog posts, brands, legal pages.

Loaded from Directus on startup, refreshed every 5 min in background
(the actual interval is read from ``settings.directus_slug_refresh_seconds``),
and on POST /cache/invalidate.

Total dataset is small (~90 items), fits comfortably in RAM.
"""
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any

import httpx

from app.config import settings

log = logging.getLogger(__name__)


@dataclass
class BlogPost:
    """One blog post as held in the in-memory index.

    Instances are built by ``refresh()`` from the Directus ``blog_posts``
    collection; text fields that are missing or null there become ``""``.
    """

    id: int
    title: str
    slug: str  # bare slug; search results prefix it with "/blog/"
    excerpt: str = ""
    seo_description: str = ""
    tags: list[str] = field(default_factory=list)
    category: str = ""
    published_at: str | None = None  # raw value from Directus, not parsed here
    image_id: str | None = None  # value of the Directus ``image`` field


@dataclass
class Brand:
    """One brand as held in the in-memory index.

    Instances are built by ``refresh()`` from the Directus ``brands``
    collection.
    """

    id: int
    name: str
    slug: str  # bare slug; search results prefix it with "/marken/"
    short_description: str = ""
    description: str = ""
    logo_id: str | None = None  # value of the Directus ``logo`` field
    vehicle_count: int = 0  # not populated by refresh(); stays at the default


@dataclass
class LegalPage:
    """One legal/info page from the Directus ``pages`` collection.

    Only pages whose slug is listed in ``LEGAL_PAGE_SLUGS`` are indexed.
    """

    id: int
    title: str
    slug: str  # bare slug; search results prefix it with "/"
    seo_description: str = ""


# Mutable global state. refresh() rebinds all four names while holding _lock
# so the swap is atomic; the search/stats helpers read them without the lock.
_blog_posts: list[BlogPost] = []
_brands: list[Brand] = []
_legal_pages: list[LegalPage] = []
_last_refresh: float = 0.0  # time.time() of the last refresh; 0.0 = never
_lock = asyncio.Lock()

# Slugs of pages that are NOT really top-level static pages but live in
# the Directus pages collection. We treat them as legal/info pages.
LEGAL_PAGE_SLUGS = { "impressum", "datenschutz", "agb", "barrierefreiheit", "batterie-entsorgung", "widerruf", } async def _fetch(client: httpx.AsyncClient, collection: str, fields: str, filter_field: str = "status", filter_value: str = "published") -> list[dict]: url = f"{settings.directus_url}/items/{collection}" params = { "fields": fields, "limit": -1, } if filter_field: params[f"filter[{filter_field}][_eq]"] = filter_value try: resp = await client.get(url, params=params) resp.raise_for_status() return resp.json().get("data", []) except Exception as e: log.warning("Failed to fetch %s: %s", collection, e) return [] async def refresh() -> dict: """Pull all collections from Directus and rebuild the in-memory index.""" global _blog_posts, _brands, _legal_pages, _last_refresh async with httpx.AsyncClient(timeout=15) as client: blog_data, brand_data, page_data = await asyncio.gather( _fetch(client, "blog_posts", "id,title,slug,excerpt,seo_description,tags,category,published_at,image"), _fetch(client, "brands", "id,name,slug,short_description,description,logo", filter_field="is_active", filter_value="true"), _fetch(client, "pages", "id,title,slug,seo_description"), ) new_blog = [ BlogPost( id=int(p["id"]), title=p.get("title") or "", slug=p.get("slug") or "", excerpt=p.get("excerpt") or "", seo_description=p.get("seo_description") or "", tags=[t for t in (p.get("tags") or []) if isinstance(t, str)], category=p.get("category") or "", published_at=p.get("published_at"), image_id=p.get("image"), ) for p in blog_data if p.get("slug") ] new_brands = [ Brand( id=int(b["id"]), name=b.get("name") or "", slug=b.get("slug") or "", short_description=b.get("short_description") or "", description=b.get("description") or "", logo_id=b.get("logo"), ) for b in brand_data if b.get("slug") ] new_legal = [ LegalPage( id=int(p["id"]), title=p.get("title") or "", slug=p.get("slug") or "", seo_description=p.get("seo_description") or "", ) for p in page_data if p.get("slug") and p["slug"] 
in LEGAL_PAGE_SLUGS ] async with _lock: _blog_posts = new_blog _brands = new_brands _legal_pages = new_legal _last_refresh = time.time() counts = {"blog": len(new_blog), "brands": len(new_brands), "legal": len(new_legal)} log.info("Content index refreshed: %s", counts) return counts async def background_refresher() -> None: while True: await asyncio.sleep(settings.directus_slug_refresh_seconds) try: await refresh() except Exception as e: log.warning("Background content refresh error: %s", e) def _score_blog(post: BlogPost, q: str) -> float: """Weighted match score for a blog post against query q (lowercased).""" score = 0.0 title_l = post.title.lower() excerpt_l = post.excerpt.lower() seo_l = post.seo_description.lower() cat_l = post.category.lower() # Title weights highest (substring requires len >= 3 to avoid 2-char noise) if title_l == q: score += 10 elif title_l.startswith(q): score += 6 elif len(q) >= 3 and q in title_l: score += 4 # Tags (real curated keywords) for tag in post.tags: tl = tag.lower() if tl == q: score += 5 break elif tl.startswith(q) or (len(q) >= 3 and q in tl): score += 2 break # Category if q in cat_l: score += 1 # Excerpt + seo_description (lighter) if q in excerpt_l: score += 1 if q in seo_l: score += 0.5 return score def _score_brand(brand: Brand, q: str) -> float: score = 0.0 name_l = brand.name.lower() if name_l == q: score += 15 # Brand exact match — dominant signal elif name_l.startswith(q): score += 10 # Prefix should beat any page substring match elif len(q) >= 3 and q in name_l: score += 5 if len(q) >= 3 and q in (brand.short_description or "").lower(): score += 1 if len(q) >= 3 and q in (brand.description or "").lower(): score += 0.5 return score def _score_legal(page: LegalPage, q: str) -> float: score = 0.0 title_l = page.title.lower() slug_l = page.slug.lower() if title_l == q or slug_l == q: score += 10 elif title_l.startswith(q) or slug_l.startswith(q): score += 7 elif len(q) >= 3 and (q in title_l or q in slug_l): score += 
4 if len(q) >= 3 and q in (page.seo_description or "").lower(): score += 1 return score def search_blog(query: str, limit: int = 5) -> list[dict]: q = query.strip().lower() if not q: return [] scored = [(p, _score_blog(p, q)) for p in _blog_posts] scored = [(p, s) for p, s in scored if s > 0] scored.sort(key=lambda x: -x[1]) return [ { "type": "blog", "title": p.title, "slug": f"/blog/{p.slug}", "snippet": (p.seo_description or p.excerpt or "")[:180], "tags": p.tags, "category": p.category, "published_at": p.published_at, "image_id": p.image_id, "score": s, "matched_via": "blog", } for p, s in scored[:limit] ] def search_brands(query: str, limit: int = 5) -> list[dict]: q = query.strip().lower() if not q: return [] scored = [(b, _score_brand(b, q)) for b in _brands] scored = [(b, s) for b, s in scored if s > 0] scored.sort(key=lambda x: -x[1]) return [ { "type": "brand", "title": b.name, "slug": f"/marken/{b.slug}", "snippet": (b.short_description or "")[:180], "logo_id": b.logo_id, "score": s, "matched_via": "brand", } for b, s in scored[:limit] ] def search_legal(query: str, limit: int = 5) -> list[dict]: q = query.strip().lower() if not q: return [] scored = [(p, _score_legal(p, q)) for p in _legal_pages] scored = [(p, s) for p, s in scored if s > 0] scored.sort(key=lambda x: -x[1]) return [ { "type": "page", "title": p.title, "slug": f"/{p.slug}", "snippet": (p.seo_description or "")[:180], "score": s, "matched_via": "legal", } for p, s in scored[:limit] ] def get_blog_posts_with_tag(tag: str) -> list[BlogPost]: tag_l = tag.lower() return [p for p in _blog_posts if any(t.lower() == tag_l for t in p.tags)] def stats() -> dict: return { "blog": len(_blog_posts), "brands": len(_brands), "legal": len(_legal_pages), "last_refresh_age_s": int(time.time() - _last_refresh) if _last_refresh else None, }