Skip to content

Commit 888f6f2

Browse files
authored
feat: allow pgboss schema name to be configurable (#28)
* feat: allow schema name to be configured via environment variable * rename to consumerSchema
1 parent ebe5c7a commit 888f6f2

5 files changed

Lines changed: 23 additions & 7 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,5 +79,6 @@ See [TROUBLESHOOTING.md](./TROUBLESHOOTING.md) on how to troubleshoot the worker
7979
| DELETE_ARCHIVED_IN_DAYS | string/int | 7 | How long to keep jobs in pgboss.archive before deleting it |
8080
| SUBMISSION_REQUEST_TIMEOUT | string/int | 2000 | How long to keep the POST request alive for in milliseconds. This should be higher (20-30s) if integrating into CASEBOOK/Orbit which has long response times |
8181
| NEW_JOB_CHECK_INTERVAL | string/int | 2000 | The frequency to check for new jobs in milliseconds |
82+
| QUEUE_SCHEMA | string | pgboss | The schema name for pgboss to use. If it does not exist, pgboss will create the schema and related tables in this schema. |
8283

8384
Types are described as string/int since kubernetes only accepts strings. Strings are parsed into int.

TROUBLESHOOTING.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@ kubectl run -it --rm --env PGPASSWORD='<PASSWORD>' --env PAGER= --image=postgres
2020
Replace PASSWORD with the password for the database, ENDPOINT_URL with the endpoint URL for the database.
2121

2222

23-
2423
[pgboss](https://github.com/timgit/pg-boss) is used to manage queueing jobs. On application start, pgboss will automatically create necessary tables in the database.
2524

25+
By default, pgboss will create the schema `pgboss`. The tables will be created in this schema.
26+
This is configured using the environment variable `QUEUE_SCHEMA`. If you have set this to a different schema, you will need to change the schema in the following queries.
27+
28+
For example, if `QUEUE_SCHEMA` is set to `pgboss_v9`, query the job table with `select * from pgboss_v9.job;`
29+
30+
2631
### Jobs table
2732
The jobs table `pgboss.job` is where all the current jobs are stored. Jobs will remain here, until they are completed or failed. Then they will move to `pgboss.archive`
2833

worker/config/custom-environment-variables.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"url": "QUEUE_URL",
44
"archiveFailedAfterDays": "ARCHIVE_FAILED_AFTER_DAYS",
55
"deleteArchivedAfterDays": "DELETE_ARCHIVED_IN_DAYS",
6+
"schema": "QUEUE_SCHEMA"
67
},
78
"Submission": {
89
"requestTimeout": "SUBMISSION_REQUEST_TIMEOUT"

worker/config/default.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module.exports = {
33
url: "postgres://user:root@localhost:5432/queue",
44
archiveFailedInDays: 30,
55
deleteArchivedAfterDays: 7,
6+
schema: "pgboss",
67
},
78
Submission: {
89
requestTimeout: 2000,

worker/src/Consumer/getConsumer.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,32 @@ import pino from "pino";
55

66
const URL = config.get<string>("Queue.url");
77
const logger = pino().child({ method: "Consumer.create" });
8-
let consumer;
98

109
const MINUTE_IN_S = 60;
1110
const HOUR_IN_S = MINUTE_IN_S * 60;
1211
const DAY_IN_S = HOUR_IN_S * 24;
1312

1413
const archiveFailedAfterDays = parseInt(config.get<string>("Queue.archiveFailedInDays"));
1514
const deleteAfterDays = parseInt(config.get<string>("Queue.deleteArchivedAfterDays"));
15+
const schema = config.get<string>("Queue.schema");
16+
17+
const registeredConsumers: Record<string, PgBoss> = {};
1618

1719
logger.info(`archiveFailedAfterDays: ${archiveFailedAfterDays}, deleteAfterDays: ${deleteAfterDays}`);
1820

21+
type ConsumerOptions = {
22+
schema?: string;
23+
};
1924
/**
2025
* Sets up database connection via PgBoss and creates an instance of a "consumer" (consumes the queue).
2126
*/
22-
export async function create() {
27+
export async function create(options: ConsumerOptions) {
28+
const { schema } = options;
2329
const boss = new PgBoss({
2430
connectionString: URL,
2531
archiveFailedAfterSeconds: archiveFailedAfterDays * DAY_IN_S,
2632
deleteAfterDays,
33+
schema,
2734
});
2835

2936
boss.on("error", (error) => {
@@ -47,13 +54,14 @@ export async function create() {
4754
* `getConsumer` should be used whenever an instance of a consumer is needed.
4855
* This is to prevent too many database connections from being opened unnecessarily.
4956
*/
50-
export async function getConsumer() {
57+
export async function getConsumer(name?: string) {
58+
const consumerSchema = name ?? schema;
59+
const consumer = registeredConsumers[consumerSchema];
5160
try {
5261
if (!consumer) {
53-
const boss = await create();
54-
consumer = boss;
62+
registeredConsumers[consumerSchema] = await create({ schema: consumerSchema });
5563
}
56-
return consumer;
64+
return registeredConsumers[consumerSchema];
5765
} catch (e) {
5866
logger.error(e);
5967
process.exit(1);

0 commit comments

Comments
 (0)