diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 750eae8..61c7db9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -78,5 +78,6 @@ jobs: bundle exec rake coverage:report - uses: joshmfrankel/simplecov-check-action@main + if: ${{ github.repository == 'igrigorik/http-2' }} with: github_token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.rubocop.yml b/.rubocop.yml index 97abdd0..8a517b4 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -17,6 +17,8 @@ Layout/HeredocIndentation: - 'lib/tasks/generate_huffman_table.rb' - 'example/*' +Layout/LeadingCommentSpace: + AllowRBSInlineAnnotation: true Metrics/BlockLength: Enabled: false diff --git a/Rakefile b/Rakefile index d8cb632..79ff5ad 100644 --- a/Rakefile +++ b/Rakefile @@ -94,7 +94,7 @@ task :h2spec do "Or Download the binary from https://github.com/summerwind/h2spec/releases" end - server_pid = Process.spawn("ruby example/server.rb -p 9000", out: File::NULL) + server_pid = Process.spawn({ "RUBYOPT" => nil }, "ruby example/server.rb -p 9000", out: File::NULL) sleep RUBY_ENGINE == "ruby" ? 5 : 20 system("#{h2spec} -p 9000 -o 2 --strict") Process.kill("TERM", server_pid) diff --git a/lib/http/2/client.rb b/lib/http/2/client.rb index 3dfe39b..6529359 100644 --- a/lib/http/2/client.rb +++ b/lib/http/2/client.rb @@ -69,7 +69,7 @@ def send_connection_preface end def self.settings_header(settings) - frame = Framer.new.generate(type: :settings, stream: 0, payload: settings) + frame = Framer.new.generate(type: :settings, stream: 0, flags: 0, payload: settings) Base64.urlsafe_encode64(frame[9..-1]) end diff --git a/lib/http/2/connection.rb b/lib/http/2/connection.rb index d3cf180..6215b27 100644 --- a/lib/http/2/connection.rb +++ b/lib/http/2/connection.rb @@ -41,8 +41,6 @@ module HTTP2 CONNECTION_FRAME_TYPES = %i[settings ping goaway].freeze - HEADERS_FRAME_TYPES = %i[headers push_promise].freeze - STREAM_OPEN_STATES = %i[open half_closed_local].freeze # Connection encapsulates all of the connection, stream, flow-control, @@ -91,6 +89,7 @@ def initialize(settings = {}) @last_stream_id = 0 @streams = {} @streams_recently_closed = {} + @oldest_stream_recently_closed = nil @pending_settings = [] @framer = Framer.new(@local_settings[:settings_max_frame_size]) @@ -102,6 +101,7 @@ def initialize(settings = {}) @recv_buffer = "".b @continuation = [] + @continuation_size = 0 @error = nil @h2c_upgrade = nil @@ -156,7 +156,7 @@ def ping(payload, &blk) # @param error [Symbol] # @param payload [String] def goaway(error = :no_error, payload = nil) - send(type: :goaway, last_stream: @last_stream_id, + send(type: :goaway, stream: 0, last_stream: @last_stream_id, error: error, payload: payload) @state = :closed @closed_since = Process.clock_gettime(Process::CLOCK_MONOTONIC) @@ -212,9 +212,8 @@ def receive(data) end while (frame = @framer.parse(@recv_buffer)) - # @type var stream_id: Integer - stream_id = frame[:stream] - frame_type = frame[:type] + stream_id = frame[:stream] #: Integer + frame_type = frame[:type] #: Symbol? if is_a?(Client) && !@received_frame connection_error(:protocol_error, msg: "didn't receive settings") if frame_type != :settings @@ -237,15 +236,16 @@ def receive(data) # Header blocks MUST be transmitted as a contiguous sequence of frames # with no interleaved frames of any other type, or from any other stream. unless @continuation.empty? + # @type var frame: continuation_frame connection_error unless frame_type == :continuation && stream_id == @continuation.first[:stream] @continuation << frame - unless frame[:flags].include? :end_headers - buffered_payload = @continuation.sum { |f| f[:payload].bytesize } + @continuation_size += frame[:payload].bytesize + unless frame[:flags].anybits?(END_HEADERS) # prevent HTTP/2 CONTINUATION FLOOD # same heuristic as the one from HAProxy: https://www.haproxy.com/blog/haproxy-is-resilient-to-the-http-2-continuation-flood # different mitigation (connection closed, instead of 400 response) - unless buffered_payload < @local_settings[:settings_max_frame_size] + unless @continuation_size < @local_settings[:settings_max_frame_size] connection_error(:protocol_error, msg: "too many continuations received") end @@ -259,10 +259,11 @@ def receive(data) frame_type = frame[:type] @continuation.clear + @continuation_size = 0 frame.delete(:length) frame[:payload] = payload - frame[:flags] << :end_headers + frame[:flags] |= END_HEADERS end # SETTINGS frames always apply to a connection, never a single stream. @@ -271,18 +272,20 @@ def receive(data) # anything other than 0x0, the endpoint MUST respond with a connection # error (Section 5.4.1) of type PROTOCOL_ERROR. if connection_frame?(frame) + # @type var frame: connection_frame connection_error(:protocol_error) unless stream_id.zero? connection_management(frame) else case frame_type when :headers + # @type var frame: headers_frame # When server receives even-numbered stream identifier, # the endpoint MUST respond with a connection error of type PROTOCOL_ERROR. connection_error if stream_id.even? && is_a?(Server) # The last frame in a sequence of HEADERS/CONTINUATION # frames MUST have the END_HEADERS flag set. - unless frame[:flags].include? :end_headers + unless frame[:flags].anybits?(END_HEADERS) @continuation << frame next end @@ -312,9 +315,10 @@ def receive(data) stream << frame when :push_promise + # @type var frame: push_promise_frame # The last frame in a sequence of PUSH_PROMISE/CONTINUATION # frames MUST have the END_HEADERS flag set - unless frame[:flags].include? :end_headers + unless frame[:flags].anybits?(END_HEADERS) @continuation << frame return end @@ -434,20 +438,27 @@ def <<(data) # @note all frames are currently delivered in FIFO order. # @param frame [Hash] def send(frame) - frame_type = frame[:type] - emit(:frame_sent, frame) - if frame_type == :data - send_data(frame, true) - elsif frame_type == :rst_stream && frame[:error] == :protocol_error - # An endpoint can end a connection at any time. In particular, an - # endpoint MAY choose to treat a stream error as a connection error. + frame_type = frame[:type] #: Symbol - goaway(:protocol_error) - else + case frame_type + when :data + #: @type var frame: data_frame + send_data(frame, true) + when :headers, :push_promise # HEADERS and PUSH_PROMISE may generate CONTINUATION. Also send # RST_STREAM that are not protocol errors + #: @type var frame: headers_frame | push_promise_frame + encode_headers(frame) # HEADERS and PUSH_PROMISE may create more than one frame + else + if frame_type == :rst_stream && frame[:error] == :protocol_error + # An endpoint can end a connection at any time. In particular, an + # endpoint MAY choose to treat a stream error as a connection error. + + goaway(:protocol_error) + end + #: @type var frame: connection_frame encode(frame) end end @@ -456,11 +467,7 @@ def send(frame) # # @param frame [Hash] def encode(frame) - if HEADERS_FRAME_TYPES.include?(frame[:type]) - encode_headers(frame) # HEADERS and PUSH_PROMISE may create more than one frame - else - emit(:frame, @framer.generate(frame)) - end + emit(:frame, @framer.generate(frame)) end # Check if frame is a connection frame: SETTINGS, PING, GOAWAY, and any @@ -493,12 +500,15 @@ def connection_management(frame) when :connected case frame_type when :settings + # @type var frame: settings_frame connection_settings(frame) when :window_update + # @type var frame: window_update_frame process_window_update(frame: frame, encode: true) when :ping ping_management(frame) when :goaway + # @type var frame: goaway_frame # Receivers of a GOAWAY frame MUST NOT open additional streams on # the connection, although a new connection can be established # for new streams. @@ -506,19 +516,19 @@ def connection_management(frame) @closed_since = Process.clock_gettime(Process::CLOCK_MONOTONIC) emit(:goaway, frame[:last_stream], frame[:error], frame[:payload]) when :altsvc + # @type var frame: altsvc_frame origin = frame[:origin] # 4. The ALTSVC HTTP/2 Frame # An ALTSVC frame on stream 0 with empty (length 0) "Origin" # information is invalid and MUST be ignored. emit(:altsvc, frame) if origin && !origin.empty? when :origin - return if @h2c_upgrade || !frame[:flags].empty? + # @type var frame: origin_frame + return if @h2c_upgrade || !frame[:flags].zero? frame[:payload].each do |orig| emit(:origin, orig) end - when :blocked - emit(:blocked, frame) else connection_error end @@ -538,11 +548,10 @@ def connection_management(frame) end def ping_management(frame) - if frame[:flags].include? :ack + if frame[:flags].anybits?(ACK) emit(:ack, frame[:payload]) else - send(type: :ping, stream: 0, - flags: [:ack], payload: frame[:payload]) + send(type: :ping, stream: 0, flags: ACK, payload: frame[:payload]) end end @@ -605,13 +614,13 @@ def connection_settings(frame) # side = # local: previously sent and pended our settings should be effective # remote: just received peer settings should immediately be effective - if frame[:flags].include?(:ack) + if frame[:flags].anybits?(ACK) # Process pending settings we have sent. settings = @pending_settings.shift side = :local else - validate_settings(@remote_role, frame[:payload]) settings = frame[:payload] + validate_settings(@remote_role, settings) side = :remote end @@ -681,7 +690,7 @@ def connection_settings(frame) when :remote unless @state == :closed || @h2c_upgrade == :start # Send ack to peer - send(type: :settings, stream: 0, payload: [], flags: [:ack]) + send(type: :settings, stream: 0, payload: EMPTY, flags: ACK) # when initial window size changes, we try to flush any buffered # data. @streams.each_value(&:flush) @@ -711,45 +720,49 @@ def decode_headers(frame) # # @param headers_frame [Hash] def encode_headers(headers_frame) - payload = headers_frame[:payload] + headers_payload = headers_frame[:payload] begin - payload = headers_frame[:payload] = @compressor.encode(payload) unless payload.is_a?(String) + payload = headers_payload.is_a?(String) ? headers_payload : @compressor.encode(headers_payload) rescue StandardError => e connection_error(:compression_error, e: e) end + #: @type var payload: String + headers_frame[:payload] = payload + max_frame_size = @remote_settings[:settings_max_frame_size] # if single frame, return immediately if payload.bytesize <= max_frame_size - emit(:frame, @framer.generate(headers_frame)) + encode(headers_frame) return end # split into multiple CONTINUATION frames - headers_frame[:flags].delete(:end_headers) + total = payload.bytesize + headers_frame[:flags] ^= END_HEADERS headers_frame[:payload] = payload.byteslice(0, max_frame_size) - payload = payload.byteslice(max_frame_size..-1) + # payload = payload.byteslice(max_frame_size..-1) + offset = max_frame_size # emit first HEADERS frame - emit(:frame, @framer.generate(headers_frame)) + encode(headers_frame) - loop do - continuation_frame = headers_frame.merge( - type: :continuation, - flags: EMPTY, - payload: payload.byteslice(0, max_frame_size) - ) + stream_id = headers_frame[:stream] - payload = payload.byteslice(max_frame_size..-1) + while offset < total + chunk_end = offset + max_frame_size + is_last = chunk_end >= total - if payload.nil? || payload.empty? - continuation_frame[:flags] = [:end_headers] - emit(:frame, @framer.generate(continuation_frame)) - break - end + continuation_frame = { + type: :continuation, + flags: is_last ? END_HEADERS : 0, + stream: stream_id, + payload: payload.byteslice(offset, max_frame_size) + } #: continuation_frame - emit(:frame, @framer.generate(continuation_frame)) + encode(continuation_frame) + offset = chunk_end end end @@ -776,19 +789,24 @@ def activate_stream(id:, **args) # is closed, with a minimum of 15s RTT time window. now = Process.clock_gettime(Process::CLOCK_MONOTONIC) - _, closed_since = @streams_recently_closed.first - # forego recently closed recycling if empty or the first element # hasn't expired yet (it's ordered). - if closed_since && (now - closed_since) > 15 + if @oldest_stream_recently_closed && (now - @oldest_stream_recently_closed) > 15 + new_oldest = nil # discards all streams which have closed for a while. # TODO: use a drop_while! variant whenever there is one. - @streams_recently_closed = @streams_recently_closed.drop_while do |_, since| - (now - since) > 15 - end.to_h + @streams_recently_closed.delete_if do |_, since| + unless (now - since) > 15 + new_oldest ||= since + break + end + + true + end + @oldest_stream_recently_closed = new_oldest end - @streams_recently_closed[id] = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @streams_recently_closed[id] = now end stream.on(:frame, &method(:send)) diff --git a/lib/http/2/extensions.rb b/lib/http/2/extensions.rb index 8e840b6..b0c4ed7 100644 --- a/lib/http/2/extensions.rb +++ b/lib/http/2/extensions.rb @@ -22,22 +22,28 @@ def append_str(str, data) end end - def read_str(str, n) - return "".b if n == 0 + if String.method_defined?(:bytesplice) + def read_str(str, n) + return "".b if n == 0 - chunk = str.byteslice(0..(n - 1)) - remaining = str.byteslice(n..-1) - remaining ? str.replace(remaining) : str.clear - chunk + chunk = str.byteslice(0, n) + str.bytesplice(0, chunk.length, "") + chunk + end + else + def read_str(str, n) + return "".b if n == 0 + + chunk = str.byteslice(0, n) + remaining = str.byteslice(n, str.size - n) + remaining ? str.replace(remaining) : str.clear + chunk + end end def read_uint32(str) read_str(str, 4).unpack1("N") end - - def shift_byte(str) - read_str(str, 1).ord - end end # this mixin handles backwards-compatibility for the new packing options diff --git a/lib/http/2/flow_buffer.rb b/lib/http/2/flow_buffer.rb index 9f72691..c687292 100644 --- a/lib/http/2/flow_buffer.rb +++ b/lib/http/2/flow_buffer.rb @@ -70,7 +70,7 @@ def send_data(frame = nil, encode = false) if frame if @send_buffer.empty? frame_size = frame[:payload].bytesize - end_stream = frame[:flags].include?(:end_stream) + end_stream = frame[:flags].anybits?(END_STREAM) # if buffer is empty, and frame is either end 0 length OR # is within available window size, skip buffering and send immediately. if @remote_window.positive? @@ -115,6 +115,8 @@ def process_window_update(frame:, encode: false) end class FrameBuffer + include BufferUtils + attr_reader :bytesize def initialize @@ -140,7 +142,7 @@ def retrieve(window_size) frame = @buffer.first or return frame_size = frame[:payload].bytesize - end_stream = frame[:flags].include?(:end_stream) + end_stream = frame[:flags].anybits?(END_STREAM) # Frames with zero length with the END_STREAM flag set (that # is, an empty DATA frame) MAY be sent if there is no available space @@ -148,19 +150,17 @@ def retrieve(window_size) return if window_size <= 0 && !(frame_size.zero? && end_stream) if frame_size > window_size - chunk = frame.dup - payload = frame[:payload] + chunk = frame.dup # Split frame so that it fits in the window # TODO: consider padding! - chunk[:payload] = payload.byteslice(0, window_size) + chunk[:payload] = read_str(frame[:payload], window_size) # mutates frame[:payload] chunk[:length] = window_size - frame[:payload] = payload.byteslice(window_size..-1) frame[:length] = frame_size - window_size # if no longer last frame in sequence... - chunk[:flags] -= [:end_stream] if end_stream + chunk[:flags] ^= END_STREAM if end_stream @bytesize -= window_size chunk diff --git a/lib/http/2/framer.rb b/lib/http/2/framer.rb index b09c118..3643810 100644 --- a/lib/http/2/framer.rb +++ b/lib/http/2/framer.rb @@ -1,6 +1,25 @@ # frozen_string_literal: true module HTTP2 + # Frame flags as defined by the spec (max 255 bits) + # DATA: ( X X COMPRESSED X PADDED X X END_STREAM ) + # HEADERS: ( X X PRIORITY X PADDED END_HEADERS X END_STREAM ) + # PRIORITY: ( X X X X X X X X ) + # RST_STREAM: ( X X X X X X X X ) + # SETTINGS: ( X X X X X X X ACK ) + # PUSH_PROMISE: ( X X X X PADDED END_HEADERS X X ) + # PING: ( X X X X X X X ACK ) + # GOAWAY: ( X X X X X X X X ) + # WINDOW_UPDATE: ( X X X X X X X X ) + # CONTINUATION: ( X X X X X END_HEADERS X X ) + # ALTSVC: ( X X X X X X X X ) + # ORIGIN: ( RESERVED4 X X RESERVED3 X RESERVED2 RESERVED X ) + END_STREAM = ACK = 0b0001 # 1 + RESERVED = 0b0010 # 2 + END_HEADERS = 0b0100 # 4 + PADDED = 0b1000 # 8 + PRIORITY = 0b0010_0000 # 32 + # Performs encoding, decoding, and validation of binary HTTP/2 frames. # class Framer @@ -40,39 +59,6 @@ class Framer FRAME_TYPES_WITH_PADDING = %i[data headers push_promise].freeze - # Per frame flags as defined by the spec - FRAME_FLAGS = { - data: { - end_stream: 0, - padded: 3, - compressed: 5 - }, - headers: { - end_stream: 0, - end_headers: 2, - padded: 3, - priority: 5 - }, - priority: {}, - rst_stream: {}, - settings: { ack: 0 }, - push_promise: { - end_headers: 2, - padded: 3 - }, - ping: { ack: 0 }, - goaway: {}, - window_update: {}, - continuation: { end_headers: 2 }, - altsvc: {}, - origin: { - reserved: 1, - reserved2: 2, - reserved3: 4, - reserved4: 8 - } - }.each_value(&:freeze).freeze - # Default settings as defined by the spec DEFINED_SETTINGS = { settings_header_table_size: 1, @@ -83,23 +69,25 @@ class Framer settings_max_header_list_size: 6 }.freeze - # Default error types as defined by the spec - DEFINED_ERRORS = { - no_error: 0, - protocol_error: 1, - internal_error: 2, - flow_control_error: 3, - settings_timeout: 4, - stream_closed: 5, - frame_size_error: 6, - refused_stream: 7, - cancel: 8, - compression_error: 9, - connect_error: 10, - enhance_your_calm: 11, - inadequate_security: 12, - http_1_1_required: 13 - }.freeze + DEFINED_SETTINGS_BY_ID = DEFINED_SETTINGS.invert.freeze + + # Default error types as defined by the spec (the code is the array index) + DEFINED_ERRORS = %i[ + no_error + protocol_error + internal_error + flow_control_error + settings_timeout + stream_closed + frame_size_error + refused_stream + cancel + compression_error + connect_error + enhance_your_calm + inadequate_security + http_1_1_required + ].freeze RBIT = 0x7fffffff RBYTE = 0x0fffffff @@ -108,10 +96,12 @@ class Framer UINT16 = "n" UINT8 = "C" HEADERPACK = (UINT8 + UINT16 + UINT8 + UINT8 + UINT32).freeze + PRIORITYPACK = (UINT32 + UINT8).freeze + ALTSVCPACK = (UINT32 + UINT16).freeze FRAME_LENGTH_HISHIFT = 16 FRAME_LENGTH_LOMASK = 0xFFFF - private_constant :RBIT, :RBYTE, :EBIT, :HEADERPACK, :UINT32, :UINT16, :UINT8 + private_constant :RBIT, :RBYTE, :EBIT, :HEADERPACK, :PRIORITYPACK, :UINT32, :UINT16, :UINT8 # Initializes new framer object. # @@ -146,6 +136,10 @@ def common_header(frame, buffer:) raise CompressionError, "Window increment (#{frame[:increment]}) is too large" end + flags = frame[:flags] + + raise CompressionError, "Invalid frame flag (#{flags}) for #{type}" unless flags.between?(0, 255) + header = buffer # make sure the buffer is binary and unfrozen @@ -160,12 +154,7 @@ def common_header(frame, buffer:) (length >> FRAME_LENGTH_HISHIFT), (length & FRAME_LENGTH_LOMASK), FRAME_TYPES[type], - frame[:flags].reduce(0) do |acc, f| - position = FRAME_FLAGS[type][f] - raise CompressionError, "Invalid frame flag (#{f}) for #{type}" unless position - - acc | (1 << position) - end, + flags, stream_id ], HEADERPACK, buffer: header, offset: 0) # 8+16,8,8,32 end @@ -184,9 +173,7 @@ def read_common_header(buf) { type: type, - flags: FRAME_FLAGS[type].filter_map do |name, pos| - name if flags.anybits?(1 << pos) - end, + flags: flags, length: length, stream: stream & RBIT } @@ -198,10 +185,11 @@ def read_common_header(buf) # @param frame [Hash] def generate(frame) length = 0 - frame[:flags] ||= EMPTY + frame[:flags] ||= 0 case frame[:type] when :data, :continuation + # @type var frame: data_frame | continuation_frame bytes = frame[:payload] length = bytes.bytesize @@ -213,14 +201,16 @@ def generate(frame) raise CompressionError, "Must specify all of priority parameters for #{frame[:type]}" end - frame[:flags] += [:priority] unless frame[:flags].include?(:priority) + frame[:flags] |= PRIORITY end - if frame[:flags].include?(:priority) + if frame[:flags].anybits?(PRIORITY) length = 5 + headers.bytesize bytes = String.new("", encoding: Encoding::BINARY, capacity: length) - pack([(frame[:exclusive] ? EBIT : 0) | (frame[:dependency] & RBIT)], UINT32, buffer: bytes) - pack([frame[:weight] - 1], UINT8, buffer: bytes) + pack( + [(frame[:exclusive] ? EBIT : 0) | (frame[:dependency] & RBIT), frame[:weight] - 1], + PRIORITYPACK, buffer: bytes + ) append_str(bytes, headers) else length = headers.bytesize @@ -234,8 +224,10 @@ def generate(frame) length = 5 bytes = String.new("", encoding: Encoding::BINARY, capacity: length) - pack([(frame[:exclusive] ? EBIT : 0) | (frame[:dependency] & RBIT)], UINT32, buffer: bytes) - pack([frame[:weight] - 1], UINT8, buffer: bytes) + pack( + [(frame[:exclusive] ? EBIT : 0) | (frame[:dependency] & RBIT), frame[:weight] - 1], + PRIORITYPACK, buffer: bytes + ) when :rst_stream length = 4 @@ -248,20 +240,27 @@ def generate(frame) end settings = frame[:payload] - bytes = String.new("", encoding: Encoding::BINARY, capacity: length) - settings.each do |(k, v)| - if k.is_a? Integer # rubocop:disable Style/GuardClause - DEFINED_SETTINGS.value?(k) || next - else - k = DEFINED_SETTINGS[k] + case settings + when String + length = settings.bytesize + bytes = settings + else + bytes = String.new("", encoding: Encoding::BINARY, capacity: settings.size * 6) + + settings.each do |(k, v)| + if k.is_a? Integer # rubocop:disable Style/GuardClause + DEFINED_SETTINGS.value?(k) || next + else + k = DEFINED_SETTINGS[k] - raise CompressionError, "Unknown settings ID for #{k}" if k.nil? - end + raise CompressionError, "Unknown settings ID for #{k}" if k.nil? + end - pack([k], UINT16, buffer: bytes) - pack([v], UINT32, buffer: bytes) - length += 6 + pack([k], UINT16, buffer: bytes) + pack([v], UINT32, buffer: bytes) + length += 6 + end end when :push_promise @@ -295,7 +294,7 @@ def generate(frame) when :altsvc length = 6 bytes = String.new("", encoding: Encoding::BINARY, capacity: length) - pack([frame[:max_age], frame[:port]], UINT32 + UINT16, buffer: bytes) + pack([frame[:max_age], frame[:port]], ALTSVCPACK, buffer: bytes) if frame[:proto] raise CompressionError, "Proto too long" if frame[:proto].bytesize > 255 @@ -354,7 +353,7 @@ def generate(frame) length += padlen pack([padlen -= 1], UINT8, buffer: bytes, offset: 0) - frame[:flags] += [:padded] + frame[:flags] |= PADDED # Padding: Padding octets that contain no application semantic value. # Padding octets MUST be set to zero when sending and ignored when @@ -375,9 +374,9 @@ def parse(buf) frame = read_common_header(buf) - type = frame[:type] - length = frame[:length] - flags = frame[:flags] + type = frame[:type] #: Symbol + length = frame[:length] #: Integer + flags = frame[:flags] #: Integer return if buf.size < 9 + length @@ -394,15 +393,16 @@ def parse(buf) # Process padding padlen = 0 if FRAME_TYPES_WITH_PADDING.include?(type) - padded = flags.include?(:padded) + padded = flags.anybits?(PADDED) if padded padlen = read_str(payload, 1).unpack1(UINT8) - frame[:padding] = padlen + 1 raise ProtocolError, "padding too long" if padlen > payload.bytesize + frame[:padding] = padlen + 1 + payload = payload.byteslice(0, payload.bytesize - padlen) if padlen > 0 frame[:length] -= frame[:padding] - flags.delete(:padded) + frame[:flags] ^= PADDED end end @@ -410,11 +410,11 @@ def parse(buf) when :data, :ping, :continuation frame[:payload] = read_str(payload, length) when :headers - if flags.include?(:priority) + if flags.anybits?(PRIORITY) e_sd = read_uint32(payload) frame[:dependency] = e_sd & RBIT frame[:exclusive] = e_sd.anybits?(EBIT) - weight = payload.byteslice(0, 1).ord + 1 + weight = payload.getbyte(0) + 1 frame[:weight] = weight payload = payload.byteslice(1..-1) end @@ -425,7 +425,7 @@ def parse(buf) e_sd = read_uint32(payload) frame[:dependency] = e_sd & RBIT frame[:exclusive] = e_sd.anybits?(EBIT) - weight = payload.byteslice(0, 1).ord + 1 + weight = payload.getbyte(0) + 1 frame[:weight] = weight payload = payload.byteslice(1..-1) when :rst_stream @@ -436,19 +436,19 @@ def parse(buf) when :settings # NOTE: frame[:length] might not match the number of frame[:payload] # because unknown extensions are ignored. - frame[:payload] = [] raise ProtocolError, "Invalid settings payload length" unless (length % 6).zero? raise ProtocolError, "Invalid stream ID (#{frame[:stream]})" if frame[:stream].nonzero? - (frame[:length] / 6).times do + frame[:payload] = (frame[:length] / 6).times.filter_map do id = read_str(payload, 2).unpack1(UINT16) val = read_uint32(payload) # Unsupported or unrecognized settings MUST be ignored. # Here we send it along. - name, = DEFINED_SETTINGS.find { |_name, v| v == id } - frame[:payload] << [name, val] if name + if (name = DEFINED_SETTINGS_BY_ID[id]) + [name, val] + end end when :push_promise frame[:promise_stream] = read_uint32(payload) & RBIT @@ -464,13 +464,13 @@ def parse(buf) frame[:increment] = read_uint32(payload) & RBIT when :altsvc - frame[:max_age], frame[:port] = read_str(payload, 6).unpack(UINT32 + UINT16) + frame[:max_age], frame[:port] = read_str(payload, 6).unpack(ALTSVCPACK) - len = payload.byteslice(0, 1).ord + len = payload.getbyte(0) payload = payload.byteslice(1..-1) frame[:proto] = read_str(payload, len) if len > 0 - len = payload.byteslice(0, 1).ord + len = payload.getbyte(0) payload = payload.byteslice(1..-1) frame[:host] = read_str(payload, len) if len > 0 @@ -495,7 +495,7 @@ def parse(buf) def pack_error(error, buffer:) unless error.is_a? Integer - error = DEFINED_ERRORS[error] + error = DEFINED_ERRORS.index(error) raise CompressionError, "Unknown error ID for #{error}" unless error end @@ -504,7 +504,7 @@ def pack_error(error, buffer:) end def unpack_error(error) - DEFINED_ERRORS.key(error) || error + DEFINED_ERRORS.fetch(error, error) end end end diff --git a/lib/http/2/header/compressor.rb b/lib/http/2/header/compressor.rb index 03e26b6..8a6c280 100644 --- a/lib/http/2/header/compressor.rb +++ b/lib/http/2/header/compressor.rb @@ -83,8 +83,10 @@ def string(str, buffer = "".b) huffman = Huffman.encode(str) if huffman.bytesize < str.bytesize huffman_offset = buffer.bytesize + integer(huffman.bytesize, 7, buffer: buffer) + buffer.setbyte(huffman_offset, buffer.getbyte(huffman_offset) | 0x80) append_str(buffer, huffman) - set_huffman_size(buffer, huffman_offset) + buffer else plain_string(str, buffer) end @@ -119,7 +121,7 @@ def header(h, buffer = "".b) end # set header representation pattern on first byte - fb = buffer[offset].ord | rep[:pattern] + fb = buffer.getbyte(offset) | rep[:pattern] buffer.setbyte(offset, fb) buffer @@ -147,8 +149,17 @@ def encode(headers) # @return [String] binary string def huffman_string(str, buffer = "".b) huffman_offset = buffer.bytesize + buffer << "\x00".b Huffman.encode(str, buffer) - set_huffman_size(buffer, huffman_offset) + size = buffer.bytesize - huffman_offset - 1 + + if size < 127 + buffer.setbyte(huffman_offset, 0x80 | size) + else + buffer.slice!(huffman_offset, 1) + set_huffman_size(buffer, huffman_offset) + end + buffer end # @param str [String] @@ -165,7 +176,7 @@ def plain_string(str, plain = "".b) # @return [String] binary string def set_huffman_size(buffer, huffman_offset) integer(buffer.bytesize - huffman_offset, 7, buffer: buffer, offset: huffman_offset) - buffer.setbyte(huffman_offset, buffer[huffman_offset].ord | 0x80) + buffer.setbyte(huffman_offset, buffer.getbyte(huffman_offset) | 0x80) buffer end end diff --git a/lib/http/2/header/decompressor.rb b/lib/http/2/header/decompressor.rb index 03cf01e..6f0385b 100644 --- a/lib/http/2/header/decompressor.rb +++ b/lib/http/2/header/decompressor.rb @@ -33,24 +33,25 @@ def table_size=(size) # @return [Integer] def integer(buf, n) limit = (1 << n) - 1 - i = n.zero? ? 0 : (shift_byte(buf) & limit) + if n.zero? + i = 0 + consumed = 0 + else + i = buf.getbyte(0) & limit + consumed = 1 + end - m = 0 if i == limit - offset = 0 - - buf.each_byte.with_index do |byte, idx| - offset = idx - # while (byte = shift_byte(buf)) + m = 0 + while (byte = buf.getbyte(consumed)) i += ((byte & 127) << m) m += 7 - + consumed += 1 break if byte.nobits?(128) end - - read_str(buf, offset + 1) end + read_str(buf, consumed) i end diff --git a/lib/http/2/header/encoding_context.rb b/lib/http/2/header/encoding_context.rb index d59ffac..0dc06c5 100644 --- a/lib/http/2/header/encoding_context.rb +++ b/lib/http/2/header/encoding_context.rb @@ -118,6 +118,8 @@ class EncodingContext # :index Symbol :all, :static, :never def initialize(options = {}) @table = [] + @table_by_field = Hash.new { |hs, k| hs[k] = [] } + @unshifts = 0 @options = DEFAULT_OPTIONS.merge(options) @limit = @options[:table_size] @_table_updated = false @@ -129,9 +131,13 @@ def initialize(options = {}) def dup other = EncodingContext.new(@options) t = @table + tbf = @table_by_field.transform_values(&:dup) + unshifts = @unshifts l = @limit other.instance_eval do @table = t.dup # shallow copy + @table_by_field = tbf + @unshifts = unshifts @limit = l end other @@ -213,6 +219,8 @@ def process(cmd) # add to table if type == :incremental && size_check?(name.bytesize + value.bytesize + 32) @table.unshift(emit) + @unshifts += 1 + @table_by_field[name].unshift([value, @unshifts]) @current_table_size += name.bytesize + value.bytesize + 32 @_table_updated = true end @@ -279,14 +287,14 @@ def addcmd(field, value) end if index_type == :all && !exact - @table.each_with_index do |(hfield, hvalue), i| - next unless field == hfield + field_entries = @table_by_field[field] + field_entries&.each do |hvalue, unshift_id| + abs_index = (@unshifts - unshift_id) + STATIC_TABLE_SIZE + name_only ||= abs_index if value == hvalue - exact = i + STATIC_TABLE_SIZE + exact = abs_index break - else - name_only ||= i + STATIC_TABLE_SIZE end end end @@ -317,9 +325,13 @@ def resize_table(cmdsize) return if @table.empty? while @current_table_size + cmdsize > @limit - name, value = @table.pop @current_table_size -= name.bytesize + value.bytesize + 32 + + field_arr = @table_by_field[name] + field_arr.pop + @table_by_field.delete(name) if field_arr.empty? + break if @table.empty? end diff --git a/lib/http/2/header/huffman.rb b/lib/http/2/header/huffman.rb index 1036174..5e88602 100644 --- a/lib/http/2/header/huffman.rb +++ b/lib/http/2/header/huffman.rb @@ -20,6 +20,8 @@ module Huffman EOS = 256 private_constant :EOS + EOS_PADDING = (0..7).map { |n| ("1" * n).b.freeze }.freeze + # Encodes provided value via huffman encoding. # Length is not encoded in this method. # @@ -29,7 +31,7 @@ module Huffman def encode(str, buffer = "".b) bitstring = String.new("", encoding: Encoding::BINARY, capacity: (str.bytesize * 30) + ((8 - str.size) % 8)) str.each_byte { |chr| append_str(bitstring, ENCODE_TABLE[chr]) } - append_str(bitstring, "1" * ((8 - bitstring.size) % 8)) + append_str(bitstring, EOS_PADDING[(8 - bitstring.size) % 8]) pack([bitstring], "B*", buffer: buffer) end @@ -54,13 +56,13 @@ def decode(buf) first, state = MACHINE.dig(state, branch) raise CompressionError, "Huffman decode error (EOS found)" if first == EOS - append_str(emit, first.chr) if first + emit << first if first end end # Check whether partial input is correctly filled raise CompressionError, "Huffman decode error (EOS invalid)" unless state <= MAX_FINAL_STATE - emit.force_encoding(Encoding::BINARY) + emit end # Huffman table as specified in @@ -325,7 +327,7 @@ def decode(buf) [0x3fffffff, 30] ].each(&:freeze).freeze - ENCODE_TABLE = CODES.map { |c, l| [c].pack("N").unpack1("B*")[-l..-1] }.each(&:freeze).freeze + ENCODE_TABLE = CODES.map { |c, l| [c].pack("N").unpack1("B*")[-l..-1].b.freeze }.freeze end end end diff --git a/lib/http/2/server.rb b/lib/http/2/server.rb index a91f041..7761fe7 100644 --- a/lib/http/2/server.rb +++ b/lib/http/2/server.rb @@ -78,18 +78,12 @@ def upgrade(settings, headers, body) receive(CONNECTION_PREFACE_MAGIC) # Process received HTTP2-Settings payload - buf = "".b - append_str(buf, Base64.urlsafe_decode64(settings.to_s)) - @framer.common_header( - { - length: buf.bytesize, - type: :settings, - stream: 0, - flags: [] - }, - buffer: buf - ) - receive(buf) + receive(@framer.generate( + type: :settings, + stream: 0, + flags: 0, + payload: Base64.urlsafe_decode64(settings.to_s) + )) # Activate stream (id: 1) with on HTTP/1.1 request parameters stream = activate_stream(id: 1) @@ -97,7 +91,7 @@ def upgrade(settings, headers, body) headers_frame = { type: :headers, - flags: [:end_headers], + flags: END_HEADERS, stream: 1, weight: DEFAULT_WEIGHT, dependency: 0, @@ -106,11 +100,11 @@ def upgrade(settings, headers, body) } if body.empty? - headers_frame[:flags] << [:end_stream] + headers_frame[:flags] |= END_HEADERS stream << headers_frame else stream << headers_frame - stream << { type: :data, stream: 1, payload: body, flags: [:end_stream] } + stream << { type: :data, stream: 1, payload: body, flags: END_STREAM } end # Mark h2c upgrade as finished @@ -135,7 +129,7 @@ def origin_set=(origins) def connection_settings(frame) super - return unless frame[:flags].include?(:ack) && !@origins_sent + return unless frame[:flags].anybits?(ACK) && !@origins_sent send(type: :origin, stream: 0, payload: @origin_set) end @@ -148,7 +142,7 @@ def verify_pseudo_headers(frame) # # @param parent [Stream] # @param headers [Enumerable[String, String]] - # @param flags [Array[Symbol]] + # @param flags Integer # @param callback [Proc] def promise(parent, headers, flags) promise = new_stream(parent: parent) @@ -157,7 +151,7 @@ def promise(parent, headers, flags) flags: flags, stream: parent.id, promise_stream: promise.id, - payload: headers.to_a + payload: headers ) yield(promise) diff --git a/lib/http/2/stream.rb b/lib/http/2/stream.rb index 796f3b5..3324f7d 100644 --- a/lib/http/2/stream.rb +++ b/lib/http/2/stream.rb @@ -114,8 +114,10 @@ def closed? def receive(frame) transition(frame, false) - case frame[:type] + frame_type = frame[:type] #: Symbol + case frame_type when :data + # @type var frame: data_frame # 6.1. DATA # If a DATA frame is received whose stream is not in "open" or # "half closed (local)" state, the recipient MUST respond with a @@ -129,6 +131,7 @@ def receive(frame) emit(:data, frame[:payload]) unless frame[:ignore] calculate_window_update(@local_window_max_size) when :headers + # @type var frame: headers_frame stream_error(:stream_closed) if (@state == :closed && @closed != :local_rst) || @state == :remote_closed @_method ||= frame[:method] @@ -151,17 +154,18 @@ def receive(frame) stream_error(:stream_closed) if (@state == :closed && @closed != :local_rst) || @state == :remote_closed stream_error(:protocol_error) if @received_data when :priority + # @type var frame: priority_frame process_priority(frame) when :window_update + # @type var frame: window_update_frame process_window_update(frame: frame) when :altsvc + # @type var frame: origin_frame # 4. The ALTSVC HTTP/2 Frame # An ALTSVC frame on a # stream other than stream 0 containing non-empty "Origin" information # is invalid and MUST be ignored. - emit(frame[:type], frame) if !frame[:origin] || frame[:origin].empty? - when :blocked - emit(frame[:type], frame) + emit(frame_type, frame) if !frame[:origin] || frame[:origin].empty? end complete_transition(frame) @@ -199,11 +203,14 @@ def calculate_content_length(data_length) def send(frame) case frame[:type] when :data + # @type var frame: data_frame # stream state management is maintained in send_data return send_data(frame) when :window_update + # @type var frame: window_update_frame @local_window += frame[:increment] when :priority + # @type var frame: priority_frame process_priority(frame) end @@ -219,9 +226,8 @@ def send(frame) # @param end_headers [Boolean] indicates that no more headers will be sent # @param end_stream [Boolean] indicates that no payload will be sent def headers(headers, end_headers: true, end_stream: false) - flags = [] - flags << :end_headers if end_headers - flags << :end_stream if end_stream || @_method == "HEAD" + flags = end_headers ? END_HEADERS : 0 + flags |= END_STREAM if end_stream || @_method == "HEAD" send(type: :headers, flags: flags, payload: headers) end @@ -229,7 +235,7 @@ def headers(headers, end_headers: true, end_stream: false) def promise(headers, end_headers: true, &block) raise ArgumentError, "must provide callback" unless block - flags = end_headers ? [:end_headers] : [] + flags = end_headers ? END_HEADERS : 0 emit(:promise, self, headers, flags, &block) end @@ -254,12 +260,12 @@ def data(payload, end_stream: true) if payload.bytesize > max_size payload = chunk_data(payload, max_size) do |chunk| - send(type: :data, flags: [], payload: chunk) + send(type: :data, flags: 0, payload: chunk) end end - flags = [] - flags << :end_stream if end_stream + flags = 0 + flags |= END_STREAM if end_stream send(type: :data, flags: flags, payload: payload) end @@ -663,7 +669,8 @@ def process_priority(frame) def end_stream?(frame) case frame[:type] when :data, :headers, :continuation - frame[:flags] && frame[:flags].include?(:end_stream) + # @type var frame: data_frame | headers_frame | continuation_frame + frame[:flags] && frame[:flags].anybits?(END_STREAM) else false end end diff --git a/sig/2.rbs b/sig/2.rbs index 7887f62..81fac9b 100644 --- a/sig/2.rbs +++ b/sig/2.rbs @@ -36,49 +36,96 @@ module HTTP2 RESPONSE_MANDATORY_HEADERS: Array[String] + # Frame flags + END_STREAM: Integer + ACK: Integer + RESERVED: Integer + END_HEADERS: Integer + PADDED: Integer + PRIORITY: Integer + type header_pair = [string, string] # # FRAMES - type frame_control_flags = Array[:end_headers | :end_stream] + type frame_control_flags = Integer + + type common_frame = { stream: Integer, flags: frame_control_flags, ?length: Integer, ?ignore: bool } - type common_frame = { stream: Integer } + type connection_frame = settings_frame | ping_frame | goaway_frame | window_update_frame | altsvc_frame | origin_frame # # HEADERS - type headers_frame = common_frame & { - type: :headers, flags: frame_control_flags, payload: Enumerable[header_pair] | String, + type headers_frame_props = { + type: :headers, payload: Enumerable[header_pair] | String, ?method: Symbol, ?trailer: Array[String], ?content_length: Integer, ?padding: Integer } + type headers_frame = common_frame & headers_frame_props + + type continuation_frame_props = headers_frame_props | { type: :continuation } + + type continuation_frame = common_frame & continuation_frame_props # # DATA - type data_frame = { type: :data, flags: frame_control_flags, ?length: Integer, payload: String, ?padding: Integer } + type data_frame_props = { type: :data, ?length: Integer, payload: String, ?padding: Integer } + + type data_frame = common_frame & data_frame_props # # PUSH_PROMISE - type push_promise_frame = { type: :push_promise, promise_stream: Integer, flags: frame_control_flags, ?method: Symbol, ?trailer: Array[String], ?content_length: Integer, payload: Enumerable[header_pair], ?padding: Integer } + type push_promise_frame_props = { promise_stream: Integer, ?length: Integer, ?method: Symbol, ?trailer: Array[String], ?content_length: Integer, payload: Enumerable[header_pair] | String, ?padding: Integer } + + type push_promise_frame = common_frame & push_promise_frame_props # # SETTINGS - type settings_frame = { type: :settings, payload: Array[[Symbol | Integer, Integer]] } + type settings_frame_props = { type: :settings, payload: (Enumerable[[Symbol | Integer, Integer]] | String) } + + type settings_frame = common_frame & settings_frame_props # # WINDOW_UPDATE - type window_update_frame = { type: :window_update, increment: Integer } + type window_update_frame_props = { type: :window_update, increment: Integer } + + type window_update_frame = common_frame & window_update_frame_props # # PRIORITY - type priority_frame = { dependency: Integer, exclusive: bool, weight: Integer } + type priority_frame_inner_props = { dependency: Integer, exclusive: bool, weight: Integer } + + type priority_frame_props = { type: :priority } & priority_frame_inner_props + + type priority_frame = common_frame & priority_frame_props # # ALTSVC - type altsvc_frame = { type: :altsvc, max_age: Integer, port: Integer, proto: "String", host: String } + type altsvc_frame_props = { type: :altsvc, max_age: Integer, port: Integer, proto: String, host: String } + + type altsvc_frame = common_frame & altsvc_frame_props # # ORIGIN - type origin_frame = { type: :origin, origin: Array[String] } + type origin_frame_props = { type: :origin, payload: Array[String] } + + type origin_frame = common_frame & origin_frame_props # # PING - type ping_frame = { type: :ping, payload: String, length: Integer } + type ping_frame_props = { type: :ping, payload: String } + + type ping_frame = common_frame & ping_frame_props # # GOAWAY - type goaway_frame = { type: :goaway, last_stream: Integer, error: Symbol? } + type goaway_frame_props = { type: :goaway, last_stream: Integer, error: (Symbol | Integer)? } + + type goaway_frame = common_frame & goaway_frame_props - # type frame = common_frame & (headers_frame | data_frame | push_promise_frame | + # # RST_STREAM + type rst_stream_frame_props = { type: :rst_stream, error: (Symbol | Integer)? } + + type rst_stream_frame = common_frame & rst_stream_frame_props + + # type frame = headers_frame | continuation_frame | push_promise_frame | data_frame | # settings_frame | window_update_frame | priority_frame | altsvc_frame | - # origin_frame | ping_frame | goaway_frame) + # origin_frame | ping_frame | goaway_frame | rst_stream_frame + + # type connection_frame_props = rst_stream_frame_props | ping_frame_props | goaway_frame_props + # | window_update_frame_props | settings_frame_props + + # type stream_frame_props = headers_frame_props | continuation_frame_props | push_promise_frame_props + # | data_frame_props | rst_stream_frame_props | priority_frame_props + # | window_update_frame_props | origin_frame_props | altsvc_frame_props type frame_key = :type | :flags | :stream | :padding | :ignore | # headers @@ -109,4 +156,6 @@ module HTTP2 nil type frame = Hash[frame_key, frame_value] + type connection_frame_props = Hash[frame_key, frame_value] + type stream_frame_props = Hash[frame_key, frame_value] end diff --git a/sig/connection.rbs b/sig/connection.rbs index 4589bc7..f87e139 100644 --- a/sig/connection.rbs +++ b/sig/connection.rbs @@ -33,6 +33,7 @@ module HTTP2 @streams: Hash[Integer, Stream] @streams_recently_closed: Hash[Integer, Numeric] + @oldest_stream_recently_closed: Numeric @framer: Framer @@ -50,6 +51,7 @@ module HTTP2 @recv_buffer: String @continuation: Array[frame] + @continuation_size: Integer @h2c_upgrade: Symbol? @closed_since: Float? @@ -68,39 +70,39 @@ module HTTP2 def settings: (settings_enum payload) -> void - def receive: (string data) -> void + def receive: (String data) -> void alias << receive def initialize: (?connection_opts) -> void private - def send: (frame frame) -> void + def send: (connection_frame_props | stream_frame_props frame) -> void def encode: (frame frame) -> void def connection_frame?: (frame) -> bool - def connection_management: (frame) -> void + def connection_management: (connection_frame frame) -> void def ping_management: (frame) -> void def validate_settings: (role_type, settings_enum) -> void - def connection_settings: (frame) -> void + def connection_settings: (settings_frame) -> void - def decode_headers: (frame) -> void + def decode_headers: (headers_frame | push_promise_frame) -> void - def encode_headers: (frame headers_frame) -> void + def encode_headers: (headers_frame | push_promise_frame) -> void def activate_stream: (id: Integer, **untyped) -> Stream def verify_stream_order: (Integer id) -> void - def verify_pseudo_headers: (frame) -> void + def verify_pseudo_headers: (headers_frame | push_promise_frame) -> void - def _verify_pseudo_headers: (frame, Array[String]) -> void + def _verify_pseudo_headers: (headers_frame | push_promise_frame, Array[String]) -> void def connection_error: (?Symbol error, ?msg: String?, ?e: StandardError?) -> void end -end \ No newline at end of file +end diff --git a/sig/flow_buffer.rbs b/sig/flow_buffer.rbs index 1b82e16..6fd8bd6 100644 --- a/sig/flow_buffer.rbs +++ b/sig/flow_buffer.rbs @@ -14,9 +14,9 @@ module HTTP2 def calculate_window_update: (Integer) -> void - def send_data: (?data_frame? frame, ?bool encode) -> void + def send_data: (?data_frame_props? frame, ?bool encode) -> void - def send_frame: (data_frame frame, bool encode) -> void + def send_frame: (data_frame_props frame, bool encode) -> void def process_window_update: (frame: window_update_frame, ?encode: bool) -> void end diff --git a/sig/frame_buffer.rbs b/sig/frame_buffer.rbs index 57cb53f..ad15969 100644 --- a/sig/frame_buffer.rbs +++ b/sig/frame_buffer.rbs @@ -1,15 +1,17 @@ module HTTP2 class FrameBuffer + include BufferUtils + attr_reader bytesize: Integer @buffer: Array[data_frame] def clear: () -> void - def <<: (data_frame frame) -> void + def <<: (data_frame_props frame) -> void def empty?: () -> bool - def retrieve: (Integer) -> data_frame? + def retrieve: (Integer) -> data_frame_props? end end \ No newline at end of file diff --git a/sig/framer.rbs b/sig/framer.rbs index 5205990..80e27e9 100644 --- a/sig/framer.rbs +++ b/sig/framer.rbs @@ -20,7 +20,9 @@ module HTTP2 DEFINED_SETTINGS: Hash[Symbol, Integer] - DEFINED_ERRORS: Hash[Symbol, Integer] + DEFINED_SETTINGS_BY_ID: Hash[Integer, Symbol] + + DEFINED_ERRORS: Array[Symbol] RBIT: Integer RBYTE: Integer @@ -29,6 +31,8 @@ module HTTP2 UINT16: String UINT8: String HEADERPACK: String + PRIORITYPACK: String + ALTSVCPACK: String FRAME_LENGTH_HISHIFT: Integer FRAME_LENGTH_LOMASK: Integer @@ -41,19 +45,19 @@ module HTTP2 def common_header: (frame, buffer: String) -> String - def read_common_header: (String buf) -> frame + def read_common_header: (String buf) -> (common_frame | { length: Integer }) def read_common_frame: (String) -> frame def generate: (frame) -> String - def parse: (String) -> frame? + def parse: (String) -> (frame | { length: Integer })? private def initialize: (?Integer local_max_frame_size, ?Integer remote_max_frame_size) -> untyped - def pack_error: (Integer | Symbol error, buffer: String) -> String + def pack_error: (Symbol | Integer error, buffer: String) -> String def unpack_error: (Integer) -> (Symbol | Integer) end diff --git a/sig/header/compressor.rbs b/sig/header/compressor.rbs index 2999c91..1b6b8ed 100644 --- a/sig/header/compressor.rbs +++ b/sig/header/compressor.rbs @@ -2,6 +2,7 @@ module HTTP2 module Header class Compressor include PackingExtensions + include BufferUtils @cc: EncodingContext diff --git a/sig/header/encoding_context.rbs b/sig/header/encoding_context.rbs index 1085d4b..481acc5 100644 --- a/sig/header/encoding_context.rbs +++ b/sig/header/encoding_context.rbs @@ -22,6 +22,10 @@ module HTTP2 attr_reader current_table_size: Integer + @table_by_field: Hash[string, Array[[string, Integer]]] + + @unshifts: Integer + @limit: Integer @_table_updated: bool diff --git a/sig/header/huffman.rbs b/sig/header/huffman.rbs index 3f5e345..0757d25 100644 --- a/sig/header/huffman.rbs +++ b/sig/header/huffman.rbs @@ -9,6 +9,8 @@ module HTTP2 EOS: Integer + EOS_PADDING: Array[String] + CODES: Array[[Integer, Integer]] ENCODE_TABLE: Array[String] diff --git a/sig/server.rbs b/sig/server.rbs index 28d2290..dd46e5c 100644 --- a/sig/server.rbs +++ b/sig/server.rbs @@ -7,6 +7,6 @@ module HTTP2 private - def promise: (Stream parent, Enumerable[header_pair] headers, Array[Symbol] flags) { (Stream) -> void } -> void + def promise: (Stream parent, Enumerable[header_pair] headers, frame_control_flags flags) { (Stream) -> void } -> void end end \ No newline at end of file diff --git a/sig/stream.rbs b/sig/stream.rbs index 3f1864c..447b4b3 100644 --- a/sig/stream.rbs +++ b/sig/stream.rbs @@ -37,7 +37,7 @@ module HTTP2 def calculate_content_length: (Integer?) -> void - def send: (frame frame) -> void + def send: (stream_frame_props frame) -> void def headers: (Enumerable[header_pair] headers, ?end_headers: bool, ?end_stream: bool) -> void @@ -70,7 +70,7 @@ module HTTP2 ?state: Symbol ) -> untyped - def transition: (frame, bool sending) -> void + def transition: (stream_frame_props frame, bool sending) -> void def event: (Symbol newstate) -> void @@ -80,15 +80,14 @@ module HTTP2 def complete_transition: (frame) -> void - def process_priority: (priority_frame frame) -> void + def process_priority: (priority_frame_inner_props frame) -> void - def end_stream?: (frame frame) -> boolish + def end_stream?: (stream_frame_props frame) -> boolish - def stream_error: (Symbol error, ?msg: String?) -> void - | () -> void + def stream_error: (?Symbol error, ?msg: String?) -> void alias error stream_error - def manage_state: (frame) { () -> void } -> void + def manage_state: (stream_frame_props frame) { () -> void } -> void end -end \ No newline at end of file +end diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 2198c72..1d40f11 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -72,7 +72,7 @@ expect(f.parse(frames[1])[:type]).to eq :settings ack_frame = f.parse(frames[2]) expect(ack_frame[:type]).to eq :settings - expect(ack_frame[:flags]).to include(:ack) + expect(ack_frame[:flags]).to be_anybits(ACK) end end @@ -83,7 +83,7 @@ client.settings(settings_header_table_size: 256) expect(client.local_settings[:settings_header_table_size]).to eq 4096 - ack = { type: :settings, stream: 0, payload: [], flags: [:ack] } + ack = { type: :settings, stream: 0, payload: [], flags: ACK } client << f.generate(ack) expect(client.local_settings[:settings_header_table_size]).to eq 256 @@ -114,10 +114,13 @@ end.to raise_error(NoMethodError) end - it "should raise error on PUSH_PROMISE against stream 0" do - expect do - client << set_stream_id(f.generate(push_promise_frame), 0) - end.to raise_error(ProtocolError) + unless defined?(RBS) + # RBS detects type unsafety at runtime + it "should raise error on PUSH_PROMISE against stream 0" do + expect do + client << set_stream_id(f.generate(push_promise_frame), 0) + end.to raise_error(ProtocolError) + end end it "should raise error on PUSH_PROMISE against bogus stream" do @@ -252,7 +255,7 @@ end end context "when receiving a reserved flag" do - let(:orig_frame) { origin_frame.merge(flags: [:reserved]) } + let(:orig_frame) { origin_frame.merge(flags: RESERVED) } it "should be ignored" do client << f.generate(settings_frame) origins = [] @@ -265,13 +268,15 @@ end end context "received in a stream" do - it "should be ignored" do - s = client.new_stream - s.send headers_frame + unless defined?(RBS) + it "should be ignored" do + s = client.new_stream + s.send headers_frame - expect do - client << set_stream_id(f.generate(orig_frame), s.id) - end.not_to raise_error + expect do + client << set_stream_id(f.generate(orig_frame), s.id) + end.not_to raise_error + end end end end @@ -350,7 +355,7 @@ payload = cc.encode(req_headers) h1[:payload] = payload.slice!(0, payload.size / 2) # first half h1[:stream] = 2 - h1[:flags] = [] + h1[:flags] = 0 h2[:payload] = payload # the remaining h2[:stream] = 2 diff --git a/spec/framer_spec.rb b/spec/framer_spec.rb index 265d3de..9c7cf87 100644 --- a/spec/framer_spec.rb +++ b/spec/framer_spec.rb @@ -6,14 +6,15 @@ let(:f) { Framer.new } context "common header" do - let(:frame) do + let(:frame_headers) do { length: 4, type: :headers, - flags: %i[end_stream end_headers], + flags: END_HEADERS | END_STREAM, stream: 15 } end + let(:frame) { frame_headers.merge(payload: "data") } let(:bytes) { [0, 0x04, 0x01, 0x5, 0x0000000F].pack("CnCCN") } @@ -22,7 +23,7 @@ end it "should parse common 9 byte header" do - expect(f.read_common_header(bytes)).to eq frame + expect(f.read_common_header(bytes)).to eq frame_headers end it "should generate a large frame" do @@ -31,19 +32,21 @@ frame = { length: (2**18) + (2**16) + 17, type: :headers, - flags: %i[end_stream end_headers], + flags: END_HEADERS | END_STREAM, stream: 15 } bytes = [5, 17, 0x01, 0x5, 0x0000000F].pack("CnCCN") - expect(f.common_header(frame, buffer: "".b)).to eq bytes + expect(f.common_header(frame.merge(payload: "data"), buffer: "".b)).to eq bytes expect(f.read_common_header(bytes)).to eq frame end - it "should raise exception on invalid frame type when sending" do - expect do - frame[:type] = :bogus - f.common_header(frame, buffer: "".b) - end.to raise_error(CompressionError, /invalid.*type/i) + unless defined?(RBS) + it "should raise exception on invalid frame type when sending" do + expect do + frame[:type] = :bogus + f.common_header(frame, buffer: "".b) + end.to raise_error(CompressionError, /invalid.*type/i) + end end it "should raise exception on invalid stream ID" do @@ -55,7 +58,7 @@ it "should raise exception on invalid frame flag" do expect do - frame[:flags] = [:bogus] + frame[:flags] = 1024 f.common_header(frame, buffer: "".b) end.to raise_error(CompressionError, /frame flag/) end @@ -73,7 +76,7 @@ frame = { length: 4, type: :data, - flags: [:end_stream], + flags: END_STREAM, stream: 1, payload: "text" } @@ -90,7 +93,7 @@ frame = { length: 12, type: :headers, - flags: %i[end_stream end_headers], + flags: END_HEADERS | END_STREAM, stream: 1, payload: "header-block" } @@ -104,7 +107,7 @@ frame = { length: 16, type: :headers, - flags: [:end_headers], + flags: END_HEADERS, stream: 1, dependency: 15, weight: 12, @@ -154,7 +157,7 @@ let(:frame) do { type: :settings, - flags: [], + flags: 0, stream: 0, payload: [ [:settings_max_concurrent_streams, 10], @@ -242,7 +245,7 @@ frame = { length: 11, type: :push_promise, - flags: [:end_headers], + flags: END_HEADERS, stream: 1, promise_stream: 2, payload: "headers" @@ -260,7 +263,7 @@ length: 8, stream: 1, type: :ping, - flags: [:ack], + flags: ACK, payload: "12345678" } end @@ -312,7 +315,8 @@ frame = { length: 4, type: :window_update, - increment: 10 + increment: 10, + stream: 0 } bytes = f.generate(frame) @@ -327,7 +331,8 @@ frame = { length: 4, type: :window_update, - increment: 0x7fffffff + 1 + increment: 0x7fffffff + 1, + stream: 0 } expect { f.generate(frame) }.to raise_error(CompressionError) @@ -340,7 +345,7 @@ length: 12, type: :continuation, stream: 1, - flags: [:end_headers], + flags: END_HEADERS, payload: "header-block" } @@ -443,14 +448,14 @@ it "should determine frame length" do frames = [ - [{ type: :data, stream: 1, flags: [:end_stream], payload: "abc" }, 3], + [{ type: :data, stream: 1, flags: END_STREAM, payload: "abc" }, 3], [{ type: :headers, stream: 1, payload: "abc" }, 3], [{ type: :priority, stream: 3, dependency: 30, exclusive: false, weight: 1 }, 5], [{ type: :rst_stream, stream: 3, error: 100 }, 4], - [{ type: :settings, payload: [[:settings_max_concurrent_streams, 10]] }, 6], - [{ type: :push_promise, promise_stream: 5, payload: "abc" }, 7], - [{ type: :ping, payload: "blob" * 2 }, 8], - [{ type: :goaway, last_stream: 5, error: 20, payload: "blob" }, 12], + [{ type: :settings, stream: 0, payload: [[:settings_max_concurrent_streams, 10]] }, 6], + [{ type: :push_promise, stream: 1, promise_stream: 5, payload: "abc" }, 7], + [{ type: :ping, stream: 0, payload: "blob" * 2 }, 8], + [{ type: :goaway, stream: 0, last_stream: 5, error: 20, payload: "blob" }, 12], [{ type: :window_update, stream: 1, increment: 1024 }, 4], [{ type: :continuation, stream: 1, payload: "abc" }, 3] ] @@ -465,7 +470,7 @@ it "should parse single frame at a time" do frames = [ { type: :headers, stream: 1, payload: "headers" }, - { type: :data, stream: 1, flags: [:end_stream], payload: "abc" } + { type: :data, stream: 1, flags: END_STREAM, payload: "abc" } ] buf = f.generate(frames[0]) << f.generate(frames[1]) diff --git a/spec/helper.rb b/spec/helper.rb index 802229a..60e53d6 100644 --- a/spec/helper.rb +++ b/spec/helper.rb @@ -34,7 +34,7 @@ module FrameHelpers def data_frame { type: :data, - flags: [:end_stream], + flags: END_STREAM, stream: 1, payload: "text" } @@ -43,7 +43,7 @@ def data_frame def headers_frame { type: :headers, - flags: [:end_headers].freeze, + flags: END_HEADERS, stream: 1, payload: Compressor.new.encode(REQUEST_HEADERS) } @@ -53,6 +53,7 @@ def priority_frame { type: :priority, stream: 1, + flags: 0, exclusive: false, dependency: 0, weight: 20 @@ -63,6 +64,7 @@ def rst_stream_frame { type: :rst_stream, stream: 1, + flags: 0, error: :stream_closed } end @@ -71,6 +73,7 @@ def settings_frame { type: :settings, stream: 0, + flags: 0, payload: [ [:settings_max_concurrent_streams, 10], [:settings_initial_window_size, 0x7fffffff] @@ -81,7 +84,7 @@ def settings_frame def push_promise_frame { type: :push_promise, - flags: [:end_headers], + flags: END_HEADERS, stream: 1, promise_stream: 2, payload: Compressor.new.encode(REQUEST_HEADERS) @@ -100,7 +103,7 @@ def pong_frame { stream: 0, type: :ping, - flags: [:ack], + flags: ACK, payload: "12345678" } end @@ -108,6 +111,7 @@ def pong_frame def goaway_frame { type: :goaway, + stream: 0, last_stream: 2, error: :no_error, payload: "debug" @@ -117,7 +121,9 @@ def goaway_frame def window_update_frame { type: :window_update, - increment: 10 + increment: 10, + flags: 0, + stream: 0 } end @@ -125,13 +131,14 @@ def continuation_frame { type: :continuation, stream: 1, - flags: [:end_headers], + flags: END_HEADERS, payload: "-second-block" } end def altsvc_frame { + stream: 0, type: :altsvc, max_age: 1_402_290_402, # 4 port: 8080, # 2 reserved 1 @@ -144,6 +151,7 @@ def altsvc_frame def origin_frame { type: :origin, + stream: 0, payload: %w[https://www.example.com https://www.example.org] } end @@ -160,6 +168,12 @@ def frame_types methods.select { |meth| meth.to_s.end_with?("_frame") } .map { |meth| __send__(meth) } end + + def stream_frame_types + %i[data headers priority rst_stream push_promise window_update continuation].map do |type| + __send__(:"#{type}_frame") + end + end end def set_stream_id(bytes, id) diff --git a/spec/shared_examples/connection.rb b/spec/shared_examples/connection.rb index a2e9f20..b2e032d 100644 --- a/spec/shared_examples/connection.rb +++ b/spec/shared_examples/connection.rb @@ -29,7 +29,7 @@ frame = frames.last expect(frame[:type]).to eq :settings - expect(frame[:flags]).to eq [:ack] + expect(frame[:flags]).to eq ACK expect(frame[:payload]).to eq [] end end @@ -119,7 +119,7 @@ conn << f.generate(settings_frame) expect(conn).to receive(:send) do |frame| expect(frame[:type]).to eq :ping - expect(frame[:flags]).to eq [:ack] + expect(frame[:flags]).to eq ACK expect(frame[:payload]).to eq "12345678" end @@ -284,10 +284,10 @@ it "should chain continuation frames" do headers = headers_frame - headers[:flags] = [] + headers[:flags] = 0 continuation = continuation_frame continuation[:stream] = headers[:stream] - continuation[:flags] = [] + continuation[:flags] = 0 conn << f.generate(headers) conn << f.generate(continuation) @@ -298,14 +298,14 @@ max_frame_size = connected_conn.local_settings[:settings_max_frame_size] headers = headers_frame - headers[:flags] = [] + headers[:flags] = 0 conn << f.generate(headers) expect do max_frame_size.times do continuation = continuation_frame continuation[:stream] = headers[:stream] - continuation[:flags] = [] + continuation[:flags] = 0 conn << f.generate(continuation) end end.to raise_error(ProtocolError) @@ -313,7 +313,7 @@ it "should require that split header blocks are a contiguous sequence" do headers = headers_frame - headers[:flags] = [] + headers[:flags] = 0 conn << f.generate(headers) (frame_types - [continuation_frame]).each do |frame| @@ -323,7 +323,7 @@ it "should require that split promise blocks are a contiguous sequence" do headers = push_promise_frame - headers[:flags] = [] + headers[:flags] = 0 conn << f.generate(headers) (frame_types - [continuation_frame]).each do |frame| @@ -381,9 +381,9 @@ expect(headers[0][:type]).to eq :headers expect(headers[1][:type]).to eq :continuation expect(headers[2][:type]).to eq :continuation - expect(headers[0][:flags]).to eq [:end_stream] - expect(headers[1][:flags]).to eq [] - expect(headers[2][:flags]).to eq [:end_headers] + expect(headers[0][:flags]).to eq END_STREAM + expect(headers[1][:flags]).to eq 0 + expect(headers[2][:flags]).to eq END_HEADERS end it "should not generate CONTINUATION if HEADERS fits exactly in a frame" do @@ -404,8 +404,8 @@ expect(headers[0][:length]).to eq conn.remote_settings[:settings_max_frame_size] expect(headers.size).to eq 1 expect(headers[0][:type]).to eq :headers - expect(headers[0][:flags]).to include(:end_headers) - expect(headers[0][:flags]).to include(:end_stream) + expect(headers[0][:flags]).to be_anybits(END_HEADERS) + expect(headers[0][:flags]).to be_anybits(END_STREAM) end it "should not generate CONTINUATION if HEADERS fits exactly in a frame" do @@ -426,8 +426,8 @@ expect(headers[0][:length]).to eq conn.remote_settings[:settings_max_frame_size] expect(headers.size).to eq 1 expect(headers[0][:type]).to eq :headers - expect(headers[0][:flags]).to include(:end_headers) - expect(headers[0][:flags]).to include(:end_stream) + expect(headers[0][:flags]).to be_anybits(END_HEADERS) + expect(headers[0][:flags]).to be_anybits(END_STREAM) end it "should generate CONTINUATION if HEADERS exceed the max payload by one byte" do @@ -449,8 +449,8 @@ expect(headers.size).to eq 2 expect(headers[0][:type]).to eq :headers expect(headers[1][:type]).to eq :continuation - expect(headers[0][:flags]).to eq [:end_stream] - expect(headers[1][:flags]).to eq [:end_headers] + expect(headers[0][:flags]).to eq END_STREAM + expect(headers[1][:flags]).to eq END_HEADERS end end context "API" do diff --git a/spec/stream_spec.rb b/spec/stream_spec.rb index 692d36f..cfccf8c 100644 --- a/spec/stream_spec.rb +++ b/spec/stream_spec.rb @@ -58,13 +58,13 @@ end it "should raise error if sending invalid frames" do - frame_types.reject { |frame| %i[headers rst_stream priority].include?(frame[:type]) }.each do |type| + stream_frame_types.reject { |frame| %i[headers rst_stream priority].include?(frame[:type]) }.each do |type| expect { stream.dup.send type }.to raise_error InternalError end end it "should raise error on receipt of invalid frames" do - what_types = frame_types.reject { |frame| %i[priority window_update rst_stream].include?(frame[:type]) } + what_types = stream_frame_types.reject { |frame| %i[priority window_update rst_stream].include?(frame[:type]) } what_types.each do |type| expect { stream.dup.receive type }.to raise_error InternalError end @@ -109,13 +109,13 @@ end it "should raise error if sending invalid frames" do - frame_types.reject { |frame| %i[priority rst_stream window_update].include?(frame[:type]) }.each do |type| + stream_frame_types.reject { |frame| %i[priority rst_stream window_update].include?(frame[:type]) }.each do |type| expect { stream.dup.send type }.to raise_error InternalError end end it "should raise error on receipt of invalid frames" do - frame_types.reject { |frame| %i[headers rst_stream priority].include?(frame[:type]) }.each do |type| + stream_frame_types.reject { |frame| %i[headers rst_stream priority].include?(frame[:type]) }.each do |type| expect { stream.dup.receive type }.to raise_error InternalError end end @@ -150,13 +150,13 @@ before { stream.receive headers_frame } it "should allow any valid frames types to be sent" do - (frame_types - [ping_frame, goaway_frame, settings_frame]).each do |type| + stream_frame_types.each do |type| expect { stream.dup.send type }.to_not raise_error end end it "should allow frames of any type to be received" do - frame_types.each do |type| + stream_frame_types.each do |type| expect { stream.dup.receive type }.to_not raise_error end end @@ -165,7 +165,7 @@ [data_frame, headers_frame].each do |frame| s = stream.dup - s.send frame.merge(flags: [:end_stream]) + s.send frame.merge(flags: END_STREAM) expect(s.state).to eq :half_closed_local end end @@ -174,7 +174,7 @@ [data_frame, headers_frame].each do |frame| s = stream.dup f = frame.dup - f[:flags] = [:end_stream] + f[:flags] = END_STREAM s.receive f expect(s.state).to eq :half_closed_remote @@ -184,7 +184,7 @@ it "should transition to half closed if remote opened with END_STREAM" do s = client.new_stream hclose = headers_frame - hclose[:flags] = [:end_stream] + hclose[:flags] = END_STREAM s.receive hclose expect(s.state).to eq :half_closed_remote @@ -193,7 +193,7 @@ it "should transition to half closed if local opened with END_STREAM" do s = client.new_stream hclose = headers_frame - hclose[:flags] = [:end_stream] + hclose[:flags] = END_STREAM s.send hclose expect(s.state).to eq :half_closed_local @@ -234,7 +234,7 @@ stream.on(:close) { order << :close } req = headers_frame - req[:flags] = [:end_headers] + req[:flags] = END_HEADERS stream.send req stream.send data_frame @@ -267,7 +267,7 @@ stream.on(:close) { order << :close } req = headers_frame - req[:flags] = %i[end_stream end_headers] + req[:flags] = END_HEADERS | END_STREAM stream.send req stream.receive headers_frame @@ -294,10 +294,10 @@ end context "half closed (local)" do - before { stream.send headers_frame.merge(flags: %i[end_headers end_stream]) } + before { stream.send headers_frame.merge(flags: END_HEADERS | END_STREAM) } it "should raise error on attempt to send invalid frames" do - frame_types.reject { |frame| %i[priority rst_stream window_update].include?(frame[:type]) }.each do |frame| + stream_frame_types.reject { |frame| %i[priority rst_stream window_update].include?(frame[:type]) }.each do |frame| expect { stream.dup.send frame }.to raise_error InternalError end end @@ -306,7 +306,7 @@ [data_frame, headers_frame, continuation_frame].each do |frame| s = stream.dup f = frame.dup - f[:flags] = [:end_stream] + f[:flags] = END_STREAM s.receive f expect(s.state).to eq :closed @@ -355,7 +355,7 @@ stream.on(:half_close) { order << :half_close } req = headers_frame - req[:flags] = %i[end_stream end_headers] + req[:flags] = END_HEADERS | END_STREAM stream.send req expect(order).to eq %i[active half_close] @@ -372,10 +372,10 @@ end context "half closed (remote)" do - before { stream.receive headers_frame.merge(flags: %i[end_headers end_stream]) } + before { stream.receive headers_frame.merge(flags: END_HEADERS | END_STREAM) } it "should raise STREAM_CLOSED error on reciept of frames" do - (frame_types - [priority_frame, rst_stream_frame, window_update_frame]).each do |frame| + (stream_frame_types - [priority_frame, rst_stream_frame, window_update_frame]).each do |frame| expect do stream.dup.receive frame end.to raise_error(StreamClosed) @@ -387,16 +387,16 @@ s = stream.dup s.on(:close) { expect(s.state).to eq :closed } - s.send frame.merge(flags: [:end_stream]) + s.send frame.merge(flags: END_STREAM) expect(s.state).to eq :closed end end it "should not transition to closed if END_STREAM flag is sent when overflowing window" do stream.on(:close) { raise "should not have closed" } - data = { type: :data, flags: [], stream: stream.id } + data = { type: :data, flags: 0, stream: stream.id } 4.times do - data = data.merge(flags: [:end_stream]) if stream.remote_window < 16_384 + data = data.merge(flags: END_STREAM) if stream.remote_window < 16_384 stream.send data.merge(payload: "x" * 16_384) end end @@ -408,13 +408,13 @@ expect(stream.buffered_amount).to eq 0 o.tap end - data = { type: :data, flags: [], stream: stream.id } + data = { type: :data, flags: 0, stream: stream.id } 4.times do - data = data.merge(flags: [:end_stream]) if stream.remote_window < 16_384 + data = data.merge(flags: END_STREAM) if stream.remote_window < 16_384 stream.send data.merge(payload: "x" * 16_384) end client << f.generate(settings_frame) - client << Framer.new.generate(type: :window_update, stream: stream.id, increment: 16_384) + client << Framer.new.generate(type: :window_update, stream: stream.id, increment: 16_384, flags: 0) end it "should transition to closed if RST_STREAM is sent" do @@ -453,7 +453,7 @@ stream.on(:half_close) { order << :half_close } req = headers_frame - req[:flags] = %i[end_stream end_headers] + req[:flags] = END_HEADERS | END_STREAM stream.receive req expect(order).to eq %i[active half_close] @@ -472,12 +472,12 @@ context "closed" do context "remote closed stream" do before do - stream.send headers_frame.merge(flags: %i[end_headers end_stream]) # half closed local - stream.receive headers_frame.merge(flags: %i[end_headers end_stream]) # closed by remote + stream.send headers_frame.merge(flags: END_HEADERS | END_STREAM) # half closed local + stream.receive headers_frame.merge(flags: END_HEADERS | END_STREAM) # closed by remote end it "should raise STREAM_CLOSED on attempt to send frames" do - (frame_types - [priority_frame, rst_stream_frame]).each do |frame| + (stream_frame_types - [priority_frame, rst_stream_frame]).each do |frame| expect do stream.dup.send frame end.to raise_error(StreamClosed) @@ -485,7 +485,7 @@ end it "should raise STREAM_CLOSED on receipt of frame" do - (frame_types - [priority_frame, rst_stream_frame, window_update_frame]).each do |frame| + (stream_frame_types - [priority_frame, rst_stream_frame, window_update_frame]).each do |frame| expect do stream.dup.receive frame end.to raise_error(StreamClosed) @@ -523,15 +523,17 @@ stream.send rst_stream_frame # closed by local end - it "should ignore received frames" do - control_frames.each do |frame| - expect do - cb = [] - stream.on(:data) { cb << :data } - stream.on(:headers) { cb << :headers } - stream.dup.receive frame - expect(cb).to be_empty - end.to_not raise_error + unless defined?(RBS) + it "should ignore received frames" do + control_frames.each do |frame| + expect do + cb = [] + stream.on(:data) { cb << :data } + stream.on(:headers) { cb << :headers } + stream.dup.receive frame + expect(cb).to be_empty + end.to_not raise_error + end end end @@ -571,7 +573,7 @@ stream.send headers_frame # go to open expect(stream.remote_window).to eq DEFAULT_FLOW_WINDOW - (frame_types - [data_frame, ping_frame, goaway_frame, settings_frame]).each do |frame| + (frame_types - [data_frame, ping_frame, pong_frame, goaway_frame, settings_frame]).each do |frame| s = stream.dup s.send frame expect(s.remote_window).to eq DEFAULT_FLOW_WINDOW @@ -602,7 +604,7 @@ s1 = client.new_stream s1.send headers_frame - s1.send data.merge(payload: "x" * 900, flags: []) + s1.send data.merge(payload: "x" * 900, flags: 0) expect(s1.remote_window).to eq 100 s1.send data.merge(payload: "x" * 200) @@ -626,7 +628,7 @@ end it "should update window when data received is over half of the maximum local window size" do - data1 = data_frame.merge(payload: "a" * 16_384, flags: []) + data1 = data_frame.merge(payload: "a" * 16_384, flags: 0) data2 = data_frame.merge(payload: "a" * 16_384) datalen = 16_384 * 2 expect(stream).to receive(:send) do |frame| @@ -649,13 +651,13 @@ s1 = client.new_stream s1.send headers_frame - s1.send data.merge(payload: "x" * 1000, flags: []) + s1.send data.merge(payload: "x" * 1000, flags: 0) # check if window is exhausted expect(s1.remote_window).to be(0) expect(s1.send_buffer).to be_empty - s1.send data.merge(payload: "", flags: [:end_stream]) + s1.send data.merge(payload: "", flags: END_STREAM) expect(s1.remote_window).to be(0) expect(s1.send_buffer).to be_empty @@ -691,7 +693,7 @@ expect(stream).to receive(:send) do |frame| expect(frame[:type]).to eq :headers expect(frame[:payload]).to eq payload - expect(frame[:flags]).to eq [:end_headers] + expect(frame[:flags]).to eq END_HEADERS end stream.headers(payload, end_stream: false, end_headers: true) @@ -701,12 +703,12 @@ expect(stream).to receive(:send) do |frame| expect(frame[:type]).to eq :data expect(frame[:payload]).to eq "text" - expect(frame[:flags]).to be_empty + expect(frame[:flags]).to be(0) end stream.data("text", end_stream: false) expect(stream).to receive(:send) do |frame| - expect(frame[:flags]).to eq [:end_stream] + expect(frame[:flags]).to eq END_STREAM end stream.data("text") end @@ -715,9 +717,9 @@ data = "x" * 16_384 * 2 want = [ - { type: :data, flags: [], length: 16_384 }, - { type: :data, flags: [], length: 16_384 }, - { type: :data, flags: [:end_stream], length: 1 } + { type: :data, flags: 0, length: 16_384 }, + { type: :data, flags: 0, length: 16_384 }, + { type: :data, flags: END_STREAM, length: 1 } ] want.each do |w| expect(stream).to receive(:send) do |frame| @@ -734,11 +736,11 @@ data = "🐼" * 16_384 want = [ - { type: :data, flags: [], length: 16_384 }, - { type: :data, flags: [], length: 16_384 }, - { type: :data, flags: [], length: 16_384 }, - { type: :data, flags: [], length: 16_384 }, - { type: :data, flags: [:end_stream], length: 1 } + { type: :data, flags: 0, length: 16_384 }, + { type: :data, flags: 0, length: 16_384 }, + { type: :data, flags: 0, length: 16_384 }, + { type: :data, flags: 0, length: 16_384 }, + { type: :data, flags: END_STREAM, length: 1 } ] want.each do |w| expect(stream).to receive(:send) do |frame|