From 29faea975bd840690d84df0c007d8c2f89c1f4a0 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Mon, 30 Sep 2024 16:13:17 -0700 Subject: [PATCH 1/2] Move providing responsabilities from bitswap to blockservice - bitswap/server: remove provide - blockservice: add session workaround to work with wrapped blockservices - blockservice: add WithProvider option - blockservice: remove session embeding in context Replaces #534 by @Jorropo which was outdated enough to make merging difficult. --- CHANGELOG.md | 2 + bitswap/bitswap.go | 12 +- bitswap/bitswap_test.go | 2 +- bitswap/client/bitswap_with_sessions_test.go | 4 + bitswap/internal/defaults/defaults.go | 5 - bitswap/options.go | 4 - bitswap/server/server.go | 166 +------------------ blockservice/blockservice.go | 132 ++++++++------- blockservice/blockservice_test.go | 127 +++++++++----- blockservice/providing_blockstore.go | 37 +++++ gateway/backend_blocks.go | 6 - 11 files changed, 204 insertions(+), 293 deletions(-) create mode 100644 blockservice/providing_blockstore.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ee78a8b19..350e93370 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The following emojis are used to highlight certain changes: * `boxo/bitswap/server`: * A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server/#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672) - `routing/http`: added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671) +- `blockservice` now have a `WithProvider` option, this allows to recreate the behavior of advertising added blocks the bitswap server used to do. ### Changed @@ -61,6 +62,7 @@ The following emojis are used to highlight certain changes: - `bitswap/client` fix memory leak in BlockPresenceManager due to unlimited map growth. [#636](https://github.com/ipfs/boxo/pull/636) - `bitswap/network` fixed race condition when a timeout occurred before hole punching completed while establishing a first-time stream to a peer behind a NAT [#651](https://github.com/ipfs/boxo/pull/651) - `bitswap`: wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have. [#629](https://github.com/ipfs/boxo/pull/629) +- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany. ## [v0.21.0] diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 393ab96ad..90c8690b7 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/ipfs/boxo/bitswap/client" - "github.com/ipfs/boxo/bitswap/internal/defaults" "github.com/ipfs/boxo/bitswap/message" "github.com/ipfs/boxo/bitswap/network" "github.com/ipfs/boxo/bitswap/server" @@ -45,9 +44,8 @@ type bitswap interface { } var ( - _ exchange.SessionExchange = (*Bitswap)(nil) - _ bitswap = (*Bitswap)(nil) - HasBlockBufferSize = defaults.HasBlockBufferSize + _ exchange.SessionExchange = (*Bitswap)(nil) + _ bitswap = (*Bitswap)(nil) ) type Bitswap struct { @@ -85,10 +83,6 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc serverOptions = append(serverOptions, server.WithTracer(tracer)) } - if HasBlockBufferSize != defaults.HasBlockBufferSize { - serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize)) - } - ctx = metrics.CtxSubScope(ctx, "bitswap") bs.Server = server.New(ctx, net, bstore, serverOptions...) @@ -115,7 +109,6 @@ type Stat struct { MessagesReceived uint64 BlocksSent uint64 DataSent uint64 - ProvideBufLen int } func (bs *Bitswap) Stat() (*Stat, error) { @@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) { Peers: ss.Peers, BlocksSent: ss.BlocksSent, DataSent: ss.DataSent, - ProvideBufLen: ss.ProvideBufLen, }, nil } diff --git a/bitswap/bitswap_test.go b/bitswap/bitswap_test.go index 85055879c..cfd9f84b7 100644 --- a/bitswap/bitswap_test.go +++ b/bitswap/bitswap_test.go @@ -120,7 +120,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)} + bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)} ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts) defer ig.Close() diff --git a/bitswap/client/bitswap_with_sessions_test.go b/bitswap/client/bitswap_with_sessions_test.go index 6241865ef..4bc02a436 100644 --- a/bitswap/client/bitswap_with_sessions_test.go +++ b/bitswap/client/bitswap_with_sessions_test.go @@ -39,6 +39,10 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk if err != nil { t.Fatal(err) } + err = inst.Adapter.Provide(ctx, blk.Cid()) + if err != nil { + t.Fatal(err) + } } func TestBasicSessions(t *testing.T) { diff --git a/bitswap/internal/defaults/defaults.go b/bitswap/internal/defaults/defaults.go index b30bcc87f..dbcd62a31 100644 --- a/bitswap/internal/defaults/defaults.go +++ b/bitswap/internal/defaults/defaults.go @@ -20,11 +20,6 @@ const ( BitswapMaxOutstandingBytesPerPeer = 1 << 20 // the number of bytes we attempt to make each outgoing bitswap message BitswapEngineTargetMessageSize = 16 * 1024 - // HasBlockBufferSize is the buffer size of the channel for new blocks - // that need to be provided. They should get pulled over by the - // provideCollector even before they are actually provided. - // TODO: Does this need to be this large givent that? - HasBlockBufferSize = 256 // Maximum size of the wantlist we are willing to keep in memory. MaxQueuedWantlistEntiresPerPeer = 1024 diff --git a/bitswap/options.go b/bitswap/options.go index 6a98b27db..6e81235d9 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -43,10 +43,6 @@ func TaskWorkerCount(count int) Option { return Option{server.TaskWorkerCount(count)} } -func ProvideEnabled(enabled bool) Option { - return Option{server.ProvideEnabled(enabled)} -} - func SetSendDontHaves(send bool) Option { return Option{server.SetSendDontHaves(send)} } diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 46d29a8fc..f5064bab6 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -21,20 +21,15 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-metrics-interface" process "github.com/jbenet/goprocess" - procctx "github.com/jbenet/goprocess/context" "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/zap" ) -var provideKeysBufferSize = 2048 - var ( log = logging.Logger("bitswap/server") sflog = log.Desugar() ) -const provideWorkerMax = 6 - type Option func(*Server) type Server struct { @@ -59,20 +54,8 @@ type Server struct { process process.Process - // newBlocks is a channel for newly added blocks to be provided to the - // network. blocks pushed down this channel get buffered and fed to the - // provideKeys channel later on to avoid too much network activity - newBlocks chan cid.Cid - // provideKeys directly feeds provide workers - provideKeys chan cid.Cid - // Extra options to pass to the decision manager engineOptions []decision.Option - - // the size of channel buffer to use - hasBlockBufferSize int - // whether or not to make provide announcements - provideEnabled bool } func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server { @@ -87,16 +70,12 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl }() s := &Server{ - sentHistogram: bmetrics.SentHist(ctx), - sendTimeHistogram: bmetrics.SendTimeHist(ctx), - taskWorkerCount: defaults.BitswapTaskWorkerCount, - network: network, - process: px, - provideEnabled: true, - hasBlockBufferSize: defaults.HasBlockBufferSize, - provideKeys: make(chan cid.Cid, provideKeysBufferSize), + sentHistogram: bmetrics.SentHist(ctx), + sendTimeHistogram: bmetrics.SendTimeHist(ctx), + taskWorkerCount: defaults.BitswapTaskWorkerCount, + network: network, + process: px, } - s.newBlocks = make(chan cid.Cid, s.hasBlockBufferSize) for _, o := range options { o(s) @@ -131,13 +110,6 @@ func WithTracer(tap tracer.Tracer) Option { } } -// ProvideEnabled is an option for enabling/disabling provide announcements -func ProvideEnabled(enabled bool) Option { - return func(bs *Server) { - bs.provideEnabled = enabled - } -} - func WithPeerBlockRequestFilter(pbrf decision.PeerBlockRequestFilter) Option { o := decision.WithPeerBlockRequestFilter(pbrf) return func(bs *Server) { @@ -241,16 +213,6 @@ func MaxCidSize(n uint) Option { } } -// HasBlockBufferSize configure how big the new blocks buffer should be. -func HasBlockBufferSize(count int) Option { - if count < 0 { - panic("cannot have negative buffer size") - } - return func(bs *Server) { - bs.hasBlockBufferSize = count - } -} - // WithWantHaveReplaceSize sets the maximum size of a block in bytes up to // which the bitswap server will replace a WantHave with a WantBlock response. // @@ -303,18 +265,6 @@ func (bs *Server) startWorkers(ctx context.Context, px process.Process) { bs.taskWorker(ctx, i) }) } - - if bs.provideEnabled { - // Start up a worker to manage sending out provides messages - px.Go(func(px process.Process) { - bs.provideCollector(ctx) - }) - - // Spawn up multiple workers to handle incoming blocks - // consider increasing number if providing blocks bottlenecks - // file transfers - px.Go(bs.provideWorker) - } } func (bs *Server) taskWorker(ctx context.Context, id int) { @@ -422,10 +372,9 @@ func (bs *Server) sendBlocks(ctx context.Context, env *decision.Envelope) { } type Stat struct { - Peers []string - ProvideBufLen int - BlocksSent uint64 - DataSent uint64 + Peers []string + BlocksSent uint64 + DataSent uint64 } // Stat returns aggregated statistics about bitswap operations @@ -433,7 +382,6 @@ func (bs *Server) Stat() (Stat, error) { bs.counterLk.Lock() s := bs.counters bs.counterLk.Unlock() - s.ProvideBufLen = len(bs.newBlocks) peers := bs.engine.Peers() peersStr := make([]string, len(peers)) @@ -460,107 +408,9 @@ func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err // Send wanted blocks to decision engine bs.engine.NotifyNewBlocks(blks) - // If the reprovider is enabled, send block to reprovider - if bs.provideEnabled { - for _, blk := range blks { - select { - case bs.newBlocks <- blk.Cid(): - // send block off to be reprovided - case <-bs.process.Closing(): - return bs.process.Close() - } - } - } - return nil } -func (bs *Server) provideCollector(ctx context.Context) { - defer close(bs.provideKeys) - var toProvide []cid.Cid - var nextKey cid.Cid - var keysOut chan cid.Cid - - for { - select { - case blkey, ok := <-bs.newBlocks: - if !ok { - log.Debug("newBlocks channel closed") - return - } - - if keysOut == nil { - nextKey = blkey - keysOut = bs.provideKeys - } else { - toProvide = append(toProvide, blkey) - } - case keysOut <- nextKey: - if len(toProvide) > 0 { - nextKey = toProvide[0] - toProvide = toProvide[1:] - } else { - keysOut = nil - } - case <-ctx.Done(): - return - } - } -} - -func (bs *Server) provideWorker(px process.Process) { - // FIXME: OnClosingContext returns a _custom_ context type. - // Unfortunately, deriving a new cancelable context from this custom - // type fires off a goroutine. To work around this, we create a single - // cancelable context up-front and derive all sub-contexts from that. - // - // See: https://github.com/ipfs/go-ipfs/issues/5810 - ctx := procctx.OnClosingContext(px) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - limit := make(chan struct{}, provideWorkerMax) - - limitedGoProvide := func(k cid.Cid, wid int) { - defer func() { - // replace token when done - <-limit - }() - - log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k) - defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k) - - ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx - defer cancel() - - if err := bs.network.Provide(ctx, k); err != nil { - log.Warn(err) - } - } - - // worker spawner, reads from bs.provideKeys until it closes, spawning a - // _ratelimited_ number of workers to handle each key. - for wid := 2; ; wid++ { - log.Debug("Bitswap.ProvideWorker.Loop") - - select { - case <-px.Closing(): - return - case k, ok := <-bs.provideKeys: - if !ok { - log.Debug("provideKeys channel closed") - return - } - select { - case <-px.Closing(): - return - case limit <- struct{}{}: - go limitedGoProvide(k, wid) - } - } - } -} - func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) { // This call records changes to wantlists, blocks received, // and number of bytes transfered. diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 353be00f8..e24e5b44e 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -13,6 +13,7 @@ import ( "github.com/ipfs/boxo/blockstore" "github.com/ipfs/boxo/exchange" + "github.com/ipfs/boxo/provider" "github.com/ipfs/boxo/verifcid" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" @@ -73,10 +74,21 @@ type BoundedBlockService interface { var _ BoundedBlockService = (*blockService)(nil) +// ProvidingBlockService is a Blockservice which provides new blocks to a provider. +type ProvidingBlockService interface { + BlockService + + // Provider can return nil, then no provider is used. + Provider() provider.Provider +} + +var _ ProvidingBlockService = (*blockService)(nil) + type blockService struct { allowlist verifcid.Allowlist blockstore blockstore.Blockstore exchange exchange.Interface + provider provider.Provider // If checkFirst is true then first check that a block doesn't // already exist to avoid republishing the block on the exchange. checkFirst bool @@ -99,6 +111,13 @@ func WithAllowlist(allowlist verifcid.Allowlist) Option { } } +// WithProvider allows to advertise anything that is added through the blockservice. +func WithProvider(prov provider.Provider) Option { + return func(bs *blockService) { + bs.provider = prov + } +} + // New creates a BlockService with given datastore instance. func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) BlockService { if exchange == nil { @@ -121,6 +140,11 @@ func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) // Blockstore returns the blockstore behind this blockservice. func (s *blockService) Blockstore() blockstore.Blockstore { + if s.provider != nil { + // FIXME: this is a hack remove once ipfs/boxo#567 is solved. + return providingBlockstore{s.blockstore, s.provider} + } + return s.blockstore } @@ -133,23 +157,13 @@ func (s *blockService) Allowlist() verifcid.Allowlist { return s.allowlist } -// NewSession creates a new session that allows for -// controlled exchange of wantlists to decrease the bandwidth overhead. -// If the current exchange is a SessionExchange, a new exchange -// session will be created. Otherwise, the current exchange will be used -// directly. -// Sessions are lazily setup, this is cheap. -func NewSession(ctx context.Context, bs BlockService) *Session { - ses := grabSessionFromContext(ctx, bs) - if ses != nil { - return ses - } - - return newSession(ctx, bs) +func (s *blockService) Provider() provider.Provider { + return s.provider } -// newSession is like [NewSession] but it does not attempt to reuse session from the existing context. -func newSession(ctx context.Context, bs BlockService) *Session { +// NewSession creates a new session that allows for controlled exchange of +// wantlists to decrease the bandwidth overhead. +func NewSession(ctx context.Context, bs BlockService) *Session { return &Session{bs: bs, sesctx: ctx} } @@ -169,7 +183,7 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { } } - if err := s.blockstore.Put(ctx, o); err != nil { + if err = s.blockstore.Put(ctx, o); err != nil { return err } @@ -180,6 +194,11 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { logger.Errorf("NotifyNewBlocks: %s", err.Error()) } } + if s.provider != nil { + if err := s.provider.Provide(o.Cid()); err != nil { + logger.Errorf("Provide: %s", err.Error()) + } + } return nil } @@ -226,16 +245,19 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error { logger.Errorf("NotifyNewBlocks: %s", err.Error()) } } + if s.provider != nil { + for _, o := range toput { + if err := s.provider.Provide(o.Cid()); err != nil { + logger.Errorf("Provide: %s", err.Error()) + } + } + } return nil } // GetBlock retrieves a particular block from the service, // Getting it from the datastore using the key (hash). func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - if ses := grabSessionFromContext(ctx, s); ses != nil { - return ses.GetBlock(ctx, c) - } - ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() @@ -253,7 +275,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func return nil, err } - blockstore := bs.Blockstore() + provider, blockstore := grabProviderAndBlockstoreFromBlockservice(bs) block, err := blockstore.Get(ctx, c) switch { @@ -287,6 +309,12 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func return nil, err } } + if provider != nil { + err = provider.Provide(blk.Cid()) + if err != nil { + return nil, err + } + } logger.Debugf("BlockService.BlockFetched %s", c) return blk, nil } @@ -295,10 +323,6 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func // the returned channel. // NB: No guarantees are made about order. func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { - if ses := grabSessionFromContext(ctx, s); ses != nil { - return ses.GetBlocks(ctx, ks) - } - ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks") defer span.End() @@ -336,7 +360,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet ks = ks2 } - bs := blockservice.Blockstore() + provider, bs := grabProviderAndBlockstoreFromBlockservice(blockservice) var misses []cid.Cid for _, c := range ks { @@ -395,6 +419,14 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet cache[0] = nil // early gc } + if provider != nil { + err = provider.Provide(b.Cid()) + if err != nil { + logger.Errorf("could not tell the provider about new blocks: %s", err) + return + } + } + select { case out <- b: case <-ctx.Done(): @@ -474,43 +506,6 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo var _ BlockGetter = (*Session)(nil) -// ContextWithSession is a helper which creates a context with an embded session, -// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService] -// will be redirected to this same session instead. -// Sessions are lazily setup, this is cheap. -// It wont make a new session if one exists already in the context. -func ContextWithSession(ctx context.Context, bs BlockService) context.Context { - if grabSessionFromContext(ctx, bs) != nil { - return ctx - } - return EmbedSessionInContext(ctx, newSession(ctx, bs)) -} - -// EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session. -func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context { - // use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice. - return context.WithValue(ctx, ses.bs, ses) -} - -// grabSessionFromContext returns nil if the session was not found -// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety, -// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app. -// By having this private we allow consumers to follow the trace of where the blockservice is passed and used. -func grabSessionFromContext(ctx context.Context, bs BlockService) *Session { - s := ctx.Value(bs) - if s == nil { - return nil - } - - ss, ok := s.(*Session) - if !ok { - // idk what to do here, that kinda sucks, giveup - return nil - } - - return ss -} - // grabAllowlistFromBlockservice never returns nil func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist { if bbs, ok := bs.(BoundedBlockService); ok { @@ -518,3 +513,14 @@ func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist { } return verifcid.DefaultAllowlist } + +// grabProviderAndBlockstoreFromBlockservice can return nil if no provider is used. +func grabProviderAndBlockstoreFromBlockservice(bs BlockService) (provider.Provider, blockstore.Blockstore) { + if bbs, ok := bs.(*blockService); ok { + return bbs.provider, bbs.blockstore + } + if bbs, ok := bs.(ProvidingBlockService); ok { + return bbs.Provider(), bbs.Blockstore() + } + return nil, bs.Blockstore() +} diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 29350ff37..9629a8074 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -288,67 +288,102 @@ func TestAllowlist(t *testing.T) { check(NewSession(ctx, blockservice).GetBlock) } -type fakeIsNewSessionCreateExchange struct { - ses exchange.Fetcher - newSessionWasCalled bool +type wrappedBlockservice struct { + BlockService } -var _ exchange.SessionExchange = (*fakeIsNewSessionCreateExchange)(nil) +type mockProvider []cid.Cid -func (*fakeIsNewSessionCreateExchange) Close() error { +func (p *mockProvider) Provide(c cid.Cid) error { + *p = append(*p, c) return nil } -func (*fakeIsNewSessionCreateExchange) GetBlock(context.Context, cid.Cid) (blocks.Block, error) { - panic("should call on the session") -} - -func (*fakeIsNewSessionCreateExchange) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) { - panic("should call on the session") -} - -func (f *fakeIsNewSessionCreateExchange) NewSession(context.Context) exchange.Fetcher { - f.newSessionWasCalled = true - return f.ses -} - -func (*fakeIsNewSessionCreateExchange) NotifyNewBlocks(context.Context, ...blocks.Block) error { - return nil -} - -func TestContextSession(t *testing.T) { +func TestProviding(t *testing.T) { t.Parallel() a := assert.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - blks := random.BlocksOfSize(2, blockSize) - block1 := blks[0] - block2 := blks[1] + blocks := random.BlocksOfSize(12, blockSize) + + exchange := blockstore.NewBlockstore(ds.NewMapDatastore()) - bs := blockstore.NewBlockstore(ds.NewMapDatastore()) - a.NoError(bs.Put(ctx, block1)) - a.NoError(bs.Put(ctx, block2)) - sesEx := &fakeIsNewSessionCreateExchange{ses: offline.Exchange(bs)} + prov := mockProvider{} + blockservice := New(blockstore.NewBlockstore(ds.NewMapDatastore()), offline.Exchange(exchange), WithProvider(&prov)) + var added []cid.Cid - service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx) + // Adding one block provide it. + a.NoError(blockservice.AddBlock(ctx, blocks[0])) + added = append(added, blocks[0].Cid()) + blocks = blocks[1:] - ctx = ContextWithSession(ctx, service) + // Adding multiple blocks provide them. + a.NoError(blockservice.AddBlocks(ctx, blocks[0:2])) + added = append(added, blocks[0].Cid(), blocks[1].Cid()) + blocks = blocks[2:] - b, err := service.GetBlock(ctx, block1.Cid()) + // Downloading one block provide it. + a.NoError(exchange.Put(ctx, blocks[0])) + _, err := blockservice.GetBlock(ctx, blocks[0].Cid()) a.NoError(err) - a.Equal(b.RawData(), block1.RawData()) - a.True(sesEx.newSessionWasCalled, "new session from context should be created") - sesEx.newSessionWasCalled = false - - bchan := service.GetBlocks(ctx, []cid.Cid{block2.Cid()}) - a.Equal((<-bchan).RawData(), block2.RawData()) - a.False(sesEx.newSessionWasCalled, "session should be reused in context") - - a.Equal( - NewSession(ctx, service), - NewSession(ContextWithSession(ctx, service), service), - "session must be deduped in all invocations on the same context", - ) + added = append(added, blocks[0].Cid()) + blocks = blocks[1:] + + // Downloading multiple blocks provide them. + a.NoError(exchange.PutMany(ctx, blocks[0:2])) + cids := []cid.Cid{blocks[0].Cid(), blocks[1].Cid()} + var got []cid.Cid + for b := range blockservice.GetBlocks(ctx, cids) { + got = append(got, b.Cid()) + } + added = append(added, cids...) + a.ElementsMatch(cids, got) + blocks = blocks[2:] + + session := NewSession(ctx, blockservice) + + // Downloading one block over a session provide it. + a.NoError(exchange.Put(ctx, blocks[0])) + _, err = session.GetBlock(ctx, blocks[0].Cid()) + a.NoError(err) + added = append(added, blocks[0].Cid()) + blocks = blocks[1:] + + // Downloading multiple blocks over a session provide them. + a.NoError(exchange.PutMany(ctx, blocks[0:2])) + cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()} + got = nil + for b := range session.GetBlocks(ctx, cids) { + got = append(got, b.Cid()) + } + a.ElementsMatch(cids, got) + added = append(added, cids...) + blocks = blocks[2:] + + // Test wrapping the blockservice like nopfs does. + session = NewSession(ctx, wrappedBlockservice{blockservice}) + + // Downloading one block over a wrapped blockservice session provide it. + a.NoError(exchange.Put(ctx, blocks[0])) + _, err = session.GetBlock(ctx, blocks[0].Cid()) + a.NoError(err) + added = append(added, blocks[0].Cid()) + blocks = blocks[1:] + + // Downloading multiple blocks over a wrapped blockservice session provide them. + a.NoError(exchange.PutMany(ctx, blocks[0:2])) + cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()} + got = nil + for b := range session.GetBlocks(ctx, cids) { + got = append(got, b.Cid()) + } + a.ElementsMatch(cids, got) + added = append(added, cids...) + blocks = blocks[2:] + + a.Empty(blocks) + + a.ElementsMatch(added, []cid.Cid(prov)) } diff --git a/blockservice/providing_blockstore.go b/blockservice/providing_blockstore.go new file mode 100644 index 000000000..7435f8ae2 --- /dev/null +++ b/blockservice/providing_blockstore.go @@ -0,0 +1,37 @@ +package blockservice + +import ( + "context" + + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/provider" + blocks "github.com/ipfs/go-block-format" +) + +var _ blockstore.Blockstore = providingBlockstore{} + +type providingBlockstore struct { + blockstore.Blockstore + provider provider.Provider +} + +func (pbs providingBlockstore) Put(ctx context.Context, b blocks.Block) error { + if err := pbs.Blockstore.Put(ctx, b); err != nil { + return err + } + + return pbs.provider.Provide(b.Cid()) +} + +func (pbs providingBlockstore) PutMany(ctx context.Context, b []blocks.Block) error { + if err := pbs.Blockstore.PutMany(ctx, b); err != nil { + return err // what are the semantics here, did some blocks were put ? assume PutMany is atomic + } + + for _, b := range b { + if err := pbs.provider.Provide(b.Cid()); err != nil { + return err // this can only error if the whole provider is done for + } + } + return nil +} diff --git a/gateway/backend_blocks.go b/gateway/backend_blocks.go index d62d3d876..3d4f98bf0 100644 --- a/gateway/backend_blocks.go +++ b/gateway/backend_blocks.go @@ -624,12 +624,6 @@ func (bb *BlocksBackend) IsCached(ctx context.Context, p path.Path) bool { return has } -var _ WithContextHint = (*BlocksBackend)(nil) - -func (bb *BlocksBackend) WrapContextForRequest(ctx context.Context) context.Context { - return blockservice.ContextWithSession(ctx, bb.blockService) -} - func (bb *BlocksBackend) ResolvePath(ctx context.Context, path path.ImmutablePath) (ContentPathMetadata, error) { roots, lastSeg, remainder, err := bb.getPathRoots(ctx, path) if err != nil { From 6af1b50eb9d9ebb56fb9484ec5117b736ffacb27 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Mon, 30 Sep 2024 16:27:21 -0700 Subject: [PATCH 2/2] mod tidy examples --- examples/go.mod | 1 + examples/go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/examples/go.mod b/examples/go.mod index fd77bc07e..7eb3c737b 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -60,6 +60,7 @@ require ( github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect github.com/ipfs/go-blockservice v0.5.2 // indirect + github.com/ipfs/go-cidutil v0.1.0 // indirect github.com/ipfs/go-ipfs-blockstore v1.3.1 // indirect github.com/ipfs/go-ipfs-delay v0.0.1 // indirect github.com/ipfs/go-ipfs-ds-help v1.1.1 // indirect diff --git a/examples/go.sum b/examples/go.sum index f8d2600ad..ab54df4e4 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -172,6 +172,8 @@ github.com/ipfs/go-blockservice v0.5.2 h1:in9Bc+QcXwd1apOVM7Un9t8tixPKdaHQFdLSUM github.com/ipfs/go-blockservice v0.5.2/go.mod h1:VpMblFEqG67A/H2sHKAemeH9vlURVavlysbdUI632yk= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= +github.com/ipfs/go-cidutil v0.1.0 h1:RW5hO7Vcf16dplUU60Hs0AKDkQAVPVplr7lk97CFL+Q= +github.com/ipfs/go-cidutil v0.1.0/go.mod h1:e7OEVBMIv9JaOxt9zaGEmAoSlXW9jdFZ5lP/0PwcfpA= github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0Myk= github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=