From e8015716c5588fa06178d2def3caab12d19da9a5 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 4 Dec 2025 12:55:15 +0800 Subject: [PATCH] core/txpool/blobpool: remove legacy sidecar conversion --- core/txpool/blobpool/blobpool.go | 244 ++-------------------- core/txpool/blobpool/blobpool_test.go | 258 +----------------------- core/txpool/blobpool/conversion.go | 218 -------------------- core/txpool/blobpool/conversion_test.go | 171 ---------------- core/txpool/blobpool/lookup.go | 10 - eth/catalyst/api.go | 4 +- 6 files changed, 25 insertions(+), 880 deletions(-) delete mode 100644 core/txpool/blobpool/conversion.go delete mode 100644 core/txpool/blobpool/conversion_test.go diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index bfaf4d5b8e3c..791c7012e266 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -21,12 +21,10 @@ import ( "container/heap" "errors" "fmt" - "maps" "math" "math/big" "os" "path/filepath" - "slices" "sort" "sync" "sync/atomic" @@ -337,9 +335,8 @@ type BlobPool struct { stored uint64 // Useful data size of all transactions on disk limbo *limbo // Persistent data store for the non-finalized blobs - signer types.Signer // Transaction signer to use for sender recovery - chain BlockChain // Chain object to access the state through - cQueue *conversionQueue // The queue for performing legacy sidecar conversion (TODO: remove after Osaka) + signer types.Signer // Transaction signer to use for sender recovery + chain BlockChain // Chain object to access the state through head atomic.Pointer[types.Header] // Current head of the chain state *state.StateDB // Current state at the head of the chain @@ -368,7 +365,6 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo hasPendingAuth: hasPendingAuth, signer: types.LatestSigner(chain.Config()), chain: chain, - cQueue: newConversionQueue(), // Deprecate it after the osaka fork lookup: newLookup(), index: make(map[common.Address][]*blobTxMeta), spent: make(map[common.Address]*uint256.Int), @@ -485,9 +481,6 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser // Close closes down the underlying persistent store. func (p *BlobPool) Close() error { - // Terminate the conversion queue - p.cQueue.close() - var errs []error if p.limbo != nil { // Close might be invoked due to error in constructor, before p,limbo is set if err := p.limbo.Close(); err != nil { @@ -885,172 +878,6 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) { basefeeGauge.Update(int64(basefee.Uint64())) blobfeeGauge.Update(int64(blobfee.Uint64())) p.updateStorageMetrics() - - // Perform the conversion logic at the fork boundary - if !p.chain.Config().IsOsaka(oldHead.Number, oldHead.Time) && p.chain.Config().IsOsaka(newHead.Number, newHead.Time) { - // Deep copy all indexed transaction metadata. - var ( - ids = make(map[common.Address]map[uint64]uint64) - txs = make(map[common.Address]map[uint64]common.Hash) - ) - for sender, list := range p.index { - ids[sender] = make(map[uint64]uint64) - txs[sender] = make(map[uint64]common.Hash) - for _, m := range list { - ids[sender][m.nonce] = m.id - txs[sender][m.nonce] = m.hash - } - } - // Initiate the background conversion thread. - p.cQueue.launchBillyConversion(func() { - p.convertLegacySidecars(ids, txs) - }) - } -} - -// compareAndSwap checks if the specified transaction is still tracked in the pool -// and replace the metadata accordingly. 
It should only be used in the fork boundary -// bulk conversion. If it fails for some reason, the subsequent txs won't be dropped -// for simplicity which we assume it's very likely to happen. -// -// The returned flag indicates whether the replacement succeeded. -func (p *BlobPool) compareAndSwap(address common.Address, hash common.Hash, blob []byte, oldID uint64, oldStorageSize uint32) bool { - p.lock.Lock() - defer p.lock.Unlock() - - newId, err := p.store.Put(blob) - if err != nil { - log.Error("Failed to store transaction", "hash", hash, "err", err) - return false - } - newSize := uint64(len(blob)) - newStorageSize := p.store.Size(newId) - - // Terminate the procedure if the transaction was already evicted. The - // newly added blob should be removed before return. - if !p.lookup.update(hash, newId, newSize) { - if derr := p.store.Delete(newId); derr != nil { - log.Error("Failed to delete the dangling blob tx", "err", derr) - } else { - log.Warn("Deleted the dangling blob tx", "id", newId) - } - return false - } - // Update the metadata of blob transaction - for _, meta := range p.index[address] { - if meta.hash == hash { - meta.id = newId - meta.version = types.BlobSidecarVersion1 - meta.storageSize = newStorageSize - meta.size = newSize - - p.stored += uint64(newStorageSize) - p.stored -= uint64(oldStorageSize) - break - } - } - if err := p.store.Delete(oldID); err != nil { - log.Error("Failed to delete the legacy transaction", "hash", hash, "id", oldID, "err", err) - } - return true -} - -// convertLegacySidecar fetches transaction data from the store, performs an -// on-the-fly conversion. This function is intended for use only during the -// Osaka fork transition period. -// -// The returned flag indicates whether the replacement succeeds or not. -func (p *BlobPool) convertLegacySidecar(sender common.Address, hash common.Hash, id uint64) bool { - start := time.Now() - - // Retrieves the legacy blob transaction from the underlying store with - // read lock held, preventing any potential data race around the slot - // specified by the id. - p.lock.RLock() - data, err := p.store.Get(id) - if err != nil { - p.lock.RUnlock() - // The transaction may have been evicted simultaneously, safe to skip conversion. - log.Debug("Blob transaction is missing", "hash", hash, "id", id, "err", err) - return false - } - oldStorageSize := p.store.Size(id) - p.lock.RUnlock() - - // Decode the transaction, the failure is not expected and report the error - // loudly if possible. If the blob transaction in this slot is corrupted, - // leave it in the store, it will be dropped during the next pool - // initialization. - var tx types.Transaction - if err = rlp.DecodeBytes(data, &tx); err != nil { - log.Error("Blob transaction is corrupted", "hash", hash, "id", id, "err", err) - return false - } - - // Skip conversion if the transaction does not match the expected hash, or if it was - // already converted. This can occur if the original transaction was evicted from the - // pool and the slot was reused by a new one. - if tx.Hash() != hash { - log.Warn("Blob transaction was replaced", "hash", hash, "id", id, "stored", tx.Hash()) - return false - } - sc := tx.BlobTxSidecar() - if sc.Version >= types.BlobSidecarVersion1 { - log.Debug("Skipping conversion of blob tx", "hash", hash, "id", id) - return false - } - - // Perform the sidecar conversion, the failure is not expected and report the error - // loudly if possible. 
-	if err := tx.BlobTxSidecar().ToV1(); err != nil {
-		log.Error("Failed to convert blob transaction", "hash", hash, "err", err)
-		return false
-	}
-
-	// Encode the converted transaction, the failure is not expected and report
-	// the error loudly if possible.
-	blob, err := rlp.EncodeToBytes(&tx)
-	if err != nil {
-		log.Error("Failed to encode blob transaction", "hash", tx.Hash(), "err", err)
-		return false
-	}
-
-	// Replace the legacy blob transaction with the converted format.
-	if !p.compareAndSwap(sender, hash, blob, id, oldStorageSize) {
-		log.Error("Failed to replace the legacy transaction", "hash", hash)
-		return false
-	}
-	log.Debug("Converted legacy blob transaction", "hash", hash, "elapsed", common.PrettyDuration(time.Since(start)))
-	return true
-}
-
-// convertLegacySidecars converts all given transactions to sidecar version 1.
-//
-// If any of them fails to be converted, the subsequent transactions will still
-// be processed, as we assume the failure is very unlikely to happen. If happens,
-// these transactions will be stuck in the pool until eviction.
-func (p *BlobPool) convertLegacySidecars(ids map[common.Address]map[uint64]uint64, txs map[common.Address]map[uint64]common.Hash) {
-	var (
-		start   = time.Now()
-		success int
-		failure int
-	)
-	for addr, list := range txs {
-		// Transactions evicted from the pool must be contiguous, if in any case,
-		// the transactions are gapped with each other, they will be discarded.
-		nonces := slices.Collect(maps.Keys(list))
-		slices.Sort(nonces)
-
-		// Convert the txs with nonce order
-		for _, nonce := range nonces {
-			if p.convertLegacySidecar(addr, list[nonce], ids[addr][nonce]) {
-				success++
-			} else {
-				failure++
-			}
-		}
-	}
-	log.Info("Completed blob transaction conversion", "discarded", failure, "injected", success, "elapsed", common.PrettyDuration(time.Since(start)))
 }
 
 // reorg assembles all the transactors and missing transactions between an old
@@ -1530,8 +1357,8 @@ func (p *BlobPool) GetMetadata(hash common.Hash) *txpool.TxMetadata {
 //
 // The version argument specifies the type of proofs to return, either the
 // blob proofs (version 0) or the cell proofs (version 1). Proofs conversion is
-// CPU intensive, so only done if explicitly requested with the convert flag.
-func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte, convert bool) ([]*kzg4844.Blob, []kzg4844.Commitment, [][]kzg4844.Proof, error) {
+// CPU intensive and therefore not performed by the blob pool.
+func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte) ([]*kzg4844.Blob, []kzg4844.Commitment, [][]kzg4844.Proof, error) {
 	var (
 		blobs       = make([]*kzg4844.Blob, len(vhashes))
 		commitments = make([]kzg4844.Commitment, len(vhashes))
@@ -1582,7 +1409,7 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte, convert bool) (
 	}
 	// Mark hash as seen.
 	filled[hash] = struct{}{}
-	if sidecar.Version != version && !convert {
+	if sidecar.Version != version {
 		// Skip blobs with incompatible version. Note we still track the blob hash
 		// in `filled` here, ensuring that we do not resolve this tx another time.
 			continue
@@ -1591,29 +1418,13 @@
 		var pf []kzg4844.Proof
 		switch version {
 		case types.BlobSidecarVersion0:
-			if sidecar.Version == types.BlobSidecarVersion0 {
-				pf = []kzg4844.Proof{sidecar.Proofs[i]}
-			} else {
-				proof, err := kzg4844.ComputeBlobProof(&sidecar.Blobs[i], sidecar.Commitments[i])
-				if err != nil {
-					return nil, nil, nil, err
-				}
-				pf = []kzg4844.Proof{proof}
-			}
+			pf = []kzg4844.Proof{sidecar.Proofs[i]}
 		case types.BlobSidecarVersion1:
-			if sidecar.Version == types.BlobSidecarVersion0 {
-				cellProofs, err := kzg4844.ComputeCellProofs(&sidecar.Blobs[i])
-				if err != nil {
-					return nil, nil, nil, err
-				}
-				pf = cellProofs
-			} else {
-				cellProofs, err := sidecar.CellProofsAt(i)
-				if err != nil {
-					return nil, nil, nil, err
-				}
-				pf = cellProofs
+			cellProofs, err := sidecar.CellProofsAt(i)
+			if err != nil {
+				return nil, nil, nil, err
 			}
+			pf = cellProofs
 		}
 		for _, index := range list {
 			blobs[index] = &sidecar.Blobs[i]
@@ -1640,45 +1451,26 @@ func (p *BlobPool) AvailableBlobs(vhashes []common.Hash) int {
 	return available
 }
 
-// preCheck performs the static validation upon the provided tx and converts
-// the legacy sidecars if Osaka fork has been activated with a short time window.
+// preCheck performs static validation on the provided tx.
 //
-// This function is pure static and lock free.
+// This function is purely static and lock free.
 func (p *BlobPool) preCheck(tx *types.Transaction) error {
 	var (
-		head     = p.head.Load()
-		isOsaka  = p.chain.Config().IsOsaka(head.Number, head.Time)
-		deadline time.Time
+		head    = p.head.Load()
+		version = types.BlobSidecarVersion0
 	)
-	if isOsaka {
-		deadline = time.Unix(int64(*p.chain.Config().OsakaTime), 0).Add(conversionTimeWindow)
+	if p.chain.Config().IsOsaka(head.Number, head.Time) {
+		version = types.BlobSidecarVersion1
 	}
-	// Validate the transaction statically at first to avoid unnecessary
-	// conversion. This step doesn't require lock protection.
+	// Validate the transaction statically first; this step doesn't require
+	// lock protection.
 	if err := p.ValidateTxBasics(tx); err != nil {
 		return err
 	}
-	// Before the Osaka fork, reject the blob txs with cell proofs
-	if !isOsaka {
-		if tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 {
-			return nil
-		} else {
-			return errors.New("cell proof is not supported yet")
-		}
-	}
-	// After the Osaka fork, reject the legacy blob txs if the conversion
-	// time window is passed.
-	if tx.BlobTxSidecar().Version == types.BlobSidecarVersion1 {
-		return nil
-	}
-	if head.Time > uint64(deadline.Unix()) {
-		return errors.New("legacy blob tx is not supported")
+	if tx.BlobTxSidecar().Version != version {
+		return fmt.Errorf("sidecar version is not supported, got: %d, want: %d", tx.BlobTxSidecar().Version, version)
 	}
-	// Convert the legacy sidecar after Osaka fork. This could be a long
-	// procedure which takes a few seconds, even minutes if there is a long
-	// queue. Fortunately it will only block the routine of the source peer
-	// announcing the tx, without affecting other parts.
-	return p.cQueue.convert(tx)
+	return nil
 }
 
 // Add inserts a set of blob transactions into the pool if they pass validation (both
diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go
index 2fa1927cae2c..ed36019f4030 100644
--- a/core/txpool/blobpool/blobpool_test.go
+++ b/core/txpool/blobpool/blobpool_test.go
@@ -433,11 +433,11 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) {
 			hashes = append(hashes, tx.vhashes...)
} } - blobs1, _, proofs1, err := pool.GetBlobs(hashes, types.BlobSidecarVersion0, false) + blobs1, _, proofs1, err := pool.GetBlobs(hashes, types.BlobSidecarVersion0) if err != nil { t.Fatal(err) } - blobs2, _, proofs2, err := pool.GetBlobs(hashes, types.BlobSidecarVersion1, false) + blobs2, _, proofs2, err := pool.GetBlobs(hashes, types.BlobSidecarVersion1) if err != nil { t.Fatal(err) } @@ -1329,7 +1329,7 @@ func TestBlobCountLimit(t *testing.T) { // Check that first succeeds second fails. if errs[0] != nil { - t.Fatalf("expected tx with 7 blobs to succeed") + t.Fatalf("expected tx with 7 blobs to succeed, got %v", errs[0]) } if !errors.Is(errs[1], txpool.ErrTxBlobLimitExceeded) { t.Fatalf("expected tx with 8 blobs to fail, got: %v", errs[1]) @@ -1806,66 +1806,6 @@ func TestAdd(t *testing.T) { } } -// Tests that transactions with legacy sidecars are accepted within the -// conversion window but rejected after it has passed. -func TestAddLegacyBlobTx(t *testing.T) { - testAddLegacyBlobTx(t, true) // conversion window has not yet passed - testAddLegacyBlobTx(t, false) // conversion window passed -} - -func testAddLegacyBlobTx(t *testing.T, accept bool) { - var ( - key1, _ = crypto.GenerateKey() - key2, _ = crypto.GenerateKey() - - addr1 = crypto.PubkeyToAddress(key1.PublicKey) - addr2 = crypto.PubkeyToAddress(key2.PublicKey) - ) - - statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) - statedb.AddBalance(addr1, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified) - statedb.AddBalance(addr2, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified) - statedb.Commit(0, true, false) - - chain := &testBlockChain{ - config: params.MergedTestChainConfig, - basefee: uint256.NewInt(1050), - blobfee: uint256.NewInt(105), - statedb: statedb, - } - var timeDiff uint64 - if accept { - timeDiff = uint64(conversionTimeWindow.Seconds()) - 1 - } else { - timeDiff = uint64(conversionTimeWindow.Seconds()) + 1 - } - time := *params.MergedTestChainConfig.OsakaTime + timeDiff - chain.setHeadTime(time) - - pool := New(Config{Datadir: t.TempDir()}, chain, nil) - if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { - t.Fatalf("failed to create blob pool: %v", err) - } - - // Attempt to add legacy blob transactions. 
- var ( - tx1 = makeMultiBlobTx(0, 1, 1000, 100, 6, 0, key1, types.BlobSidecarVersion0) - tx2 = makeMultiBlobTx(0, 1, 800, 70, 6, 6, key2, types.BlobSidecarVersion0) - txs = []*types.Transaction{tx1, tx2} - ) - errs := pool.Add(txs, true) - for _, err := range errs { - if accept && err != nil { - t.Fatalf("expected tx add to succeed, %v", err) - } - if !accept && err == nil { - t.Fatal("expected tx add to fail") - } - } - verifyPoolInternals(t, pool) - pool.Close() -} - func TestGetBlobs(t *testing.T) { //log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true))) @@ -1952,7 +1892,6 @@ func TestGetBlobs(t *testing.T) { limit int fillRandom bool // Whether to randomly fill some of the requested blobs with unknowns version byte // Blob sidecar version to request - convert bool // Whether to convert version on retrieval }{ { start: 0, limit: 6, @@ -2018,11 +1957,6 @@ func TestGetBlobs(t *testing.T) { start: 0, limit: 18, fillRandom: true, version: types.BlobSidecarVersion1, }, - { - start: 0, limit: 18, fillRandom: true, - version: types.BlobSidecarVersion1, - convert: true, // Convert some version 0 blobs to version 1 while retrieving - }, } for i, c := range cases { var ( @@ -2044,7 +1978,7 @@ func TestGetBlobs(t *testing.T) { filled[len(vhashes)] = struct{}{} vhashes = append(vhashes, testrand.Hash()) } - blobs, _, proofs, err := pool.GetBlobs(vhashes, c.version, c.convert) + blobs, _, proofs, err := pool.GetBlobs(vhashes, c.version) if err != nil { t.Errorf("Unexpected error for case %d, %v", i, err) } @@ -2070,8 +2004,7 @@ func TestGetBlobs(t *testing.T) { // If an item is missing, but shouldn't, error if blobs[j] == nil || proofs[j] == nil { // This is only an error if there was no version mismatch - if c.convert || - (c.version == types.BlobSidecarVersion1 && 6 <= testBlobIndex && testBlobIndex < 12) || + if (c.version == types.BlobSidecarVersion1 && 6 <= testBlobIndex && testBlobIndex < 12) || (c.version == types.BlobSidecarVersion0 && (testBlobIndex < 6 || 12 <= testBlobIndex)) { t.Errorf("tracked blob retrieval failed: item %d, hash %x", j, vhashes[j]) } @@ -2098,185 +2031,6 @@ func TestGetBlobs(t *testing.T) { pool.Close() } -// TestSidecarConversion will verify that after the Osaka fork, all legacy -// sidecars in the pool are successfully convert to v1 sidecars. -func TestSidecarConversion(t *testing.T) { - // log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelTrace, true))) - - // Create a temporary folder for the persistent backend - storage := t.TempDir() - os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700) - - var ( - preOsakaTxs = make(types.Transactions, 10) - postOsakaTxs = make(types.Transactions, 3) - keys = make([]*ecdsa.PrivateKey, len(preOsakaTxs)+len(postOsakaTxs)) - addrs = make([]common.Address, len(preOsakaTxs)+len(postOsakaTxs)) - statedb, _ = state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) - ) - for i := range keys { - keys[i], _ = crypto.GenerateKey() - addrs[i] = crypto.PubkeyToAddress(keys[i].PublicKey) - statedb.AddBalance(addrs[i], uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified) - } - for i := range preOsakaTxs { - preOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 2, 0, keys[i], types.BlobSidecarVersion0) - } - for i := range postOsakaTxs { - if i == 0 { - // First has a v0 sidecar. 
- postOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 1, 0, keys[len(preOsakaTxs)+i], types.BlobSidecarVersion0) - } - postOsakaTxs[i] = makeMultiBlobTx(0, 1, 1000, 100, 1, 0, keys[len(preOsakaTxs)+i], types.BlobSidecarVersion1) - } - statedb.Commit(0, true, false) - - // Test plan: - // 1) Create a bunch v0 sidecar txs and add to pool before Osaka. - // 2) Pass in new Osaka header to activate the conversion thread. - // 3) Continue adding both v0 and v1 transactions to the pool. - // 4) Verify that as additional blocks come in, transactions involved in the - // migration are correctly discarded. - - config := ¶ms.ChainConfig{ - ChainID: big.NewInt(1), - LondonBlock: big.NewInt(0), - BerlinBlock: big.NewInt(0), - CancunTime: newUint64(0), - PragueTime: newUint64(0), - OsakaTime: newUint64(1), - BlobScheduleConfig: params.DefaultBlobSchedule, - } - chain := &testBlockChain{ - config: config, - basefee: uint256.NewInt(1050), - blobfee: uint256.NewInt(105), - statedb: statedb, - blocks: make(map[uint64]*types.Block), - } - - // Create 3 blocks: - // - the current block, before Osaka - // - the first block after Osaka - // - another post-Osaka block with several transactions in it - header0 := chain.CurrentBlock() - header0.Time = 0 - chain.blocks[0] = types.NewBlockWithHeader(header0) - - header1 := chain.CurrentBlock() - header1.Number = big.NewInt(1) - header1.Time = 1 - chain.blocks[1] = types.NewBlockWithHeader(header1) - - header2 := chain.CurrentBlock() - header2.Time = 2 - header2.Number = big.NewInt(2) - - // Make a copy of one of the pre-Osaka transactions and convert it to v1 here - // so that we can add it to the pool later and ensure a duplicate is not added - // by the conversion queue. - tx := preOsakaTxs[len(preOsakaTxs)-1] - sc := *tx.BlobTxSidecar() // copy sidecar - sc.ToV1() - tx.WithBlobTxSidecar(&sc) - - block2 := types.NewBlockWithHeader(header2).WithBody(types.Body{Transactions: append(postOsakaTxs, tx)}) - chain.blocks[2] = block2 - - pool := New(Config{Datadir: storage}, chain, nil) - if err := pool.Init(1, header0, newReserver()); err != nil { - t.Fatalf("failed to create blob pool: %v", err) - } - - errs := pool.Add(preOsakaTxs, true) - for i, err := range errs { - if err != nil { - t.Errorf("failed to insert blob tx from %s: %s", addrs[i], errs[i]) - } - } - - // Kick off migration. - pool.Reset(header0, header1) - - // Add the v0 sidecar tx, but don't block so we can keep doing other stuff - // while it converts the sidecar. - addDone := make(chan struct{}) - go func() { - pool.Add(types.Transactions{postOsakaTxs[0]}, false) - close(addDone) - }() - - // Add the post-Osaka v1 sidecar txs. - errs = pool.Add(postOsakaTxs[1:], false) - for _, err := range errs { - if err != nil { - t.Fatalf("expected tx add to succeed: %v", err) - } - } - - // Wait for the first tx's conversion to complete, then check that all - // transactions added after Osaka can be accounted for in the pool. - <-addDone - pending := pool.Pending(txpool.PendingFilter{BlobTxs: true, BlobVersion: types.BlobSidecarVersion1}) - for _, tx := range postOsakaTxs { - from, _ := pool.signer.Sender(tx) - if len(pending[from]) != 1 || pending[from][0].Hash != tx.Hash() { - t.Fatalf("expected post-Osaka txs to be pending") - } - } - - // Now update the pool with the next block. This should cause the pool to - // clear out the post-Osaka txs since they were included in block 2. Since the - // test blockchain doesn't manage nonces, we'll just do that manually before - // the reset is called. 
Don't forget about the pre-Osaka transaction we also - // added to block 2! - for i := range postOsakaTxs { - statedb.SetNonce(addrs[len(preOsakaTxs)+i], 1, tracing.NonceChangeEoACall) - } - statedb.SetNonce(addrs[len(preOsakaTxs)-1], 1, tracing.NonceChangeEoACall) - pool.Reset(header1, block2.Header()) - - // Now verify no post-Osaka transactions are tracked by the pool. - for i, tx := range postOsakaTxs { - if pool.Get(tx.Hash()) != nil { - t.Fatalf("expected txs added post-osaka to have been placed in limbo due to inclusion in a block: index %d, hash %s", i, tx.Hash()) - } - } - - // Wait for the pool migration to complete. - <-pool.cQueue.anyBillyConversionDone - - // Verify all transactions in the pool were converted and verify the - // subsequent cell proofs. - count, _ := pool.Stats() - if count != len(preOsakaTxs)-1 { - t.Errorf("expected pending count to match initial tx count: pending=%d, expected=%d", count, len(preOsakaTxs)-1) - } - for addr, acc := range pool.index { - for _, m := range acc { - if m.version != types.BlobSidecarVersion1 { - t.Errorf("expected sidecar to have been converted: from %s, hash %s", addr, m.hash) - } - tx := pool.Get(m.hash) - if tx == nil { - t.Errorf("failed to get tx by hash: %s", m.hash) - } - sc := tx.BlobTxSidecar() - if err := kzg4844.VerifyCellProofs(sc.Blobs, sc.Commitments, sc.Proofs); err != nil { - t.Errorf("failed to verify cell proofs for tx %s after conversion: %s", m.hash, err) - } - } - } - - verifyPoolInternals(t, pool) - - // Launch conversion a second time. - // This is just a sanity check to ensure we can handle it. - pool.Reset(header0, header1) - - pool.Close() -} - // fakeBilly is a billy.Database implementation which just drops data on the floor. type fakeBilly struct { billy.Database @@ -2360,5 +2114,3 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) { } } } - -func newUint64(val uint64) *uint64 { return &val } diff --git a/core/txpool/blobpool/conversion.go b/core/txpool/blobpool/conversion.go deleted file mode 100644 index afdc10554f66..000000000000 --- a/core/txpool/blobpool/conversion.go +++ /dev/null @@ -1,218 +0,0 @@ -// Copyright 2025 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package blobpool - -import ( - "errors" - "slices" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" -) - -// maxPendingConversionTasks caps the number of pending conversion tasks. This -// prevents excessive memory usage; the worst-case scenario (2k transactions -// with 6 blobs each) would consume approximately 1.5GB of memory. -const maxPendingConversionTasks = 2048 - -// txConvert represents a conversion task with an attached legacy blob transaction. 
-type txConvert struct { - tx *types.Transaction // Legacy blob transaction - done chan error // Channel for signaling back if the conversion succeeds -} - -// conversionQueue is a dedicated queue for converting legacy blob transactions -// received from the network after the Osaka fork. Since conversion is expensive, -// it is performed in the background by a single thread, ensuring the main Geth -// process is not overloaded. -type conversionQueue struct { - tasks chan *txConvert - startBilly chan func() - quit chan struct{} - closed chan struct{} - - billyQueue []func() - billyTaskDone chan struct{} - - // This channel will be closed when the first billy conversion finishes. - // It's added for unit tests to synchronize with the conversion progress. - anyBillyConversionDone chan struct{} -} - -// newConversionQueue constructs the conversion queue. -func newConversionQueue() *conversionQueue { - q := &conversionQueue{ - tasks: make(chan *txConvert), - startBilly: make(chan func()), - quit: make(chan struct{}), - closed: make(chan struct{}), - anyBillyConversionDone: make(chan struct{}), - } - go q.loop() - return q -} - -// convert accepts a legacy blob transaction with version-0 blobs and queues it -// for conversion. -// -// This function may block for a long time until the transaction is processed. -func (q *conversionQueue) convert(tx *types.Transaction) error { - done := make(chan error, 1) - select { - case q.tasks <- &txConvert{tx: tx, done: done}: - return <-done - case <-q.closed: - return errors.New("conversion queue closed") - } -} - -// launchBillyConversion starts a conversion task in the background. -func (q *conversionQueue) launchBillyConversion(fn func()) error { - select { - case q.startBilly <- fn: - return nil - case <-q.closed: - return errors.New("conversion queue closed") - } -} - -// close terminates the conversion queue. -func (q *conversionQueue) close() { - select { - case <-q.closed: - return - default: - close(q.quit) - <-q.closed - } -} - -// run converts a batch of legacy blob txs to the new cell proof format. -func (q *conversionQueue) run(tasks []*txConvert, done chan struct{}, interrupt *atomic.Int32) { - defer close(done) - - for _, t := range tasks { - if interrupt != nil && interrupt.Load() != 0 { - t.done <- errors.New("conversion is interrupted") - continue - } - sidecar := t.tx.BlobTxSidecar() - if sidecar == nil { - t.done <- errors.New("tx without sidecar") - continue - } - // Run the conversion, the original sidecar will be mutated in place - start := time.Now() - err := sidecar.ToV1() - t.done <- err - log.Trace("Converted legacy blob tx", "hash", t.tx.Hash(), "err", err, "elapsed", common.PrettyDuration(time.Since(start))) - } -} - -func (q *conversionQueue) loop() { - defer close(q.closed) - - var ( - done chan struct{} // Non-nil if background routine is active - interrupt *atomic.Int32 // Flag to signal conversion interruption - - // The pending tasks for sidecar conversion. We assume the number of legacy - // blob transactions requiring conversion will not be excessive. However, - // a hard cap is applied as a protective measure. 
- txTasks []*txConvert - - firstBilly = true - ) - - for { - select { - case t := <-q.tasks: - if len(txTasks) >= maxPendingConversionTasks { - t.done <- errors.New("conversion queue is overloaded") - continue - } - txTasks = append(txTasks, t) - - // Launch the background conversion thread if it's idle - if done == nil { - done, interrupt = make(chan struct{}), new(atomic.Int32) - - tasks := slices.Clone(txTasks) - txTasks = txTasks[:0] - go q.run(tasks, done, interrupt) - } - - case <-done: - done, interrupt = nil, nil - if len(txTasks) > 0 { - done, interrupt = make(chan struct{}), new(atomic.Int32) - tasks := slices.Clone(txTasks) - txTasks = txTasks[:0] - go q.run(tasks, done, interrupt) - } - - case fn := <-q.startBilly: - q.billyQueue = append(q.billyQueue, fn) - q.runNextBillyTask() - - case <-q.billyTaskDone: - if firstBilly { - close(q.anyBillyConversionDone) - firstBilly = false - } - q.runNextBillyTask() - - case <-q.quit: - if done != nil { - log.Debug("Waiting for blob proof conversion to exit") - interrupt.Store(1) - <-done - } - if q.billyTaskDone != nil { - log.Debug("Waiting for blobpool billy conversion to exit") - <-q.billyTaskDone - } - // Signal any tasks that were queued for the next batch but never started - // so callers blocked in convert() receive an error instead of hanging. - for _, t := range txTasks { - // Best-effort notify; t.done is a buffered channel of size 1 - // created by convert(), and we send exactly once per task. - t.done <- errors.New("conversion queue closed") - } - // Drop references to allow GC of the backing array. - txTasks = txTasks[:0] - return - } - } -} - -func (q *conversionQueue) runNextBillyTask() { - if len(q.billyQueue) == 0 { - q.billyTaskDone = nil - return - } - - fn := q.billyQueue[0] - q.billyQueue = append(q.billyQueue[:0], q.billyQueue[1:]...) - - done := make(chan struct{}) - go func() { defer close(done); fn() }() - q.billyTaskDone = done -} diff --git a/core/txpool/blobpool/conversion_test.go b/core/txpool/blobpool/conversion_test.go deleted file mode 100644 index 7ffffb2e4d36..000000000000 --- a/core/txpool/blobpool/conversion_test.go +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright 2025 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package blobpool - -import ( - "crypto/ecdsa" - "crypto/sha256" - "sync" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/crypto/kzg4844" - "github.com/ethereum/go-ethereum/params" - "github.com/holiman/uint256" -) - -// createV1BlobTx creates a blob transaction with version 1 sidecar for testing. 
-func createV1BlobTx(nonce uint64, key *ecdsa.PrivateKey) *types.Transaction { - blob := &kzg4844.Blob{byte(nonce)} - commitment, _ := kzg4844.BlobToCommitment(blob) - cellProofs, _ := kzg4844.ComputeCellProofs(blob) - - blobtx := &types.BlobTx{ - ChainID: uint256.MustFromBig(params.MainnetChainConfig.ChainID), - Nonce: nonce, - GasTipCap: uint256.NewInt(1), - GasFeeCap: uint256.NewInt(1000), - Gas: 21000, - BlobFeeCap: uint256.NewInt(100), - BlobHashes: []common.Hash{kzg4844.CalcBlobHashV1(sha256.New(), &commitment)}, - Value: uint256.NewInt(100), - Sidecar: types.NewBlobTxSidecar(types.BlobSidecarVersion1, []kzg4844.Blob{*blob}, []kzg4844.Commitment{commitment}, cellProofs), - } - return types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx) -} - -func TestConversionQueueBasic(t *testing.T) { - queue := newConversionQueue() - defer queue.close() - - key, _ := crypto.GenerateKey() - tx := makeTx(0, 1, 1, 1, key) - if err := queue.convert(tx); err != nil { - t.Fatalf("Expected successful conversion, got error: %v", err) - } - if tx.BlobTxSidecar().Version != types.BlobSidecarVersion1 { - t.Errorf("Expected sidecar version to be %d, got %d", types.BlobSidecarVersion1, tx.BlobTxSidecar().Version) - } -} - -func TestConversionQueueV1BlobTx(t *testing.T) { - queue := newConversionQueue() - defer queue.close() - - key, _ := crypto.GenerateKey() - tx := createV1BlobTx(0, key) - version := tx.BlobTxSidecar().Version - - err := queue.convert(tx) - if err != nil { - t.Fatalf("Expected successful conversion, got error: %v", err) - } - if tx.BlobTxSidecar().Version != version { - t.Errorf("Expected sidecar version to remain %d, got %d", version, tx.BlobTxSidecar().Version) - } -} - -func TestConversionQueueClosed(t *testing.T) { - queue := newConversionQueue() - - // Close the queue first - queue.close() - key, _ := crypto.GenerateKey() - tx := makeTx(0, 1, 1, 1, key) - - err := queue.convert(tx) - if err == nil { - t.Fatal("Expected error when converting on closed queue, got nil") - } -} - -func TestConversionQueueDoubleClose(t *testing.T) { - queue := newConversionQueue() - queue.close() - queue.close() // Should not panic -} - -func TestConversionQueueAutoRestartBatch(t *testing.T) { - queue := newConversionQueue() - defer queue.close() - - key, _ := crypto.GenerateKey() - - // Create a heavy transaction to ensure the first batch runs long enough - // for subsequent tasks to be queued while it is active. - heavy := makeMultiBlobTx(0, 1, 1, 1, int(params.BlobTxMaxBlobs), 0, key, types.BlobSidecarVersion0) - - var wg sync.WaitGroup - wg.Add(1) - heavyDone := make(chan error, 1) - go func() { - defer wg.Done() - heavyDone <- queue.convert(heavy) - }() - - // Give the conversion worker a head start so that the following tasks are - // enqueued while the first batch is running. 
- time.Sleep(200 * time.Millisecond) - - tx1 := makeTx(1, 1, 1, 1, key) - tx2 := makeTx(2, 1, 1, 1, key) - - wg.Add(2) - done1 := make(chan error, 1) - done2 := make(chan error, 1) - go func() { defer wg.Done(); done1 <- queue.convert(tx1) }() - go func() { defer wg.Done(); done2 <- queue.convert(tx2) }() - - select { - case err := <-done1: - if err != nil { - t.Fatalf("tx1 conversion error: %v", err) - } - case <-time.After(30 * time.Second): - t.Fatal("timeout waiting for tx1 conversion") - } - - select { - case err := <-done2: - if err != nil { - t.Fatalf("tx2 conversion error: %v", err) - } - case <-time.After(30 * time.Second): - t.Fatal("timeout waiting for tx2 conversion") - } - - select { - case err := <-heavyDone: - if err != nil { - t.Fatalf("heavy conversion error: %v", err) - } - case <-time.After(30 * time.Second): - t.Fatal("timeout waiting for heavy conversion") - } - - wg.Wait() - - if tx1.BlobTxSidecar().Version != types.BlobSidecarVersion1 { - t.Fatalf("tx1 sidecar version mismatch: have %d, want %d", tx1.BlobTxSidecar().Version, types.BlobSidecarVersion1) - } - if tx2.BlobTxSidecar().Version != types.BlobSidecarVersion1 { - t.Fatalf("tx2 sidecar version mismatch: have %d, want %d", tx2.BlobTxSidecar().Version, types.BlobSidecarVersion1) - } -} diff --git a/core/txpool/blobpool/lookup.go b/core/txpool/blobpool/lookup.go index 874ca85b8c2d..7607cd487a2b 100644 --- a/core/txpool/blobpool/lookup.go +++ b/core/txpool/blobpool/lookup.go @@ -110,13 +110,3 @@ func (l *lookup) untrack(tx *blobTxMeta) { } } } - -// update updates the transaction index. It should only be used in the conversion. -func (l *lookup) update(hash common.Hash, id uint64, size uint64) bool { - meta, exists := l.txIndex[hash] - if !exists { - return false - } - meta.id, meta.size = id, size - return true -} diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 0386bac55619..e9b933bf89ba 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -517,7 +517,7 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProo if len(hashes) > 128 { return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes))) } - blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion0, false) + blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion0) if err != nil { return nil, engine.InvalidParams.With(err) } @@ -578,7 +578,7 @@ func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) ([]*engine.BlobAndProo return nil, nil } - blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion1, false) + blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(hashes, types.BlobSidecarVersion1) if err != nil { return nil, engine.InvalidParams.With(err) }
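
With the conversion queue gone, a blob transaction submitted after Osaka must
carry a version-1 (cell proof) sidecar up front: preCheck now rejects any other
sidecar version instead of scheduling an in-pool upgrade. Below is a minimal
sketch of constructing such a transaction, adapted from the createV1BlobTx test
helper deleted above; the helper name and the fee/gas values are illustrative,
not part of this patch.

package main

import (
	"crypto/ecdsa"
	"crypto/sha256"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto/kzg4844"
	"github.com/ethereum/go-ethereum/params"
	"github.com/holiman/uint256"
)

// newV1BlobTx builds a blob transaction that carries a version-1 (cell proof)
// sidecar from the start, so it passes the version check in preCheck after
// the Osaka fork.
func newV1BlobTx(nonce uint64, key *ecdsa.PrivateKey) (*types.Transaction, error) {
	blob := &kzg4844.Blob{byte(nonce)}
	commitment, err := kzg4844.BlobToCommitment(blob)
	if err != nil {
		return nil, err
	}
	// Compute the cell proofs eagerly; this is the CPU-intensive step the
	// pool no longer performs on behalf of the sender.
	cellProofs, err := kzg4844.ComputeCellProofs(blob)
	if err != nil {
		return nil, err
	}
	blobtx := &types.BlobTx{
		ChainID:    uint256.MustFromBig(params.MainnetChainConfig.ChainID),
		Nonce:      nonce,
		GasTipCap:  uint256.NewInt(1),
		GasFeeCap:  uint256.NewInt(1000),
		Gas:        21000,
		BlobFeeCap: uint256.NewInt(100),
		BlobHashes: []common.Hash{kzg4844.CalcBlobHashV1(sha256.New(), &commitment)},
		Value:      uint256.NewInt(100),
		Sidecar:    types.NewBlobTxSidecar(types.BlobSidecarVersion1, []kzg4844.Blob{*blob}, []kzg4844.Commitment{commitment}, cellProofs),
	}
	return types.SignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx)
}

Producing cell proofs at signing time shifts the CPU cost to the transaction
originator, which is the rationale for dropping the pool-side conversion.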
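GetBlobs likewise loses its convert flag: a stored sidecar either matches the
requested version or the blob is treated as absent. The following sketch shows
the resulting caller-side contract, assuming the caller wants all-or-nothing
semantics as in the engine API's GetBlobsV2; collectV1Blobs is a hypothetical
helper for illustration, not code from this patch.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/txpool/blobpool"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto/kzg4844"
)

// collectV1Blobs resolves the given versioned hashes with cell proofs. Since
// GetBlobs no longer converts proofs, a nil entry means the blob is either
// unknown to the pool or stored with a mismatching sidecar version; both
// cases are reported as unavailable.
func collectV1Blobs(pool *blobpool.BlobPool, vhashes []common.Hash) ([]*kzg4844.Blob, [][]kzg4844.Proof, error) {
	blobs, _, proofs, err := pool.GetBlobs(vhashes, types.BlobSidecarVersion1)
	if err != nil {
		return nil, nil, err
	}
	for i := range vhashes {
		if blobs[i] == nil || proofs[i] == nil {
			// No pool-side upgrade path exists anymore for version-0
			// sidecars, so the blob simply cannot be served here.
			return nil, nil, fmt.Errorf("blob %x unavailable in requested version", vhashes[i])
		}
	}
	return blobs, proofs, nil
}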