Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 28 additions & 19 deletions src/postgres/migrations/20251202002136_initial_setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1048,18 +1048,6 @@ begin
end;
$$;

-- Advisory lock to serialize await_event and emit_event operations on the same event.
-- This prevents lost wakeups when a waiter is being set up while an emit is happening.
-- Called at the top of await_event and emit_event.
create function durable.lock_event (
p_queue_name text,
p_event_name text
)
returns void
language sql
as $$
select pg_advisory_xact_lock(hashtext(p_queue_name), hashtext(p_event_name));
$$;

-- awaits an event for a given task's run and step name.
-- this will immediately return if it the event has already returned
Expand Down Expand Up @@ -1095,9 +1083,6 @@ begin
raise exception 'event_name must be provided';
end if;

-- Serialize with concurrent emit_event calls on the same event
perform durable.lock_event(p_queue_name, p_event_name);

if p_timeout is not null then
if p_timeout < 0 then
raise exception 'timeout must be non-negative';
Expand All @@ -1122,6 +1107,28 @@ begin
return query select false, v_checkpoint_payload;
return;
end if;
-- Ensure a row exists for this event so we can take a row-level lock.
--
-- We use payload IS NULL as the sentinel for "not emitted yet". emit_event
-- always writes a non-NULL payload (at minimum JSON null).
--
-- Lock ordering is important to avoid deadlocks: await_event locks the event
-- row first (FOR SHARE) and then the run row (FOR UPDATE). emit_event
-- naturally locks the event row via its UPSERT before touching waits/runs.
execute format(
'insert into durable.%I (event_name, payload, emitted_at)
values ($1, null, ''epoch''::timestamptz)
on conflict (event_name) do nothing',
'e_' || p_queue_name
) using p_event_name;

execute format(
'select 1
from durable.%I
where event_name = $1
for share',
'e_' || p_queue_name
) using p_event_name;

-- let's get the run state, any existing event payload and wake event name
execute format(
Expand Down Expand Up @@ -1253,15 +1260,17 @@ begin
raise exception 'event_name must be provided';
end if;

-- Serialize with concurrent await_event calls on the same event
perform durable.lock_event(p_queue_name, p_event_name);

-- Insert the event into the events table (first-writer-wins).
-- Subsequent emits for the same event are no-ops.
-- We use DO UPDATE WHERE payload IS NULL to handle the case where await_event
-- created a placeholder row before emit_event ran.
execute format(
'insert into durable.%I (event_name, payload, emitted_at)
values ($1, $2, $3)
on conflict (event_name) do nothing',
on conflict (event_name) do update
set payload = excluded.payload, emitted_at = excluded.emitted_at
where durable.%I.payload is null',
'e_' || p_queue_name,
'e_' || p_queue_name
) using p_event_name, v_payload, v_now;

Expand Down
Loading