Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning]https://semver.org/spec/v2.0.0.
## Releases

### Unreleased
- Added rollback support [DV-1943]

### [0.9.6] - 2025-09-29
- Added two-factor secret encryption [DV-2526]
Expand Down
42 changes: 42 additions & 0 deletions internal/eproxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/dv-net/dv-processing/internal/constants"
"github.com/dv-net/dv-processing/internal/models"
"github.com/dv-net/dv-processing/pkg/retry"
"github.com/dv-net/dv-processing/pkg/utils"
"github.com/dv-net/dv-processing/pkg/walletsdk/wconstants"
addressesv2 "github.com/dv-net/dv-proto/gen/go/eproxy/addresses/v2"
"github.com/dv-net/dv-proto/gen/go/eproxy/addresses/v2/addressesv2connect"
Expand All @@ -19,6 +20,8 @@ import (
"github.com/dv-net/dv-proto/gen/go/eproxy/blocks/v2/blocksv2connect"
"github.com/dv-net/dv-proto/gen/go/eproxy/btclike/v2/btclikev2connect"
commonv2 "github.com/dv-net/dv-proto/gen/go/eproxy/common/v2"
incidentsv2 "github.com/dv-net/dv-proto/gen/go/eproxy/incidents/v2"
"github.com/dv-net/dv-proto/gen/go/eproxy/incidents/v2/incidentsv2connect"
trxv2 "github.com/dv-net/dv-proto/gen/go/eproxy/transactions/v2"
"github.com/dv-net/dv-proto/gen/go/eproxy/transactions/v2/transactionsv2connect"
"github.com/dv-net/dv-proto/gen/go/eproxy/tron/v1/tronv1connect"
Expand Down Expand Up @@ -92,6 +95,10 @@ func (s *Service) TransactionsClient() transactionsv2connect.TransactionsService
return s.eproxyClient.TransactionsClient
}

func (s *Service) IncidentsClient() incidentsv2connect.IncidentsServiceClient {
return s.eproxyClient.IncidentsClient
}

// AddressBalances returns the balances of the address on the blockchain
func (s *Service) AddressBalances(ctx context.Context, address string, blockchain wconstants.BlockchainType) ([]*models.Asset, error) {
srvBlockchain := ConvertBlockchain(blockchain)
Expand Down Expand Up @@ -233,6 +240,41 @@ func (s *Service) LastBlockNumber(ctx context.Context, blockchain wconstants.Blo
return response.Msg.GetHeight(), nil
}

// GetIncidents returns recent incidents for the blockchain
func (s *Service) GetIncidents(ctx context.Context, blockchain wconstants.BlockchainType, limit uint32) ([]*incidentsv2.Incident, error) {
incidents, err := s.eproxyClient.IncidentsClient.Find(ctx, connect.NewRequest(&incidentsv2.FindRequest{
Blockchain: ConvertBlockchain(blockchain),
Common: &commonv2.FindRequestCommon{
Page: utils.Pointer(uint32(1)),
PageSize: utils.Pointer(limit),
},
}))
if err != nil {
return nil, fmt.Errorf("get incidents: %w", err)
}

return incidents.Msg.GetItems(), nil
}

// GetRollbackStartingBlock returns the block height to start parsing from after a rollback incident
func (s *Service) GetRollbackStartingBlock(ctx context.Context, blockchain wconstants.BlockchainType) (uint64, error) {
incidents, err := s.GetIncidents(ctx, blockchain, 1)
if err != nil {
return 0, fmt.Errorf("get last incident for rollback recovery: %w", err)
}

if len(incidents) < 1 {
return 0, fmt.Errorf("no incidents found for blockchain %s", blockchain.String())
}

incident := incidents[0]
if incident.GetType() != incidentsv2.IncidentType_INCIDENT_TYPE_ROLLBACK {
return 0, fmt.Errorf("last incident is not a rollback incident for blockchain %s", blockchain.String())
}

return incident.GetDataRollback().GetRevertToBlockHeight(), nil
}

type FindTransactionsParams struct {
BlockHeight *uint64
Hash *string
Expand Down
250 changes: 244 additions & 6 deletions internal/escanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync/atomic"
"time"

"connectrpc.com/connect"
"github.com/dv-net/mx/logger"
"github.com/jackc/pgx/v5"
"github.com/riverqueue/river"
Expand All @@ -30,6 +31,8 @@ import (
"github.com/dv-net/dv-processing/pkg/walletsdk/bch"
"github.com/dv-net/dv-processing/pkg/walletsdk/tron"
"github.com/dv-net/dv-processing/pkg/walletsdk/wconstants"
blocksv2 "github.com/dv-net/dv-proto/gen/go/eproxy/blocks/v2"
incidentsv2 "github.com/dv-net/dv-proto/gen/go/eproxy/incidents/v2"
transactionsv2 "github.com/dv-net/dv-proto/gen/go/eproxy/transactions/v2"
)

Expand Down Expand Up @@ -87,16 +90,16 @@ func (s *scanner) start(ctx context.Context) error {
go func() {
for ctx.Err() == nil {
fn := func() error {
lastNodeBlockHeight, err := s.bs.EProxy().LastBlockNumber(ctx, s.blockchain)
lastDBBlockHeight, err := s.bs.EProxy().LastBlockNumber(ctx, s.blockchain)
if err != nil {
return fmt.Errorf("get last block from explorer: %w", err)
}

if s.lastNodeBlockHeight.Load() == int64(lastNodeBlockHeight) { //nolint:gosec
if s.lastNodeBlockHeight.Load() == int64(lastDBBlockHeight) { //nolint:gosec
return nil
}

s.lastNodeBlockHeight.Store(int64(lastNodeBlockHeight)) //nolint:gosec
s.lastNodeBlockHeight.Store(int64(lastDBBlockHeight)) //nolint:gosec

return nil
}
Expand Down Expand Up @@ -142,6 +145,22 @@ func (s *scanner) handleBlocks(ctx context.Context) error {
existsLastBlockDB = false
}

// Check for potential rollback before processing next block
if existsLastBlockDB && lpBlock >= 0 {
nextBlock := lpBlock + 1
s.logger.Debugf("Checking for rollback for next block %d", nextBlock)
if err := s.checkForRollback(ctx, nextBlock); err != nil {
return fmt.Errorf("rollback check for block %d: %w", nextBlock, err)
}

// Reload lpBlock in case it was updated during rollback recovery
newLpBlock := s.lastParsedBlockHeight.Load()
if newLpBlock != lpBlock {
s.logger.Infof("Block height updated after rollback check: %d -> %d", lpBlock, newLpBlock)
lpBlock = newLpBlock
}
}

for i := lpBlock + 1; i <= lnBlock; i++ {
if ctx.Err() != nil {
return nil //nolint:nilerr
Expand Down Expand Up @@ -188,7 +207,7 @@ func (cwp *createWebhookParams) IsTrxHotWalletDeposit() bool {
}

// handleBlock
func (s *scanner) handleBlock(blockHeight int64, existsLastBlockDB bool) error {
func (s *scanner) handleBlock(blockHeight int64, existsLastBlockDB bool) error { //nolint:funlen
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()

Expand All @@ -204,6 +223,17 @@ func (s *scanner) handleBlock(blockHeight int64, existsLastBlockDB bool) error {

s.logger.Debugf("found %d transactions in block %d in %s", len(txs), blockHeight, time.Since(now))

// Get block hash for rollback detection
block, err := s.bs.EProxy().BlocksClient().Get(ctx, connect.NewRequest(&blocksv2.GetRequest{
Blockchain: eproxy.ConvertBlockchain(s.blockchain),
Height: uint64(blockHeight), //nolint:gosec
}))
if err != nil {
return fmt.Errorf("get block for hash: %w", err)
}

blockHash := block.Msg.GetItem().GetHash()

createWhParams := utils.NewSlice[createWebhookParams]()

eg, gCtx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -357,13 +387,13 @@ func (s *scanner) handleBlock(blockHeight int64, existsLastBlockDB bool) error {
s.logger.Debugf("processed block %d in %s", blockHeight, time.Since(now))

if existsLastBlockDB {
if err := s.bs.ProcessedBlocks().UpdateNumber(ctx, s.blockchain, blockHeight, repos.WithTx(dbTx)); err != nil {
if err := s.bs.ProcessedBlocks().UpdateNumberWithHash(ctx, s.blockchain, blockHeight, blockHash, repos.WithTx(dbTx)); err != nil {
return fmt.Errorf("update block number: %w", err)
}

s.logger.Debugf("updated last block from explorer %s is %d", s.blockchain.String(), blockHeight)
} else {
if err := s.bs.ProcessedBlocks().Create(ctx, s.blockchain, blockHeight, repos.WithTx(dbTx)); err != nil {
if err := s.bs.ProcessedBlocks().Create(ctx, s.blockchain, blockHeight, blockHash, repos.WithTx(dbTx)); err != nil {
return fmt.Errorf("create block number: %w", err)
}
}
Expand Down Expand Up @@ -452,3 +482,211 @@ func (s *scanner) checksForEvent(event *transactionsv2.Event) []eventCheck {

return checks
}

// checkForRollback checks if the next block's prevHash matches our stored hash
func (s *scanner) checkForRollback(ctx context.Context, nextBlockHeight int64) error {
s.logger.Debugf("Checking rollback for block %d", nextBlockHeight)

// Get the next block to check prevHash consistency
block, err := s.bs.EProxy().BlocksClient().Get(ctx, connect.NewRequest(&blocksv2.GetRequest{
Blockchain: eproxy.ConvertBlockchain(s.blockchain),
Height: uint64(nextBlockHeight), //nolint:gosec
}))
if err != nil {
// If we can't get the block, it might not exist yet OR there was a rollback
if strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "does not exist") {
s.logger.Warnf("Block %d not found in explorer - checking for rollback incident", nextBlockHeight)
// Check incidents as fallback - the block might have been rolled back
return s.checkForIncidents(ctx)
}
return fmt.Errorf("get block for rollback check: %w", err)
}

// Get the previous block hash from the next block
nextBlockPrevHash := block.Msg.GetItem().GetPrevHash()
if nextBlockPrevHash == "" {
s.logger.Debugf("block %d has no prevHash, skipping rollback check", nextBlockHeight)
return nil
}

// Get stored block info from database
lastBlock, err := s.bs.ProcessedBlocks().LastBlock(ctx, s.blockchain)
if err != nil {
if errors.Is(err, storecmn.ErrNotFound) {
s.logger.Debugf("No stored block found, skipping rollback check")
return nil
}
return fmt.Errorf("get last processed block: %w", err)
}

s.logger.Debugf("Comparing hashes: stored=%s, next_block_prevHash=%s", lastBlock.Hash, nextBlockPrevHash)

// If we have a stored hash and the next block's prevHash doesn't match it, we have a rollback
if lastBlock.Hash != "" && lastBlock.Hash != nextBlockPrevHash {
s.logger.Warnf("Rollback detected! Stored hash %s != next block's prevHash %s for block %d",
lastBlock.Hash, nextBlockPrevHash, nextBlockHeight)

return s.handleRollback(ctx)
}

s.logger.Debugf("No rollback detected for block %d", nextBlockHeight)
return nil
}

// handleRollback handles blockchain rollback by getting the new starting point from incidents API
func (s *scanner) handleRollback(ctx context.Context) error {
s.logger.Infof("Handling rollback incident for blockchain %s", s.blockchain.String())

newStartingBlock, err := s.bs.EProxy().GetRollbackStartingBlock(ctx, s.blockchain)
if err != nil {
return fmt.Errorf("get rollback starting block: %w", err)
}

s.logger.Infof("Rolling back to block %d for blockchain %s", newStartingBlock, s.blockchain.String())

// The rollback target is the safe block we need to revert to (newStartingBlock - 1)
// This is the last trusted block before the rollback occurred
rollbackBlockHeight := int64(newStartingBlock) - 1 //nolint:gosec

block, err := s.bs.EProxy().BlocksClient().Get(ctx, connect.NewRequest(&blocksv2.GetRequest{
Blockchain: eproxy.ConvertBlockchain(s.blockchain),
Height: uint64(rollbackBlockHeight), //nolint:gosec
}))
if err != nil {
return fmt.Errorf("get rollback block hash: %w", err)
}

rollbackBlockHash := block.Msg.GetItem().GetHash()

// IMPORTANT: Set lastParsedBlockHeight to rollbackBlockHeight so the next iteration
// will start parsing from newStartingBlock (rollbackBlockHeight + 1)
// This ensures we re-parse blocks starting from newStartingBlock which were rolled back
s.lastParsedBlockHeight.Store(rollbackBlockHeight)

// Update the database with the rollback target block info
return pgx.BeginTxFunc(ctx, s.store.PSQLConn(), pgx.TxOptions{}, func(dbTx pgx.Tx) error {
if err := s.bs.ProcessedBlocks().UpdateNumberWithHash(ctx, s.blockchain, rollbackBlockHeight, rollbackBlockHash, repos.WithTx(dbTx)); err != nil {
return fmt.Errorf("update processed block after rollback: %w", err)
}

s.logger.Infof("Successfully handled rollback, stored block %d with hash %s, will restart parsing from block %d", rollbackBlockHeight, rollbackBlockHash, newStartingBlock)
return nil
})
}

// checkForIncidents checks for new rollback incidents and processes them
func (s *scanner) checkForIncidents(ctx context.Context) error {
s.logger.Debugf("Checking for new rollback incidents")

// Get incidents from explorer
incidents, err := s.bs.EProxy().GetIncidents(ctx, s.blockchain, 10)
if err != nil {
s.logger.Debugf("Failed to get incidents: %v", err)
return nil // Don't fail scanner if we can't get incidents
}

if len(incidents) == 0 {
s.logger.Debugf("No incidents found")
return nil
}

// Check each incident
for _, incident := range incidents {
if incident.GetType() != incidentsv2.IncidentType_INCIDENT_TYPE_ROLLBACK {
continue
}

// Check if this incident was already processed
processed, err := s.bs.ProcessedIncidents().IsProcessed(ctx, s.blockchain, incident.GetId())
if err != nil {
return fmt.Errorf("check if incident processed: %w", err)
}

if processed {
s.logger.Debugf("Incident %s already processed, skipping", incident.GetId())
continue
}

// New incident found!
rollbackStartBlock := int64(incident.GetDataRollback().GetRevertToBlockHeight()) //nolint:gosec
currentBlock := s.lastParsedBlockHeight.Load()

// Check if we need to rollback
if currentBlock >= rollbackStartBlock { //nolint:nestif
s.logger.Warnf("New rollback incident detected: id=%s, current_block=%d, rollback_to=%d",
incident.GetId(), currentBlock, rollbackStartBlock-1)

// Mark incident as processing before handling
if err := s.bs.ProcessedIncidents().MarkAsProcessing(ctx,
s.blockchain,
incident.GetId(),
"rollback",
rollbackStartBlock,
currentBlock); err != nil {
s.logger.Errorf("Failed to mark incident as processing: %v", err)
}

// Handle the rollback
if err := s.handleRollbackWithIncident(ctx, incident); err != nil {
// Mark as failed
if markErr := s.bs.ProcessedIncidents().MarkAsFailed(ctx,
s.blockchain,
incident.GetId(),
err.Error()); markErr != nil {
s.logger.Errorf("Failed to mark incident as failed: %v", markErr)
}
return fmt.Errorf("handle rollback for incident %s: %w", incident.GetId(), err)
}

// Mark as completed
if err := s.bs.ProcessedIncidents().MarkAsCompleted(ctx, s.blockchain, incident.GetId()); err != nil {
s.logger.Errorf("Failed to mark incident as completed: %v", err)
}

s.logger.Infof("Successfully processed rollback incident %s", incident.GetId())
return nil // Process one incident at a time
}

s.logger.Debugf("Rollback incident %s not applicable: current_block=%d < rollback_start=%d",
incident.GetId(), currentBlock, rollbackStartBlock)
}

return nil
}

// handleRollbackWithIncident handles rollback using incident information
func (s *scanner) handleRollbackWithIncident(ctx context.Context, incident *incidentsv2.Incident) error {
rollbackStartBlock := int64(incident.GetDataRollback().GetRevertToBlockHeight()) //nolint:gosec
rollbackBlockHeight := rollbackStartBlock - 1

s.logger.Infof("Processing rollback incident %s: rolling back to block %d", incident.GetId(), rollbackBlockHeight)

// Get the hash of the rollback target block
block, err := s.bs.EProxy().BlocksClient().Get(ctx, connect.NewRequest(&blocksv2.GetRequest{
Blockchain: eproxy.ConvertBlockchain(s.blockchain),
Height: uint64(rollbackBlockHeight), //nolint:gosec
}))
if err != nil {
return fmt.Errorf("get rollback block hash: %w", err)
}

rollbackBlockHash := block.Msg.GetItem().GetHash()

// Update lastParsedBlockHeight
s.lastParsedBlockHeight.Store(rollbackBlockHeight)

// Update database
return pgx.BeginTxFunc(ctx, s.store.PSQLConn(), pgx.TxOptions{}, func(dbTx pgx.Tx) error {
if err := s.bs.ProcessedBlocks().UpdateNumberWithHash(ctx,
s.blockchain,
rollbackBlockHeight,
rollbackBlockHash,
repos.WithTx(dbTx)); err != nil {
return fmt.Errorf("update processed block: %w", err)
}

s.logger.Infof("Rollback completed: stored block %d with hash %s, will resume from block %d",
rollbackBlockHeight, rollbackBlockHash, rollbackStartBlock)
return nil
})
}
Loading