Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ checks:
postage-label: gc-check
timeout: 5m
type: gc
ci-manifest-v1:
options:
files-in-collection: 10
max-pathname-length: 64
postage-ttl: 24h
postage-depth: 21
postage-label: test-label
timeout: 30m
type: manifest
ci-manifest:
options:
files-in-collection: 10
Expand Down Expand Up @@ -371,6 +380,12 @@ checks:
postage-label: test-label
timeout: 10m
type: gsoc
ci-feed-v1:
options:
postage-ttl: 24h
postage-depth: 21
postage-label: test-label
type: feed-v1
ci-feed:
options:
postage-ttl: 24h
Expand Down
15 changes: 14 additions & 1 deletion pkg/bee/api/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"net/http"
"slices"
"strconv"
"time"

"github.com/ethersphere/bee/v2/pkg/cac"
"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/swarm"
Expand Down Expand Up @@ -68,7 +70,18 @@ func (f *FeedService) CreateRootManifest(ctx context.Context, signer crypto.Sign
return &response, nil
}

// UpdateWithRootChunk updates a feed with a root chunk
// UpdateWithReference updates a feed with a reference. This is a type v1 feed update.
func (f *FeedService) UpdateWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o UploadOptions) (*SocResponse, error) {
ts := make([]byte, 8)
binary.BigEndian.PutUint64(ts, uint64(time.Now().Unix()))
ch, err := cac.New(append(append([]byte{}, ts...), addr.Bytes()...))
if err != nil {
return nil, err
}
return f.UpdateWithRootChunk(ctx, signer, topic, i, ch, o)
}

// UpdateWithRootChunk updates a feed with a root chunk.
func (f *FeedService) UpdateWithRootChunk(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, ch swarm.Chunk, o UploadOptions) (*SocResponse, error) {
ownerHex, err := ownerFromSigner(signer)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions pkg/bee/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,11 @@ func (c *Client) UpdateFeedWithRootChunk(ctx context.Context, signer crypto.Sign
return c.api.Feed.UpdateWithRootChunk(ctx, signer, topic, i, ch, o)
}

// UpdateFeedWithReference updates a feed with a reference
func (c *Client) UpdateFeedWithReference(ctx context.Context, signer crypto.Signer, topic []byte, i uint64, addr swarm.Address, o api.UploadOptions) (*api.SocResponse, error) {
return c.api.Feed.UpdateWithReference(ctx, signer, topic, i, addr, o)
}

// FindFeedUpdate finds the latest update for a feed
func (c *Client) FindFeedUpdate(ctx context.Context, signer crypto.Signer, topic []byte, o *api.DownloadOptions) (*api.FindFeedUpdateResponse, error) {
return c.api.Feed.FindUpdate(ctx, signer, topic, o)
Expand Down
183 changes: 183 additions & 0 deletions pkg/check/feed/feed_v1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package feed

import (
"bytes"
"context"
"fmt"
"time"

"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/beekeeper/pkg/bee"
"github.com/ethersphere/beekeeper/pkg/bee/api"
"github.com/ethersphere/beekeeper/pkg/beekeeper"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/beekeeper/pkg/orchestration"
"github.com/ethersphere/beekeeper/pkg/random"
)

// Options represents check options
type Options struct {
PostageTTL time.Duration
PostageDepth uint64
PostageLabel string
NUpdates int
RootRef string
}

// NewDefaultOptions returns new default options
func NewDefaultOptions() Options {
return Options{
PostageTTL: 24 * time.Hour,
PostageDepth: 17,
PostageLabel: "test-label",
NUpdates: 2,
}
}

// compile check whether Check implements interface
var _ beekeeper.Action = (*CheckV1)(nil)

// Check instance.
type CheckV1 struct {
logger logging.Logger
}

// NewCheck returns a new check instance.
func NewCheckV1(logger logging.Logger) beekeeper.Action {
return &CheckV1{
logger: logger,
}
}

func (c *CheckV1) Run(ctx context.Context, cluster orchestration.Cluster, opts interface{}) (err error) {
o, ok := opts.(Options)
if !ok {
return fmt.Errorf("invalid options type")
}

if o.RootRef != "" {
c.logger.Infof("running availability check")
return c.checkAvailability(ctx, cluster, o)
}
return c.feedCheck(ctx, cluster, o)
}

func (c *CheckV1) checkAvailability(ctx context.Context, cluster orchestration.Cluster, o Options) error {
ref, err := swarm.ParseHexAddress(o.RootRef)
if err != nil {
return fmt.Errorf("invalid root ref: %w", err)
}

nodeNames := cluster.FullNodeNames()
nodeName := nodeNames[0]
clients, err := cluster.NodesClients(ctx)
if err != nil {
return fmt.Errorf("nodes clients: %w", err)
}

client := clients[nodeName]
_, _, err = client.DownloadFile(ctx, ref, nil)
if err != nil {
return fmt.Errorf("download file: %w", err)
}
return nil
}

// feedCheck creates a root feed manifest, makes a series of updates to the feed
// and verifies that the updates are retrievable via another node.
func (c *CheckV1) feedCheck(ctx context.Context, cluster orchestration.Cluster, o Options) error {
rnd := random.PseudoGenerator(time.Now().UnixNano())
names := cluster.FullNodeNames()
perm := rnd.Perm(len(names))

if len(names) < 2 {
return fmt.Errorf("not enough nodes to run feed check")
}

clients, err := cluster.NodesClients(ctx)
if err != nil {
return fmt.Errorf("nodes clients: %w", err)
}
upClient := clients[names[perm[0]]]
downClient := clients[names[perm[1]]]

c.logger.Infof("upload client: %s", upClient.Name())

batchID, err := upClient.GetOrCreateMutableBatch(ctx, o.PostageTTL, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("get or create batch: %w", err)
}

privKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
return fmt.Errorf("secp: %w", err)
}

signer := crypto.NewDefaultSigner(privKey)
topic, err := crypto.LegacyKeccak256([]byte("my-topic-v1"))
if err != nil {
return fmt.Errorf("keccak: %w", err)
}

// create root
createManifestRes, err := upClient.CreateRootFeedManifest(ctx, signer, topic, api.UploadOptions{BatchID: batchID})
if err != nil {
return fmt.Errorf("create root feed manifest: %w", err)
}
c.logger.Infof("node %s: manifest created", upClient.Name())
c.logger.Infof("reference: %s", createManifestRes.Reference)
c.logger.Infof("owner: %s", createManifestRes.Owner)
c.logger.Infof("topic: %s", createManifestRes.Topic)

// make updates
for i := 0; i < o.NUpdates; i++ {
time.Sleep(3 * time.Second)
data := fmt.Sprintf("update-%d", i)
fName := fmt.Sprintf("file-%d", i)
file := bee.NewBufferFile(fName, bytes.NewBuffer([]byte(data)))
err = upClient.UploadFile(context.Background(), &file, api.UploadOptions{
BatchID: batchID,
Direct: true,
})
if err != nil {
return fmt.Errorf("upload: %w", err)
}
ref := file.Address()
socRes, err := upClient.UpdateFeedWithReference(ctx, signer, topic, uint64(i), ref, api.UploadOptions{BatchID: batchID})
if err != nil {
return fmt.Errorf("update feed: %w", err)
}
c.logger.Infof("node %s: feed updated", upClient.Name())
c.logger.Infof("soc reference: %s", socRes.Reference)
c.logger.Infof("wrapped reference: %s", file.Address())
}
time.Sleep(5 * time.Second)

// fetch update
c.logger.Infof("fetching feed update")
c.logger.Infof("download client: %s", downClient.Name())
update, err := downClient.FindFeedUpdate(ctx, signer, topic, nil)
if err != nil {
return fmt.Errorf("find update: %w", err)
}

c.logger.Infof("node %s: feed update found", downClient.Name())
c.logger.Infof("index: %d", update.Index)
c.logger.Infof("next index: %d", update.NextIndex)

if update.NextIndex != uint64(o.NUpdates) {
return fmt.Errorf("expected next index to be %d, got %d", o.NUpdates, update.NextIndex)
}

// fetch feed via bzz
d, err := downClient.DownloadFileBytes(ctx, createManifestRes.Reference, nil)
if err != nil {
return fmt.Errorf("download root feed: %w", err)
}
lastUpdateData := fmt.Sprintf("update-%d", o.NUpdates-1)
if string(d) != lastUpdateData {
return fmt.Errorf("expected file content to be %s, got %s", lastUpdateData, string(d))
}
return nil
}
35 changes: 8 additions & 27 deletions pkg/check/feed/feed.go → pkg/check/feed/feed_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,22 @@ import (
"github.com/ethersphere/beekeeper/pkg/random"
)

// Options represents check options
type Options struct {
PostageTTL time.Duration
PostageDepth uint64
PostageLabel string
NUpdates int
RootRef string
}

// NewDefaultOptions returns new default options
func NewDefaultOptions() Options {
return Options{
PostageTTL: 24 * time.Hour,
PostageDepth: 17,
PostageLabel: "test-label",
NUpdates: 2,
}
}

// compile check whether Check implements interface
var _ beekeeper.Action = (*Check)(nil)
var _ beekeeper.Action = (*CheckV2)(nil)

// Check instance.
type Check struct {
type CheckV2 struct {
logger logging.Logger
}

// NewCheck returns a new check instance.
func NewCheck(logger logging.Logger) beekeeper.Action {
return &Check{
func NewCheckV2(logger logging.Logger) beekeeper.Action {
return &CheckV2{
logger: logger,
}
}

func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts any) (err error) {
func (c *CheckV2) Run(ctx context.Context, cluster orchestration.Cluster, opts any) (err error) {
o, ok := opts.(Options)
if !ok {
return fmt.Errorf("invalid options type")
Expand All @@ -73,7 +54,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts any
return nil
}

func (c *Check) checkAvailability(ctx context.Context, cluster orchestration.Cluster, o Options) error {
func (c *CheckV2) checkAvailability(ctx context.Context, cluster orchestration.Cluster, o Options) error {
ref, err := swarm.ParseHexAddress(o.RootRef)
if err != nil {
return fmt.Errorf("invalid root ref: %w", err)
Expand All @@ -98,7 +79,7 @@ func (c *Check) checkAvailability(ctx context.Context, cluster orchestration.Clu

// feedCheck creates a root feed manifest, makes a series of updates to the feed
// and verifies that the updates are retrievable via another node.
func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o Options) error {
func (c *CheckV2) feedCheck(ctx context.Context, cluster orchestration.Cluster, o Options) error {
clients, err := cluster.ShuffledFullNodeClients(ctx, random.PseudoGenerator(time.Now().UnixNano()))
if err != nil {
return fmt.Errorf("node clients: %w", err)
Expand All @@ -124,7 +105,7 @@ func (c *Check) feedCheck(ctx context.Context, cluster orchestration.Cluster, o
}

signer := crypto.NewDefaultSigner(privKey)
topic, err := crypto.LegacyKeccak256([]byte("my-topic"))
topic, err := crypto.LegacyKeccak256([]byte("my-topic-v2"))
if err != nil {
return fmt.Errorf("topic hash: %w", err)
}
Expand Down
Loading