Skip to content

Commit 63837e3

Browse files
authored
Merge pull request #35 from JoyStream/development
v0.3.1
2 parents 0a2d54a + 314e999 commit 63837e3

18 files changed

Lines changed: 493 additions & 35 deletions

conan_package/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33

44
class ProtocolSessionBase(ConanFile):
55
name = "ProtocolSession"
6-
version = "0.3.0"
6+
version = "0.3.1"
77
license = "(c) JoyStream Inc. 2016-2017"
88
url = "https://github.com/JoyStream/protocol_session-cpp.git"
99
repo_ssh_url = "git@github.com:JoyStream/protocol_session-cpp.git"
1010
repo_https_url = "https://github.com/JoyStream/protocol_session-cpp.git"
1111
settings = "os", "compiler", "build_type", "arch"
1212
generators = "cmake"
13-
requires = "ProtocolStateMachine/0.3.0@joystream/stable"
13+
requires = "ProtocolStateMachine/0.3.1@joystream/stable"
1414
build_policy = "missing"
1515

1616
def source(self):

sources/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ set(
1717
src/PieceInformation.cpp
1818
src/common.cpp
1919
src/PieceDeliveryPipeline.cpp
20+
src/SpeedTestPolicy.cpp
2021
)
2122

2223
# === build library ===

sources/include/protocol_session/Callbacks.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ enum class DisconnectCause {
5252

5353
seller_message_overflow,
5454

55+
seller_failed_speed_test,
56+
5557
//// selling
5658

5759
//buyer_invited_with_bad_terms,
@@ -62,7 +64,11 @@ enum class DisconnectCause {
6264

6365
buyer_sent_invalid_payment,
6466

65-
buyer_message_overflow
67+
buyer_message_overflow,
68+
69+
buyer_requested_too_many_speed_tests,
70+
71+
buyer_speed_test_payload_requested_too_large
6672
};
6773

6874
// Removal of a connection from the session: c++11 alias declaration

sources/include/protocol_session/Session.cpp

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ namespace protocol_session {
4747
, _observing(nullptr)
4848
, _selling(nullptr)
4949
, _buying(nullptr)
50-
, _network(network) {
50+
, _network(network)
51+
, _getTime(std::chrono::high_resolution_clock::now) {
5152

5253
time(&_started);
5354
}
@@ -911,6 +912,26 @@ namespace protocol_session {
911912
assert(false);
912913
}
913914

915+
template<class ConnectionIdType>
916+
void Session<ConnectionIdType>::sellerCompletedSpeedTest(const ConnectionIdType & id, bool successful) {
917+
918+
assert(hasConnection(id));
919+
assert(_mode == SessionMode::buying);
920+
assert(_observing == nullptr && _buying != nullptr && _selling == nullptr);
921+
922+
_buying->sellerCompletedSpeedTest(id, successful);
923+
}
924+
925+
template<class ConnectionIdType>
926+
void Session<ConnectionIdType>::buyerRequestedSpeedTest(const ConnectionIdType & id, uint32_t payloadSize) {
927+
928+
assert(hasConnection(id));
929+
assert(_mode == SessionMode::selling);
930+
assert(_observing == nullptr && _buying == nullptr && _selling != nullptr);
931+
932+
_selling->buyerRequestedSpeedTest(id, payloadSize);
933+
}
934+
914935
template<class ConnectionIdType>
915936
detail::Connection<ConnectionIdType> * Session<ConnectionIdType>::createConnection(const ConnectionIdType & id, const SendMessageOnConnectionCallbacks & sendMessageCallbacks) {
916937

@@ -931,7 +952,10 @@ namespace protocol_session {
931952
[this, id](const protocol_wire::PieceData & p) { this->receivedFullPiece(id, p); },
932953
[this, id]() { this->remoteMessageOverflow(id); },
933954
[this, id]() { this->localMessageOverflow(id); },
934-
_network);
955+
[this, id](bool successful) { this->sellerCompletedSpeedTest(id, successful); },
956+
[this, id](uint32_t payloadSize) { this->buyerRequestedSpeedTest(id, payloadSize); },
957+
_network,
958+
_getTime);
935959
}
936960

937961
template <class ConnectionIdType>
@@ -1014,5 +1038,21 @@ namespace protocol_session {
10141038

10151039
return connection;
10161040
}
1041+
1042+
template <class ConnectionIdType>
1043+
SpeedTestPolicy Session<ConnectionIdType>::speedTestPolicy() const {
1044+
return _speedTestPolicy;
1045+
}
1046+
1047+
template <class ConnectionIdType>
1048+
void Session<ConnectionIdType>::setSpeedTestPolicy(const SpeedTestPolicy policy) {
1049+
_speedTestPolicy = policy;
1050+
}
1051+
1052+
template <class ConnectionIdType>
1053+
void Session<ConnectionIdType>::setTimeGetter(const std::function<std::chrono::high_resolution_clock::time_point()> & timeGetter) {
1054+
_getTime = timeGetter;
1055+
}
1056+
10171057
}
10181058
}

sources/include/protocol_session/Session.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <protocol_session/Callbacks.hpp>
1414
#include <protocol_session/SessionMode.hpp>
1515
#include <protocol_session/SessionState.hpp>
16+
#include <protocol_session/SpeedTestPolicy.hpp>
1617

1718
#include <unordered_map>
1819
#include <chrono>
@@ -205,6 +206,12 @@ namespace detail {
205206

206207
Coin::Network network() const;
207208

209+
SpeedTestPolicy speedTestPolicy() const;
210+
211+
void setSpeedTestPolicy(const SpeedTestPolicy);
212+
213+
void setTimeGetter(const std::function<std::chrono::high_resolution_clock::time_point()> &);
214+
208215
private:
209216

210217
// Session mode
@@ -221,6 +228,11 @@ namespace detail {
221228

222229
Coin::Network _network;
223230

231+
std::function<std::chrono::high_resolution_clock::time_point()> _getTime;
232+
233+
SpeedTestPolicy _speedTestPolicy;
234+
235+
224236
//// Substates
225237

226238
// Each pointer is != nullptr only when _mode corresponds
@@ -254,6 +266,8 @@ namespace detail {
254266
void receivedFullPiece(const ConnectionIdType &, const protocol_wire::PieceData &);
255267
void remoteMessageOverflow(const ConnectionIdType &);
256268
void localMessageOverflow(const ConnectionIdType &);
269+
void buyerRequestedSpeedTest(const ConnectionIdType&, uint32_t);
270+
void sellerCompletedSpeedTest(const ConnectionIdType&, bool);
257271

258272
//// Utility routines
259273

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Copyright (C) JoyStream - All Rights Reserved
3+
*/
4+
5+
#ifndef JOYSTREAM_PROTOCOLSESSION_SPEEDTESTPOLICY_HPP
6+
#define JOYSTREAM_PROTOCOLSESSION_SPEEDTESTPOLICY_HPP
7+
8+
#include <chrono>
9+
10+
namespace joystream {
11+
namespace protocol_session {
12+
13+
class SpeedTestPolicy {
14+
public:
15+
SpeedTestPolicy();
16+
17+
uint32_t payloadSize() const;
18+
uint32_t maxPayloadSize() const;
19+
std::chrono::seconds maxTimeToRespond() const;
20+
bool isEnabled() const;
21+
bool disconnectIfSlow() const;
22+
23+
void setPayloadSize(uint32_t);
24+
void setMaxPayloadSize(uint32_t);
25+
void setMaxTimeToRespond(std::chrono::seconds);
26+
void enable();
27+
void disable();
28+
void setDisconnectIfSlow(bool);
29+
30+
private:
31+
32+
uint32_t _payloadSize;
33+
uint32_t _maxPayloadSize;
34+
std::chrono::seconds _maxTimeToRespond;
35+
bool _enabled;
36+
bool _disconnectIfSlow;
37+
38+
};
39+
40+
}
41+
}
42+
43+
#endif // JOYSTREAM_PROTOCOLSESSION_SPEEDTESTPOLICY_HPP

sources/include/protocol_session/Status.hpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,13 @@ namespace status {
3030
CBStateMachine(const std::type_index & innerStateTypeIndex,
3131
const protocol_statemachine::AnnouncedModeAndTerms & announcedModeAndTermsFromPeer,
3232
const paymentchannel::Payor & payor,
33-
const paymentchannel::Payee & payee)
33+
const paymentchannel::Payee & payee,
34+
const int32_t latency)
3435
: innerStateTypeIndex(innerStateTypeIndex)
3536
, announcedModeAndTermsFromPeer(announcedModeAndTermsFromPeer)
3637
, payor(payor)
37-
, payee(payee) {
38+
, payee(payee)
39+
, latency(latency) {
3840
}
3941

4042
// Type index of innermost currently active state
@@ -54,6 +56,9 @@ namespace status {
5456
// Payee side of payment channel interaction
5557
paymentchannel::Payee payee;
5658

59+
// Number of ticks from underlying clock used to measure how long it took to send test payload
60+
int32_t latency;
61+
5762
};
5863

5964
template <class ConnectionIdType>

sources/include/protocol_session/detail/Buying.cpp

Lines changed: 85 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,10 @@ namespace detail {
137137

138138
// Check that this peer is seller,
139139
protocol_statemachine::ModeAnnounced m = a.modeAnnounced();
140+
140141
assert(m != protocol_statemachine::ModeAnnounced::none);
141142

142-
// and has good enough terms to warrant an invitation,
143-
// then send invitation
144-
if(m == protocol_statemachine::ModeAnnounced::sell && _terms.satisfiedBy(a.sellModeTerms())) {
145-
c->processEvent(protocol_statemachine::event::InviteSeller());
146-
std::cout << "Invited: " << IdToString(id) << std::endl;
147-
}
143+
maybeInviteSeller(c);
148144
}
149145
}
150146

@@ -210,14 +206,68 @@ namespace detail {
210206
template<class ConnectionIdType>
211207
void Buying<ConnectionIdType>::remoteMessageOverflow(const ConnectionIdType & id) {
212208
std::clog << "Error: remoteMessageOverflow from seller connection " << id << std::endl;
209+
213210
removeConnection(id, DisconnectCause::seller_message_overflow);
211+
212+
// Notify state machine about deletion
213+
throw protocol_statemachine::exception::StateMachineDeletedException();
214+
}
215+
216+
template<class ConnectionIdType>
217+
void Buying<ConnectionIdType>::sellerCompletedSpeedTest(const ConnectionIdType & id, bool successful) {
218+
219+
detail::Connection<ConnectionIdType> * c = _session->get(id);
220+
221+
if (!successful) {
222+
// Remove connection
223+
removeConnection(id, DisconnectCause::seller_failed_speed_test);
224+
225+
// Notify state machine about deletion
226+
throw protocol_statemachine::exception::StateMachineDeletedException();
227+
228+
} else {
229+
// record completion time
230+
c->endingSpeedTest();
231+
232+
if (!_session->speedTestPolicy().disconnectIfSlow()) {
233+
maybeInviteSeller(c);
234+
} else {
235+
236+
if (c->speedTestCompletedInLessThan(_session->speedTestPolicy().maxTimeToRespond())) {
237+
// Invite seller if they met the speedTestPolicy requirement
238+
maybeInviteSeller(c);
239+
240+
} else {
241+
// otherwise disconnect them
242+
removeConnection(id, DisconnectCause::seller_failed_speed_test);
243+
244+
// Notify state machine about deletion
245+
throw protocol_statemachine::exception::StateMachineDeletedException();
246+
}
247+
}
248+
249+
}
250+
214251
}
215252

216253
template <class ConnectionIdType>
217254
void Buying<ConnectionIdType>::leavingState() {
218255

219256
// Prepare sellers before we interrupt with new mode
220257
politeSellerCompensation();
258+
259+
// Reset speed testing state for all connections that have not completed a speed test
260+
// This is to ensure when the session comes back to buying mode it will make sure to send a speed test request
261+
// to sellers that have not completed yet.
262+
for(auto itr = _session->_connections.cbegin();itr != _session->_connections.cend();) {
263+
auto connection = itr->second;
264+
265+
if (!connection->hasCompletedSpeedTest()) {
266+
connection->abandonSpeedTest();
267+
}
268+
269+
itr++;
270+
}
221271
}
222272

223273
template <class ConnectionIdType>
@@ -506,18 +556,36 @@ namespace detail {
506556

507557
detail::Connection<ConnectionIdType> * c = mapping.second;
508558

509-
// Check that this peer is seller,
510-
protocol_statemachine::AnnouncedModeAndTerms a = c->announcedModeAndTermsFromPeer();
559+
maybeInviteSeller(c);
560+
}
511561

512-
// and has good enough terms to warrant an invitation,
513-
// then send invitation
514-
if(a.modeAnnounced() == protocol_statemachine::ModeAnnounced::sell && _terms.satisfiedBy(a.sellModeTerms())) {
562+
}
515563

516-
c->processEvent(protocol_statemachine::event::InviteSeller());
564+
template <class ConnectionIdType>
565+
void Buying<ConnectionIdType>::maybeInviteSeller(detail::Connection<ConnectionIdType> * c) const {
517566

518-
std::cout << "Invited: " << IdToString(mapping.first) << std::endl;
519-
}
520-
}
567+
assert(_session->_state == SessionState::started);
568+
assert(_state == BuyingState::sending_invitations);
569+
570+
// Check that this peer is seller,
571+
protocol_statemachine::AnnouncedModeAndTerms a = c->announcedModeAndTermsFromPeer();
572+
573+
// Do not send invitations if peer is not announcing sell mode or has incompatible terms
574+
if(a.modeAnnounced() != protocol_statemachine::ModeAnnounced::sell || !_terms.satisfiedBy(a.sellModeTerms())) {
575+
return;
576+
}
577+
578+
// Does seller need to complete a speed test to be invited?
579+
if (_session->_speedTestPolicy.isEnabled() && !c->hasCompletedSpeedTest()) {
580+
if (c->hasStartedSpeedTest()) return;
581+
c->startingSpeedTest(); // record starting time
582+
c->processEvent(protocol_statemachine::event::TestSellerSpeed(_session->_speedTestPolicy.payloadSize()));
583+
return;
584+
}
585+
586+
// Seller has previously completed a speed test/or no speed test was required.. invite them
587+
c->processEvent(protocol_statemachine::event::InviteSeller());
588+
std::cout << "Invited: " << IdToString(c->connectionId()) << std::endl;
521589
}
522590

523591
template <class ConnectionIdType>
@@ -590,8 +658,8 @@ namespace detail {
590658
void Buying<ConnectionIdType>::removeSeller(detail::Seller<ConnectionIdType> & s) {
591659
if (s.isGone()) return;
592660

593-
// we must be downloading
594-
assert(_state == BuyingState::downloading);
661+
// we must be downloading or just finish downloading
662+
assert(_state == BuyingState::downloading || _state == BuyingState::download_completed);
595663

596664
// If this seller has assigned piecees, then we must unassign them
597665
for(uint i = 0;i < _pieces.size();i++) {

0 commit comments

Comments
 (0)