Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ Changelog for NeoFS Node
- `owner mismatches signature` for stored objects (#3836)

### Changed
- SN returns unsigned responses to requests with API >= `v2.22` (#3785)

### Removed

### Updated
- `github.com/nspcc-dev/neofs-sdk-go` module to `v1.0.0-rc.17.0.20260211130529-740a11a64a87` (#3785)

### Updating from v0.51.1

Expand Down
16 changes: 2 additions & 14 deletions cmd/neofs-cli/modules/request/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,7 @@ func createContainer(cmd *cobra.Command, args []string) error {
return fmt.Errorf("transport failure: %w", err)
}

cmd.Println("Response received. Checking signatures...")

if err := neofscrypto.VerifyResponseWithBuffer(resp, nil); err != nil {
return fmt.Errorf("failed to verify response signatures: %w", err)
}

cmd.Println("Signatures are valid. Checking status...")
cmd.Println("Response received. Checking status...")

if err := apistatus.ToError(resp.GetMetaHeader().GetStatus()); err != nil {
return fmt.Errorf("status failure: %w", err)
Expand Down Expand Up @@ -210,13 +204,7 @@ func createContainer(cmd *cobra.Command, args []string) error {
return fmt.Errorf("transport failure: %w", err)
}

cmd.Println("Response received. Checking signatures...")

if err := neofscrypto.VerifyResponseWithBuffer(resp, nil); err != nil {
return fmt.Errorf("failed to verify response signatures: %w", err)
}

cmd.Println("Signatures are valid. Checking status...")
cmd.Println("Response received. Checking status...")

if err := apistatus.ToError(resp.GetMetaHeader().GetStatus()); err == nil {
cmd.Println("Status OK. Operation succeeded.")
Expand Down
3 changes: 2 additions & 1 deletion cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/state"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/user"
"github.com/nspcc-dev/neofs-sdk-go/version"
Expand Down Expand Up @@ -413,7 +414,7 @@ func initCfg(appCfg *config.Config) *cfg {
pingTimeout := appCfg.APIClient.PingTimeout
newClientCache := func(scope string) *cache.Clients {
return cache.NewClients(c.log.With(zap.String("scope", scope)), &buffers, streamTimeout,
minConnTimeout, pingInterval, pingTimeout)
minConnTimeout, pingInterval, pingTimeout, neofsecdsa.Signer(key.PrivateKey))
}
c.shared = shared{
basics: basicSharedConfig,
Expand Down
4 changes: 2 additions & 2 deletions cmd/neofs-node/netmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,11 @@ func initNetmapService(c *cfg) {
local := slices.IndexFunc(nodes, func(node netmapSDK.NodeInfo) bool { return c.IsLocalKey(node.PublicKey()) })
var wg sync.WaitGroup
l.Info("syncing SN connection caches with the new network map...")
for _, c := range []*cache.Clients{c.clientCache, c.putClientCache, c.bgClientCache} {
for _, cl := range []*cache.Clients{c.clientCache, c.putClientCache, c.bgClientCache} {
wg.Add(1)
go func() {
defer wg.Done()
c.SyncWithNewNetmap(nodes, local)
cl.SyncWithNewNetmap(c.ctx, nodes, local)
}()
}
wg.Wait()
Expand Down
10 changes: 5 additions & 5 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func (fn *innerRingFetcherWithNotary) InnerRingKeys() ([][]byte, error) {

type coreClientConstructor reputationClientConstructor

func (x *coreClientConstructor) Get(info coreclient.NodeInfo) (coreclient.MultiAddressClient, error) {
c, err := (*reputationClientConstructor)(x).Get(info)
func (x *coreClientConstructor) Get(ctx context.Context, info coreclient.NodeInfo) (coreclient.MultiAddressClient, error) {
c, err := (*reputationClientConstructor)(x).Get(ctx, info)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -340,7 +340,7 @@ type reputationClientConstructor struct {
trustStorage *truststorage.Storage

basicConstructor interface {
Get(coreclient.NodeInfo) (coreclient.MultiAddressClient, error)
Get(context.Context, coreclient.NodeInfo) (coreclient.MultiAddressClient, error)
}
}

Expand Down Expand Up @@ -421,8 +421,8 @@ func (c *reputationClient) ObjectSearchInit(ctx context.Context, containerID cid
return res, err
}

func (c *reputationClientConstructor) Get(info coreclient.NodeInfo) (coreclient.Client, error) {
cl, err := c.basicConstructor.Get(info)
func (c *reputationClientConstructor) Get(ctx context.Context, info coreclient.NodeInfo) (coreclient.Client, error) {
cl, err := c.basicConstructor.Get(ctx, info)
if err != nil {
return nil, err
}
Expand Down
24 changes: 12 additions & 12 deletions cmd/neofs-node/reputation.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,17 +263,17 @@ func (s *reputationServer) makeResponseMetaHeader(st *protostatus.Status) *proto
}
}

func (s *reputationServer) makeLocalResponse(err error) (*protoreputation.AnnounceLocalTrustResponse, error) {
func (s *reputationServer) makeLocalResponse(err error, req *protoreputation.AnnounceLocalTrustRequest) (*protoreputation.AnnounceLocalTrustResponse, error) {
resp := &protoreputation.AnnounceLocalTrustResponse{
MetaHeader: s.makeResponseMetaHeader(util.ToStatus(err)),
}
resp.VerifyHeader = util.SignResponse(&s.key.PrivateKey, resp)
resp.VerifyHeader = util.SignResponseIfNeeded(&s.key.PrivateKey, resp, req)
return resp, nil
}

func (s *reputationServer) AnnounceLocalTrust(ctx context.Context, req *protoreputation.AnnounceLocalTrustRequest) (*protoreputation.AnnounceLocalTrustResponse, error) {
if err := icrypto.VerifyRequestSignatures(req); err != nil {
return s.makeLocalResponse(err)
return s.makeLocalResponse(err, req)
}

passedRoute := reverseRoute(req.GetVerifyHeader())
Expand All @@ -288,30 +288,30 @@ func (s *reputationServer) AnnounceLocalTrust(ctx context.Context, req *protorep

w, err := s.localRouter.InitWriter(reputationrouter.NewRouteContext(eCtx, passedRoute))
if err != nil {
return s.makeLocalResponse(fmt.Errorf("could not initialize local trust writer: %w", err))
return s.makeLocalResponse(fmt.Errorf("could not initialize local trust writer: %w", err), req)
}

for _, trust := range body.GetTrusts() {
err = s.processLocalTrust(body.GetEpoch(), apiToLocalTrust(trust, passedRoute[0].PublicKey()), passedRoute, w)
if err != nil {
return s.makeLocalResponse(fmt.Errorf("could not write one of local trusts: %w", err))
return s.makeLocalResponse(fmt.Errorf("could not write one of local trusts: %w", err), req)
}
}

return s.makeLocalResponse(util.StatusOKErr)
return s.makeLocalResponse(util.StatusOKErr, req)
}

func (s *reputationServer) makeIntermediateResponse(err error) (*protoreputation.AnnounceIntermediateResultResponse, error) {
func (s *reputationServer) makeIntermediateResponse(err error, req *protoreputation.AnnounceIntermediateResultRequest) (*protoreputation.AnnounceIntermediateResultResponse, error) {
resp := &protoreputation.AnnounceIntermediateResultResponse{
MetaHeader: s.makeResponseMetaHeader(util.ToStatus(err)),
}
resp.VerifyHeader = util.SignResponse(&s.key.PrivateKey, resp)
resp.VerifyHeader = util.SignResponseIfNeeded(&s.key.PrivateKey, resp, req)
return resp, nil
}

func (s *reputationServer) AnnounceIntermediateResult(ctx context.Context, req *protoreputation.AnnounceIntermediateResultRequest) (*protoreputation.AnnounceIntermediateResultResponse, error) {
if err := icrypto.VerifyRequestSignatures(req); err != nil {
return s.makeIntermediateResponse(err)
return s.makeIntermediateResponse(err, req)
}

passedRoute := reverseRoute(req.GetVerifyHeader())
Expand All @@ -323,7 +323,7 @@ func (s *reputationServer) AnnounceIntermediateResult(ctx context.Context, req *

w, err := s.intermediateRouter.InitWriter(reputationrouter.NewRouteContext(eiCtx, passedRoute))
if err != nil {
return s.makeIntermediateResponse(fmt.Errorf("could not initialize trust writer: %w", err))
return s.makeIntermediateResponse(fmt.Errorf("could not initialize trust writer: %w", err), req)
}

v2Trust := body.GetTrust()
Expand All @@ -332,10 +332,10 @@ func (s *reputationServer) AnnounceIntermediateResult(ctx context.Context, req *

err = w.Write(trust)
if err != nil {
return s.makeIntermediateResponse(fmt.Errorf("could not write trust: %w", err))
return s.makeIntermediateResponse(fmt.Errorf("could not write trust: %w", err), req)
}

return s.makeIntermediateResponse(util.StatusOKErr)
return s.makeIntermediateResponse(util.StatusOKErr, req)
}

func (s *reputationServer) processLocalTrust(epoch uint64, t reputation.Trust,
Expand Down
7 changes: 4 additions & 3 deletions cmd/neofs-node/reputation/common/remote.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"context"
"fmt"

"github.com/nspcc-dev/neofs-node/pkg/core/client"
Expand All @@ -12,7 +13,7 @@ import (
)

type clientCache interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
Get(context.Context, client.NodeInfo) (client.MultiAddressClient, error)
}

// clientKeyRemoteProvider must provide a remote writer and take into account
Expand Down Expand Up @@ -71,7 +72,7 @@ func NewRemoteTrustProvider(prm RemoteProviderPrm) reputationrouter.RemoteWriter
}
}

func (rtp *remoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (reputationcommon.WriterProvider, error) {
func (rtp *remoteTrustProvider) InitRemote(ctx context.Context, srv reputationcommon.ServerInfo) (reputationcommon.WriterProvider, error) {
rtp.log.Debug("initializing remote writer provider")

if srv == nil {
Expand All @@ -92,7 +93,7 @@ func (rtp *remoteTrustProvider) InitRemote(srv reputationcommon.ServerInfo) (rep
return nil, fmt.Errorf("parse client node info: %w", err)
}

c, err := rtp.clientCache.Get(info)
c, err := rtp.clientCache.Get(ctx, info)
if err != nil {
return nil, fmt.Errorf("could not initialize API client: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/neofs-node/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type transport struct {
// SendReplicationRequestToNode connects to described node and sends prepared
// replication request message to it.
func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) ([]byte, error) {
c, err := x.clients.Get(node)
c, err := x.clients.Get(ctx, node)
if err != nil {
return nil, fmt.Errorf("connect to remote node: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/nspcc-dev/neo-go v0.117.0
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea
github.com/nspcc-dev/neofs-contract v0.26.1
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.17
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.17.0.20260211130529-740a11a64a87
github.com/nspcc-dev/tzhash v1.8.3
github.com/panjf2000/ants/v2 v2.11.3
github.com/prometheus/client_golang v1.23.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea h1:mK
github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea/go.mod h1:YzhD4EZmC9Z/PNyd7ysC7WXgIgURc9uCG1UWDeV027Y=
github.com/nspcc-dev/neofs-contract v0.26.1 h1:7Ii7Q4L3au408LOsIWKiSgfnT1g8G9jo3W7381d41T8=
github.com/nspcc-dev/neofs-contract v0.26.1/go.mod h1:pevVF9OWdEN5bweKxOu6ryZv9muCEtS1ppzYM4RfBIo=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.17 h1:MahpltbItODvLsGIUsDuW9fz1MXmAi0c8dZNsK8Azqc=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.17/go.mod h1:y2vNz9DVTqBkR7ctYb6taLnabWTtG7xtCHlGofEpKOM=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.17.0.20260211130529-740a11a64a87 h1:RlOzZGS7925Dz5F6OckibMSLv/awqhla2D8PeO9nIHI=
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.17.0.20260211130529-740a11a64a87/go.mod h1:y2vNz9DVTqBkR7ctYb6taLnabWTtG7xtCHlGofEpKOM=
github.com/nspcc-dev/rfc6979 v0.2.4 h1:NBgsdCjhLpEPJZqmC9rciMZDcSY297po2smeaRjw57k=
github.com/nspcc-dev/rfc6979 v0.2.4/go.mod h1:86ylDw6Kss+P6v4QAJqo1Sp3mC0/Zr9G97xSjQ9TuFg=
github.com/nspcc-dev/tzhash v1.8.3 h1:EWJMOL/ppdqNBvkKjHECljusopcsNu4i4kH8KctTv10=
Expand Down
46 changes: 29 additions & 17 deletions pkg/network/cache/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ type Clients struct {

mtx sync.RWMutex
conns map[string]*connections // keys are public key bytes

signer neofscrypto.Signer
}

// NewClients constructs Clients initializing connection to any endpoint with
// given parameters.
func NewClients(l *zap.Logger, signBufPool *sync.Pool, streamTimeout, minConnTimeout, pingInterval, pingTimeout time.Duration) *Clients {
func NewClients(l *zap.Logger, signBufPool *sync.Pool, streamTimeout, minConnTimeout, pingInterval, pingTimeout time.Duration, signer neofscrypto.Signer) *Clients {
return &Clients{
log: l,
streamMsgTimeout: streamTimeout,
Expand All @@ -59,6 +61,7 @@ func NewClients(l *zap.Logger, signBufPool *sync.Pool, streamTimeout, minConnTim
pingInterval: pingInterval,
pingTimeout: pingTimeout,
conns: make(map[string]*connections),
signer: signer,
}
}

Expand All @@ -76,7 +79,7 @@ func snCacheKey(pub []byte) string { return string(pub) }
// Get initializes connections to network addresses of described SN and returns
// interface to access them. All opened connections are cached and kept alive
// until [Clients.CloseAll].
func (x *Clients) Get(info clientcore.NodeInfo) (clientcore.MultiAddressClient, error) {
func (x *Clients) Get(ctx context.Context, info clientcore.NodeInfo) (clientcore.MultiAddressClient, error) {
pub := info.PublicKey()
cacheKey := snCacheKey(pub)

Expand All @@ -93,7 +96,7 @@ func (x *Clients) Get(info clientcore.NodeInfo) (clientcore.MultiAddressClient,
return c, nil
}

c, err := x.initConnections(pub, info.AddressGroup())
c, err := x.initConnections(ctx, pub, info.AddressGroup())
if err != nil {
return nil, fmt.Errorf("init connections: %w", err)
}
Expand All @@ -102,15 +105,15 @@ func (x *Clients) Get(info clientcore.NodeInfo) (clientcore.MultiAddressClient,
}

// SyncWithNewNetmap synchronizes x with the passed new network map.
func (x *Clients) SyncWithNewNetmap(sns []netmap.NodeInfo, local int) {
func (x *Clients) SyncWithNewNetmap(ctx context.Context, sns []netmap.NodeInfo, local int) {
x.mtx.Lock()
defer x.mtx.Unlock()

for i := range sns {
if i == local {
continue
}
if err := x.syncWithNetmapSN(sns[i]); err != nil {
if err := x.syncWithNetmapSN(ctx, sns[i]); err != nil {
x.log.Warn("failed to sync connection cache with SN from the new network map, skip",
zap.String("pub", hex.EncodeToString(sns[i].PublicKey())), zap.Error(err))
}
Expand All @@ -128,7 +131,7 @@ func (x *Clients) SyncWithNewNetmap(sns []netmap.NodeInfo, local int) {
})
}

func (x *Clients) syncWithNetmapSN(sn netmap.NodeInfo) error {
func (x *Clients) syncWithNetmapSN(ctx context.Context, sn netmap.NodeInfo) error {
pub := sn.PublicKey()
conns, ok := x.conns[snCacheKey(pub)]
if !ok {
Expand Down Expand Up @@ -168,7 +171,7 @@ func (x *Clients) syncWithNetmapSN(sn netmap.NodeInfo) error {
continue
}
x.log.Info("initializing connection to new SN address in the new network map...", zap.String("address", ma))
c, err := x.initConnection(pub, as[i].URIAddr())
c, err := x.initConnection(ctx, pub, as[i].URIAddr())
if err != nil {
x.log.Info("failed to init connection to new SN address in the new network map",
zap.String("address", ma), zap.Error(err))
Expand All @@ -181,13 +184,13 @@ func (x *Clients) syncWithNetmapSN(sn netmap.NodeInfo) error {
return nil
}

func (x *Clients) initConnections(pub []byte, as network.AddressGroup) (*connections, error) {
func (x *Clients) initConnections(ctx context.Context, pub []byte, as network.AddressGroup) (*connections, error) {
m := make(map[string]*client.Client, len(as))
l := x.log.With(zap.String("public key", hex.EncodeToString(pub)))
for i := range as {
cacheKey := as[i].String()
l.Info("initializing connection to the SN...", zap.String("address", cacheKey))
c, err := x.initConnection(pub, as[i].URIAddr())
c, err := x.initConnection(ctx, pub, as[i].URIAddr())
if err != nil {
// TODO: if at least one address is OK, SN can be operational
for cl := range maps.Values(m) {
Expand All @@ -206,7 +209,7 @@ func (x *Clients) initConnections(pub []byte, as network.AddressGroup) (*connect
}, nil
}

func (x *Clients) initConnection(pub []byte, uri string) (*client.Client, error) {
func (x *Clients) initConnection(ctx context.Context, pub []byte, uri string) (*client.Client, error) {
target, withTLS, err := uriutil.Parse(uri)
if err != nil {
return nil, fmt.Errorf("parse URI: %w", err)
Expand All @@ -232,17 +235,26 @@ func (x *Clients) initConnection(pub []byte, uri string) (*client.Client, error)
if err != nil { // should never happen
return nil, fmt.Errorf("init gRPC client conn: %w", err)
}
res, err := client.NewGRPC(grpcConn, x.signBufPool, x.streamMsgTimeout, func(respPub []byte) error {
if !bytes.Equal(respPub, pub) {
return clientcore.ErrWrongPublicKey
}
return nil
})
res, err := client.NewGRPC(ctx, pub, grpcConn, x.signBufPool, x.streamMsgTimeout, nil)
if err != nil {
_ = grpcConn.Close()
return res, fmt.Errorf("init NeoFS API client from gRPC client conn: %w", err)
}
grpcConn.Connect()

ctx, cancel := context.WithTimeout(ctx, x.streamMsgTimeout)
defer cancel()

resp, err := res.EndpointInfo(ctx, client.PrmEndpointInfo{})
if err != nil {
_ = grpcConn.Close()
return nil, fmt.Errorf("get node info to check public key: %w", err)
}

if got := resp.NodeInfo().PublicKey(); !bytes.Equal(got, pub) {
_ = grpcConn.Close()
return nil, clientcore.ErrWrongPublicKey
}

return res, nil
}

Expand Down
Loading
Loading