diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go
index bfaf4d5b8e3c..791c7012e266 100644
--- a/core/txpool/blobpool/blobpool.go
+++ b/core/txpool/blobpool/blobpool.go
@@ -21,12 +21,10 @@ import (
"container/heap"
"errors"
"fmt"
- "maps"
"math"
"math/big"
"os"
"path/filepath"
- "slices"
"sort"
"sync"
"sync/atomic"
@@ -337,9 +335,8 @@ type BlobPool struct {
stored uint64 // Useful data size of all transactions on disk
limbo *limbo // Persistent data store for the non-finalized blobs
- signer types.Signer // Transaction signer to use for sender recovery
- chain BlockChain // Chain object to access the state through
- cQueue *conversionQueue // The queue for performing legacy sidecar conversion (TODO: remove after Osaka)
+ signer types.Signer // Transaction signer to use for sender recovery
+ chain BlockChain // Chain object to access the state through
head atomic.Pointer[types.Header] // Current head of the chain
state *state.StateDB // Current state at the head of the chain
@@ -368,7 +365,6 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo
hasPendingAuth: hasPendingAuth,
signer: types.LatestSigner(chain.Config()),
chain: chain,
- cQueue: newConversionQueue(), // Deprecate it after the osaka fork
lookup: newLookup(),
index: make(map[common.Address][]*blobTxMeta),
spent: make(map[common.Address]*uint256.Int),
@@ -485,9 +481,6 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
// Close closes down the underlying persistent store.
func (p *BlobPool) Close() error {
- // Terminate the conversion queue
- p.cQueue.close()
-
var errs []error
if p.limbo != nil { // Close might be invoked due to error in constructor, before p,limbo is set
if err := p.limbo.Close(); err != nil {
@@ -885,172 +878,6 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
basefeeGauge.Update(int64(basefee.Uint64()))
blobfeeGauge.Update(int64(blobfee.Uint64()))
p.updateStorageMetrics()
-
- // Perform the conversion logic at the fork boundary
- if !p.chain.Config().IsOsaka(oldHead.Number, oldHead.Time) && p.chain.Config().IsOsaka(newHead.Number, newHead.Time) {
- // Deep copy all indexed transaction metadata.
- var (
- ids = make(map[common.Address]map[uint64]uint64)
- txs = make(map[common.Address]map[uint64]common.Hash)
- )
- for sender, list := range p.index {
- ids[sender] = make(map[uint64]uint64)
- txs[sender] = make(map[uint64]common.Hash)
- for _, m := range list {
- ids[sender][m.nonce] = m.id
- txs[sender][m.nonce] = m.hash
- }
- }
- // Initiate the background conversion thread.
- p.cQueue.launchBillyConversion(func() {
- p.convertLegacySidecars(ids, txs)
- })
- }
-}
-
-// compareAndSwap checks if the specified transaction is still tracked in the pool
-// and replace the metadata accordingly. It should only be used in the fork boundary
-// bulk conversion. If it fails for some reason, the subsequent txs won't be dropped
-// for simplicity which we assume it's very likely to happen.
-//
-// The returned flag indicates whether the replacement succeeded.
-func (p *BlobPool) compareAndSwap(address common.Address, hash common.Hash, blob []byte, oldID uint64, oldStorageSize uint32) bool {
- p.lock.Lock()
- defer p.lock.Unlock()
-
- newId, err := p.store.Put(blob)
- if err != nil {
- log.Error("Failed to store transaction", "hash", hash, "err", err)
- return false
- }
- newSize := uint64(len(blob))
- newStorageSize := p.store.Size(newId)
-
- // Terminate the procedure if the transaction was already evicted. The
- // newly added blob should be removed before return.
- if !p.lookup.update(hash, newId, newSize) {
- if derr := p.store.Delete(newId); derr != nil {
- log.Error("Failed to delete the dangling blob tx", "err", derr)
- } else {
- log.Warn("Deleted the dangling blob tx", "id", newId)
- }
- return false
- }
- // Update the metadata of blob transaction
- for _, meta := range p.index[address] {
- if meta.hash == hash {
- meta.id = newId
- meta.version = types.BlobSidecarVersion1
- meta.storageSize = newStorageSize
- meta.size = newSize
-
- p.stored += uint64(newStorageSize)
- p.stored -= uint64(oldStorageSize)
- break
- }
- }
- if err := p.store.Delete(oldID); err != nil {
- log.Error("Failed to delete the legacy transaction", "hash", hash, "id", oldID, "err", err)
- }
- return true
-}
-
-// convertLegacySidecar fetches transaction data from the store, performs an
-// on-the-fly conversion. This function is intended for use only during the
-// Osaka fork transition period.
-//
-// The returned flag indicates whether the replacement succeeds or not.
-func (p *BlobPool) convertLegacySidecar(sender common.Address, hash common.Hash, id uint64) bool {
- start := time.Now()
-
- // Retrieves the legacy blob transaction from the underlying store with
- // read lock held, preventing any potential data race around the slot
- // specified by the id.
- p.lock.RLock()
- data, err := p.store.Get(id)
- if err != nil {
- p.lock.RUnlock()
- // The transaction may have been evicted simultaneously, safe to skip conversion.
- log.Debug("Blob transaction is missing", "hash", hash, "id", id, "err", err)
- return false
- }
- oldStorageSize := p.store.Size(id)
- p.lock.RUnlock()
-
- // Decode the transaction, the failure is not expected and report the error
- // loudly if possible. If the blob transaction in this slot is corrupted,
- // leave it in the store, it will be dropped during the next pool
- // initialization.
- var tx types.Transaction
- if err = rlp.DecodeBytes(data, &tx); err != nil {
- log.Error("Blob transaction is corrupted", "hash", hash, "id", id, "err", err)
- return false
- }
-
- // Skip conversion if the transaction does not match the expected hash, or if it was
- // already converted. This can occur if the original transaction was evicted from the
- // pool and the slot was reused by a new one.
- if tx.Hash() != hash {
- log.Warn("Blob transaction was replaced", "hash", hash, "id", id, "stored", tx.Hash())
- return false
- }
- sc := tx.BlobTxSidecar()
- if sc.Version >= types.BlobSidecarVersion1 {
- log.Debug("Skipping conversion of blob tx", "hash", hash, "id", id)
- return false
- }
-
- // Perform the sidecar conversion, the failure is not expected and report the error
- // loudly if possible.
- if err := tx.BlobTxSidecar().ToV1(); err != nil {
- log.Error("Failed to convert blob transaction", "hash", hash, "err", err)
- return false
- }
-
- // Encode the converted transaction, the failure is not expected and report
- // the error loudly if possible.
- blob, err := rlp.EncodeToBytes(&tx)
- if err != nil {
- log.Error("Failed to encode blob transaction", "hash", tx.Hash(), "err", err)
- return false
- }
-
- // Replace the legacy blob transaction with the converted format.
- if !p.compareAndSwap(sender, hash, blob, id, oldStorageSize) {
- log.Error("Failed to replace the legacy transaction", "hash", hash)
- return false
- }
- log.Debug("Converted legacy blob transaction", "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
- return true
-}
-
-// convertLegacySidecars converts all given transactions to sidecar version 1.
-//
-// If any of them fails to be converted, the subsequent transactions will still
-// be processed, as we assume the failure is very unlikely to happen. If happens,
-// these transactions will be stuck in the pool until eviction.
-func (p *BlobPool) convertLegacySidecars(ids map[common.Address]map[uint64]uint64, txs map[common.Address]map[uint64]common.Hash) {
- var (
- start = time.Now()
- success int
- failure int
- )
- for addr, list := range txs {
- // Transactions evicted from the pool must be contiguous, if in any case,
- // the transactions are gapped with each other, they will be discarded.
- nonces := slices.Collect(maps.Keys(list))
- slices.Sort(nonces)
-
- // Convert the txs with nonce order
- for _, nonce := range nonces {
- if p.convertLegacySidecar(addr, list[nonce], ids[addr][nonce]) {
- success++
- } else {
- failure++
- }
- }
- }
- log.Info("Completed blob transaction conversion", "discarded", failure, "injected", success, "elapsed", common.PrettyDuration(time.Since(start)))
}
// reorg assembles all the transactors and missing transactions between an old
@@ -1530,8 +1357,8 @@ func (p *BlobPool) GetMetadata(hash common.Hash) *txpool.TxMetadata {
//
// The version argument specifies the type of proofs to return, either the
// blob proofs (version 0) or the cell proofs (version 1). Proofs conversion is
-// CPU intensive, so only done if explicitly requested with the convert flag.
-func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte, convert bool) ([]*kzg4844.Blob, []kzg4844.Commitment, [][]kzg4844.Proof, error) {
+// CPU intensive and prohibited in the blobpool explicitly.
+func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte) ([]*kzg4844.Blob, []kzg4844.Commitment, [][]kzg4844.Proof, error) {
var (
blobs = make([]*kzg4844.Blob, len(vhashes))
commitments = make([]kzg4844.Commitment, len(vhashes))
@@ -1582,7 +1409,7 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte, convert bool) (
}
// Mark hash as seen.
filled[hash] = struct{}{}
- if sidecar.Version != version && !convert {
+ if sidecar.Version != version {
// Skip blobs with incompatible version. Note we still track the blob hash
// in `filled` here, ensuring that we do not resolve this tx another time.
continue
@@ -1591,29 +1418,13 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte, convert bool) (
var pf []kzg4844.Proof
switch version {
case types.BlobSidecarVersion0:
- if sidecar.Version == types.BlobSidecarVersion0 {
- pf = []kzg4844.Proof{sidecar.Proofs[i]}
- } else {
- proof, err := kzg4844.ComputeBlobProof(&sidecar.Blobs[i], sidecar.Commitments[i])
- if err != nil {
- return nil, nil, nil, err
- }
- pf = []kzg4844.Proof{proof}
- }
+ pf = []kzg4844.Proof{sidecar.Proofs[i]}
case types.BlobSidecarVersion1:
- if sidecar.Version == types.BlobSidecarVersion0 {
- cellProofs, err := kzg4844.ComputeCellProofs(&sidecar.Blobs[i])
- if err != nil {
- return nil, nil, nil, err
- }
- pf = cellProofs
- } else {
- cellProofs, err := sidecar.CellProofsAt(i)
- if err != nil {
- return nil, nil, nil, err
- }
- pf = cellProofs
+ cellProofs, err := sidecar.CellProofsAt(i)
+ if err != nil {
+ return nil, nil, nil, err
}
+ pf = cellProofs
}
for _, index := range list {
blobs[index] = &sidecar.Blobs[i]
@@ -1640,45 +1451,26 @@ func (p *BlobPool) AvailableBlobs(vhashes []common.Hash) int {
return available
}
-// preCheck performs the static validation upon the provided tx and converts
-// the legacy sidecars if Osaka fork has been activated with a short time window.
+// preCheck performs the static validation upon the provided tx.
//
// This function is pure static and lock free.
func (p *BlobPool) preCheck(tx *types.Transaction) error {
var (
- head = p.head.Load()
- isOsaka = p.chain.Config().IsOsaka(head.Number, head.Time)
- deadline time.Time
+ head = p.head.Load()
+ version = types.BlobSidecarVersion0
)
- if isOsaka {
- deadline = time.Unix(int64(*p.chain.Config().OsakaTime), 0).Add(conversionTimeWindow)
+ if p.chain.Config().IsOsaka(head.Number, head.Time) {
+ version = types.BlobSidecarVersion1
}
// Validate the transaction statically at first to avoid unnecessary
// conversion. This step doesn't require lock protection.
if err := p.ValidateTxBasics(tx); err != nil {
return err
}
- // Before the Osaka fork, reject the blob txs with cell proofs
- if !isOsaka {
- if tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 {
- return nil
- } else {
- return errors.New("cell proof is not supported yet")
- }
- }
- // After the Osaka fork, reject the legacy blob txs if the conversion
- // time window is passed.
- if tx.BlobTxSidecar().Version == types.BlobSidecarVersion1 {
- return nil
- }
- if head.Time > uint64(deadline.Unix()) {
- return errors.New("legacy blob tx is not supported")
+ if tx.BlobTxSidecar().Version != version {
+ return fmt.Errorf("sidecar version is not supported, got: %d, want: %d", tx.BlobTxSidecar().Version, version)
}
- // Convert the legacy sidecar after Osaka fork. This could be a long
- // procedure which takes a few seconds, even minutes if there is a long
- // queue. Fortunately it will only block the routine of the source peer
- // announcing the tx, without affecting other parts.
- return p.cQueue.convert(tx)
+ return nil
}
// Add inserts a set of blob transactions into the pool if they pass validation (both
diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go
index 2fa1927cae2c..ed36019f4030 100644
--- a/core/txpool/blobpool/blobpool_test.go
+++ b/core/txpool/blobpool/blobpool_test.go
@@ -433,11 +433,11 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) {
hashes = append(hashes, tx.vhashes...)
}
}
- blobs1, _, proofs1, err := pool.GetBlobs(hashes, types.BlobSidecarVersion0, false)
+ blobs1, _, proofs1, err := pool.GetBlobs(hashes, types.BlobSidecarVersion0)
if err != nil {
t.Fatal(err)
}
- blobs2, _, proofs2, err := pool.GetBlobs(hashes, types.BlobSidecarVersion1, false)
+ blobs2, _, proofs2, err := pool.GetBlobs(hashes, types.BlobSidecarVersion1)
if err != nil {
t.Fatal(err)
}
@@ -1329,7 +1329,7 @@ func TestBlobCountLimit(t *testing.T) {
// Check that first succeeds second fails.
if errs[0] != nil {
- t.Fatalf("expected tx with 7 blobs to succeed")
+ t.Fatalf("expected tx with 7 blobs to succeed, got %v", errs[0])
}
if !errors.Is(errs[1], txpool.ErrTxBlobLimitExceeded) {
t.Fatalf("expected tx with 8 blobs to fail, got: %v", errs[1])
@@ -1806,66 +1806,6 @@ func TestAdd(t *testing.T) {
}
}
-// Tests that transactions with legacy sidecars are accepted within the
-// conversion window but rejected after it has passed.
-func TestAddLegacyBlobTx(t *testing.T) {
- testAddLegacyBlobTx(t, true) // conversion window has not yet passed
- testAddLegacyBlobTx(t, false) // conversion window passed
-}
-
-func testAddLegacyBlobTx(t *testing.T, accept bool) {
- var (
- key1, _ = crypto.GenerateKey()
- key2, _ = crypto.GenerateKey()
-
- addr1 = crypto.PubkeyToAddress(key1.PublicKey)
- addr2 = crypto.PubkeyToAddress(key2.PublicKey)
- )
-
- statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
- statedb.AddBalance(addr1, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
- statedb.AddBalance(addr2, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
- statedb.Commit(0, true, false)
-
- chain := &testBlockChain{
- config: params.MergedTestChainConfig,
- basefee: uint256.NewInt(1050),
- blobfee: uint256.NewInt(105),
- statedb: statedb,
- }
- var timeDiff uint64
- if accept {
- timeDiff = uint64(conversionTimeWindow.Seconds()) - 1
- } else {
- timeDiff = uint64(conversionTimeWindow.Seconds()) + 1
- }
- time := *params.MergedTestChainConfig.OsakaTime + timeDiff
- chain.setHeadTime(time)
-
- pool := New(Config{Datadir: t.TempDir()}, chain, nil)
- if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
- t.Fatalf("failed to create blob pool: %v", err)
- }
-
- // Attempt to add legacy blob transactions.
- var (
- tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, 0, key1, types.BlobSidecarVersion0)
- tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 6, key2, types.BlobSidecarVersion0)
- txs = []*types.Transaction{tx1, tx2}
- )
- errs := pool.Add(txs, true)
- for _, err := range errs {
- if accept && err != nil {
- t.Fatalf("expected tx add to succeed, %v", err)
- }
- if !accept && err == nil {
- t.Fatal("expected tx add to fail")
- }
- }
- verifyPoolInternals(t, pool)
- pool.Close()
-}
-
func TestGetBlobs(t *testing.T) {
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true)))
@@ -1952,7 +1892,6 @@ func TestGetBlobs(t *testing.T) {
limit int
fillRandom bool // Whether to randomly fill some of the requested blobs with unknowns
version byte // Blob sidecar version to request
- convert bool // Whether to convert version on retrieval
}{
{
start: 0, limit: 6,
@@ -2018,11 +1957,6 @@ func TestGetBlobs(t *testing.T) {
start: 0, limit: 18, fillRandom: true,
version: types.BlobSidecarVersion1,
},
- {
- start: 0, limit: 18, fillRandom: true,
- version: types.BlobSidecarVersion1,
- convert: true, // Convert some version 0 blobs to version 1 while retrieving
- },
}
for i, c := range cases {
var (
@@ -2044,7 +1978,7 @@ func TestGetBlobs(t *testing.T) {
filled[len(vhashes)] = struct{}{}
vhashes = append(vhashes, testrand.Hash())
}
- blobs, _, proofs, err := pool.GetBlobs(vhashes, c.version, c.convert)
+ blobs, _, proofs, err := pool.GetBlobs(vhashes, c.version)
if err != nil {
t.Errorf("Unexpected error for case %d, %v", i, err)
}
@@ -2070,8 +2004,7 @@ func TestGetBlobs(t *testing.T) {
// If an item is missing, but shouldn't, error
if blobs[j] == nil || proofs[j] == nil {
// This is only an error if there was no version mismatch
- if c.convert ||
- (c.version == types.BlobSidecarVersion1 && 6 <= testBlobIndex && testBlobIndex < 12) ||
+ if (c.version == types.BlobSidecarVersion1 && 6 <= testBlobIndex && testBlobIndex < 12) ||
(c.version == types.BlobSidecarVersion0 && (testBlobIndex < 6 || 12 <= testBlobIndex)) {
t.Errorf("tracked blob retrieval failed: item %d, hash %x", j, vhashes[j])
}
@@ -2098,185 +2031,6 @@ func TestGetBlobs(t *testing.T) {
pool.Close()
}
-// TestSidecarConversion will verify that after the Osaka fork, all legacy
-// sidecars in the pool are successfully convert to v1 sidecars.
-func TestSidecarConversion(t *testing.T) {
- // log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true)))
-
- // Create a temporary folder for the persistent backend
- storage := t.TempDir()
- os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700)
-
- var (
- preOsakaTxs = make(types.Transactions, 10)
- postOsakaTxs = make(types.Transactions, 3)
- keys = make([]*ecdsa.PrivateKey, len(preOsakaTxs)+len(postOsakaTxs))
- addrs = make([]common.Address, len(preOsakaTxs)+len(postOsakaTxs))
- statedb, _ = state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
- )
- for i := range keys {
- keys[i], _ = crypto.GenerateKey()
- addrs[i] = crypto.PubkeyToAddress(keys[i].PublicKey)
- statedb.AddBalance(addrs[i], uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
- }
- for i := range preOsakaTxs {
- preOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 2, 0, keys[i], types.BlobSidecarVersion0)
- }
- for i := range postOsakaTxs {
- if i == 0 {
- // First has a v0 sidecar.
- postOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 1, 0, keys[len(preOsakaTxs)+i], types.BlobSidecarVersion0)
- }
- postOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 1, 0, keys[len(preOsakaTxs)+i], types.BlobSidecarVersion1)
- }
- statedb.Commit(0, true, false)
-
- // Test plan:
- // 1) Create a bunch v0 sidecar txs and add to pool before Osaka.
- // 2) Pass in new Osaka header to activate the conversion thread.
- // 3) Continue adding both v0 and v1 transactions to the pool.
- // 4) Verify that as additional blocks come in, transactions involved in the
- // migration are correctly discarded.
-
- config := ¶ms.ChainConfig{
- ChainID: big.NewInt(1),
- LondonBlock: big.NewInt(0),
- BerlinBlock: big.NewInt(0),
- CancunTime: newUint64(0),
- PragueTime: newUint64(0),
- OsakaTime: newUint64(1),
- BlobScheduleConfig: params.DefaultBlobSchedule,
- }
- chain := &testBlockChain{
- config: config,
- basefee: uint256.NewInt(1050),
- blobfee: uint256.NewInt(105),
- statedb: statedb,
- blocks: make(map[uint64]*types.Block),
- }
-
- // Create 3 blocks:
- // - the current block, before Osaka
- // - the first block after Osaka
- // - another post-Osaka block with several transactions in it
- header0 := chain.CurrentBlock()
- header0.Time = 0
- chain.blocks[0] = types.NewBlockWithHeader(header0)
-
- header1 := chain.CurrentBlock()
- header1.Number = big.NewInt(1)
- header1.Time = 1
- chain.blocks[1] = types.NewBlockWithHeader(header1)
-
- header2 := chain.CurrentBlock()
- header2.Time = 2
- header2.Number = big.NewInt(2)
-
- // Make a copy of one of the pre-Osaka transactions and convert it to v1 here
- // so that we can add it to the pool later and ensure a duplicate is not added
- // by the conversion queue.
- tx := preOsakaTxs[len(preOsakaTxs)-1]
- sc := *tx.BlobTxSidecar() // copy sidecar
- sc.ToV1()
- tx.WithBlobTxSidecar(&sc)
-
- block2 := types.NewBlockWithHeader(header2).WithBody(types.Body{Transactions: append(postOsakaTxs, tx)})
- chain.blocks[2] = block2
-
- pool := New(Config{Datadir: storage}, chain, nil)
- if err := pool.Init(1, header0, newReserver()); err != nil {
- t.Fatalf("failed to create blob pool: %v", err)
- }
-
- errs := pool.Add(preOsakaTxs, true)
- for i, err := range errs {
- if err != nil {
- t.Errorf("failed to insert blob tx from %s: %s", addrs[i], errs[i])
- }
- }
-
- // Kick off migration.
- pool.Reset(header0, header1)
-
- // Add the v0 sidecar tx, but don't block so we can keep doing other stuff
- // while it converts the sidecar.
- addDone := make(chan struct{})
- go func() {
- pool.Add(types.Transactions{postOsakaTxs[0]}, false)
- close(addDone)
- }()
-
- // Add the post-Osaka v1 sidecar txs.
- errs = pool.Add(postOsakaTxs[1:], false)
- for _, err := range errs {
- if err != nil {
- t.Fatalf("expected tx add to succeed: %v", err)
- }
- }
-
- // Wait for the first tx's conversion to complete, then check that all
- // transactions added after Osaka can be accounted for in the pool.
- <-addDone
- pending := pool.Pending(txpool.PendingFilter{BlobTxs: true, BlobVersion: types.BlobSidecarVersion1})
- for _, tx := range postOsakaTxs {
- from, _ := pool.signer.Sender(tx)
- if len(pending[from]) != 1 || pending[from][0].Hash != tx.Hash() {
- t.Fatalf("expected post-Osaka txs to be pending")
- }
- }
-
- // Now update the pool with the next block. This should cause the pool to
- // clear out the post-Osaka txs since they were included in block 2. Since the
- // test blockchain doesn't manage nonces, we'll just do that manually before
- // the reset is called. Don't forget about the pre-Osaka transaction we also
- // added to block 2!
- for i := range postOsakaTxs {
- statedb.SetNonce(addrs[len(preOsakaTxs)+i], 1, tracing.NonceChangeEoACall)
- }
- statedb.SetNonce(addrs[len(preOsakaTxs)-1], 1, tracing.NonceChangeEoACall)
- pool.Reset(header1, block2.Header())
-
- // Now verify no post-Osaka transactions are tracked by the pool.
- for i, tx := range postOsakaTxs {
- if pool.Get(tx.Hash()) != nil {
- t.Fatalf("expected txs added post-osaka to have been placed in limbo due to inclusion in a block: index %d, hash %s", i, tx.Hash())
- }
- }
-
- // Wait for the pool migration to complete.
- <-pool.cQueue.anyBillyConversionDone
-
- // Verify all transactions in the pool were converted and verify the
- // subsequent cell proofs.
- count, _ := pool.Stats()
- if count != len(preOsakaTxs)-1 {
- t.Errorf("expected pending count to match initial tx count: pending=%d, expected=%d", count, len(preOsakaTxs)-1)
- }
- for addr, acc := range pool.index {
- for _, m := range acc {
- if m.version != types.BlobSidecarVersion1 {
- t.Errorf("expected sidecar to have been converted: from %s, hash %s", addr, m.hash)
- }
- tx := pool.Get(m.hash)
- if tx == nil {
- t.Errorf("failed to get tx by hash: %s", m.hash)
- }
- sc := tx.BlobTxSidecar()
- if err := kzg4844.VerifyCellProofs(sc.Blobs, sc.Commitments, sc.Proofs); err != nil {
- t.Errorf("failed to verify cell proofs for tx %s after conversion: %s", m.hash, err)
- }
- }
- }
-
- verifyPoolInternals(t, pool)
-
- // Launch conversion a second time.
- // This is just a sanity check to ensure we can handle it.
- pool.Reset(header0, header1)
-
- pool.Close()
-}
-
// fakeBilly is a billy.Database implementation which just drops data on the floor.
type fakeBilly struct {
billy.Database
@@ -2360,5 +2114,3 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
}
}
}
-
-func newUint64(val uint64) *uint64 { return &val }
diff --git a/core/txpool/blobpool/conversion.go b/core/txpool/blobpool/conversion.go
deleted file mode 100644
index afdc10554f66..000000000000
--- a/core/txpool/blobpool/conversion.go
+++ /dev/null
@@ -1,218 +0,0 @@
-// Copyright 2025 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see .
-
-package blobpool
-
-import (
- "errors"
- "slices"
- "sync/atomic"
- "time"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/log"
-)
-
-// maxPendingConversionTasks caps the number of pending conversion tasks. This
-// prevents excessive memory usage; the worst-case scenario (2k transactions
-// with 6 blobs each) would consume approximately 1.5GB of memory.
-const maxPendingConversionTasks = 2048
-
-// txConvert represents a conversion task with an attached legacy blob transaction.
-type txConvert struct {
- tx *types.Transaction // Legacy blob transaction
- done chan error // Channel for signaling back if the conversion succeeds
-}
-
-// conversionQueue is a dedicated queue for converting legacy blob transactions
-// received from the network after the Osaka fork. Since conversion is expensive,
-// it is performed in the background by a single thread, ensuring the main Geth
-// process is not overloaded.
-type conversionQueue struct {
- tasks chan *txConvert
- startBilly chan func()
- quit chan struct{}
- closed chan struct{}
-
- billyQueue []func()
- billyTaskDone chan struct{}
-
- // This channel will be closed when the first billy conversion finishes.
- // It's added for unit tests to synchronize with the conversion progress.
- anyBillyConversionDone chan struct{}
-}
-
-// newConversionQueue constructs the conversion queue.
-func newConversionQueue() *conversionQueue {
- q := &conversionQueue{
- tasks: make(chan *txConvert),
- startBilly: make(chan func()),
- quit: make(chan struct{}),
- closed: make(chan struct{}),
- anyBillyConversionDone: make(chan struct{}),
- }
- go q.loop()
- return q
-}
-
-// convert accepts a legacy blob transaction with version-0 blobs and queues it
-// for conversion.
-//
-// This function may block for a long time until the transaction is processed.
-func (q *conversionQueue) convert(tx *types.Transaction) error {
- done := make(chan error, 1)
- select {
- case q.tasks <- &txConvert{tx: tx, done: done}:
- return <-done
- case <-q.closed:
- return errors.New("conversion queue closed")
- }
-}
-
-// launchBillyConversion starts a conversion task in the background.
-func (q *conversionQueue) launchBillyConversion(fn func()) error {
- select {
- case q.startBilly <- fn:
- return nil
- case <-q.closed:
- return errors.New("conversion queue closed")
- }
-}
-
-// close terminates the conversion queue.
-func (q *conversionQueue) close() {
- select {
- case <-q.closed:
- return
- default:
- close(q.quit)
- <-q.closed
- }
-}
-
-// run converts a batch of legacy blob txs to the new cell proof format.
-func (q *conversionQueue) run(tasks []*txConvert, done chan struct{}, interrupt *atomic.Int32) {
- defer close(done)
-
- for _, t := range tasks {
- if interrupt != nil && interrupt.Load() != 0 {
- t.done <- errors.New("conversion is interrupted")
- continue
- }
- sidecar := t.tx.BlobTxSidecar()
- if sidecar == nil {
- t.done <- errors.New("tx without sidecar")
- continue
- }
- // Run the conversion, the original sidecar will be mutated in place
- start := time.Now()
- err := sidecar.ToV1()
- t.done <- err
- log.Trace("Converted legacy blob tx", "hash", t.tx.Hash(), "err", err, "elapsed", common.PrettyDuration(time.Since(start)))
- }
-}
-
-func (q *conversionQueue) loop() {
- defer close(q.closed)
-
- var (
- done chan struct{} // Non-nil if background routine is active
- interrupt *atomic.Int32 // Flag to signal conversion interruption
-
- // The pending tasks for sidecar conversion. We assume the number of legacy
- // blob transactions requiring conversion will not be excessive. However,
- // a hard cap is applied as a protective measure.
- txTasks []*txConvert
-
- firstBilly = true
- )
-
- for {
- select {
- case t := <-q.tasks:
- if len(txTasks) >= maxPendingConversionTasks {
- t.done <- errors.New("conversion queue is overloaded")
- continue
- }
- txTasks = append(txTasks, t)
-
- // Launch the background conversion thread if it's idle
- if done == nil {
- done, interrupt = make(chan struct{}), new(atomic.Int32)
-
- tasks := slices.Clone(txTasks)
- txTasks = txTasks[:0]
- go q.run(tasks, done, interrupt)
- }
-
- case <-done:
- done, interrupt = nil, nil
- if len(txTasks) > 0 {
- done, interrupt = make(chan struct{}), new(atomic.Int32)
- tasks := slices.Clone(txTasks)
- txTasks = txTasks[:0]
- go q.run(tasks, done, interrupt)
- }
-
- case fn := <-q.startBilly:
- q.billyQueue = append(q.billyQueue, fn)
- q.runNextBillyTask()
-
- case <-q.billyTaskDone:
- if firstBilly {
- close(q.anyBillyConversionDone)
- firstBilly = false
- }
- q.runNextBillyTask()
-
- case <-q.quit:
- if done != nil {
- log.Debug("Waiting for blob proof conversion to exit")
- interrupt.Store(1)
- <-done
- }
- if q.billyTaskDone != nil {
- log.Debug("Waiting for blobpool billy conversion to exit")
- <-q.billyTaskDone
- }
- // Signal any tasks that were queued for the next batch but never started
- // so callers blocked in convert() receive an error instead of hanging.
- for _, t := range txTasks {
- // Best-effort notify; t.done is a buffered channel of size 1
- // created by convert(), and we send exactly once per task.
- t.done <- errors.New("conversion queue closed")
- }
- // Drop references to allow GC of the backing array.
- txTasks = txTasks[:0]
- return
- }
- }
-}
-
-func (q *conversionQueue) runNextBillyTask() {
- if len(q.billyQueue) == 0 {
- q.billyTaskDone = nil
- return
- }
-
- fn := q.billyQueue[0]
- q.billyQueue = append(q.billyQueue[:0], q.billyQueue[1:]...)
-
- done := make(chan struct{})
- go func() { defer close(done); fn() }()
- q.billyTaskDone = done
-}
diff --git a/core/txpool/blobpool/conversion_test.go b/core/txpool/blobpool/conversion_test.go
deleted file mode 100644
index 7ffffb2e4d36..000000000000
--- a/core/txpool/blobpool/conversion_test.go
+++ /dev/null
@@ -1,171 +0,0 @@
-// Copyright 2025 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see .
-
-package blobpool
-
-import (
- "crypto/ecdsa"
- "crypto/sha256"
- "sync"
- "testing"
- "time"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/crypto/kzg4844"
- "github.com/ethereum/go-ethereum/params"
- "github.com/holiman/uint256"
-)
-
-// createV1BlobTx creates a blob transaction with version 1 sidecar for testing.
-func createV1BlobTx(nonce uint64, key *ecdsa.PrivateKey) *types.Transaction {
- blob := &kzg4844.Blob{byte(nonce)}
- commitment, _ := kzg4844.BlobToCommitment(blob)
- cellProofs, _ := kzg4844.ComputeCellProofs(blob)
-
- blobtx := &types.BlobTx{
- ChainID: uint256.MustFromBig(params.MainnetChainConfig.ChainID),
- Nonce: nonce,
- GasTipCap: uint256.NewInt(1),
- GasFeeCap: uint256.NewInt(1000),
- Gas: 21000,
- BlobFeeCap: uint256.NewInt(100),
- BlobHashes: []common.Hash{kzg4844.CalcBlobHashV1(sha256.New(), &commitment)},
- Value: uint256.NewInt(100),
- Sidecar: types.NewBlobTxSidecar(types.BlobSidecarVersion1, []kzg4844.Blob{*blob}, []kzg4844.Commitment{commitment}, cellProofs),
- }
- return types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx)
-}
-
-func TestConversionQueueBasic(t *testing.T) {
- queue := newConversionQueue()
- defer queue.close()
-
- key, _ := crypto.GenerateKey()
- tx := makeTx(0, 1, 1, 1, key)
- if err := queue.convert(tx); err != nil {
- t.Fatalf("Expected successful conversion, got error: %v", err)
- }
- if tx.BlobTxSidecar().Version != types.BlobSidecarVersion1 {
- t.Errorf("Expected sidecar version to be %d, got %d", types.BlobSidecarVersion1, tx.BlobTxSidecar().Version)
- }
-}
-
-func TestConversionQueueV1BlobTx(t *testing.T) {
- queue := newConversionQueue()
- defer queue.close()
-
- key, _ := crypto.GenerateKey()
- tx := createV1BlobTx(0, key)
- version := tx.BlobTxSidecar().Version
-
- err := queue.convert(tx)
- if err != nil {
- t.Fatalf("Expected successful conversion, got error: %v", err)
- }
- if tx.BlobTxSidecar().Version != version {
- t.Errorf("Expected sidecar version to remain %d, got %d", version, tx.BlobTxSidecar().Version)
- }
-}
-
-func TestConversionQueueClosed(t *testing.T) {
- queue := newConversionQueue()
-
- // Close the queue first
- queue.close()
- key, _ := crypto.GenerateKey()
- tx := makeTx(0, 1, 1, 1, key)
-
- err := queue.convert(tx)
- if err == nil {
- t.Fatal("Expected error when converting on closed queue, got nil")
- }
-}
-
-func TestConversionQueueDoubleClose(t *testing.T) {
- queue := newConversionQueue()
- queue.close()
- queue.close() // Should not panic
-}
-
-func TestConversionQueueAutoRestartBatch(t *testing.T) {
- queue := newConversionQueue()
- defer queue.close()
-
- key, _ := crypto.GenerateKey()
-
- // Create a heavy transaction to ensure the first batch runs long enough
- // for subsequent tasks to be queued while it is active.
- heavy := makeMultiBlobTx(0, 1, 1, 1, int(params.BlobTxMaxBlobs), 0, key, types.BlobSidecarVersion0)
-
- var wg sync.WaitGroup
- wg.Add(1)
- heavyDone := make(chan error, 1)
- go func() {
- defer wg.Done()
- heavyDone <- queue.convert(heavy)
- }()
-
- // Give the conversion worker a head start so that the following tasks are
- // enqueued while the first batch is running.
- time.Sleep(200 * time.Millisecond)
-
- tx1 := makeTx(1, 1, 1, 1, key)
- tx2 := makeTx(2, 1, 1, 1, key)
-
- wg.Add(2)
- done1 := make(chan error, 1)
- done2 := make(chan error, 1)
- go func() { defer wg.Done(); done1 <- queue.convert(tx1) }()
- go func() { defer wg.Done(); done2 <- queue.convert(tx2) }()
-
- select {
- case err := <-done1:
- if err != nil {
- t.Fatalf("tx1 conversion error: %v", err)
- }
- case <-time.After(30 * time.Second):
- t.Fatal("timeout waiting for tx1 conversion")
- }
-
- select {
- case err := <-done2:
- if err != nil {
- t.Fatalf("tx2 conversion error: %v", err)
- }
- case <-time.After(30 * time.Second):
- t.Fatal("timeout waiting for tx2 conversion")
- }
-
- select {
- case err := <-heavyDone:
- if err != nil {
- t.Fatalf("heavy conversion error: %v", err)
- }
- case <-time.After(30 * time.Second):
- t.Fatal("timeout waiting for heavy conversion")
- }
-
- wg.Wait()
-
- if tx1.BlobTxSidecar().Version != types.BlobSidecarVersion1 {
- t.Fatalf("tx1 sidecar version mismatch: have %d, want %d", tx1.BlobTxSidecar().Version, types.BlobSidecarVersion1)
- }
- if tx2.BlobTxSidecar().Version != types.BlobSidecarVersion1 {
- t.Fatalf("tx2 sidecar version mismatch: have %d, want %d", tx2.BlobTxSidecar().Version, types.BlobSidecarVersion1)
- }
-}
diff --git a/core/txpool/blobpool/lookup.go b/core/txpool/blobpool/lookup.go
index 874ca85b8c2d..7607cd487a2b 100644
--- a/core/txpool/blobpool/lookup.go
+++ b/core/txpool/blobpool/lookup.go
@@ -110,13 +110,3 @@ func (l *lookup) untrack(tx *blobTxMeta) {
}
}
}
-
-// update updates the transaction index. It should only be used in the conversion.
-func (l *lookup) update(hash common.Hash, id uint64, size uint64) bool {
- meta, exists := l.txIndex[hash]
- if !exists {
- return false
- }
- meta.id, meta.size = id, size
- return true
-}
diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go
index 0386bac55619..e9b933bf89ba 100644
--- a/eth/catalyst/api.go
+++ b/eth/catalyst/api.go
@@ -517,7 +517,7 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProo
if len(hashes) > 128 {
return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes)))
}
- blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion0, false)
+ blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion0)
if err != nil {
return nil, engine.InvalidParams.With(err)
}
@@ -578,7 +578,7 @@ func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) ([]*engine.BlobAndProo
return nil, nil
}
- blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion1, false)
+ blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion1)
if err != nil {
return nil, engine.InvalidParams.With(err)
}