Merged
5 changes: 5 additions & 0 deletions .changeset/nervous-brooms-relax.md
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Fix storage race condition when deleting shape during a live poll request
16 changes: 16 additions & 0 deletions packages/sync-service/lib/electric/shapes/api.ex
@@ -734,6 +734,22 @@ defmodule Electric.Shapes.Api do
      {^shape_handle, latest_log_offset} when is_log_offset_lt(last_offset, latest_log_offset) ->
        send(self(), {ref, :new_changes, latest_log_offset})

      {^shape_handle, _latest_log_offset} ->
        # Fix for issue #3760: Handle offset regression during shape invalidation.
        #
        # This case handles a race condition where:
        # 1. Shape invalidation spawns an async cleanup task
        # 2. Between consumer termination and async cleanup completion, the writer
        #    ETS gets deleted while ShapeStatus retains the shape entry
        # 3. A pending API request queries the shape
        # 4. Validation succeeds (shape exists in ShapeStatus), but metadata reading
        #    fails, causing resolve_shape_handle to return LogOffset.last_before_real_offsets()
Contributor

@msfstef, Jan 22, 2026

My agent found this race as well, but I disliked this solution. What is actually returning last_before_real_offsets(), and should it return something else rather than falling back to this default? The default should perhaps be set when the shape is created, but if the shape is deleted we shouldn't fall back to it.

Contributor Author

This is the result of a race condition: at the point where we return this default offset, we don't actually know that the shape has been deleted. The default is simply what you get when there is no ETS table and no metadata files. Changing that default would break a lot of things, so this feels like a pragmatic solution to me.

Contributor

Fair enough. I'd prefer it if we avoided returning this "default offset" and handled the missing offset explicitly instead, but I'm not sure how deep that rabbit hole is.
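
For readers following the thread, the alternative being discussed (reporting a missing offset explicitly rather than returning a default) could look roughly like the sketch below. Everything in it is hypothetical: the module, the `latest_offset/1` and `on_poll/2` functions, the `:offset_unavailable` atom, and the metadata map are illustration only and do not exist in the sync service. It is not what this PR implements.

```elixir
defmodule MissingOffsetSketch do
  # Hypothetical: `metadata` is a plain map standing in for whatever storage
  # lookup the real resolve_shape_handle performs.

  # Metadata is readable: report the stored offset.
  def latest_offset(%{latest_offset: offset}), do: {:ok, offset}

  # Metadata is gone (for example, mid-cleanup): say so instead of
  # substituting a default offset.
  def latest_offset(_metadata_gone), do: {:error, :offset_unavailable}

  # The caller can then treat "offset unavailable" as a rotation directly,
  # without comparing offsets at all.
  def on_poll(handle, metadata) do
    case latest_offset(metadata) do
      {:ok, offset} -> {:continue, handle, offset}
      {:error, :offset_unavailable} -> :shape_rotation
    end
  end
end
```

The trade-off raised in the reply still applies: callers that currently rely on the default would each need to handle the error tuple.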

        # 5. This offset is less than the client's stored offset (offset regression)
        #
        # When the offset goes backwards, the shape has effectively been invalidated
        # and the client should refetch from the beginning.
        send(self(), {ref, :shape_rotation})

      {other_shape_handle, _} when other_shape_handle != shape_handle ->
        send(self(), {ref, :shape_rotation, other_shape_handle})

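
The clause ordering in the hunk above can be illustrated with a small standalone sketch. It makes several assumptions: offsets are modelled as `{tx_offset, op_offset}` tuples compared with Erlang term ordering rather than with `LogOffset` and `is_log_offset_lt/2`, the module and function names are invented, and the equal-offset clause is a guess at what precedes the hunk, since the diff does not show it.

```elixir
defmodule OffsetRegressionSketch do
  # Hypothetical stand-in for the case expression in the diff.
  # Arguments: the resolved {handle, offset} pair, the handle the client
  # asked for, and the last offset the client has seen.

  # Assumed clause (not visible in the hunk): nothing changed.
  def classify({handle, last}, handle, last), do: :no_change

  # Same handle, offset moved forward: new changes are available.
  def classify({handle, latest}, handle, last) when last < latest,
    do: {:new_changes, latest}

  # Same handle, offset is neither equal nor ahead, so it went backwards.
  # This mirrors the clause added by the fix: treat it as a rotation.
  def classify({handle, _latest}, handle, _last), do: :shape_rotation

  # A different handle was resolved: rotate to it.
  def classify({other, _latest}, _handle, _last), do: {:shape_rotation, other}
end
```

Calling `OffsetRegressionSketch.classify({"h1", {0, 0}}, "h1", {10, 0})` falls through to the third clause. Without that clause, the same input would match no clause at all, which is the shape of the failure the PR describes.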
65 changes: 65 additions & 0 deletions packages/sync-service/test/electric/shapes/api_test.exs
@@ -1272,6 +1272,71 @@ defmodule Electric.Shapes.ApiTest do
      assert [%{headers: %{control: "must-refetch"}}] = response_body(response)
    end

    @tag long_poll_timeout: 100
    test "handles shape offset regression during live polling (issue #3760)", ctx do
      # This test reproduces the CaseClauseError described in issue #3760.
      # The bug occurs when:
      # 1. A live request is waiting for changes
      # 2. Shape invalidation spawns an async cleanup task
      # 3. Between consumer termination and async cleanup completion,
      #    the writer ETS gets deleted while ShapeStatus retains the shape entry
      # 4. When notify_changes_since_request_start/1 calls resolve_shape_handle,
      #    it returns {shape_handle, LogOffset.last_before_real_offsets()} as a fallback
      # 5. This offset is *less than* the client's stored offset, causing a
      #    CaseClauseError because no case clause handles offset regression

      regressed_offset = LogOffset.last_before_real_offsets()

      expect_shape_cache(
        # First call during validation - returns valid offset
        resolve_shape_handle: fn @test_shape_handle, @test_shape, _stack_id ->
          {@test_shape_handle, @test_offset}
        end,
        # Second call during notify_changes_since_request_start - simulates
        # the race condition where shape metadata was partially cleaned up,
        # causing offset to regress to last_before_real_offsets()
        resolve_shape_handle: fn @test_shape_handle, @test_shape, _stack_id ->
          {@test_shape_handle, regressed_offset}
        end
      )

      patch_shape_cache(
        has_shape?: fn @test_shape_handle, _opts -> true end,
        await_snapshot_start: fn @test_shape_handle, _ -> :started end
      )

      patch_storage(
        for_shape: fn @test_shape_handle, _opts -> @test_opts end,
        get_chunk_end_log_offset: fn _, @test_opts -> nil end,
        get_log_stream: fn @test_offset, _, @test_opts -> [] end
      )

      # The flow is:
      # 1. validate() calls resolve_shape_handle (first expectation - returns valid offset)
      # 2. serve_shape_response() -> handle_live_request() -> notify_changes_since_request_start()
      # 3. notify_changes_since_request_start() calls resolve_shape_handle (second expectation)
      # 4. Second call returns regressed offset (last_before_real_offsets)
      # 5. The fix detects offset regression and sends :shape_rotation message
      # 6. hold_until_change() receives :shape_rotation and returns 409
      assert {:ok, request} =
               Api.validate(
                 ctx.api,
                 %{
                   table: "public.users",
                   offset: "#{@test_offset}",
                   handle: @test_shape_handle,
                   live: true
                 }
               )

      response = Api.serve_shape_response(request)

      # With the fix in place, offset regression should be treated as shape rotation
      # and return a 409 with must-refetch control header
      assert response.status == 409
      assert [%{headers: %{control: "must-refetch"}}] = response_body(response)
    end

    @tag long_poll_timeout: 100
    test "sends an up-to-date response after a timeout if no changes are observed", ctx do
      patch_shape_cache(
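
The test above depends on the same function returning a different answer on its second call. A minimal standalone sketch of that idea follows, using an `Agent` to hand out queued responses in order. `SequencedStub` and its functions are hypothetical and are not the `expect_shape_cache` helper used in the test suite, whose implementation is not shown in this diff.

```elixir
defmodule SequencedStub do
  # Hypothetical helper: holds a list of canned responses and returns them
  # one per call, in order.

  def start_link(responses) when is_list(responses) do
    Agent.start_link(fn -> responses end)
  end

  # Returns the next queued response. Raises in the caller (not inside the
  # Agent process) when the stub is called more often than expected.
  def next!(stub) do
    result =
      Agent.get_and_update(stub, fn
        [head | tail] -> {{:ok, head}, tail}
        [] -> {:empty, []}
      end)

    case result do
      {:ok, response} -> response
      :empty -> raise "stub called more times than expected"
    end
  end
end

# Usage: the first call models validation, the second models the lookup made
# while the live request is waiting. Offsets are plain tuples here.
{:ok, stub} = SequencedStub.start_link([{"handle-1", {10, 0}}, {"handle-1", {0, 0}}])
{_handle, at_validation} = SequencedStub.next!(stub)
{_handle, at_notify} = SequencedStub.next!(stub)
regressed = at_notify < at_validation
```

Keeping the responses in a queue makes the order of calls part of the test's contract, which is what lets the test pin the race to the second lookup.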