Skip to content
Open
16 changes: 15 additions & 1 deletion core/state/accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,21 @@ func HasContract(r db.KeyValueReader, addr *felt.Felt) (bool, error) {
return r.Has(key)
}

func WriteContract(w db.KeyValueWriter, addr *felt.Felt, contract *stateContract) error {
func WriteContract(
w db.KeyValueWriter,
addr *felt.Felt,
nonce, classHash felt.Felt,
deployHeight uint64,
) error {
contract := stateContract{
Nonce: nonce,
ClassHash: classHash,
DeployedHeight: deployHeight,
}
return writeContract(w, addr, &contract)
}

func writeContract(w db.KeyValueWriter, addr *felt.Felt, contract *stateContract) error {
key := db.ContractKey(addr)
data, err := contract.MarshalBinary()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (s *State) flush(
return err
}
} else { // updated
if err := WriteContract(s.batch, &addr, obj.contract); err != nil {
if err := writeContract(s.batch, &addr, obj.contract); err != nil {
return err
}
}
Expand Down
49 changes: 49 additions & 0 deletions migration/headstate/committer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package headstate

import (
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/migration/pipeline"
"github.com/NethermindEth/juno/migration/semaphore"
"github.com/NethermindEth/juno/utils/log"
"go.uber.org/zap"
)

type committer struct {
counter counter
logger log.StructuredLogger
batchSemaphore semaphore.ResourceSemaphore[db.Batch]
}

var _ pipeline.State[task, struct{}] = (*committer)(nil)

func newCommitter(
logger log.StructuredLogger,
batchSemaphore semaphore.ResourceSemaphore[db.Batch],
) *committer {
return &committer{
logger: logger,
counter: newCounter(logger, timeLogRate),
batchSemaphore: batchSemaphore,
}
}

func (c *committer) Run(_ int, t task, _ chan<- struct{}) error {
c.logger.Debug(
"writing batch",
zap.Int("completedAddrs", t.completedAddrs),
zap.Int("batchSize", t.batch.Size()),
)

byteSize := uint64(t.batch.Size())
if err := t.batch.Write(); err != nil {
return err
}

c.counter.log(byteSize, t.completedAddrs)
c.batchSemaphore.Put()
return nil
}

func (c *committer) Done(int, chan<- struct{}) error {
return nil
}
47 changes: 47 additions & 0 deletions migration/headstate/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package headstate

import (
"time"

"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/utils/log"
"go.uber.org/zap"
)

type counter struct {
logger log.StructuredLogger
timeLogRate time.Duration
start time.Time
size uint64
completedAddrs uint64
}

func newCounter(logger log.StructuredLogger, timeLogRate time.Duration) counter {
return counter{
logger: logger,
timeLogRate: timeLogRate,
start: time.Now(),
}
}

func (c *counter) log(byteSize uint64, completedAddrs int) {
c.size += byteSize
c.completedAddrs += uint64(completedAddrs)

now := time.Now()
elapsed := now.Sub(c.start).Seconds()
if elapsed > c.timeLogRate.Seconds() {
mbs := float64(c.size) / float64(db.Megabyte)
c.logger.Info(
"write speed",
zap.Float64("MB", mbs),
zap.Float64("MB/s", mbs/elapsed),
zap.Uint64("completedContracts", c.completedAddrs),
zap.Float64("completedContracts/s", float64(c.completedAddrs)/elapsed),
zap.Float64("time", elapsed),
)
c.start = now
c.size = 0
c.completedAddrs = 0
}
}
95 changes: 95 additions & 0 deletions migration/headstate/ingestor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package headstate

import (
"errors"
"fmt"

"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/core/state"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/migration/pipeline"
"github.com/NethermindEth/juno/migration/semaphore"
)

type ingestor struct {
database db.KeyValueReader
batchSemaphore semaphore.ResourceSemaphore[db.Batch]
tasks []task
}

func newIngestor(
database db.KeyValueReader,
batchSemaphore semaphore.ResourceSemaphore[db.Batch],
) *ingestor {
tasks := make([]task, ingestorCount)
for i := range tasks {
tasks[i] = task{batch: batchSemaphore.GetBlocking()}
}
return &ingestor{
database: database,
batchSemaphore: batchSemaphore,
tasks: tasks,
}
}

var _ pipeline.State[felt.Address, task] = (*ingestor)(nil)

func (c *ingestor) Run(index int, addr felt.Address, outputs chan<- task) error {
t := &c.tasks[index]

sizeBefore := t.batch.Size()
if err := c.ingestAddress(t.batch, addr); err != nil {
return err
}
if t.batch.Size() > sizeBefore {
t.completedAddrs++
}

if t.batch.Size() >= targetBatchByteSize {
outputs <- task{batch: t.batch, completedAddrs: t.completedAddrs}
t.completedAddrs = 0
t.batch = c.batchSemaphore.GetBlocking()
}
return nil
}

func (c *ingestor) Done(index int, outputs chan<- task) error {
outputs <- c.tasks[index]
return nil
}

func (c *ingestor) ingestAddress(batch db.Batch, addr felt.Address) error {
addrFelt := felt.Felt(addr)

already, err := state.HasContract(c.database, &addrFelt)
if err != nil {
return fmt.Errorf("HasContract(%s): %w", &addrFelt, err)
}
if already {
return nil
}

classHash, err := core.GetContractClassHash(c.database, &addrFelt)
if err != nil {
return fmt.Errorf("GetContractClassHash(%s): %w", &addrFelt, err)
}

nonce, err := core.GetContractNonce(c.database, &addrFelt)
if err != nil {
if !errors.Is(err, db.ErrKeyNotFound) {
return fmt.Errorf("GetContractNonce(%s): %w", &addrFelt, err)
}
nonce = felt.Zero
}

height, err := core.GetContractDeploymentHeight(c.database, &addrFelt)
if err != nil {
return fmt.Errorf("GetContractDeploymentHeight(%s): %w", &addrFelt, err)
}

if err := state.WriteContract(batch, &addrFelt, nonce, classHash, height); err != nil {
return fmt.Errorf("WriteContract(%s): %w", &addrFelt, err)
}
return nil
}
163 changes: 163 additions & 0 deletions migration/headstate/migrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package headstate

import (
"context"
"errors"
"fmt"
"iter"
"time"

"github.com/NethermindEth/juno/blockchain/networks"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/db/dbutils"
"github.com/NethermindEth/juno/migration"
"github.com/NethermindEth/juno/migration/pipeline"
"github.com/NethermindEth/juno/migration/semaphore"
"github.com/NethermindEth/juno/utils/log"
)

const (
batchByteSize = 128 * db.Megabyte
targetBatchByteSize = 96 * db.Megabyte
ingestorCount = 4
timeLogRate = 5 * time.Second
)

type task struct {
batch db.Batch
completedAddrs int
}

var (
shouldRerun = []byte{}
shouldNotRerun = []byte(nil)
)

var _ migration.Migration = (*Migrator)(nil)

// Migrator consolidates the deprecated per-field contract layout into a
// single Contract record per address, written via state.WriteContract:
//
// ContractClassHash[addr]
// ContractNonce[addr]
// ContractDeploymentHeight[addr]
// │
// ▼
// Contract[addr] = { ClassHash, Nonce, DeployedHeight }
//
// StorageRoot is left zero — the running node lazily backfills it on the
// contract's first storage write.
//
// Each address discovered in the ContractClassHash bucket is processed by one
// of ingestorCount worker goroutines that read the three old fields into a
// shared db.Batch; a single committer drains batches to disk. Once every
// address has been migrated, the three deprecated buckets are wiped via
// DeleteRange.
//
// Re-run safe: an address whose Contract record already exists is skipped
// (via state.HasContract), and the trailing wipe re-issues DeleteRange over
// the (possibly already empty) ranges.
type Migrator struct{}

func (Migrator) Before([]byte) error {
return nil
}

func (Migrator) Migrate(
ctx context.Context,
database db.KeyValueStore,
_ *networks.Network,
logger log.StructuredLogger,
) ([]byte, error) {
addressesIter, sourceErr := pendingAddresses(database)
res := migrateAddresses(ctx, database, logger, addressesIter)

if err := errors.Join(sourceErr(), res.Err); err != nil {
return shouldRerun, err
}
if !res.IsDone {
if ctxErr := ctx.Err(); ctxErr != nil {
return shouldRerun, ctxErr
}
return shouldRerun, errors.New("headstate migration did not complete")
}

return shouldNotRerun, wipeDeprecatedBuckets(database)
}

func migrateAddresses(
ctx context.Context,
database db.KeyValueStore,
logger log.StructuredLogger,
addresses iter.Seq[felt.Address],
) pipeline.Result {
batchSemaphore := semaphore.New(
ingestorCount+1,
func() db.Batch {
return database.NewBatchWithSize(batchByteSize)
},
)

source := pipeline.Source(addresses)

ingestorPipeline := pipeline.New(
source,
ingestorCount,
newIngestor(database, batchSemaphore),
)

committerPipeline := pipeline.New(
ingestorPipeline,
1,
newCommitter(logger, batchSemaphore),
)

_, wait := committerPipeline.Run(ctx)
return wait()
}

func pendingAddresses(r db.KeyValueReader) (iter.Seq[felt.Address], func() error) {
var iterErr error
seq := func(yield func(felt.Address) bool) {
prefix := db.ContractClassHash.Key()
it, err := r.NewIterator(prefix, true)
if err != nil {
iterErr = err
return
}
defer it.Close()

for valid := it.First(); valid; valid = it.Next() {
key := it.Key()
if len(key) != len(prefix)+felt.Bytes {
iterErr = fmt.Errorf(
"malformed ContractClassHash key: len %d, want %d",
len(key),
len(prefix)+felt.Bytes,
)
return
}
f := felt.FromBytes[felt.Felt](key[len(prefix):])
if !yield(felt.Address(f)) {
return
}
}
}
return seq, func() error { return iterErr }
}

func wipeDeprecatedBuckets(database db.KeyValueStore) error {
for _, bucket := range []db.Bucket{
db.ContractClassHash,
db.ContractNonce,
db.ContractDeploymentHeight,
} {
start := bucket.Key()
end := dbutils.UpperBound(start)
if err := database.DeleteRange(start, end); err != nil {
return err
}
}
return nil
}
Loading