Skip to content

Commit 28bc1c0

Browse files
authored
feat: add configurable I/O rate limiting for snapshot writes (sei-protocol#2816)
## Describe your changes and provide context

Add global rate limiting for snapshot write operations to prevent page cache eviction on machines with limited RAM.

Changes:
- Add SnapshotWriteRateMBps config field (default 100 MB/s)
- Implement rateLimitedWriter with token bucket (shared across all trees)
- Remove sc-snapshot-writer-limit from TOML template (hardcoded to 4, backward-compatible)
- Add tests for rate limiter correctness, large writes, context cancellation, and config defaults
- Update toml_test to verify new config field and absence of removed field
- Switch back to MADV_RANDOM after background snapshot load

## Testing performed to validate your change
1 parent 2f6236f commit 28bc1c0

11 files changed

Lines changed: 273 additions & 21 deletions

File tree

app/seidb.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const (
2424
FlagSCSnapshotMinTimeInterval = "state-commit.sc-snapshot-min-time-interval"
2525
FlagSCSnapshotWriterLimit = "state-commit.sc-snapshot-writer-limit"
2626
FlagSCSnapshotPrefetchThreshold = "state-commit.sc-snapshot-prefetch-threshold"
27+
FlagSCSnapshotWriteRateMBps = "state-commit.sc-snapshot-write-rate-mbps"
2728
FlagSCCacheSize = "state-commit.sc-cache-size"
2829
FlagSCOnlyAllowExportOnSnapshotVersion = "state-commit.sc-only-allow-export-on-snapshot-version"
2930

@@ -99,6 +100,7 @@ func parseSCConfigs(appOpts servertypes.AppOptions) config.StateCommitConfig {
99100
scConfig.MemIAVLConfig.SnapshotMinTimeInterval = cast.ToUint32(appOpts.Get(FlagSCSnapshotMinTimeInterval))
100101
scConfig.MemIAVLConfig.SnapshotWriterLimit = cast.ToInt(appOpts.Get(FlagSCSnapshotWriterLimit))
101102
scConfig.MemIAVLConfig.SnapshotPrefetchThreshold = cast.ToFloat64(appOpts.Get(FlagSCSnapshotPrefetchThreshold))
103+
scConfig.MemIAVLConfig.SnapshotWriteRateMBps = cast.ToInt(appOpts.Get(FlagSCSnapshotWriteRateMBps))
102104
return scConfig
103105
}
104106

app/seidb_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ func (t TestSeiDBAppOpts) Get(s string) interface{} {
3030
return defaultSCConfig.MemIAVLConfig.SnapshotWriterLimit
3131
case FlagSCSnapshotPrefetchThreshold:
3232
return defaultSCConfig.MemIAVLConfig.SnapshotPrefetchThreshold
33+
case FlagSCSnapshotWriteRateMBps:
34+
return defaultSCConfig.MemIAVLConfig.SnapshotWriteRateMBps
3335
case FlagSSEnable:
3436
return defaultSSConfig.Enable
3537
case FlagSSBackend:

sei-db/config/toml.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,16 @@ sc-snapshot-interval = {{ .StateCommit.MemIAVLConfig.SnapshotInterval }}
4141
# to allow more frequent snapshots during normal operation.
4242
sc-snapshot-min-time-interval = {{ .StateCommit.MemIAVLConfig.SnapshotMinTimeInterval }}
4343
44-
# SnapshotWriterLimit defines the max concurrency for taking commit store snapshot
45-
sc-snapshot-writer-limit = {{ .StateCommit.MemIAVLConfig.SnapshotWriterLimit }}
46-
4744
# SnapshotPrefetchThreshold defines the page cache residency threshold (0.0-1.0) to trigger snapshot prefetch.
4845
# Prefetch sequentially reads nodes/leaves files into page cache for faster cold-start replay.
4946
# Only active trees (evm/bank/acc) are prefetched, skipping sparse kv files to save memory.
5047
# Skips prefetch if more than threshold of pages already resident (e.g., 0.8 = 80%).
5148
# Setting to 0 disables prefetching. Defaults to 0.8
5249
sc-snapshot-prefetch-threshold = {{ .StateCommit.MemIAVLConfig.SnapshotPrefetchThreshold }}
5350
51+
# Maximum snapshot write rate in MB/s (global across all trees). 0 = unlimited. Default 100.
52+
sc-snapshot-write-rate-mbps = {{ .StateCommit.MemIAVLConfig.SnapshotWriteRateMBps }}
53+
5454
`
5555

5656
// StateStoreConfigTemplate defines the configuration template for state-store

sei-db/config/toml_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,12 @@ func TestStateCommitConfigTemplate(t *testing.T) {
4545
require.Contains(t, output, "sc-keep-recent =", "Missing sc-keep-recent")
4646
require.Contains(t, output, "sc-snapshot-interval =", "Missing sc-snapshot-interval")
4747
require.Contains(t, output, "sc-snapshot-min-time-interval =", "Missing sc-snapshot-min-time-interval")
48-
require.Contains(t, output, "sc-snapshot-writer-limit =", "Missing sc-snapshot-writer-limit")
4948
require.Contains(t, output, "sc-snapshot-prefetch-threshold =", "Missing sc-snapshot-prefetch-threshold")
49+
require.Contains(t, output, "sc-snapshot-write-rate-mbps =", "Missing sc-snapshot-write-rate-mbps")
50+
51+
// sc-snapshot-writer-limit is intentionally removed from template (hardcoded to 4)
52+
// but old configs with this field still parse fine via mapstructure
53+
require.NotContains(t, output, "sc-snapshot-writer-limit", "sc-snapshot-writer-limit should not be in template")
5054
}
5155

5256
// TestStateStoreConfigTemplate verifies that all field paths in the StateStore TOML template

sei-db/state_db/sc/memiavl/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ const (
66
DefaultSnapshotMinTimeInterval = 60 * 60 // 1 hour in seconds
77
DefaultAsyncCommitBuffer = 100
88
DefaultSnapshotPrefetchThreshold = 0.8 // prefetch if <80% pages in cache
9+
DefaultSnapshotWriteRateMBps = 100 // 100 MB/s default
10+
DefaultSnapshotWriterLimit = 4 // controls tree concurrency but not I/O rate (use SnapshotWriteRateMBps for that)
911
)
1012

1113
type Config struct {
@@ -35,6 +37,9 @@ type Config struct {
3537
// Skips prefetch if >threshold of pages already resident (e.g., 0.8 = 80%).
3638
// Setting to 0 disables prefetching. Defaults to 0.8
3739
SnapshotPrefetchThreshold float64 `mapstructure:"snapshot-prefetch-threshold"`
40+
41+
// SnapshotWriteRateMBps is the global snapshot write rate limit in MB/s. 0 = unlimited. Default 100.
42+
SnapshotWriteRateMBps int `mapstructure:"snapshot-write-rate-mbps"`
3843
}
3944

4045
func DefaultConfig() Config {
@@ -44,5 +49,7 @@ func DefaultConfig() Config {
4449
SnapshotKeepRecent: DefaultSnapshotKeepRecent,
4550
SnapshotMinTimeInterval: DefaultSnapshotMinTimeInterval,
4651
SnapshotPrefetchThreshold: DefaultSnapshotPrefetchThreshold,
52+
SnapshotWriteRateMBps: DefaultSnapshotWriteRateMBps,
53+
SnapshotWriterLimit: DefaultSnapshotWriterLimit,
4754
}
4855
}

sei-db/state_db/sc/memiavl/db.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -738,7 +738,7 @@ func (db *DB) RewriteSnapshot(ctx context.Context) error {
738738
path := filepath.Clean(filepath.Join(db.dir, tmpDir))
739739

740740
writeStart := time.Now()
741-
err := db.MultiTree.WriteSnapshot(ctx, path, db.snapshotWriterPool)
741+
err := db.WriteSnapshotWithRateLimit(ctx, path, db.snapshotWriterPool, db.opts.SnapshotWriteRateMBps)
742742
writeElapsed := time.Since(writeStart).Seconds()
743743

744744
if err != nil {
@@ -911,6 +911,20 @@ func (db *DB) rewriteSnapshotBackground() error {
911911
ch <- snapshotResult{err: err}
912912
return
913913
}
914+
915+
// Switch mmap hints from SEQUENTIAL to RANDOM for tree operations.
916+
// NewMmap() applies MADV_SEQUENTIAL by default for cold-start replay performance,
917+
// but after loading we need MADV_RANDOM for random tree access patterns.
918+
// Without this, the kernel aggressively discards accessed pages and does wrong-direction
919+
// readahead, which is catastrophic on high-latency storage (e.g. NAS).
920+
// This matches the behavior in OpenDB() which also calls PrepareForRandomRead().
921+
for _, tree := range mtree.trees {
922+
if tree.snapshot != nil {
923+
tree.snapshot.nodesMap.PrepareForRandomRead()
924+
tree.snapshot.leavesMap.PrepareForRandomRead()
925+
}
926+
}
927+
914928
cloned.logger.Info("loaded multitree after snapshot", "elapsed", time.Since(loadStart).Seconds())
915929

916930
// do a best effort catch-up, will do another final catch-up in main thread.
@@ -1054,7 +1068,7 @@ func (db *DB) WriteSnapshot(dir string) error {
10541068
db.mtx.Lock()
10551069
defer db.mtx.Unlock()
10561070

1057-
return db.MultiTree.WriteSnapshot(context.Background(), dir, db.snapshotWriterPool)
1071+
return db.WriteSnapshotWithRateLimit(context.Background(), dir, db.snapshotWriterPool, db.opts.SnapshotWriteRateMBps)
10581072
}
10591073

10601074
func snapshotName(version int64) string {

sei-db/state_db/sc/memiavl/multitree.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/alitto/pond"
1515
"golang.org/x/exp/slices"
16+
"golang.org/x/time/rate"
1617

1718
"github.com/sei-protocol/sei-chain/sei-db/common/errors"
1819
"github.com/sei-protocol/sei-chain/sei-db/common/logger"
@@ -462,19 +463,35 @@ func (t *MultiTree) Catchup(ctx context.Context, stream wal.ChangelogWAL, delta
462463
}
463464

464465
func (t *MultiTree) WriteSnapshot(ctx context.Context, dir string, wp *pond.WorkerPool) error {
465-
t.logger.Info("starting snapshot write", "trees", len(t.trees))
466+
return t.WriteSnapshotWithRateLimit(ctx, dir, wp, 0)
467+
}
468+
469+
// WriteSnapshotWithRateLimit writes snapshot with optional rate limiting.
470+
// rateMBps is the rate limit in MB/s. 0 means unlimited.
471+
// A single global limiter is shared across ALL trees and files to ensure
472+
// the total write rate is capped at the configured value.
473+
func (t *MultiTree) WriteSnapshotWithRateLimit(ctx context.Context, dir string, wp *pond.WorkerPool, rateMBps int) error {
474+
t.logger.Info("starting snapshot write", "trees", len(t.trees), "rate_limit_mbps", rateMBps)
466475

467476
if err := os.MkdirAll(dir, os.ModePerm); err != nil { //nolint:gosec
468477
return err
469478
}
470479

480+
// Create a single global limiter shared by all trees and files
481+
// This ensures total write rate is capped regardless of parallelism
482+
limiter := NewGlobalRateLimiter(rateMBps)
483+
if limiter != nil {
484+
t.logger.Info("global rate limiting enabled", "rate_mbps", rateMBps)
485+
}
486+
471487
// Write EVM first to avoid disk I/O contention, then parallel
472-
return t.writeSnapshotPriorityEVM(ctx, dir, wp)
488+
return t.writeSnapshotPriorityEVM(ctx, dir, wp, limiter)
473489
}
474490

475491
// writeSnapshotPriorityEVM writes EVM tree first, then others in parallel
476492
// Best strategy: reduces disk I/O contention for the largest tree
477-
func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp *pond.WorkerPool) error {
493+
// limiter is a shared rate limiter. nil means unlimited.
494+
func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp *pond.WorkerPool, limiter *rate.Limiter) error {
478495
startTime := time.Now()
479496

480497
// Phase 1: Write EVM tree first (if it exists)
@@ -494,7 +511,7 @@ func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp
494511
if evmTree != nil {
495512
t.logger.Info("writing evm tree", "phase", "1/2")
496513
evmStart := time.Now()
497-
if err := evmTree.WriteSnapshot(ctx, filepath.Join(dir, evmName)); err != nil {
514+
if err := evmTree.WriteSnapshotWithRateLimit(ctx, filepath.Join(dir, evmName), limiter); err != nil {
498515
return err
499516
}
500517
evmElapsed := time.Since(evmStart).Seconds()
@@ -521,7 +538,7 @@ func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp
521538
wg.Add(1)
522539
wp.Submit(func() {
523540
defer wg.Done()
524-
if err := entry.WriteSnapshot(ctx, filepath.Join(dir, entry.Name)); err != nil {
541+
if err := entry.WriteSnapshotWithRateLimit(ctx, filepath.Join(dir, entry.Name), limiter); err != nil {
525542
mu.Lock()
526543
errs = append(errs, fmt.Errorf("tree %s: %w", entry.Name, err))
527544
mu.Unlock()

sei-db/state_db/sc/memiavl/opts.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package memiavl
22

33
import (
44
"errors"
5-
"runtime"
65
"time"
76

87
"github.com/sei-protocol/sei-chain/sei-db/common/logger"
@@ -60,8 +59,9 @@ func (opts *Options) FillDefaults() {
6059
opts.SnapshotInterval = DefaultSnapshotInterval
6160
}
6261

62+
// SnapshotWriterLimit controls tree concurrency but not I/O rate (use SnapshotWriteRateMBps for that)
6363
if opts.SnapshotWriterLimit <= 0 {
64-
opts.SnapshotWriterLimit = runtime.NumCPU()
64+
opts.SnapshotWriterLimit = DefaultSnapshotWriterLimit
6565
}
6666

6767
// Convert SnapshotMinTimeInterval (seconds) to Duration
@@ -71,6 +71,10 @@ func (opts *Options) FillDefaults() {
7171
opts.snapshotMinTimeIntervalDuration = 1 * time.Hour
7272
}
7373

74+
if opts.SnapshotWriteRateMBps <= 0 {
75+
opts.SnapshotWriteRateMBps = DefaultSnapshotWriteRateMBps
76+
}
77+
7478
if opts.SnapshotPrefetchThreshold < 0 || opts.SnapshotPrefetchThreshold > 1 {
7579
opts.SnapshotPrefetchThreshold = DefaultSnapshotPrefetchThreshold
7680
}

sei-db/state_db/sc/memiavl/snapshot.go

Lines changed: 83 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/sei-protocol/sei-chain/sei-db/common/logger"
2020
"github.com/sei-protocol/sei-chain/sei-db/state_db/sc/types"
2121
"golang.org/x/sys/unix"
22+
"golang.org/x/time/rate"
2223
)
2324

2425
const (
@@ -53,6 +54,66 @@ func (w *monitoringWriter) Write(p []byte) (n int, err error) {
5354
return n, err
5455
}
5556

57+
// rateLimitedWriter wraps an io.Writer with rate limiting to prevent
58+
// page cache eviction on machines with limited RAM.
59+
type rateLimitedWriter struct {
60+
w io.Writer
61+
limiter *rate.Limiter
62+
ctx context.Context
63+
}
64+
65+
// NewGlobalRateLimiter creates a shared rate limiter for snapshot writes.
66+
// rateMBps is the rate limit in MB/s. If <= 0, returns nil (no limit).
67+
// This limiter should be shared across all files and trees in a single snapshot operation.
68+
func NewGlobalRateLimiter(rateMBps int) *rate.Limiter {
69+
if rateMBps <= 0 {
70+
return nil
71+
}
72+
const mb = 1024 * 1024
73+
bytesPerSec := rate.Limit(rateMBps * mb)
74+
// Burst = 4MB: small enough to spread large bufio flushes (128MB) across
75+
// many smaller IO ops, preventing page cache eviction spikes.
76+
burstBytes := 4 * mb
77+
return rate.NewLimiter(bytesPerSec, burstBytes)
78+
}
79+
80+
// newRateLimitedWriter creates a rate-limited writer with a shared limiter.
81+
// If limiter is nil, returns the original writer (no limit).
82+
func newRateLimitedWriter(ctx context.Context, w io.Writer, limiter *rate.Limiter) io.Writer {
83+
if limiter == nil {
84+
return w
85+
}
86+
return &rateLimitedWriter{
87+
w: w,
88+
limiter: limiter,
89+
ctx: ctx,
90+
}
91+
}
92+
93+
func (w *rateLimitedWriter) Write(p []byte) (n int, err error) {
94+
// Wait for rate limiter before writing
95+
// For large writes, we may need to wait multiple times
96+
remaining := len(p)
97+
written := 0
98+
for remaining > 0 {
99+
// Limit each wait to burst size to avoid very long waits
100+
toWrite := remaining
101+
if toWrite > w.limiter.Burst() {
102+
toWrite = w.limiter.Burst()
103+
}
104+
if err := w.limiter.WaitN(w.ctx, toWrite); err != nil {
105+
return written, err
106+
}
107+
n, err := w.w.Write(p[written : written+toWrite])
108+
written += n
109+
remaining -= n
110+
if err != nil {
111+
return written, err
112+
}
113+
}
114+
return written, nil
115+
}
116+
56117
// Snapshot manage the lifecycle of mmap-ed files for the snapshot,
57118
// it must out live the objects that derived from it.
58119
type Snapshot struct {
@@ -390,6 +451,12 @@ func (snapshot *Snapshot) export(callback func(*types.SnapshotNode) bool) {
390451
}
391452

392453
func (t *Tree) WriteSnapshot(ctx context.Context, snapshotDir string) error {
454+
return t.WriteSnapshotWithRateLimit(ctx, snapshotDir, nil)
455+
}
456+
457+
// WriteSnapshotWithRateLimit writes snapshot with optional rate limiting.
458+
// limiter is a shared rate limiter. nil means unlimited.
459+
func (t *Tree) WriteSnapshotWithRateLimit(ctx context.Context, snapshotDir string, limiter *rate.Limiter) error {
393460
// Estimate tree size: root.Size() returns leaf count, total = leaves + branches ≈ 2x
394461
treeSize := int64(0)
395462
if t.root != nil {
@@ -399,7 +466,7 @@ func (t *Tree) WriteSnapshot(ctx context.Context, snapshotDir string) error {
399466
// Use 128MB buffer for all trees (large buffer for better performance)
400467
bufSize := bufIOSize
401468

402-
err := writeSnapshotWithBuffer(ctx, snapshotDir, t.version, bufSize, treeSize, t.logger, func(w *snapshotWriter) (uint32, error) {
469+
err := writeSnapshotWithBuffer(ctx, snapshotDir, t.version, bufSize, treeSize, limiter, t.logger, func(w *snapshotWriter) (uint32, error) {
403470
if t.root == nil {
404471
return 0, nil
405472
}
@@ -417,12 +484,14 @@ func (t *Tree) WriteSnapshot(ctx context.Context, snapshotDir string) error {
417484
return nil
418485
}
419486

420-
// writeSnapshotWithBuffer writes snapshot with specified buffer size
487+
// writeSnapshotWithBuffer writes snapshot with specified buffer size and optional rate limiting.
488+
// limiter is a shared rate limiter. nil means unlimited.
421489
func writeSnapshotWithBuffer(
422490
ctx context.Context,
423491
dir string, version uint32,
424492
bufSize int,
425493
totalNodes int64,
494+
limiter *rate.Limiter,
426495
log logger.Logger,
427496
doWrite func(*snapshotWriter) (uint32, error),
428497
) (returnErr error) {
@@ -469,10 +538,17 @@ func writeSnapshotWithBuffer(
469538
leavesMonitor := &monitoringWriter{f: fpLeaves}
470539
kvsMonitor := &monitoringWriter{f: fpKVs}
471540

541+
// Apply rate limiting if configured (shared limiter across all files)
542+
// This ensures total write rate is capped regardless of file count
543+
var nodesRateLimited, leavesRateLimited, kvsRateLimited io.Writer
544+
nodesRateLimited = newRateLimitedWriter(ctx, nodesMonitor, limiter)
545+
leavesRateLimited = newRateLimitedWriter(ctx, leavesMonitor, limiter)
546+
kvsRateLimited = newRateLimitedWriter(ctx, kvsMonitor, limiter)
547+
472548
// Create buffered writers with buffers
473-
nodesWriter := bufio.NewWriterSize(nodesMonitor, bufSize)
474-
leavesWriter := bufio.NewWriterSize(leavesMonitor, bufSize)
475-
kvsWriter := bufio.NewWriterSize(kvsMonitor, bufSize)
549+
nodesWriter := bufio.NewWriterSize(nodesRateLimited, bufSize)
550+
leavesWriter := bufio.NewWriterSize(leavesRateLimited, bufSize)
551+
kvsWriter := bufio.NewWriterSize(kvsRateLimited, bufSize)
476552

477553
w := newSnapshotWriter(ctx, nodesWriter, leavesWriter, kvsWriter, log)
478554
w.treeName = filepath.Base(dir) // Set tree name for progress reporting
@@ -546,8 +622,8 @@ func writeSnapshot(
546622
dir string, version uint32,
547623
doWrite func(*snapshotWriter) (uint32, error),
548624
) error {
549-
// Use nop logger for backward compatibility
550-
return writeSnapshotWithBuffer(ctx, dir, version, bufIOSize, 0, logger.NewNopLogger(), doWrite)
625+
// Use nop logger and no rate limit for backward compatibility
626+
return writeSnapshotWithBuffer(ctx, dir, version, bufIOSize, 0, nil, logger.NewNopLogger(), doWrite)
551627
}
552628

553629
// kvWriteOp represents a key-value write operation

0 commit comments

Comments
 (0)