electric-horses-infra/stacks/eh-search/app/db.py
Benjamin Weinlich b3813ed6ac feat(stacks/eh-search): add site-search FastAPI service
Mirrors /opt/ai-apps/eh-search/ on the server, including the full
FastAPI app (intent routing, FTS+fuzzy+substring hybrid, multi-source
federation across vehicles + blog + brands + pages + static + tag
bridge), SQL schema (Postgres materialized view with german_unaccent
text search, pg_trgm for fuzzy), Dockerfile and compose.

Sanitized the hardcoded password in sql/01_init.sql — replaced with
REPLACE_ME_BEFORE_APPLYING placeholder since this repo is public.

The eh-search service binds only on the private network (10.0.0.8:8200)
and is reachable only via Pegasus nginx proxy at /api/search.

Refs OP#1094 OP#1105 OP#1112 OP#1116 OP#1117
2026-04-11 22:19:39 +02:00

43 lines
999 B
Python

"""Async Postgres pool."""
import asyncpg
from app.config import settings
# Module-wide connection pool shared by every helper below.
# It is None until init_pool() assigns it, and close_pool() sets it back to None.
_pool: asyncpg.Pool | None = None
async def init_pool() -> None:
    """Create the module-level asyncpg connection pool.

    The pool connects with ``settings.dsn``, is sized with ``min_size=2`` and
    ``max_size=10``, and passes ``command_timeout=5`` to asyncpg as the
    default timeout for commands run on its connections.

    Calling this while a pool already exists is a no-op. Previously a second
    call replaced the reference without closing the first pool, leaving its
    connections open with nothing left to close them.
    """
    global _pool
    if _pool is not None:
        # Already initialised: keep the live pool instead of orphaning it.
        return
    _pool = await asyncpg.create_pool(
        dsn=settings.dsn,
        min_size=2,
        max_size=10,
        command_timeout=5,
    )
async def close_pool() -> None:
    """Close the shared connection pool, if any, and clear the reference.

    Does nothing when no pool is set. Otherwise awaits the pool's ``close()``
    and resets the module-level reference to ``None``.

    The reset happens in a ``finally`` block so that a ``close()`` that raises
    or is cancelled does not leave a half-closed pool reachable through
    ``get_pool()``. The original exception still propagates to the caller.
    """
    global _pool
    if _pool is None:
        return
    try:
        await _pool.close()
    finally:
        # Forget the pool whether or not close() succeeded.
        _pool = None
def get_pool() -> asyncpg.Pool:
    """Return the shared connection pool.

    Raises:
        RuntimeError: if the module-level pool is currently ``None``.
    """
    pool = _pool
    if pool is not None:
        return pool
    raise RuntimeError("DB pool not initialized")
async def fetch(query: str, *args) -> list[asyncpg.Record]:
    """Run *query* with positional *args* and return all result rows.

    A connection is acquired from the shared pool for the duration of the
    call and released when the ``async with`` block exits.

    Raises:
        RuntimeError: propagated from ``get_pool()`` when no pool is set.
    """
    pool = get_pool()
    async with pool.acquire() as connection:
        rows = await connection.fetch(query, *args)
        return rows
async def fetchrow(query: str, *args) -> asyncpg.Record | None:
    """Run *query* with positional *args* and return the connection's ``fetchrow`` result.

    A connection is acquired from the shared pool for the duration of the
    call and released when the ``async with`` block exits.

    Raises:
        RuntimeError: propagated from ``get_pool()`` when no pool is set.
    """
    pool = get_pool()
    async with pool.acquire() as connection:
        row = await connection.fetchrow(query, *args)
        return row
async def fetchval(query: str, *args):
    """Run *query* with positional *args* and return the connection's ``fetchval`` result.

    A connection is acquired from the shared pool for the duration of the
    call and released when the ``async with`` block exits.

    Raises:
        RuntimeError: propagated from ``get_pool()`` when no pool is set.
    """
    pool = get_pool()
    async with pool.acquire() as connection:
        value = await connection.fetchval(query, *args)
        return value