Mirrors /opt/ai-apps/eh-search/ on the server, including the full FastAPI app (intent routing, FTS+fuzzy+substring hybrid, multi-source federation across vehicles + blog + brands + pages + static + tag bridge), SQL schema (Postgres materialized view with german_unaccent text search, pg_trgm for fuzzy), Dockerfile and compose. Sanitized the hardcoded password in sql/01_init.sql — replaced with REPLACE_ME_BEFORE_APPLYING placeholder since this repo is public. The eh-search service binds only on the private network (10.0.0.8:8200) and is reachable only via Pegasus nginx proxy at /api/search. Refs OP#1094 OP#1105 OP#1112 OP#1116 OP#1117
66 lines
2 KiB
Python
66 lines
2 KiB
Python
"""Resolves directus_product_id -> slug, with periodic refresh.

Uses the Directus REST API. The slug map is small (~75 entries), so we
keep it fully in-memory and refresh on a schedule (or on cache invalidation).
"""
|
|
import asyncio
|
|
import logging
|
|
import time
|
|
|
|
import httpx
|
|
|
|
from app.config import settings
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# In-memory cache: Directus product id -> slug. Replaced wholesale by refresh().
_slug_map: dict[int, str] = {}

# time.time() stamp of the last *successful* refresh; 0.0 means "never",
# which makes the first get_slug() call trigger a lazy refresh.
_last_refresh: float = 0.0

# Held by refresh() while it swaps in a new map and timestamp.
_lock = asyncio.Lock()
|
|
|
|
|
|
async def refresh() -> int:
    """Reload the product-id -> slug map from Directus.

    Requests the ``id`` and ``slug`` fields of every item in the
    ``products`` collection whose status equals ``published`` and, on
    success, replaces the module-level map wholesale and records the
    refresh time.

    Returns:
        The number of entries in the new map, or 0 if anything failed.
        On failure a warning is logged and the previous map and
        timestamp are left untouched.
    """
    global _slug_map, _last_refresh
    endpoint = f"{settings.directus_url}/items/products"
    query = {
        "fields": "id,slug",
        # -1 is passed as the page size; per the Directus query-parameter
        # docs this requests all items rather than the default page.
        "limit": -1,
        "filter[status][_eq]": "published",
    }
    try:
        async with httpx.AsyncClient(timeout=10) as http:
            response = await http.get(endpoint, params=query)
            response.raise_for_status()
            rows = response.json().get("data", [])

        fresh: dict[int, str] = {}
        for row in rows:
            # Skip rows without an id or with a missing/empty slug.
            if row.get("id") is None or not row.get("slug"):
                continue
            product_id = int(row["id"])
            fresh[product_id] = row["slug"]

        # The lock is held only for the swap itself, not for the HTTP call.
        async with _lock:
            _slug_map = fresh
            _last_refresh = time.time()

        entry_count = len(fresh)
        log.info("Slug map refreshed: %d entries", entry_count)
        return entry_count
    except Exception as exc:
        # Best-effort: any error (network, HTTP status, malformed payload)
        # is logged and reported as 0 rather than propagated.
        log.warning("Slug refresh failed: %s", exc)
        return 0
|
|
|
|
|
|
async def get_slug(directus_product_id: int | None) -> str | None:
|
|
if directus_product_id is None:
|
|
return None
|
|
# Lazy refresh if stale
|
|
if time.time() - _last_refresh > settings.directus_slug_refresh_seconds:
|
|
await refresh()
|
|
return _slug_map.get(int(directus_product_id))
|
|
|
|
|
|
async def background_refresher():
    """Run forever, reloading the slug map at a fixed interval.

    Each cycle sleeps for ``settings.directus_slug_refresh_seconds`` and
    then calls ``refresh()``. Exceptions from the refresh are logged as
    warnings so the loop keeps running.
    """
    while True:
        # Re-read the interval every cycle; the sleep comes first, so no
        # refresh happens immediately when the task starts.
        interval = settings.directus_slug_refresh_seconds
        await asyncio.sleep(interval)
        try:
            await refresh()
        except Exception as exc:
            log.warning("Background slug refresh error: %s", exc)
|