Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 62 additions & 24 deletions walker/walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ type UndoForkBlocks struct {

// Configuraton for WalkTheDoge.
type WalkerOptions struct {
Chain *doge.ChainParams // chain parameters, e.g. doge.DogeMainNetChain
LastProcessedBlock string // last processed block hash to begin walking from (hex)
Client spec.Blockchain // from NewCoreRPCClient()
TipChanged chan string // from TipChaser()
FullUndoBlocks bool // fully decode blocks in UndoForkBlocks (or just hash and height)
BufferBlocks int // number of blocks to decode ahead of the consumer (channel size, default 10)
Chain *doge.ChainParams // chain parameters, e.g. doge.DogeMainNetChain
LastProcessedBlock string // last processed block hash to begin walking from (hex)
Client spec.Blockchain // from NewCoreRPCClient()
TipChanged chan string // from TipChaser()
FullUndoBlocks bool // fully decode blocks in UndoForkBlocks (or just hash and height)
BufferBlocks int // number of blocks to decode ahead of the consumer (channel size, default 10)
MaxRollbackDepth int64 // maximum rollback depth before waiting for re-index (0 = unlimited)
MaxRollbackWaitDuration time.Duration // wait duration before retrying after deep reorg (0 = return immediately)
}

/*
Expand Down Expand Up @@ -93,13 +95,15 @@ func WalkTheDoge(opts WalkerOptions) (service governor.Service, blocks chan Bloc
}
c := dogeWalker{
// The larger this channel is, the more blocks we can decode-ahead.
output: make(chan BlockOrUndo, chanSize),
client: opts.Client,
chain: opts.Chain,
tipChanged: opts.TipChanged,
fullUndoBlocks: opts.FullUndoBlocks,
lastProcessed: opts.LastProcessedBlock,
blockInterval: POLL_INTERVAL,
output: make(chan BlockOrUndo, chanSize),
client: opts.Client,
chain: opts.Chain,
tipChanged: opts.TipChanged,
fullUndoBlocks: opts.FullUndoBlocks,
lastProcessed: opts.LastProcessedBlock,
blockInterval: POLL_INTERVAL,
maxRollback: opts.MaxRollbackDepth,
maxRollbackWait: opts.MaxRollbackWaitDuration,
}
if opts.TipChanged != nil {
// We will receive tipChanged notifications: use a longer polling timer
Expand All @@ -112,15 +116,17 @@ func WalkTheDoge(opts WalkerOptions) (service governor.Service, blocks chan Bloc
// dogeWalker is the internal state.
type dogeWalker struct {
governor.ServiceCtx
output chan BlockOrUndo
client spec.Blockchain
chain *doge.ChainParams
tipChanged chan string // receive from TipChaser.
stop <-chan struct{} // ctx.Done() channel.
fullUndoBlocks bool // fully decode blocks in UndoForkBlocks
lastProcessed string // last processed block hash to begin walking from (hex)
blockInterval time.Duration // interval for polling blocks (longer if tipChanged is set)
isIdle bool // true if the last message we sent was 'idle'
output chan BlockOrUndo
client spec.Blockchain
chain *doge.ChainParams
tipChanged chan string // receive from TipChaser.
stop <-chan struct{} // ctx.Done() channel.
fullUndoBlocks bool // fully decode blocks in UndoForkBlocks
lastProcessed string // last processed block hash to begin walking from (hex)
blockInterval time.Duration // interval for polling blocks (longer if tipChanged is set)
isIdle bool // true if the last message we sent was 'idle'
maxRollback int64 // maximum rollback depth
maxRollbackWait time.Duration // wait duration before retrying after deep reorg
}

func (c *dogeWalker) Run() {
Expand Down Expand Up @@ -168,7 +174,16 @@ func (c *dogeWalker) Run() {
if head.Confirmations == -1 {
// Last-processed block is longer on-chain, start with a rollback.
undo, nextBlock, cont := c.undoBlocks(head)
if !cont {
if !cont || undo == nil {
// Deep reorg detected - wait and retry if configured
if c.maxRollbackWait > 0 {
log.Printf("DogeWalker: Deep reorg detected, waiting %v for RPC node to re-index...", c.maxRollbackWait)
if c.Sleep(c.maxRollbackWait) {
return // stopping
}
// Retry checking the block - it might be back on-chain now
continue // retry the loop
}
return // stopping
}
select {
Expand Down Expand Up @@ -248,7 +263,26 @@ func (c *dogeWalker) followTheChain(height int64, nextUnprocessed string) (lastP
// This block is no longer on-chain.
// Roll back until we find a block that is on-chain.
undo, nextBlock, cont := c.undoBlocks(head)
if !cont {
if !cont || undo == nil {
// Deep reorg detected - wait and retry if configured
if c.maxRollbackWait > 0 {
log.Printf("DogeWalker: Deep reorg detected in followTheChain, waiting %v for RPC node to re-index...", c.maxRollbackWait)
if c.Sleep(c.maxRollbackWait) {
return lastProcessed, false // stopping
}
// Retry checking the block - it might be back on-chain now
// Re-fetch the block header to check if it's back on-chain
head, cont := c.fetchBlockHeader(nextUnprocessed)
if !cont {
return lastProcessed, false // stopping
}
if head.Confirmations != -1 {
// Block is back on-chain, continue processing
continue // retry processing this block
}
// Still off-chain, return to let Run() handle retry
return lastProcessed, false
}
return lastProcessed, false
}
select {
Expand Down Expand Up @@ -285,6 +319,10 @@ func (c *dogeWalker) undoBlocks(head spec.BlockHeader) (undo *UndoForkBlocks, ne
}
// Accumulate undo info.
undo.UndoBlocks = append(undo.UndoBlocks, head.Hash)
if c.maxRollback > 0 && int64(len(undo.UndoBlocks)) > c.maxRollback {
log.Printf("DogeWalker: MaxRollbackDepth exceeded (%d > %d). Stopping service.", len(undo.UndoBlocks), c.maxRollback)
return nil, "", false // stopping - return nil to prevent sending incomplete undo
}
if c.fullUndoBlocks {
block, cont := c.fetchBlockData(head.Hash)
if !cont {
Expand Down