Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
fa95207
Oracle: Add CHECK_TIMEOUT
robacourt Mar 10, 2026
169114c
Oracle: Remove where clauses with two identical subqueries
robacourt Mar 12, 2026
1d7d6b8
Oracle: make all queries optimised
robacourt Mar 14, 2026
ba9ea2a
Oracle: Add more variation
robacourt Mar 14, 2026
fd4d58a
Oracle: Allow idential subqueries
robacourt Mar 14, 2026
b57de54
Fix bug caused by typo
robacourt Jan 29, 2026
7c30222
Revert "Fix head-of-line blocking in SLC for subquery shapes via ETS …
robacourt Mar 9, 2026
3e830e0
Remove: sync-service: always route subquery shapes in filter
robacourt Mar 9, 2026
8b349e8
Remove: SLC: remove Materializer.get_all_as_refs
robacourt Mar 10, 2026
b94f6a2
Another PR: Introduce active_conditions wire format for DNF visibility
robacourt Mar 6, 2026
adcf038
Remove: Add debug.md
robacourt Mar 4, 2026
dbb0fa6
Docs: Add docs
robacourt Mar 6, 2026
a1c2b4b
Docs: Document LSN-based move-in splice triggers
robacourt Mar 9, 2026
d065f89
Docs: Add prototype-issues.md
robacourt Mar 9, 2026
b0d0469
Docs: Add filter algorythm
robacourt Mar 10, 2026
2a04043
Docs: Add RFC
robacourt Mar 11, 2026
2bfc7d1
Docs: Add to prototype-issues.md
robacourt Mar 11, 2026
59a2c82
Docs: Update RFC
robacourt Mar 11, 2026
2c5b00a
Docs: Add implimentation plan
robacourt Mar 11, 2026
5c76383
Docs: Add negation plan
robacourt Mar 14, 2026
527a37c
Update client
robacourt Mar 19, 2026
0ba19de
Docs: Add negation.md
robacourt Mar 19, 2026
c127aff
Update sync-server
robacourt Mar 19, 2026
68d947c
Buffer: remove ++
robacourt Mar 19, 2026
ed4414f
Move buffering to buffering state module
robacourt Mar 19, 2026
2e63aa0
Update @spec
robacourt Mar 19, 2026
8ffa636
Move drain queue to steady
robacourt Mar 19, 2026
4c5664f
Return actions
robacourt Mar 19, 2026
f2f07eb
Format where_clause_generator.ex
robacourt Mar 20, 2026
bb86221
Add start_move_in_query as action
robacourt Mar 20, 2026
4fb1feb
Update tests
robacourt Mar 20, 2026
9ad5055
Hide Buffering from consumer
robacourt Mar 22, 2026
5d65b3b
Remove duplication by adding NoSubqueries module
robacourt Mar 22, 2026
b5fe929
Don't acumulate batches
robacourt Mar 22, 2026
868feed
REMOVE: skip hacked Filter test
robacourt Mar 22, 2026
f233f77
Fix warning
robacourt Mar 22, 2026
cf83767
Docs: Add Filter plan
robacourt Mar 23, 2026
2968cc0
Update WhereClause to accept subquery function
robacourt Mar 23, 2026
bf8fd4d
Docs: Updated filter plan to remove hack
robacourt Mar 23, 2026
a093509
Docs: Update filter plan
robacourt Mar 23, 2026
401adc9
Add SubqueryIndex storage and static registration (Stage 1)
robacourt Mar 23, 2026
a182581
Add callback-based subquery evaluation via SubqueryIndex (Stage 2)
robacourt Mar 23, 2026
d4d7b31
Seed and maintain dynamic membership from consumer runtime (Stage 3)
robacourt Mar 23, 2026
991bff4
Use reverse index for subquery routing in Filter (Stage 5)
robacourt Mar 23, 2026
844da63
Remove legacy always-route path, use index for all subquery routing (…
robacourt Mar 23, 2026
5883847
Finish off filter
robacourt Mar 23, 2026
2b29c21
Docs: update filter docs
robacourt Mar 23, 2026
77f92a7
Really finish off filter
robacourt Mar 23, 2026
f85818b
Add ELECTRIC_SUBQUERY_BUFFER_MAX_TRANSACTIONS to limit buffering duri…
robacourt Mar 23, 2026
044fc50
Client: fix tests
robacourt Mar 25, 2026
6671567
Client - format tests
robacourt Mar 25, 2026
4738538
REMOVE: add reviews
robacourt Mar 24, 2026
25faa4d
Consumer redesign part 1
robacourt Mar 24, 2026
e4b6ba9
Introduce ResultStream
robacourt Mar 25, 2026
a85aba2
Support UUID
robacourt Mar 25, 2026
34e5bc5
Refactor Default
robacourt Mar 25, 2026
80d6b04
Fix nil event handler
robacourt Mar 25, 2026
847f0a0
Support truncate
robacourt Mar 26, 2026
203f4fc
Subscribe and unsubscribe to LSN updates
robacourt Mar 26, 2026
0dd4b75
Reduce message passing for NOT IN subquery
robacourt Mar 26, 2026
f74fb29
Fix race condition of missed lsn update with queued moves
robacourt Mar 27, 2026
5129850
Make :materializer_changes synchronous
robacourt Mar 27, 2026
5999217
Get lastest lsn when subscribing to it
robacourt Mar 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/active-conditions-wire-format.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@core/sync-service': patch
'@core/elixir-client': patch
---

Introduce `active_conditions` wire format for DNF-based visibility tracking. The server now includes `active_conditions` in change headers for shapes with subqueries, and the Elixir client handles position-based tag indexing and disjunctive normal form (DNF) visibility evaluation. This is a backward-compatible protocol addition preparing for OR/NOT support in WHERE clauses.
5 changes: 5 additions & 0 deletions .changeset/wild-walls-fly.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

chore: improve Storage contract to have less coupling on snapshot appends
92 changes: 86 additions & 6 deletions packages/elixir-client/lib/electric/client/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ defmodule Electric.Client.Message do
txids: [],
op_position: 0,
tags: [],
removed_tags: []
removed_tags: [],
active_conditions: []
]

@type operation :: :insert | :update | :delete
Expand All @@ -29,7 +30,8 @@ defmodule Electric.Client.Message do
txids: txids(),
op_position: non_neg_integer(),
tags: [tag()],
removed_tags: [tag()]
removed_tags: [tag()],
active_conditions: [boolean()]
}

@doc false
Expand All @@ -44,7 +46,8 @@ defmodule Electric.Client.Message do
lsn: Map.get(msg, "lsn", nil),
op_position: Map.get(msg, "op_position", 0),
tags: Map.get(msg, "tags", []),
removed_tags: Map.get(msg, "removed_tags", [])
removed_tags: Map.get(msg, "removed_tags", []),
active_conditions: Map.get(msg, "active_conditions", [])
}
end

Expand Down Expand Up @@ -187,14 +190,22 @@ defmodule Electric.Client.Message do

@enforce_keys [:shape_handle, :offset, :schema]

defstruct [:shape_handle, :offset, :schema, tag_to_keys: %{}, key_data: %{}]
defstruct [
:shape_handle,
:offset,
:schema,
tag_to_keys: %{},
key_data: %{},
disjunct_positions: nil
]

@type t :: %__MODULE__{
shape_handle: Client.shape_handle(),
offset: Offset.t(),
schema: Client.schema(),
tag_to_keys: %{String.t() => MapSet.t(String.t())},
key_data: %{String.t() => %{tags: MapSet.t(String.t()), msg: ChangeMessage.t()}}
tag_to_keys: %{optional(term()) => MapSet.t(String.t())},
key_data: %{optional(String.t()) => map()},
disjunct_positions: [[non_neg_integer()]] | nil
}
end

Expand Down Expand Up @@ -251,6 +262,57 @@ defmodule Electric.Client.Message do
end
end

defmodule MoveInMessage do
@moduledoc """
Represents a move-in event from the server.

Move-in events are sent when the server's subquery filter has changed and
rows may now be included in the shape. The `patterns` field contains position
and hash information that the client uses to update `active_conditions` on
tracked rows.
"""

defstruct [:patterns, :handle, :request_timestamp]

@type pattern :: %{pos: non_neg_integer(), value: String.t()}
@type t :: %__MODULE__{
patterns: [pattern()],
handle: Client.shape_handle(),
request_timestamp: DateTime.t()
}

def from_message(
%{"headers" => %{"event" => "move-in", "patterns" => patterns}},
handle,
request_timestamp
) do
%__MODULE__{
patterns: normalize_patterns(patterns),
handle: handle,
request_timestamp: request_timestamp
}
end

def from_message(
%{headers: %{event: "move-in", patterns: patterns}},
handle,
request_timestamp
) do
%__MODULE__{
patterns: normalize_patterns(patterns),
handle: handle,
request_timestamp: request_timestamp
}
end

defp normalize_patterns(patterns) do
Enum.map(patterns, fn
%{"pos" => pos, "value" => value} -> %{pos: pos, value: value}
%{pos: _, value: _} = pattern -> pattern
end)
end
end

defguard is_insert(msg) when is_struct(msg, ChangeMessage) and msg.headers.operation == :insert

def parse(%{"value" => _} = msg, shape_handle, value_mapper_fun, request_timestamp) do
Expand Down Expand Up @@ -288,6 +350,24 @@ defmodule Electric.Client.Message do
[MoveOutMessage.from_message(msg, shape_handle, request_timestamp)]
end

def parse(
%{"headers" => %{"event" => "move-in"}} = msg,
shape_handle,
_value_mapper_fun,
request_timestamp
) do
[MoveInMessage.from_message(msg, shape_handle, request_timestamp)]
end

def parse(
%{headers: %{event: "move-in"}} = msg,
shape_handle,
_value_mapper_fun,
request_timestamp
) do
[MoveInMessage.from_message(msg, shape_handle, request_timestamp)]
end

def parse("", _handle, _value_mapper_fun, _request_timestamp) do
[]
end
Expand Down
32 changes: 29 additions & 3 deletions packages/elixir-client/lib/electric/client/poll.ex
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,21 @@ defmodule Electric.Client.Poll do
end

defp handle_message(%Message.ChangeMessage{} = msg, state) do
{tag_to_keys, key_data} =
TagTracker.update_tag_index(state.tag_to_keys, state.key_data, msg)
{tag_to_keys, key_data, disjunct_positions} =
TagTracker.update_tag_index(
state.tag_to_keys,
state.key_data,
state.disjunct_positions,
msg
)

{:message, msg, %{state | tag_to_keys: tag_to_keys, key_data: key_data}}
{:message, msg,
%{
state
| tag_to_keys: tag_to_keys,
key_data: key_data,
disjunct_positions: disjunct_positions
}}
end

defp handle_message(
Expand All @@ -248,13 +259,28 @@ defmodule Electric.Client.Poll do
TagTracker.generate_synthetic_deletes(
state.tag_to_keys,
state.key_data,
state.disjunct_positions,
patterns,
request_timestamp
)

{:messages, synthetic_deletes, %{state | tag_to_keys: tag_to_keys, key_data: key_data}}
end

defp handle_message(
%Message.MoveInMessage{patterns: patterns},
state
) do
{tag_to_keys, key_data} =
TagTracker.handle_move_in(
state.tag_to_keys,
state.key_data,
patterns
)

{:skip, %{state | tag_to_keys: tag_to_keys, key_data: key_data}}
end

defp handle_schema(%Fetch.Response{schema: schema}, client, %{value_mapper_fun: nil} = state)
when is_map(schema) do
{parser_module, parser_opts} = client.parser
Expand Down
11 changes: 8 additions & 3 deletions packages/elixir-client/lib/electric/client/shape_state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ defmodule Electric.Client.ShapeState do
tag_to_keys: %{},
key_data: %{},
stale_cache_retry_count: 0,
disjunct_positions: nil,
recent_requests: [],
fast_loop_consecutive_count: 0
]
Expand All @@ -59,6 +60,7 @@ defmodule Electric.Client.ShapeState do
up_to_date?: boolean(),
tag_to_keys: %{optional(term()) => MapSet.t()},
key_data: %{optional(term()) => %{tags: MapSet.t(), msg: term()}},
disjunct_positions: [[non_neg_integer()]] | nil,
stale_cache_buster: String.t() | nil,
stale_cache_retry_count: non_neg_integer(),
recent_requests: [{integer(), Offset.t()}],
Expand Down Expand Up @@ -95,7 +97,8 @@ defmodule Electric.Client.ShapeState do
schema: resume.schema,
up_to_date?: true,
tag_to_keys: Map.get(resume, :tag_to_keys, %{}),
key_data: Map.get(resume, :key_data, %{})
key_data: Map.get(resume, :key_data, %{}),
disjunct_positions: Map.get(resume, :disjunct_positions)
}
end

Expand All @@ -116,7 +119,8 @@ defmodule Electric.Client.ShapeState do
tag_to_keys: %{},
key_data: %{},
recent_requests: [],
fast_loop_consecutive_count: 0
fast_loop_consecutive_count: 0,
disjunct_positions: nil
}
end

Expand All @@ -130,7 +134,8 @@ defmodule Electric.Client.ShapeState do
offset: state.offset,
schema: state.schema,
tag_to_keys: state.tag_to_keys,
key_data: state.key_data
key_data: state.key_data,
disjunct_positions: state.disjunct_positions
}
end

Expand Down
Loading