Skip to content

Commit 3d98478

Browse files
committed
fix(requestmanager): make collect test requests with uuids sortable
1 parent 25c02e5 commit 3d98478

1 file changed

Lines changed: 40 additions & 39 deletions

File tree

requestmanager/requestmanager_test.go

Lines changed: 40 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestNormalSimultaneousFetch(t *testing.T) {
4141
returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
4242
returnedResponseChan2, returnedErrorChan2 := td.requestManager.NewRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
4343

44-
requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
44+
requestRecords := readNNetworkRequests(requestCtx, t, td, 2)
4545

4646
td.tcm.AssertProtected(t, peers[0])
4747
td.tcm.AssertProtectedWithTags(t, peers[0], requestRecords[0].gsr.ID().Tag(), requestRecords[1].gsr.ID().Tag())
@@ -130,7 +130,7 @@ func TestCancelRequestInProgress(t *testing.T) {
130130
returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx1, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
131131
returnedResponseChan2, returnedErrorChan2 := td.requestManager.NewRequest(requestCtx2, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
132132

133-
requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
133+
requestRecords := readNNetworkRequests(requestCtx, t, td, 2)
134134

135135
td.tcm.AssertProtected(t, peers[0])
136136
td.tcm.AssertProtectedWithTags(t, peers[0], requestRecords[0].gsr.ID().Tag(), requestRecords[1].gsr.ID().Tag())
@@ -148,7 +148,7 @@ func TestCancelRequestInProgress(t *testing.T) {
148148
td.fal.SuccessResponseOn(peers[0], requestRecords[1].gsr.ID(), firstBlocks)
149149
td.blockChain.VerifyResponseRange(requestCtx1, returnedResponseChan1, 0, 3)
150150
cancel1()
151-
rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
151+
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]
152152

153153
require.True(t, rr.gsr.IsCancel())
154154
require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
@@ -194,7 +194,7 @@ func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) {
194194

195195
_, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
196196

197-
requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)
197+
requestRecords := readNNetworkRequests(requestCtx, t, td, 1)
198198

199199
td.tcm.AssertProtected(t, peers[0])
200200
td.tcm.AssertProtectedWithTags(t, peers[0], requestRecords[0].gsr.ID().Tag())
@@ -215,7 +215,7 @@ func TestCancelRequestImperativeNoMoreBlocks(t *testing.T) {
215215
require.NoError(t, err)
216216
postCancel <- struct{}{}
217217

218-
rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
218+
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]
219219

220220
require.True(t, rr.gsr.IsCancel())
221221
require.Equal(t, requestRecords[0].gsr.ID(), rr.gsr.ID())
@@ -243,7 +243,7 @@ func TestCancelManagerExitsGracefully(t *testing.T) {
243243

244244
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
245245

246-
rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
246+
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]
247247

248248
firstBlocks := td.blockChain.Blocks(0, 3)
249249
firstMetadata := encodedMetadataForBlocks(t, firstBlocks, true)
@@ -275,7 +275,7 @@ func TestFailedRequest(t *testing.T) {
275275

276276
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
277277

278-
rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
278+
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]
279279
td.tcm.AssertProtected(t, peers[0])
280280
td.tcm.AssertProtectedWithTags(t, peers[0], rr.gsr.ID().Tag())
281281

@@ -299,7 +299,7 @@ func TestLocallyFulfilledFirstRequestFailsLater(t *testing.T) {
299299

300300
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
301301

302-
rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
302+
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]
303303

304304
// async loaded response responds immediately
305305
td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks())
@@ -330,7 +330,7 @@ func TestLocallyFulfilledFirstRequestSucceedsLater(t *testing.T) {
330330
})
331331
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
332332

333-
rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
333+
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]
334334

335335
// async loaded response responds immediately
336336
td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), td.blockChain.AllBlocks())
@@ -358,7 +358,7 @@ func TestRequestReturnsMissingBlocks(t *testing.T) {
358358

359359
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
360360

361-
rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
361+
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]
362362

363363
md := encodedMetadataForBlocks(t, td.blockChain.AllBlocks(), false)
364364
firstResponses := []gsmsg.GraphSyncResponse{
@@ -436,7 +436,7 @@ func TestEncodingExtensions(t *testing.T) {
436436
td.responseHooks.Register(hook)
437437
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
438438

439-
rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
439+
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]
440440

441441
gsr := rr.gsr
442442
returnedData1, found := gsr.Extension(td.extensionName1)
@@ -474,7 +474,7 @@ func TestEncodingExtensions(t *testing.T) {
474474
testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
475475
require.Equal(t, expectedData, received, "did not receive correct extension data from resposne")
476476

477-
rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
477+
rr = readNNetworkRequests(requestCtx, t, td, 1)[0]
478478
receivedUpdateData, has := rr.gsr.Extension(td.extensionName1)
479479
require.True(t, has)
480480
require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")
@@ -510,7 +510,7 @@ func TestEncodingExtensions(t *testing.T) {
510510
testutil.AssertReceive(ctx, t, receivedExtensionData, &received, "did not receive extension data")
511511
require.Equal(t, nextExpectedData, received, "did not receive correct extension data from resposne")
512512

513-
rr = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
513+
rr = readNNetworkRequests(requestCtx, t, td, 1)[0]
514514
receivedUpdateData, has = rr.gsr.Extension(td.extensionName1)
515515
require.True(t, has)
516516
require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
@@ -550,7 +550,7 @@ func TestBlockHooks(t *testing.T) {
550550
td.blockHooks.Register(hook)
551551
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1, td.extension2)
552552

553-
rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
553+
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]
554554

555555
gsr := rr.gsr
556556
returnedData1, found := gsr.Extension(td.extensionName1)
@@ -602,7 +602,7 @@ func TestBlockHooks(t *testing.T) {
602602
})
603603
td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), firstBlocks)
604604

605-
ur := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
605+
ur := readNNetworkRequests(requestCtx, t, td, 1)[0]
606606
receivedUpdateData, has := ur.gsr.Extension(td.extensionName1)
607607
require.True(t, has)
608608
require.Equal(t, expectedUpdate, receivedUpdateData, "should have updated with correct extension")
@@ -666,7 +666,7 @@ func TestBlockHooks(t *testing.T) {
666666
})
667667
td.fal.SuccessResponseOn(peers[0], rr.gsr.ID(), nextBlocks)
668668

669-
ur = readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
669+
ur = readNNetworkRequests(requestCtx, t, td, 1)[0]
670670
receivedUpdateData, has = ur.gsr.Extension(td.extensionName1)
671671
require.True(t, has)
672672
require.Equal(t, nextExpectedUpdate1, receivedUpdateData, "should have updated with correct extension")
@@ -715,7 +715,7 @@ func TestOutgoingRequestHooks(t *testing.T) {
715715
returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1)
716716
returnedResponseChan2, returnedErrorChan2 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
717717

718-
requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 2)
718+
requestRecords := readNNetworkRequests(requestCtx, t, td, 2)
719719

720720
dedupData, has := requestRecords[0].gsr.Extension(graphsync.ExtensionDeDupByKey)
721721
require.True(t, has)
@@ -773,7 +773,7 @@ func TestOutgoingRequestListeners(t *testing.T) {
773773

774774
returnedResponseChan1, returnedErrorChan1 := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector(), td.extension1)
775775

776-
requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)
776+
requestRecords := readNNetworkRequests(requestCtx, t, td, 1)
777777

778778
// Should have fired by now
779779
select {
@@ -836,7 +836,7 @@ func TestPauseResume(t *testing.T) {
836836
// Start request
837837
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
838838

839-
rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
839+
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]
840840

841841
// Start processing responses
842842
md := metadataForBlocks(td.blockChain.AllBlocks(), true)
@@ -862,7 +862,7 @@ func TestPauseResume(t *testing.T) {
862862
<-holdForPause
863863

864864
// read the outgoing cancel request
865-
pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
865+
pauseCancel := readNNetworkRequests(requestCtx, t, td, 1)[0]
866866
require.True(t, pauseCancel.gsr.IsCancel())
867867

868868
// verify no further responses come through
@@ -875,7 +875,7 @@ func TestPauseResume(t *testing.T) {
875875
require.NoError(t, err)
876876

877877
// verify the correct new request with Do-no-send-cids & other extensions
878-
resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
878+
resumedRequest := readNNetworkRequests(requestCtx, t, td, 1)[0]
879879
doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
880880
doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
881881
require.NoError(t, err)
@@ -922,7 +922,7 @@ func TestPauseResumeExternal(t *testing.T) {
922922
// Start request
923923
returnedResponseChan, returnedErrorChan := td.requestManager.NewRequest(requestCtx, peers[0], td.blockChain.TipLink, td.blockChain.Selector())
924924

925-
rr := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
925+
rr := readNNetworkRequests(requestCtx, t, td, 1)[0]
926926

927927
// Start processing responses
928928
md := metadataForBlocks(td.blockChain.AllBlocks(), true)
@@ -942,7 +942,7 @@ func TestPauseResumeExternal(t *testing.T) {
942942
<-holdForPause
943943

944944
// read the outgoing cancel request
945-
pauseCancel := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
945+
pauseCancel := readNNetworkRequests(requestCtx, t, td, 1)[0]
946946
require.True(t, pauseCancel.gsr.IsCancel())
947947

948948
// verify no further responses come through
@@ -955,7 +955,7 @@ func TestPauseResumeExternal(t *testing.T) {
955955
require.NoError(t, err)
956956

957957
// verify the correct new request with Do-no-send-cids & other extensions
958-
resumedRequest := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 1)[0]
958+
resumedRequest := readNNetworkRequests(requestCtx, t, td, 1)[0]
959959
doNotSendCidsData, has := resumedRequest.gsr.Extension(graphsync.ExtensionDoNotSendCIDs)
960960
doNotSendCids, err := cidset.DecodeCidSet(doNotSendCidsData)
961961
require.NoError(t, err)
@@ -991,7 +991,7 @@ func TestStats(t *testing.T) {
991991
_, _ = td.requestManager.NewRequest(requestCtx, peers[0], blockChain2.TipLink, blockChain2.Selector())
992992
_, _ = td.requestManager.NewRequest(requestCtx, peers[1], td.blockChain.TipLink, td.blockChain.Selector())
993993

994-
requestRecords := readNNetworkRequests(requestCtx, t, td.requestRecordChan, 3)
994+
requestRecords := readNNetworkRequests(requestCtx, t, td, 3)
995995

996996
peerState := td.requestManager.PeerState(peers[0])
997997
require.Len(t, peerState.RequestStates, 2)
@@ -1026,25 +1026,22 @@ func (fph *fakePeerHandler) AllocateAndBuildMessage(p peer.ID, blkSize uint64,
10261026
}
10271027
}
10281028

1029-
func readNNetworkRequests(ctx context.Context,
1030-
t *testing.T,
1031-
requestRecordChan <-chan requestRecord,
1032-
count int) []requestRecord {
1033-
requestRecords := make([]requestRecord, 0, count)
1029+
func readNNetworkRequests(ctx context.Context, t *testing.T, td *testData, count int) []requestRecord {
1030+
requestRecords := make(map[graphsync.RequestID]requestRecord, count)
10341031
for i := 0; i < count; i++ {
10351032
var rr requestRecord
1036-
testutil.AssertReceive(ctx, t, requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
1037-
requestRecords = append(requestRecords, rr)
1033+
testutil.AssertReceive(ctx, t, td.requestRecordChan, &rr, fmt.Sprintf("did not receive request %d", i))
1034+
requestRecords[rr.gsr.ID()] = rr
10381035
}
10391036
// because of the simultaneous request queues it's possible for the requests to go to the network layer out of order
10401037
// if the requests are queued at a near identical time
1041-
// TODO: howdo?
1042-
/*
1043-
sort.Slice(requestRecords, func(i, j int) bool {
1044-
return requestRecords[i].gsr.ID() < requestRecords[j].gsr.ID()
1045-
})
1046-
*/
1047-
return requestRecords
1038+
sorted := make([]requestRecord, 0, len(requestRecords))
1039+
for _, id := range td.requestIds {
1040+
if rr, ok := requestRecords[id]; ok {
1041+
sorted = append(sorted, rr)
1042+
}
1043+
}
1044+
return sorted
10481045
}
10491046

10501047
func metadataForBlocks(blks []blocks.Block, present bool) metadata.Metadata {
@@ -1091,6 +1088,7 @@ type testData struct {
10911088
outgoingRequestProcessingListeners *listeners.OutgoingRequestProcessingListeners
10921089
taskqueue *taskqueue.WorkerTaskQueue
10931090
executor *executor.Executor
1091+
requestIds []graphsync.RequestID
10941092
}
10951093

10961094
func newTestData(ctx context.Context, t *testing.T) *testData {
@@ -1127,5 +1125,8 @@ func newTestData(ctx context.Context, t *testing.T) *testData {
11271125
Name: td.extensionName2,
11281126
Data: td.extensionData2,
11291127
}
1128+
td.requestHooks.Register(func(p peer.ID, request graphsync.RequestData, hookActions graphsync.OutgoingRequestHookActions) {
1129+
td.requestIds = append(td.requestIds, request.ID())
1130+
})
11301131
return td
11311132
}

0 commit comments

Comments
 (0)