Skip to content

Commit ff6f3ed

Browse files
committed
feat(net): activate v2 network as default
1 parent fc58e23 commit ff6f3ed

3 files changed

Lines changed: 63 additions & 45 deletions

File tree

impl/graphsync_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ var protocolsForTest = map[string]struct {
6363
host1Protocols []protocol.ID
6464
host2Protocols []protocol.ID
6565
}{
66-
"(v1.1 -> v1.1)": {nil, nil},
67-
"(v1.0 -> v1.1)": {[]protocol.ID{gsnet.ProtocolGraphsync_1_0_0}, nil},
68-
"(v1.1 -> v1.0)": {nil, []protocol.ID{gsnet.ProtocolGraphsync_1_0_0}},
66+
"(v2.0 -> v2.0)": {nil, nil},
67+
"(v1.0 -> v2.0)": {[]protocol.ID{gsnet.ProtocolGraphsync_1_0_0}, nil},
68+
"(v2.0 -> v1.0)": {nil, []protocol.ID{gsnet.ProtocolGraphsync_1_0_0}},
6969
"(v1.0 -> v1.0)": {[]protocol.ID{gsnet.ProtocolGraphsync_1_0_0}, []protocol.ID{gsnet.ProtocolGraphsync_1_0_0}},
7070
}
7171

network/libp2p_impl.go

Lines changed: 58 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ import (
1414
"github.com/libp2p/go-msgio"
1515
ma "github.com/multiformats/go-multiaddr"
1616

17+
"github.com/ipfs/go-graphsync/message"
1718
gsmsg "github.com/ipfs/go-graphsync/message"
1819
gsmsgv1 "github.com/ipfs/go-graphsync/message/v1"
20+
gsmsgv2 "github.com/ipfs/go-graphsync/message/v2"
1921
)
2022

2123
var log = logging.Logger("graphsync_network")
@@ -35,10 +37,14 @@ func GraphsyncProtocols(protocols []protocol.ID) Option {
3537

3638
// NewFromLibp2pHost returns a GraphSyncNetwork supported by underlying Libp2p host.
3739
func NewFromLibp2pHost(host host.Host, options ...Option) GraphSyncNetwork {
40+
messageHandlerSelector := messageHandlerSelector{
41+
v1MessageHandler: gsmsgv1.NewMessageHandler(),
42+
v2MessageHandler: gsmsgv2.NewMessageHandler(),
43+
}
3844
graphSyncNetwork := libp2pGraphSyncNetwork{
39-
host: host,
40-
messageHandler: gsmsgv1.NewMessageHandler(),
41-
protocols: []protocol.ID{ProtocolGraphsync_1_0_0, ProtocolGraphsync_2_0_0},
45+
host: host,
46+
messageHandlerSelector: &messageHandlerSelector,
47+
protocols: []protocol.ID{ProtocolGraphsync_2_0_0, ProtocolGraphsync_1_0_0},
4248
}
4349

4450
for _, option := range options {
@@ -48,20 +54,53 @@ func NewFromLibp2pHost(host host.Host, options ...Option) GraphSyncNetwork {
4854
return &graphSyncNetwork
4955
}
5056

57+
// a message.MessageHandler that simply returns an error for any of the calls, allows
58+
// us to simplify erroring on bad protocol within the messageHandlerSelector#Select()
59+
// call so we only have one place to be strict about allowed versions
60+
type messageHandlerErrorer struct {
61+
err error
62+
}
63+
64+
func (mhe messageHandlerErrorer) FromNet(peer.ID, io.Reader) (message.GraphSyncMessage, error) {
65+
return message.GraphSyncMessage{}, mhe.err
66+
}
67+
func (mhe messageHandlerErrorer) FromMsgReader(peer.ID, msgio.Reader) (message.GraphSyncMessage, error) {
68+
return message.GraphSyncMessage{}, mhe.err
69+
}
70+
func (mhe messageHandlerErrorer) ToNet(peer.ID, message.GraphSyncMessage, io.Writer) error {
71+
return mhe.err
72+
}
73+
74+
type messageHandlerSelector struct {
75+
v1MessageHandler gsmsg.MessageHandler
76+
v2MessageHandler gsmsg.MessageHandler
77+
}
78+
79+
func (smh messageHandlerSelector) Select(protocol protocol.ID) gsmsg.MessageHandler {
80+
switch protocol {
81+
case ProtocolGraphsync_1_0_0:
82+
return smh.v1MessageHandler
83+
case ProtocolGraphsync_2_0_0:
84+
return smh.v2MessageHandler
85+
default:
86+
return messageHandlerErrorer{fmt.Errorf("unrecognized protocol version: %s", protocol)}
87+
}
88+
}
89+
5190
// libp2pGraphSyncNetwork transforms the libp2p host interface, which sends and receives
5291
// NetMessage objects, into the graphsync network interface.
5392
type libp2pGraphSyncNetwork struct {
5493
host host.Host
5594
// inbound messages from the network are forwarded to the receiver
56-
receiver Receiver
57-
messageHandler gsmsg.MessageHandler
58-
protocols []protocol.ID
95+
receiver Receiver
96+
protocols []protocol.ID
97+
messageHandlerSelector *messageHandlerSelector
5998
}
6099

61100
type streamMessageSender struct {
62-
s network.Stream
63-
opts MessageSenderOpts
64-
messageHandler gsmsg.MessageHandler
101+
s network.Stream
102+
opts MessageSenderOpts
103+
messageHandlerSelector *messageHandlerSelector
65104
}
66105

67106
func (s *streamMessageSender) Close() error {
@@ -73,10 +112,10 @@ func (s *streamMessageSender) Reset() error {
73112
}
74113

75114
func (s *streamMessageSender) SendMsg(ctx context.Context, msg gsmsg.GraphSyncMessage) error {
76-
return msgToStream(ctx, s.s, s.messageHandler, msg, s.opts.SendTimeout)
115+
return msgToStream(ctx, s.s, s.messageHandlerSelector, msg, s.opts.SendTimeout)
77116
}
78117

79-
func msgToStream(ctx context.Context, s network.Stream, mh gsmsg.MessageHandler, msg gsmsg.GraphSyncMessage, timeout time.Duration) error {
118+
func msgToStream(ctx context.Context, s network.Stream, mh *messageHandlerSelector, msg gsmsg.GraphSyncMessage, timeout time.Duration) error {
80119
log.Debugf("Outgoing message with %d requests, %d responses, and %d blocks",
81120
len(msg.Requests()), len(msg.Responses()), len(msg.Blocks()))
82121

@@ -88,19 +127,9 @@ func msgToStream(ctx context.Context, s network.Stream, mh gsmsg.MessageHandler,
88127
log.Warnf("error setting deadline: %s", err)
89128
}
90129

91-
switch s.Protocol() {
92-
case ProtocolGraphsync_1_0_0:
93-
if err := mh.ToNet(s.Conn().RemotePeer(), msg, s); err != nil {
94-
log.Debugf("error: %s", err)
95-
return err
96-
}
97-
case ProtocolGraphsync_2_0_0:
98-
if err := mh.ToNet(s.Conn().RemotePeer(), msg, s); err != nil {
99-
log.Debugf("error: %s", err)
100-
return err
101-
}
102-
default:
103-
return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
130+
if err := mh.Select(s.Protocol()).ToNet(s.Conn().RemotePeer(), msg, s); err != nil {
131+
log.Debugf("error: %s", err)
132+
return err
104133
}
105134

106135
if err := s.SetWriteDeadline(time.Time{}); err != nil {
@@ -116,9 +145,9 @@ func (gsnet *libp2pGraphSyncNetwork) NewMessageSender(ctx context.Context, p pee
116145
}
117146

118147
return &streamMessageSender{
119-
s: s,
120-
opts: setDefaults(opts),
121-
messageHandler: gsnet.messageHandler,
148+
s: s,
149+
opts: setDefaults(opts),
150+
messageHandlerSelector: gsnet.messageHandlerSelector,
122151
}, nil
123152
}
124153

@@ -136,7 +165,7 @@ func (gsnet *libp2pGraphSyncNetwork) SendMessage(
136165
return err
137166
}
138167

139-
if err = msgToStream(ctx, s, gsnet.messageHandler, outgoing, sendMessageTimeout); err != nil {
168+
if err = msgToStream(ctx, s, gsnet.messageHandlerSelector, outgoing, sendMessageTimeout); err != nil {
140169
_ = s.Reset()
141170
return err
142171
}
@@ -167,16 +196,7 @@ func (gsnet *libp2pGraphSyncNetwork) handleNewStream(s network.Stream) {
167196

168197
reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
169198
for {
170-
var received gsmsg.GraphSyncMessage
171-
var err error
172-
switch s.Protocol() {
173-
case ProtocolGraphsync_1_0_0:
174-
received, err = gsnet.messageHandler.FromMsgReader(s.Conn().RemotePeer(), reader)
175-
case ProtocolGraphsync_2_0_0:
176-
received, err = gsnet.messageHandler.FromMsgReader(s.Conn().RemotePeer(), reader)
177-
default:
178-
err = fmt.Errorf("unexpected protocol version %s", s.Protocol())
179-
}
199+
received, err := gsnet.messageHandlerSelector.Select(s.Protocol()).FromMsgReader(s.Conn().RemotePeer(), reader)
180200
p := s.Conn().RemotePeer()
181201

182202
if err != nil {

network/libp2p_impl_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ func TestMessageSendAndReceive(t *testing.T) {
106106
receivedRequests := received.Requests()
107107
require.Len(t, receivedRequests, 1, "did not add request to received message")
108108
receivedRequest := receivedRequests[0]
109-
// TODO: for protocol v1 this shouldn't match, but for v2 it should
110-
// require.Equal(t, sentRequest.ID(), receivedRequest.ID())
109+
require.Equal(t, sentRequest.ID(), receivedRequest.ID())
111110
require.Equal(t, sentRequest.IsCancel(), receivedRequest.IsCancel())
112111
require.Equal(t, sentRequest.Priority(), receivedRequest.Priority())
113112
require.Equal(t, sentRequest.Root().String(), receivedRequest.Root().String())
@@ -120,8 +119,7 @@ func TestMessageSendAndReceive(t *testing.T) {
120119
require.Len(t, receivedResponses, 1, "did not add response to received message")
121120
receivedResponse := receivedResponses[0]
122121
extensionData, found := receivedResponse.Extension(extensionName)
123-
// TODO: for protocol v1 this shouldn't match, but for v2 it should
124-
// require.Equal(t, sentResponse.RequestID(), receivedResponse.RequestID())
122+
require.Equal(t, sentResponse.RequestID(), receivedResponse.RequestID())
125123
require.Equal(t, sentResponse.Status(), receivedResponse.Status())
126124
require.True(t, found)
127125
require.Equal(t, extension.Data, extensionData)

0 commit comments

Comments
 (0)