Skip to content

Commit 4562ded

Browse files
committed
Minor updates
1 parent 3111158 commit 4562ded

4 files changed

Lines changed: 22 additions & 4 deletions

File tree

node/failover.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ func setupFailoverState(
166166

167167
func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
168168
stopService := func(stoppable func(context.Context) error, name string) {
169+
// parent context is cancelled already, so we need to create a new one
169170
shutdownCtx, done := context.WithTimeout(context.Background(), 3*time.Second)
170171
defer done()
171172

pkg/raft/election.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ func (d *DynamicLeaderElection) Run(ctx context.Context) error {
6363
}()
6464

6565
startWorker := func(name string, workerFunc func(ctx context.Context) error) {
66+
workerCancel()
6667
workerCtx, cancel := context.WithCancel(ctx)
6768
workerCancel = cancel
6869
wg.Add(1)
@@ -89,12 +90,20 @@ func (d *DynamicLeaderElection) Run(ctx context.Context) error {
8990
d.logger.Info().Msg("Raft leader changed notification")
9091
if becameLeader && !isCurrentlyLeader { // new leader
9192
if isStarted {
93+
var synced bool
9294
d.logger.Info().Msg("became leader, stopping follower operations")
9395
// wait for in flight raft msgs to land
94-
time.Sleep(d.node.Config().SendTimeout)
95-
if !runnable.IsSynced(d.node.GetState()) {
96+
awaitSyncLoop:
97+
for end := time.Now().Add(d.node.Config().SendTimeout); time.Now().Before(end); {
98+
if synced = runnable.IsSynced(d.node.GetState()); synced {
99+
break awaitSyncLoop
100+
}
101+
time.Sleep(d.node.Config().SendTimeout / 10)
102+
}
103+
if !synced && !runnable.IsSynced(d.node.GetState()) {
96104
d.logger.Info().Msg("became leader, but not synced. Pass on leadership")
97105
if err := d.node.leadershipTransfer(); err != nil && !errors.Is(err, raft.ErrNotLeader) {
106+
// the leadership transfer can fail due to no suitable leader. Better stop than double sign on old state
98107
return err
99108
}
100109
continue
@@ -127,10 +136,12 @@ func (d *DynamicLeaderElection) Run(ctx context.Context) error {
127136
case <-ticker.C: // LeaderCh fires only when leader changes not on initial election
128137
if isStarted {
129138
ticker.Stop()
139+
ticker.C = nil
130140
continue
131141
}
132142
if leaderID := d.node.leaderID(); leaderID != "" && leaderID != d.node.NodeID() {
133143
ticker.Stop()
144+
ticker.C = nil
134145
d.logger.Info().Msg("starting follower operations")
135146
isStarted = true
136147
var err error

pkg/raft/node.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ func (n *Node) Start(_ context.Context) error {
112112
return fmt.Errorf("raft cluster requires bootstrap mode")
113113
}
114114

115+
if future := n.raft.GetConfiguration(); future.Error() == nil && len(future.Configuration().Servers) > 0 {
116+
n.logger.Info().Msg("cluster already bootstrapped, skipping")
117+
return nil
118+
}
119+
115120
n.logger.Info().Msg("Boostrap raft cluster")
116121
thisNode := raft.Server{ID: raft.ServerID(n.config.NodeID), Address: raft.ServerAddress(n.config.RaftAddr)}
117122
cfg := raft.Configuration{
@@ -264,6 +269,7 @@ func (f *FSM) Apply(log *raft.Log) interface{} {
264269
select {
265270
case f.applyCh <- RaftApplyMsg{Index: log.Index, State: &state}:
266271
default:
272+
// on a slow consumer, the raft cluster should not be blocked. Followers can sync from DA or other peers, too.
267273
f.logger.Warn().Msg("apply channel full, dropping message")
268274
}
269275
}
@@ -326,6 +332,7 @@ func splitPeerAddr(peer string) (raft.Server, error) {
326332
if address == "" {
327333
return raft.Server{}, errors.New("address cannot be empty")
328334
}
335+
// we can skip address validation as they come from a local configuration
329336

330337
return raft.Server{
331338
ID: raft.ServerID(nodeID),

pkg/raft/types.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ package raft
22

33
import "fmt"
44

5-
// todo: refactor to use proto
6-
// RaftBlockState represents replicated block state
5+
// RaftBlockState represents a replicated block state
76
type RaftBlockState struct {
87
Height uint64
98
Hash []byte

0 commit comments

Comments
 (0)