Skip to content

Commit 8ec9428

Browse files
authored
feat: add ensure_workers function to automatically restart dead workers (#510)
# Add `ensure_workers()` function to manage worker lifecycle This PR introduces a new PostgreSQL function `pgflow.ensure_workers()` that manages the lifecycle of worker functions by: - Determining which worker functions need to be invoked based on their current state - Making HTTP requests to invoke those functions - Updating the `last_invoked_at` timestamp for invoked functions The function implements different behavior based on environment: - In local mode: Always pings all enabled functions (for fast restart after code changes) - In production mode: Only pings functions that have no alive workers Key features: - Respects debounce periods using the function's `heartbeat_timeout_seconds` setting - Uses Vault secrets for credentials in production with local fallbacks - Returns request IDs from pg_net for each HTTP request made - Comprehensive test suite covering all behavior scenarios This function will be called by a cron job to ensure worker functions stay running, providing automatic recovery from worker failures.
1 parent b727299 commit 8ec9428

15 files changed

+838
-1
lines changed
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
-- Ensure Workers
2+
-- Returns which worker functions should be invoked, makes HTTP requests, and updates last_invoked_at
3+
-- Called by cron job to keep workers running
4+
5+
drop function if exists pgflow.ensure_workers();
6+
7+
create or replace function pgflow.ensure_workers()
8+
returns table (function_name text, invoked boolean, request_id bigint)
9+
language sql
10+
as $$
11+
with
12+
-- Detect environment
13+
env as (
14+
select pgflow.is_local() as is_local
15+
),
16+
17+
-- Get credentials: Vault secrets with local fallback for base_url only
18+
credentials as (
19+
select
20+
(select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_service_role_key') as service_role_key,
21+
coalesce(
22+
(select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_function_base_url'),
23+
case when (select is_local from env) then 'http://kong:8000/functions/v1' end
24+
) as base_url
25+
),
26+
27+
-- Find functions that pass the debounce check
28+
debounce_passed as (
29+
select wf.function_name, wf.heartbeat_timeout_seconds
30+
from pgflow.worker_functions as wf
31+
where wf.enabled = true
32+
and (
33+
wf.last_invoked_at is null
34+
or wf.last_invoked_at < now() - (wf.heartbeat_timeout_seconds || ' seconds')::interval
35+
)
36+
),
37+
38+
-- Find functions that have at least one alive worker
39+
functions_with_alive_workers as (
40+
select distinct w.function_name
41+
from pgflow.workers as w
42+
inner join debounce_passed as dp on w.function_name = dp.function_name
43+
where w.stopped_at is null
44+
and w.deprecated_at is null
45+
and w.last_heartbeat_at > now() - (dp.heartbeat_timeout_seconds || ' seconds')::interval
46+
),
47+
48+
-- Determine which functions should be invoked
49+
functions_to_invoke as (
50+
select dp.function_name
51+
from debounce_passed as dp
52+
where
53+
pgflow.is_local() = true
54+
or dp.function_name not in (select faw.function_name from functions_with_alive_workers as faw)
55+
),
56+
57+
-- Make HTTP requests and capture request_ids
58+
http_requests as (
59+
select
60+
fti.function_name,
61+
net.http_post(
62+
url => c.base_url || '/' || fti.function_name,
63+
headers => case
64+
when e.is_local then '{}'::jsonb
65+
else jsonb_build_object(
66+
'Content-Type', 'application/json',
67+
'Authorization', 'Bearer ' || c.service_role_key
68+
)
69+
end,
70+
body => '{}'::jsonb
71+
) as request_id
72+
from functions_to_invoke as fti
73+
cross join credentials as c
74+
cross join env as e
75+
where c.base_url is not null
76+
and (e.is_local or c.service_role_key is not null)
77+
),
78+
79+
-- Update last_invoked_at for invoked functions
80+
updated as (
81+
update pgflow.worker_functions as wf
82+
set last_invoked_at = clock_timestamp()
83+
from http_requests as hr
84+
where wf.function_name = hr.function_name
85+
returning wf.function_name
86+
)
87+
88+
select u.function_name, true as invoked, hr.request_id
89+
from updated as u
90+
inner join http_requests as hr on u.function_name = hr.function_name
91+
$$;
92+
93+
comment on function pgflow.ensure_workers() is
94+
'Ensures worker functions are running by pinging them via HTTP when needed.
95+
In local mode: always pings all enabled functions (for fast restart after code changes).
96+
In production mode: only pings functions that have no alive workers.
97+
Respects debounce: skips functions pinged within their heartbeat_timeout_seconds window.
98+
Credentials: Uses Vault secrets (pgflow_service_role_key, pgflow_function_base_url) or local fallbacks.
99+
Returns request_id from pg_net for each HTTP request made.';

pkgs/core/src/database-types.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,14 @@ export type Database = {
482482
Args: { flow_slug: string; shape: Json }
483483
Returns: Json
484484
}
485+
ensure_workers: {
486+
Args: never
487+
Returns: {
488+
function_name: string
489+
invoked: boolean
490+
request_id: number
491+
}[]
492+
}
485493
fail_task: {
486494
Args: {
487495
error_message: string
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
-- Create "ensure_workers" function
2+
CREATE FUNCTION "pgflow"."ensure_workers" () RETURNS TABLE ("function_name" text, "invoked" boolean, "request_id" bigint) LANGUAGE sql AS $$
3+
with
4+
-- Detect environment
5+
env as (
6+
select pgflow.is_local() as is_local
7+
),
8+
9+
-- Get credentials: Vault secrets with local fallback for base_url only
10+
credentials as (
11+
select
12+
(select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_service_role_key') as service_role_key,
13+
coalesce(
14+
(select decrypted_secret from vault.decrypted_secrets where name = 'pgflow_function_base_url'),
15+
case when (select is_local from env) then 'http://kong:8000/functions/v1' end
16+
) as base_url
17+
),
18+
19+
-- Find functions that pass the debounce check
20+
debounce_passed as (
21+
select wf.function_name, wf.heartbeat_timeout_seconds
22+
from pgflow.worker_functions as wf
23+
where wf.enabled = true
24+
and (
25+
wf.last_invoked_at is null
26+
or wf.last_invoked_at < now() - (wf.heartbeat_timeout_seconds || ' seconds')::interval
27+
)
28+
),
29+
30+
-- Find functions that have at least one alive worker
31+
functions_with_alive_workers as (
32+
select distinct w.function_name
33+
from pgflow.workers as w
34+
inner join debounce_passed as dp on w.function_name = dp.function_name
35+
where w.stopped_at is null
36+
and w.deprecated_at is null
37+
and w.last_heartbeat_at > now() - (dp.heartbeat_timeout_seconds || ' seconds')::interval
38+
),
39+
40+
-- Determine which functions should be invoked
41+
functions_to_invoke as (
42+
select dp.function_name
43+
from debounce_passed as dp
44+
where
45+
pgflow.is_local() = true
46+
or dp.function_name not in (select faw.function_name from functions_with_alive_workers as faw)
47+
),
48+
49+
-- Make HTTP requests and capture request_ids
50+
http_requests as (
51+
select
52+
fti.function_name,
53+
net.http_post(
54+
url => c.base_url || '/' || fti.function_name,
55+
headers => case
56+
when e.is_local then '{}'::jsonb
57+
else jsonb_build_object(
58+
'Content-Type', 'application/json',
59+
'Authorization', 'Bearer ' || c.service_role_key
60+
)
61+
end,
62+
body => '{}'::jsonb
63+
) as request_id
64+
from functions_to_invoke as fti
65+
cross join credentials as c
66+
cross join env as e
67+
where c.base_url is not null
68+
and (e.is_local or c.service_role_key is not null)
69+
),
70+
71+
-- Update last_invoked_at for invoked functions
72+
updated as (
73+
update pgflow.worker_functions as wf
74+
set last_invoked_at = clock_timestamp()
75+
from http_requests as hr
76+
where wf.function_name = hr.function_name
77+
returning wf.function_name
78+
)
79+
80+
select u.function_name, true as invoked, hr.request_id
81+
from updated as u
82+
inner join http_requests as hr on u.function_name = hr.function_name
83+
$$;
84+
-- Set comment to function: "ensure_workers"
85+
COMMENT ON FUNCTION "pgflow"."ensure_workers" IS 'Ensures worker functions are running by pinging them via HTTP when needed.
86+
In local mode: always pings all enabled functions (for fast restart after code changes).
87+
In production mode: only pings functions that have no alive workers.
88+
Respects debounce: skips functions pinged within their heartbeat_timeout_seconds window.
89+
Credentials: Uses Vault secrets (pgflow_service_role_key, pgflow_function_base_url) or local fallbacks.
90+
Returns request_id from pg_net for each HTTP request made.';

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:ITAzSq+m8k27LdS5wU7dFgdolSQl5pHgOD5VgpL3zyU=
1+
h1:G7RvNGNjnwVtYZTuceBwoaHEXa9bd/xH6hC8x5MNxZk=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -18,3 +18,4 @@ h1:ITAzSq+m8k27LdS5wU7dFgdolSQl5pHgOD5VgpL3zyU=
1818
20251204164612_pgflow_temp_track_worker_function.sql h1:3Ht8wUx3saKPo98osuTG/nxD/tKR48qe8jHNVwuu2lY=
1919
20251204165231_pgflow_temp_mark_worker_stopped.sql h1:zI2FijK429oae4OpLbjU4eSaZtYhlsbvN3buWH9FKLw=
2020
20251205103442_pgflow_temp_add_extensions.sql h1:IBHG1vBdXu8wDEJzqpJUFmuPhVaX0mAmDUkngLgdaMg=
21+
20251205133446_pgflow_temp_ensure_workers.sql h1:EQzE75uaMSXeU1sdjO7MK1ipCwepxlWSVzlKegLpr48=
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
-- Test: ensure_workers() retrieves credentials from Vault
2+
begin;
3+
select plan(2);
4+
select pgflow_tests.reset_db();
5+
6+
-- Setup: Create Vault secrets
7+
select vault.create_secret(
8+
'test-service-role-key-from-vault',
9+
'pgflow_service_role_key'
10+
);
11+
select vault.create_secret(
12+
'http://vault-configured-url.example.com/functions/v1',
13+
'pgflow_function_base_url'
14+
);
15+
16+
-- Setup: Register a worker function
17+
select pgflow.track_worker_function('my-function');
18+
update pgflow.worker_functions
19+
set last_invoked_at = now() - interval '10 seconds';
20+
21+
-- Simulate production mode (non-local jwt_secret)
22+
set local app.settings.jwt_secret = 'production-secret-different-from-local';
23+
24+
-- TEST: In production mode WITH Vault secrets, function IS invoked
25+
with result as (
26+
select * from pgflow.ensure_workers()
27+
)
28+
select is(
29+
(select count(*) from result),
30+
1::bigint,
31+
'Production mode with Vault secrets invokes functions'
32+
);
33+
34+
-- TEST: request_id is returned (proves Vault credentials were used for HTTP call)
35+
update pgflow.worker_functions
36+
set last_invoked_at = now() - interval '10 seconds';
37+
38+
with result as (
39+
select * from pgflow.ensure_workers()
40+
)
41+
select ok(
42+
(select request_id is not null from result limit 1),
43+
'Vault credentials allow HTTP invocation in production mode'
44+
);
45+
46+
select finish();
47+
rollback;
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
-- Test: ensure_workers() uses local fallback credentials when Vault is empty
2+
begin;
3+
select plan(2);
4+
select pgflow_tests.reset_db();
5+
6+
-- Ensure no Vault secrets exist
7+
delete from vault.secrets where name in ('pgflow_service_role_key', 'pgflow_function_base_url');
8+
9+
-- Setup: Register a worker function
10+
select pgflow.track_worker_function('my-function');
11+
update pgflow.worker_functions
12+
set last_invoked_at = now() - interval '10 seconds';
13+
14+
-- Simulate local mode
15+
set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long';
16+
17+
-- TEST: In local mode without Vault secrets, function IS invoked (uses fallback)
18+
with result as (
19+
select * from pgflow.ensure_workers()
20+
)
21+
select is(
22+
(select count(*) from result),
23+
1::bigint,
24+
'Local mode uses fallback credentials when Vault is empty'
25+
);
26+
27+
-- TEST: request_id is returned (proves HTTP call was made)
28+
update pgflow.worker_functions
29+
set last_invoked_at = now() - interval '10 seconds';
30+
31+
with result as (
32+
select * from pgflow.ensure_workers()
33+
)
34+
select ok(
35+
(select request_id is not null from result limit 1),
36+
'Local fallback credentials allow HTTP invocation'
37+
);
38+
39+
select finish();
40+
rollback;
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
-- Test: ensure_workers() respects debounce window
2+
begin;
3+
select plan(4);
4+
select pgflow_tests.reset_db();
5+
6+
-- Setup: Register a worker function with 6 second heartbeat timeout
7+
select pgflow.track_worker_function('my-function');
8+
9+
-- TEST: Function with recent last_invoked_at is NOT returned (debounce active)
10+
-- Note: track_worker_function sets last_invoked_at to now()
11+
set local app.settings.jwt_secret = 'super-secret-jwt-token-with-at-least-32-characters-long';
12+
select is(
13+
(select count(*) from pgflow.ensure_workers()),
14+
0::bigint,
15+
'Function with recent last_invoked_at is NOT returned (debounce active)'
16+
);
17+
18+
-- TEST: Function with last_invoked_at beyond heartbeat_timeout IS returned
19+
update pgflow.worker_functions
20+
set last_invoked_at = now() - interval '7 seconds'
21+
where function_name = 'my-function';
22+
23+
select is(
24+
(select count(*) from pgflow.ensure_workers()),
25+
1::bigint,
26+
'Function with last_invoked_at beyond timeout IS returned'
27+
);
28+
29+
-- TEST: Function with NULL last_invoked_at IS returned
30+
update pgflow.worker_functions
31+
set last_invoked_at = null
32+
where function_name = 'my-function';
33+
34+
select is(
35+
(select count(*) from pgflow.ensure_workers()),
36+
1::bigint,
37+
'Function with NULL last_invoked_at IS returned'
38+
);
39+
40+
-- TEST: Debounce uses heartbeat_timeout_seconds from function config
41+
update pgflow.worker_functions
42+
set heartbeat_timeout_seconds = 3,
43+
last_invoked_at = now() - interval '4 seconds'
44+
where function_name = 'my-function';
45+
46+
select is(
47+
(select count(*) from pgflow.ensure_workers()),
48+
1::bigint,
49+
'Debounce respects function-specific heartbeat_timeout_seconds'
50+
);
51+
52+
select finish();
53+
rollback;

0 commit comments

Comments
 (0)