diff --git a/core/state/accessors.go b/core/state/accessors.go index a121ab7808..4996937076 100644 --- a/core/state/accessors.go +++ b/core/state/accessors.go @@ -41,7 +41,21 @@ func HasContract(r db.KeyValueReader, addr *felt.Felt) (bool, error) { return r.Has(key) } -func WriteContract(w db.KeyValueWriter, addr *felt.Felt, contract *stateContract) error { +func WriteContract( + w db.KeyValueWriter, + addr *felt.Felt, + nonce, classHash felt.Felt, + deployHeight uint64, +) error { + contract := stateContract{ + Nonce: nonce, + ClassHash: classHash, + DeployedHeight: deployHeight, + } + return writeContract(w, addr, &contract) +} + +func writeContract(w db.KeyValueWriter, addr *felt.Felt, contract *stateContract) error { key := db.ContractKey(addr) data, err := contract.MarshalBinary() if err != nil { diff --git a/core/state/state.go b/core/state/state.go index 6ea9cb5543..edd6b8d234 100644 --- a/core/state/state.go +++ b/core/state/state.go @@ -376,7 +376,7 @@ func (s *State) flush( return err } } else { // updated - if err := WriteContract(s.batch, &addr, obj.contract); err != nil { + if err := writeContract(s.batch, &addr, obj.contract); err != nil { return err } } diff --git a/migration/headstate/committer.go b/migration/headstate/committer.go new file mode 100644 index 0000000000..cf5f15c03f --- /dev/null +++ b/migration/headstate/committer.go @@ -0,0 +1,49 @@ +package headstate + +import ( + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" + "github.com/NethermindEth/juno/utils/log" + "go.uber.org/zap" +) + +type committer struct { + counter counter + logger log.StructuredLogger + batchSemaphore semaphore.ResourceSemaphore[db.Batch] +} + +var _ pipeline.State[task, struct{}] = (*committer)(nil) + +func newCommitter( + logger log.StructuredLogger, + batchSemaphore semaphore.ResourceSemaphore[db.Batch], +) *committer { + return &committer{ + logger: logger, + counter: newCounter(logger, timeLogRate), + batchSemaphore: batchSemaphore, + } +} + +func (c *committer) Run(_ int, t task, _ chan<- struct{}) error { + c.logger.Debug( + "writing batch", + zap.Int("completedAddrs", t.completedAddrs), + zap.Int("batchSize", t.batch.Size()), + ) + + byteSize := uint64(t.batch.Size()) + if err := t.batch.Write(); err != nil { + return err + } + + c.counter.log(byteSize, t.completedAddrs) + c.batchSemaphore.Put() + return nil +} + +func (c *committer) Done(int, chan<- struct{}) error { + return nil +} diff --git a/migration/headstate/counter.go b/migration/headstate/counter.go new file mode 100644 index 0000000000..db427d11fc --- /dev/null +++ b/migration/headstate/counter.go @@ -0,0 +1,47 @@ +package headstate + +import ( + "time" + + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/utils/log" + "go.uber.org/zap" +) + +type counter struct { + logger log.StructuredLogger + timeLogRate time.Duration + start time.Time + size uint64 + completedAddrs uint64 +} + +func newCounter(logger log.StructuredLogger, timeLogRate time.Duration) counter { + return counter{ + logger: logger, + timeLogRate: timeLogRate, + start: time.Now(), + } +} + +func (c *counter) log(byteSize uint64, completedAddrs int) { + c.size += byteSize + c.completedAddrs += uint64(completedAddrs) + + now := time.Now() + elapsed := now.Sub(c.start).Seconds() + if elapsed > c.timeLogRate.Seconds() { + mbs := float64(c.size) / float64(db.Megabyte) + c.logger.Info( + "write speed", + zap.Float64("MB", mbs), + zap.Float64("MB/s", mbs/elapsed), + zap.Uint64("completedContracts", c.completedAddrs), + zap.Float64("completedContracts/s", float64(c.completedAddrs)/elapsed), + zap.Float64("time", elapsed), + ) + c.start = now + c.size = 0 + c.completedAddrs = 0 + } +} diff --git a/migration/headstate/ingestor.go b/migration/headstate/ingestor.go new file mode 100644 index 0000000000..a8df2739f9 --- /dev/null +++ b/migration/headstate/ingestor.go @@ -0,0 +1,95 @@ +package headstate + +import ( + "errors" + "fmt" + + "github.com/NethermindEth/juno/core" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/state" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" +) + +type ingestor struct { + database db.KeyValueReader + batchSemaphore semaphore.ResourceSemaphore[db.Batch] + tasks []task +} + +func newIngestor( + database db.KeyValueReader, + batchSemaphore semaphore.ResourceSemaphore[db.Batch], +) *ingestor { + tasks := make([]task, ingestorCount) + for i := range tasks { + tasks[i] = task{batch: batchSemaphore.GetBlocking()} + } + return &ingestor{ + database: database, + batchSemaphore: batchSemaphore, + tasks: tasks, + } +} + +var _ pipeline.State[felt.Address, task] = (*ingestor)(nil) + +func (c *ingestor) Run(index int, addr felt.Address, outputs chan<- task) error { + t := &c.tasks[index] + + sizeBefore := t.batch.Size() + if err := c.ingestAddress(t.batch, addr); err != nil { + return err + } + if t.batch.Size() > sizeBefore { + t.completedAddrs++ + } + + if t.batch.Size() >= targetBatchByteSize { + outputs <- task{batch: t.batch, completedAddrs: t.completedAddrs} + t.completedAddrs = 0 + t.batch = c.batchSemaphore.GetBlocking() + } + return nil +} + +func (c *ingestor) Done(index int, outputs chan<- task) error { + outputs <- c.tasks[index] + return nil +} + +func (c *ingestor) ingestAddress(batch db.Batch, addr felt.Address) error { + addrFelt := felt.Felt(addr) + + already, err := state.HasContract(c.database, &addrFelt) + if err != nil { + return fmt.Errorf("HasContract(%s): %w", &addrFelt, err) + } + if already { + return nil + } + + classHash, err := core.GetContractClassHash(c.database, &addrFelt) + if err != nil { + return fmt.Errorf("GetContractClassHash(%s): %w", &addrFelt, err) + } + + nonce, err := core.GetContractNonce(c.database, &addrFelt) + if err != nil { + if !errors.Is(err, db.ErrKeyNotFound) { + return fmt.Errorf("GetContractNonce(%s): %w", &addrFelt, err) + } + nonce = felt.Zero + } + + height, err := core.GetContractDeploymentHeight(c.database, &addrFelt) + if err != nil { + return fmt.Errorf("GetContractDeploymentHeight(%s): %w", &addrFelt, err) + } + + if err := state.WriteContract(batch, &addrFelt, nonce, classHash, height); err != nil { + return fmt.Errorf("WriteContract(%s): %w", &addrFelt, err) + } + return nil +} diff --git a/migration/headstate/migrator.go b/migration/headstate/migrator.go new file mode 100644 index 0000000000..c669f8a20c --- /dev/null +++ b/migration/headstate/migrator.go @@ -0,0 +1,163 @@ +package headstate + +import ( + "context" + "errors" + "fmt" + "iter" + "time" + + "github.com/NethermindEth/juno/blockchain/networks" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/db/dbutils" + "github.com/NethermindEth/juno/migration" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" + "github.com/NethermindEth/juno/utils/log" +) + +const ( + batchByteSize = 128 * db.Megabyte + targetBatchByteSize = 96 * db.Megabyte + ingestorCount = 4 + timeLogRate = 5 * time.Second +) + +type task struct { + batch db.Batch + completedAddrs int +} + +var ( + shouldRerun = []byte{} + shouldNotRerun = []byte(nil) +) + +var _ migration.Migration = (*Migrator)(nil) + +// Migrator consolidates the deprecated per-field contract layout into a +// single Contract record per address, written via state.WriteContract: +// +// ContractClassHash[addr] +// ContractNonce[addr] +// ContractDeploymentHeight[addr] +// │ +// ▼ +// Contract[addr] = { ClassHash, Nonce, DeployedHeight } +// +// StorageRoot is left zero — the running node lazily backfills it on the +// contract's first storage write. +// +// Each address discovered in the ContractClassHash bucket is processed by one +// of ingestorCount worker goroutines that read the three old fields into a +// shared db.Batch; a single committer drains batches to disk. Once every +// address has been migrated, the three deprecated buckets are wiped via +// DeleteRange. +// +// Re-run safe: an address whose Contract record already exists is skipped +// (via state.HasContract), and the trailing wipe re-issues DeleteRange over +// the (possibly already empty) ranges. +type Migrator struct{} + +func (Migrator) Before([]byte) error { + return nil +} + +func (Migrator) Migrate( + ctx context.Context, + database db.KeyValueStore, + _ *networks.Network, + logger log.StructuredLogger, +) ([]byte, error) { + addressesIter, sourceErr := pendingAddresses(database) + res := migrateAddresses(ctx, database, logger, addressesIter) + + if err := errors.Join(sourceErr(), res.Err); err != nil { + return shouldRerun, err + } + if !res.IsDone { + if ctxErr := ctx.Err(); ctxErr != nil { + return shouldRerun, ctxErr + } + return shouldRerun, errors.New("headstate migration did not complete") + } + + return shouldNotRerun, wipeDeprecatedBuckets(database) +} + +func migrateAddresses( + ctx context.Context, + database db.KeyValueStore, + logger log.StructuredLogger, + addresses iter.Seq[felt.Address], +) pipeline.Result { + batchSemaphore := semaphore.New( + ingestorCount+1, + func() db.Batch { + return database.NewBatchWithSize(batchByteSize) + }, + ) + + source := pipeline.Source(addresses) + + ingestorPipeline := pipeline.New( + source, + ingestorCount, + newIngestor(database, batchSemaphore), + ) + + committerPipeline := pipeline.New( + ingestorPipeline, + 1, + newCommitter(logger, batchSemaphore), + ) + + _, wait := committerPipeline.Run(ctx) + return wait() +} + +func pendingAddresses(r db.KeyValueReader) (iter.Seq[felt.Address], func() error) { + var iterErr error + seq := func(yield func(felt.Address) bool) { + prefix := db.ContractClassHash.Key() + it, err := r.NewIterator(prefix, true) + if err != nil { + iterErr = err + return + } + defer it.Close() + + for valid := it.First(); valid; valid = it.Next() { + key := it.Key() + if len(key) != len(prefix)+felt.Bytes { + iterErr = fmt.Errorf( + "malformed ContractClassHash key: len %d, want %d", + len(key), + len(prefix)+felt.Bytes, + ) + return + } + f := felt.FromBytes[felt.Felt](key[len(prefix):]) + if !yield(felt.Address(f)) { + return + } + } + } + return seq, func() error { return iterErr } +} + +func wipeDeprecatedBuckets(database db.KeyValueStore) error { + for _, bucket := range []db.Bucket{ + db.ContractClassHash, + db.ContractNonce, + db.ContractDeploymentHeight, + } { + start := bucket.Key() + end := dbutils.UpperBound(start) + if err := database.DeleteRange(start, end); err != nil { + return err + } + } + return nil +} diff --git a/migration/headstate/migrator_test.go b/migration/headstate/migrator_test.go new file mode 100644 index 0000000000..99048754eb --- /dev/null +++ b/migration/headstate/migrator_test.go @@ -0,0 +1,209 @@ +package headstate_test + +import ( + "context" + "testing" + + "github.com/NethermindEth/juno/blockchain/networks" + "github.com/NethermindEth/juno/core" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/state" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/db/memory" + "github.com/NethermindEth/juno/migration/headstate" + "github.com/NethermindEth/juno/utils/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type contractData struct { + addr felt.Felt + classHash felt.Felt + nonce felt.Felt // felt.Zero means "do not write nonce entry" + height uint64 +} + +func seedDeprecated(t *testing.T, memDB db.KeyValueStore, seeds []contractData) { + t.Helper() + for i := range seeds { + s := &seeds[i] + require.NoError(t, core.WriteContractClassHash(memDB, &s.addr, &s.classHash)) + if !s.nonce.IsZero() { + require.NoError(t, core.WriteContractNonce(memDB, &s.addr, &s.nonce)) + } + require.NoError(t, core.WriteContractDeploymentHeight(memDB, &s.addr, s.height)) + } +} + +func bucketKeyCount(t *testing.T, r db.KeyValueReader, bucket db.Bucket) int { + t.Helper() + it, err := r.NewIterator(bucket.Key(), true) + require.NoError(t, err) + defer it.Close() + count := 0 + for valid := it.First(); valid; valid = it.Next() { + count++ + } + return count +} + +func TestMigrate_EmptyDB(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + res, err := headstate.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + assert.Nil(t, res, "empty DB must complete with no intermediate state") +} + +func TestMigrate_ConsolidatesAddresses(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + seeds := []contractData{ + { + addr: felt.FromUint64[felt.Felt](1), + classHash: felt.FromUint64[felt.Felt](170), + nonce: felt.FromUint64[felt.Felt](7), + height: 100, + }, + { + addr: felt.FromUint64[felt.Felt](2), + classHash: felt.FromUint64[felt.Felt](187), + nonce: felt.Zero, // never updated + height: 200, + }, + { + addr: felt.FromUint64[felt.Felt](3), + classHash: felt.FromUint64[felt.Felt](204), + nonce: felt.FromUint64[felt.Felt](66), + height: 300, + }, + } + seedDeprecated(t, memDB, seeds) + + res, err := headstate.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + assert.Nil(t, res) + + for i := range seeds { + s := &seeds[i] + got, err := state.GetContract(memDB, &s.addr) + require.NoErrorf(t, err, "missing Contract for %s", &s.addr) + assert.Equal(t, s.classHash, got.ClassHash, "ClassHash for %s", &s.addr) + assert.Equal(t, s.nonce, got.Nonce, "Nonce for %s", &s.addr) + assert.Equal(t, s.height, got.DeployedHeight, "DeployedHeight for %s", &s.addr) + assert.True(t, got.StorageRoot.IsZero(), "StorageRoot must be zero (lazy)") + } + + for _, bucket := range []db.Bucket{ + db.ContractClassHash, + db.ContractNonce, + db.ContractDeploymentHeight, + } { + assert.Equal(t, 0, bucketKeyCount(t, memDB, bucket), "old bucket %v must be empty", bucket) + } +} + +func TestMigrate_SkipsAlreadyMigrated(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + // addrDone has been migrated already (Contract record exists). Its old + // keys are still around (simulating a previous partial run that wrote + // the Contract record but didn't reach wipeDeprecatedBuckets). + addrDone := felt.FromUint64[felt.Felt](1) + doneClassHash := felt.FromUint64[felt.Felt](170) + doneNonce := felt.FromUint64[felt.Felt](57005) + + require.NoError(t, state.WriteContract(memDB, &addrDone, doneNonce, doneClassHash, 111)) + require.NoError(t, core.WriteContractClassHash(memDB, &addrDone, &doneClassHash)) + require.NoError(t, core.WriteContractDeploymentHeight(memDB, &addrDone, 111)) + + addrPending := felt.FromUint64[felt.Felt](2) + pendingClassHash := felt.FromUint64[felt.Felt](187) + pendingNonce := felt.FromUint64[felt.Felt](9) + + require.NoError(t, core.WriteContractClassHash(memDB, &addrPending, &pendingClassHash)) + require.NoError(t, core.WriteContractNonce(memDB, &addrPending, &pendingNonce)) + require.NoError(t, core.WriteContractDeploymentHeight(memDB, &addrPending, 222)) + + res, err := headstate.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + assert.Nil(t, res) + + // addrDone's pre-existing Contract was preserved (not overwritten). + done, err := state.GetContract(memDB, &addrDone) + require.NoError(t, err) + assert.Equal(t, doneNonce, done.Nonce, "addrDone's pre-existing Contract must not be overwritten") + assert.Equal(t, uint64(111), done.DeployedHeight) + + // addrPending got migrated. + pending, err := state.GetContract(memDB, &addrPending) + require.NoError(t, err) + assert.Equal(t, pendingClassHash, pending.ClassHash) + assert.Equal(t, pendingNonce, pending.Nonce) + assert.Equal(t, uint64(222), pending.DeployedHeight) + + for _, bucket := range []db.Bucket{ + db.ContractClassHash, + db.ContractNonce, + db.ContractDeploymentHeight, + } { + assert.Equal(t, 0, bucketKeyCount(t, memDB, bucket), "old bucket %v must be empty", bucket) + } +} + +func TestMigrate_Idempotent(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + seeds := []contractData{ + { + addr: felt.FromUint64[felt.Felt](1), + classHash: felt.FromUint64[felt.Felt](170), + nonce: felt.FromUint64[felt.Felt](7), + height: 100, + }, + { + addr: felt.FromUint64[felt.Felt](2), + classHash: felt.FromUint64[felt.Felt](187), + nonce: felt.Zero, + height: 200, + }, + } + seedDeprecated(t, memDB, seeds) + + for i := range 3 { + res, err := headstate.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoErrorf(t, err, "run %d", i) + assert.Nilf(t, res, "run %d intermediate state", i) + } + + for i := range seeds { + s := &seeds[i] + got, err := state.GetContract(memDB, &s.addr) + require.NoError(t, err) + assert.Equal(t, s.classHash, got.ClassHash) + } +} diff --git a/migration/statehistory/class_hash_ingestor.go b/migration/statehistory/class_hash_ingestor.go new file mode 100644 index 0000000000..2c810fbf8d --- /dev/null +++ b/migration/statehistory/class_hash_ingestor.go @@ -0,0 +1,162 @@ +package statehistory + +import ( + "context" + "fmt" + + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/state" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/db/dbutils" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" +) + +type classHashIngestor struct { + baseIngestor +} + +var _ pipeline.State[*felt.Felt, task] = (*classHashIngestor)(nil) + +func newClassHashIngestor( + ctx context.Context, + sem semaphore.ResourceSemaphore[db.Batch], + database db.KeyValueReader, +) *classHashIngestor { + return &classHashIngestor{baseIngestor: newBaseIngestor(ctx, sem, database)} +} + +// Run migrates the class-hash history of a single contract. +// +// Legend: Bₙ = block at which the n-th class-hash *replacement* happened. +// Vₙ = the class hash active *after* Bₙ; V₀ is the deploy-time hash. The +// deprecated layout writes nothing at deploy: each entry is written only +// on a *Replace*, and the value stored is the hash that was active before +// that replace. So deprecated[B₁] = V₀ even though no replace happened at +// deploy_h itself. The new layout adds an explicit deploy entry and shifts +// everything else by one slot: +// +// block │ deprecated │ new +// ─────────┼────────────────┼────── +// deploy_h │ — │ V₀ ← inserted from first deprecated entry +// B₁ │ V₀ │ V₁ +// B₂ │ V₁ │ V₂ +// B₃ │ V₂ │ V₃ +// ─────────┼────────────────┼────── +// > B₃ │ contract │ V₃ (last entry — self-contained) +// .ClassHash ← deprecated must reach into the Contract +// record for any block past the last replace +// +// If the deprecated history is empty (no replaces ever), the single deploy +// entry is written with contract.ClassHash directly. Deprecated rows are +// deleted at the end of the run. Resume-safe: empty-deprecated + existing +// deploy entry → no-op. +func (i *classHashIngestor) Run(index int, addr *felt.Felt, outputs chan<- task) error { + t := &i.tasks[index] + + deprecatedPrefix := db.DeprecatedContractClassHashHistoryKey(addr) + contract, err := state.GetContract(i.database, addr) + if err != nil { + return fmt.Errorf("class-hash: GetContract(%s): %w", addr, err) + } + + depIt, err := i.database.NewIterator(deprecatedPrefix, true) + if err != nil { + return fmt.Errorf("class-hash: open deprecated iter(%s): %w", addr, err) + } + defer depIt.Close() + + if !depIt.First() { + return i.writeDeployOnly(t, outputs, addr, contract.DeployedHeight, &contract.ClassHash) + } + return i.writeShiftedHistory( + t, outputs, depIt, deprecatedPrefix, addr, + contract.DeployedHeight, &contract.ClassHash, + ) +} + +// writeDeployOnly handles the "no deprecated history" branch: write the +// deploy-time entry from contract.ClassHash, unless a previous run already +// wrote it. +func (i *classHashIngestor) writeDeployOnly( + t *task, + outputs chan<- task, + addr *felt.Felt, + deployHeight uint64, + classHash *felt.Felt, +) error { + deployKey := db.ContractClassHashHistoryAtBlockKey(addr, deployHeight) + deployEntryExists, err := i.database.Has(deployKey) + if err != nil { + return fmt.Errorf("class-hash: Has(deploy entry): %w", err) + } + if deployEntryExists { + return nil + } + if err := state.WriteClassHashHistory(t.batch, addr, deployHeight, classHash); err != nil { + return err + } + t.completedAddrs++ + t.entryCount++ + return i.flush(t, outputs) +} + +// writeShiftedHistory handles the "non-empty deprecated history" branch: +// writes the deploy entry from the first deprecated value, shifts each +// deprecated entry into the new layout using the next entry's pre-value +// (or contract.ClassHash for the last), and deletes the deprecated rows. +// depIt must be positioned at the first deprecated entry. +func (i *classHashIngestor) writeShiftedHistory( + t *task, + outputs chan<- task, + depIt db.Iterator, + prefix []byte, + addr *felt.Felt, + deployHeight uint64, + headClassHash *felt.Felt, +) error { + rawValue, err := depIt.Value() + if err != nil { + return fmt.Errorf("class-hash: read first value(%s): %w", addr, err) + } + deployClassHash := felt.FromBytes[felt.Felt](rawValue) + if err := state.WriteClassHashHistory(t.batch, addr, deployHeight, &deployClassHash); err != nil { + return err + } + t.entryCount++ + if err := i.flush(t, outputs); err != nil { + return err + } + + for { + block, err := parseBlockKey(depIt.Key(), prefix) + if err != nil { + return fmt.Errorf("class-hash(%s): %w", addr, err) + } + hasNext := depIt.Next() + historyValue := *headClassHash + if hasNext { + rawValue, err := depIt.Value() + if err != nil { + return fmt.Errorf("class-hash(%s): %w", addr, err) + } + historyValue = felt.FromBytes[felt.Felt](rawValue) + } + if err := state.WriteClassHashHistory(t.batch, addr, block, &historyValue); err != nil { + return err + } + t.entryCount++ + if err := i.flush(t, outputs); err != nil { + return err + } + if !hasNext { + break + } + } + + if err := t.batch.DeleteRange(prefix, dbutils.UpperBound(prefix)); err != nil { + return fmt.Errorf("class-hash: DeleteRange deprecated(%s): %w", addr, err) + } + t.completedAddrs++ + return nil +} diff --git a/migration/statehistory/committer.go b/migration/statehistory/committer.go new file mode 100644 index 0000000000..8dbff6b03e --- /dev/null +++ b/migration/statehistory/committer.go @@ -0,0 +1,52 @@ +package statehistory + +import ( + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" + "github.com/NethermindEth/juno/utils/log" + "go.uber.org/zap" +) + +type committer struct { + logger log.StructuredLogger + counter counter + batchSemaphore semaphore.ResourceSemaphore[db.Batch] +} + +var _ pipeline.State[task, struct{}] = (*committer)(nil) + +func newCommitter( + logger log.StructuredLogger, + batchSemaphore semaphore.ResourceSemaphore[db.Batch], + phaseName string, +) *committer { + return &committer{ + logger: logger, + counter: newCounter(logger, timeLogRate, phaseName), + batchSemaphore: batchSemaphore, + } +} + +func (c *committer) Run(_ int, t task, _ chan<- struct{}) error { + defer c.batchSemaphore.Put() + + c.logger.Debug( + "writing batch", + zap.Int("completedAddrs", t.completedAddrs), + zap.Int("entryCount", t.entryCount), + zap.Int("batchSize", t.batch.Size()), + ) + + byteSize := uint64(t.batch.Size()) + if err := t.batch.Write(); err != nil { + return err + } + + c.counter.log(byteSize, t.completedAddrs, t.entryCount) + return nil +} + +func (c *committer) Done(int, chan<- struct{}) error { + return nil +} diff --git a/migration/statehistory/counter.go b/migration/statehistory/counter.go new file mode 100644 index 0000000000..82e2aff9d0 --- /dev/null +++ b/migration/statehistory/counter.go @@ -0,0 +1,55 @@ +package statehistory + +import ( + "time" + + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/utils/log" + "go.uber.org/zap" +) + +type counter struct { + logger log.StructuredLogger + timeLogRate time.Duration + phaseName string + start time.Time + size uint64 + completedAddrs uint64 + entryCount uint64 +} + +func newCounter(logger log.StructuredLogger, timeLogRate time.Duration, phaseName string) counter { + return counter{ + logger: logger, + timeLogRate: timeLogRate, + phaseName: phaseName, + start: time.Now(), + } +} + +func (c *counter) log(byteSize uint64, completedAddrs, entryCount int) { + c.size += byteSize + c.completedAddrs += uint64(completedAddrs) + c.entryCount += uint64(entryCount) + + now := time.Now() + elapsed := now.Sub(c.start).Seconds() + if elapsed > c.timeLogRate.Seconds() { + mbs := float64(c.size) / float64(db.Megabyte) + c.logger.Info( + "write speed", + zap.String("phase", c.phaseName), + zap.Float64("MB", mbs), + zap.Float64("MB/s", mbs/elapsed), + zap.Uint64("completedContracts", c.completedAddrs), + zap.Float64("completedContracts/s", float64(c.completedAddrs)/elapsed), + zap.Uint64("entries", c.entryCount), + zap.Float64("entries/s", float64(c.entryCount)/elapsed), + zap.Float64("time", elapsed), + ) + c.start = now + c.size = 0 + c.completedAddrs = 0 + c.entryCount = 0 + } +} diff --git a/migration/statehistory/ingestor.go b/migration/statehistory/ingestor.go new file mode 100644 index 0000000000..ffd2001583 --- /dev/null +++ b/migration/statehistory/ingestor.go @@ -0,0 +1,63 @@ +package statehistory + +import ( + "context" + + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/migration/semaphore" +) + +type baseIngestor struct { + ctx context.Context + batchSemaphore semaphore.ResourceSemaphore[db.Batch] + database db.KeyValueReader + tasks []task +} + +// newBaseIngestor pre-allocates one batch per ingestor slot. The semaphore is +// created with capacity ingestorCount+1 immediately before this call, so the +// acquires cannot block — using GetBlocking keeps the constructor signature +// error-free. +func newBaseIngestor( + ctx context.Context, + sem semaphore.ResourceSemaphore[db.Batch], + database db.KeyValueReader, +) baseIngestor { + tasks := make([]task, ingestorCount) + for i := range tasks { + tasks[i] = task{batch: sem.GetBlocking()} + } + return baseIngestor{ + ctx: ctx, + batchSemaphore: sem, + database: database, + tasks: tasks, + } +} + +// flush emits the current task downstream when its batch hits target size and +// acquires a fresh batch. The ctx-aware select on the channel send is the +// snappy cancellation point. The semaphore acquire uses GetBlocking — it is +// guaranteed to unblock within one committer iteration because the committer's +// deferred Put always runs. +func (b *baseIngestor) flush(t *task, outputs chan<- task) error { + if t.batch.Size() < targetBatchByteSize { + return nil + } + select { + case <-b.ctx.Done(): + return b.ctx.Err() + case outputs <- *t: + } + *t = task{batch: b.batchSemaphore.GetBlocking()} + return nil +} + +func (b *baseIngestor) Done(index int, outputs chan<- task) error { + select { + case <-b.ctx.Done(): + return b.ctx.Err() + case outputs <- b.tasks[index]: + } + return nil +} diff --git a/migration/statehistory/migrator.go b/migration/statehistory/migrator.go new file mode 100644 index 0000000000..96d2b56eb6 --- /dev/null +++ b/migration/statehistory/migrator.go @@ -0,0 +1,183 @@ +package statehistory + +import ( + "context" + "errors" + "fmt" + "iter" + "time" + + "github.com/NethermindEth/juno/blockchain/networks" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/migration" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" + "github.com/NethermindEth/juno/utils/log" +) + +const ( + batchByteSize = 128 * db.Megabyte + targetBatchByteSize = 96 * db.Megabyte + ingestorCount = 4 + timeLogRate = 5 * time.Second +) + +type task struct { + batch db.Batch + completedAddrs int + entryCount int +} + +var ( + shouldRerun = []byte{} + shouldNotRerun = []byte(nil) +) + +var _ migration.Migration = (*Migrator)(nil) + +// Migrator rewrites the contract history layout so each entry stores the +// post-update value at its block, instead of the pre-update value. +// +// Example — a contract whose class hash was 0xAA at deploy (block 100), +// changed to 0xBB at block 200, then to 0xCC at block 500: +// +// block │ old layout (pre-value) │ new layout (post-value) +// ──────┼────────────────────────┼───────────────────────── +// 100 │ (no entry) │ 0xAA ← explicit deploy +// 200 │ 0xAA │ 0xBB +// 500 │ 0xBB │ 0xCC +// head │ 0xCC (contract record) │ (read from history) +// +// The same shape change applies to nonces and per-slot storage. The +// migrator runs three phases (class-hash, nonce, storage); each phase +// iterates the Contract bucket and rewrites one contract's deprecated +// entries at a time, deleting them in the same batch. +// +// Crash / cancellation safety: pebble batches commit atomically, so the +// writes inside any single committed batch are durable as a unit. A +// contract whose history is large may span more than one batch — but each +// new entry's value is a pure function of the deprecated source data, so +// re-running over an already-partially-rewritten contract overwrites with +// identical values and then deletes the (still-present) deprecated rows. +// Contracts whose deprecated entries are already gone short-circuit on an +// empty iterator. The three phases run sequentially: a later phase only +// starts after the earlier phase completes. +type Migrator struct{} + +func (Migrator) Before([]byte) error { return nil } + +func (Migrator) Migrate( + ctx context.Context, + database db.KeyValueStore, + _ *networks.Network, + logger log.StructuredLogger, +) ([]byte, error) { + if err := runClassHashPhase(ctx, database, logger); err != nil { + return shouldRerun, err + } + if err := runNoncePhase(ctx, database, logger); err != nil { + return shouldRerun, err + } + if err := runStoragePhase(ctx, database, logger); err != nil { + return shouldRerun, err + } + + return shouldNotRerun, nil +} + +func runClassHashPhase( + ctx context.Context, + database db.KeyValueStore, + logger log.StructuredLogger, +) error { + sem, src, sourceErr := setupBeforePhase(database) + ing := newClassHashIngestor(ctx, sem, database) + return runPipeline(ctx, "class-hash", src, ing, logger, sem, sourceErr) +} + +func runNoncePhase( + ctx context.Context, + database db.KeyValueStore, + logger log.StructuredLogger, +) error { + sem, src, sourceErr := setupBeforePhase(database) + ing := newNonceIngestor(ctx, sem, database) + return runPipeline(ctx, "nonce", src, ing, logger, sem, sourceErr) +} + +func runStoragePhase( + ctx context.Context, + database db.KeyValueStore, + logger log.StructuredLogger, +) error { + sem, src, sourceErr := setupBeforePhase(database) + ing := newStorageIngestor(ctx, sem, database) + return runPipeline(ctx, "storage", src, ing, logger, sem, sourceErr) +} + +func setupBeforePhase( + database db.KeyValueStore, +) (semaphore.ResourceSemaphore[db.Batch], pipeline.Pipeline[*felt.Felt], func() error) { + sem := semaphore.New(ingestorCount+1, func() db.Batch { + return database.NewBatchWithSize(batchByteSize) + }) + seq, sourceErr := addressSeq(database) + return sem, pipeline.Source(seq), sourceErr +} + +func runPipeline( + ctx context.Context, + name string, + src pipeline.Pipeline[*felt.Felt], + ing pipeline.State[*felt.Felt, task], + logger log.StructuredLogger, + sem semaphore.ResourceSemaphore[db.Batch], + sourceErr func() error, +) error { + ingestors := pipeline.New(src, ingestorCount, ing) + committers := pipeline.New(ingestors, 1, newCommitter(logger, sem, name)) + + _, wait := committers.Run(ctx) + res := wait() + + if err := errors.Join(sourceErr(), res.Err); err != nil { + return fmt.Errorf("%s: %w", name, err) + } + if !res.IsDone { + if ctxErr := ctx.Err(); ctxErr != nil { + return fmt.Errorf("%s: %w", name, ctxErr) + } + return fmt.Errorf("%s phase did not complete", name) + } + return nil +} + +func addressSeq(r db.KeyValueReader) (iter.Seq[*felt.Felt], func() error) { + var iterErr error + seq := func(yield func(*felt.Felt) bool) { + prefix := db.Contract.Key() + it, err := r.NewIterator(prefix, true) + if err != nil { + iterErr = err + return + } + defer it.Close() + for valid := it.First(); valid; valid = it.Next() { + key := it.Key() + if len(key) != len(prefix)+felt.Bytes { + iterErr = fmt.Errorf( + "malformed Contract key: len %d, want %d", + len(key), + len(prefix)+felt.Bytes, + ) + return + } + f := felt.FromBytes[felt.Felt](key[len(prefix):]) + if !yield(&f) { + return + } + } + } + return seq, func() error { return iterErr } +} diff --git a/migration/statehistory/migrator_test.go b/migration/statehistory/migrator_test.go new file mode 100644 index 0000000000..afb348f007 --- /dev/null +++ b/migration/statehistory/migrator_test.go @@ -0,0 +1,762 @@ +package statehistory_test + +import ( + "context" + "testing" + + "github.com/NethermindEth/juno/blockchain/networks" + "github.com/NethermindEth/juno/core" + "github.com/NethermindEth/juno/core/deprecatedstate" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/state" + "github.com/NethermindEth/juno/core/trie" + "github.com/NethermindEth/juno/core/trie2/triedb" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/db/memory" + "github.com/NethermindEth/juno/migration/statehistory" + "github.com/NethermindEth/juno/utils/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func seedContract( + t *testing.T, + memDB db.KeyValueStore, + addr felt.Felt, + nonce, classHash felt.Felt, +) { + t.Helper() + require.NoError(t, state.WriteContract(memDB, &addr, nonce, classHash, 100)) +} + +func seedDeprecatedClassHashHistory( + t *testing.T, + w db.KeyValueWriter, + addr felt.Felt, + block uint64, + oldValue felt.Felt, +) { + t.Helper() + require.NoError(t, core.WriteDeprecatedContractClassHashHistory(w, &addr, &oldValue, block)) +} + +func seedDeprecatedNonceHistory( + t *testing.T, + w db.KeyValueWriter, + addr felt.Felt, + block uint64, + oldValue felt.Felt, +) { + t.Helper() + require.NoError(t, core.WriteDeprecatedContractNonceHistory(w, &addr, &oldValue, block)) +} + +func seedDeprecatedStorageHistory( + t *testing.T, + w db.KeyValueWriter, + addr, slot felt.Felt, + block uint64, + oldValue felt.Felt, +) { + t.Helper() + require.NoError(t, core.WriteDeprecatedContractStorageHistory(w, &addr, &slot, &oldValue, block)) +} + +func seedDeprecatedStorageTrie( + t *testing.T, + memDB db.KeyValueStore, + addr felt.Felt, + leaves map[felt.Felt]felt.Felt, +) { + t.Helper() + //nolint:staticcheck // Necessary for old state + txn := memDB.NewIndexedBatch() + tr, err := trie.NewTriePedersen( + txn, + db.ContractStorage.Key(addr.Marshal()), + deprecatedstate.ContractStorageTrieHeight, + ) + require.NoError(t, err) + for k, v := range leaves { + _, err := tr.Put(&k, &v) + require.NoError(t, err) + } + require.NoError(t, tr.Commit()) + require.NoError(t, txn.Write()) +} + +func bucketKeyCount(t *testing.T, r db.KeyValueReader, bucket db.Bucket) int { + t.Helper() + it, err := r.NewIterator(bucket.Key(), true) + require.NoError(t, err) + defer it.Close() + count := 0 + for valid := it.First(); valid; valid = it.Next() { + count++ + } + return count +} + +func TestMigrate_EmptyDB(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) +} + +func TestMigrate_ClassHash_DeployOnly(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + classHash := felt.FromUint64[felt.Felt](170) + seedContract(t, memDB, addr, felt.Zero, classHash) + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + got, err := reader.ContractClassHashAt(&addr, 100) + require.NoError(t, err) + assert.Equal(t, classHash, got) + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractClassHashHistory), + "deprecated must be empty", + ) +} + +func TestMigrate_ClassHash_Reclassed(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + deployClass := felt.FromUint64[felt.Felt](170) + replacedClass := felt.FromUint64[felt.Felt](187) + deployHeight := uint64(100) + replaceBlock := uint64(300) + + seedContract(t, memDB, addr, felt.Zero, replacedClass) + seedDeprecatedClassHashHistory(t, memDB, addr, replaceBlock, deployClass) + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractClassHashAt(&addr, deployHeight) + require.NoError(t, err) + assert.Equal(t, deployClass, got, "deploy entry preserved") + + got, err = reader.ContractClassHashAt(&addr, replaceBlock) + require.NoError(t, err) + assert.Equal(t, replacedClass, got, "replace block has post-update value (head)") + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractClassHashHistory), + "deprecated must be empty", + ) +} + +func TestMigrate_Nonce_Updated(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + classHash := felt.FromUint64[felt.Felt](170) + headNonce := felt.FromUint64[felt.Felt](66) + + seedContract(t, memDB, addr, headNonce, classHash) + seedDeprecatedNonceHistory(t, memDB, addr, 200, felt.Zero) + seedDeprecatedNonceHistory(t, memDB, addr, 300, felt.FromUint64[felt.Felt](1)) + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractNonceAt(&addr, 200) + require.NoError(t, err) + assert.Equal( + t, + felt.FromUint64[felt.Felt](1), + got, + "value installed at 200 = next entry's old value", + ) + + got, err = reader.ContractNonceAt(&addr, 300) + require.NoError(t, err) + assert.Equal(t, headNonce, got, "value installed at 300 = head") + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractNonceHistory), + "deprecated must be empty", + ) +} + +func TestMigrate_Nonce_DeployOnly(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](170)) + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.ContractNonceHistory), + "no nonce entry expected when never updated", + ) +} + +func TestMigrate_Storage_MultiWrite(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + slot := felt.FromUint64[felt.Felt](170) + firstVal := felt.FromUint64[felt.Felt](5) + secondVal := felt.FromUint64[felt.Felt](12) + headVal := felt.FromUint64[felt.Felt](7) + + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](187)) + + seedDeprecatedStorageHistory(t, memDB, addr, slot, 100, felt.Zero) + seedDeprecatedStorageHistory(t, memDB, addr, slot, 200, firstVal) + seedDeprecatedStorageHistory(t, memDB, addr, slot, 300, secondVal) + + seedDeprecatedStorageTrie(t, memDB, addr, map[felt.Felt]felt.Felt{slot: headVal}) + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractStorageAt(&addr, &slot, 100) + require.NoError(t, err) + assert.Equal(t, firstVal, got, "value installed at 100") + + got, err = reader.ContractStorageAt(&addr, &slot, 200) + require.NoError(t, err) + assert.Equal(t, secondVal, got, "value installed at 200") + + got, err = reader.ContractStorageAt(&addr, &slot, 300) + require.NoError(t, err) + assert.Equal(t, headVal, got, "value installed at 300 = head from trie") + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory), + "deprecated must be empty", + ) +} + +func TestMigrate_Storage_SingleWrite(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + slot := felt.FromUint64[felt.Felt](170) + v := felt.FromUint64[felt.Felt](9) + + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](187)) + seedDeprecatedStorageHistory(t, memDB, addr, slot, 100, felt.Zero) + seedDeprecatedStorageTrie(t, memDB, addr, map[felt.Felt]felt.Felt{slot: v}) + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractStorageAt(&addr, &slot, 100) + require.NoError(t, err) + assert.Equal(t, v, got) + + assert.Equal(t, 0, bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory)) +} + +func TestMigrate_Idempotent(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + deployClass := felt.FromUint64[felt.Felt](170) + replacedClass := felt.FromUint64[felt.Felt](187) + + seedContract(t, memDB, addr, felt.FromUint64[felt.Felt](66), replacedClass) + seedDeprecatedClassHashHistory(t, memDB, addr, 300, deployClass) + seedDeprecatedNonceHistory(t, memDB, addr, 200, felt.Zero) + + for range 3 { + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + } + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractClassHashAt(&addr, 100) + require.NoError(t, err) + assert.Equal(t, deployClass, got) + + got, err = reader.ContractClassHashAt(&addr, 300) + require.NoError(t, err) + assert.Equal(t, replacedClass, got) + + got, err = reader.ContractNonceAt(&addr, 200) + require.NoError(t, err) + assert.Equal(t, felt.FromUint64[felt.Felt](66), got) +} + +func TestMigrate_ClassHash_ResumeFromPartial(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + deployClass := felt.FromUint64[felt.Felt](170) + replacedClass := felt.FromUint64[felt.Felt](187) + deployHeight := uint64(100) + replaceBlock := uint64(300) + + seedContract(t, memDB, addr, felt.Zero, replacedClass) + seedDeprecatedClassHashHistory(t, memDB, addr, replaceBlock, deployClass) + + require.NoError(t, state.WriteClassHashHistory(memDB, &addr, deployHeight, &deployClass)) + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractClassHashAt(&addr, deployHeight) + require.NoError(t, err) + assert.Equal(t, deployClass, got, "deploy entry must survive partial-resume re-run") + + got, err = reader.ContractClassHashAt(&addr, replaceBlock) + require.NoError(t, err) + assert.Equal(t, replacedClass, got, "shifted entry at replace block") + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractClassHashHistory), + "deprecated must be empty after migration completes", + ) +} + +func TestMigrate_Storage_ZeroedSlotHasNoLeaf(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + zeroedSlot := felt.FromUint64[felt.Felt](170) + keptSlot := felt.FromUint64[felt.Felt](187) + keptHead := felt.FromUint64[felt.Felt](9) + + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](204)) + + seedDeprecatedStorageHistory(t, memDB, addr, zeroedSlot, 100, felt.Zero) + seedDeprecatedStorageHistory(t, memDB, addr, zeroedSlot, 200, felt.FromUint64[felt.Felt](5)) + seedDeprecatedStorageHistory(t, memDB, addr, keptSlot, 100, felt.Zero) + + seedDeprecatedStorageTrie(t, memDB, addr, map[felt.Felt]felt.Felt{keptSlot: keptHead}) + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractStorageAt(&addr, &keptSlot, 100) + require.NoError(t, err) + assert.Equal(t, keptHead, got, "kept slot last entry = head") + + got, err = reader.ContractStorageAt(&addr, &zeroedSlot, 200) + require.NoError(t, err) + assert.Equal(t, felt.Zero, got, "zeroed slot last entry = Zero (no leaf in trie)") + + got, err = reader.ContractStorageAt(&addr, &zeroedSlot, 100) + require.NoError(t, err) + assert.Equal(t, felt.FromUint64[felt.Felt](5), got, "shift-up at block 100 = next-entry's value") + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory), + "deprecated fully drained", + ) +} + +func TestMigrate_Storage_ManyEntries(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + headValues := map[felt.Felt]felt.Felt{} + + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](187)) + + const ( + numSlots = 50 + numEntriesPerSlot = 20 + startBlock = uint64(100) + ) + + for s := uint64(1); s <= numSlots; s++ { + slot := felt.NewFromUint64[felt.Felt](s) + headVal := felt.NewFromUint64[felt.Felt](1000000 + s) + headValues[*slot] = *headVal + + for b := range uint64(numEntriesPerSlot) { + block := startBlock + b + oldVal := felt.Zero + if b > 0 { + oldVal = *felt.NewFromUint64[felt.Felt](s*10000 + b) + } + seedDeprecatedStorageHistory(t, memDB, addr, *slot, block, oldVal) + } + } + seedDeprecatedStorageTrie(t, memDB, addr, headValues) + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory), + "deprecated storage history must be fully drained", + ) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + for slot, head := range headValues { + lastBlock := startBlock + numEntriesPerSlot - 1 + got, err := reader.ContractStorageAt(&addr, &slot, lastBlock) + require.NoErrorf(t, err, "read storage failed for slot %v", &slot) + assert.Equalf(t, head, got, "last entry must equal head for slot %v", &slot) + } +} + +func TestMigrate_Storage_MultiAddress(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addrs := []felt.Felt{ + felt.FromUint64[felt.Felt](1), + felt.FromUint64[felt.Felt](2), + felt.FromUint64[felt.Felt](3), + } + slots := []felt.Felt{ + felt.FromUint64[felt.Felt](100), + felt.FromUint64[felt.Felt](200), + } + + for i := range addrs { + seedContract(t, memDB, addrs[i], felt.Zero, felt.FromUint64[felt.Felt](uint64(170+i))) + for _, slot := range slots { + seedDeprecatedStorageHistory(t, memDB, addrs[i], slot, 100, felt.Zero) + seedDeprecatedStorageHistory( + t, memDB, addrs[i], slot, 200, + felt.FromUint64[felt.Felt](uint64(10+i)), + ) + } + seedDeprecatedStorageTrie(t, memDB, addrs[i], map[felt.Felt]felt.Felt{ + slots[0]: felt.FromUint64[felt.Felt](uint64(1000 + i*10)), + slots[1]: felt.FromUint64[felt.Felt](uint64(1000 + i*10 + 1)), + }) + } + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + for i := range addrs { + for j, slot := range slots { + got, err := reader.ContractStorageAt(&addrs[i], &slot, 100) + require.NoErrorf(t, err, "addr %d slot %d block 100", i, j) + assert.Equalf( + t, felt.FromUint64[felt.Felt](uint64(10+i)), got, + "addr %d slot %d block 100 = next-entry's value", i, j, + ) + got, err = reader.ContractStorageAt(&addrs[i], &slot, 200) + require.NoErrorf(t, err, "addr %d slot %d block 200", i, j) + assert.Equalf( + t, felt.FromUint64[felt.Felt](uint64(1000+i*10+j)), got, + "addr %d slot %d block 200 = head from trie", i, j, + ) + } + } + + assert.Zero( + t, + bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory), + "deprecated must be empty across all addresses", + ) +} + +func TestMigrate_CancelledContext_ResumesCleanly(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](170)) + seedDeprecatedClassHashHistory(t, memDB, addr, 200, felt.FromUint64[felt.Felt](42)) + seedDeprecatedNonceHistory(t, memDB, addr, 200, felt.Zero) + slot := felt.FromUint64[felt.Felt](5) + seedDeprecatedStorageHistory(t, memDB, addr, slot, 200, felt.Zero) + seedDeprecatedStorageTrie(t, memDB, addr, map[felt.Felt]felt.Felt{ + slot: felt.FromUint64[felt.Felt](9), + }) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + res, err := statehistory.Migrator{}.Migrate(ctx, memDB, &networks.Sepolia, log.NewNopZapLogger()) + require.Error(t, err) + require.ErrorIs(t, err, context.Canceled) + require.NotNil(t, res, "shouldRerun sentinel must not be nil") + require.Empty(t, res, "shouldRerun is a non-nil empty slice") + + res, err = statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + assert.Zero(t, bucketKeyCount(t, memDB, db.DeprecatedContractClassHashHistory)) + assert.Zero(t, bucketKeyCount(t, memDB, db.DeprecatedContractNonceHistory)) + assert.Zero(t, bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory)) +} + +func TestMigrate_Storage_ResumeFromPartial(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + slot := felt.FromUint64[felt.Felt](170) + firstVal := felt.FromUint64[felt.Felt](5) + secondVal := felt.FromUint64[felt.Felt](12) + headVal := felt.FromUint64[felt.Felt](7) + + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](187)) + + seedDeprecatedStorageHistory(t, memDB, addr, slot, 100, felt.Zero) + seedDeprecatedStorageHistory(t, memDB, addr, slot, 200, firstVal) + seedDeprecatedStorageHistory(t, memDB, addr, slot, 300, secondVal) + seedDeprecatedStorageTrie(t, memDB, addr, map[felt.Felt]felt.Felt{slot: headVal}) + + require.NoError(t, state.WriteStorageHistory(memDB, &addr, &slot, 100, &firstVal)) + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractStorageAt(&addr, &slot, 100) + require.NoError(t, err) + assert.Equal(t, firstVal, got, "preserved deploy entry after partial-resume") + + got, err = reader.ContractStorageAt(&addr, &slot, 200) + require.NoError(t, err) + assert.Equal(t, secondVal, got) + + got, err = reader.ContractStorageAt(&addr, &slot, 300) + require.NoError(t, err) + assert.Equal(t, headVal, got) + + assert.Zero(t, bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory)) +} + +func TestMigrate_AddressWithEmptyHistoryForOnePhase(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + classHash := felt.FromUint64[felt.Felt](170) + deployClass := felt.FromUint64[felt.Felt](42) + + seedContract(t, memDB, addr, felt.Zero, classHash) + seedDeprecatedClassHashHistory(t, memDB, addr, 300, deployClass) + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractClassHashAt(&addr, 100) + require.NoError(t, err) + assert.Equal(t, deployClass, got) + + got, err = reader.ContractClassHashAt(&addr, 300) + require.NoError(t, err) + assert.Equal(t, classHash, got) + + assert.Zero( + t, bucketKeyCount(t, memDB, db.ContractNonceHistory), + "nonce history empty when phase no-ops", + ) + assert.Zero( + t, bucketKeyCount(t, memDB, db.ContractStorageHistory), + "storage history empty when phase no-ops", + ) + assert.Zero(t, bucketKeyCount(t, memDB, db.DeprecatedContractClassHashHistory)) +} + +func TestMigrate_Storage_InterleavedZeroedSlots(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + slot1 := felt.FromUint64[felt.Felt](100) // zeroed + slot2 := felt.FromUint64[felt.Felt](200) // kept + slot3 := felt.FromUint64[felt.Felt](300) // zeroed + slot4 := felt.FromUint64[felt.Felt](400) // kept + head2 := felt.FromUint64[felt.Felt](22) + head4 := felt.FromUint64[felt.Felt](44) + + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](204)) + + for _, slot := range []felt.Felt{slot1, slot2, slot3, slot4} { + seedDeprecatedStorageHistory(t, memDB, addr, slot, 100, felt.Zero) + } + seedDeprecatedStorageTrie(t, memDB, addr, map[felt.Felt]felt.Felt{ + slot2: head2, + slot4: head4, + }) + + res, err := statehistory.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + for _, slot := range []felt.Felt{slot1, slot3} { + got, err := reader.ContractStorageAt(&addr, &slot, 100) + require.NoErrorf(t, err, "zeroed slot %v", &slot) + assert.Equalf(t, felt.Zero, got, "zeroed slot %v has no trie leaf", &slot) + } + got, err := reader.ContractStorageAt(&addr, &slot2, 100) + require.NoError(t, err) + assert.Equal(t, head2, got, "slot2 kept its head value") + got, err = reader.ContractStorageAt(&addr, &slot4, 100) + require.NoError(t, err) + assert.Equal(t, head4, got, "slot4 kept its head value") + + assert.Zero(t, bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory)) +} diff --git a/migration/statehistory/nonce_ingestor.go b/migration/statehistory/nonce_ingestor.go new file mode 100644 index 0000000000..9e20210a56 --- /dev/null +++ b/migration/statehistory/nonce_ingestor.go @@ -0,0 +1,99 @@ +package statehistory + +import ( + "context" + "fmt" + + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/state" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/db/dbutils" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" +) + +type nonceIngestor struct { + baseIngestor +} + +var _ pipeline.State[*felt.Felt, task] = (*nonceIngestor)(nil) + +func newNonceIngestor( + ctx context.Context, + sem semaphore.ResourceSemaphore[db.Batch], + database db.KeyValueReader, +) *nonceIngestor { + return &nonceIngestor{baseIngestor: newBaseIngestor(ctx, sem, database)} +} + +// Run migrates the nonce history of a single contract. +// +// Legend: Bₙ = block at which the n-th nonce change happened. Nₙ = the +// nonce active *after* Bₙ; the deploy nonce is always 0 and is *not* +// written to the deprecated history — its presence is implicit in the +// pre-value of the first change entry. The new layout stores the same +// number of entries, just shifted to post-values: +// +// block │ deprecated │ new +// ───────┼────────────────┼────── +// B₁ │ 0 │ N₁ +// B₂ │ N₁ │ N₂ +// B₃ │ N₂ │ N₃ +// ───────┼────────────────┼────── +// > B₃ │ contract │ N₃ (last entry — self-contained) +// .Nonce ← deprecated must reach into the Contract +// record for any block past the last change +// +// Contracts with no deprecated nonce history are skipped. Deprecated rows +// are deleted at the end of the run. +func (i *nonceIngestor) Run(index int, addr *felt.Felt, outputs chan<- task) error { + t := &i.tasks[index] + deprecatedPrefix := db.DeprecatedContractNonceHistoryKey(addr) + + depIt, err := i.database.NewIterator(deprecatedPrefix, true) + if err != nil { + return fmt.Errorf("nonce: open deprecated iter(%s): %w", addr, err) + } + defer depIt.Close() + if !depIt.First() { + return nil + } + + contract, err := state.GetContract(i.database, addr) + if err != nil { + return fmt.Errorf("nonce: GetContract(%s): %w", addr, err) + } + + for { + block, err := parseBlockKey(depIt.Key(), deprecatedPrefix) + if err != nil { + return fmt.Errorf("nonce(%s): %w", addr, err) + } + hasNext := depIt.Next() + historyValue := contract.Nonce + if hasNext { + rawValue, err := depIt.Value() + if err != nil { + return fmt.Errorf("nonce(%s): %w", addr, err) + } + historyValue = felt.FromBytes[felt.Felt](rawValue) + } + err = state.WriteNonceHistory(t.batch, addr, block, &historyValue) + if err != nil { + return err + } + t.entryCount++ + if err := i.flush(t, outputs); err != nil { + return err + } + if !hasNext { + break + } + } + + if err := t.batch.DeleteRange(deprecatedPrefix, dbutils.UpperBound(deprecatedPrefix)); err != nil { + return fmt.Errorf("nonce: DeleteRange deprecated(%s): %w", addr, err) + } + t.completedAddrs++ + return nil +} diff --git a/migration/statehistory/parse.go b/migration/statehistory/parse.go new file mode 100644 index 0000000000..196c9b6150 --- /dev/null +++ b/migration/statehistory/parse.go @@ -0,0 +1,28 @@ +package statehistory + +import ( + "encoding/binary" + "fmt" + + "github.com/NethermindEth/juno/core/felt" +) + +func parseBlockKey(key, prefix []byte) (uint64, error) { + if len(key) != len(prefix)+8 { + return 0, fmt.Errorf("malformed block-keyed entry: key len %d, want %d", len(key), len(prefix)+8) + } + return binary.BigEndian.Uint64(key[len(prefix):]), nil +} + +func parseStorageKey(key, prefix []byte) (felt.Felt, uint64, error) { + if len(key) != len(prefix)+felt.Bytes+8 { + return felt.Felt{}, 0, fmt.Errorf( + "malformed storage-history entry: key len %d, want %d", + len(key), + len(prefix)+felt.Bytes+8, + ) + } + slot := felt.FromBytes[felt.Felt](key[len(prefix) : len(prefix)+felt.Bytes]) + block := binary.BigEndian.Uint64(key[len(prefix)+felt.Bytes:]) + return slot, block, nil +} diff --git a/migration/statehistory/storage_ingestor.go b/migration/statehistory/storage_ingestor.go new file mode 100644 index 0000000000..e8a29cf91f --- /dev/null +++ b/migration/statehistory/storage_ingestor.go @@ -0,0 +1,219 @@ +package statehistory + +import ( + "bytes" + "context" + "fmt" + + "github.com/NethermindEth/juno/core/deprecatedstate" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/state" + "github.com/NethermindEth/juno/core/trie" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/db/dbutils" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" +) + +type storageIngestor struct { + baseIngestor +} + +var _ pipeline.State[*felt.Felt, task] = (*storageIngestor)(nil) + +func newStorageIngestor( + ctx context.Context, + sem semaphore.ResourceSemaphore[db.Batch], + database db.KeyValueReader, +) *storageIngestor { + return &storageIngestor{baseIngestor: newBaseIngestor(ctx, sem, database)} +} + +// Run migrates the per-slot storage history of a single contract. +// +// Legend: Bₙ = block at which the n-th change to a slot happened. preXₙ +// is the value of slot X before Bₙ (= what the deprecated layout stores +// at [X, Bₙ]); headX is the slot's current value, read from the head +// storage trie. The deprecated layout writes nothing at deploy — the +// pre-deploy value (0) is implicit in the first change entry. The new +// layout stores the same number of entries per slot, just shifted to +// post-values. For one slot: +// +// block │ deprecated[slotA] │ new[slotA] +// ───────┼───────────────────┼─────────── +// B₁ │ 0 │ preA₁ +// B₂ │ preA₁ │ preA₂ +// B₃ │ preA₂ │ headA +// ───────┼───────────────────┼─────────── +// > B₃ │ head trie leaf │ headA (last entry — self-contained) +// for slotA ← deprecated must reach into the head +// storage trie for any block past the +// last change +// +// For each deprecated entry the post-value comes from one of: +// +// - the *next* deprecated entry, when it's on the same slot — its stored +// pre-value is exactly this block's post-value; +// - the head storage trie leaf for that slot, when there is no next +// deprecated entry on the same slot; +// - felt.Zero, when there is no head leaf for the slot (the slot was +// eventually zeroed out and dropped from the trie). +// +// Both the deprecated history and the head trie are sorted by raw slot +// bytes, so the ingestor walks them in lockstep — the head-trie iterator +// advances only when its current leaf matches the slot just resolved: +// +// deprecated history head trie new history +// ───────────────────── ───────────── ───────────────────────── +// [slotA, B₁..B₃] ──→ [slotA] = headA [slotA, B₁..B₃] last uses headA +// [slotB, B₁..B₂] ──→ (no leaf) [slotB, B₁..B₂] last uses 0 +// ← slotB was set (slotB was zeroed +// and later zeroed at B₂) +// [slotC, B₁] ──→ [slotC] = headC [slotC, B₁] = headC +// +// Contracts with no deprecated storage history are skipped; deprecated +// rows are deleted at the end of the run via DeleteRange. +func (i *storageIngestor) Run(index int, addr *felt.Felt, outputs chan<- task) error { + t := &i.tasks[index] + + addrBytes := addr.Marshal() + deprecatedPrefix := db.DeprecatedContractStorageHistory.Key(addrBytes) + + deprecatedHistoryIt, err := i.database.NewIterator(deprecatedPrefix, true) + if err != nil { + return fmt.Errorf("storage: open deprecated iter(%s): %w", addr, err) + } + defer deprecatedHistoryIt.Close() + if !deprecatedHistoryIt.First() { + return nil + } + + leafPrefix := db.ContractStorage.Key(addrBytes) + leafPrefix = append(leafPrefix, deprecatedstate.ContractStorageTrieHeight) + + headStorageTrieIt, err := i.database.NewIterator(leafPrefix, true) + if err != nil { + return fmt.Errorf("storage: open leaf iter(%s): %w", addr, err) + } + defer headStorageTrieIt.Close() + leafValid := headStorageTrieIt.First() + + for { + slot, block, err := parseStorageKey(deprecatedHistoryIt.Key(), deprecatedPrefix) + if err != nil { + return fmt.Errorf("storage: parse key(%s): %w", addr, err) + } + + successorSlot, successorValue, hasNext, err := peekSuccessor( + deprecatedHistoryIt, + deprecatedPrefix, + addr, + ) + if err != nil { + return err + } + + historyValue, advanced, err := resolveHistoryValue( + headStorageTrieIt, + leafPrefix, + addr, + &slot, + hasNext, + successorSlot, + successorValue, + leafValid, + ) + if err != nil { + return err + } + if advanced { + leafValid = headStorageTrieIt.Next() + } + + err = state.WriteStorageHistory( + t.batch, + addr, + &slot, + block, + &historyValue, + ) + if err != nil { + return err + } + t.entryCount++ + if err := i.flush(t, outputs); err != nil { + return err + } + + if !hasNext { + break + } + } + + if err := t.batch.DeleteRange(deprecatedPrefix, dbutils.UpperBound(deprecatedPrefix)); err != nil { + return fmt.Errorf("storage: DeleteRange deprecated(%s): %w", addr, err) + } + t.completedAddrs++ + return nil +} + +// peekSuccessor advances the deprecated-history iterator. If a next entry +// exists, returns its (slot, value, true); otherwise returns (_, _, false). +func peekSuccessor( + it db.Iterator, + prefix []byte, + addr *felt.Felt, +) (felt.Felt, felt.Felt, bool, error) { + if !it.Next() { + return felt.Felt{}, felt.Felt{}, false, nil + } + slot, _, err := parseStorageKey(it.Key(), prefix) + if err != nil { + return felt.Felt{}, felt.Felt{}, false, fmt.Errorf( + "storage: parse successor key(%s): %w", addr, err, + ) + } + rawValue, err := it.Value() + if err != nil { + return felt.Felt{}, felt.Felt{}, false, fmt.Errorf( + "storage: read successor value(%s, slot=%s): %w", addr, &slot, err, + ) + } + return slot, felt.FromBytes[felt.Felt](rawValue), true, nil +} + +// resolveHistoryValue decides the value to install at the current entry: the +// successor's value when both are on the same slot, otherwise the head-trie +// leaf (when the iterator is positioned on this slot), otherwise zero. Returns +// advanced=true when the head-trie iterator should be advanced by the caller. +func resolveHistoryValue( + headIt db.Iterator, + leafPrefix []byte, + addr, slot *felt.Felt, + hasSuccessor bool, + successorSlot, successorValue felt.Felt, + leafValid bool, +) (value felt.Felt, advanced bool, err error) { + if hasSuccessor && successorSlot == *slot { + return successorValue, false, nil + } + if !leafValid { + return felt.Felt{}, false, nil + } + if !bytes.Equal(headIt.Key()[len(leafPrefix):], slot.Marshal()) { + return felt.Felt{}, false, nil + } + raw, err := headIt.Value() + if err != nil { + return felt.Felt{}, false, fmt.Errorf( + "storage: leaf(%s, slot=%s): %w", addr, slot, err, + ) + } + var node trie.Node + if err := node.UnmarshalBinary(raw); err != nil { + return felt.Felt{}, false, fmt.Errorf( + "storage: decode leaf(%s, slot=%s): %w", addr, slot, err, + ) + } + return *node.Value, true, nil +} diff --git a/migration/trie/codec.go b/migration/trie/codec.go new file mode 100644 index 0000000000..33b12253c3 --- /dev/null +++ b/migration/trie/codec.go @@ -0,0 +1,123 @@ +package trie + +import ( + "github.com/NethermindEth/juno/core/crypto" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/trie" + "github.com/NethermindEth/juno/core/trie2/trieutils" + "github.com/NethermindEth/juno/db" +) + +const ( + binaryNodeTag byte = 0x01 + edgeNodeTag byte = 0x02 + + valueNodeBlobSize = felt.Bytes + binaryNodeBlobSize = 1 + 2*felt.Bytes + edgeNodeMinSize = 1 + felt.Bytes + 1 + edgeNodeMaxSize = 1 + felt.Bytes + trieutils.MaxBitArraySize + maxNodeKeySize = 1 + felt.Bytes + 1 + trieutils.MaxBitArraySize + + nonLeafByte byte = 1 + leafByte byte = 2 +) + +// +// --- Encoding related helpers --- +// + +// encodeBinaryNode writes a binary-node blob into dst. +// dst must have at least binaryNodeBlobSize bytes of capacity. +func encodeBinaryNode(dst []byte, leftEdgeHash, rightEdgeHash *felt.Felt) int { + dst[0] = binaryNodeTag + lb := leftEdgeHash.Bytes() + rb := rightEdgeHash.Bytes() + copy(dst[1:], lb[:]) + copy(dst[1+felt.Bytes:], rb[:]) + return binaryNodeBlobSize +} + +// encodeEdgeNode writes an edge-node blob into dst. +// dst must have at least edgeNodeMaxSize bytes of capacity. +func encodeEdgeNode(dst []byte, childHash *felt.Felt, pathSeg *trieutils.Path) int { + encoded := pathSeg.EncodedBytes() + dst[0] = edgeNodeTag + h := childHash.Bytes() + copy(dst[1:], h[:]) + copy(dst[1+felt.Bytes:], encoded) + return 1 + felt.Bytes + len(encoded) +} + +func encodeNodeKey( + dst []byte, + bucket db.Bucket, + owner *felt.Address, + path *trieutils.Path, + isLeaf bool, +) int { + n := 0 + dst[n] = byte(bucket) + n++ + + if !felt.IsZero(owner) { + ownerBytes := owner.Bytes() + copy(dst[n:], ownerBytes[:]) + n += 32 + } + + if isLeaf { + dst[n] = leafByte + } else { + dst[n] = nonLeafByte + } + n++ + + pathBytes := path.EncodedBytes() + copy(dst[n:], pathBytes) + n += len(pathBytes) + + return n +} + +// +// --- Path related helpers --- +// + +func parseDeprecatedPath(val []byte) (trie.BitArray, error) { + if len(val) == 0 { + return trie.BitArray{}, nil + } + var ba trie.BitArray + if err := ba.UnmarshalBinary(val); err != nil { + return trie.BitArray{}, err + } + return ba, nil +} + +func toNewPath(old *trie.BitArray) trieutils.Path { + b := old.Bytes() + var p trieutils.Path + p.SetBytes(old.Len(), b[:]) + return p +} + +func compressedSegment(childFullPath *trie.BitArray, parentLen uint8) trieutils.Path { + var seg trie.BitArray + seg.LSBs(childFullPath, parentLen+1) + return toNewPath(&seg) +} + +// +// --- Hash related helpers --- +// + +func computeEdgeHash(childHash *felt.Felt, path *trieutils.Path, hashFn crypto.HashFn) felt.Felt { + if path.Len() == 0 { + return *childHash + } + pathFelt := path.Felt() + h := hashFn(childHash, &pathFelt) + lenFelt := felt.FromUint64[felt.Felt](uint64(path.Len())) + h.Add(&h, &lenFelt) + return h +} diff --git a/migration/trie/committer.go b/migration/trie/committer.go new file mode 100644 index 0000000000..4086e3ff7e --- /dev/null +++ b/migration/trie/committer.go @@ -0,0 +1,39 @@ +package trie + +import ( + "fmt" + + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/migration/semaphore" + "github.com/NethermindEth/juno/utils/log" +) + +type committer struct { + batchSem semaphore.ResourceSemaphore[db.Batch] + counter counter +} + +func newCommitter( + logger log.StructuredLogger, + batchSem semaphore.ResourceSemaphore[db.Batch], + allTries, allNodes uint64, +) *committer { + return &committer{ + batchSem: batchSem, + counter: newCounter(logger, timeLogRate, allTries, allNodes), + } +} + +func (c *committer) Run(_ int, t task, _ chan<- struct{}) error { + byteSize := uint64(t.batch.Size()) + if err := t.batch.Write(); err != nil { + return fmt.Errorf("trie migration: batch write failed: %w", err) + } + c.counter.log(byteSize, t.tries, t.nodes) + c.batchSem.Put() + return nil +} + +func (c *committer) Done(int, chan<- struct{}) error { + return nil +} diff --git a/migration/trie/counter.go b/migration/trie/counter.go new file mode 100644 index 0000000000..88cdd47d19 --- /dev/null +++ b/migration/trie/counter.go @@ -0,0 +1,75 @@ +package trie + +import ( + "fmt" + "time" + + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/utils/log" + "go.uber.org/zap" +) + +type counter struct { + logger log.StructuredLogger + timeLogRate time.Duration + migrationStart time.Time + allTries uint64 + allNodes uint64 + totalTries uint64 + totalNodes uint64 + start time.Time + size uint64 + tries uint64 + nodes uint64 +} + +func newCounter( + logger log.StructuredLogger, + timeLogRate time.Duration, + allTries, allNodes uint64, +) counter { + now := time.Now() + return counter{ + logger: logger, + timeLogRate: timeLogRate, + migrationStart: now, + start: now, + allTries: allTries, + allNodes: allNodes, + } +} + +func (c *counter) log(byteSize uint64, tries, nodes int) { + c.size += byteSize + c.tries += uint64(tries) + c.nodes += uint64(nodes) + c.totalTries += uint64(tries) + c.totalNodes += uint64(nodes) + + now := time.Now() + elapsed := now.Sub(c.start).Seconds() + if elapsed > c.timeLogRate.Seconds() { + mbs := float64(c.size) / float64(db.Megabyte) + c.logger.Info( + "write speed", + zap.Float64("MB", mbs), + zap.Float64("MB/s", mbs/elapsed), + zap.Float64("nodes/s", float64(c.nodes)/elapsed), + zap.Float64("tries/s", float64(c.tries)/elapsed), + zap.String("tries_processed", fmtPercent(c.totalTries, c.allTries)), + zap.String("nodes_processed", fmtPercent(c.totalNodes, c.allNodes)), + zap.Float64("totalTime", now.Sub(c.migrationStart).Seconds()), + ) + c.start = now + c.size = 0 + c.tries = 0 + c.nodes = 0 + } +} + +func fmtPercent(done, total uint64) string { + if total == 0 { + return "100.0%" + } + return fmt.Sprintf("%.1f%%", 100.0*float64(done)/float64(total)) +} diff --git a/migration/trie/hashpool.go b/migration/trie/hashpool.go new file mode 100644 index 0000000000..ca08b338c1 --- /dev/null +++ b/migration/trie/hashpool.go @@ -0,0 +1,61 @@ +package trie + +import ( + "sync" + + "github.com/NethermindEth/juno/core/crypto" + "github.com/NethermindEth/juno/core/felt" +) + +type hashWork struct { + hashFn crypto.HashFn + jobs []edgeHashJob + results []felt.Felt + wg *sync.WaitGroup +} + +type hashWorkerPool struct { + work chan hashWork + n int +} + +func newHashWorkerPool() *hashWorkerPool { + p := &hashWorkerPool{ + work: make(chan hashWork, IngestorCount*2), + n: IngestorCount, + } + for range IngestorCount { + go func() { + for w := range p.work { + for i := range w.jobs { + w.results[2*i] = computeEdgeHash(&w.jobs[i].leftChildHash, &w.jobs[i].leftSeg, w.hashFn) + w.results[2*i+1] = computeEdgeHash(&w.jobs[i].rightChildHash, &w.jobs[i].rightSeg, w.hashFn) + } + w.wg.Done() + } + }() + } + return p +} + +func (p *hashWorkerPool) submit( + hashFn crypto.HashFn, + jobs []edgeHashJob, + results []felt.Felt, +) <-chan struct{} { + done := make(chan struct{}) + go func() { + var wg sync.WaitGroup + chunkSize := max(1, (len(jobs)+p.n-1)/p.n) + for i := 0; i < len(jobs); i += chunkSize { + end := min(i+chunkSize, len(jobs)) + wg.Add(1) + p.work <- hashWork{hashFn, jobs[i:end], results[2*i : 2*end], &wg} + } + wg.Wait() + close(done) + }() + return done +} + +func (p *hashWorkerPool) close() { close(p.work) } diff --git a/migration/trie/hashworker.go b/migration/trie/hashworker.go new file mode 100644 index 0000000000..0fafa08920 --- /dev/null +++ b/migration/trie/hashworker.go @@ -0,0 +1,166 @@ +package trie + +import ( + "github.com/NethermindEth/juno/core/crypto" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/trie2/trieutils" + "github.com/NethermindEth/juno/db" +) + +type edgeHashJob struct { + leftChildHash, rightChildHash felt.Felt + leftSeg, rightSeg trieutils.Path + parentPath trieutils.Path +} + +type inFlightBatch struct { + jobs []edgeHashJob + results []felt.Felt + done <-chan struct{} +} + +type hashScheduler struct { + hashFn crypto.HashFn + parallel bool + bucket db.Bucket + owner felt.Address + pool *hashWorkerPool + + jobs []edgeHashJob + altJobs []edgeHashJob + results []felt.Felt + inFlightBuf inFlightBatch + hasInFlight bool +} + +func newHashScheduler( + hashFn crypto.HashFn, + parallel bool, + bucket db.Bucket, + owner felt.Address, + pool *hashWorkerPool, +) *hashScheduler { + s := &hashScheduler{ + hashFn: hashFn, + parallel: parallel, + bucket: bucket, + owner: owner, + pool: pool, + } + if parallel { + s.jobs = make([]edgeHashJob, 0, parallelHashBatchSize) + s.altJobs = make([]edgeHashJob, 0, parallelHashBatchSize) + s.results = make([]felt.Felt, 2*parallelHashBatchSize) + } + return s +} + +func (s *hashScheduler) schedule(job *edgeHashJob, batch db.Batch) error { + if !s.parallel { + leftEdge := computeEdgeHash(&job.leftChildHash, &job.leftSeg, s.hashFn) + rightEdge := computeEdgeHash(&job.rightChildHash, &job.rightSeg, s.hashFn) + return s.writeBinaryAndEdges(job, &leftEdge, &rightEdge, batch) + } + s.jobs = append(s.jobs, *job) + if len(s.jobs) >= parallelHashBatchSize { + return s.fire(batch) + } + return nil +} + +func (s *hashScheduler) fire(batch db.Batch) error { + if err := s.drainInFlight(batch); err != nil { + return err + } + results := s.results[:2*len(s.jobs)] + s.inFlightBuf = inFlightBatch{ + jobs: s.jobs, + results: results, + done: s.pool.submit(s.hashFn, s.jobs, results), + } + s.hasInFlight = true + s.jobs, s.altJobs = s.altJobs[:0], s.jobs + return nil +} + +func (s *hashScheduler) drainInFlight(batch db.Batch) error { + if !s.hasInFlight { + return nil + } + <-s.inFlightBuf.done + for i := range s.inFlightBuf.jobs { + err := s.writeBinaryAndEdges( + &s.inFlightBuf.jobs[i], + &s.inFlightBuf.results[2*i], + &s.inFlightBuf.results[2*i+1], + batch, + ) + if err != nil { + return err + } + } + s.hasInFlight = false + return nil +} + +func (s *hashScheduler) sync(batch db.Batch) error { + if !s.parallel { + return nil + } + if err := s.drainInFlight(batch); err != nil { + return err + } + if len(s.jobs) > 0 { + results := s.results[:2*len(s.jobs)] + <-s.pool.submit(s.hashFn, s.jobs, results) + for i := range s.jobs { + if err := s.writeBinaryAndEdges( + &s.jobs[i], + &results[2*i], + &results[2*i+1], + batch, + ); err != nil { + return err + } + } + s.jobs = s.jobs[:0] + } + return nil +} + +func (s *hashScheduler) writeBinaryAndEdges( + job *edgeHashJob, + leftEdge, + rightEdge *felt.Felt, + batch db.Batch, +) error { + var buf [maxNodeKeySize + binaryNodeBlobSize]byte + keyLen := encodeNodeKey(buf[:], s.bucket, &s.owner, &job.parentPath, false) + blobLen := encodeBinaryNode(buf[keyLen:], leftEdge, rightEdge) + if err := batch.Put(buf[:keyLen], buf[keyLen:keyLen+blobLen]); err != nil { + return err + } + + if err := s.writeEdge(&job.parentPath, 0, &job.leftChildHash, &job.leftSeg, batch); err != nil { + return err + } + return s.writeEdge(&job.parentPath, 1, &job.rightChildHash, &job.rightSeg, batch) +} + +func (s *hashScheduler) writeEdge( + parentPath *trieutils.Path, + bit uint8, + childHash *felt.Felt, + seg *trieutils.Path, + batch db.Batch, +) error { + if seg.Len() == 0 { + return nil + } + var edgePath trieutils.Path + edgePath.AppendBit(parentPath, bit) + var buf [maxNodeKeySize + edgeNodeMaxSize]byte + keyLen := encodeNodeKey(buf[:], s.bucket, &s.owner, &edgePath, false) + blobLen := encodeEdgeNode(buf[keyLen:], childHash, seg) + return batch.Put(buf[:keyLen], buf[keyLen:keyLen+blobLen]) +} diff --git a/migration/trie/ingestor.go b/migration/trie/ingestor.go new file mode 100644 index 0000000000..76d85f56e0 --- /dev/null +++ b/migration/trie/ingestor.go @@ -0,0 +1,364 @@ +package trie + +import ( + "context" + "fmt" + + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/trie" + "github.com/NethermindEth/juno/core/trie2/trieutils" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/migration/semaphore" +) + +type task struct { + batch db.Batch + tries int + nodes int +} + +type ingestor struct { + ctx context.Context + database db.KeyValueReader + batchSemaphore semaphore.ResourceSemaphore[db.Batch] + pool *hashWorkerPool + tasks [IngestorCount]task +} + +func newIngestor( + ctx context.Context, + database db.KeyValueReader, + batchSemaphore semaphore.ResourceSemaphore[db.Batch], + pool *hashWorkerPool, +) *ingestor { + in := &ingestor{ + ctx: ctx, + database: database, + batchSemaphore: batchSemaphore, + pool: pool, + } + for i := range IngestorCount { + in.tasks[i].batch = batchSemaphore.GetBlocking() + } + return in +} + +func (i *ingestor) Run(index int, desc TrieDesc, outputs chan<- task) error { + done, err := rootProcessed(i.database, desc.TrieBucket, &desc.Owner) + if err != nil { + return fmt.Errorf("rootProcessed(%v, %x): %w", desc.TrieBucket, desc.Owner, err) + } + + t := &i.tasks[index] + if done { + // Already migrated — credit the counts so progress display reaches 100% on resume. + t.tries++ + t.nodes += desc.NodeCount + return i.flush(t, outputs) + } + + if err := i.migrateTrie(t, desc, outputs); err != nil { + return err + } + + t.tries++ + return i.flush(t, outputs) +} + +func (i *ingestor) Done(index int, outputs chan<- task) error { + select { + case <-i.ctx.Done(): + return i.ctx.Err() + case outputs <- i.tasks[index]: + } + return nil +} + +func (i *ingestor) flush(t *task, outputs chan<- task) error { + if t.batch.Size() < targetBatchByteSize { + return nil + } + select { + case <-i.ctx.Done(): + return i.ctx.Err() + case outputs <- task{batch: t.batch, tries: t.tries, nodes: t.nodes}: + } + t.tries = 0 + t.nodes = 0 + t.batch = i.batchSemaphore.GetBlocking() + return nil +} + +// migrateTrie reads one deprecated trie and writes its equivalent into the +// new layout. Three things differ between the formats: how nodes are keyed +// on disk, how nodes are encoded, and how path compression is expressed. +// +// On-disk keying +// -------------- +// Both layouts share a common prefix; only the suffix differs: +// +// common (both) suffix +// ───────────── ───────────────────────────────────────── +// bucket [|| owner] → path-length-byte || path-bytes (deprecated) +// → nodeType-byte || path-length-byte || path-bytes +// (new) +// +// The owner is present only for storage tries. The new layout's extra +// nodeType byte splits leaves from internal nodes into two index slices +// within the same bucket — the new-state lookups use this to short-circuit +// between leaf reads and internal-node traversals. +// +// Node encoding +// ------------- +// Both layouts are byte streams. The deprecated format keeps each node +// self-contained — internal binary nodes embed the compressed paths to +// their children inline: +// +// leaf value +// binary value || left-child-path || right-child-path +// [|| left-hash || right-hash, optional cache, ignored here] +// +// "value" is the node's own Starknet trie hash, or the stored value when +// the node is a leaf. +// +// The new format gives each node an explicit type tag and moves path +// compression into separate edge nodes: +// +// value value +// binary 0x01 || left-edge-hash || right-edge-hash +// edge 0x02 || child-hash || encoded-path-segment +// +// Path compression +// ---------------- +// This is the key structural change. The deprecated format compresses +// paths inside the parent binary node (via its embedded child-path +// fields). The new format moves compression into dedicated edge nodes +// sitting between binary nodes and their children: +// +// old: binary ──────── child-path ────────► child +// new: binary ──► edge ──► child +// +// The deprecated root marker — a single entry at the bare bucket prefix +// recording the root's path — disappears in the new layout. Whatever the +// deprecated root embedded becomes either a direct binary/leaf at the +// empty path or, when the deprecated root path was itself non-empty, an +// edge node at the empty path that points "down" to the real root. +// +// Traversal +// --------- +// The migrator walks the deprecated trie depth-first, decoding one node at +// a time. A leaf becomes a value node at the same path. An internal binary +// node, after both subtrees have been visited, becomes a binary node plus +// up to two edge nodes (one per non-empty child segment). If the trie's +// stored root path is itself non-empty — meaning the deprecated root +// embeds a compression — a single edge node at the empty path is written +// after the traversal completes, replacing the root marker. +// +// Hashes +// ------ +// Starknet trie hashes: +// +// leaf value +// binary hashFn(left-edge-hash, right-edge-hash) +// edge hashFn(child-hash, path-segment-as-felt) + segment-length +// +// Zero-length edges short-circuit to the bare child-hash — the convention +// for absent edges. Class tries hash with Poseidon; contract and storage +// tries with Pedersen. +// +// For small tries every edge hash is computed inline. Above the threshold, +// edge-hash jobs are batched and dispatched to a worker pool for parallel +// computation; the scheduler preserves the original job order so the +// persisted bytes are byte-identical to a natively-built trie2. +// +// In-flight batches flush at target size; cancellation is observed at +// every flush and every channel send. +func (i *ingestor) migrateTrie(t *task, desc TrieDesc, outputs chan<- task) error { + if desc.NodeCount == 0 { + return nil + } + parallelDispatch := desc.NodeCount >= SmallTrieThreshold + prefix := deprecatedTriePrefix(desc) + sched := newHashScheduler(desc.HashFn, parallelDispatch, desc.TrieBucket, desc.Owner, i.pool) + + rootHash, err := i.traverse(t, outputs, prefix, *desc.RootPath, sched) + if err != nil { + return err + } + if err := sched.sync(t.batch); err != nil { + return err + } + if desc.RootPath.Len() > 0 { + if err := writeRootEdgeNode(desc.RootPath, rootHash, sched, t.batch); err != nil { + return err + } + } + return nil +} + +func (i *ingestor) traverse( + t *task, + outputs chan<- task, + prefix []byte, + oldPath trie.BitArray, + sched *hashScheduler, +) (felt.Felt, error) { + parsed, err := readNode(i.database, prefix, &oldPath) + if err != nil { + return felt.Felt{}, err + } + t.nodes++ + + if parsed.isLeaf { + newPath := toNewPath(&oldPath) + if err := writeLeafNode(newPath, &parsed.value, sched, t.batch); err != nil { + return felt.Felt{}, err + } + if err := i.flush(t, outputs); err != nil { + return felt.Felt{}, err + } + return parsed.value, nil + } + + leftHash, err := i.traverse(t, outputs, prefix, parsed.left, sched) + if err != nil { + return felt.Felt{}, err + } + rightHash, err := i.traverse(t, outputs, prefix, parsed.right, sched) + if err != nil { + return felt.Felt{}, err + } + + newPath := toNewPath(&oldPath) + if err := processBinary( + newPath, &parsed.left, &parsed.right, leftHash, rightHash, sched, t.batch, + ); err != nil { + return felt.Felt{}, err + } + if err := i.flush(t, outputs); err != nil { + return felt.Felt{}, err + } + return parsed.value, nil +} + +func rootProcessed(r db.KeyValueReader, newBucket db.Bucket, owner *felt.Address) (bool, error) { + var emptyPath trieutils.Path + var buf [maxNodeKeySize]byte + + n := encodeNodeKey(buf[:], newBucket, owner, &emptyPath, false) + if exists, err := r.Has(buf[:n]); err != nil || exists { + return exists, err + } + n = encodeNodeKey(buf[:], newBucket, owner, &emptyPath, true) + return r.Has(buf[:n]) +} + +func encodeOldPath(path *trie.BitArray, dst []byte) int { + pathLen := path.Len() + b := path.Bytes() + activeBytes := (uint(pathLen) + 7) / 8 + dst[0] = pathLen + copy(dst[1:], b[32-activeBytes:]) + return int(activeBytes) + 1 +} + +type parsedNode struct { + value felt.Felt + left trie.BitArray + right trie.BitArray + isLeaf bool +} + +func readNode(r db.KeyValueReader, prefix []byte, oldPath *trie.BitArray) (parsedNode, error) { + var arr [maxNodeKeySize]byte + n := copy(arr[:], prefix) + n += encodeOldPath(oldPath, arr[n:]) + var node parsedNode + err := r.Get(arr[:n], node.UnmarshalBinary) + return node, err +} + +func (n *parsedNode) UnmarshalBinary(data []byte) error { + if len(data) < felt.Bytes { + return fmt.Errorf("trie: node data too short (%d bytes)", len(data)) + } + n.value = felt.FromBytes[felt.Felt](data[:felt.Bytes]) + data = data[felt.Bytes:] + if len(data) == 0 { + n.isLeaf = true + return nil + } + if err := n.left.UnmarshalBinary(data); err != nil { + return fmt.Errorf("trie: unmarshalling left path: %w", err) + } + data = data[n.left.EncodedLen():] + if err := n.right.UnmarshalBinary(data); err != nil { + return fmt.Errorf("trie: unmarshalling right path: %w", err) + } + return nil +} + +func writeLeafNode( + path trieutils.Path, + value *felt.Felt, + sched *hashScheduler, + batch db.Batch, +) error { + var buf [maxNodeKeySize + valueNodeBlobSize]byte + keyLen := encodeNodeKey(buf[:], sched.bucket, &sched.owner, &path, true) + blob := value.Bytes() + copy(buf[keyLen:], blob[:]) + return batch.Put(buf[:keyLen], buf[keyLen:keyLen+valueNodeBlobSize]) +} + +func processBinary( + parentPath trieutils.Path, + left, right *trie.BitArray, + leftChildHash, rightChildHash felt.Felt, + sched *hashScheduler, + batch db.Batch, +) error { + leftSeg := compressedSegment(left, parentPath.Len()) + rightSeg := compressedSegment(right, parentPath.Len()) + return sched.schedule(&edgeHashJob{ + leftChildHash: leftChildHash, + leftSeg: leftSeg, + rightChildHash: rightChildHash, + rightSeg: rightSeg, + parentPath: parentPath, + }, batch) +} + +func writeRootEdgeNode( + rootPath *trie.BitArray, + childHash felt.Felt, + sched *hashScheduler, + batch db.Batch, +) error { + seg := toNewPath(rootPath) + var buf [edgeNodeMaxSize]byte + n := encodeEdgeNode(buf[:], &childHash, &seg) + return trieutils.WriteNodeByPath( + batch, + sched.bucket, + &sched.owner, + &trieutils.Path{}, + false, + buf[:n], + ) +} + +func deprecatedTriePrefix(desc TrieDesc) []byte { + switch desc.DeprecatedTrieBucket { + case db.ClassesTrie, db.StateTrie: + return desc.DeprecatedTrieBucket.Key() + case db.ContractStorage: + ownerBytes := desc.Owner.Bytes() + return desc.DeprecatedTrieBucket.Key(ownerBytes[:]) + default: + panic(fmt.Sprintf( + "unexpected deprecated trie bucket %v", + desc.DeprecatedTrieBucket, + )) + } +} diff --git a/migration/trie/trie.go b/migration/trie/trie.go new file mode 100644 index 0000000000..415ca2a5d6 --- /dev/null +++ b/migration/trie/trie.go @@ -0,0 +1,299 @@ +package trie + +import ( + "bytes" + "context" + "errors" + "fmt" + "time" + + "github.com/NethermindEth/juno/blockchain/networks" + "github.com/NethermindEth/juno/core/crypto" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/trie" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/db/dbutils" + "github.com/NethermindEth/juno/migration" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" + "github.com/NethermindEth/juno/utils/log" +) + +const ( + batchByteSize = 128 * db.Megabyte + targetBatchByteSize = 96 * db.Megabyte + timeLogRate = 5 * time.Second + SmallTrieThreshold = 100_000 + parallelHashBatchSize = 16384 + IngestorCount = 4 +) + +var ( + shouldRerun = []byte{} + shouldNotRerun []byte +) + +var deprecatedTrieBuckets = []db.Bucket{db.ClassesTrie, db.StateTrie, db.ContractStorage} + +// Migrator converts every deprecated Starknet trie on disk into the +// equivalent trie2 layout used by the new state: +// +// ClassesTrie ─→ ClassTrie (Poseidon) +// StateTrie ─→ ContractTrieContract (Pedersen) +// ContractStorage ─→ ContractTrieStorage (Pedersen, per contract owner) +// +// Each deprecated trie is enumerated (its bucket holds one root-path entry +// plus N node entries), then walked in DFS order; every visited node is +// re-encoded into the new format and written to its destination bucket in +// the same batch. After every trie completes successfully, the three +// deprecated buckets are wiped via DeleteRange. +// +// The pipeline runs IngestorCount worker goroutines, each pulling one trie +// at a time from the enumeration source, plus a single committer that +// flushes filled batches to disk. A semaphore caps in-flight batches at +// IngestorCount * 2. See migrateTrie for the per-trie traversal. +// +// Re-run safe: every trie's first action checks for its new-format root +// key; if present, the trie is treated as already migrated and skipped. +// A subsequent run after a crash picks up where the previous one stopped — +// partially migrated tries either have a root key (skipped on the next +// pass) or don't (re-migrated from scratch; the deprecated source data is +// still present because the trailing wipe runs only on full success). +// +// Cancellation: every flush and every channel send checks ctx.Done. On +// cancel, Migrate returns the shouldRerun sentinel with ctx.Err(); the +// migration runner re-invokes on the next process start. +type Migrator struct{} + +var _ migration.Migration = (*Migrator)(nil) + +func (*Migrator) Before([]byte) error { return nil } + +func (*Migrator) Migrate( + ctx context.Context, + database db.KeyValueStore, + _ *networks.Network, + logger log.StructuredLogger, +) ([]byte, error) { + needed, err := needsMigration(database) + if err != nil { + return shouldRerun, err + } + if !needed { + logger.Info("trie migration: no old-format data found, marking applied") + return shouldNotRerun, nil + } + + return runMigration(ctx, database, logger) +} + +func needsMigration(r db.KeyValueReader) (bool, error) { + for _, bucket := range deprecatedTrieBuckets { + prefix := bucket.Key() + iter, err := r.NewIterator(prefix, true) + if err != nil { + return false, err + } + hasKeys := iter.First() + if err := iter.Close(); err != nil { + return false, err + } + if hasKeys { + return true, nil + } + } + return false, nil +} + +func runMigration( + ctx context.Context, + database db.KeyValueStore, + logger log.StructuredLogger, +) ([]byte, error) { + batchSem := semaphore.New(IngestorCount*2, func() db.Batch { + return database.NewBatchWithSize(batchByteSize) + }) + + pool := newHashWorkerPool() + defer pool.close() + + ing := newIngestor(ctx, database, batchSem, pool) + + tries, err := enumerateTries(database) + if err != nil { + return shouldRerun, err + } + + var allNodes uint64 + for _, d := range tries { + allNodes += uint64(d.NodeCount) + } + allTries := uint64(len(tries)) + + src := pipeline.Source(func(yield func(TrieDesc) bool) { + for _, d := range tries { + if !yield(d) { + return + } + } + }) + ingested := pipeline.New(src, IngestorCount, ing) + committed := pipeline.New( + ingested, + 1, + newCommitter(logger, batchSem, allTries, allNodes), + ) + + _, wait := committed.Run(ctx) + res := wait() + if res.Err != nil { + return shouldRerun, res.Err + } + if !res.IsDone { + if ctxErr := ctx.Err(); ctxErr != nil { + return shouldRerun, ctxErr + } + return shouldRerun, errors.New("trie migration: pipeline did not complete") + } + + if err := wipeDeprecatedBuckets(database); err != nil { + return shouldRerun, err + } + logger.Info("trie migration: source buckets deleted") + + return shouldNotRerun, nil +} + +func wipeDeprecatedBuckets(database db.KeyValueRangeDeleter) error { + for _, bucket := range deprecatedTrieBuckets { + start := bucket.Key() + end := dbutils.UpperBound(start) + if err := database.DeleteRange(start, end); err != nil { + return fmt.Errorf("trie migration: cleanup DeleteRange for %v: %w", bucket, err) + } + } + return nil +} + +type TrieDesc struct { + DeprecatedTrieBucket db.Bucket + TrieBucket db.Bucket + Owner felt.Address + HashFn crypto.HashFn + NodeCount int + RootPath *trie.BitArray +} + +func enumerateTries(r db.KeyValueReader) ([]TrieDesc, error) { + var descs []TrieDesc + + for _, spec := range []struct { + oldBucket, newBucket db.Bucket + hashFn crypto.HashFn + }{ + {db.ClassesTrie, db.ClassTrie, crypto.Poseidon}, + {db.StateTrie, db.ContractTrieContract, crypto.Pedersen}, + } { + desc, err := enumerateGlobalTrie(r, spec.oldBucket, spec.newBucket, spec.hashFn) + if err != nil { + return nil, err + } + descs = append(descs, desc) + } + + storageDescs, err := enumerateStorageTries(r) + if err != nil { + return nil, err + } + descs = append(descs, storageDescs...) + + return descs, nil +} + +func enumerateGlobalTrie( + r db.KeyValueReader, + oldBucket, newBucket db.Bucket, + hashFn crypto.HashFn, +) (TrieDesc, error) { + prefix := oldBucket.Key() + it, err := r.NewIterator(prefix, true) + if err != nil { + return TrieDesc{}, fmt.Errorf("opening iterator for bucket %v: %w", oldBucket, err) + } + defer it.Close() + it.First() + + rootPath, count, err := scanTrie(it, prefix) + if err != nil { + return TrieDesc{}, fmt.Errorf("enumerating bucket %v: %w", oldBucket, err) + } + return TrieDesc{ + DeprecatedTrieBucket: oldBucket, + TrieBucket: newBucket, + HashFn: hashFn, + NodeCount: count, + RootPath: &rootPath, + }, nil +} + +func enumerateStorageTries(r db.KeyValueReader) ([]TrieDesc, error) { + it, err := r.NewIterator(db.ContractStorage.Key(), true) + if err != nil { + return nil, fmt.Errorf("opening storage iterator: %w", err) + } + defer it.Close() + it.First() + + var descs []TrieDesc + for it.Valid() { + key := it.Key() + if len(key) < 1+felt.Bytes { + it.Next() + continue + } + owner := felt.FromBytes[felt.Address](key[1 : 1+felt.Bytes]) + ownerBytes := owner.Bytes() + ownerPrefix := db.ContractStorage.Key(ownerBytes[:]) + + rootPath, count, err := scanTrie(it, ownerPrefix) + if err != nil { + return nil, fmt.Errorf("enumerating storage owner %s: %w", &owner, err) + } + descs = append(descs, TrieDesc{ + DeprecatedTrieBucket: db.ContractStorage, + TrieBucket: db.ContractTrieStorage, + Owner: owner, + HashFn: crypto.Pedersen, + NodeCount: count, + RootPath: &rootPath, + }) + // scanTrie leaves the iterator positioned past this owner's range. + } + return descs, nil +} + +func scanTrie(it db.Iterator, prefix []byte) (trie.BitArray, int, error) { + var rootPath trie.BitArray + count := 0 + for it.Valid() { + key := it.Key() + if !bytes.HasPrefix(key, prefix) { + return rootPath, count, nil + } + if len(key) == len(prefix) { + val, err := it.Value() + if err != nil { + return trie.BitArray{}, 0, err + } + parsedRootPath, err := parseDeprecatedPath(val) + if err != nil { + return trie.BitArray{}, 0, err + } + rootPath = parsedRootPath + } else { + count++ + } + it.Next() + } + return rootPath, count, nil +} diff --git a/migration/trie/trie_test.go b/migration/trie/trie_test.go new file mode 100644 index 0000000000..c2d08b9df0 --- /dev/null +++ b/migration/trie/trie_test.go @@ -0,0 +1,441 @@ +package trie_test + +import ( + "context" + "testing" + + "github.com/NethermindEth/juno/core/crypto" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/trie" + "github.com/NethermindEth/juno/core/trie2" + "github.com/NethermindEth/juno/core/trie2/triedb/rawdb" + "github.com/NethermindEth/juno/core/trie2/trienode" + "github.com/NethermindEth/juno/core/trie2/trieutils" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/db/memory" + trielib "github.com/NethermindEth/juno/migration/trie" + "github.com/NethermindEth/juno/utils/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type leafMap map[felt.Felt]felt.Felt + +type trieCase struct { + name string + oldBucket db.Bucket + newBucket db.Bucket + owner felt.Address + oldBuildPrefix func(owner *felt.Address) []byte + newTrieID func(owner *felt.Address) trieutils.TrieID + hashFn crypto.HashFn + //nolint:staticcheck // Necessary for old state + buildOldFn func(db.IndexedBatch, []byte, uint8) (*trie.Trie, error) +} + +var trieCases = []trieCase{ + { + name: "ClassTrie", + oldBucket: db.ClassesTrie, + newBucket: db.ClassTrie, + oldBuildPrefix: func(_ *felt.Address) []byte { return []byte{byte(db.ClassesTrie)} }, + newTrieID: func(_ *felt.Address) trieutils.TrieID { + return trieutils.NewClassTrieID(felt.StateRootHash(felt.One)) + }, + hashFn: crypto.Poseidon, + buildOldFn: trie.NewTriePoseidon, + }, + { + name: "ContractTrie", + oldBucket: db.StateTrie, + newBucket: db.ContractTrieContract, + oldBuildPrefix: func(_ *felt.Address) []byte { return []byte{byte(db.StateTrie)} }, + newTrieID: func(_ *felt.Address) trieutils.TrieID { + return trieutils.NewContractTrieID(felt.StateRootHash(felt.One)) + }, + hashFn: crypto.Pedersen, + buildOldFn: trie.NewTriePedersen, + }, + { + name: "StorageTrie", + oldBucket: db.ContractStorage, + newBucket: db.ContractTrieStorage, + owner: felt.FromUint64[felt.Address](42), + oldBuildPrefix: func(owner *felt.Address) []byte { + ownerBytes := owner.Bytes() + return db.ContractStorage.Key(ownerBytes[:]) + }, + newTrieID: func(owner *felt.Address) trieutils.TrieID { + return trieutils.NewContractStorageTrieID(felt.StateRootHash(felt.One), *owner) + }, + hashFn: crypto.Pedersen, + buildOldFn: trie.NewTriePedersen, + }, +} + +var transcoderCases = []struct { + name string + leaves leafMap +}{ + {"empty trie", nil}, + {"single leaf", leafMap{ + felt.FromUint64[felt.Felt](1): felt.FromUint64[felt.Felt](100), + }}, + {"deep split", leafMap{ + felt.FromUint64[felt.Felt](2): felt.FromUint64[felt.Felt](10), + felt.FromUint64[felt.Felt](3): felt.FromUint64[felt.Felt](20), + }}, + {"left right split", leafMap{ + felt.FromUint64[felt.Felt](1): felt.FromUint64[felt.Felt](10), + felt.FromBytes[felt.Felt]([]byte{0x04}): felt.FromUint64[felt.Felt](20), // 2^250 + }}, + {"full depth 2 tree", leafMap{ + felt.FromUint64[felt.Felt](1): felt.FromUint64[felt.Felt](10), // 00... + felt.FromBytes[felt.Felt]([]byte{0x02}): felt.FromUint64[felt.Felt](20), // 01... + felt.FromBytes[felt.Felt]([]byte{0x04}): felt.FromUint64[felt.Felt](30), // 10... + felt.FromBytes[felt.Felt]([]byte{0x06}): felt.FromUint64[felt.Felt](40), // 11... + }}, + {"hundred sequential leaves", func() leafMap { + leaves := make(leafMap, 100) + for i := 1; i <= 100; i++ { + leaves[felt.FromUint64[felt.Felt](uint64(i))] = felt.FromUint64[felt.Felt](uint64(i) * 7) + } + return leaves + }()}, + {"random 1000 leaves", randomLeaves(1000)}, +} + +func TestMigrate_FreshDBIsNoOp(t *testing.T) { + memDB := memory.New() + + state, err := (&trielib.Migrator{}).Migrate( + context.Background(), + memDB, + nil, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + assert.Nil( + t, + state, + "fresh DB must mark migration applied (nil intermediate state) without doing work", + ) +} + +func TestMigrate_RunsWhenOldDataPresent(t *testing.T) { + leaves := randomLeaves(100) + memDB := buildFullDB(t, leaves) + + require.True(t, bucketHasKeys(t, memDB, db.ClassesTrie), "precondition: DB has old-format data") + + state, err := (&trielib.Migrator{}).Migrate( + context.Background(), + memDB, + nil, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + assert.Nil(t, state, "completed migration must return nil intermediate state") + + for _, bucket := range []db.Bucket{db.ClassesTrie, db.StateTrie, db.ContractStorage} { + assert.False(t, + bucketHasKeys(t, memDB, bucket), + "old-format bucket %v should be empty after migration", bucket) + } +} + +// TestMigrationEndToEnd verifies that the migration produces byte-for-byte +// identical DB output to a natively-built trie2 for all three trie types and +// all leaf counts. Catches encoding bugs that root-hash comparison cannot. +func TestMigrationEndToEnd(t *testing.T) { + type testCase struct { + name string + tc trieCase + leaves leafMap + } + + var cases []testCase + for _, tc := range trieCases { + for _, lc := range transcoderCases { + cases = append(cases, testCase{ + name: tc.name + "/" + lc.name, + tc: tc, + leaves: lc.leaves, + }) + } + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + prefix := c.tc.oldBuildPrefix(&c.tc.owner) + + migratedDB := memory.New() + buildDeprecatedTrie(t, migratedDB, c.leaves, c.tc.buildOldFn, prefix) + _, err := (&trielib.Migrator{}).Migrate( + context.Background(), migratedDB, nil, log.NewNopZapLogger(), + ) + require.NoError(t, err) + + nativeDB := memory.New() + buildTrie(t, nativeDB, c.leaves, + c.tc.newTrieID(&c.tc.owner), c.tc.hashFn, c.tc.newBucket) + + assert.Equal(t, + allKeysUnder(t, nativeDB, c.tc.newBucket), + allKeysUnder(t, migratedDB, c.tc.newBucket)) + }) + } +} + +// TestMigrationIsResumable verifies that re-running migration over a DB +// whose class-trie destination root is already present skips the class +// migration and only finishes the contract trie. The "partial state" is +// faked by copying the reference DB's new-format class-trie keys into the +// partial DB before running migration. +func TestMigrationIsResumable(t *testing.T) { + leaves := randomLeaves(1000) + + // Reference: full migration from scratch. + refDB := buildFullDB(t, leaves) + _, err := (&trielib.Migrator{}).Migrate(context.Background(), refDB, nil, log.NewNopZapLogger()) + require.NoError(t, err) + + // Partial DB: both tries in old format initially. + partialDB := buildFullDB(t, leaves) + + // Fake a prior successful class-trie migration by copying refDB's + // new-format class-trie keys directly into partialDB. + refClassKeys := allKeysUnder(t, refDB, db.ClassTrie) + require.NotEmpty(t, refClassKeys, "reference class trie should be non-empty") + for k, v := range refClassKeys { + require.NoError(t, partialDB.Put([]byte(k), v)) + } + + // Resume: migration should skip the class trie (its dest root is present) + // and complete only the contract trie. + _, err = (&trielib.Migrator{}).Migrate(context.Background(), partialDB, nil, log.NewNopZapLogger()) + require.NoError(t, err) + + // Final state must match the reference full-run output for every new-format bucket. + for _, bucket := range []db.Bucket{db.ClassTrie, db.ContractTrieContract} { + refKeys := allKeysUnder(t, refDB, bucket) + resumedKeys := allKeysUnder(t, partialDB, bucket) + assert.Equal(t, refKeys, resumedKeys, + "resumed migration result differs from full run for bucket %v", bucket) + } +} + +// TestMigrationMultiStorageOwners exercises enumerateStorageTries across +// multiple owners (scanTrie's prefix-leave path) and keeps all 4 ingestor +// workers busy by giving them 7 tries to chew through (2 global + 5 storage). +func TestMigrationMultiStorageOwners(t *testing.T) { + leaves := randomLeaves(50) + + migratedDB := memory.New() + buildDeprecatedTrie(t, migratedDB, leaves, trie.NewTriePoseidon, db.ClassesTrie.Key()) + buildDeprecatedTrie(t, migratedDB, leaves, trie.NewTriePedersen, db.StateTrie.Key()) + + owners := []felt.Address{ + felt.FromUint64[felt.Address](1), + felt.FromUint64[felt.Address](2), + felt.FromUint64[felt.Address](3), + felt.FromUint64[felt.Address](42), + felt.FromUint64[felt.Address](999), + } + for _, owner := range owners { + ownerBytes := owner.Bytes() + buildDeprecatedTrie(t, migratedDB, leaves, trie.NewTriePedersen, + db.ContractStorage.Key(ownerBytes[:])) + } + + _, err := (&trielib.Migrator{}).Migrate( + context.Background(), + migratedDB, + nil, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + + // Per-owner native build → assert every native key is present (with the + migratedAll := allKeysUnder(t, migratedDB, db.ContractTrieStorage) + for _, owner := range owners { + nativeDB := memory.New() + id := trieutils.NewContractStorageTrieID(felt.StateRootHash(felt.One), owner) + buildTrie(t, nativeDB, leaves, id, crypto.Pedersen, db.ContractTrieStorage) + for k, v := range allKeysUnder(t, nativeDB, db.ContractTrieStorage) { + gotV, ok := migratedAll[k] + require.True(t, ok, "owner %v missing key", owner) + assert.Equal(t, v, gotV, "owner %v value differs at key", owner) + } + } + + // Old buckets fully drained. + for _, bucket := range []db.Bucket{db.ClassesTrie, db.StateTrie, db.ContractStorage} { + assert.False(t, bucketHasKeys(t, migratedDB, bucket), + "old bucket %v should be drained", bucket) + } +} + +// TestMigrationIdempotent verifies that a successful migration is a no-op on +// a second run: needsMigration sees the wiped deprecated buckets and returns +// early without touching the migrated state. +func TestMigrationIsNoopOnSecondRun(t *testing.T) { + leaves := randomLeaves(100) + memDB := buildFullDB(t, leaves) + + state, err := (&trielib.Migrator{}).Migrate( + context.Background(), + memDB, + nil, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, state) + snapshot := snapshotAllBuckets(t, memDB, + db.ClassTrie, db.ContractTrieContract, db.ContractTrieStorage) + + state, err = (&trielib.Migrator{}).Migrate(context.Background(), memDB, nil, log.NewNopZapLogger()) + require.NoError(t, err) + require.Nil(t, state) + require.Equal(t, snapshot, + snapshotAllBuckets(t, memDB, + db.ClassTrie, db.ContractTrieContract, db.ContractTrieStorage), + "second Migrate call must not change state") +} + +// TestMigrationCancelledContext verifies that a pre-cancelled ctx surfaces +// context.Canceled with the shouldRerun sentinel, and that a fresh ctx +// completes the migration normally afterwards. +func TestMigrationCancelledContext(t *testing.T) { + leaves := randomLeaves(100) + memDB := buildFullDB(t, leaves) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + state, err := (&trielib.Migrator{}).Migrate(ctx, memDB, nil, log.NewNopZapLogger()) + require.ErrorIs(t, err, context.Canceled) + require.NotNil(t, state, "shouldRerun sentinel must not be nil") + require.Empty(t, state, "shouldRerun is a non-nil empty slice") + + state, err = (&trielib.Migrator{}).Migrate(context.Background(), memDB, nil, log.NewNopZapLogger()) + require.NoError(t, err) + require.Nil(t, state) +} + +// buildFullDB creates an old-format DB populated with a class, a contract, and +// one storage trie, all built from the same leaf set. +func buildFullDB(t *testing.T, leaves leafMap) db.KeyValueStore { + t.Helper() + memDB := memory.New() + + owner := felt.FromUint64[felt.Address](42) + ownerBytes := owner.Bytes() + storagePrefix := db.ContractStorage.Key(ownerBytes[:]) + + buildDeprecatedTrie(t, memDB, leaves, trie.NewTriePoseidon, db.ClassesTrie.Key()) + buildDeprecatedTrie(t, memDB, leaves, trie.NewTriePedersen, db.StateTrie.Key()) + buildDeprecatedTrie(t, memDB, leaves, trie.NewTriePedersen, storagePrefix) + + return memDB +} + +func buildDeprecatedTrie( + t *testing.T, + database db.KeyValueStore, + leaves leafMap, + //nolint:staticcheck // Necessary for old state + trieFn func(db.IndexedBatch, []byte, uint8) (*trie.Trie, error), + prefix []byte, +) felt.Felt { + t.Helper() + //nolint:staticcheck // Necessary for old state + txn := database.NewIndexedBatch() + tr, err := trieFn(txn, prefix, 251) + require.NoError(t, err) + for key, value := range leaves { + _, err := tr.Put(&key, &value) + require.NoError(t, err) + } + root, err := tr.Root() + require.NoError(t, err) + require.NoError(t, tr.Commit()) + require.NoError(t, txn.Write()) + return root +} + +func buildTrie( + t *testing.T, + kvStore db.KeyValueStore, + leaves leafMap, + id trieutils.TrieID, + hashFn crypto.HashFn, + newBucket db.Bucket, +) { + t.Helper() + rawDB := rawdb.New(kvStore) + tr, err := trie2.New(id, 251, hashFn, rawDB) + require.NoError(t, err) + for key, value := range leaves { + require.NoError(t, tr.Update(&key, &value)) + } + root, nodes := tr.Commit() + if nodes == nil { + return // empty trie — nothing to persist + } + mergeSet := trienode.NewMergeNodeSet(nodes) + var zero felt.StateRootHash + stateRoot := felt.StateRootHash(root) + batch := kvStore.NewBatch() + if newBucket == db.ClassTrie { + require.NoError(t, rawDB.Update(&stateRoot, &zero, 0, mergeSet, nil, batch)) + } else { + require.NoError(t, rawDB.Update(&stateRoot, &zero, 0, nil, mergeSet, batch)) + } + require.NoError(t, batch.Write()) +} + +func randomLeaves(n int) leafMap { + leaves := make(leafMap, n) + for len(leaves) < n { + var k, v felt.Felt + k.SetRandom() + v.SetRandom() + leaves[k] = v + } + return leaves +} + +func snapshotAllBuckets(t *testing.T, r db.KeyValueReader, buckets ...db.Bucket) map[string][]byte { + t.Helper() + out := make(map[string][]byte) + for _, b := range buckets { + for k, v := range allKeysUnder(t, r, b) { + out[k] = v + } + } + return out +} + +func allKeysUnder(t *testing.T, r db.KeyValueReader, bucket db.Bucket) map[string][]byte { + t.Helper() + prefix := bucket.Key() + iter, err := r.NewIterator(prefix, true) + require.NoError(t, err) + defer iter.Close() + out := make(map[string][]byte) + for ok := iter.First(); ok; ok = iter.Next() { + val, err := iter.Value() + require.NoError(t, err) + out[string(iter.Key())] = val + } + return out +} + +func bucketHasKeys(t *testing.T, r db.KeyValueReader, bucket db.Bucket) bool { + t.Helper() + it, err := r.NewIterator(bucket.Key(), true) + require.NoError(t, err) + defer it.Close() + return it.First() +} diff --git a/node/migration.go b/node/migration.go index 4b7576d403..2ca79e2332 100644 --- a/node/migration.go +++ b/node/migration.go @@ -11,7 +11,10 @@ import ( "github.com/NethermindEth/juno/migration" "github.com/NethermindEth/juno/migration/blocktransactions" "github.com/NethermindEth/juno/migration/deprecated" //nolint:staticcheck,nolintlint,lll // ignore statick check package will be removed in future, nolinlint because main config does not check + "github.com/NethermindEth/juno/migration/headstate" "github.com/NethermindEth/juno/migration/historyprunner" + "github.com/NethermindEth/juno/migration/statehistory" + "github.com/NethermindEth/juno/migration/trie" "github.com/NethermindEth/juno/utils/log" ) @@ -25,7 +28,10 @@ func registerMigrations(cfg *Config) *migration.Registry { historyprunner.New(cfg.RetainedBlocks), cfg.Prune, PruneModeFlag, - ) + ). + WithOptional(&headstate.Migrator{}, cfg.NewState, "new-state"). + WithOptional(&statehistory.Migrator{}, cfg.NewState, "new-state"). + WithOptional(&trie.Migrator{}, cfg.NewState, "new-state") return registry }