Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/riverui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

func main() {
riveruicmd.Run(
func(dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) {
return river.NewClient(riverpgxv5.New(dbPool), &river.Config{})
func(dbPool *pgxpool.Pool, schema string) (*river.Client[pgx.Tx], error) {
return river.NewClient(riverpgxv5.New(dbPool), &river.Config{Schema: schema})
},
func(client *river.Client[pgx.Tx]) uiendpoints.Bundle {
return riverui.NewEndpoints(client, nil)
Expand Down
4 changes: 2 additions & 2 deletions internal/riveruicmd/auth_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func TestAuthMiddleware(t *testing.T) { //nolint:tparallel
logger: riversharedtest.Logger(t),
pathPrefix: prefix,
},
func(dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) {
return river.NewClient(riverpgxv5.New(dbPool), &river.Config{})
func(dbPool *pgxpool.Pool, schema string) (*river.Client[pgx.Tx], error) {
return river.NewClient(riverpgxv5.New(dbPool), &river.Config{Schema: schema})
},
func(client *river.Client[pgx.Tx]) uiendpoints.Bundle {
return riverui.NewEndpoints(client, nil)
Expand Down
11 changes: 8 additions & 3 deletions internal/riveruicmd/riveruicmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type BundleOpts struct {
JobListHideArgsByDefault bool
}

func Run[TClient any](createClient func(*pgxpool.Pool) (TClient, error), createBundle func(TClient) uiendpoints.Bundle) {
func Run[TClient any](createClient func(dbPool *pgxpool.Pool, schema string) (TClient, error), createBundle func(TClient) uiendpoints.Bundle) {
ctx := context.Background()

logger := slog.New(getLogHandler(&slog.HandlerOptions{
Expand Down Expand Up @@ -142,7 +142,7 @@ type initServerOpts struct {
silentHealthChecks bool
}

func initServer[TClient any](ctx context.Context, opts *initServerOpts, createClient func(*pgxpool.Pool) (TClient, error), createBundle func(TClient) uiendpoints.Bundle) (*initServerResult, error) {
func initServer[TClient any](ctx context.Context, opts *initServerOpts, createClient func(dbPool *pgxpool.Pool, schema string) (TClient, error), createBundle func(TClient) uiendpoints.Bundle) (*initServerResult, error) {
if opts == nil {
return nil, errors.New("opts is required")
}
Expand Down Expand Up @@ -177,12 +177,17 @@ func initServer[TClient any](ctx context.Context, opts *initServerOpts, createCl
return nil, fmt.Errorf("error parsing db config: %w", err)
}

schema := os.Getenv("RIVER_SCHEMA")
if schema == "" {
schema = poolConfig.ConnConfig.Config.RuntimeParams["search_path"]
}

dbPool, err := pgxpool.NewWithConfig(ctx, poolConfig)
if err != nil {
return nil, fmt.Errorf("error connecting to db: %w", err)
}

client, err := createClient(dbPool)
client, err := createClient(dbPool, schema)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/riveruicmd/riveruicmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func TestInitServer(t *testing.T) { //nolint:tparallel
logger: riversharedtest.Logger(t),
pathPrefix: "/",
},
func(dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) {
return river.NewClient(riverpgxv5.New(dbPool), &river.Config{})
func(dbPool *pgxpool.Pool, schema string) (*river.Client[pgx.Tx], error) {
return river.NewClient(riverpgxv5.New(dbPool), &river.Config{Schema: schema})
},
func(client *river.Client[pgx.Tx]) uiendpoints.Bundle {
return riverui.NewEndpoints(client, nil)
Expand Down Expand Up @@ -182,8 +182,8 @@ func TestSilentHealthchecks_SuppressesLogs(t *testing.T) {
pathPrefix: prefix,
silentHealthChecks: silent,
},
func(dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) {
return river.NewClient(riverpgxv5.New(dbPool), &river.Config{})
func(dbPool *pgxpool.Pool, schema string) (*river.Client[pgx.Tx], error) {
return river.NewClient(riverpgxv5.New(dbPool), &river.Config{Schema: schema})
},
func(client *river.Client[pgx.Tx]) uiendpoints.Bundle {
return riverui.NewEndpoints(client, nil)
Expand Down
4 changes: 2 additions & 2 deletions riverproui/cmd/riverproui/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

func main() {
riveruicmd.Run(
func(dbPool *pgxpool.Pool) (*riverpro.Client[pgx.Tx], error) {
return riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{})
func(dbPool *pgxpool.Pool, schema string) (*riverpro.Client[pgx.Tx], error) {
return riverpro.NewClient(riverpropgxv5.New(dbPool), &riverpro.Config{Schema: schema})
},
func(client *riverpro.Client[pgx.Tx]) uiendpoints.Bundle {
return riverproui.NewEndpoints(client, nil)
Expand Down