From 3c078037599239e3716a0e3064fde87bbf749d88 Mon Sep 17 00:00:00 2001 From: Kyle Date: Wed, 4 Mar 2026 15:52:50 -0500 Subject: [PATCH 1/2] Eagerly initialize last_id to avoid missed messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Currently, `@last_id` is lazily initialized as `MAX(id)` on the listener thread's first call to `broadcast_messages`. However, if a broadcast inserts a message _between_ a subscription _and_ that first call, `MAX(id)` returns a value that includes that inserted message message, causing the query `WHERE id > last_id` to miss it entirely. For example: ``` Main thread Listener thread ────────── ─────────────── subscribe() Listener.new → starts thread → thread starts, enters listen loop add_channel: channels["test"] = MAX(id) = 5 event_loop.post(on_success) interruptible { executor.run! } on_success fires → subscribed.set subscribed.wait returns ↑ still hasn't reached broadcast_messages broadcast("hello") → inserts id=6 broadcast_messages: @last_id ||= MAX(id) → 6 ← INCLUDES THE MESSAGE broadcastable(["test"], 6) → WHERE id > 6 → nothing returned! message 6 is SKIPPED ``` The per-channel cursor (`channels["test"] = 5`) doesn't help, because it's only checked as a secondary filter inside the loop over query results, which already excluded the message. **How to repro?** This is also evident in the existing test suite, where `sleep` is required after the first `subscribe` to give the listener thread time to complete the first poll cycle and evaluate `@last_id` to a pre-broadcast max. Removing that `sleep` causes tests to fail without this change. **What does this do?** This change eagerly initializes `@last_id` to avoid missing messages. **What do I NOT like about this change?** There is now a DB read in the constructor... The good things are: - Listeners are already lazily initialized so this is not a boot-time issue - DB connection is ready: both the listener thread & `#subscribe` call perform DB read/writes immediately --- Thanks and open to suggestions! --- .../subscription_adapter/solid_cable.rb | 8 ++---- .../subscription_adapter/solid_cable_test.rb | 25 ++++++++++++++++++- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/lib/action_cable/subscription_adapter/solid_cable.rb b/lib/action_cable/subscription_adapter/solid_cable.rb index 2c476e9..15d3d02 100644 --- a/lib/action_cable/subscription_adapter/solid_cable.rb +++ b/lib/action_cable/subscription_adapter/solid_cable.rb @@ -53,6 +53,7 @@ def initialize(event_loop) @critical = Concurrent::Semaphore.new(0) @reconnect_attempt = 0 + @last_id = last_message_id @thread = Thread.new do Thread.current.name = "solid_cable_listener" @@ -113,12 +114,7 @@ def invoke_callback(*) private attr_reader :event_loop, :thread - attr_writer :last_id - attr_accessor :reconnect_attempt - - def last_id - @last_id ||= last_message_id - end + attr_accessor :last_id, :reconnect_attempt def last_message_id ::SolidCable::Message.maximum(:id) || 0 diff --git a/test/lib/action_cable/subscription_adapter/solid_cable_test.rb b/test/lib/action_cable/subscription_adapter/solid_cable_test.rb index 73c2bf9..270165c 100644 --- a/test/lib/action_cable/subscription_adapter/solid_cable_test.rb +++ b/test/lib/action_cable/subscription_adapter/solid_cable_test.rb @@ -175,6 +175,30 @@ class ActionCable::SubscriptionAdapter::SolidCableTest < ActionCable::TestCase end end + 10.times do |i| + test "#broadcast immediately after #subscribe does not skip messages - iteration #{i}" do + subscribed = Concurrent::Event.new + message_received = Concurrent::Event.new + message = nil + + subscription_callback = -> { subscribed.set } + message_callback = ->(msg) { + message = msg + message_received.set + } + + @rx_adapter.subscribe("channel", message_callback, subscription_callback) + assert subscribed.wait(WAIT_WHEN_EXPECTING_EVENT), + "Timed out waiting for subscription" + + @tx_adapter.broadcast("channel", "don't skip me") + assert message_received.wait(WAIT_WHEN_EXPECTING_EVENT), "Message skipped" + assert_equal "don't skip me", message + ensure + @rx_adapter.unsubscribe("channel", message_callback) + end + end + test "retries after a connection failure and keeps listening" do with_cable_config reconnect_attempts: [0] do raised = false @@ -212,7 +236,6 @@ def subscribe_as_queue(channel, adapter = @rx_adapter) subscribed = Concurrent::Event.new adapter.subscribe(channel, callback, proc { subscribed.set }) subscribed.wait(WAIT_WHEN_EXPECTING_EVENT) - sleep WAIT_WHEN_EXPECTING_EVENT assert_predicate subscribed, :set? yield queue From 40a9289894b87f37c860533ac4cf5c17ea478d06 Mon Sep 17 00:00:00 2001 From: Kyle Date: Tue, 17 Mar 2026 11:54:56 -0400 Subject: [PATCH 2/2] Remove redundant test --- .../subscription_adapter/solid_cable_test.rb | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/test/lib/action_cable/subscription_adapter/solid_cable_test.rb b/test/lib/action_cable/subscription_adapter/solid_cable_test.rb index 270165c..dac6e2d 100644 --- a/test/lib/action_cable/subscription_adapter/solid_cable_test.rb +++ b/test/lib/action_cable/subscription_adapter/solid_cable_test.rb @@ -175,30 +175,6 @@ class ActionCable::SubscriptionAdapter::SolidCableTest < ActionCable::TestCase end end - 10.times do |i| - test "#broadcast immediately after #subscribe does not skip messages - iteration #{i}" do - subscribed = Concurrent::Event.new - message_received = Concurrent::Event.new - message = nil - - subscription_callback = -> { subscribed.set } - message_callback = ->(msg) { - message = msg - message_received.set - } - - @rx_adapter.subscribe("channel", message_callback, subscription_callback) - assert subscribed.wait(WAIT_WHEN_EXPECTING_EVENT), - "Timed out waiting for subscription" - - @tx_adapter.broadcast("channel", "don't skip me") - assert message_received.wait(WAIT_WHEN_EXPECTING_EVENT), "Message skipped" - assert_equal "don't skip me", message - ensure - @rx_adapter.unsubscribe("channel", message_callback) - end - end - test "retries after a connection failure and keeps listening" do with_cable_config reconnect_attempts: [0] do raised = false