From 2d0a6baef30dcaae281f4f1a2d3470d756412581 Mon Sep 17 00:00:00 2001 From: vivere-dally Date: Mon, 16 Feb 2026 23:48:53 +0200 Subject: [PATCH] Parse the search_path from the database URL or extract it from the RIVER_SCHEMA env var and pass it to the River client's Config. Should allow usage for users that do not have the river tables in the postgres public schema. --- cmd/riverui/main.go | 4 ++-- internal/riveruicmd/auth_middleware_test.go | 4 ++-- internal/riveruicmd/riveruicmd.go | 11 ++++++++--- internal/riveruicmd/riveruicmd_test.go | 8 ++++---- riverproui/cmd/riverproui/main.go | 4 ++-- 5 files changed, 18 insertions(+), 13 deletions(-) diff --git a/cmd/riverui/main.go b/cmd/riverui/main.go index 63e9deca..b564dc60 100644 --- a/cmd/riverui/main.go +++ b/cmd/riverui/main.go @@ -14,8 +14,8 @@ import ( func main() { riveruicmd.Run( - func(dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) { - return river.NewClient(riverpgxv5.New(dbPool), &river.Config{}) + func(dbPool *pgxpool.Pool, schema string) (*river.Client[pgx.Tx], error) { + return river.NewClient(riverpgxv5.New(dbPool), &river.Config{Schema: schema}) }, func(client *river.Client[pgx.Tx]) uiendpoints.Bundle { return riverui.NewEndpoints(client, nil) diff --git a/internal/riveruicmd/auth_middleware_test.go b/internal/riveruicmd/auth_middleware_test.go index a892db44..7e298762 100644 --- a/internal/riveruicmd/auth_middleware_test.go +++ b/internal/riveruicmd/auth_middleware_test.go @@ -41,8 +41,8 @@ func TestAuthMiddleware(t *testing.T) { //nolint:tparallel logger: riversharedtest.Logger(t), pathPrefix: prefix, }, - func(dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) { - return river.NewClient(riverpgxv5.New(dbPool), &river.Config{}) + func(dbPool *pgxpool.Pool, schema string) (*river.Client[pgx.Tx], error) { + return river.NewClient(riverpgxv5.New(dbPool), &river.Config{Schema: schema}) }, func(client *river.Client[pgx.Tx]) uiendpoints.Bundle { return riverui.NewEndpoints(client, nil) diff --git a/internal/riveruicmd/riveruicmd.go b/internal/riveruicmd/riveruicmd.go index 1fb47220..310739f9 100644 --- a/internal/riveruicmd/riveruicmd.go +++ b/internal/riveruicmd/riveruicmd.go @@ -28,7 +28,7 @@ type BundleOpts struct { JobListHideArgsByDefault bool } -func Run[TClient any](createClient func(*pgxpool.Pool) (TClient, error), createBundle func(TClient) uiendpoints.Bundle) { +func Run[TClient any](createClient func(dbPool *pgxpool.Pool, schema string) (TClient, error), createBundle func(TClient) uiendpoints.Bundle) { ctx := context.Background() logger := slog.New(getLogHandler(&slog.HandlerOptions{ @@ -142,7 +142,7 @@ type initServerOpts struct { silentHealthChecks bool } -func initServer[TClient any](ctx context.Context, opts *initServerOpts, createClient func(*pgxpool.Pool) (TClient, error), createBundle func(TClient) uiendpoints.Bundle) (*initServerResult, error) { +func initServer[TClient any](ctx context.Context, opts *initServerOpts, createClient func(dbPool *pgxpool.Pool, schema string) (TClient, error), createBundle func(TClient) uiendpoints.Bundle) (*initServerResult, error) { if opts == nil { return nil, errors.New("opts is required") } @@ -177,12 +177,17 @@ func initServer[TClient any](ctx context.Context, opts *initServerOpts, createCl return nil, fmt.Errorf("error parsing db config: %w", err) } + schema := os.Getenv("RIVER_SCHEMA") + if schema == "" { + schema = poolConfig.ConnConfig.Config.RuntimeParams["search_path"] + } + dbPool, err := pgxpool.NewWithConfig(ctx, poolConfig) if err != nil { return nil, fmt.Errorf("error connecting to db: %w", err) } - client, err := createClient(dbPool) + client, err := createClient(dbPool, schema) if err != nil { return nil, err } diff --git a/internal/riveruicmd/riveruicmd_test.go b/internal/riveruicmd/riveruicmd_test.go index 40e881a7..f90616fe 100644 --- a/internal/riveruicmd/riveruicmd_test.go +++ b/internal/riveruicmd/riveruicmd_test.go @@ -42,8 +42,8 @@ func TestInitServer(t *testing.T) { //nolint:tparallel logger: riversharedtest.Logger(t), pathPrefix: "/", }, - func(dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) { - return river.NewClient(riverpgxv5.New(dbPool), &river.Config{}) + func(dbPool *pgxpool.Pool, schema string) (*river.Client[pgx.Tx], error) { + return river.NewClient(riverpgxv5.New(dbPool), &river.Config{Schema: schema}) }, func(client *river.Client[pgx.Tx]) uiendpoints.Bundle { return riverui.NewEndpoints(client, nil) @@ -182,8 +182,8 @@ func TestSilentHealthchecks_SuppressesLogs(t *testing.T) { pathPrefix: prefix, silentHealthChecks: silent, }, - func(dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) { - return river.NewClient(riverpgxv5.New(dbPool), &river.Config{}) + func(dbPool *pgxpool.Pool, schema string) (*river.Client[pgx.Tx], error) { + return river.NewClient(riverpgxv5.New(dbPool), &river.Config{Schema: schema}) }, func(client *river.Client[pgx.Tx]) uiendpoints.Bundle { return riverui.NewEndpoints(client, nil) diff --git a/riverproui/cmd/riverproui/main.go b/riverproui/cmd/riverproui/main.go index d0303a9a..d3d4c351 100644 --- a/riverproui/cmd/riverproui/main.go +++ b/riverproui/cmd/riverproui/main.go @@ -14,8 +14,8 @@ import ( func main() { riveruicmd.Run( - func(dbPool *pgxpool.Pool) (*riverpro.Client[pgx.Tx], error) { - return riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{}) + func(dbPool *pgxpool.Pool, schema string) (*riverpro.Client[pgx.Tx], error) { + return riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{Schema: schema}) }, func(client *riverpro.Client[pgx.Tx]) uiendpoints.Bundle { return riverproui.NewEndpoints(client, nil)