Mirrors /opt/ai-apps/eh-search/ on the server, including the full FastAPI app (intent routing, FTS+fuzzy+substring hybrid, multi-source federation across vehicles + blog + brands + pages + static + tag bridge), SQL schema (Postgres materialized view with german_unaccent text search, pg_trgm for fuzzy), Dockerfile and compose. Sanitized the hardcoded password in sql/01_init.sql — replaced with REPLACE_ME_BEFORE_APPLYING placeholder since this repo is public. The eh-search service binds only on the private network (10.0.0.8:8200) and is reachable only via Pegasus nginx proxy at /api/search. Refs OP#1094 OP#1105 OP#1112 OP#1116 OP#1117
148 lines
4.8 KiB
Python
"""Postgres Full-Text Search (Typ 2) + pg_trgm Fuzzy (Typ 3) + Substring fallback."""
|
|
import re
|
|
|
|
from app import db
|
|
from app.schemas import SearchResultItem
|
|
|
|
|
|
COLUMNS = """
|
|
vehicle_id, commission_number, vin, brand, model, title,
|
|
price::float8 as price, primary_image_id::text as primary_image_id,
|
|
directus_product_id
|
|
"""
|
|
|
|
# Sanitize tsquery input — keep word chars, spaces, dots, hyphens
|
|
_SAFE_TOKEN = re.compile(r"[^\w\s\.\-]", re.UNICODE)
|
|
|
|
|
|
def _build_prefix_tsquery(query: str) -> str:
|
|
"""Convert free-text query into a safe tsquery with prefix match.
|
|
|
|
'ray' -> 'ray:*'
|
|
'zero motor' -> 'zero:* & motor:*'
|
|
"""
|
|
cleaned = _SAFE_TOKEN.sub(" ", query)
|
|
tokens = [t for t in cleaned.split() if t]
|
|
if not tokens:
|
|
return ""
|
|
return " & ".join(f"{t}:*" for t in tokens)
|
|
|
|
|
|
async def fts_search(query: str, limit: int = 10) -> list[SearchResultItem]:
    """Full-text search with German + unaccent dictionary, prefix-enabled.

    Builds a prefix tsquery from the user input; if Postgres rejects it
    (odd token that survives sanitizing), retries once with
    plainto_tsquery over the raw query string.  Score is ts_rank_cd.
    """
    tsquery_str = _build_prefix_tsquery(query)
    if not tsquery_str:
        # Nothing searchable survived sanitizing — no point hitting the DB.
        return []

    prefix_sql = f"""
        SELECT {COLUMNS},
               ts_rank_cd(search_tsv, q)::float8 AS rank
        FROM search_vehicles, to_tsquery('german_unaccent', $1) q
        WHERE search_tsv @@ q
        ORDER BY rank DESC, brand, model
        LIMIT $2
    """
    try:
        rows = await db.fetch(prefix_sql, tsquery_str, limit)
    except Exception:
        # to_tsquery() can still choke on edge-case input; degrade to
        # plainto_tsquery, which tokenizes the raw query itself and never
        # raises on user text (no prefix matching, though).
        plain_sql = f"""
            SELECT {COLUMNS},
                   ts_rank_cd(search_tsv, q)::float8 AS rank
            FROM search_vehicles, plainto_tsquery('german_unaccent', $1) q
            WHERE search_tsv @@ q
            ORDER BY rank DESC, brand, model
            LIMIT $2
        """
        rows = await db.fetch(plain_sql, query, limit)

    # rank can be NULL-ish on degenerate matches — coalesce to 0.
    return [_to_item(row, "fts", float(row["rank"] or 0)) for row in rows]
|
|
|
|
|
|
async def fuzzy_search(query: str, limit: int = 10) -> list[SearchResultItem]:
    """Trigram similarity (pg_trgm) for typo tolerance.

    Each vehicle is scored by the best similarity between the query and
    either its title or its commission number; hits scoring <= 0.25 are
    discarded.  The '%' operator keeps the scan on the trigram index.
    """
    sql = f"""
        WITH scored AS (
            SELECT {COLUMNS},
                   GREATEST(
                       similarity(title, $1),
                       similarity(COALESCE(commission_number, ''), $1)
                   )::float8 AS sim
            FROM search_vehicles
            WHERE title % $1
               OR commission_number % $1
        )
        SELECT * FROM scored
        WHERE sim > 0.25
        ORDER BY sim DESC
        LIMIT $2
    """
    results: list[SearchResultItem] = []
    for row in await db.fetch(sql, query, limit):
        results.append(_to_item(row, "fuzzy", float(row["sim"])))
    return results
|
|
|
|
|
|
def _escape_like(term: str) -> str:
    """Escape LIKE metacharacters so *term* is matched literally.

    Postgres' default ESCAPE character is backslash, so escaping
    backslash itself plus '%' and '_' is sufficient.  Without this, a
    query like '100%' or 'a_b' acts as a wildcard pattern instead of a
    substring.
    """
    return term.replace("\\", "\\\\").replace("%", r"\%").replace("_", r"\_")


async def substring_search(query: str, limit: int = 10) -> list[SearchResultItem]:
    """Last-resort: LIKE substring on title/brand/model.

    Catches stop-words (e.g. German 'es') that FTS strips, and very short
    queries that don't survive stemming. Uses the existing trgm GIN index.

    The user query is wildcard-escaped so '%'/'_' in it match literally.
    Every hit gets a flat 0.5 score — this tier has no meaningful ranking.
    """
    pattern = f"%{_escape_like(query.lower())}%"
    sql = f"""
        SELECT {COLUMNS}, 0.5::float8 AS rank
        FROM search_vehicles
        WHERE LOWER(title) LIKE $1
           OR LOWER(brand) LIKE $1
           OR LOWER(COALESCE(model, '')) LIKE $1
        ORDER BY brand, model
        LIMIT $2
    """
    rows = await db.fetch(sql, pattern, limit)
    return [_to_item(r, "substring", 0.5) for r in rows]
|
|
|
|
|
|
async def hybrid_search(query: str, limit: int = 10) -> list[SearchResultItem]:
    """FTS first; if too few hits, add fuzzy; if still empty, add substring.

    Strategy:
    1. Always run FTS (cheap, indexed, prefix-enabled)
    2. If FTS has >= 3 results, return them
    3. Otherwise also run fuzzy and merge
    4. If still nothing, run substring fallback (handles stop-words)
    """
    fts_hits = await fts_search(query, limit=limit)
    if len(fts_hits) >= 3:
        return fts_hits

    # Merge FTS and fuzzy hits, deduplicating by vehicle_id; the FTS entry
    # wins when both tiers matched the same vehicle.
    merged_by_id: dict[int, SearchResultItem] = {}
    for item in fts_hits:
        merged_by_id.setdefault(item.vehicle_id, item)
    for item in await fuzzy_search(query, limit=limit):
        merged_by_id.setdefault(item.vehicle_id, item)

    if not merged_by_id:
        # Last resort: substring (catches stop-words like 'es')
        for item in await substring_search(query, limit=limit):
            merged_by_id[item.vehicle_id] = item

    # Rank tiers: fts before fuzzy before substring, then by score desc.
    tier = {"fts": 0, "fuzzy": 1}

    def sort_key(item: SearchResultItem) -> tuple[int, float]:
        return (tier.get(item.matched_via, 2), -item.score)

    return sorted(merged_by_id.values(), key=sort_key)[:limit]
|
|
|
|
|
|
def _to_item(row, matched_via: str, score: float) -> SearchResultItem:
    """Build a SearchResultItem from a DB row plus match metadata.

    `title` is coalesced to '' (guards against NULL/empty in the view);
    all other columns pass through unchanged.
    """
    passthrough = (
        "vehicle_id", "commission_number", "vin", "brand", "model",
        "price", "primary_image_id", "directus_product_id",
    )
    fields = {name: row[name] for name in passthrough}
    return SearchResultItem(
        title=row["title"] or "",
        score=score,
        matched_via=matched_via,
        **fields,
    )
|