Skip to content

Commit 5d77d4c

Browse files
lexsfacebook-github-bot
authored andcommitted
Don't send onDisconnect callback before resuming
Summary: If we have an open connection when a resume comes in we drop the old connection as expected. However we also send a onDisconnected callback to the server which makes it think it should pause the session. It pauses everything only to unpause it moments later as we were in the process of resuming. Reviewed By: phoad Differential Revision: D9267026 fbshipit-source-id: 8894291a84da3ab0072ed87c1b783246092fcc17
1 parent 0b5e3df commit 5d77d4c

File tree

2 files changed

+63
-2
lines changed

2 files changed

+63
-2
lines changed

rsocket/statemachine/RSocketStateMachine.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,11 @@ bool RSocketStateMachine::resumeServer(
154154
const int64_t serverDelta =
155155
resumeManager_->lastSentPosition() - resumeParams.serverPosition;
156156

157-
std::runtime_error exn{"Connection being resumed, dropping old connection"};
158-
disconnect(std::move(exn));
157+
if (frameTransport) {
158+
stats_->socketDisconnected();
159+
}
160+
closeFrameTransport(
161+
std::runtime_error{"Connection being resumed, dropping old connection"});
159162
setProtocolVersionOrThrow(resumeParams.protocolVersion, frameTransport);
160163
connect(std::move(frameTransport));
161164

rsocket/test/statemachine/RSocketStateMachineTest.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <yarpl/single/SingleSubscriptions.h>
1919
#include <yarpl/single/Singles.h>
2020
#include <yarpl/test_utils/Mocks.h>
21+
#include "rsocket/RSocketConnectionEvents.h"
2122
#include "rsocket/RSocketResponder.h"
2223
#include "rsocket/framing/FrameSerializer_v1_0.h"
2324
#include "rsocket/framing/FrameTransportImpl.h"
@@ -68,6 +69,11 @@ class ResponderMock : public RSocketResponder {
6869
}
6970
};
7071

72+
struct ConnectionEventsMock : public RSocketConnectionEvents {
73+
MOCK_METHOD1(onDisconnected, void(const folly::exception_wrapper&));
74+
MOCK_METHOD0(onStreamsPaused, void());
75+
};
76+
7177
class RSocketStateMachineTest : public Test {
7278
public:
7379
auto createClient(
@@ -96,6 +102,37 @@ class RSocketStateMachineTest : public Test {
96102
return stateMachine;
97103
}
98104

105+
auto createServer(
106+
std::unique_ptr<MockDuplexConnection> connection,
107+
std::shared_ptr<RSocketResponder> responder,
108+
folly::Optional<ResumeIdentificationToken> resumeToken = folly::none,
109+
std::shared_ptr<RSocketConnectionEvents> connectionEvents = nullptr) {
110+
auto transport =
111+
std::make_shared<FrameTransportImpl>(std::move(connection));
112+
113+
auto stateMachine = std::make_shared<RSocketStateMachine>(
114+
std::move(responder),
115+
nullptr,
116+
RSocketMode::SERVER,
117+
nullptr,
118+
std::move(connectionEvents),
119+
ResumeManager::makeEmpty(),
120+
nullptr);
121+
122+
if (resumeToken) {
123+
SetupParameters setupParameters;
124+
setupParameters.resumable = true;
125+
setupParameters.token = *resumeToken;
126+
stateMachine->connectServer(std::move(transport), setupParameters);
127+
} else {
128+
SetupParameters setupParameters;
129+
setupParameters.resumable = false;
130+
stateMachine->connectServer(std::move(transport), setupParameters);
131+
}
132+
133+
return stateMachine;
134+
}
135+
99136
const std::unordered_map<StreamId, std::shared_ptr<StreamStateMachineBase>>&
100137
getStreams(RSocketStateMachine& stateMachine) {
101138
return stateMachine.streams_;
@@ -370,4 +407,25 @@ TEST_F(RSocketStateMachineTest, TransportOnNextClose) {
370407
rawTransport->onNext(std::move(buf));
371408
}
372409

410+
TEST_F(RSocketStateMachineTest, ResumeWithCurrentConnection) {
411+
auto resumeToken = ResumeIdentificationToken::generateNew();
412+
413+
auto eventsMock = std::make_shared<ConnectionEventsMock>();
414+
auto stateMachine = createServer(
415+
std::make_unique<NiceMock<MockDuplexConnection>>(),
416+
std::make_shared<RSocketResponder>(),
417+
resumeToken,
418+
eventsMock);
419+
420+
EXPECT_CALL(*eventsMock, onDisconnected(_)).Times(0);
421+
EXPECT_CALL(*eventsMock, onStreamsPaused()).Times(0);
422+
423+
ResumeParameters resumeParams{resumeToken, 0, 0, ProtocolVersion::Latest};
424+
auto transport = std::make_shared<FrameTransportImpl>(
425+
std::make_unique<NiceMock<MockDuplexConnection>>());
426+
stateMachine->resumeServer(transport, resumeParams);
427+
428+
stateMachine->close({}, StreamCompletionSignal::CONNECTION_END);
429+
}
430+
373431
} // namespace rsocket

0 commit comments

Comments
 (0)