
Commit cf8811b

feat: add advisory locks to prevent concurrent flow compilation race conditions (#477)
# Add Advisory Locks to Prevent Concurrent Flow Compilation Race Conditions

This PR adds transaction-level advisory locks to the `ensure_flow_compiled` function to prevent race conditions during concurrent flow compilation attempts.

The implementation:
- Generates a deterministic lock key from the flow slug using `hashtext()`
- Acquires a transaction-level advisory lock with `pg_advisory_xact_lock(1, v_lock_key)`
- Ensures that concurrent compilation attempts for the same flow are properly serialized

A comprehensive stress test was added that verifies the locking behavior by:
- Creating 50 separate database connections
- Attempting to compile the same flow simultaneously
- Verifying that exactly one compilation succeeds while the others verify the existing flow
- Confirming that only one flow and one step exist in the database after all operations

This change prevents potential data corruption or duplicate entries that could occur when multiple workers attempt to compile the same flow simultaneously.
1 parent cfa9a23 commit cf8811b
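
For context, a minimal sketch of how a caller might invoke the function described above; the slug and shape values below are illustrative examples, not taken from this PR:

    -- Illustrative call; slug and shape are made-up examples
    SELECT pgflow.ensure_flow_compiled(
      'my_flow',
      '{"steps": [{"slug": "step1", "stepType": "single", "dependencies": []}]}'::jsonb,
      'production'
    );
    -- First caller to acquire the lock:      {"status": "compiled", "differences": []}
    -- Later callers with a matching shape:   {"status": "verified", "differences": []}

With the advisory lock in place, concurrent callers resolve to exactly one 'compiled' result and the rest 'verified'.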

File tree

5 files changed (+143, -43 lines)

pkgs/core/schemas/0100_function_ensure_flow_compiled.sql

Lines changed: 8 additions & 0 deletions
@@ -12,10 +12,18 @@ volatile
 set search_path to ''
 as $$
 DECLARE
+  v_lock_key int;
   v_flow_exists boolean;
   v_db_shape jsonb;
   v_differences text[];
 BEGIN
+  -- Generate lock key from flow_slug (deterministic hash)
+  v_lock_key := hashtext(p_flow_slug);
+
+  -- Acquire transaction-level advisory lock
+  -- Serializes concurrent compilation attempts for same flow
+  PERFORM pg_advisory_xact_lock(1, v_lock_key);
+
   -- 1. Check if flow exists
   SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug)
   INTO v_flow_exists;
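
To make the serialization this hunk introduces concrete, here is a two-session sketch (run in two psql sessions; 'my_flow' is a hypothetical slug). Transaction-level advisory locks are released automatically when the holding transaction commits or rolls back, so the second caller simply waits its turn:

    -- Session A
    BEGIN;
    SELECT pg_advisory_xact_lock(1, hashtext('my_flow'));  -- acquired immediately
    -- ... flow compilation happens here ...
    COMMIT;  -- lock released automatically at transaction end

    -- Session B (started while A is still open)
    BEGIN;
    SELECT pg_advisory_xact_lock(1, hashtext('my_flow'));  -- blocks until A's transaction ends
    -- by now the flow rows exist, so ensure_flow_compiled would take the 'verified' path
    COMMIT;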
pkgs/core/supabase/migrations/20251201105311_pgflow_temp_advisory_lock_for_compilation.sql

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
1+
-- Modify "ensure_flow_compiled" function
2+
CREATE OR REPLACE FUNCTION "pgflow"."ensure_flow_compiled" ("p_flow_slug" text, "p_shape" jsonb, "p_mode" text DEFAULT 'production') RETURNS jsonb LANGUAGE plpgsql SET "search_path" = '' AS $$
3+
DECLARE
4+
v_lock_key int;
5+
v_flow_exists boolean;
6+
v_db_shape jsonb;
7+
v_differences text[];
8+
BEGIN
9+
-- Generate lock key from flow_slug (deterministic hash)
10+
v_lock_key := hashtext(p_flow_slug);
11+
12+
-- Acquire transaction-level advisory lock
13+
-- Serializes concurrent compilation attempts for same flow
14+
PERFORM pg_advisory_xact_lock(1, v_lock_key);
15+
16+
-- 1. Check if flow exists
17+
SELECT EXISTS(SELECT 1 FROM pgflow.flows AS flow WHERE flow.flow_slug = p_flow_slug)
18+
INTO v_flow_exists;
19+
20+
-- 2. If flow missing: compile (both modes)
21+
IF NOT v_flow_exists THEN
22+
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
23+
RETURN jsonb_build_object('status', 'compiled', 'differences', '[]'::jsonb);
24+
END IF;
25+
26+
-- 3. Get current shape from DB
27+
v_db_shape := pgflow._get_flow_shape(p_flow_slug);
28+
29+
-- 4. Compare shapes
30+
v_differences := pgflow._compare_flow_shapes(p_shape, v_db_shape);
31+
32+
-- 5. If shapes match: return verified
33+
IF array_length(v_differences, 1) IS NULL THEN
34+
RETURN jsonb_build_object('status', 'verified', 'differences', '[]'::jsonb);
35+
END IF;
36+
37+
-- 6. Shapes differ - handle by mode
38+
IF p_mode = 'development' THEN
39+
-- Recompile in dev mode: full deletion + fresh compile
40+
PERFORM pgflow.delete_flow_and_data(p_flow_slug);
41+
PERFORM pgflow._create_flow_from_shape(p_flow_slug, p_shape);
42+
RETURN jsonb_build_object('status', 'recompiled', 'differences', to_jsonb(v_differences));
43+
ELSE
44+
-- Fail in production mode
45+
RETURN jsonb_build_object('status', 'mismatch', 'differences', to_jsonb(v_differences));
46+
END IF;
47+
END;
48+
$$;
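
The scheme depends on hashtext() producing the same int4 for the same input in every session, so all callers for a given flow contend on the same key; the leading constant 1 serves as a namespace that keeps these locks from colliding with other advisory-lock users in the database. A quick sanity check (arbitrary example slug; hashtext is an internal Postgres function, so treat the exact value as unspecified):

    SELECT hashtext('concurrent_test_flow');  -- some integer
    SELECT hashtext('concurrent_test_flow');  -- same integer on every repeat call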

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
@@ -1,4 +1,4 @@
-h1:sszJnuW0bvBbhzmEAekpZN/kSi7ga04pjSepmJlgoYY=
+h1:K23lj19qKEUldXvUo2XV8Dg1GIY994aZgpvg8XS0TRE=
 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -15,3 +15,4 @@ h1:sszJnuW0bvBbhzmEAekpZN/kSi7ga04pjSepmJlgoYY=
 20251130012043_pgflow_temp_compilation_utilities.sql h1:Qn7RxYkbFd36hJYhOsuJdrcSlo8itqhmdAQLfmrP9+Y=
 20251130012803_pgflow_temp_ensure_flow_compiled.sql h1:RvuDNy53B03P5mzs9JUoVYMA725V6aCVoPSp59Gh9ko=
 20251130164844_pgflow_temp_options_in_shape.sql h1:lbMDdu15QiBElTsvl7g0dI7flvyjngK9g68VDnCE0S0=
+20251201105311_pgflow_temp_advisory_lock_for_compilation.sql h1:OmRtiaPYjPuq9P87Px2PH06gdKhHZ0Ro6GfjjS0G+Rs=

pkgs/edge-worker/deno.lock

Lines changed: 23 additions & 42 deletions
Some generated files are not rendered by default.

pkgs/edge-worker/tests/integration/flow/compilationAtStartup.test.ts

Lines changed: 62 additions & 0 deletions
@@ -5,6 +5,8 @@ import { delay } from '@std/async';
 import { createFlowWorker } from '../../../src/flow/createFlowWorker.ts';
 import { createTestPlatformAdapter } from '../_helpers.ts';
 import type { postgres } from '../../sql.ts';
+import postgresLib from 'postgres';
+import { integrationConfig } from '../../config.ts';

 // Define a minimal test flow
 const TestCompilationFlow = new Flow<{ value: number }>({ slug: 'test_compilation_flow' })
@@ -245,3 +247,63 @@
   }
  })
 );
+
+Deno.test(
+  'handles concurrent compilation with advisory locks (stress test)',
+  withPgNoTransaction(async (sql) => {
+    await sql`select pgflow_tests.reset_db();`;
+
+    const CONCURRENT = 50; // 50 separate connections
+    const flowSlug = `concurrent_test_${Date.now()}`;
+    const shape = {
+      steps: [{ slug: 'step1', stepType: 'single', dependencies: [] }],
+    };
+
+    // Create N SEPARATE connections (critical for true concurrency)
+    const connections = await Promise.all(
+      Array(CONCURRENT)
+        .fill(null)
+        .map(() => postgresLib(integrationConfig.dbUrl, { prepare: false }))
+    );
+
+    try {
+      // Fire all compilations simultaneously on separate connections
+      // Note: Must use conn.json() for proper jsonb parameter passing
+      const results = await Promise.all(
+        connections.map((conn) =>
+          conn`SELECT pgflow.ensure_flow_compiled(
+            ${flowSlug},
+            ${conn.json(shape)},
+            'production'
+          ) as result`
+        )
+      );
+
+      // Parse results
+      const statuses = results.map((r) => r[0].result.status);
+      const compiled = statuses.filter((s) => s === 'compiled');
+      const verified = statuses.filter((s) => s === 'verified');
+
+      // Assert: exactly 1 compiled, rest verified
+      assertEquals(compiled.length, 1, 'Exactly 1 should compile');
+      assertEquals(verified.length, CONCURRENT - 1, 'Rest should verify');
+
+      // Assert: exactly 1 flow and 1 step in DB
+      const [flowCount] = await sql`
+        SELECT COUNT(*)::int as count FROM pgflow.flows
+        WHERE flow_slug = ${flowSlug}
+      `;
+      const [stepCount] = await sql`
+        SELECT COUNT(*)::int as count FROM pgflow.steps
+        WHERE flow_slug = ${flowSlug}
+      `;
+
+      assertEquals(flowCount.count, 1, 'Exactly 1 flow should exist');
+      assertEquals(stepCount.count, 1, 'Exactly 1 step should exist');
+    } finally {
+      // Cleanup
+      await sql`SELECT pgflow.delete_flow_and_data(${flowSlug})`.catch(() => {});
+      await Promise.all(connections.map((c) => c.end()));
+    }
+  })
+);
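
As a debugging aid (not part of this change), advisory-lock contention during the stress test can be observed from another connection via pg_locks; for the two-key lock form, classid carries the first key (the namespace 1) and objid carries the hashed flow slug:

    SELECT locktype, classid, objid, granted, pid
    FROM pg_locks
    WHERE locktype = 'advisory';
    -- expect one granted = true row (the session doing the compile)
    -- plus granted = false rows for sessions still waiting on the lock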
