Skip to content

Commit 5054f10

Browse files
authored
feat: clear migrated messages (#30)
* update circleCI images * remove net-tools install * feat: delete old messages during migration * feat: migrate any in retry state. remove intermediary update * feat: add 'retry' to getInflightMessages query
1 parent ad240da commit 5054f10

2 files changed

Lines changed: 12 additions & 13 deletions

File tree

.circleci/config.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ jobs:
8585
type: string
8686

8787
machine:
88-
image: ubuntu-2004:202201-02
88+
image: ubuntu-2004:202111-02
8989

9090
steps:
9191
- attach_workspace:
@@ -114,8 +114,7 @@ jobs:
114114
name: Install OpenVPN
115115
command: |
116116
sudo apt-get update
117-
sudo apt-get install net-tools -y
118-
sudo apt-get install openvpn openvpn-systemd-resolved -y
117+
sudo apt-get install openvpn openvpn-systemd-resolved
119118
120119
- run:
121120
name: Check IP before VPN connection

worker/src/Consumer/migrate/helpers.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export async function getInflightMessages(queue: string, schema: string) {
2727
${schema}.job
2828
WHERE
2929
name = '${queue}'
30-
AND state = 'created'`;
30+
AND state in ('created', 'retry')`;
3131

3232
const inflightMessages = await db.query(query);
3333
const numberOfMessages = inflightMessages?.rows.length;
@@ -45,8 +45,8 @@ export async function migrateInflightMessagesFromV9(queue: string, schema: strin
4545
const client = await db.connect();
4646

4747
try {
48+
logger.info(`Copying inflight messages for queue ${queue} from ${schema} to ${currentSchema} and deleting old messages`);
4849
await client.query("BEGIN");
49-
5050
await client.query(`INSERT INTO ${currentSchema}.job (
5151
id,
5252
name,
@@ -74,16 +74,16 @@ export async function migrateInflightMessagesFromV9(queue: string, schema: strin
7474
output
7575
FROM ${schema}.job
7676
WHERE name = '${queue}'
77-
AND state = 'created'
77+
AND state IN ('created', 'retry')
7878
ON CONFLICT DO NOTHING`);
7979

80-
logger.info(`Copied inflight messages for queue ${queue} from ${schema} to ${currentSchema}`);
81-
8280
await client.query(`
83-
UPDATE ${schema}.job
84-
SET state = 'completed'
85-
WHERE name = '${queue}' and state = 'created'
86-
`);
81+
DELETE from ${schema}.job
82+
WHERE name = '${queue}' and state IN ('created', 'completed', 'retry');
83+
84+
DELETE from ${schema}.archive
85+
WHERE name = '${queue}' and state IN ('created', 'completed', 'retry');
86+
`);
8787

8888
await client.query("COMMIT");
8989
} catch (err) {
@@ -94,5 +94,5 @@ export async function migrateInflightMessagesFromV9(queue: string, schema: strin
9494
await client.release();
9595
}
9696

97-
logger.info({ drainSchema: schema, queue }, `Inflight messages drained successfully for schema ${schema}, queue ${queue}.`);
97+
logger.info({ drainSchema: schema, queue }, `Inflight messages drained successfully for queue ${queue}, from schema ${schema} to ${currentSchema}`);
9898
}

0 commit comments

Comments
 (0)