Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions lib/net-http2/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module NetHttp2
PROXY_SETTINGS_KEYS = [:proxy_addr, :proxy_port, :proxy_user, :proxy_pass]

AsyncRequestTimeout = Class.new(StandardError)
SocketClosedError = Class.new(StandardError)

class Client

Expand Down Expand Up @@ -39,8 +40,7 @@ def call(method, path, options={})

def call_async(request)
ensure_open
stream = new_monitored_stream_for request
stream.async_call_with request
new_stream.async_call_with request
end

def prepare_request(method, path, options={})
Expand All @@ -53,6 +53,7 @@ def ssl?

def close
exit_thread(@socket_thread)
@socket_error = SocketClosedError.new
init_vars
end

Expand All @@ -74,10 +75,14 @@ def stream_count

private

def init_vars
def init_vars(error: nil)
@mutex.synchronize do
@socket.close if @socket && !@socket.closed?

(@streams || {}).each do |k, v|
v.force_close(@socket_error)
end

@h2 = nil
@socket = nil
@socket_thread = nil
Expand All @@ -86,20 +91,16 @@ def init_vars
end
end

def new_stream
@mutex.synchronize { NetHttp2::Stream.new(h2_stream: h2.new_stream) }
rescue StandardError => e
close
raise e
end

def new_monitored_stream_for(request)
stream = new_stream
def new_stream()
stream = @mutex.synchronize { NetHttp2::Stream.new(h2_stream: h2.new_stream) }

@streams[stream.id] = true
request.on(:close) { @streams.delete(stream.id) }
@streams[stream.id] = stream
stream.on(:close) { @streams.delete(stream.id) }

stream
rescue StandardError => e
close
raise e
end

def ensure_open
Expand All @@ -115,13 +116,15 @@ def ensure_open

rescue EOFError
# socket closed
@socket_error = SocketError.new('Socket was remotely closed')
init_vars
callback_or_raise SocketError.new('Socket was remotely closed')
callback_or_raise @socket_error

rescue Exception => e
# error on socket
@socket_error = e
init_vars
callback_or_raise e
callback_or_raise @socket_error
end
end.tap { |t| t.abort_on_exception = true }
end
Expand Down
19 changes: 19 additions & 0 deletions lib/net-http2/stream.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module NetHttp2

class Stream
include Callbacks

def initialize(options={})
@h2_stream = options[:h2_stream]
Expand All @@ -10,6 +11,7 @@ def initialize(options={})
@async = false
@completed = false
@mutex = Mutex.new
@error = nil
@cv = ConditionVariable.new

listen_for_headers
Expand Down Expand Up @@ -42,12 +44,23 @@ def async?
@async
end

def wait
wait_for_completed
end

def force_close(error = nil)
@error = error
@mutex.synchronize { @cv.signal }
end

private

def listen_for_headers
@h2_stream.on(:headers) do |hs_array|
hs = Hash[*hs_array.flatten]

emit(:headers, hs)

if async?
@request.emit(:headers, hs)
else
Expand All @@ -58,6 +71,9 @@ def listen_for_headers

def listen_for_data
@h2_stream.on(:data) do |data|

emit(:data, data)

if async?
@request.emit(:body_chunk, data)
else
Expand All @@ -70,6 +86,8 @@ def listen_for_close
@h2_stream.on(:close) do |data|
@completed = true

emit(:close, data)

if async?
@request.emit(:close, data)
else
Expand Down Expand Up @@ -98,6 +116,7 @@ def sync_respond

def wait_for_completed
@mutex.synchronize { @cv.wait(@mutex, @request.timeout) }
raise @error if @error
end
end
end
7 changes: 4 additions & 3 deletions spec/api/timeouts_with_sync_requests_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@
expect(responses.compact).to be_empty
end

it "returns nil even if the client's main thread gets killed" do
it "returns raises an error if the client's main thread gets killed" do

Thread.new do
sleep 1
client.close
end

response = client.call(:get, '/path', headers: { 'x-custom-header' => 'custom' }, timeout: 2)
expect(response).to be_nil
expect {
client.call(:get, '/path', headers: { 'x-custom-header' => 'custom' }, timeout: 2)
}.to raise_error(NetHttp2::SocketClosedError)
end
end