Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 18 additions & 226 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
"container/heap"
"errors"
"fmt"
"maps"
"math"
"math/big"
"os"
"path/filepath"
"slices"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -100,7 +98,7 @@
// conversionTimeWindow defines the period after the Osaka fork during which
// the pool will still accept and convert legacy blob transactions. After this
// window, all legacy blob transactions will be rejected.
conversionTimeWindow = time.Hour * 2

Check failure on line 101 in core/txpool/blobpool/blobpool.go

View workflow job for this annotation

GitHub Actions / Lint

const conversionTimeWindow is unused (unused)
)

// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
Expand Down Expand Up @@ -337,9 +335,8 @@
stored uint64 // Useful data size of all transactions on disk
limbo *limbo // Persistent data store for the non-finalized blobs

signer types.Signer // Transaction signer to use for sender recovery
chain BlockChain // Chain object to access the state through
cQueue *conversionQueue // The queue for performing legacy sidecar conversion (TODO: remove after Osaka)
signer types.Signer // Transaction signer to use for sender recovery
chain BlockChain // Chain object to access the state through

head atomic.Pointer[types.Header] // Current head of the chain
state *state.StateDB // Current state at the head of the chain
Expand Down Expand Up @@ -368,7 +365,6 @@
hasPendingAuth: hasPendingAuth,
signer: types.LatestSigner(chain.Config()),
chain: chain,
cQueue: newConversionQueue(), // Deprecate it after the osaka fork
lookup: newLookup(),
index: make(map[common.Address][]*blobTxMeta),
spent: make(map[common.Address]*uint256.Int),
Expand Down Expand Up @@ -485,9 +481,6 @@

// Close closes down the underlying persistent store.
func (p *BlobPool) Close() error {
// Terminate the conversion queue
p.cQueue.close()

var errs []error
if p.limbo != nil { // Close might be invoked due to error in constructor, before p.limbo is set
if err := p.limbo.Close(); err != nil {
Expand Down Expand Up @@ -885,172 +878,6 @@
basefeeGauge.Update(int64(basefee.Uint64()))
blobfeeGauge.Update(int64(blobfee.Uint64()))
p.updateStorageMetrics()

// Perform the conversion logic at the fork boundary
if !p.chain.Config().IsOsaka(oldHead.Number, oldHead.Time) && p.chain.Config().IsOsaka(newHead.Number, newHead.Time) {
// Deep copy all indexed transaction metadata.
var (
ids = make(map[common.Address]map[uint64]uint64)
txs = make(map[common.Address]map[uint64]common.Hash)
)
for sender, list := range p.index {
ids[sender] = make(map[uint64]uint64)
txs[sender] = make(map[uint64]common.Hash)
for _, m := range list {
ids[sender][m.nonce] = m.id
txs[sender][m.nonce] = m.hash
}
}
// Initiate the background conversion thread.
p.cQueue.launchBillyConversion(func() {
p.convertLegacySidecars(ids, txs)
})
}
}

// compareAndSwap checks if the specified transaction is still tracked in the pool
// and replaces the metadata accordingly. It should only be used in the fork
// boundary bulk conversion. If it fails for some reason, the subsequent txs won't
// be dropped for simplicity, as we assume failure is very unlikely to happen.
//
// The returned flag indicates whether the replacement succeeded.
func (p *BlobPool) compareAndSwap(address common.Address, hash common.Hash, blob []byte, oldID uint64, oldStorageSize uint32) bool {
	p.lock.Lock()
	defer p.lock.Unlock()

	// Store the converted payload first, so that if the tracking update below
	// fails, the new slot can simply be deleted again without touching the
	// original entry.
	newId, err := p.store.Put(blob)
	if err != nil {
		log.Error("Failed to store transaction", "hash", hash, "err", err)
		return false
	}
	newSize := uint64(len(blob))
	newStorageSize := p.store.Size(newId)

	// Terminate the procedure if the transaction was already evicted. The
	// newly added blob should be removed before return.
	if !p.lookup.update(hash, newId, newSize) {
		if derr := p.store.Delete(newId); derr != nil {
			log.Error("Failed to delete the dangling blob tx", "err", derr)
		} else {
			log.Warn("Deleted the dangling blob tx", "id", newId)
		}
		return false
	}
	// Update the metadata of the blob transaction and adjust the pool's
	// aggregate storage accounting by the size delta between the two
	// encodings.
	for _, meta := range p.index[address] {
		if meta.hash == hash {
			meta.id = newId
			meta.version = types.BlobSidecarVersion1
			meta.storageSize = newStorageSize
			meta.size = newSize

			p.stored += uint64(newStorageSize)
			p.stored -= uint64(oldStorageSize)
			break
		}
	}
	// Best-effort removal of the legacy slot; a failure here only leaks disk
	// space, so it is logged loudly instead of aborting the swap.
	if err := p.store.Delete(oldID); err != nil {
		log.Error("Failed to delete the legacy transaction", "hash", hash, "id", oldID, "err", err)
	}
	return true
}

// convertLegacySidecar fetches transaction data from the store, performs an
// on-the-fly conversion. This function is intended for use only during the
// Osaka fork transition period.
//
// The returned flag indicates whether the replacement succeeds or not.
func (p *BlobPool) convertLegacySidecar(sender common.Address, hash common.Hash, id uint64) bool {
	start := time.Now()

	// Retrieves the legacy blob transaction from the underlying store with
	// read lock held, preventing any potential data race around the slot
	// specified by the id.
	p.lock.RLock()
	data, err := p.store.Get(id)
	if err != nil {
		p.lock.RUnlock()
		// The transaction may have been evicted simultaneously, safe to skip conversion.
		log.Debug("Blob transaction is missing", "hash", hash, "id", id, "err", err)
		return false
	}
	oldStorageSize := p.store.Size(id)
	// Release the read lock before the CPU-heavy decode/convert steps below;
	// staleness is re-checked in compareAndSwap under the write lock.
	p.lock.RUnlock()

	// Decode the transaction, the failure is not expected and report the error
	// loudly if possible. If the blob transaction in this slot is corrupted,
	// leave it in the store, it will be dropped during the next pool
	// initialization.
	var tx types.Transaction
	if err = rlp.DecodeBytes(data, &tx); err != nil {
		log.Error("Blob transaction is corrupted", "hash", hash, "id", id, "err", err)
		return false
	}

	// Skip conversion if the transaction does not match the expected hash, or if it was
	// already converted. This can occur if the original transaction was evicted from the
	// pool and the slot was reused by a new one.
	if tx.Hash() != hash {
		log.Warn("Blob transaction was replaced", "hash", hash, "id", id, "stored", tx.Hash())
		return false
	}
	sc := tx.BlobTxSidecar()
	if sc.Version >= types.BlobSidecarVersion1 {
		// Already at (or beyond) the target sidecar version, nothing to do.
		log.Debug("Skipping conversion of blob tx", "hash", hash, "id", id)
		return false
	}

	// Perform the sidecar conversion, the failure is not expected and report the error
	// loudly if possible.
	if err := tx.BlobTxSidecar().ToV1(); err != nil {
		log.Error("Failed to convert blob transaction", "hash", hash, "err", err)
		return false
	}

	// Encode the converted transaction, the failure is not expected and report
	// the error loudly if possible.
	blob, err := rlp.EncodeToBytes(&tx)
	if err != nil {
		log.Error("Failed to encode blob transaction", "hash", tx.Hash(), "err", err)
		return false
	}

	// Replace the legacy blob transaction with the converted format.
	if !p.compareAndSwap(sender, hash, blob, id, oldStorageSize) {
		log.Error("Failed to replace the legacy transaction", "hash", hash)
		return false
	}
	log.Debug("Converted legacy blob transaction", "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
	return true
}

// convertLegacySidecars converts all given transactions to sidecar version 1.
//
// If any of them fails to be converted, the subsequent transactions will still
// be processed, as we assume the failure is very unlikely to happen. If happens,
// these transactions will be stuck in the pool until eviction.
func (p *BlobPool) convertLegacySidecars(ids map[common.Address]map[uint64]uint64, txs map[common.Address]map[uint64]common.Hash) {
	start := time.Now()

	var converted, dropped int
	for addr, hashes := range txs {
		// Transactions evicted from the pool must be contiguous, if in any case,
		// the transactions are gapped with each other, they will be discarded.
		nonces := make([]uint64, 0, len(hashes))
		for nonce := range hashes {
			nonces = append(nonces, nonce)
		}
		slices.Sort(nonces)

		// Convert the txs with nonce order
		for _, nonce := range nonces {
			if p.convertLegacySidecar(addr, hashes[nonce], ids[addr][nonce]) {
				converted++
			} else {
				dropped++
			}
		}
	}
	log.Info("Completed blob transaction conversion", "discarded", dropped, "injected", converted, "elapsed", common.PrettyDuration(time.Since(start)))
}

// reorg assembles all the transactors and missing transactions between an old
Expand Down Expand Up @@ -1530,8 +1357,8 @@
//
// The version argument specifies the type of proofs to return, either the
// blob proofs (version 0) or the cell proofs (version 1). Proofs conversion is
// CPU intensive, so only done if explicitly requested with the convert flag.
func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte, convert bool) ([]*kzg4844.Blob, []kzg4844.Commitment, [][]kzg4844.Proof, error) {
// CPU intensive and prohibited in the blobpool explicitly.
func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte) ([]*kzg4844.Blob, []kzg4844.Commitment, [][]kzg4844.Proof, error) {
var (
blobs = make([]*kzg4844.Blob, len(vhashes))
commitments = make([]kzg4844.Commitment, len(vhashes))
Expand Down Expand Up @@ -1582,7 +1409,7 @@
}
// Mark hash as seen.
filled[hash] = struct{}{}
if sidecar.Version != version && !convert {
if sidecar.Version != version {
// Skip blobs with incompatible version. Note we still track the blob hash
// in `filled` here, ensuring that we do not resolve this tx another time.
continue
Expand All @@ -1591,29 +1418,13 @@
var pf []kzg4844.Proof
switch version {
case types.BlobSidecarVersion0:
if sidecar.Version == types.BlobSidecarVersion0 {
pf = []kzg4844.Proof{sidecar.Proofs[i]}
} else {
proof, err := kzg4844.ComputeBlobProof(&sidecar.Blobs[i], sidecar.Commitments[i])
if err != nil {
return nil, nil, nil, err
}
pf = []kzg4844.Proof{proof}
}
pf = []kzg4844.Proof{sidecar.Proofs[i]}
case types.BlobSidecarVersion1:
if sidecar.Version == types.BlobSidecarVersion0 {
cellProofs, err := kzg4844.ComputeCellProofs(&sidecar.Blobs[i])
if err != nil {
return nil, nil, nil, err
}
pf = cellProofs
} else {
cellProofs, err := sidecar.CellProofsAt(i)
if err != nil {
return nil, nil, nil, err
}
pf = cellProofs
cellProofs, err := sidecar.CellProofsAt(i)
if err != nil {
return nil, nil, nil, err
}
pf = cellProofs
}
for _, index := range list {
blobs[index] = &sidecar.Blobs[i]
Expand All @@ -1640,45 +1451,26 @@
return available
}

// preCheck performs the static validation upon the provided tx and converts
// the legacy sidecars if Osaka fork has been activated with a short time window.
// preCheck performs the static validation upon the provided tx.
//
// This function is pure static and lock free.
func (p *BlobPool) preCheck(tx *types.Transaction) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want to keep this function? Inside of ValidateTransaction we are already checking whether we have entered osaka. Could we integrate it there or is there a specific reason to keep this function ??

var (
head = p.head.Load()
isOsaka = p.chain.Config().IsOsaka(head.Number, head.Time)
deadline time.Time
head = p.head.Load()
version = types.BlobSidecarVersion0
)
if isOsaka {
deadline = time.Unix(int64(*p.chain.Config().OsakaTime), 0).Add(conversionTimeWindow)
if p.chain.Config().IsOsaka(head.Number, head.Time) {
version = types.BlobSidecarVersion1
}
// Validate the transaction statically at first to avoid unnecessary
// conversion. This step doesn't require lock protection.
if err := p.ValidateTxBasics(tx); err != nil {
return err
}
// Before the Osaka fork, reject the blob txs with cell proofs
if !isOsaka {
if tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 {
return nil
} else {
return errors.New("cell proof is not supported yet")
}
}
// After the Osaka fork, reject the legacy blob txs if the conversion
// time window is passed.
if tx.BlobTxSidecar().Version == types.BlobSidecarVersion1 {
return nil
}
if head.Time > uint64(deadline.Unix()) {
return errors.New("legacy blob tx is not supported")
if tx.BlobTxSidecar().Version != version {
return fmt.Errorf("sidecar version is not supported, got: %d, want: %d", tx.BlobTxSidecar().Version, version)
}
// Convert the legacy sidecar after Osaka fork. This could be a long
// procedure which takes a few seconds, even minutes if there is a long
// queue. Fortunately it will only block the routine of the source peer
// announcing the tx, without affecting other parts.
return p.cQueue.convert(tx)
return nil
}

// Add inserts a set of blob transactions into the pool if they pass validation (both
Expand Down
Loading
Loading