Skip to content

Commit 4a67bed

Browse files
rvagg, mvdan, hannahhoward
authored
[Feature] UUIDs, protocol versioning, v2 protocol w/ dag-cbor messaging (#332)
* feat(net): initial dag-cbor protocol support also added first roundtrip benchmark * feat(requestid): use uuids for requestids Ref: ipfs/go-graphsync#278 Closes: ipfs/go-graphsync#279 Closes: ipfs/go-graphsync#281 * fix(requestmanager): make collect test requests with uuids sortable * fix(requestid): print requestids as string uuids in logs * fix(requestid): use string as base type for RequestId * chore(requestid): wrap requestid string in a struct * feat(libp2p): add v1.0.0 network compatibility * chore(net): resolve most cbor + uuid merge problems * feat(net): to/from ipld bindnode types, more cbor protoc improvements * feat(net): introduce 2.0.0 protocol for dag-cbor * fix(net): more bindnode dag-cbor protocol fixes Not quite working yet, still need some upstream fixes and no extensions work has been attempted yet. * chore(metadata): convert metadata to bindnode * chore(net,extensions): wire up IPLD extensions, expose as Node instead of []byte * Extensions now working with new dag-cbor network protocol * dag-cbor network protocol still not default, most tests are still exercising the existing v1 protocol * Metadata now using bindnode instead of cbor-gen * []byte for deferred extensions decoding is now replaced with datamodel.Node everywhere. Internal extensions now using some form of go-ipld-prime decode to convert them to local types (metadata using bindnode, others using direct inspection). * V1 protocol also using dag-cbor decode of extensions data and exporting the bytes - this may be a breaking change for exising extensions - need to check whether this should be done differently. Maybe a try-decode and if it fails export a wrapped Bytes Node? 
* fix(src): fix imports * fix(mod): clean up go.mod * fix(net): refactor message version format code to separate packages * feat(net): activate v2 network as default * fix(src): build error * chore: remove GraphSyncMessage#Loggable Ref: ipfs/go-graphsync#332 (comment) * chore: remove intermediate v1.1 pb protocol message type v1.1.0 was introduced to start the transition to UUID RequestIDs. That change has since been combined with the switch to DAG-CBOR messaging format for a v2.0.0 protocol. Thus, this interim v1.1.0 format is no longer needed and has not been used at all in a released version of go-graphsync. Fixes: filecoin-project/lightning-planning#14 * fix: clarify comments re dag-cbor extension data As per discussion in ipfs/go-graphsync#338, we are going to be erroring on extension data that is not properly dag-cbor encoded from now on * feat: new LinkMetadata iface, integrate metadata into Response type (#342) * feat(metadata): new LinkMetadata iface, integrate metadata into Response type * LinkMetadata wrapper around existing metadata type to allow for easier backward-compat upgrade path * integrate metadata directly into GraphSyncResponse type, moving it from an optional extension * still deal with metadata as an extension for now—further work for v2 protocol will move it into the core message schema Ref: ipfs/go-graphsync#335 * feat(metadata): move metadata to core protocol, only use extension in v1 proto * fix(metadata): bindnode expects Go enum strings to be at the type level * fix(metadata): minor fixes, tidy up naming * fix(metadata): make gofmt and staticcheck happy * fix(metadata): docs and minor tweaks after review Co-authored-by: Daniel Martí <mvdan@mvdan.cc> * fix: avoid double-encode for extension size estimation Closes: filecoin-project/lightning-planning#15 * feat(requesttype): introduce RequestType enum to replace cancel&update bools (#352) Closes: ipfs/go-graphsync#345 * fix(metadata): extend round-trip tests to byte representation (#350) * 
feat!(messagev2): tweak dag-cbor message schema (#354) * feat!(messagev2): tweak dag-cbor message schema For: 1. Efficiency: compacting the noisy structures into tuple representations and making top-level components of a message optional. 2. Migrations: providing a secondary mechanism to lean on for versioning if we want a gentler upgrade path than libp2p protocol versioning. Closes: ipfs/go-graphsync#351 * fix(messagev2): adjust schema per feedback * feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID (#355) * feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID Closes: ipfs/go-graphsync#349 * fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID * fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID when using error type T, use *T with As, rather than **T * fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID * fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID Co-authored-by: Daniel Martí <mvdan@mvdan.cc> * feat: SendUpdates() API to send only extension data via an existing request * fix(responsemanager): send update while completing If request has finished selector traversal but is still sending blocks, I think it should be possible to send updates. As a side effect, this fixes our race. Logically, this makes sense, cause our external indicator that we're done (completed response listener) has not been called. 
* fix(requestmanager): revert change to pointer type * Refactor async loading for simplicity and correctness (#356) * feat(reconciledloader): first working version of reconciled loader * feat(traversalrecorder): add better recorder for traversals * feat(reconciledloader): pipe reconciled loader through code style(lint): fix static checks * Update requestmanager/reconciledloader/injest.go Co-authored-by: Rod Vagg <rod@vagg.org> * feat(reconciledloader): respond to PR comments Co-authored-by: Rod Vagg <rod@vagg.org> * fix(requestmanager): update test for rebase Co-authored-by: Daniel Martí <mvdan@mvdan.cc> Co-authored-by: hannahhoward <hannah@hannahhoward.net>
1 parent 5e32397 commit 4a67bed

File tree

1 file changed

+16
-16
lines changed

1 file changed

+16
-16
lines changed

messagequeue/messagequeue_test.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestStartupAndShutdown(t *testing.T) {
3939

4040
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
4141
messageQueue.Startup()
42-
id := graphsync.RequestID(rand.Int31())
42+
id := graphsync.NewRequestID()
4343
priority := graphsync.Priority(rand.Int31())
4444
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
4545
selector := ssb.Matcher().Node()
@@ -77,7 +77,7 @@ func TestShutdownDuringMessageSend(t *testing.T) {
7777

7878
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
7979
messageQueue.Startup()
80-
id := graphsync.RequestID(rand.Int31())
80+
id := graphsync.NewRequestID()
8181
priority := graphsync.Priority(rand.Int31())
8282
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
8383
selector := ssb.Matcher().Node()
@@ -128,11 +128,11 @@ func TestProcessingNotification(t *testing.T) {
128128
waitGroup.Add(1)
129129
blks := testutil.GenerateBlocksOfSize(3, 128)
130130

131-
responseID := graphsync.RequestID(rand.Int31())
131+
responseID := graphsync.NewRequestID()
132132
extensionName := graphsync.ExtensionName("graphsync/awesome")
133133
extension := graphsync.ExtensionData{
134134
Name: extensionName,
135-
Data: testutil.RandomBytes(100),
135+
Data: basicnode.NewBytes(testutil.RandomBytes(100)),
136136
}
137137
status := graphsync.RequestCompletedFull
138138
blkData := testutil.NewFakeBlockData()
@@ -199,7 +199,7 @@ func TestDedupingMessages(t *testing.T) {
199199
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
200200
messageQueue.Startup()
201201
waitGroup.Add(1)
202-
id := graphsync.RequestID(rand.Int31())
202+
id := graphsync.NewRequestID()
203203
priority := graphsync.Priority(rand.Int31())
204204
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
205205
selector := ssb.Matcher().Node()
@@ -210,11 +210,11 @@ func TestDedupingMessages(t *testing.T) {
210210
})
211211
// wait for send attempt
212212
waitGroup.Wait()
213-
id2 := graphsync.RequestID(rand.Int31())
213+
id2 := graphsync.NewRequestID()
214214
priority2 := graphsync.Priority(rand.Int31())
215215
selector2 := ssb.ExploreAll(ssb.Matcher()).Node()
216216
root2 := testutil.GenerateCids(1)[0]
217-
id3 := graphsync.RequestID(rand.Int31())
217+
id3 := graphsync.NewRequestID()
218218
priority3 := graphsync.Priority(rand.Int31())
219219
selector3 := ssb.ExploreIndex(0, ssb.Matcher()).Node()
220220
root3 := testutil.GenerateCids(1)[0]
@@ -231,7 +231,7 @@ func TestDedupingMessages(t *testing.T) {
231231
require.Len(t, requests, 1, "number of requests in first message was not 1")
232232
request := requests[0]
233233
require.Equal(t, id, request.ID())
234-
require.False(t, request.IsCancel())
234+
require.Equal(t, request.Type(), graphsync.RequestTypeNew)
235235
require.Equal(t, priority, request.Priority())
236236
require.Equal(t, selector, request.Selector())
237237

@@ -241,11 +241,11 @@ func TestDedupingMessages(t *testing.T) {
241241
require.Len(t, requests, 2, "number of requests in second message was not 2")
242242
for _, request := range requests {
243243
if request.ID() == id2 {
244-
require.False(t, request.IsCancel())
244+
require.Equal(t, request.Type(), graphsync.RequestTypeNew)
245245
require.Equal(t, priority2, request.Priority())
246246
require.Equal(t, selector2, request.Selector())
247247
} else if request.ID() == id3 {
248-
require.False(t, request.IsCancel())
248+
require.Equal(t, request.Type(), graphsync.RequestTypeNew)
249249
require.Equal(t, priority3, request.Priority())
250250
require.Equal(t, selector3, request.Selector())
251251
} else {
@@ -385,8 +385,8 @@ func TestNetworkErrorClearResponses(t *testing.T) {
385385
messagesSent := make(chan gsmsg.GraphSyncMessage)
386386
resetChan := make(chan struct{}, 1)
387387
fullClosedChan := make(chan struct{}, 1)
388-
requestID1 := graphsync.RequestID(rand.Int31())
389-
requestID2 := graphsync.RequestID(rand.Int31())
388+
requestID1 := graphsync.NewRequestID()
389+
requestID2 := graphsync.NewRequestID()
390390
messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
391391
var waitGroup sync.WaitGroup
392392
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
@@ -403,7 +403,7 @@ func TestNetworkErrorClearResponses(t *testing.T) {
403403

404404
messageQueue.AllocateAndBuildMessage(uint64(len(blks[0].RawData())), func(b *Builder) {
405405
b.AddBlock(blks[0])
406-
b.AddLink(requestID1, cidlink.Link{Cid: blks[0].Cid()}, true)
406+
b.AddLink(requestID1, cidlink.Link{Cid: blks[0].Cid()}, graphsync.LinkActionPresent)
407407
b.SetSubscriber(requestID1, subscriber)
408408
})
409409
waitGroup.Wait()
@@ -431,16 +431,16 @@ func TestNetworkErrorClearResponses(t *testing.T) {
431431
messageQueue.AllocateAndBuildMessage(uint64(len(blks[1].RawData())), func(b *Builder) {
432432
b.AddBlock(blks[1])
433433
b.SetResponseStream(requestID1, fc1)
434-
b.AddLink(requestID1, cidlink.Link{Cid: blks[1].Cid()}, true)
434+
b.AddLink(requestID1, cidlink.Link{Cid: blks[1].Cid()}, graphsync.LinkActionPresent)
435435
})
436436
messageQueue.AllocateAndBuildMessage(uint64(len(blks[2].RawData())), func(b *Builder) {
437437
b.AddBlock(blks[2])
438438
b.SetResponseStream(requestID1, fc1)
439-
b.AddLink(requestID1, cidlink.Link{Cid: blks[2].Cid()}, true)
439+
b.AddLink(requestID1, cidlink.Link{Cid: blks[2].Cid()}, graphsync.LinkActionPresent)
440440
})
441441
messageQueue.AllocateAndBuildMessage(uint64(len(blks[3].RawData())), func(b *Builder) {
442442
b.SetResponseStream(requestID2, fc2)
443-
b.AddLink(requestID2, cidlink.Link{Cid: blks[3].Cid()}, true)
443+
b.AddLink(requestID2, cidlink.Link{Cid: blks[3].Cid()}, graphsync.LinkActionPresent)
444444
b.AddBlock(blks[3])
445445
})
446446

0 commit comments

Comments
 (0)