electric-horses-infra/stacks/eh-search/app/slug_resolver.py

67 lines
2 KiB
Python
Raw Permalink Normal View History

"""Resolves directus_product_id -> slug, with periodic refresh.
Uses Directus REST API. Slug map is small (~75 entries) so we keep it
fully in-memory and refresh on a schedule (or on cache invalidation).
"""
import asyncio
import logging
import time
import httpx
from app.config import settings
log = logging.getLogger(__name__)
_slug_map: dict[int, str] = {}
_last_refresh: float = 0.0
_lock = asyncio.Lock()
async def refresh() -> int:
"""Pull product_id -> slug from Directus. Returns count."""
global _slug_map, _last_refresh
url = f"{settings.directus_url}/items/products"
params = {
"fields": "id,slug",
"limit": -1,
"filter[status][_eq]": "published",
}
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(url, params=params)
resp.raise_for_status()
data = resp.json().get("data", [])
new_map = {
int(item["id"]): item["slug"]
for item in data
if item.get("id") is not None and item.get("slug")
}
async with _lock:
_slug_map = new_map
_last_refresh = time.time()
log.info("Slug map refreshed: %d entries", len(new_map))
return len(new_map)
except Exception as e:
log.warning("Slug refresh failed: %s", e)
return 0
async def get_slug(directus_product_id: int | None) -> str | None:
if directus_product_id is None:
return None
# Lazy refresh if stale
if time.time() - _last_refresh > settings.directus_slug_refresh_seconds:
await refresh()
return _slug_map.get(int(directus_product_id))
async def background_refresher():
"""Periodic background task — refresh slug map every N seconds."""
while True:
await asyncio.sleep(settings.directus_slug_refresh_seconds)
try:
await refresh()
except Exception as e:
log.warning("Background slug refresh error: %s", e)