From e62413bb00577c4d31885c200fd5c11b39b24c9a Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Thu, 6 May 2021 09:15:11 +0200 Subject: [PATCH 1/7] release: v1.5.0 --- CHANGELOG.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f5c42ce4..43bfdf9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,26 @@ # go-data-transfer changelog +# go-data-transfer 1.5.0 + +Support the data transfer being restarted. + +- github.com/filecoin-project/go-data-transfer: + - Add isRestart param to validators (#197) ([filecoin-project/go-data-transfer#197](https://github.com/filecoin-project/go-data-transfer/pull/197)) + - fix: flaky TestChannelMonitorAutoRestart (#198) ([filecoin-project/go-data-transfer#198](https://github.com/filecoin-project/go-data-transfer/pull/198)) + - Channel monitor watches for errors instead of measuring data rate (#190) ([filecoin-project/go-data-transfer#190](https://github.com/filecoin-project/go-data-transfer/pull/190)) + - fix: prevent concurrent restarts for same channel (#195) ([filecoin-project/go-data-transfer#195](https://github.com/filecoin-project/go-data-transfer/pull/195)) + - fix: channel state machine event handling (#194) ([filecoin-project/go-data-transfer#194](https://github.com/filecoin-project/go-data-transfer/pull/194)) + - Dont double count data sent (#185) ([filecoin-project/go-data-transfer#185](https://github.com/filecoin-project/go-data-transfer/pull/185)) +- github.com/ipfs/go-graphsync (v0.6.0 -> v0.6.1): + - feat: fire network error when network disconnects during request (#164) ([ipfs/go-graphsync#164](https://github.com/ipfs/go-graphsync/pull/164)) + +Contributors + +| Contributor | Commits | Lines ± | Files Changed | +|-------------|---------|---------|---------------| +| dirkmc | 8 | +1235/-868 | 37 | +| Dirk McCormick | 1 | +11/-0 | 1 | + # go-data-transfer 1.4.3 - github.com/filecoin-project/go-data-transfer: From d1e064261fbcee8ea87c5b9e125c84dee7090c66 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Wed, 26 May 2021 16:07:10 +0530 Subject: [PATCH 2/7] logs --- impl/events.go | 12 ++++++++++++ transport/graphsync/graphsync.go | 6 +++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/impl/events.go b/impl/events.go index abcadf3f..4814b082 100644 --- a/impl/events.go +++ b/impl/events.go @@ -161,25 +161,31 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatra } func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error { + log.Infof("channel %s: received response %+v from provider", chid, response) + if response.IsCancel() { log.Infof("channel %s: received cancel response, cancelling channel", chid) return m.channels.Cancel(chid) } + if response.IsVoucherResult() { if !response.EmptyVoucherResult() { vresult, err := m.decodeVoucherResult(response) if err != nil { return err } + log.Infof("channel %s: received voucher response %+v", chid, vresult) err = m.channels.NewVoucherResult(chid, vresult) if err != nil { return err } } + if !response.Accepted() { log.Infof("channel %s: received rejected response, erroring out channel", chid) return m.channels.Error(chid, datatransfer.ErrRejected) } + if response.IsNew() { log.Infof("channel %s: received new response, accepting channel", chid) err := m.channels.Accept(chid) @@ -196,6 +202,7 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat } } } + if response.IsComplete() && response.Accepted() { if !response.IsPaused() { log.Infof("channel %s: received complete response, completing channel", chid) @@ -206,6 +213,7 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat return nil } } + if response.IsPaused() { return m.pauseOther(chid) } @@ -432,6 +440,10 @@ func (m *manager) validateVoucher( } result, err := validatorFunc(isRestart, sender, vouch, baseCid, stor) + if isPull { + log.Infof("\n ValidatePull, result=%s, err=%s", result, err) + } + return vouch, result, err } diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index a5bbb6ab..1e9db943 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -820,7 +820,11 @@ func (t *Transport) processExtension(chid datatransfer.ChannelID, gsMsg extensio } dtResponse := msg.(datatransfer.Response) - return nil, t.events.OnResponseReceived(chid, dtResponse) + err = t.events.OnResponseReceived(chid, dtResponse) + if err != nil { + log.Errorf("\n error receieved from OnResponseReceived is %s", err) + } + return nil, err } func (t *Transport) gsRequestorCancelledListener(p peer.ID, request graphsync.RequestData) { From a61b1d5dc4bb71107bd79c0add6a1f716497b4a9 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Wed, 26 May 2021 20:32:31 +0530 Subject: [PATCH 3/7] more logging --- impl/events.go | 5 +++++ transport/graphsync/graphsync.go | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/impl/events.go b/impl/events.go index 4814b082..a2697aac 100644 --- a/impl/events.go +++ b/impl/events.go @@ -169,14 +169,18 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat } if response.IsVoucherResult() { + log.Infof("channel %s: received response %+v from provider is a voucher result", chid, response) if !response.EmptyVoucherResult() { + log.Debug("processing non-empty voucher result") vresult, err := m.decodeVoucherResult(response) if err != nil { + log.Errorf("channel %s:, failed to decode voucher result, err=%s", chid, err) return err } log.Infof("channel %s: received voucher response %+v", chid, vresult) err = m.channels.NewVoucherResult(chid, vresult) if err != nil { + log.Errorf("channel %s: failed NewVoucherResult, err=%s ", chid, err) return err } } @@ -190,6 +194,7 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat log.Infof("channel %s: received new response, accepting channel", chid) err := m.channels.Accept(chid) if err != nil { + log.Errorf("channel %s: failed to accept new response, err=%s", chid, err) return err } } diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 1e9db943..fce43141 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -592,7 +592,9 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook // when a DT request comes in on graphsync, it's a pull chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: p, Responder: t.peerID} request := msg.(datatransfer.Request) + log.Debugf("will validate recieved gs request, chid=%s, request=%+v", chid, request) responseMessage, err = t.events.OnRequestReceived(chid, request) + log.Debugf("will send response message %+v for request gs chid=%s", responseMessage, chid) } else { // when a DT response comes in on graphsync, it's a push chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: t.peerID, Responder: p} @@ -604,15 +606,18 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook extensions, extensionErr := extension.ToExtensionData(responseMessage, t.supportedExtensions) if extensionErr != nil { hookActions.TerminateWithError(err) + log.Errorf("terminated client gs request chid=%s with extension err=%s", chid, err) return } for _, extension := range extensions { + log.Debugf("queued up extension %+v for response, gs chid=%s", extension, chid) hookActions.SendExtensionData(extension) } } if err != nil && err != datatransfer.ErrPause { hookActions.TerminateWithError(err) + log.Errorf("terminated client gs request chid=%s with err=%s", chid, err) return } From bc5a06deac012e3e86ecd193cbdacb5c48e68141 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Wed, 26 May 2021 20:42:08 +0530 Subject: [PATCH 4/7] fix typo --- channelmonitor/channelmonitor.go | 2 +- impl/events.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index 5c53624b..b8a79d66 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -308,7 +308,7 @@ func (mc *monitoredChannel) watchForResponderComplete() { case <-timer.C: // Timer expired before we received a Complete message from the responder err := xerrors.Errorf("%s: timed out waiting %s for Complete message from remote peer", - mc.chid, mc.cfg.AcceptTimeout) + mc.chid, mc.cfg.CompleteTimeout) mc.closeChannelAndShutdown(err) } } diff --git a/impl/events.go b/impl/events.go index a2697aac..8338b58c 100644 --- a/impl/events.go +++ b/impl/events.go @@ -213,6 +213,9 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat log.Infof("channel %s: received complete response, completing channel", chid) return m.channels.ResponderCompletes(chid) } + + log.Infof("channel %s: received complete response but responder is paused", chid) + err := m.channels.ResponderBeginsFinalization(chid) if err != nil { return nil From 6934ef0f0348e50a9c9859b95546272b3c2ec01e Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Wed, 26 May 2021 21:07:30 +0530 Subject: [PATCH 5/7] log response processing --- transport/graphsync/graphsync.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index fce43141..7c719d9c 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -594,7 +594,7 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook request := msg.(datatransfer.Request) log.Debugf("will validate recieved gs request, chid=%s, request=%+v", chid, request) responseMessage, err = t.events.OnRequestReceived(chid, request) - log.Debugf("will send response message %+v for request gs chid=%s", responseMessage, chid) + log.Debugf("will send response message %+v for request gs chid=%s, error/pause/resume value=%s", responseMessage, chid, err) } else { // when a DT response comes in on graphsync, it's a push chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: t.peerID, Responder: p} @@ -637,6 +637,7 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook hasXferStarted, isRestart := t.channelXferStarted[chid] if isRestart && !hasXferStarted && !paused { paused = true + log.Debugf("pausing responder for request gs chid=%s, even though validator sent no-op as it's a restart req", chid) hookActions.PauseResponse() } t.channelXferStarted[chid] = !paused From fa3f7aee6c07a2e1dce3912aa6425c6e0bddb18f Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Wed, 26 May 2021 21:15:20 +0530 Subject: [PATCH 6/7] address reviews --- impl/events.go | 4 ++-- transport/graphsync/graphsync.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/impl/events.go b/impl/events.go index 8338b58c..8a153f08 100644 --- a/impl/events.go +++ b/impl/events.go @@ -171,7 +171,7 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat if response.IsVoucherResult() { log.Infof("channel %s: received response %+v from provider is a voucher result", chid, response) if !response.EmptyVoucherResult() { - log.Debug("processing non-empty voucher result") + log.Debugf("channel %s: processing non-empty voucher result", chid) vresult, err := m.decodeVoucherResult(response) if err != nil { log.Errorf("channel %s:, failed to decode voucher result, err=%s", chid, err) @@ -449,7 +449,7 @@ func (m *manager) validateVoucher( result, err := validatorFunc(isRestart, sender, vouch, baseCid, stor) if isPull { - log.Infof("\n ValidatePull, result=%s, err=%s", result, err) + log.Infof("ValidatePull, result=%s, err=%s", result, err) } return vouch, result, err diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 7c719d9c..64a902a6 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -592,7 +592,7 @@ func (t *Transport) gsReqRecdHook(p peer.ID, request graphsync.RequestData, hook // when a DT request comes in on graphsync, it's a pull chid = datatransfer.ChannelID{ID: msg.TransferID(), Initiator: p, Responder: t.peerID} request := msg.(datatransfer.Request) - log.Debugf("will validate recieved gs request, chid=%s, request=%+v", chid, request) + log.Debugf("will validate received gs request, chid=%s, request=%+v", chid, request) responseMessage, err = t.events.OnRequestReceived(chid, request) log.Debugf("will send response message %+v for request gs chid=%s, error/pause/resume value=%s", responseMessage, chid, err) } else { @@ -828,7 +828,7 @@ func (t *Transport) processExtension(chid datatransfer.ChannelID, gsMsg extensio dtResponse := msg.(datatransfer.Response) err = t.events.OnResponseReceived(chid, dtResponse) if err != nil { - log.Errorf("\n error receieved from OnResponseReceived is %s", err) + log.Errorf("error receieved from OnResponseReceived is %s", err) } return nil, err } From 8982d1a6e07e7d4999e6af1f236fc822ce6d5b88 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Fri, 28 May 2021 21:33:00 +0530 Subject: [PATCH 7/7] more logs --- channelmonitor/channelmonitor.go | 15 +++++++++++++-- impl/impl.go | 27 ++++++++++++--------------- network/libp2p_impl.go | 2 ++ 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/channelmonitor/channelmonitor.go b/channelmonitor/channelmonitor.go index b8a79d66..e5138e1f 100644 --- a/channelmonitor/channelmonitor.go +++ b/channelmonitor/channelmonitor.go @@ -105,7 +105,12 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) *monitore } m.lk.Lock() - defer m.lk.Unlock() + log.Debugf("aquired lock to create channel monitor for channelID=%s", chid) + defer func() { + log.Debugf("will release channel monitor lock for channelID=%s", chid) + m.lk.Unlock() + log.Debugf("released channel monitor lock for channelID=%s", chid) + }() // Check if there is already a monitor for this channel if _, ok := m.channels[chid]; ok { @@ -118,8 +123,10 @@ func (m *Monitor) addChannel(chid datatransfer.ChannelID, isPush bool) *monitore return nil } + log.Debugf("will create channel monitor for channelID=%s", chid) mpc := newMonitoredChannel(m.ctx, m.mgr, chid, m.cfg, m.onMonitoredChannelShutdown) m.channels[chid] = mpc + log.Debugf("created channel monitor for channelID=%s", chid) return mpc } @@ -229,6 +236,8 @@ func (mc *monitoredChannel) start() { // Watch to make sure the responder accepts the channel in time cancelAcceptTimer := mc.watchForResponderAccept() + log.Debugf("finished creating timer for accept messages, channelID=%s", mc.chid) + // Watch for data-transfer channel events mc.unsub = mc.mgr.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { if channelState.ChannelID() != mc.chid { @@ -438,12 +447,14 @@ func (mc *monitoredChannel) sendRestartMessage(restartCount int) error { log.Infof("%s: re-established connection to %s in %s", mc.chid, p, time.Since(start)) // Send a restart message for the channel - restartResult := mc.waitForRestartResponse() log.Infof("%s: sending restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount) err = mc.mgr.RestartDataTransferChannel(mc.ctx, mc.chid) if err != nil { return xerrors.Errorf("%s: failed to send restart message to %s: %w", mc.chid, p, err) } + log.Infof("%s: sent restart message to %s (%d consecutive restarts)", mc.chid, p, restartCount) + + restartResult := mc.waitForRestartResponse() // The restart message is fire and forget, so we need to watch for a // restart response to know that the restart message reached the peer. diff --git a/impl/impl.go b/impl/impl.go index b0210f04..17ee89f6 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -204,20 +204,18 @@ func (m *manager) OpenPushDataChannel(ctx context.Context, requestTo peer.ID, vo transportConfigurer(chid, voucher, m.transport) } m.dataTransferNetwork.Protect(requestTo, chid.String()) - monitoredChan := m.channelMonitor.AddPushChannel(chid) + if err := m.dataTransferNetwork.SendMessage(ctx, requestTo, req); err != nil { err = fmt.Errorf("Unable to send request: %w", err) _ = m.channels.Error(chid, err) - - // If push channel monitoring is enabled, shutdown the monitor as it - // wasn't possible to start the data transfer - if monitoredChan != nil { - monitoredChan.Shutdown() - } - return chid, err } + log.Debugf("sent push request message, channelID=%s", chid) + + m.channelMonitor.AddPushChannel(chid) + log.Infof("started new channel monitor for push request, channelID=%s", chid) + return chid, nil } @@ -242,19 +240,18 @@ func (m *manager) OpenPullDataChannel(ctx context.Context, requestTo peer.ID, vo transportConfigurer(chid, voucher, m.transport) } m.dataTransferNetwork.Protect(requestTo, chid.String()) - monitoredChan := m.channelMonitor.AddPullChannel(chid) + if err := m.transport.OpenChannel(ctx, requestTo, chid, cidlink.Link{Cid: baseCid}, selector, nil, req); err != nil { err = fmt.Errorf("Unable to send request: %w", err) _ = m.channels.Error(chid, err) - // If pull channel monitoring is enabled, shutdown the monitor as it - // wasn't possible to start the data transfer - if monitoredChan != nil { - monitoredChan.Shutdown() - } - return chid, err } + + log.Debugf("sent pull channel request channelID=%s", chid) + m.channelMonitor.AddPullChannel(chid) + log.Infof("started new channel monitor for pull request: channelID=%s", chid) + return chid, nil } diff --git a/network/libp2p_impl.go b/network/libp2p_impl.go index c9b13b2b..74d4e822 100644 --- a/network/libp2p_impl.go +++ b/network/libp2p_impl.go @@ -154,11 +154,13 @@ func (dtnet *libp2pDataTransferNetwork) SendMessage( ctx context.Context, p peer.ID, outgoing datatransfer.Message) error { + log.Debugf("opening stream to peer %s to send message %+v", p, outgoing) s, err := dtnet.openStream(ctx, p, dtnet.dtProtocols...) if err != nil { return err } + log.Debugf("finished opening stream to peer %s to send message %+v", p, outgoing) outgoing, err = outgoing.MessageForProtocol(s.Protocol()) if err != nil {