Skip to content

Commit 9b25b43

Browse files
committed
demonstrate alternative constructor w/ listener support
Add a `NewWithListener` constructor to `riverdatabasesql` that allows the `database/sql` driver to be used with a functioning listener implementation. Also add a `NewListener` constructor to the `riverpgxv5` driver to allow creating a listener with a raw pgx pool. These can be combined to allow full listener support as long as the underlying database driver supports it, even when it's used within an abstraction like `database/sql` or Bun.
1 parent 183f359 commit 9b25b43

2 files changed

Lines changed: 40 additions & 3 deletions

File tree

riverdriver/riverdatabasesql/river_database_sql_driver.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ var migrationFS embed.FS
3535

3636
// Driver is an implementation of riverdriver.Driver for database/sql.
3737
type Driver struct {
38-
dbPool *sql.DB
38+
dbPool *sql.DB
39+
listener riverdriver.Listener
3940
}
4041

4142
// New returns a new database/sql River driver for use with River.
@@ -47,11 +48,26 @@ func New(dbPool *sql.DB) *Driver {
4748
return &Driver{dbPool: dbPool}
4849
}
4950

51+
// NewWithListener returns a new database/sql River driver for use with River
52+
// just like New, except it also takes a riverdriver.Listener to use for
53+
// listening to notifications.
54+
func NewWithListener(dbPool *sql.DB, listener riverdriver.Listener) *Driver {
55+
driver := New(dbPool)
56+
driver.listener = listener
57+
return driver
58+
}
59+
5060
func (d *Driver) GetExecutor() riverdriver.Executor {
5161
return &Executor{d.dbPool, d.dbPool}
5262
}
5363

54-
func (d *Driver) GetListener() riverdriver.Listener { panic(riverdriver.ErrNotImplemented) }
64+
func (d *Driver) GetListener() riverdriver.Listener {
65+
if d.listener == nil {
66+
panic(riverdriver.ErrNotImplemented)
67+
}
68+
return d.listener
69+
}
70+
5571
func (d *Driver) GetMigrationFS(line string) fs.FS {
5672
if line == riverdriver.MigrationLineMain {
5773
return migrationFS
@@ -60,7 +76,7 @@ func (d *Driver) GetMigrationFS(line string) fs.FS {
6076
}
6177
func (d *Driver) GetMigrationLines() []string { return []string{riverdriver.MigrationLineMain} }
6278
func (d *Driver) HasPool() bool { return d.dbPool != nil }
63-
func (d *Driver) SupportsListener() bool { return false }
79+
func (d *Driver) SupportsListener() bool { return d.listener != nil }
6480

6581
func (d *Driver) UnwrapExecutor(tx *sql.Tx) riverdriver.ExecutorTx {
6682
return &ExecutorTx{Executor: Executor{nil, tx}, tx: tx}

riverdriver/riverpgxv5/river_pgx_v5_driver.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -765,6 +765,27 @@ type Listener struct {
765765
mu sync.Mutex
766766
}
767767

768+
// NewListener returns a Listener that can be used to enable other drivers
769+
// (notably `riverdatabasesql`) to listen for notifications when their
770+
// abstraction doesn't allow direct access to pgx connections, even though they
771+
// are using pgx under the hood. This constructor is not applicable to
772+
// applications which only use pgx directly.
773+
//
774+
// Users of `database/sql` or Bun can use this with the
775+
// `riverdatabasesql.NewWithListener` constructor to enable listener support in
776+
// that driver.
777+
//
778+
// The dbPool will solely be used for acquiring new connections for `LISTEN`
779+
// commands. As such, a pool must be provided that supports that command. Users
780+
// of pgbouncer should ensure that this specific pool is configured with session
781+
// pooling, even if their main application does not use session pooling.
782+
//
783+
// A single Client will never use more than one listener connection at a time,
784+
// no matter how many topics are being listened to.
785+
func NewListener(dbPool *pgxpool.Pool) riverdriver.Listener {
786+
return &Listener{dbPool: dbPool}
787+
}
788+
768789
func (l *Listener) Close(ctx context.Context) error {
769790
l.mu.Lock()
770791
defer l.mu.Unlock()

0 commit comments

Comments
 (0)