Skip to content

Commit d05f962

Browse files
leheckafacebook-github-bot
authored andcommitted
Reworked and reapplying D8955216
Summary: The original diff D8955216 caused memory regression in RTGW. In this diff I redesigned the control/child stream handler to not require changes in the Http2StreamHandler (shared class with the Http2ServerStreamHandler, which is currently used in prod). I also removed the need to use std::deque which was increasing the memory usage. The frame chunk accumulation and buffering is done purely thru IOBufQueue which doesn't require any extra allocations. The control stream handler implementation is disabled by default and the new code is not touching the http server handlers used in RTGW. Reviewed By: sarangbh Differential Revision: D9544659 fbshipit-source-id: 2f225a44d61b3a9753567f1f08734d06018cca41
1 parent 562e476 commit d05f962

File tree

8 files changed

+365
-8
lines changed

8 files changed

+365
-8
lines changed

rsocket/framing/FrameSerializer.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,10 @@ folly::IOBufQueue FrameSerializer::createBufferQueue(size_t bufferSize) const {
5151

5252
folly::Optional<StreamId> FrameSerializer::peekStreamId(
5353
const ProtocolVersion& protocolVersion,
54-
const folly::IOBuf& frame) {
54+
const folly::IOBuf& frame,
55+
bool skipFrameLengthBytes) {
5556
if (protocolVersion == FrameSerializerV1_0::Version) {
56-
return FrameSerializerV1_0().peekStreamId(frame);
57+
return FrameSerializerV1_0().peekStreamId(frame, skipFrameLengthBytes);
5758
}
5859

5960
auto* msg = "unknown protocol version";

rsocket/framing/FrameSerializer.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,13 @@ class FrameSerializer {
3737

3838
static folly::Optional<StreamId> peekStreamId(
3939
const ProtocolVersion& protocolVersion,
40-
const folly::IOBuf& frame);
40+
const folly::IOBuf& frame,
41+
bool skipFrameLengthBytes);
4142

4243
virtual FrameType peekFrameType(const folly::IOBuf& in) const = 0;
4344
virtual folly::Optional<StreamId> peekStreamId(
44-
const folly::IOBuf& in) const = 0;
45+
const folly::IOBuf& in,
46+
bool skipFrameLengthBytes) const = 0;
4547

4648
virtual std::unique_ptr<folly::IOBuf> serializeOut(
4749
Frame_REQUEST_STREAM&&) const = 0;

rsocket/framing/FrameSerializer_v1_0.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,13 @@ FrameType FrameSerializerV1_0::peekFrameType(const folly::IOBuf& in) const {
189189
}
190190

191191
folly::Optional<StreamId> FrameSerializerV1_0::peekStreamId(
192-
const folly::IOBuf& in) const {
192+
const folly::IOBuf& in,
193+
bool skipFrameLengthBytes) const {
193194
folly::io::Cursor cur(&in);
194195
try {
196+
if (skipFrameLengthBytes) {
197+
cur.skip(3); // skip 3 bytes for frame length
198+
}
195199
auto streamId = cur.readBE<int32_t>();
196200
if (streamId < 0) {
197201
return folly::none;

rsocket/framing/FrameSerializer_v1_0.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ class FrameSerializerV1_0 : public FrameSerializer {
3131
size_t skipBytes = 0);
3232

3333
FrameType peekFrameType(const folly::IOBuf& in) const override;
34-
folly::Optional<StreamId> peekStreamId(const folly::IOBuf& in) const override;
34+
folly::Optional<StreamId> peekStreamId(
35+
const folly::IOBuf& in,
36+
bool skipFrameLengthBytes) const override;
3537

3638
std::unique_ptr<folly::IOBuf> serializeOut(
3739
Frame_REQUEST_STREAM&&) const override;

rsocket/framing/Framer.cpp

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
// Copyright (c) Facebook, Inc. and its affiliates.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "rsocket/framing/Framer.h"
16+
#include <folly/io/Cursor.h>
17+
#include "rsocket/framing/FrameSerializer_v1_0.h"
18+
19+
namespace rsocket {
20+
21+
namespace {
22+
23+
constexpr size_t kFrameLengthFieldLengthV1_0 = 3;
24+
constexpr auto kMaxFrameLength = 0xFFFFFF; // 24bit max value
25+
26+
template <typename TWriter>
27+
void writeFrameLength(
28+
TWriter& cur,
29+
size_t frameLength,
30+
size_t frameSizeFieldLength) {
31+
DCHECK(frameSizeFieldLength > 0);
32+
33+
// starting from the highest byte
34+
// frameSizeFieldLength == 3 => shift = [16,8,0]
35+
// frameSizeFieldLength == 4 => shift = [24,16,8,0]
36+
auto shift = (frameSizeFieldLength - 1) * 8;
37+
38+
while (frameSizeFieldLength--) {
39+
const auto byte = (frameLength >> shift) & 0xFF;
40+
cur.write(static_cast<uint8_t>(byte));
41+
shift -= 8;
42+
}
43+
}
44+
} // namespace
45+
46+
/// Get the byte size of the frame length field in an RSocket frame.
47+
size_t Framer::frameSizeFieldLength() const {
48+
DCHECK_NE(protocolVersion_, ProtocolVersion::Unknown);
49+
if (protocolVersion_ < FrameSerializerV1_0::Version) {
50+
return sizeof(int32_t);
51+
} else {
52+
return 3; // bytes
53+
}
54+
}
55+
56+
/// Get the minimum size for a valid RSocket frame (including its frame length
57+
/// field).
58+
size_t Framer::minimalFrameLength() const {
59+
DCHECK_NE(protocolVersion_, ProtocolVersion::Unknown);
60+
return FrameSerializerV1_0::kFrameHeaderSize;
61+
}
62+
63+
/// Compute the length of the entire frame (including its frame length field),
64+
/// if given only its frame length field.
65+
size_t Framer::frameSizeWithLengthField(size_t frameSize) const {
66+
return protocolVersion_ < FrameSerializerV1_0::Version
67+
? frameSize
68+
: frameSize + frameSizeFieldLength();
69+
}
70+
71+
/// Compute the length of the frame (excluding its frame length field), if given
72+
/// only its frame length field.
73+
size_t Framer::frameSizeWithoutLengthField(size_t frameSize) const {
74+
DCHECK_NE(protocolVersion_, ProtocolVersion::Unknown);
75+
return protocolVersion_ < FrameSerializerV1_0::Version
76+
? frameSize - frameSizeFieldLength()
77+
: frameSize;
78+
}
79+
80+
size_t Framer::readFrameLength() const {
81+
const auto fieldLength = frameSizeFieldLength();
82+
DCHECK_GT(fieldLength, 0);
83+
84+
folly::io::Cursor cur{payloadQueue_.front()};
85+
size_t frameLength = 0;
86+
87+
// Reading of arbitrary-sized big-endian integer.
88+
for (size_t i = 0; i < fieldLength; ++i) {
89+
frameLength <<= 8;
90+
frameLength |= cur.read<uint8_t>();
91+
}
92+
93+
return frameLength;
94+
}
95+
96+
void Framer::addFrameChunk(std::unique_ptr<folly::IOBuf> payload) {
97+
payloadQueue_.append(std::move(payload));
98+
parseFrames();
99+
}
100+
101+
void Framer::parseFrames() {
102+
if (payloadQueue_.empty() || !ensureOrAutodetectProtocolVersion()) {
103+
// At this point we dont have enough bytes on the wire or we errored out.
104+
return;
105+
}
106+
107+
while (!payloadQueue_.empty()) {
108+
auto const frameSizeFieldLen = frameSizeFieldLength();
109+
if (payloadQueue_.chainLength() < frameSizeFieldLen) {
110+
// We don't even have the next frame size value.
111+
break;
112+
}
113+
114+
auto const nextFrameSize = readFrameLength();
115+
if (nextFrameSize < minimalFrameLength()) {
116+
error("Invalid frame - Frame size smaller than minimum");
117+
break;
118+
}
119+
120+
if (payloadQueue_.chainLength() < frameSizeWithLengthField(nextFrameSize)) {
121+
// Need to accumulate more data.
122+
break;
123+
}
124+
125+
auto payloadSize = frameSizeWithoutLengthField(nextFrameSize);
126+
if (stripFrameLengthField_) {
127+
payloadQueue_.trimStart(frameSizeFieldLen);
128+
} else {
129+
payloadSize += frameSizeFieldLen;
130+
}
131+
132+
DCHECK_GT(payloadSize, 0)
133+
<< "folly::IOBufQueue::split(0) returns a nullptr, can't have that";
134+
auto nextFrame = payloadQueue_.split(payloadSize);
135+
onFrame(std::move(nextFrame));
136+
}
137+
}
138+
139+
bool Framer::ensureOrAutodetectProtocolVersion() {
140+
if (protocolVersion_ != ProtocolVersion::Unknown) {
141+
return true;
142+
}
143+
144+
const auto minBytesNeeded =
145+
FrameSerializerV1_0::kMinBytesNeededForAutodetection;
146+
DCHECK_GT(minBytesNeeded, 0);
147+
if (payloadQueue_.chainLength() < minBytesNeeded) {
148+
return false;
149+
}
150+
151+
DCHECK_GT(minBytesNeeded, kFrameLengthFieldLengthV1_0);
152+
153+
auto const& firstFrame = *payloadQueue_.front();
154+
155+
const auto detectedV1 = FrameSerializerV1_0::detectProtocolVersion(
156+
firstFrame, kFrameLengthFieldLengthV1_0);
157+
if (detectedV1 != ProtocolVersion::Unknown) {
158+
protocolVersion_ = FrameSerializerV1_0::Version;
159+
return true;
160+
}
161+
162+
error("Could not detect protocol version from data");
163+
return false;
164+
}
165+
166+
std::unique_ptr<folly::IOBuf> Framer::prependSize(
167+
std::unique_ptr<folly::IOBuf> payload) {
168+
CHECK(payload);
169+
170+
const auto frameSizeFieldLengthValue = frameSizeFieldLength();
171+
const auto payloadLength = payload->computeChainDataLength();
172+
173+
CHECK_LE(payloadLength, kMaxFrameLength)
174+
<< "payloadLength: " << payloadLength
175+
<< " kMaxFrameLength: " << kMaxFrameLength;
176+
177+
if (payload->headroom() >= frameSizeFieldLengthValue) {
178+
// move the data pointer back and write value to the payload
179+
payload->prepend(frameSizeFieldLengthValue);
180+
folly::io::RWPrivateCursor cur(payload.get());
181+
writeFrameLength(cur, payloadLength, frameSizeFieldLengthValue);
182+
return payload;
183+
} else {
184+
auto newPayload = folly::IOBuf::createCombined(frameSizeFieldLengthValue);
185+
folly::io::Appender appender(newPayload.get(), /* do not grow */ 0);
186+
writeFrameLength(appender, payloadLength, frameSizeFieldLengthValue);
187+
newPayload->appendChain(std::move(payload));
188+
return newPayload;
189+
}
190+
}
191+
192+
StreamId Framer::peekStreamId(
193+
const folly::IOBuf& frame,
194+
bool skipFrameLengthBytes) const {
195+
return FrameSerializer::peekStreamId(
196+
protocolVersion_, frame, skipFrameLengthBytes)
197+
.value();
198+
}
199+
200+
std::unique_ptr<folly::IOBuf> Framer::drainPayloadQueue() {
201+
return payloadQueue_.move();
202+
}
203+
204+
} // namespace rsocket

rsocket/framing/Framer.h

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright (c) Facebook, Inc. and its affiliates.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include <folly/io/IOBufQueue.h>
18+
#include "rsocket/framing/ProtocolVersion.h"
19+
#include "rsocket/internal/Common.h"
20+
21+
namespace rsocket {
22+
23+
///
24+
/// Frames class is used to parse individual rsocket frames from the stream of
25+
/// incoming payload chunks. Every time a frame is parsed the onFrame method is
26+
/// invoked.
27+
/// Each rsocket frame is prepended with the frame length by
28+
/// prependSize method.
29+
///
30+
class Framer {
31+
public:
32+
Framer(ProtocolVersion protocolVersion, bool stripFrameLengthField)
33+
: protocolVersion_{protocolVersion},
34+
stripFrameLengthField_{stripFrameLengthField} {}
35+
virtual ~Framer() {}
36+
37+
/// For processing incoming frame chunks
38+
void addFrameChunk(std::unique_ptr<folly::IOBuf>);
39+
40+
/// Prepends payload size to the beginning of he IOBuf based on the
41+
/// set protocol version
42+
std::unique_ptr<folly::IOBuf> prependSize(
43+
std::unique_ptr<folly::IOBuf> payload);
44+
45+
/// derived class can override this method to react to termination
46+
virtual void error(const char*) = 0;
47+
virtual void onFrame(std::unique_ptr<folly::IOBuf>) = 0;
48+
49+
ProtocolVersion protocolVersion() const {
50+
return protocolVersion_;
51+
}
52+
53+
StreamId peekStreamId(const folly::IOBuf& frame, bool) const;
54+
55+
std::unique_ptr<folly::IOBuf> drainPayloadQueue();
56+
57+
private:
58+
// to explicitly trigger parsing frames
59+
void parseFrames();
60+
bool ensureOrAutodetectProtocolVersion();
61+
62+
size_t readFrameLength() const;
63+
size_t frameSizeFieldLength() const;
64+
size_t minimalFrameLength() const;
65+
size_t frameSizeWithLengthField(size_t frameSize) const;
66+
size_t frameSizeWithoutLengthField(size_t frameSize) const;
67+
68+
folly::IOBufQueue payloadQueue_{folly::IOBufQueue::cacheChainLength()};
69+
ProtocolVersion protocolVersion_;
70+
const bool stripFrameLengthField_;
71+
};
72+
73+
} // namespace rsocket

rsocket/statemachine/RSocketStateMachine.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ void RSocketStateMachine::processFrame(std::unique_ptr<folly::IOBuf> frame) {
491491
const auto frameType = frameSerializer_->peekFrameType(*frame);
492492
stats_->frameRead(frameType);
493493

494-
const auto optStreamId = frameSerializer_->peekStreamId(*frame);
494+
const auto optStreamId = frameSerializer_->peekStreamId(*frame, false);
495495
if (!optStreamId) {
496496
constexpr auto msg = "Cannot decode stream ID";
497497
closeWithError(Frame_ERROR::connectionError(msg));
@@ -1075,7 +1075,7 @@ void RSocketStateMachine::outputFrame(std::unique_ptr<folly::IOBuf> frame) {
10751075
stats_->frameWritten(frameType);
10761076

10771077
if (isResumable_) {
1078-
auto streamIdPtr = frameSerializer_->peekStreamId(*frame);
1078+
auto streamIdPtr = frameSerializer_->peekStreamId(*frame, false);
10791079
CHECK(streamIdPtr) << "Error in serialized frame.";
10801080
resumeManager_->trackSentFrame(
10811081
*frame, frameType, *streamIdPtr, getConsumerAllowance(*streamIdPtr));

0 commit comments

Comments
 (0)