Skip to content

Commit a1e0fc1

Browse files
committed
Subscribe and unsubscribe to LSN updates
1 parent 847f0a0 commit a1e0fc1

6 files changed

Lines changed: 44 additions & 13 deletions

File tree

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ defmodule Electric.Shapes.Consumer do
1717
alias Electric.Shapes.Consumer.Materializer
1818
alias Electric.Shapes.ConsumerRegistry
1919
alias Electric.LogItems
20-
alias Electric.LsnTracker
20+
2121
alias Electric.Postgres.Inspector
2222
alias Electric.Replication.Changes
2323
alias Electric.Replication.Changes.Transaction
@@ -126,7 +126,6 @@ defmodule Electric.Shapes.Consumer do
126126
metadata = [shape_handle: shape_handle, stack_id: stack_id]
127127
Logger.metadata(metadata)
128128
Electric.Telemetry.Sentry.set_tags_context(metadata)
129-
{:ok, _} = LsnTracker.subscribe_to_global_lsn_updates(stack_id)
130129

131130
# Shape initialization will be complete when we receive a message {:initialize_shape,
132131
# <shape>, <shape_opts>} which the ShapeCache is expected to send as soon as this process

packages/sync-service/lib/electric/shapes/consumer/effect.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,14 @@ defmodule Electric.Shapes.Consumer.Effect do
55
@moduledoc false
66
defstruct []
77
end
8+
9+
defmodule SubscribeGlobalLsn do
10+
@moduledoc false
11+
defstruct []
12+
end
13+
14+
defmodule UnsubscribeGlobalLsn do
15+
@moduledoc false
16+
defstruct []
17+
end
818
end

packages/sync-service/lib/electric/shapes/consumer/event_handler/subqueries/buffering.ex

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Buffering do
2222
:trigger_dep_index,
2323
:move_in_values,
2424
:views_before_move,
25-
:views_after_move,
26-
:latest_seen_lsn
25+
:views_after_move
2726
]
2827
defstruct [
2928
:shape,
@@ -98,7 +97,6 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Buffering do
9897
views_before_move: state.views,
9998
views_after_move: views_after,
10099
dependency_handle_to_ref: state.dependency_handle_to_ref,
101-
latest_seen_lsn: state.latest_seen_lsn,
102100
queue: queue,
103101
buffer_max_transactions: state.buffer_max_transactions
104102
}
@@ -239,7 +237,11 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Buffering do
239237
txns -> txns |> List.last() |> Map.fetch!(:last_log_offset)
240238
end
241239

242-
plan = %Plan{log_ops: log_ops, ack_source_offset: ack_offset}
240+
plan = %Plan{
241+
log_ops: log_ops,
242+
effects: [%Electric.Shapes.Consumer.Effect.UnsubscribeGlobalLsn{}],
243+
ack_source_offset: ack_offset
244+
}
243245

244246
# Transition back to steady state, then drain any queued moves
245247
state
@@ -280,7 +282,6 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Buffering do
280282
dnf_plan: state.dnf_plan,
281283
views: state.views_after_move,
282284
dependency_handle_to_ref: state.dependency_handle_to_ref,
283-
latest_seen_lsn: state.latest_seen_lsn,
284285
queue: state.queue,
285286
buffer_max_transactions: state.buffer_max_transactions
286287
}

packages/sync-service/lib/electric/shapes/consumer/event_handler/subqueries/steady.ex

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Steady do
1919
:dnf_plan,
2020
views: %{},
2121
dependency_handle_to_ref: %{},
22-
latest_seen_lsn: nil,
2322
queue: MoveQueue.new(),
2423
buffer_max_transactions: 1000
2524
]
@@ -31,7 +30,6 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Steady do
3130
dnf_plan: DnfPlan.t(),
3231
views: %{[String.t()] => MapSet.t()},
3332
dependency_handle_to_ref: %{String.t() => {non_neg_integer(), [String.t()]}},
34-
latest_seen_lsn: Electric.Postgres.Lsn.t() | nil,
3533
queue: MoveQueue.t(),
3634
buffer_max_transactions: pos_integer()
3735
}
@@ -55,8 +53,9 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Steady do
5553
end
5654
end
5755

58-
def handle_event(state, {:global_last_seen_lsn, lsn}) do
59-
{:ok, %{state | latest_seen_lsn: Subqueries.normalize_global_lsn(lsn)}, %Plan{}}
56+
def handle_event(state, {:global_last_seen_lsn, _lsn}) do
57+
# Straggler message after unsubscribe; ignore.
58+
{:ok, state, %Plan{}}
6059
end
6160

6261
def handle_event(state, {:materializer_changes, dep_handle, payload}) do
@@ -126,7 +125,12 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Steady do
126125

127126
plan = %{
128127
plan
129-
| effects: plan.effects ++ [%Electric.Shapes.Consumer.Effect.StartMoveInQuery{}]
128+
| effects:
129+
plan.effects ++
130+
[
131+
%Electric.Shapes.Consumer.Effect.SubscribeGlobalLsn{},
132+
%Electric.Shapes.Consumer.Effect.StartMoveInQuery{}
133+
]
130134
}
131135

132136
{:ok, buffering, plan}
@@ -150,7 +154,12 @@ defmodule Electric.Shapes.Consumer.EventHandler.Subqueries.Steady do
150154

151155
plan = %{
152156
plan
153-
| effects: plan.effects ++ [%Electric.Shapes.Consumer.Effect.StartMoveInQuery{}]
157+
| effects:
158+
plan.effects ++
159+
[
160+
%Electric.Shapes.Consumer.Effect.SubscribeGlobalLsn{},
161+
%Electric.Shapes.Consumer.Effect.StartMoveInQuery{}
162+
]
154163
}
155164

156165
{:ok, buffering, plan}

packages/sync-service/lib/electric/shapes/consumer/plan.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@ defmodule Electric.Shapes.Consumer.Plan do
1616

1717
@type effect() ::
1818
%Electric.Shapes.Consumer.Effect.StartMoveInQuery{}
19+
| %Electric.Shapes.Consumer.Effect.SubscribeGlobalLsn{}
20+
| %Electric.Shapes.Consumer.Effect.UnsubscribeGlobalLsn{}
1921
end

packages/sync-service/lib/electric/shapes/consumer/plan_executor.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,16 @@ defmodule Electric.Shapes.Consumer.PlanExecutor do
123123
acc
124124
end
125125

126+
defp execute_effect(%Effect.SubscribeGlobalLsn{}, acc) do
127+
{:ok, _} = Electric.LsnTracker.subscribe_to_global_lsn_updates(acc.state.stack_id)
128+
acc
129+
end
130+
131+
defp execute_effect(%Effect.UnsubscribeGlobalLsn{}, acc) do
132+
:ok = Electric.LsnTracker.unsubscribe_from_global_lsn_updates(acc.state.stack_id)
133+
acc
134+
end
135+
126136
# -- Ack --
127137

128138
defp apply_ack(acc, nil), do: acc

0 commit comments

Comments
 (0)