Skip to content

Eagerly initialize last_id to avoid skipping messages#78

Merged
npezza93 merged 2 commits intorails:mainfrom
xrav3nz:kyle/eager-last-id
Mar 17, 2026
Merged

Eagerly initialize last_id to avoid skipping messages#78
npezza93 merged 2 commits intorails:mainfrom
xrav3nz:kyle/eager-last-id

Conversation

@xrav3nz
Copy link
Contributor

@xrav3nz xrav3nz commented Mar 4, 2026

What does this do?

This change eagerly initializes @last_id to avoid skipping messages.

Why?

Currently, @last_id is lazily initialized as MAX(id) on the listener thread's first call to broadcast_messages.

def last_id
@last_id ||= last_message_id
end

However, if a broadcast inserts a message between a subscription and that first call, MAX(id) returns a value that includes that inserted message message, causing the query WHERE id > last_id to miss it entirely.

For example:

Main thread                           Listener thread
──────────                            ───────────────
subscribe()
  Listener.new → starts thread →      thread starts, enters listen loop
  add_channel:
    channels["test"] = MAX(id) = 5
    event_loop.post(on_success)
                                      interruptible { executor.run! }
on_success fires → subscribed.set
subscribed.wait returns
                                      ↑ still hasn't reached broadcast_messages
broadcast("hello") → inserts id=6
                                      broadcast_messages:
                                        @last_id ||= MAX(id) → 6  ← INCLUDES THE MESSAGE
                                        broadcastable(["test"], 6)
                                          → WHERE id > 6 → nothing returned!
                                        message 6 is SKIPPED

The per-channel cursor (channels["test"] = 5) doesn't help, because it's only checked as a secondary filter inside the loop over query results, which already excluded the message.

::SolidCable::Message.
broadcastable(current_channels.keys, last_id).
each do |message|
should_broadcast_message = false
channels.compute_if_present(message.channel) do |channel_last_id|
break if channel_last_id >= message.id
should_broadcast_message = true
message.id
end
broadcast(message.channel, message.payload) if should_broadcast_message
self.last_id = message.id
end
end

How to repro?

This is evident in the existing test suite, where sleep is required after the first subscribe to give the listener thread time to complete the first poll cycle and evaluate @last_id to a pre-broadcast max.

Removing this sleep causes tests to fail without this change.

What do I NOT like about this change?

There is now a DB read in the constructor... Though:

  • Listeners are already lazily initialized so this is not a boot-time issue
  • DB connection is ready: both the listener thread & #subscribe call perform DB read/writes immediately

Thanks and open to suggestions!

Currently, `@last_id` is lazily initialized as `MAX(id)` on the listener
thread's first call to `broadcast_messages`.

However, if a broadcast inserts a message _between_ a subscription _and_
that first call, `MAX(id)` returns a value that includes that inserted
message message, causing the query `WHERE id > last_id` to miss it
entirely.

For example:

```
Main thread                           Listener thread
──────────                            ───────────────
subscribe()
  Listener.new → starts thread →      thread starts, enters listen loop
  add_channel:
    channels["test"] = MAX(id) = 5
    event_loop.post(on_success)
                                      interruptible { executor.run! }
on_success fires → subscribed.set
subscribed.wait returns
                                      ↑ still hasn't reached broadcast_messages
broadcast("hello") → inserts id=6
                                      broadcast_messages:
                                        @last_id ||= MAX(id) → 6  ← INCLUDES THE MESSAGE
                                        broadcastable(["test"], 6)
                                          → WHERE id > 6 → nothing returned!
                                        message 6 is SKIPPED
```

The per-channel cursor (`channels["test"] = 5`) doesn't help, because
it's only checked as a secondary filter inside the loop over query
results, which already excluded the message.

**How to repro?**

This is also evident in the existing test suite, where `sleep` is
required after the first `subscribe` to give the listener thread time to
complete the first poll cycle and evaluate `@last_id` to a pre-broadcast
max. Removing that `sleep` causes tests to fail without this change.

**What does this do?**

This change eagerly initializes `@last_id` to avoid missing messages.

**What do I NOT like about this change?**

There is now a DB read in the constructor... The good things are:

- Listeners are already lazily initialized so this is not a boot-time
  issue
- DB connection is ready: both the listener thread & `#subscribe` call
  perform DB read/writes immediately

---

Thanks and open to suggestions!
Comment on lines +178 to +179
10.times do |i|
test "#broadcast immediately after #subscribe does not skip messages - iteration #{i}" do
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't think of a better way to add a regression test for the race condition. This intentionally does NOT use subscribe_as_queue, to make the whole process as transparent as possible. Happy to change / remove if it feels redundant.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea lets go ahead and remove this i think the removal of line 215 should cover us.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed and thanks!

@npezza93
Copy link
Collaborator

Thanks for this. Ill take a look this weekend

Copy link
Collaborator

@npezza93 npezza93 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise looks good to me

Comment on lines +178 to +179
10.times do |i|
test "#broadcast immediately after #subscribe does not skip messages - iteration #{i}" do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea lets go ahead and remove this i think the removal of line 215 should cover us.

@npezza93 npezza93 merged commit 92818a0 into rails:main Mar 17, 2026
13 checks passed
@xrav3nz xrav3nz deleted the kyle/eager-last-id branch March 18, 2026 21:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants