From 4c98497967428519cfa984c8412a58ef0f4dbaf0 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 29 May 2026 23:51:56 +0900 Subject: [PATCH] Add Async::Cable::Executor. A fiber-based replacement for `ActionCable::Server::ThreadedExecutor` that mirrors its small interface (`#post`, `#timer`, `#shutdown`) without bouncing every call through a Concurrent::ThreadPoolExecutor. - `#post` is implemented as `Async { task.call }`: inside a reactor this spawns a fire-and-forget child task on the caller's reactor; outside a reactor it opens a transient reactor and runs the block to completion. The latter is a slow path but rare in practice. - `#timer` always runs on a dedicated reactor thread owned by the executor so the recurring task's lifetime is decoupled from any individual request reactor. Returns a handle that responds to `#shutdown`, matching the `Concurrent::TimerTask` surface that callers (e.g. `Channel::PeriodicTimers`) already use. The cancel is routed back through the executor's inbox so callers don't need to be inside a reactor to shut down a timer. - `#shutdown` stops the dedicated reactor thread (if started). Not auto-installed: this commit only introduces the class. Assisted-By: devx/edbcc7bd-6e27-4da8-bd2e-43c275487564 --- async-cable.gemspec | 4 +- guides/getting-started/readme.md | 2 +- lib/async/cable/executor.rb | 176 +++++++++++++++++++++++++++++++ readme.md | 8 +- releases.md | 1 + test/async/cable/executor.rb | 130 +++++++++++++++++++++++ 6 files changed, 317 insertions(+), 4 deletions(-) create mode 100644 lib/async/cable/executor.rb create mode 100644 test/async/cable/executor.rb diff --git a/async-cable.gemspec b/async-cable.gemspec index 78a3209..84c80a9 100644 --- a/async-cable.gemspec +++ b/async-cable.gemspec @@ -25,8 +25,8 @@ Gem::Specification.new do |spec| spec.required_ruby_version = ">= 3.3" # Requires the `ActionCable::Server::Socket` abstraction introduced by - # https://github.com/rails/rails/pull/50979 (Rails 8.1+). - spec.add_dependency "actioncable", ">= 8.1.0.alpha" + # https://github.com/rails/rails/pull/50979 (currently Rails main). + spec.add_dependency "actioncable", ">= 8.2.0.alpha" spec.add_dependency "async", "~> 2.9" spec.add_dependency "async-websocket" end diff --git a/guides/getting-started/readme.md b/guides/getting-started/readme.md index ba9f9df..3852457 100644 --- a/guides/getting-started/readme.md +++ b/guides/getting-started/readme.md @@ -15,7 +15,7 @@ $ bundle add async-cable To use `async-cable`, you need to add the following to your `config/application.rb`: ~~~ ruby -require 'async/cable' +require "async/cable" ~~~ This will automatically add the {ruby Async::Cable::Middleware} to your middleware stack which will handle incoming WebSocket connections and integrates with Action Cable. diff --git a/lib/async/cable/executor.rb b/lib/async/cable/executor.rb new file mode 100644 index 0000000..e1f503f --- /dev/null +++ b/lib/async/cable/executor.rb @@ -0,0 +1,176 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Samuel Williams. + +require "async" + +module Async + module Cable + # Fiber-based replacement for `ActionCable::Server::ThreadedExecutor`. + # + # Action Cable uses an `#executor` to dispatch internal async work + # (pub/sub callback invocations, heartbeat timers, periodic channel + # timers) and broadcasts a small interface: `#post`, `#timer`, + # `#shutdown`. Stock Rails backs this with a + # {Concurrent::ThreadPoolExecutor}; under a fiber-scheduler-aware + # server like Falcon every `#post` then bounces through an OS thread + # unnecessarily. + # + # This executor instead spawns Async tasks. Tasks posted from inside + # a reactor run on the caller's reactor (no thread hop). Tasks + # posted or scheduled from outside a reactor run on a dedicated + # reactor thread owned by the executor. + class Executor + # Create a new executor. The dedicated reactor thread is started + # lazily on first use that needs it (timers, or `#post` from + # outside a reactor). + def initialize + @mutex = ::Thread::Mutex.new + @inbox = nil + @thread = nil + end + + # Run the given callable asynchronously. When called from inside + # a reactor this spawns a fire-and-forget child task on the + # current reactor; when called from outside a reactor this routes + # the task to the executor's dedicated reactor thread. The return + # value is the executor (matching + # `ActionCable::Server::ThreadedExecutor#post`). + # @parameter task [#call, nil] Callable to run; if nil, the block is used. + def post(task = nil, &block) + block ||= task + + if current = ::Async::Task.current? + current.async{block.call} + else + inbox.push(proc{block.call}) + end + + return self + end + + # Schedule a recurring timer. When called from inside a reactor + # this spawns a child task on the current reactor; when called + # from outside a reactor this routes the timer to the executor's + # dedicated reactor thread. + # @parameter interval [Numeric] Seconds between invocations. + # @returns [Timer] A handle that responds to `#shutdown`. + def timer(interval, &block) + timer = Timer.new + + if current = ::Async::Task.current? + timer.task = current.async do |inner| + run_timer(inner, interval, block) + end + + return timer + end + + inbox = timer.inbox = self.inbox + begin + operation = proc do |task| + timer.task = task.async do |inner| + run_timer(inner, interval, block) + end + end + + inbox.push(operation) + rescue ::ClosedQueueError + # Executor is shutting down; match the best-effort + # behaviour of posting work during shutdown. + end + + return timer + end + + # Stop the dedicated reactor thread (if any). Tasks posted to + # the caller's reactor via `#post` are unaffected; their + # lifetime is owned by the calling reactor. + def shutdown + @mutex.synchronize do + return unless @thread + @inbox.close + @thread.join + @thread = nil + @inbox = nil + end + end + + # Handle returned from `#timer`. Wraps the underlying + # `Async::Task` and exposes a thread-safe `#shutdown` matching + # the `Concurrent::TimerTask` interface that callers expect. + # Timers running on the dedicated reactor are cancelled through + # the executor's inbox; timers running on the caller's reactor + # are cancelled directly. + class Timer + attr_writer :inbox + + # Initialize an empty timer handle. + def initialize + @inbox = nil + @mutex = ::Thread::Mutex.new + @task = nil + end + + # Set the underlying task. Called by the executor thread + # once the timer has been scheduled. + def task=(task) + @mutex.synchronize{@task = task} + end + + # Cancel the timer. Idempotent; safe to call from any thread + # or fiber. + def shutdown + task = nil + + @mutex.synchronize do + task = @task + @task = nil + end + return unless task + + if inbox = @inbox + begin + inbox.push(proc{task.stop}) + rescue ::ClosedQueueError + # Executor already shut down; the timer task was + # stopped along with its parent reactor. + end + else + task.stop + end + end + end + + private + + def inbox + @inbox || @mutex.synchronize{@inbox ||= start_thread} + end + + def run_timer(task, interval, block) + loop do + task.sleep(interval) + block.call + end + end + + def start_thread + inbox = ::Thread::Queue.new + + @thread = ::Thread.new do + ::Thread.current.name = "async-cable executor" + + Sync do |task| + while operation = inbox.pop + operation.call(task) + end + end + end + + return inbox + end + end + end +end diff --git a/readme.md b/readme.md index a5c8884..d1e96b5 100644 --- a/readme.md +++ b/readme.md @@ -2,7 +2,13 @@ This is a proof-of-concept adapter for Action Cable. -The `next` branch tracks Rails `main` and relies on the `ActionCable::Server::Socket` abstraction introduced by [rails/rails#50979](https://github.com/rails/rails/pull/50979) (Rails 8.1+). For stable Rails (≤ 8.0), use the `main` branch, which depends on [`actioncable-next`](https://github.com/anycable/actioncable-next). +The `next` branch tracks Rails `main` and relies on the `ActionCable::Server::Socket` abstraction introduced by [rails/rails#50979](https://github.com/rails/rails/pull/50979). For stable Rails (≤ 8.1), use the `main` branch, which depends on [`actioncable-next`](https://github.com/anycable/actioncable-next). + +## Rails Compatibility + +This branch requires unreleased Action Cable changes from Rails `main`, currently versioned as `8.2.0.alpha`. Released Rails 8.1.x does not include `ActionCable::Server::Socket`; in Rails 8.1, `ActionCable::Connection::Base` still accepts `(server, env, coder: ...)` rather than `(server, socket)`. + +The gemspec therefore pins `actioncable >= 8.2.0.alpha` to prevent accidentally resolving against Rails 8.1.x. Once Rails ships a stable release containing rails/rails#50979, this constraint should be changed to that released version. [![Development Status](https://github.com/socketry/async-cable/workflows/Test/badge.svg)](https://github.com/socketry/async-cable/actions?workflow=Test) diff --git a/releases.md b/releases.md index fe6c5db..759019d 100644 --- a/releases.md +++ b/releases.md @@ -3,6 +3,7 @@ ## Unreleased - Add {ruby Async::Cable::Socket#raw_transmit} for pushing pre-encoded payloads to the client without re-encoding. Enables "fastlane" broadcasts that encode the message once and share it across many connections. + - Add {ruby Async::Cable::Executor}, a fiber-based replacement for `ActionCable::Server::ThreadedExecutor`. Tasks posted from inside a reactor run on the caller's reactor (no thread hop); tasks posted from outside, and all recurring timers, run on a dedicated reactor thread owned by the executor. ## v0.3.0 diff --git a/test/async/cable/executor.rb b/test/async/cable/executor.rb new file mode 100644 index 0000000..00282b8 --- /dev/null +++ b/test/async/cable/executor.rb @@ -0,0 +1,130 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Samuel Williams. + +require "async/cable/executor" +require "sus/fixtures/async" + +describe Async::Cable::Executor do + let(:executor) {subject.new} + + with "#post" do + it "runs the block when called from outside a reactor" do + completed = ::Thread::Queue.new + executor.post{completed.push(true)} + expect(completed.pop).to be == true + ensure + executor.shutdown + end + + it "returns immediately when called from outside a reactor" do + completed = ::Thread::Queue.new + + executor.post do + sleep 0.05 + completed.push(true) + end + + expect(completed).to be(:empty?) + expect(completed.pop).to be == true + ensure + executor.shutdown + end + + it "accepts a positional callable as well as a block" do + completed = ::Thread::Queue.new + executor.post(->{completed.push(:ok)}) + expect(completed.pop).to be == :ok + ensure + executor.shutdown + end + + with "called from inside a reactor" do + include Sus::Fixtures::Async::ReactorContext + + it "runs the block on the caller's reactor with no thread hop" do + caller_thread = ::Thread.current + completed = ::Thread::Queue.new + + executor.post do + completed.push(::Thread.current) + end + + expect(completed.pop).to be == caller_thread + ensure + executor.shutdown + end + end + end + + with "#timer" do + it "runs the block at the configured interval" do + ticks = ::Thread::Queue.new + timer = executor.timer(0.01){ticks.push(true)} + + 3.times{ticks.pop} + + timer.shutdown + ensure + executor.shutdown + end + + it "stops invoking the block after #shutdown" do + ticks = ::Thread::Queue.new + timer = executor.timer(0.01){ticks.push(true)} + + # Drain a couple of ticks so we know it started: + 2.times{ticks.pop} + + timer.shutdown + + # After shutdown, allow any in-flight tick to land then sample: + sleep 0.05 + ticks.clear + sleep 0.05 + expect(ticks).to be(:empty?) + ensure + executor.shutdown + end + + with "called from inside a reactor" do + include Sus::Fixtures::Async::ReactorContext + + it "runs on the caller's reactor with no thread hop" do + caller_thread = ::Thread.current + ticks = ::Thread::Queue.new + timer = executor.timer(0.01){ticks.push(::Thread.current)} + + expect(ticks.pop).to be == caller_thread + expect(executor.instance_variable_get(:@thread)).to be_nil + + timer.shutdown + ensure + executor.shutdown + end + end + end + + with "#shutdown" do + it "is idempotent" do + executor.shutdown + executor.shutdown + end + + it "stops the dedicated thread used by timers" do + # A scheduled timer is what spins up the dedicated reactor thread: + ticks = ::Thread::Queue.new + timer = executor.timer(0.01){ticks.push(true)} + ticks.pop + + thread = executor.instance_variable_get(:@thread) + expect(thread).to be(:alive?) + + timer.shutdown + executor.shutdown + + expect(thread).not.to be(:alive?) + end + end +end