Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion lib/s2/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Connection

attr_reader :connected_at, :status

def initialize(resource_id:, task:, ws_url:, headers: {})
def initialize(resource_id:, ws_url:, task: nil, headers: {})
@connected_at = nil
@queue = nil
@resource_id = resource_id
Expand All @@ -27,6 +27,8 @@ def initialize(resource_id:, task:, ws_url:, headers: {})
end

def connect
@task ||= Async::Task.current

until @stopping
begin
connect_and_run
Expand Down Expand Up @@ -61,6 +63,7 @@ def disconnect
# ignore
end

@task&.stop
@status = :disconnected
end

Expand Down
39 changes: 22 additions & 17 deletions spec/s2/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
resource_id = SecureRandom.uuid
ws_url = "ws://example.com/#{resource_id}"

connection = described_class.new(
resource_id:,
task: Async::Task.current,
ws_url:,
)
connection = described_class.new(resource_id:, ws_url:)

allow(connection).to receive(:sleep) { |duration| sleep_durations << duration }

Expand Down Expand Up @@ -48,12 +44,7 @@
ws_url = "ws://example.com/#{resource_id}"
headers = { "authorization" => "Basic dXNlcjpwYXNz" }

connection = described_class.new(
resource_id:,
task: Async::Task.current,
ws_url:,
headers:,
)
connection = described_class.new(resource_id:, ws_url:, headers:)

task = Async do
connection.connect
Expand Down Expand Up @@ -81,11 +72,7 @@
resource_id = SecureRandom.uuid
ws_url = "ws://example.com/#{resource_id}"

connection = described_class.new(
resource_id:,
task: Async::Task.current,
ws_url:,
)
connection = described_class.new(resource_id:, ws_url:)

allow(connection).to receive(:sleep)

Expand All @@ -102,6 +89,25 @@
expect(connection_attempts.uniq.size).to eq(1)
end

it "stops its task when disconnected, even while sleeping between reconnect attempts" do
allow(Async::WebSocket::Client).to receive(:connect).and_raise(Errno::ECONNREFUSED)

resource_id = SecureRandom.uuid
ws_url = "ws://example.com/#{resource_id}"

connection = described_class.new(resource_id:, ws_url:)

task = Async { connection.connect }
Async::Task.current.sleep 0.1

expect(task).not_to be_finished

connection.disconnect

Async::Task.current.sleep 0.1
expect(task).to be_finished
end

it "instruments connection_errored with the resource_id and exception" do
ws = FakeWebSocket.new
connection_attempts = 0
Expand All @@ -122,7 +128,6 @@

connection = described_class.new(
resource_id:,
task: Async::Task.current,
ws_url: "ws://example.com/#{resource_id}",
)
allow(connection).to receive(:sleep)
Expand Down