diff --git a/CHANGELOG.md b/CHANGELOG.md index a8ba01f..3d9f4a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning]https://semver.org/spec/v2.0.0. ## Releases ### Unreleased +- Added rollback support [DV-1943] ### [0.9.6] - 2025-09-29 - Added two-factor secret encryption [DV-2526] diff --git a/internal/eproxy/service.go b/internal/eproxy/service.go index bb159fa..1cab3c5 100644 --- a/internal/eproxy/service.go +++ b/internal/eproxy/service.go @@ -10,6 +10,7 @@ import ( "github.com/dv-net/dv-processing/internal/constants" "github.com/dv-net/dv-processing/internal/models" "github.com/dv-net/dv-processing/pkg/retry" + "github.com/dv-net/dv-processing/pkg/utils" "github.com/dv-net/dv-processing/pkg/walletsdk/wconstants" addressesv2 "github.com/dv-net/dv-proto/gen/go/eproxy/addresses/v2" "github.com/dv-net/dv-proto/gen/go/eproxy/addresses/v2/addressesv2connect" @@ -19,6 +20,8 @@ import ( "github.com/dv-net/dv-proto/gen/go/eproxy/blocks/v2/blocksv2connect" "github.com/dv-net/dv-proto/gen/go/eproxy/btclike/v2/btclikev2connect" commonv2 "github.com/dv-net/dv-proto/gen/go/eproxy/common/v2" + incidentsv2 "github.com/dv-net/dv-proto/gen/go/eproxy/incidents/v2" + "github.com/dv-net/dv-proto/gen/go/eproxy/incidents/v2/incidentsv2connect" trxv2 "github.com/dv-net/dv-proto/gen/go/eproxy/transactions/v2" "github.com/dv-net/dv-proto/gen/go/eproxy/transactions/v2/transactionsv2connect" "github.com/dv-net/dv-proto/gen/go/eproxy/tron/v1/tronv1connect" @@ -92,6 +95,10 @@ func (s *Service) TransactionsClient() transactionsv2connect.TransactionsService return s.eproxyClient.TransactionsClient } +func (s *Service) IncidentsClient() incidentsv2connect.IncidentsServiceClient { + return s.eproxyClient.IncidentsClient +} + // AddressBalances returns the balances of the address on the blockchain func (s *Service) AddressBalances(ctx context.Context, address string, blockchain wconstants.BlockchainType) ([]*models.Asset, error) { srvBlockchain := ConvertBlockchain(blockchain) @@ -233,6 +240,41 @@ func (s *Service) LastBlockNumber(ctx context.Context, blockchain wconstants.Blo return response.Msg.GetHeight(), nil } +// GetIncidents returns recent incidents for the blockchain +func (s *Service) GetIncidents(ctx context.Context, blockchain wconstants.BlockchainType, limit uint32) ([]*incidentsv2.Incident, error) { + incidents, err := s.eproxyClient.IncidentsClient.Find(ctx, connect.NewRequest(&incidentsv2.FindRequest{ + Blockchain: ConvertBlockchain(blockchain), + Common: &commonv2.FindRequestCommon{ + Page: utils.Pointer(uint32(1)), + PageSize: utils.Pointer(limit), + }, + })) + if err != nil { + return nil, fmt.Errorf("get incidents: %w", err) + } + + return incidents.Msg.GetItems(), nil +} + +// GetRollbackStartingBlock returns the block height to start parsing from after a rollback incident +func (s *Service) GetRollbackStartingBlock(ctx context.Context, blockchain wconstants.BlockchainType) (uint64, error) { + incidents, err := s.GetIncidents(ctx, blockchain, 1) + if err != nil { + return 0, fmt.Errorf("get last incident for rollback recovery: %w", err) + } + + if len(incidents) < 1 { + return 0, fmt.Errorf("no incidents found for blockchain %s", blockchain.String()) + } + + incident := incidents[0] + if incident.GetType() != incidentsv2.IncidentType_INCIDENT_TYPE_ROLLBACK { + return 0, fmt.Errorf("last incident is not a rollback incident for blockchain %s", blockchain.String()) + } + + return incident.GetDataRollback().GetRevertToBlockHeight(), nil +} + type FindTransactionsParams struct { BlockHeight *uint64 Hash *string diff --git a/internal/escanner/scanner.go b/internal/escanner/scanner.go index 1953391..138ebcc 100644 --- a/internal/escanner/scanner.go +++ b/internal/escanner/scanner.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "connectrpc.com/connect" "github.com/dv-net/mx/logger" "github.com/jackc/pgx/v5" "github.com/riverqueue/river" @@ -30,6 +31,8 @@ import ( "github.com/dv-net/dv-processing/pkg/walletsdk/bch" "github.com/dv-net/dv-processing/pkg/walletsdk/tron" "github.com/dv-net/dv-processing/pkg/walletsdk/wconstants" + blocksv2 "github.com/dv-net/dv-proto/gen/go/eproxy/blocks/v2" + incidentsv2 "github.com/dv-net/dv-proto/gen/go/eproxy/incidents/v2" transactionsv2 "github.com/dv-net/dv-proto/gen/go/eproxy/transactions/v2" ) @@ -87,16 +90,16 @@ func (s *scanner) start(ctx context.Context) error { go func() { for ctx.Err() == nil { fn := func() error { - lastNodeBlockHeight, err := s.bs.EProxy().LastBlockNumber(ctx, s.blockchain) + lastDBBlockHeight, err := s.bs.EProxy().LastBlockNumber(ctx, s.blockchain) if err != nil { return fmt.Errorf("get last block from explorer: %w", err) } - if s.lastNodeBlockHeight.Load() == int64(lastNodeBlockHeight) { //nolint:gosec + if s.lastNodeBlockHeight.Load() == int64(lastDBBlockHeight) { //nolint:gosec return nil } - s.lastNodeBlockHeight.Store(int64(lastNodeBlockHeight)) //nolint:gosec + s.lastNodeBlockHeight.Store(int64(lastDBBlockHeight)) //nolint:gosec return nil } @@ -142,6 +145,22 @@ func (s *scanner) handleBlocks(ctx context.Context) error { existsLastBlockDB = false } + // Check for potential rollback before processing next block + if existsLastBlockDB && lpBlock >= 0 { + nextBlock := lpBlock + 1 + s.logger.Debugf("Checking for rollback for next block %d", nextBlock) + if err := s.checkForRollback(ctx, nextBlock); err != nil { + return fmt.Errorf("rollback check for block %d: %w", nextBlock, err) + } + + // Reload lpBlock in case it was updated during rollback recovery + newLpBlock := s.lastParsedBlockHeight.Load() + if newLpBlock != lpBlock { + s.logger.Infof("Block height updated after rollback check: %d -> %d", lpBlock, newLpBlock) + lpBlock = newLpBlock + } + } + for i := lpBlock + 1; i <= lnBlock; i++ { if ctx.Err() != nil { return nil //nolint:nilerr @@ -188,7 +207,7 @@ func (cwp *createWebhookParams) IsTrxHotWalletDeposit() bool { } // handleBlock -func (s *scanner) handleBlock(blockHeight int64, existsLastBlockDB bool) error { +func (s *scanner) handleBlock(blockHeight int64, existsLastBlockDB bool) error { //nolint:funlen ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() @@ -204,6 +223,17 @@ func (s *scanner) handleBlock(blockHeight int64, existsLastBlockDB bool) error { s.logger.Debugf("found %d transactions in block %d in %s", len(txs), blockHeight, time.Since(now)) + // Get block hash for rollback detection + block, err := s.bs.EProxy().BlocksClient().Get(ctx, connect.NewRequest(&blocksv2.GetRequest{ + Blockchain: eproxy.ConvertBlockchain(s.blockchain), + Height: uint64(blockHeight), //nolint:gosec + })) + if err != nil { + return fmt.Errorf("get block for hash: %w", err) + } + + blockHash := block.Msg.GetItem().GetHash() + createWhParams := utils.NewSlice[createWebhookParams]() eg, gCtx := errgroup.WithContext(ctx) @@ -357,13 +387,13 @@ func (s *scanner) handleBlock(blockHeight int64, existsLastBlockDB bool) error { s.logger.Debugf("processed block %d in %s", blockHeight, time.Since(now)) if existsLastBlockDB { - if err := s.bs.ProcessedBlocks().UpdateNumber(ctx, s.blockchain, blockHeight, repos.WithTx(dbTx)); err != nil { + if err := s.bs.ProcessedBlocks().UpdateNumberWithHash(ctx, s.blockchain, blockHeight, blockHash, repos.WithTx(dbTx)); err != nil { return fmt.Errorf("update block number: %w", err) } s.logger.Debugf("updated last block from explorer %s is %d", s.blockchain.String(), blockHeight) } else { - if err := s.bs.ProcessedBlocks().Create(ctx, s.blockchain, blockHeight, repos.WithTx(dbTx)); err != nil { + if err := s.bs.ProcessedBlocks().Create(ctx, s.blockchain, blockHeight, blockHash, repos.WithTx(dbTx)); err != nil { return fmt.Errorf("create block number: %w", err) } } @@ -452,3 +482,211 @@ func (s *scanner) checksForEvent(event *transactionsv2.Event) []eventCheck { return checks } + +// checkForRollback checks if the next block's prevHash matches our stored hash +func (s *scanner) checkForRollback(ctx context.Context, nextBlockHeight int64) error { + s.logger.Debugf("Checking rollback for block %d", nextBlockHeight) + + // Get the next block to check prevHash consistency + block, err := s.bs.EProxy().BlocksClient().Get(ctx, connect.NewRequest(&blocksv2.GetRequest{ + Blockchain: eproxy.ConvertBlockchain(s.blockchain), + Height: uint64(nextBlockHeight), //nolint:gosec + })) + if err != nil { + // If we can't get the block, it might not exist yet OR there was a rollback + if strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "does not exist") { + s.logger.Warnf("Block %d not found in explorer - checking for rollback incident", nextBlockHeight) + // Check incidents as fallback - the block might have been rolled back + return s.checkForIncidents(ctx) + } + return fmt.Errorf("get block for rollback check: %w", err) + } + + // Get the previous block hash from the next block + nextBlockPrevHash := block.Msg.GetItem().GetPrevHash() + if nextBlockPrevHash == "" { + s.logger.Debugf("block %d has no prevHash, skipping rollback check", nextBlockHeight) + return nil + } + + // Get stored block info from database + lastBlock, err := s.bs.ProcessedBlocks().LastBlock(ctx, s.blockchain) + if err != nil { + if errors.Is(err, storecmn.ErrNotFound) { + s.logger.Debugf("No stored block found, skipping rollback check") + return nil + } + return fmt.Errorf("get last processed block: %w", err) + } + + s.logger.Debugf("Comparing hashes: stored=%s, next_block_prevHash=%s", lastBlock.Hash, nextBlockPrevHash) + + // If we have a stored hash and the next block's prevHash doesn't match it, we have a rollback + if lastBlock.Hash != "" && lastBlock.Hash != nextBlockPrevHash { + s.logger.Warnf("Rollback detected! Stored hash %s != next block's prevHash %s for block %d", + lastBlock.Hash, nextBlockPrevHash, nextBlockHeight) + + return s.handleRollback(ctx) + } + + s.logger.Debugf("No rollback detected for block %d", nextBlockHeight) + return nil +} + +// handleRollback handles blockchain rollback by getting the new starting point from incidents API +func (s *scanner) handleRollback(ctx context.Context) error { + s.logger.Infof("Handling rollback incident for blockchain %s", s.blockchain.String()) + + newStartingBlock, err := s.bs.EProxy().GetRollbackStartingBlock(ctx, s.blockchain) + if err != nil { + return fmt.Errorf("get rollback starting block: %w", err) + } + + s.logger.Infof("Rolling back to block %d for blockchain %s", newStartingBlock, s.blockchain.String()) + + // The rollback target is the safe block we need to revert to (newStartingBlock - 1) + // This is the last trusted block before the rollback occurred + rollbackBlockHeight := int64(newStartingBlock) - 1 //nolint:gosec + + block, err := s.bs.EProxy().BlocksClient().Get(ctx, connect.NewRequest(&blocksv2.GetRequest{ + Blockchain: eproxy.ConvertBlockchain(s.blockchain), + Height: uint64(rollbackBlockHeight), //nolint:gosec + })) + if err != nil { + return fmt.Errorf("get rollback block hash: %w", err) + } + + rollbackBlockHash := block.Msg.GetItem().GetHash() + + // IMPORTANT: Set lastParsedBlockHeight to rollbackBlockHeight so the next iteration + // will start parsing from newStartingBlock (rollbackBlockHeight + 1) + // This ensures we re-parse blocks starting from newStartingBlock which were rolled back + s.lastParsedBlockHeight.Store(rollbackBlockHeight) + + // Update the database with the rollback target block info + return pgx.BeginTxFunc(ctx, s.store.PSQLConn(), pgx.TxOptions{}, func(dbTx pgx.Tx) error { + if err := s.bs.ProcessedBlocks().UpdateNumberWithHash(ctx, s.blockchain, rollbackBlockHeight, rollbackBlockHash, repos.WithTx(dbTx)); err != nil { + return fmt.Errorf("update processed block after rollback: %w", err) + } + + s.logger.Infof("Successfully handled rollback, stored block %d with hash %s, will restart parsing from block %d", rollbackBlockHeight, rollbackBlockHash, newStartingBlock) + return nil + }) +} + +// checkForIncidents checks for new rollback incidents and processes them +func (s *scanner) checkForIncidents(ctx context.Context) error { + s.logger.Debugf("Checking for new rollback incidents") + + // Get incidents from explorer + incidents, err := s.bs.EProxy().GetIncidents(ctx, s.blockchain, 10) + if err != nil { + s.logger.Debugf("Failed to get incidents: %v", err) + return nil // Don't fail scanner if we can't get incidents + } + + if len(incidents) == 0 { + s.logger.Debugf("No incidents found") + return nil + } + + // Check each incident + for _, incident := range incidents { + if incident.GetType() != incidentsv2.IncidentType_INCIDENT_TYPE_ROLLBACK { + continue + } + + // Check if this incident was already processed + processed, err := s.bs.ProcessedIncidents().IsProcessed(ctx, s.blockchain, incident.GetId()) + if err != nil { + return fmt.Errorf("check if incident processed: %w", err) + } + + if processed { + s.logger.Debugf("Incident %s already processed, skipping", incident.GetId()) + continue + } + + // New incident found! + rollbackStartBlock := int64(incident.GetDataRollback().GetRevertToBlockHeight()) //nolint:gosec + currentBlock := s.lastParsedBlockHeight.Load() + + // Check if we need to rollback + if currentBlock >= rollbackStartBlock { //nolint:nestif + s.logger.Warnf("New rollback incident detected: id=%s, current_block=%d, rollback_to=%d", + incident.GetId(), currentBlock, rollbackStartBlock-1) + + // Mark incident as processing before handling + if err := s.bs.ProcessedIncidents().MarkAsProcessing(ctx, + s.blockchain, + incident.GetId(), + "rollback", + rollbackStartBlock, + currentBlock); err != nil { + s.logger.Errorf("Failed to mark incident as processing: %v", err) + } + + // Handle the rollback + if err := s.handleRollbackWithIncident(ctx, incident); err != nil { + // Mark as failed + if markErr := s.bs.ProcessedIncidents().MarkAsFailed(ctx, + s.blockchain, + incident.GetId(), + err.Error()); markErr != nil { + s.logger.Errorf("Failed to mark incident as failed: %v", markErr) + } + return fmt.Errorf("handle rollback for incident %s: %w", incident.GetId(), err) + } + + // Mark as completed + if err := s.bs.ProcessedIncidents().MarkAsCompleted(ctx, s.blockchain, incident.GetId()); err != nil { + s.logger.Errorf("Failed to mark incident as completed: %v", err) + } + + s.logger.Infof("Successfully processed rollback incident %s", incident.GetId()) + return nil // Process one incident at a time + } + + s.logger.Debugf("Rollback incident %s not applicable: current_block=%d < rollback_start=%d", + incident.GetId(), currentBlock, rollbackStartBlock) + } + + return nil +} + +// handleRollbackWithIncident handles rollback using incident information +func (s *scanner) handleRollbackWithIncident(ctx context.Context, incident *incidentsv2.Incident) error { + rollbackStartBlock := int64(incident.GetDataRollback().GetRevertToBlockHeight()) //nolint:gosec + rollbackBlockHeight := rollbackStartBlock - 1 + + s.logger.Infof("Processing rollback incident %s: rolling back to block %d", incident.GetId(), rollbackBlockHeight) + + // Get the hash of the rollback target block + block, err := s.bs.EProxy().BlocksClient().Get(ctx, connect.NewRequest(&blocksv2.GetRequest{ + Blockchain: eproxy.ConvertBlockchain(s.blockchain), + Height: uint64(rollbackBlockHeight), //nolint:gosec + })) + if err != nil { + return fmt.Errorf("get rollback block hash: %w", err) + } + + rollbackBlockHash := block.Msg.GetItem().GetHash() + + // Update lastParsedBlockHeight + s.lastParsedBlockHeight.Store(rollbackBlockHeight) + + // Update database + return pgx.BeginTxFunc(ctx, s.store.PSQLConn(), pgx.TxOptions{}, func(dbTx pgx.Tx) error { + if err := s.bs.ProcessedBlocks().UpdateNumberWithHash(ctx, + s.blockchain, + rollbackBlockHeight, + rollbackBlockHash, + repos.WithTx(dbTx)); err != nil { + return fmt.Errorf("update processed block: %w", err) + } + + s.logger.Infof("Rollback completed: stored block %d with hash %s, will resume from block %d", + rollbackBlockHeight, rollbackBlockHash, rollbackStartBlock) + return nil + }) +} diff --git a/internal/fsm/fsmtron/fsm_test.go b/internal/fsm/fsmtron/fsm_test.go index aedcdf5..7723e11 100644 --- a/internal/fsm/fsmtron/fsm_test.go +++ b/internal/fsm/fsmtron/fsm_test.go @@ -86,7 +86,7 @@ func TestFSM(t *testing.T) { bc, err := blockchains.New(context.Background(), conf.Blockchain, sdk) require.NoError(t, err) - sysSvc := system.New(st, "sysVersion", "sysCommit") + sysSvc := system.New(l, st, "sysVersion", "sysCommit") bs, err := baseservices.New(appCtx, l, conf, st, explorerProxySvc, bc, sysSvc, dispatcher.New(), nil, sdk) require.NoError(t, err) diff --git a/internal/models/models_gen.go b/internal/models/models_gen.go index 8818ffd..ea43534 100644 --- a/internal/models/models_gen.go +++ b/internal/models/models_gen.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package models @@ -143,6 +143,20 @@ type ProcessedBlock struct { Number int64 `db:"number" json:"number"` CreatedAt pgtype.Timestamptz `db:"created_at" json:"created_at"` UpdatedAt pgtype.Timestamptz `db:"updated_at" json:"updated_at"` + Hash pgtype.Text `db:"hash" json:"hash"` +} + +type ProcessedIncident struct { + ID string `db:"id" json:"id"` + Blockchain wconstants.BlockchainType `db:"blockchain" json:"blockchain"` + IncidentType string `db:"incident_type" json:"incident_type"` + Status string `db:"status" json:"status"` + RollbackFromBlock pgtype.Int8 `db:"rollback_from_block" json:"rollback_from_block"` + RollbackToBlock pgtype.Int8 `db:"rollback_to_block" json:"rollback_to_block"` + ErrorMessage pgtype.Text `db:"error_message" json:"error_message"` + StartedAt pgtype.Timestamp `db:"started_at" json:"started_at"` + CompletedAt pgtype.Timestamp `db:"completed_at" json:"completed_at"` + CreatedAt pgtype.Timestamp `db:"created_at" json:"created_at"` } type ProcessingWallet struct { diff --git a/internal/services/baseservices/baseservices.go b/internal/services/baseservices/baseservices.go index 71e2b36..e268c90 100644 --- a/internal/services/baseservices/baseservices.go +++ b/internal/services/baseservices/baseservices.go @@ -14,6 +14,7 @@ import ( "github.com/dv-net/dv-processing/internal/services/clients" "github.com/dv-net/dv-processing/internal/services/owners" "github.com/dv-net/dv-processing/internal/services/processedblocks" + "github.com/dv-net/dv-processing/internal/services/processedincidents" "github.com/dv-net/dv-processing/internal/services/system" "github.com/dv-net/dv-processing/internal/services/transfers" "github.com/dv-net/dv-processing/internal/services/wallets" @@ -33,6 +34,7 @@ type IBaseServices interface { //nolint:interfacebloat Clients() *clients.Service Owners() *owners.Service ProcessedBlocks() *processedblocks.Service + ProcessedIncidents() *processedincidents.Service Wallets() *wallets.Service System() system.IService Webhooks() *webhooks.Service @@ -52,18 +54,19 @@ type IBaseServices interface { //nolint:interfacebloat } type service struct { - clients *clients.Service - owners *owners.Service - processedBlocks *processedblocks.Service - system system.IService - wallets *wallets.Service - webhooks *webhooks.Service - eproxy *eproxy.Service - blockchains *blockchains.Blockchains - transfers *transfers.Service - madmin *madmin.Service - rmanager *rmanager.Service - upd *updater.Service + clients *clients.Service + owners *owners.Service + processedBlocks *processedblocks.Service + processedIncidents *processedincidents.Service + system system.IService + wallets *wallets.Service + webhooks *webhooks.Service + eproxy *eproxy.Service + blockchains *blockchains.Blockchains + transfers *transfers.Service + madmin *madmin.Service + rmanager *rmanager.Service + upd *updater.Service } func New( @@ -86,6 +89,7 @@ func New( walletsSvc := wallets.New(l, conf, st, publisher, walletSDK) ownersSvc := owners.New(conf, st, walletsSvc) processedblocksSvc := processedblocks.New(st) + processedincidentsSvc := processedincidents.New(st) transfersSvc := transfers.New(l, conf, st, walletsSvc, explorerProxySvc, blockchains, rmanager) webhooksSvc := webhooks.New(l, conf, st, transfersSvc, ownersSvc) upd, err := updater.NewService(ctx, l, conf) @@ -93,37 +97,39 @@ func New( return nil, err } return &service{ - clients: clientsSvc, - owners: ownersSvc, - processedBlocks: processedblocksSvc, - wallets: walletsSvc, - system: systemSvc, - webhooks: webhooksSvc, - eproxy: explorerProxySvc, - transfers: transfersSvc, - blockchains: blockchains, - madmin: madmin, - rmanager: rmanager, - upd: upd, + clients: clientsSvc, + owners: ownersSvc, + processedBlocks: processedblocksSvc, + processedIncidents: processedincidentsSvc, + wallets: walletsSvc, + system: systemSvc, + webhooks: webhooksSvc, + eproxy: explorerProxySvc, + transfers: transfersSvc, + blockchains: blockchains, + madmin: madmin, + rmanager: rmanager, + upd: upd, }, nil } -func (s *service) Clients() *clients.Service { return s.clients } -func (s *service) Owners() *owners.Service { return s.owners } -func (s *service) ProcessedBlocks() *processedblocks.Service { return s.processedBlocks } -func (s *service) Wallets() *wallets.Service { return s.wallets } -func (s *service) System() system.IService { return s.system } -func (s *service) Webhooks() *webhooks.Service { return s.webhooks } -func (s *service) Transfers() *transfers.Service { return s.transfers } -func (s *service) EProxy() *eproxy.Service { return s.eproxy } -func (s *service) Blockchains() *blockchains.Blockchains { return s.blockchains } -func (s *service) BTC() *btc.BTC { return s.blockchains.Bitcoin } -func (s *service) LTC() *ltc.LTC { return s.blockchains.Litecoin } -func (s *service) BCH() *bch.BCH { return s.blockchains.BitcoinCash } -func (s *service) Tron() *tron.Tron { return s.blockchains.Tron } -func (s *service) ETH() *evm.EVM { return s.blockchains.Ethereum } -func (s *service) BinanceSmartChain() *evm.EVM { return s.blockchains.BinanceSmartChain } -func (s *service) MAdmin() *madmin.Service { return s.madmin } -func (s *service) RManager() *rmanager.Service { return s.rmanager } -func (s *service) Updater() *updater.Service { return s.upd } -func (s *service) Doge() *doge.Doge { return s.blockchains.Dogecoin } +func (s *service) Clients() *clients.Service { return s.clients } +func (s *service) Owners() *owners.Service { return s.owners } +func (s *service) ProcessedBlocks() *processedblocks.Service { return s.processedBlocks } +func (s *service) ProcessedIncidents() *processedincidents.Service { return s.processedIncidents } +func (s *service) Wallets() *wallets.Service { return s.wallets } +func (s *service) System() system.IService { return s.system } +func (s *service) Webhooks() *webhooks.Service { return s.webhooks } +func (s *service) Transfers() *transfers.Service { return s.transfers } +func (s *service) EProxy() *eproxy.Service { return s.eproxy } +func (s *service) Blockchains() *blockchains.Blockchains { return s.blockchains } +func (s *service) BTC() *btc.BTC { return s.blockchains.Bitcoin } +func (s *service) LTC() *ltc.LTC { return s.blockchains.Litecoin } +func (s *service) BCH() *bch.BCH { return s.blockchains.BitcoinCash } +func (s *service) Tron() *tron.Tron { return s.blockchains.Tron } +func (s *service) ETH() *evm.EVM { return s.blockchains.Ethereum } +func (s *service) BinanceSmartChain() *evm.EVM { return s.blockchains.BinanceSmartChain } +func (s *service) MAdmin() *madmin.Service { return s.madmin } +func (s *service) RManager() *rmanager.Service { return s.rmanager } +func (s *service) Updater() *updater.Service { return s.upd } +func (s *service) Doge() *doge.Doge { return s.blockchains.Dogecoin } diff --git a/internal/services/processedblocks/methods.go b/internal/services/processedblocks/methods.go index da5f1e5..f790016 100644 --- a/internal/services/processedblocks/methods.go +++ b/internal/services/processedblocks/methods.go @@ -3,15 +3,18 @@ package processedblocks import ( "context" "errors" + "time" "github.com/dv-net/dv-processing/internal/store/repos" + "github.com/dv-net/dv-processing/internal/store/repos/repo_processed_blocks" "github.com/dv-net/dv-processing/internal/store/storecmn" "github.com/dv-net/dv-processing/pkg/walletsdk/wconstants" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" ) // Create -func (s *Service) Create(ctx context.Context, blockchain wconstants.BlockchainType, number int64, opts ...repos.Option) error { +func (s *Service) Create(ctx context.Context, blockchain wconstants.BlockchainType, number int64, hash string, opts ...repos.Option) error { if blockchain == "" { return errBlockchainEmpty } @@ -20,9 +23,29 @@ func (s *Service) Create(ctx context.Context, blockchain wconstants.BlockchainTy return errNumberLessOrEqualToZero } - return s.store.ProcessedBlocks(opts...).Create(ctx, blockchain, number) + return s.store.ProcessedBlocks(opts...).Create(ctx, repo_processed_blocks.CreateParams{ + Blockchain: blockchain, + Number: number, + Hash: pgtype.Text{ + String: hash, + Valid: true, + }, + }) } +// // CreateWithHash +// func (s *Service) CreateWithHash(ctx context.Context, blockchain wconstants.BlockchainType, number int64, hash string, opts ...repos.Option) error { +// if blockchain == "" { +// return errBlockchainEmpty +// } + +// if number <= 0 { +// return errNumberLessOrEqualToZero +// } + +// return s.store.ProcessedBlocks(opts...).CreateWithHash(ctx, blockchain, number, hash) +// } + // UpdateNumber func (s *Service) UpdateNumber(ctx context.Context, blockchain wconstants.BlockchainType, number int64, opts ...repos.Option) error { if blockchain == "" { @@ -36,6 +59,26 @@ func (s *Service) UpdateNumber(ctx context.Context, blockchain wconstants.Blockc return s.store.ProcessedBlocks(opts...).UpdateNumber(ctx, blockchain, number) } +// UpdateNumberWithHash +func (s *Service) UpdateNumberWithHash(ctx context.Context, blockchain wconstants.BlockchainType, number int64, hash string, opts ...repos.Option) error { + if blockchain == "" { + return errBlockchainEmpty + } + + if number <= 0 { + return errNumberLessOrEqualToZero + } + + return s.store.ProcessedBlocks(opts...).UpdateNumberWithHash(ctx, repo_processed_blocks.UpdateNumberWithHashParams{ + Blockchain: blockchain, + Number: number, + Hash: pgtype.Text{ + String: hash, + Valid: true, + }, + }) +} + // LastBlockNumber func (s *Service) LastBlockNumber(ctx context.Context, blockchain wconstants.BlockchainType) (int64, error) { if blockchain == "" { @@ -52,3 +95,34 @@ func (s *Service) LastBlockNumber(ctx context.Context, blockchain wconstants.Blo return data, nil } + +// LastBlock returns the last block info including hash +func (s *Service) LastBlock(ctx context.Context, blockchain wconstants.BlockchainType) (*ProcessedBlock, error) { + if blockchain == "" { + return nil, errBlockchainEmpty + } + + data, err := s.store.ProcessedBlocks().LastBlock(ctx, blockchain) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil, storecmn.ErrNotFound + } + return nil, err + } + + return &ProcessedBlock{ + Blockchain: data.Blockchain.String(), + Number: data.Number, + Hash: data.Hash.String, + CreatedAt: data.CreatedAt.Time, + UpdatedAt: &data.UpdatedAt.Time, + }, nil +} + +type ProcessedBlock struct { + Blockchain string `json:"blockchain"` + Number int64 `json:"number"` + Hash string `json:"hash"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt *time.Time `json:"updated_at"` +} diff --git a/internal/services/processedincidents/errors.go b/internal/services/processedincidents/errors.go new file mode 100644 index 0000000..84283f6 --- /dev/null +++ b/internal/services/processedincidents/errors.go @@ -0,0 +1,8 @@ +package processedincidents + +import "errors" + +var ( + errBlockchainEmpty = errors.New("blockchain is empty") + errIncidentIDEmpty = errors.New("incident ID is empty") +) diff --git a/internal/services/processedincidents/methods.go b/internal/services/processedincidents/methods.go new file mode 100644 index 0000000..f5ee485 --- /dev/null +++ b/internal/services/processedincidents/methods.go @@ -0,0 +1,103 @@ +package processedincidents + +import ( + "context" + "errors" + "fmt" + + "github.com/dv-net/dv-processing/internal/models" + "github.com/dv-net/dv-processing/internal/store/repos" + "github.com/dv-net/dv-processing/internal/store/repos/repo_processed_incidents" + "github.com/dv-net/dv-processing/pkg/walletsdk/wconstants" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" +) + +// IsProcessed checks if an incident has been processed +func (s *Service) IsProcessed(ctx context.Context, blockchain wconstants.BlockchainType, incidentID string, opts ...repos.Option) (bool, error) { + if blockchain == "" { + return false, errBlockchainEmpty + } + + if incidentID == "" { + return false, errIncidentIDEmpty + } + + exists, err := s.store.ProcessedIncidents(opts...).IsIncidentProcessed(ctx, blockchain, incidentID) + if err != nil { + return false, fmt.Errorf("check if incident processed: %w", err) + } + + return exists, nil +} + +// MarkAsProcessing marks an incident as being processed +func (s *Service) MarkAsProcessing(ctx context.Context, blockchain wconstants.BlockchainType, incidentID string, incidentType string, rollbackFromBlock, rollbackToBlock int64, opts ...repos.Option) error { + if blockchain == "" { + return errBlockchainEmpty + } + + if incidentID == "" { + return errIncidentIDEmpty + } + + return s.store.ProcessedIncidents(opts...).MarkIncidentAsProcessing(ctx, repo_processed_incidents.MarkIncidentAsProcessingParams{ + ID: incidentID, + Blockchain: blockchain, + IncidentType: incidentType, + RollbackFromBlock: pgtype.Int8{Int64: rollbackFromBlock, Valid: true}, + RollbackToBlock: pgtype.Int8{Int64: rollbackToBlock, Valid: true}, + }) +} + +// MarkAsCompleted marks an incident as successfully completed +func (s *Service) MarkAsCompleted(ctx context.Context, blockchain wconstants.BlockchainType, incidentID string, opts ...repos.Option) error { + if blockchain == "" { + return errBlockchainEmpty + } + + if incidentID == "" { + return errIncidentIDEmpty + } + + return s.store.ProcessedIncidents(opts...).MarkIncidentAsCompleted(ctx, blockchain, incidentID) +} + +// MarkAsFailed marks an incident as failed with an error message +func (s *Service) MarkAsFailed(ctx context.Context, blockchain wconstants.BlockchainType, incidentID string, errorMessage string, opts ...repos.Option) error { + if blockchain == "" { + return errBlockchainEmpty + } + + if incidentID == "" { + return errIncidentIDEmpty + } + + return s.store.ProcessedIncidents(opts...).MarkIncidentAsFailed(ctx, repo_processed_incidents.MarkIncidentAsFailedParams{ + Blockchain: blockchain, + ID: incidentID, + ErrorMessage: pgtype.Text{String: errorMessage, Valid: true}, + }) +} + +// GetIncompleteIncidents returns all incidents that are still being processed +func (s *Service) GetIncompleteIncidents(ctx context.Context, blockchain wconstants.BlockchainType, opts ...repos.Option) ([]*models.ProcessedIncident, error) { + if blockchain == "" { + return nil, errBlockchainEmpty + } + + incidents, err := s.store.ProcessedIncidents(opts...).GetIncompleteIncidents(ctx, blockchain) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return []*models.ProcessedIncident{}, nil + } + return nil, fmt.Errorf("get incomplete incidents: %w", err) + } + + return incidents, nil +} + +// CleanupOldIncidents removes old completed incidents (older than 30 days) +func (s *Service) CleanupOldIncidents(ctx context.Context, opts ...repos.Option) error { + return s.store.ProcessedIncidents(opts...).CleanupOldIncidents(ctx) +} diff --git a/internal/services/processedincidents/service.go b/internal/services/processedincidents/service.go new file mode 100644 index 0000000..45c20dd --- /dev/null +++ b/internal/services/processedincidents/service.go @@ -0,0 +1,13 @@ +package processedincidents + +import "github.com/dv-net/dv-processing/internal/store" + +type Service struct { + store store.IStore +} + +func New(st store.IStore) *Service { + return &Service{ + store: st, + } +} diff --git a/internal/store/repos/repo_clients/clients.sql.go b/internal/store/repos/repo_clients/clients.sql.go index 44ed3c0..1038303 100644 --- a/internal/store/repos/repo_clients/clients.sql.go +++ b/internal/store/repos/repo_clients/clients.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: clients.sql package repo_clients diff --git a/internal/store/repos/repo_clients/clients_gen.sql.go b/internal/store/repos/repo_clients/clients_gen.sql.go index fdd6559..3821746 100644 --- a/internal/store/repos/repo_clients/clients_gen.sql.go +++ b/internal/store/repos/repo_clients/clients_gen.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: clients_gen.sql package repo_clients diff --git a/internal/store/repos/repo_clients/db.go b/internal/store/repos/repo_clients/db.go index eaee79c..349e823 100644 --- a/internal/store/repos/repo_clients/db.go +++ b/internal/store/repos/repo_clients/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_clients diff --git a/internal/store/repos/repo_clients/querier.go b/internal/store/repos/repo_clients/querier.go index 9a3e1ca..7fe5b52 100644 --- a/internal/store/repos/repo_clients/querier.go +++ b/internal/store/repos/repo_clients/querier.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_clients diff --git a/internal/store/repos/repo_owners/db.go b/internal/store/repos/repo_owners/db.go index 908c2d0..55ccbda 100644 --- a/internal/store/repos/repo_owners/db.go +++ b/internal/store/repos/repo_owners/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_owners diff --git a/internal/store/repos/repo_owners/owners.sql.go b/internal/store/repos/repo_owners/owners.sql.go index 61a6098..1c7235b 100644 --- a/internal/store/repos/repo_owners/owners.sql.go +++ b/internal/store/repos/repo_owners/owners.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: owners.sql package repo_owners diff --git a/internal/store/repos/repo_owners/owners_gen.sql.go b/internal/store/repos/repo_owners/owners_gen.sql.go index ca68010..9113d86 100644 --- a/internal/store/repos/repo_owners/owners_gen.sql.go +++ b/internal/store/repos/repo_owners/owners_gen.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: owners_gen.sql package repo_owners diff --git a/internal/store/repos/repo_owners/querier.go b/internal/store/repos/repo_owners/querier.go index 9f1e56f..41bac51 100644 --- a/internal/store/repos/repo_owners/querier.go +++ b/internal/store/repos/repo_owners/querier.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_owners diff --git a/internal/store/repos/repo_processed_blocks/db.go b/internal/store/repos/repo_processed_blocks/db.go index 62ee3a5..1688f87 100644 --- a/internal/store/repos/repo_processed_blocks/db.go +++ b/internal/store/repos/repo_processed_blocks/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_processed_blocks diff --git a/internal/store/repos/repo_processed_blocks/processed.sql.go b/internal/store/repos/repo_processed_blocks/processed.sql.go index 1bd3ffe..2de74b1 100644 --- a/internal/store/repos/repo_processed_blocks/processed.sql.go +++ b/internal/store/repos/repo_processed_blocks/processed.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: processed.sql package repo_processed_blocks @@ -8,11 +8,30 @@ package repo_processed_blocks import ( "context" + "github.com/dv-net/dv-processing/internal/models" "github.com/dv-net/dv-processing/pkg/walletsdk/wconstants" + "github.com/jackc/pgx/v5/pgtype" ) +const lastBlock = `-- name: LastBlock :one +SELECT blockchain, number, created_at, updated_at, hash FROM processed_blocks pb WHERE pb.blockchain = $1 LIMIT 1 +` + +func (q *Queries) LastBlock(ctx context.Context, blockchain wconstants.BlockchainType) (*models.ProcessedBlock, error) { + row := q.db.QueryRow(ctx, lastBlock, blockchain) + var i models.ProcessedBlock + err := row.Scan( + &i.Blockchain, + &i.Number, + &i.CreatedAt, + &i.UpdatedAt, + &i.Hash, + ) + return &i, err +} + const lastBlockNumber = `-- name: LastBlockNumber :one -select "number" from processed_blocks pb where pb.blockchain = $1 limit 1 +SELECT "number" FROM processed_blocks pb WHERE pb.blockchain = $1 LIMIT 1 ` func (q *Queries) LastBlockNumber(ctx context.Context, blockchain wconstants.BlockchainType) (int64, error) { @@ -30,3 +49,18 @@ func (q *Queries) UpdateNumber(ctx context.Context, blockchain wconstants.Blockc _, err := q.db.Exec(ctx, updateNumber, blockchain, number) return err } + +const updateNumberWithHash = `-- name: UpdateNumberWithHash :exec +UPDATE processed_blocks SET "number" = $2, hash = $3, updated_at=now() WHERE blockchain = $1 +` + +type UpdateNumberWithHashParams struct { + Blockchain wconstants.BlockchainType `db:"blockchain" json:"blockchain"` + Number int64 `db:"number" json:"number"` + Hash pgtype.Text `db:"hash" json:"hash"` +} + +func (q *Queries) UpdateNumberWithHash(ctx context.Context, arg UpdateNumberWithHashParams) error { + _, err := q.db.Exec(ctx, updateNumberWithHash, arg.Blockchain, arg.Number, arg.Hash) + return err +} diff --git a/internal/store/repos/repo_processed_blocks/processed_blocks_gen.sql.go b/internal/store/repos/repo_processed_blocks/processed_blocks_gen.sql.go index 53225f0..1603564 100644 --- a/internal/store/repos/repo_processed_blocks/processed_blocks_gen.sql.go +++ b/internal/store/repos/repo_processed_blocks/processed_blocks_gen.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: processed_blocks_gen.sql package repo_processed_blocks @@ -9,14 +9,21 @@ import ( "context" "github.com/dv-net/dv-processing/pkg/walletsdk/wconstants" + "github.com/jackc/pgx/v5/pgtype" ) const create = `-- name: Create :exec -INSERT INTO processed_blocks (blockchain, number, created_at) - VALUES ($1, $2, now()) +INSERT INTO processed_blocks (blockchain, number, created_at, hash) + VALUES ($1, $2, now(), $3) ` -func (q *Queries) Create(ctx context.Context, blockchain wconstants.BlockchainType, number int64) error { - _, err := q.db.Exec(ctx, create, blockchain, number) +type CreateParams struct { + Blockchain wconstants.BlockchainType `db:"blockchain" json:"blockchain"` + Number int64 `db:"number" json:"number"` + Hash pgtype.Text `db:"hash" json:"hash"` +} + +func (q *Queries) Create(ctx context.Context, arg CreateParams) error { + _, err := q.db.Exec(ctx, create, arg.Blockchain, arg.Number, arg.Hash) return err } diff --git a/internal/store/repos/repo_processed_blocks/querier.go b/internal/store/repos/repo_processed_blocks/querier.go index 2a8a8ed..c0333b6 100644 --- a/internal/store/repos/repo_processed_blocks/querier.go +++ b/internal/store/repos/repo_processed_blocks/querier.go @@ -1,19 +1,22 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_processed_blocks import ( "context" + "github.com/dv-net/dv-processing/internal/models" "github.com/dv-net/dv-processing/pkg/walletsdk/wconstants" ) type Querier interface { - Create(ctx context.Context, blockchain wconstants.BlockchainType, number int64) error + Create(ctx context.Context, arg CreateParams) error + LastBlock(ctx context.Context, blockchain wconstants.BlockchainType) (*models.ProcessedBlock, error) LastBlockNumber(ctx context.Context, blockchain wconstants.BlockchainType) (int64, error) UpdateNumber(ctx context.Context, blockchain wconstants.BlockchainType, number int64) error + UpdateNumberWithHash(ctx context.Context, arg UpdateNumberWithHashParams) error } var _ Querier = (*Queries)(nil) diff --git a/internal/store/repos/repo_processed_incidents/db.go b/internal/store/repos/repo_processed_incidents/db.go new file mode 100644 index 0000000..2f759e5 --- /dev/null +++ b/internal/store/repos/repo_processed_incidents/db.go @@ -0,0 +1,32 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 + +package repo_processed_incidents + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type DBTX interface { + Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) + Query(context.Context, string, ...interface{}) (pgx.Rows, error) + QueryRow(context.Context, string, ...interface{}) pgx.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx pgx.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/internal/store/repos/repo_processed_incidents/processed_incidents.sql.go b/internal/store/repos/repo_processed_incidents/processed_incidents.sql.go new file mode 100644 index 0000000..11a8576 --- /dev/null +++ b/internal/store/repos/repo_processed_incidents/processed_incidents.sql.go @@ -0,0 +1,146 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 +// source: processed_incidents.sql + +package repo_processed_incidents + +import ( + "context" + + "github.com/dv-net/dv-processing/internal/models" + "github.com/dv-net/dv-processing/pkg/walletsdk/wconstants" + "github.com/jackc/pgx/v5/pgtype" +) + +const cleanupOldIncidents = `-- name: CleanupOldIncidents :exec +DELETE FROM processed_incidents +WHERE created_at < NOW() - INTERVAL '30 days' + AND status = 'completed' +` + +func (q *Queries) CleanupOldIncidents(ctx context.Context) error { + _, err := q.db.Exec(ctx, cleanupOldIncidents) + return err +} + +const getIncompleteIncidents = `-- name: GetIncompleteIncidents :many +SELECT id, blockchain, incident_type, status, rollback_from_block, rollback_to_block, error_message, started_at, completed_at, created_at +FROM processed_incidents +WHERE blockchain = $1 + AND status = 'processing' +ORDER BY started_at ASC +` + +func (q *Queries) GetIncompleteIncidents(ctx context.Context, blockchain wconstants.BlockchainType) ([]*models.ProcessedIncident, error) { + rows, err := q.db.Query(ctx, getIncompleteIncidents, blockchain) + if err != nil { + return nil, err + } + defer rows.Close() + items := []*models.ProcessedIncident{} + for rows.Next() { + var i models.ProcessedIncident + if err := rows.Scan( + &i.ID, + &i.Blockchain, + &i.IncidentType, + &i.Status, + &i.RollbackFromBlock, + &i.RollbackToBlock, + &i.ErrorMessage, + &i.StartedAt, + &i.CompletedAt, + &i.CreatedAt, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const isIncidentProcessed = `-- name: IsIncidentProcessed :one +SELECT EXISTS( + SELECT 1 + FROM processed_incidents + WHERE blockchain = $1 AND id = $2 +) AS exists +` + +func (q *Queries) IsIncidentProcessed(ctx context.Context, blockchain wconstants.BlockchainType, iD string) (bool, error) { + row := q.db.QueryRow(ctx, isIncidentProcessed, blockchain, iD) + var exists bool + err := row.Scan(&exists) + return exists, err +} + +const markIncidentAsCompleted = `-- name: MarkIncidentAsCompleted :exec +UPDATE processed_incidents +SET status = 'completed', + completed_at = NOW(), + error_message = NULL +WHERE blockchain = $1 AND id = $2 +` + +func (q *Queries) MarkIncidentAsCompleted(ctx context.Context, blockchain wconstants.BlockchainType, iD string) error { + _, err := q.db.Exec(ctx, markIncidentAsCompleted, blockchain, iD) + return err +} + +const markIncidentAsFailed = `-- name: MarkIncidentAsFailed :exec +UPDATE processed_incidents +SET status = 'failed', + completed_at = NOW(), + error_message = $3 +WHERE blockchain = $1 AND id = $2 +` + +type MarkIncidentAsFailedParams struct { + Blockchain wconstants.BlockchainType `db:"blockchain" json:"blockchain"` + ID string `db:"id" json:"id"` + ErrorMessage pgtype.Text `db:"error_message" json:"error_message"` +} + +func (q *Queries) MarkIncidentAsFailed(ctx context.Context, arg MarkIncidentAsFailedParams) error { + _, err := q.db.Exec(ctx, markIncidentAsFailed, arg.Blockchain, arg.ID, arg.ErrorMessage) + return err +} + +const markIncidentAsProcessing = `-- name: MarkIncidentAsProcessing :exec +INSERT INTO processed_incidents ( + id, + blockchain, + incident_type, + status, + rollback_from_block, + rollback_to_block, + started_at +) +VALUES ($1, $2, $3, 'processing', $4, $5, NOW()) +ON CONFLICT (id) DO UPDATE +SET status = 'processing', + started_at = NOW() +` + +type MarkIncidentAsProcessingParams struct { + ID string `db:"id" json:"id"` + Blockchain wconstants.BlockchainType `db:"blockchain" json:"blockchain"` + IncidentType string `db:"incident_type" json:"incident_type"` + RollbackFromBlock pgtype.Int8 `db:"rollback_from_block" json:"rollback_from_block"` + RollbackToBlock pgtype.Int8 `db:"rollback_to_block" json:"rollback_to_block"` +} + +func (q *Queries) MarkIncidentAsProcessing(ctx context.Context, arg MarkIncidentAsProcessingParams) error { + _, err := q.db.Exec(ctx, markIncidentAsProcessing, + arg.ID, + arg.Blockchain, + arg.IncidentType, + arg.RollbackFromBlock, + arg.RollbackToBlock, + ) + return err +} diff --git a/internal/store/repos/repo_processed_incidents/querier.go b/internal/store/repos/repo_processed_incidents/querier.go new file mode 100644 index 0000000..b224522 --- /dev/null +++ b/internal/store/repos/repo_processed_incidents/querier.go @@ -0,0 +1,23 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 + +package repo_processed_incidents + +import ( + "context" + + "github.com/dv-net/dv-processing/internal/models" + "github.com/dv-net/dv-processing/pkg/walletsdk/wconstants" +) + +type Querier interface { + CleanupOldIncidents(ctx context.Context) error + GetIncompleteIncidents(ctx context.Context, blockchain wconstants.BlockchainType) ([]*models.ProcessedIncident, error) + IsIncidentProcessed(ctx context.Context, blockchain wconstants.BlockchainType, iD string) (bool, error) + MarkIncidentAsCompleted(ctx context.Context, blockchain wconstants.BlockchainType, iD string) error + MarkIncidentAsFailed(ctx context.Context, arg MarkIncidentAsFailedParams) error + MarkIncidentAsProcessing(ctx context.Context, arg MarkIncidentAsProcessingParams) error +} + +var _ Querier = (*Queries)(nil) diff --git a/internal/store/repos/repo_settings/db.go b/internal/store/repos/repo_settings/db.go index de3c598..e7d315a 100644 --- a/internal/store/repos/repo_settings/db.go +++ b/internal/store/repos/repo_settings/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_settings diff --git a/internal/store/repos/repo_settings/querier.go b/internal/store/repos/repo_settings/querier.go index 6e91d8d..2272269 100644 --- a/internal/store/repos/repo_settings/querier.go +++ b/internal/store/repos/repo_settings/querier.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_settings diff --git a/internal/store/repos/repo_settings/settings.sql.go b/internal/store/repos/repo_settings/settings.sql.go index a5414cd..e833a9e 100644 --- a/internal/store/repos/repo_settings/settings.sql.go +++ b/internal/store/repos/repo_settings/settings.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: settings.sql package repo_settings diff --git a/internal/store/repos/repo_settings/settings_gen.sql.go b/internal/store/repos/repo_settings/settings_gen.sql.go index c345133..9c68a69 100644 --- a/internal/store/repos/repo_settings/settings_gen.sql.go +++ b/internal/store/repos/repo_settings/settings_gen.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: settings_gen.sql package repo_settings diff --git a/internal/store/repos/repo_transfer_transactions/db.go b/internal/store/repos/repo_transfer_transactions/db.go index bf85aeb..fd498b5 100644 --- a/internal/store/repos/repo_transfer_transactions/db.go +++ b/internal/store/repos/repo_transfer_transactions/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_transfer_transactions diff --git a/internal/store/repos/repo_transfer_transactions/querier.go b/internal/store/repos/repo_transfer_transactions/querier.go index 90e94e6..54d8e8a 100644 --- a/internal/store/repos/repo_transfer_transactions/querier.go +++ b/internal/store/repos/repo_transfer_transactions/querier.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_transfer_transactions diff --git a/internal/store/repos/repo_transfer_transactions/transfer_transactions.sql.go b/internal/store/repos/repo_transfer_transactions/transfer_transactions.sql.go index b7fe974..f820d38 100644 --- a/internal/store/repos/repo_transfer_transactions/transfer_transactions.sql.go +++ b/internal/store/repos/repo_transfer_transactions/transfer_transactions.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: transfer_transactions.sql package repo_transfer_transactions diff --git a/internal/store/repos/repo_transfer_transactions/transfer_transactions_gen.sql.go b/internal/store/repos/repo_transfer_transactions/transfer_transactions_gen.sql.go index e78dd93..f5dba52 100644 --- a/internal/store/repos/repo_transfer_transactions/transfer_transactions_gen.sql.go +++ b/internal/store/repos/repo_transfer_transactions/transfer_transactions_gen.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: transfer_transactions_gen.sql package repo_transfer_transactions diff --git a/internal/store/repos/repo_transfers/constants_gen.go b/internal/store/repos/repo_transfers/constants_gen.go index 6071690..99bac06 100755 --- a/internal/store/repos/repo_transfers/constants_gen.go +++ b/internal/store/repos/repo_transfers/constants_gen.go @@ -1,7 +1,7 @@ // Code generated by pgxgen. DO NOT EDIT. // versions: // -// pgxgen v0.3.10 +// pgxgen v0.3.12 package repo_transfers import ( diff --git a/internal/store/repos/repo_transfers/db.go b/internal/store/repos/repo_transfers/db.go index 8bdfaeb..2d17410 100644 --- a/internal/store/repos/repo_transfers/db.go +++ b/internal/store/repos/repo_transfers/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_transfers diff --git a/internal/store/repos/repo_transfers/querier.go b/internal/store/repos/repo_transfers/querier.go index 41465be..0672fdf 100644 --- a/internal/store/repos/repo_transfers/querier.go +++ b/internal/store/repos/repo_transfers/querier.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_transfers diff --git a/internal/store/repos/repo_transfers/transfers.sql.go b/internal/store/repos/repo_transfers/transfers.sql.go index ee8f7a0..efd654e 100644 --- a/internal/store/repos/repo_transfers/transfers.sql.go +++ b/internal/store/repos/repo_transfers/transfers.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: transfers.sql package repo_transfers diff --git a/internal/store/repos/repo_transfers/transfers_gen.sql.go b/internal/store/repos/repo_transfers/transfers_gen.sql.go index afcf4e1..de4e19f 100644 --- a/internal/store/repos/repo_transfers/transfers_gen.sql.go +++ b/internal/store/repos/repo_transfers/transfers_gen.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: transfers_gen.sql package repo_transfers diff --git a/internal/store/repos/repo_wallets/db.go b/internal/store/repos/repo_wallets/db.go index 5f1b097..ac442a9 100644 --- a/internal/store/repos/repo_wallets/db.go +++ b/internal/store/repos/repo_wallets/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_wallets diff --git a/internal/store/repos/repo_wallets/querier.go b/internal/store/repos/repo_wallets/querier.go index 2491d3f..bf505d3 100644 --- a/internal/store/repos/repo_wallets/querier.go +++ b/internal/store/repos/repo_wallets/querier.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_wallets diff --git a/internal/store/repos/repo_wallets/wallets.sql.go b/internal/store/repos/repo_wallets/wallets.sql.go index 58515d1..2c39220 100644 --- a/internal/store/repos/repo_wallets/wallets.sql.go +++ b/internal/store/repos/repo_wallets/wallets.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: wallets.sql package repo_wallets diff --git a/internal/store/repos/repo_wallets_cold/cold_wallets.sql.go b/internal/store/repos/repo_wallets_cold/cold_wallets.sql.go index f54d3e2..44ce73d 100644 --- a/internal/store/repos/repo_wallets_cold/cold_wallets.sql.go +++ b/internal/store/repos/repo_wallets_cold/cold_wallets.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: cold_wallets.sql package repo_wallets_cold diff --git a/internal/store/repos/repo_wallets_cold/cold_wallets_gen.sql.go b/internal/store/repos/repo_wallets_cold/cold_wallets_gen.sql.go index 5baa689..04382ec 100644 --- a/internal/store/repos/repo_wallets_cold/cold_wallets_gen.sql.go +++ b/internal/store/repos/repo_wallets_cold/cold_wallets_gen.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: cold_wallets_gen.sql package repo_wallets_cold diff --git a/internal/store/repos/repo_wallets_cold/constants_gen.go b/internal/store/repos/repo_wallets_cold/constants_gen.go index 160f7cd..e815d4e 100755 --- a/internal/store/repos/repo_wallets_cold/constants_gen.go +++ b/internal/store/repos/repo_wallets_cold/constants_gen.go @@ -1,7 +1,7 @@ // Code generated by pgxgen. DO NOT EDIT. // versions: // -// pgxgen v0.3.10 +// pgxgen v0.3.12 package repo_wallets_cold import ( diff --git a/internal/store/repos/repo_wallets_cold/db.go b/internal/store/repos/repo_wallets_cold/db.go index 69f86fa..bd36402 100644 --- a/internal/store/repos/repo_wallets_cold/db.go +++ b/internal/store/repos/repo_wallets_cold/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_wallets_cold diff --git a/internal/store/repos/repo_wallets_cold/querier.go b/internal/store/repos/repo_wallets_cold/querier.go index e1d506a..5495c52 100644 --- a/internal/store/repos/repo_wallets_cold/querier.go +++ b/internal/store/repos/repo_wallets_cold/querier.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_wallets_cold diff --git a/internal/store/repos/repo_wallets_hot/constants_gen.go b/internal/store/repos/repo_wallets_hot/constants_gen.go index 061bf01..63b548f 100755 --- a/internal/store/repos/repo_wallets_hot/constants_gen.go +++ b/internal/store/repos/repo_wallets_hot/constants_gen.go @@ -1,7 +1,7 @@ // Code generated by pgxgen. DO NOT EDIT. // versions: // -// pgxgen v0.3.10 +// pgxgen v0.3.12 package repo_wallets_hot import ( diff --git a/internal/store/repos/repo_wallets_hot/db.go b/internal/store/repos/repo_wallets_hot/db.go index d1469bf..418451b 100644 --- a/internal/store/repos/repo_wallets_hot/db.go +++ b/internal/store/repos/repo_wallets_hot/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_wallets_hot diff --git a/internal/store/repos/repo_wallets_hot/hot_wallets.sql.go b/internal/store/repos/repo_wallets_hot/hot_wallets.sql.go index 937e62f..4c415bc 100644 --- a/internal/store/repos/repo_wallets_hot/hot_wallets.sql.go +++ b/internal/store/repos/repo_wallets_hot/hot_wallets.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: hot_wallets.sql package repo_wallets_hot diff --git a/internal/store/repos/repo_wallets_hot/hot_wallets_gen.sql.go b/internal/store/repos/repo_wallets_hot/hot_wallets_gen.sql.go index 7e6960b..a351790 100644 --- a/internal/store/repos/repo_wallets_hot/hot_wallets_gen.sql.go +++ b/internal/store/repos/repo_wallets_hot/hot_wallets_gen.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: hot_wallets_gen.sql package repo_wallets_hot diff --git a/internal/store/repos/repo_wallets_hot/querier.go b/internal/store/repos/repo_wallets_hot/querier.go index c92a915..6cd985a 100644 --- a/internal/store/repos/repo_wallets_hot/querier.go +++ b/internal/store/repos/repo_wallets_hot/querier.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_wallets_hot diff --git a/internal/store/repos/repo_wallets_processing/constants_gen.go b/internal/store/repos/repo_wallets_processing/constants_gen.go index d78d54f..89d25d2 100755 --- a/internal/store/repos/repo_wallets_processing/constants_gen.go +++ b/internal/store/repos/repo_wallets_processing/constants_gen.go @@ -1,7 +1,7 @@ // Code generated by pgxgen. DO NOT EDIT. // versions: // -// pgxgen v0.3.10 +// pgxgen v0.3.12 package repo_wallets_processing import ( diff --git a/internal/store/repos/repo_wallets_processing/db.go b/internal/store/repos/repo_wallets_processing/db.go index 7b69b00..d86d980 100644 --- a/internal/store/repos/repo_wallets_processing/db.go +++ b/internal/store/repos/repo_wallets_processing/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_wallets_processing diff --git a/internal/store/repos/repo_wallets_processing/processing_wallets.sql.go b/internal/store/repos/repo_wallets_processing/processing_wallets.sql.go index 47abf7b..6b99b66 100644 --- a/internal/store/repos/repo_wallets_processing/processing_wallets.sql.go +++ b/internal/store/repos/repo_wallets_processing/processing_wallets.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: processing_wallets.sql package repo_wallets_processing diff --git a/internal/store/repos/repo_wallets_processing/processing_wallets_gen.sql.go b/internal/store/repos/repo_wallets_processing/processing_wallets_gen.sql.go index d0d105d..e10e791 100644 --- a/internal/store/repos/repo_wallets_processing/processing_wallets_gen.sql.go +++ b/internal/store/repos/repo_wallets_processing/processing_wallets_gen.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: processing_wallets_gen.sql package repo_wallets_processing diff --git a/internal/store/repos/repo_wallets_processing/querier.go b/internal/store/repos/repo_wallets_processing/querier.go index 52d5172..e0db674 100644 --- a/internal/store/repos/repo_wallets_processing/querier.go +++ b/internal/store/repos/repo_wallets_processing/querier.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_wallets_processing diff --git a/internal/store/repos/repo_webhooks/batch.go b/internal/store/repos/repo_webhooks/batch.go index b1fd47a..7894f38 100644 --- a/internal/store/repos/repo_webhooks/batch.go +++ b/internal/store/repos/repo_webhooks/batch.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: batch.go package repo_webhooks diff --git a/internal/store/repos/repo_webhooks/db.go b/internal/store/repos/repo_webhooks/db.go index 0a3fde0..4a99d65 100644 --- a/internal/store/repos/repo_webhooks/db.go +++ b/internal/store/repos/repo_webhooks/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_webhooks diff --git a/internal/store/repos/repo_webhooks/querier.go b/internal/store/repos/repo_webhooks/querier.go index 1631ad8..8a0bfed 100644 --- a/internal/store/repos/repo_webhooks/querier.go +++ b/internal/store/repos/repo_webhooks/querier.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 package repo_webhooks diff --git a/internal/store/repos/repo_webhooks/webhooks.sql.go b/internal/store/repos/repo_webhooks/webhooks.sql.go index 9e1ff0f..69015d0 100644 --- a/internal/store/repos/repo_webhooks/webhooks.sql.go +++ b/internal/store/repos/repo_webhooks/webhooks.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: webhooks.sql package repo_webhooks diff --git a/internal/store/repos/repo_webhooks/webhooks_gen.sql.go b/internal/store/repos/repo_webhooks/webhooks_gen.sql.go index 7ae11d8..7810665 100644 --- a/internal/store/repos/repo_webhooks/webhooks_gen.sql.go +++ b/internal/store/repos/repo_webhooks/webhooks_gen.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.29.0 +// sqlc v1.30.0 // source: webhooks_gen.sql package repo_webhooks diff --git a/internal/store/repos/repos.go b/internal/store/repos/repos.go index 9fd5c97..ef1788d 100644 --- a/internal/store/repos/repos.go +++ b/internal/store/repos/repos.go @@ -4,6 +4,7 @@ import ( "github.com/dv-net/dv-processing/internal/store/repos/repo_clients" "github.com/dv-net/dv-processing/internal/store/repos/repo_owners" "github.com/dv-net/dv-processing/internal/store/repos/repo_processed_blocks" + "github.com/dv-net/dv-processing/internal/store/repos/repo_processed_incidents" "github.com/dv-net/dv-processing/internal/store/repos/repo_settings" "github.com/dv-net/dv-processing/internal/store/repos/repo_system" "github.com/dv-net/dv-processing/internal/store/repos/repo_transfer_transactions" @@ -14,6 +15,7 @@ import ( type IRepos interface { ProcessedBlocks(opts ...Option) repo_processed_blocks.Querier + ProcessedIncidents(opts ...Option) repo_processed_incidents.Querier Clients(opts ...Option) repo_clients.Querier Owners(opts ...Option) repo_owners.Querier Transfers(opts ...Option) repo_transfers.ICustomQuerier @@ -26,6 +28,7 @@ type IRepos interface { type repos struct { processedBlocks *repo_processed_blocks.Queries + processedIncidents *repo_processed_incidents.Queries clients *repo_clients.Queries owners *repo_owners.Queries transfers *repo_transfers.CustomQuerier @@ -39,6 +42,7 @@ type repos struct { func New(psql *postgres.Postgres) IRepos { return &repos{ processedBlocks: repo_processed_blocks.New(psql.DB), + processedIncidents: repo_processed_incidents.New(psql.DB), clients: repo_clients.New(psql.DB), owners: repo_owners.New(psql.DB), transfers: repo_transfers.NewCustom(psql.DB), @@ -61,6 +65,17 @@ func (s *repos) ProcessedBlocks(opts ...Option) repo_processed_blocks.Querier { return s.processedBlocks } +// ProcessedIncidents +func (s *repos) ProcessedIncidents(opts ...Option) repo_processed_incidents.Querier { + options := parseOptions(opts...) + + if options.Tx != nil { + return s.processedIncidents.WithTx(options.Tx) + } + + return s.processedIncidents +} + // Clients func (s *repos) Clients(opts ...Option) repo_clients.Querier { options := parseOptions(opts...) diff --git a/sql/postgres/migrations/20251003112151_add_processed_block_hash_column.down.sql b/sql/postgres/migrations/20251003112151_add_processed_block_hash_column.down.sql new file mode 100644 index 0000000..0aed71a --- /dev/null +++ b/sql/postgres/migrations/20251003112151_add_processed_block_hash_column.down.sql @@ -0,0 +1 @@ +ALTER TABLE processed_blocks DROP COLUMN IF EXISTS hash; \ No newline at end of file diff --git a/sql/postgres/migrations/20251003112151_add_processed_block_hash_column.up.sql b/sql/postgres/migrations/20251003112151_add_processed_block_hash_column.up.sql new file mode 100644 index 0000000..8813304 --- /dev/null +++ b/sql/postgres/migrations/20251003112151_add_processed_block_hash_column.up.sql @@ -0,0 +1 @@ +ALTER TABLE processed_blocks ADD COLUMN IF NOT EXISTS hash VARCHAR(255) NULL; \ No newline at end of file diff --git a/sql/postgres/migrations/20251016090413_create_processed_incidents_table.down.sql b/sql/postgres/migrations/20251016090413_create_processed_incidents_table.down.sql new file mode 100644 index 0000000..9b35e50 --- /dev/null +++ b/sql/postgres/migrations/20251016090413_create_processed_incidents_table.down.sql @@ -0,0 +1,4 @@ +DROP INDEX IF EXISTS idx_processed_incidents_status; +DROP INDEX IF EXISTS idx_processed_incidents_created_at; +DROP INDEX IF EXISTS idx_processed_incidents_blockchain; +DROP TABLE IF EXISTS processed_incidents; diff --git a/sql/postgres/migrations/20251016090413_create_processed_incidents_table.up.sql b/sql/postgres/migrations/20251016090413_create_processed_incidents_table.up.sql new file mode 100644 index 0000000..d0f96b3 --- /dev/null +++ b/sql/postgres/migrations/20251016090413_create_processed_incidents_table.up.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS processed_incidents ( + id VARCHAR(255) NOT NULL PRIMARY KEY, + blockchain VARCHAR(50) NOT NULL, + incident_type VARCHAR(50) NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'completed', + rollback_from_block BIGINT, + rollback_to_block BIGINT, + error_message TEXT, + started_at TIMESTAMP NOT NULL DEFAULT NOW(), + completed_at TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_processed_incidents_blockchain ON processed_incidents(blockchain); +CREATE INDEX IF NOT EXISTS idx_processed_incidents_created_at ON processed_incidents(created_at DESC); +CREATE INDEX IF NOT EXISTS idx_processed_incidents_status ON processed_incidents(blockchain, status); diff --git a/sql/postgres/queries/processed_blocks/processed.sql b/sql/postgres/queries/processed_blocks/processed.sql index f1c2bf0..e13811c 100644 --- a/sql/postgres/queries/processed_blocks/processed.sql +++ b/sql/postgres/queries/processed_blocks/processed.sql @@ -1,5 +1,11 @@ -- name: LastBlockNumber :one -select "number" from processed_blocks pb where pb.blockchain = $1 limit 1; +SELECT "number" FROM processed_blocks pb WHERE pb.blockchain = $1 LIMIT 1; + +-- name: LastBlock :one +SELECT * FROM processed_blocks pb WHERE pb.blockchain = $1 LIMIT 1; -- name: UpdateNumber :exec UPDATE processed_blocks SET "number" = $2, updated_at=now() WHERE blockchain = $1; + +-- name: UpdateNumberWithHash :exec +UPDATE processed_blocks SET "number" = $2, hash = $3, updated_at=now() WHERE blockchain = $1; diff --git a/sql/postgres/queries/processed_blocks/processed_blocks_gen.sql b/sql/postgres/queries/processed_blocks/processed_blocks_gen.sql index d983d87..887996e 100755 --- a/sql/postgres/queries/processed_blocks/processed_blocks_gen.sql +++ b/sql/postgres/queries/processed_blocks/processed_blocks_gen.sql @@ -1,4 +1,4 @@ -- name: Create :exec -INSERT INTO processed_blocks (blockchain, number, created_at) - VALUES ($1, $2, now()); +INSERT INTO processed_blocks (blockchain, number, created_at, hash) + VALUES ($1, $2, now(), $3); diff --git a/sql/postgres/queries/processed_incidents/processed_incidents.sql b/sql/postgres/queries/processed_incidents/processed_incidents.sql new file mode 100644 index 0000000..c4d1f9e --- /dev/null +++ b/sql/postgres/queries/processed_incidents/processed_incidents.sql @@ -0,0 +1,47 @@ +-- name: IsIncidentProcessed :one +SELECT EXISTS( + SELECT 1 + FROM processed_incidents + WHERE blockchain = $1 AND id = $2 +) AS exists; + +-- name: MarkIncidentAsProcessing :exec +INSERT INTO processed_incidents ( + id, + blockchain, + incident_type, + status, + rollback_from_block, + rollback_to_block, + started_at +) +VALUES ($1, $2, $3, 'processing', $4, $5, NOW()) +ON CONFLICT (id) DO UPDATE +SET status = 'processing', + started_at = NOW(); + +-- name: MarkIncidentAsCompleted :exec +UPDATE processed_incidents +SET status = 'completed', + completed_at = NOW(), + error_message = NULL +WHERE blockchain = $1 AND id = $2; + +-- name: MarkIncidentAsFailed :exec +UPDATE processed_incidents +SET status = 'failed', + completed_at = NOW(), + error_message = $3 +WHERE blockchain = $1 AND id = $2; + +-- name: GetIncompleteIncidents :many +SELECT * +FROM processed_incidents +WHERE blockchain = $1 + AND status = 'processing' +ORDER BY started_at ASC; + +-- name: CleanupOldIncidents :exec +DELETE FROM processed_incidents +WHERE created_at < NOW() - INTERVAL '30 days' + AND status = 'completed'; diff --git a/sqlc.yaml b/sqlc.yaml index 2a310e0..7a6a22f 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -103,6 +103,12 @@ overrides: import: github.com/dv-net/dv-processing/pkg/walletsdk/wconstants type: BlockchainType + # Processed incidents + - column: processed_incidents.blockchain + go_type: + import: github.com/dv-net/dv-processing/pkg/walletsdk/wconstants + type: BlockchainType + # Webhooks - column: webhooks.status go_type: @@ -349,3 +355,24 @@ sql: emit_enum_valid_method: true emit_all_enum_values: true query_parameter_limit: 3 + + # processed_incidents + - schema: sql/postgres/migrations + queries: sql/postgres/queries/processed_incidents + engine: postgresql + gen: + go: + sql_package: pgx/v5 + out: internal/store/repos/repo_processed_incidents + emit_prepared_queries: false + emit_json_tags: true + emit_exported_queries: false + emit_db_tags: true + emit_interface: true + emit_exact_table_names: false + emit_empty_slices: true + emit_result_struct_pointers: true + emit_params_struct_pointers: false + emit_enum_valid_method: true + emit_all_enum_values: true + query_parameter_limit: 2