Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The `zetacored` binary must be upgraded to trigger chain parameters data migrati
* [4511](https://github.com/zeta-chain/node/pull/4511) - false mempool congested warning
* [4509](https://github.com/zeta-chain/node/pull/4509) - use outbound schedule interval in sui cctx scheduling
* [4514](https://github.com/zeta-chain/node/pull/4514) - use Zeta height as a factor to calculate the EVM chain artificial height for TSS keysign
* [4513](https://github.com/zeta-chain/node/pull/4513) - use `outbound_schedule_interval` and `outbound_schedule_lookahead` in ton cctx scheduling

### Tests

Expand Down
18 changes: 18 additions & 0 deletions pkg/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,22 @@ const (

// CmdUpdateERC20CustodyPauseStatus is used for CCTX of type cmd to give the instruction to the TSS to update the pause status of the ERC20 custody contract
CmdUpdateERC20CustodyPauseStatus = "cmd_update_erc20_custody_pause_status"

// MaxNonceOffsetFactor is the factor to determine the maximum nonce offset between first pending CCTX and the CCTX being scheduled.
// By setting a maximum nonce offset, the CCTX scheduler will conditionally hold on and wait for older pending CCTXs to be processed
// before scheduling higher nonce CCTXs, to avoid missing any pending CCTXs.
//
// The missed pending CCTXs situation is like this:
// - Pending nonces: [1000, 1020)
// - Pending CCTXs: [979, 980, 981, 982, 1000, 1001, ..., 1019], where 979 - 982 are missed.
// - OutboundScheduleLookahead == 20
// - MaxNonceOffsetFactor == 1.0, results in max nonce offset = 20 * 1.0 = 20
//
// In this case, the scheduler
// - should process CCTXs with nonces [979, 980, 981, 982] because they are missed pending CCTXs.
// - should NOT process CCTXs with nonces [1000, 1001, ..., 1019] because their nonces are much
// higher that of the first pending CCTX (1000 - 979 > 20).
//
// NOTE: 1.0 means use the same value as lookahead for the maximum nonce offset.
MaxNonceOffsetFactor = 1.0
)
13 changes: 3 additions & 10 deletions zetaclient/chains/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/rs/zerolog"

"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/pkg/scheduler"
"github.com/zeta-chain/node/pkg/ticker"
"github.com/zeta-chain/node/zetaclient/chains/base"
Expand All @@ -25,14 +26,6 @@ type EVM struct {
signer *signer.Signer
}

const (
// outboundLookBackFactor is the factor to determine how many nonces to look back for pending cctxs
// For example, give OutboundScheduleLookahead of 120, pending NonceLow of 1000 and factor of 1.0,
// the scheduler need to be able to pick up and schedule any pending cctx with nonce < 880 (1000 - 120 * 1.0)
// NOTE: 1.0 means look back the same number of cctxs as we look ahead
outboundLookBackFactor = 1.0
)

func New(scheduler *scheduler.Scheduler, observer *observer.Observer, signer *signer.Signer) *EVM {
return &EVM{
scheduler: scheduler,
Expand Down Expand Up @@ -135,7 +128,7 @@ func (e *EVM) scheduleCCTX(ctx context.Context) error {
chainID = e.observer.Chain().ChainId
lookahead = e.observer.ChainParams().OutboundScheduleLookahead
// #nosec G115 always in range
outboundScheduleLookBack = uint64(float64(lookahead) * outboundLookBackFactor)
maxNonceOffset = uint64(float64(lookahead) * constant.MaxNonceOffsetFactor)
)

cctxList, err := e.observer.ZetaRepo().GetPendingCCTXs(ctx)
Expand All @@ -162,7 +155,7 @@ func (e *EVM) scheduleCCTX(ctx context.Context) error {
switch {
case params.ReceiverChainId != chainID:
return fmt.Errorf("chain id mismatch: want %d, got %d", chainID, params.ReceiverChainId)
case params.TssNonce > cctxList[0].GetCurrentOutboundParam().TssNonce+outboundScheduleLookBack:
case params.TssNonce > cctxList[0].GetCurrentOutboundParam().TssNonce+maxNonceOffset:
return fmt.Errorf(
"nonce %d is too high (%s). Earliest nonce %d",
params.TssNonce,
Expand Down
18 changes: 7 additions & 11 deletions zetaclient/chains/solana/solana.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"

"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/pkg/scheduler"
"github.com/zeta-chain/node/pkg/ticker"
"github.com/zeta-chain/node/zetaclient/chains/base"
Expand All @@ -17,14 +18,6 @@ import (
"github.com/zeta-chain/node/zetaclient/logs"
)

const (
// outboundLookbackFactor is the factor to determine how many nonces to look back for pending cctxs
// For example, give OutboundScheduleLookahead of 120, pending NonceLow of 1000 and factor of 1.0,
// the scheduler need to be able to pick up and schedule any pending cctx with nonce < 880 (1000 - 120 * 1.0)
// NOTE: 1.0 means look back the same number of cctxs as we look ahead
outboundLookbackFactor = 1.0
)

// Solana represents Solana observer-signer.
type Solana struct {
scheduler *scheduler.Scheduler
Expand Down Expand Up @@ -136,7 +129,7 @@ func (s *Solana) scheduleCCTX(ctx context.Context) error {
// #nosec G115 positive
interval = uint64(s.observer.ChainParams().OutboundScheduleInterval)
scheduleLookahead = s.observer.ChainParams().OutboundScheduleLookahead
scheduleLookback = uint64(float64(scheduleLookahead) * outboundLookbackFactor)
maxNonceOffset = uint64(float64(scheduleLookahead) * constant.MaxNonceOffsetFactor)
needsProcessingCtr = 0
)

Expand All @@ -146,7 +139,7 @@ func (s *Solana) scheduleCCTX(ctx context.Context) error {
}

// schedule keysign for each pending cctx
for _, cctx := range cctxList {
for i, cctx := range cctxList {
var (
params = cctx.GetCurrentOutboundParam()
inboundParams = cctx.GetInboundParams()
Expand All @@ -157,10 +150,13 @@ func (s *Solana) scheduleCCTX(ctx context.Context) error {
logger := s.observer.Logger().Outbound.With().Str(logs.FieldOutboundID, outboundID).Logger()

switch {
case int64(i) == scheduleLookahead:
// stop if lookahead is reached
return nil
case params.ReceiverChainId != chainID:
logger.Error().Msg("chain id mismatch")
continue
case params.TssNonce > cctxList[0].GetCurrentOutboundParam().TssNonce+scheduleLookback:
case params.TssNonce > cctxList[0].GetCurrentOutboundParam().TssNonce+maxNonceOffset:
return fmt.Errorf(
"nonce %d is too high (%s). Earliest nonce %d",
params.TssNonce,
Expand Down
20 changes: 6 additions & 14 deletions zetaclient/chains/sui/sui.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/zeta-chain/node/pkg/bg"
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/pkg/scheduler"
"github.com/zeta-chain/node/pkg/ticker"
"github.com/zeta-chain/node/zetaclient/chains/base"
Expand All @@ -26,14 +27,6 @@ type Sui struct {
signer *signer.Signer
}

const (
// outboundLookbackFactor is the factor to determine how many nonces to look back for pending cctxs
// For example, give OutboundScheduleLookahead of 120, pending NonceLow of 1000 and factor of 1.0,
// the scheduler need to be able to pick up and schedule any pending cctx with nonce < 880 (1000 - 120 * 1.0)
// NOTE: 1.0 means look back the same number of cctxs as we look ahead
outboundLookbackFactor = 1.0
)

// New Sui observer-signer constructor.
func New(scheduler *scheduler.Scheduler, observer *observer.Observer, signer *signer.Signer) *Sui {
return &Sui{scheduler, observer, signer}
Expand Down Expand Up @@ -140,30 +133,29 @@ func (s *Sui) scheduleCCTX(ctx context.Context) error {
interval = uint64(s.observer.ChainParams().OutboundScheduleInterval)
lookahead = s.observer.ChainParams().OutboundScheduleLookahead
// #nosec G115 always in range
lookback = uint64(float64(lookahead) * outboundLookbackFactor)
maxNonceOffset = uint64(float64(lookahead) * constant.MaxNonceOffsetFactor)

firstNonce = cctxList[0].GetCurrentOutboundParam().TssNonce
maxNonce = firstNonce + lookback
maxNonce = firstNonce + maxNonceOffset
)

for i, cctx := range cctxList {
var (
outboundID = base.OutboundIDFromCCTX(cctx)
outboundParams = cctx.GetCurrentOutboundParam()
nonce = outboundParams.TssNonce
logger = s.outboundLogger(outboundID)
)

logger := s.outboundLogger(outboundID)

switch {
case int64(i) == lookahead:
// take only first N cctxs
// stop if lookahead is reached
return nil
case outboundParams.ReceiverChainId != chainID:
// should not happen
logger.Error().Msg("chain id mismatch")
continue
case nonce >= maxNonce:
case nonce > maxNonce:
return fmt.Errorf("nonce %d is too high (%s). Earliest nonce %d", nonce, outboundID, firstNonce)
case s.signer.IsOutboundActive(outboundID):
// cctx is already being processed & broadcasted by signer
Expand Down
88 changes: 55 additions & 33 deletions zetaclient/chains/ton/ton.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"github.com/rs/zerolog"

"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/pkg/constant"
"github.com/zeta-chain/node/pkg/scheduler"
"github.com/zeta-chain/node/pkg/ticker"
"github.com/zeta-chain/node/x/crosschain/types"
"github.com/zeta-chain/node/zetaclient/chains/base"
"github.com/zeta-chain/node/zetaclient/chains/ton/observer"
"github.com/zeta-chain/node/zetaclient/chains/ton/signer"
Expand Down Expand Up @@ -121,48 +121,70 @@ func (t *TON) scheduleCCTX(ctx context.Context) error {

time.Sleep(delay)

// #nosec G115 always in range
zetaHeight := uint64(zetaBlock.Block.Height)

cctxs, err := t.observer.ZetaRepo().GetPendingCCTXs(ctx)
if err != nil {
return err
}

for _, cctx := range cctxs {
outboundID := base.OutboundIDFromCCTX(cctx)
err := t.processCCTX(ctx, outboundID, cctx, zetaHeight)
if err != nil {
t.outboundLogger(outboundID).Error().Err(err).Msg("failed to schedule CCTX")
}
// no-op
if len(cctxs) == 0 {
return nil
}

return nil
}
var (
// #nosec G115 always in range
zetaHeight = uint64(zetaBlock.Block.Height)
chainID = t.observer.Chain().ChainId

func (t *TON) processCCTX(ctx context.Context,
outboundID string,
cctx *types.CrossChainTx,
zetaHeight uint64,
) error {
switch {
case t.signer.IsOutboundActive(outboundID):
return nil //no-op
case cctx.GetCurrentOutboundParam().ReceiverChainId != t.observer.Chain().ChainId:
return errors.New("chain id mismatch")
}
// #nosec G115 positive
interval = uint64(t.observer.ChainParams().OutboundScheduleInterval)
lookahead = t.observer.ChainParams().OutboundScheduleLookahead
// #nosec G115 always in range
maxNonceOffset = uint64(float64(lookahead) * constant.MaxNonceOffsetFactor)

// vote outbound if it's already confirmed
continueKeySign, err := t.observer.VoteOutboundIfConfirmed(ctx, cctx)
if err != nil {
return errors.Wrap(err, "failed to VoteOutboundIfConfirmed")
}
if !continueKeySign {
t.outboundLogger(outboundID).Info().Msg("schedule CCTX: outbound already processed")
return nil
}
firstNonce = cctxs[0].GetCurrentOutboundParam().TssNonce
maxNonce = firstNonce + maxNonceOffset
)

for i, cctx := range cctxs {
var (
outboundID = base.OutboundIDFromCCTX(cctx)
outboundParams = cctx.GetCurrentOutboundParam()
nonce = outboundParams.TssNonce
logger = t.outboundLogger(outboundID)
)

switch {
case int64(i) == lookahead:
// stop if lookahead is reached
return nil
case outboundParams.ReceiverChainId != chainID:
// should not happen
logger.Error().Msg("chain id mismatch")
continue
case nonce > maxNonce:
return fmt.Errorf("nonce %d is too high (%s). Earliest nonce %d", nonce, outboundID, firstNonce)
case t.signer.IsOutboundActive(outboundID):
// cctx is already being processed & broadcasted by signer
continue
}

// vote outbound if it's already confirmed
continueKeysign, err := t.observer.VoteOutboundIfConfirmed(ctx, cctx)
if err != nil {
logger.Error().Err(err).Msg("call to VoteOutboundIfConfirmed failed")
continue
}
if !continueKeysign {
logger.Info().Msg("outbound already processed")
continue
}

go t.signer.TryProcessOutbound(ctx, cctx, t.observer.ZetaRepo(), zetaHeight)
// schedule keysign if the interval has arrived
if nonce%interval == zetaHeight%interval {
go t.signer.TryProcessOutbound(ctx, cctx, t.observer.ZetaRepo(), zetaHeight)
}
}

return nil
}
Expand Down
Loading