Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions pkg/chain/ethereum/tbtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1469,23 +1469,40 @@ func (tc *TbtcChain) GetWallet(
)
}

// Fetch wallet registry data on a best-effort basis. Legacy callers
// only use Bridge-sourced fields and never access MembersIDsHash, so a
// registry outage must not block them. The zero value signals that
// registry data is unavailable; downstream consumers that need it
// (e.g. signer_approval_certificate) already guard against this.
var membersIDsHash [32]byte

walletRegistryWallet, err := tc.walletRegistry.GetWallet(wallet.EcdsaWalletID)
if err != nil {
return nil, fmt.Errorf(
logger.Warnf(
"cannot get wallet registry data for wallet [0x%x]: [%v]",
wallet.EcdsaWalletID,
err,
)
} else {
membersIDsHash = walletRegistryWallet.MembersIdsHash
}

walletState, err := parseWalletState(wallet.State)
if err != nil {
return nil, fmt.Errorf("cannot parse wallet state: [%v]", err)
}

return makeWalletChainData(wallet, membersIDsHash, walletState), nil
}

func makeWalletChainData(
wallet tbtcabi.WalletsWallet,
membersIDsHash [32]byte,
walletState tbtc.WalletState,
) *tbtc.WalletChainData {
return &tbtc.WalletChainData{
EcdsaWalletID: wallet.EcdsaWalletID,
MembersIDsHash: walletRegistryWallet.MembersIdsHash,
MembersIDsHash: membersIDsHash,
MainUtxoHash: wallet.MainUtxoHash,
PendingRedemptionsValue: wallet.PendingRedemptionsValue,
CreatedAt: time.Unix(int64(wallet.CreatedAt), 0),
Expand All @@ -1494,7 +1511,7 @@ func (tc *TbtcChain) GetWallet(
PendingMovedFundsSweepRequestsCount: wallet.PendingMovedFundsSweepRequestsCount,
State: walletState,
MovingFundsTargetWalletsCommitmentHash: wallet.MovingFundsTargetWalletsCommitmentHash,
}, nil
}
}

func (tc *TbtcChain) OnWalletClosed(
Expand Down
64 changes: 64 additions & 0 deletions pkg/chain/ethereum/tbtc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
commonEthereum "github.com/keep-network/keep-common/pkg/chain/ethereum"

"github.com/keep-network/keep-core/internal/testutils"
tbtcabi "github.com/keep-network/keep-core/pkg/chain/ethereum/tbtc/gen/abi"
"github.com/keep-network/keep-core/pkg/chain/local_v1"
"github.com/keep-network/keep-core/pkg/protocol/group"
tbtcpkg "github.com/keep-network/keep-core/pkg/tbtc"
)

func TestComputeOperatorsIDsHash(t *testing.T) {
Expand Down Expand Up @@ -132,6 +134,68 @@ func TestConvertSignaturesToChainFormat(t *testing.T) {
}
}

func TestMakeWalletChainDataPreservesBridgeFieldsWhenRegistryDataUnavailable(t *testing.T) {
bridgeWallet := tbtcabi.WalletsWallet{
EcdsaWalletID: [32]byte{0xaa},
MainUtxoHash: [32]byte{0xbb},
PendingRedemptionsValue: 12345,
CreatedAt: 1700000000,
MovingFundsRequestedAt: 1700000100,
ClosingStartedAt: 1700000200,
PendingMovedFundsSweepRequestsCount: 7,
MovingFundsTargetWalletsCommitmentHash: [32]byte{0xcc},
}

walletChainData := makeWalletChainData(
bridgeWallet,
[32]byte{},
tbtcpkg.StateLive,
)

if walletChainData.MembersIDsHash != ([32]byte{}) {
t.Fatalf("expected zero members IDs hash, got [0x%x]", walletChainData.MembersIDsHash)
}
if walletChainData.EcdsaWalletID != bridgeWallet.EcdsaWalletID {
t.Fatalf("expected wallet ID [0x%x], got [0x%x]", bridgeWallet.EcdsaWalletID, walletChainData.EcdsaWalletID)
}
if walletChainData.MainUtxoHash != bridgeWallet.MainUtxoHash {
t.Fatalf("expected main UTXO hash [0x%x], got [0x%x]", bridgeWallet.MainUtxoHash, walletChainData.MainUtxoHash)
}
if walletChainData.PendingRedemptionsValue != bridgeWallet.PendingRedemptionsValue {
t.Fatalf(
"expected pending redemptions value [%v], got [%v]",
bridgeWallet.PendingRedemptionsValue,
walletChainData.PendingRedemptionsValue,
)
}
if walletChainData.State != tbtcpkg.StateLive {
t.Fatalf("expected wallet state [%v], got [%v]", tbtcpkg.StateLive, walletChainData.State)
}
}

func TestMakeWalletChainDataUsesWalletRegistryMembersIDsHashWhenAvailable(t *testing.T) {
membersIDsHash := [32]byte{0xdd}

walletChainData := makeWalletChainData(
tbtcabi.WalletsWallet{
EcdsaWalletID: [32]byte{0xee},
},
membersIDsHash,
tbtcpkg.StateMovingFunds,
)

if walletChainData.MembersIDsHash != membersIDsHash {
t.Fatalf("expected members IDs hash [0x%x], got [0x%x]", membersIDsHash, walletChainData.MembersIDsHash)
}
if walletChainData.State != tbtcpkg.StateMovingFunds {
t.Fatalf(
"expected wallet state [%v], got [%v]",
tbtcpkg.StateMovingFunds,
walletChainData.State,
)
}
}

func TestConvertPubKeyToChainFormat(t *testing.T) {
bytes30 := []byte{229, 19, 136, 216, 125, 157, 135, 142, 67, 130,
136, 13, 76, 188, 32, 218, 243, 134, 95, 73, 155, 24, 38, 73, 117, 90,
Expand Down
50 changes: 49 additions & 1 deletion pkg/covenantsigner/covenantsigner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"context"
"crypto/ecdsa"
"crypto/ed25519"
"crypto/sha256"
"crypto/elliptic"
"crypto/rand"
"crypto/sha256"
"crypto/x509"
"encoding/hex"
"encoding/json"
Expand Down Expand Up @@ -104,6 +104,54 @@ func (fmh *faultingMemoryHandle) Delete(directory string, name string) error {
return fmh.memoryHandle.Delete(directory, name)
}

// faultingDescriptor wraps a memoryDescriptor and returns an injected error
// from Content(), allowing tests to simulate unreadable job files.
type faultingDescriptor struct {
name string
directory string
err error
}

func (fd *faultingDescriptor) Name() string { return fd.name }
func (fd *faultingDescriptor) Directory() string { return fd.directory }
func (fd *faultingDescriptor) Content() ([]byte, error) { return nil, fd.err }

// contentFaultingHandle extends memoryHandle by injecting faulting descriptors
// into the ReadAll channel alongside normal descriptors. This enables testing
// of load() behavior when individual file reads fail.
type contentFaultingHandle struct {
*memoryHandle
faultingDescriptors []*faultingDescriptor
}

func newContentFaultingHandle() *contentFaultingHandle {
return &contentFaultingHandle{
memoryHandle: newMemoryHandle(),
}
}

func (cfh *contentFaultingHandle) AddFaultingDescriptor(name, directory string, err error) {
cfh.faultingDescriptors = append(cfh.faultingDescriptors, &faultingDescriptor{
name: name,
directory: directory,
err: err,
})
}

func (cfh *contentFaultingHandle) ReadAll() (<-chan persistence.DataDescriptor, <-chan error) {
dataChan := make(chan persistence.DataDescriptor, len(cfh.items)+len(cfh.faultingDescriptors))
errorChan := make(chan error)
for _, item := range cfh.items {
dataChan <- item
}
for _, fd := range cfh.faultingDescriptors {
dataChan <- fd
}
close(dataChan)
close(errorChan)
return dataChan, errorChan
}

type scriptedEngine struct {
submit func(*Job) (*Transition, error)
poll func(*Job) (*Transition, error)
Expand Down
74 changes: 60 additions & 14 deletions pkg/covenantsigner/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -45,7 +46,9 @@

if err := store.load(); err != nil {
// Release the lock if loading fails after successful acquisition.
store.Close()
if closeErr := store.Close(); closeErr != nil {
return nil, errors.Join(err, fmt.Errorf("failed to release store lock: %w", closeErr))
}
return nil, err
}

Expand All @@ -57,16 +60,25 @@
// kept open for the lifetime of the lock; closing it releases the lock.
func acquireFileLock(dataDir string) (*os.File, error) {
lockPath := filepath.Join(dataDir, jobsDirectory, lockFileName)
root, err := os.OpenRoot(dataDir)
if err != nil {
return nil, fmt.Errorf("cannot open data directory root [%s]: %w", dataDir, err)
}
defer func() {
if closeErr := root.Close(); closeErr != nil {
logger.Warnf("failed to close store root [%s]: [%v]", dataDir, closeErr)
}
}()

if err := os.MkdirAll(filepath.Dir(lockPath), 0700); err != nil {
if err := root.MkdirAll(jobsDirectory, 0700); err != nil {

Check failure on line 73 in pkg/covenantsigner/store.go

View workflow job for this annotation

GitHub Actions / client-vet

root.MkdirAll undefined (type *os.Root has no field or method MkdirAll)

Check failure on line 73 in pkg/covenantsigner/store.go

View workflow job for this annotation

GitHub Actions / client-lint

root.MkdirAll undefined (type *os.Root has no field or method MkdirAll) (compile)

Check failure on line 73 in pkg/covenantsigner/store.go

View workflow job for this annotation

GitHub Actions / client-lint

root.MkdirAll undefined (type *os.Root has no field or method MkdirAll) (compile)

Check failure on line 73 in pkg/covenantsigner/store.go

View workflow job for this annotation

GitHub Actions / client-race

root.MkdirAll undefined (type *os.Root has no field or method MkdirAll)
return nil, fmt.Errorf(
"cannot create lock directory [%s]: %w",
filepath.Dir(lockPath),
err,
)
}

lockFile, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0600)
lockFile, err := root.OpenFile(filepath.Join(jobsDirectory, lockFileName), os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return nil, fmt.Errorf(
"cannot open lock file [%s]: %w",
Expand All @@ -79,7 +91,9 @@
int(lockFile.Fd()),
syscall.LOCK_EX|syscall.LOCK_NB,
); err != nil {
lockFile.Close()
if closeErr := lockFile.Close(); closeErr != nil {
err = errors.Join(err, fmt.Errorf("failed to close lock file [%s]: %w", lockPath, closeErr))
}
return nil, fmt.Errorf(
"cannot acquire exclusive lock on [%s]: "+
"another process may already own the store: %w",
Expand Down Expand Up @@ -169,30 +183,62 @@

content, err := descriptor.Content()
if err != nil {
return err
return fmt.Errorf(
"cannot read persisted covenant signer job file [%s]: %w",
descriptor.Name(),
err,
)
}

job := &Job{}
if err := json.Unmarshal(content, job); err != nil {
return err
return fmt.Errorf(
"cannot parse persisted covenant signer job file [%s]: %w",
descriptor.Name(),
err,
)
}

existingID, ok := s.byRouteKey[routeKey(job.Route, job.RouteRequestID)]
if ok {
existing := s.byRequestID[existingID]
if existing != nil {
key := routeKey(job.Route, job.RouteRequestID)

if existingID, ok := s.byRouteKey[key]; ok {
if existing := s.byRequestID[existingID]; existing != nil {
existingIsNewerOrSame, err := isNewerOrSameJobRevision(existing, job)
if err != nil {
return err
}
if existingIsNewerOrSame {
// When the timestamp comparison fails, prefer
// whichever job has a parseable timestamp. If the
// candidate's timestamp is valid, the failure is on
// the existing job -- replace it. Otherwise skip the
// candidate.
if _, parseErr := time.Parse(time.RFC3339Nano, job.UpdatedAt); parseErr != nil {
logger.Warnf(
"skipping job [%s] with invalid timestamp on duplicate route key [%s/%s]: [%v]",
job.RequestID,
job.Route,
job.RouteRequestID,
err,
)
continue
}
logger.Warnf(
"replacing job [%s] with invalid timestamp on duplicate route key [%s/%s]: [%v]",
existing.RequestID,
job.Route,
job.RouteRequestID,
err,
)
} else if existingIsNewerOrSame {
continue
}
}

if existingID != job.RequestID {
delete(s.byRequestID, existingID)
}
}

s.byRequestID[job.RequestID] = job
s.byRouteKey[routeKey(job.Route, job.RouteRequestID)] = job.RequestID
s.byRouteKey[key] = job.RequestID
case err, ok := <-errorChan:
if !ok {
errorChan = nil
Expand Down
Loading
Loading