Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,6 @@ paths:
schema:
$ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageBatchId"
required: true
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPinParameter"
required: false
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPostageStamp"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmAct"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmActHistoryAddress"
Expand Down
15 changes: 14 additions & 1 deletion pkg/api/accesscontrol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) {
resp struct {
Reference swarm.Address `json:"reference"`
}
direct bool
}{
{
name: "bzz",
Expand Down Expand Up @@ -159,6 +160,7 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) {
data: bytes.NewReader(sch.WrappedChunk.Data()),
expdata: sch.Chunk().Data(),
contenttype: "binary/octet-stream",
direct: true,
},
}

Expand All @@ -183,13 +185,24 @@ func TestAccessLogicEachEndpointWithAct(t *testing.T) {
upTestOpts = append(upTestOpts, jsonhttptest.WithRequestHeader(api.SwarmCollectionHeader, "True"))
}
t.Run(v.name, func(t *testing.T) {
client, _, _, _ := newTestServer(t, testServerOptions{
client, _, _, chanStore := newTestServer(t, testServerOptions{
Storer: storerMock,
Logger: logger,
Post: mockpost.New(mockpost.WithAcceptAll()),
PublicKey: pk.PublicKey,
AccessControl: mockac.New(),
DirectUpload: v.direct,
})

if chanStore != nil {
chanStore.Subscribe(func(chunk swarm.Chunk) {
err := storerMock.Put(context.Background(), chunk)
if err != nil {
t.Fatal(err)
}
})
}

header := jsonhttptest.Request(t, client, http.MethodPost, v.upurl, http.StatusCreated,
upTestOpts...,
)
Expand Down
10 changes: 10 additions & 0 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ type chanStorer struct {
lock sync.Mutex
chunks map[string]struct{}
quit chan struct{}
subs []func(chunk swarm.Chunk)
}

func newChanStore(cc <-chan *pusher.Op) *chanStorer {
Expand All @@ -650,6 +651,9 @@ func (c *chanStorer) drain(cc <-chan *pusher.Op) {
case op := <-cc:
c.lock.Lock()
c.chunks[op.Chunk.Address().ByteString()] = struct{}{}
for _, h := range c.subs {
h(op.Chunk)
}
c.lock.Unlock()
op.Err <- nil
case <-c.quit:
Expand All @@ -670,6 +674,12 @@ func (c *chanStorer) Has(addr swarm.Address) bool {
return ok
}

func (c *chanStorer) Subscribe(f func(chunk swarm.Chunk)) {
c.lock.Lock()
defer c.lock.Unlock()
c.subs = append(c.subs, f)
}

func createRedistributionAgentService(
t *testing.T,
addr swarm.Address,
Expand Down
39 changes: 9 additions & 30 deletions pkg/api/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/soc"
"github.com/ethersphere/bee/v2/pkg/storage"
storer "github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
)
Expand Down Expand Up @@ -51,7 +50,6 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id"`
StampSig []byte `map:"Swarm-Postage-Stamp"`
Pin bool `map:"Swarm-Pin"`
Act bool `map:"Swarm-Act"`
HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"`
}{}
Expand All @@ -66,30 +64,11 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {
return
}

// if pinning header is set we do a deferred upload, else we do a direct upload
var (
tag uint64
err error
putter storer.PutterSession
err error
)
if headers.Pin {
session, err := s.storer.NewSession()
if err != nil {
logger.Debug("get or create tag failed", "error", err)
logger.Error(nil, "get or create tag failed")
switch {
case errors.Is(err, storage.ErrNotFound):
jsonhttp.NotFound(w, "tag not found")
default:
jsonhttp.InternalServerError(w, "cannot get or create tag")
}
return
}
tag = session.TagID
}

deferred := tag != 0

var putter storer.PutterSession
if len(headers.StampSig) != 0 {
stamp := postage.Stamp{}
if err := stamp.UnmarshalBinary(headers.StampSig); err != nil {
Expand All @@ -102,16 +81,16 @@ func (s *Service) socUploadHandler(w http.ResponseWriter, r *http.Request) {

putter, err = s.newStampedPutter(r.Context(), putterOptions{
BatchID: stamp.BatchID(),
TagID: tag,
Pin: headers.Pin,
Deferred: deferred,
TagID: 0,
Pin: false,
Deferred: false,
}, &stamp)
} else {
putter, err = s.newStamperPutter(r.Context(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Pin: headers.Pin,
Deferred: deferred,
TagID: 0,
Pin: false,
Deferred: false,
})
}
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions pkg/api/soc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api_test

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
Expand Down Expand Up @@ -74,13 +75,20 @@ func TestSOC(t *testing.T) {

t.Run("ok", func(t *testing.T) {
s := testingsoc.GenerateMockSOC(t, testData)
client, _, _, _ := newTestServer(t, testServerOptions{
client, _, _, chanStore := newTestServer(t, testServerOptions{
Storer: mockStorer,
Post: newTestPostService(),
DirectUpload: true,
})

chanStore.Subscribe(func(ch swarm.Chunk) {
err := mockStorer.Put(context.Background(), ch)
if err != nil {
t.Fatal(err)
}
})

jsonhttptest.Request(t, client, http.MethodPost, socResource(hex.EncodeToString(s.Owner), hex.EncodeToString(s.ID), hex.EncodeToString(s.Signature)), http.StatusCreated,
jsonhttptest.WithRequestHeader(api.SwarmPinHeader, "true"),
jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr),
jsonhttptest.WithRequestBody(bytes.NewReader(s.WrappedChunk.Data())),
jsonhttptest.WithExpectedJSONResponse(api.SocPostResponse{
Expand Down
74 changes: 74 additions & 0 deletions pkg/storer/internal/chunkstore/chunkstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package chunkstore_test

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -13,7 +14,9 @@ import (
"os"
"testing"

"github.com/ethersphere/bee/v2/pkg/crypto"
"github.com/ethersphere/bee/v2/pkg/sharky"
soctesting "github.com/ethersphere/bee/v2/pkg/soc/testing"
"github.com/ethersphere/bee/v2/pkg/storer/internal/transaction"

"github.com/ethersphere/bee/v2/pkg/storage"
Expand Down Expand Up @@ -336,6 +339,77 @@ func TestChunkStore(t *testing.T) {
}
})

t.Run("replace chunk", func(t *testing.T) {
privKey, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}
signer := crypto.NewDefaultSigner(privKey)
ctx := context.Background()

ch1 := soctesting.GenerateMockSocWithSigner(t, []byte("data"), signer).Chunk()
err = st.Run(context.Background(), func(s transaction.Store) error {
return s.ChunkStore().Put(ctx, ch1)
})
if err != nil {
t.Fatal(err)
}

tests := []struct {
data string
emplace bool
wantRefCount uint32
}{
{
data: "data1",
emplace: true,
wantRefCount: 2,
},
{
data: "data2",
emplace: false,
wantRefCount: 2,
},
{
data: "data3",
emplace: true,
wantRefCount: 3,
},
}

for _, tt := range tests {
ch2 := soctesting.GenerateMockSocWithSigner(t, []byte(tt.data), signer).Chunk()
if !ch1.Address().Equal(ch2.Address()) {
t.Fatal("chunk addresses don't match")
}

err = st.Run(ctx, func(s transaction.Store) error {
return s.ChunkStore().Replace(ctx, ch2, tt.emplace)
})
if err != nil {
t.Fatal(err)
}

ch, err := st.ChunkStore().Get(ctx, ch2.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(ch.Data(), ch2.Data()) {
t.Fatalf("expected data override")
}

rIdx := &chunkstore.RetrievalIndexItem{Address: ch2.Address()}
err = st.IndexStore().Get(rIdx)
if err != nil {
t.Fatal(err)
}

if rIdx.RefCnt != tt.wantRefCount {
t.Fatalf("expected ref count %d, got %d", tt.wantRefCount, rIdx.RefCnt)
}
}
})

t.Run("close store", func(t *testing.T) {
err := st.Close()
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions pkg/storer/mock/mockstorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"time"

"github.com/ethersphere/bee/v2/pkg/pusher"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage/inmemchunkstore"
storer "github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/storer"
"github.com/ethersphere/bee/v2/pkg/swarm"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -231,3 +231,7 @@ func (m *mockStorer) DebugInfo(_ context.Context) (storer.Info, error) {
func (m *mockStorer) NeighborhoodsStat(ctx context.Context) ([]*storer.NeighborhoodStat, error) {
return nil, nil
}

func (m *mockStorer) Put(ctx context.Context, ch swarm.Chunk) error {
return m.chunkStore.Put(ctx, ch)
}
Loading