From 40e78c4776a7dd1a66f636f0ceb819353f500da5 Mon Sep 17 00:00:00 2001 From: Bob Forma <1178544+bforma@users.noreply.github.com> Date: Mon, 11 May 2026 15:18:45 +0200 Subject: [PATCH] Release the websocket promptly when a connection is disconnected --- lib/s2/connection.rb | 5 ++++- spec/s2/connection_spec.rb | 39 +++++++++++++++++++++----------------- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/lib/s2/connection.rb b/lib/s2/connection.rb index f9176ad..174cd54 100644 --- a/lib/s2/connection.rb +++ b/lib/s2/connection.rb @@ -9,7 +9,7 @@ class Connection attr_reader :connected_at, :status - def initialize(resource_id:, task:, ws_url:, headers: {}) + def initialize(resource_id:, ws_url:, task: nil, headers: {}) @connected_at = nil @queue = nil @resource_id = resource_id @@ -27,6 +27,8 @@ def initialize(resource_id:, task:, ws_url:, headers: {}) end def connect + @task ||= Async::Task.current + until @stopping begin connect_and_run @@ -61,6 +63,7 @@ def disconnect # ignore end + @task&.stop @status = :disconnected end diff --git a/spec/s2/connection_spec.rb b/spec/s2/connection_spec.rb index f16219a..c044a25 100644 --- a/spec/s2/connection_spec.rb +++ b/spec/s2/connection_spec.rb @@ -15,11 +15,7 @@ resource_id = SecureRandom.uuid ws_url = "ws://example.com/#{resource_id}" - connection = described_class.new( - resource_id:, - task: Async::Task.current, - ws_url:, - ) + connection = described_class.new(resource_id:, ws_url:) allow(connection).to receive(:sleep) { |duration| sleep_durations << duration } @@ -48,12 +44,7 @@ ws_url = "ws://example.com/#{resource_id}" headers = { "authorization" => "Basic dXNlcjpwYXNz" } - connection = described_class.new( - resource_id:, - task: Async::Task.current, - ws_url:, - headers:, - ) + connection = described_class.new(resource_id:, ws_url:, headers:) task = Async do connection.connect @@ -81,11 +72,7 @@ resource_id = SecureRandom.uuid ws_url = "ws://example.com/#{resource_id}" - connection = described_class.new( - resource_id:, - task: Async::Task.current, - ws_url:, - ) + connection = described_class.new(resource_id:, ws_url:) allow(connection).to receive(:sleep) @@ -102,6 +89,25 @@ expect(connection_attempts.uniq.size).to eq(1) end + it "stops its task when disconnected, even while sleeping between reconnect attempts" do + allow(Async::WebSocket::Client).to receive(:connect).and_raise(Errno::ECONNREFUSED) + + resource_id = SecureRandom.uuid + ws_url = "ws://example.com/#{resource_id}" + + connection = described_class.new(resource_id:, ws_url:) + + task = Async { connection.connect } + Async::Task.current.sleep 0.1 + + expect(task).not_to be_finished + + connection.disconnect + + Async::Task.current.sleep 0.1 + expect(task).to be_finished + end + it "instruments connection_errored with the resource_id and exception" do ws = FakeWebSocket.new connection_attempts = 0 @@ -122,7 +128,6 @@ connection = described_class.new( resource_id:, - task: Async::Task.current, ws_url: "ws://example.com/#{resource_id}", ) allow(connection).to receive(:sleep)