Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,13 @@ var (

// ErrReadError is a generic read error for testing
ErrReadError = errors.New("read error")

// ErrTransactionNil is returned when a transaction is nil during serialization
ErrTransactionNil = errors.New("transaction is nil, cannot serialize")

// ErrTransactionWrite is returned when writing a transaction fails
ErrTransactionWrite = errors.New("error writing transaction")

// ErrTransactionRead is returned when reading a transaction fails
ErrTransactionRead = errors.New("error reading transaction")
)
157 changes: 157 additions & 0 deletions subtree_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,163 @@
return buf.Bytes(), nil
}

// WriteTransactionsToWriter writes a range of transactions directly to a writer.
//
// This enables memory-efficient serialization by streaming transactions to disk as they are loaded,
// without requiring all transactions to be in memory simultaneously. Transactions in the specified
// range are written sequentially, skipping any nil entries.
//
// Parameters:
// - w: Writer to stream transactions to
// - startIdx: Starting index (inclusive) of transactions to write
// - endIdx: Ending index (exclusive) of transactions to write
//
// Returns an error if writing fails or if required transactions are missing (nil).
func (s *Data) WriteTransactionsToWriter(w io.Writer, startIdx, endIdx int) error {
if s.Subtree == nil {
return ErrCannotSerializeSubtreeNotSet
}

for i := startIdx; i < endIdx; i++ {
// Skip coinbase placeholder if it's the first transaction
if i == 0 && s.Subtree.Nodes[0].Hash.Equal(*CoinbasePlaceholderHash) {
continue
}

if s.Txs[i] == nil {
return ErrTransactionNil
}

// Serialize and stream transaction bytes to writer
txBytes := s.Txs[i].SerializeBytes()
if _, err := w.Write(txBytes); err != nil {
return fmt.Errorf("%w at index %d: %w", ErrTransactionWrite, i, err)

Check failure on line 144 in subtree_data.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "%w at index %d: %w" 3 times.

See more on https://sonarcloud.io/project/issues?id=bsv-blockchain_go-subtree&issues=AZsU-gDLqrAAlGaxXQNA&open=AZsU-gDLqrAAlGaxXQNA&pullRequest=73
}
}

return nil
}

// WriteTransactionChunk streams the given transactions to w in order.
//
// Nil entries in txs are silently skipped, so callers may pass sparse slices.
// No SubtreeData structure is required, which makes this suitable for
// workflows where transactions are already loaded and only need to be
// persisted to disk.
//
// Parameters:
//   - w: destination writer
//   - txs: transactions to serialize
//
// Returns an error if any write to w fails.
func WriteTransactionChunk(w io.Writer, txs []*bt.Tx) error {
	for _, transaction := range txs {
		if transaction == nil {
			// Sparse slices are allowed; nothing to write for this slot.
			continue
		}

		if _, err := w.Write(transaction.SerializeBytes()); err != nil {
			return fmt.Errorf("%w: %w", ErrTransactionWrite, err)
		}
	}

	return nil
}

// ReadTransactionChunk reads and validates a chunk of transactions from a reader.
//
// This is a simplified streaming function that reads transactions directly into a new slice,
// validates them against the subtree structure, and returns the populated slice. This is more
// memory-efficient than ReadTransactionsFromReader for processing workflows where the SubtreeData
// array is not needed. Reading stops early (without error) at EOF or at the end of the subtree.
//
// Parameters:
//   - r: Reader to read transactions from
//   - subtree: Subtree structure for hash validation
//   - startIdx: Starting index in subtree for validation (must be >= 0)
//   - count: Number of transactions to read (must be >= 0)
//
// Returns a slice of transactions and any error encountered.
func ReadTransactionChunk(r io.Reader, subtree *Subtree, startIdx, count int) ([]*bt.Tx, error) {
	if subtree == nil || len(subtree.Nodes) == 0 {
		return nil, ErrSubtreeNodesEmpty
	}

	// Reject invalid ranges explicitly rather than panicking on a negative
	// make() capacity or a negative subtree.Nodes index below.
	if startIdx < 0 || count < 0 {
		return nil, fmt.Errorf("invalid chunk range: startIdx=%d, count=%d", startIdx, count)
	}

	txs := make([]*bt.Tx, 0, count)

	for i := 0; i < count; i++ {
		idx := startIdx + i
		if idx >= len(subtree.Nodes) {
			break // Reached end of subtree
		}

		// Skip coinbase placeholder
		if idx == 0 && subtree.Nodes[0].Hash.Equal(CoinbasePlaceholderHashValue) {
			continue
		}

		tx := &bt.Tx{}
		if _, err := tx.ReadFrom(r); err != nil {
			if errors.Is(err, io.EOF) {
				break // Stream exhausted; return what was read so far.
			}

			return txs, fmt.Errorf("%w at index %d: %w", ErrTransactionRead, idx, err)
		}

		// Validate tx hash matches expected
		if !subtree.Nodes[idx].Hash.Equal(*tx.TxIDChainHash()) {
			return txs, ErrTxHashMismatch
		}

		txs = append(txs, tx)
	}

	return txs, nil
}

// ReadTransactionsFromReader reads a range of transactions from a reader.
//
// This enables memory-efficient deserialization by reading only a chunk of transactions
// from disk at a time, rather than loading all transactions into memory. Reading stops
// early (without error) at EOF or when the range runs past the end of the subtree.
//
// Parameters:
//   - r: Reader to read transactions from
//   - startIdx: Starting index (inclusive) where transactions should be stored (must be >= 0)
//   - endIdx: Ending index (exclusive) where transactions should be stored
//
// Returns the number of transactions read and any error encountered.
func (s *Data) ReadTransactionsFromReader(r io.Reader, startIdx, endIdx int) (int, error) {
	if s.Subtree == nil || len(s.Subtree.Nodes) == 0 {
		return 0, ErrSubtreeNodesEmpty
	}

	// A negative start index would panic on the slice accesses below.
	if startIdx < 0 {
		return 0, fmt.Errorf("invalid start index %d", startIdx)
	}

	txsRead := 0
	for i := startIdx; i < endIdx; i++ {
		// Stop at the end of the subtree (or the Txs backing slice) instead of
		// indexing out of range when endIdx overshoots the available nodes.
		// This mirrors the bounds handling in ReadTransactionChunk.
		if i >= len(s.Subtree.Nodes) || i >= len(s.Txs) {
			break
		}

		// Skip coinbase placeholder
		if i == 0 && s.Subtree.Nodes[0].Hash.Equal(CoinbasePlaceholderHashValue) {
			continue
		}

		tx := &bt.Tx{}
		if _, err := tx.ReadFrom(r); err != nil {
			if errors.Is(err, io.EOF) {
				break
			}

			return txsRead, fmt.Errorf("%w at index %d: %w", ErrTransactionRead, i, err)
		}

		// Validate tx hash matches expected
		if !s.Subtree.Nodes[i].Hash.Equal(*tx.TxIDChainHash()) {
			return txsRead, ErrTxHashMismatch
		}

		s.Txs[i] = tx
		txsRead++
	}

	return txsRead, nil
}

// serializeFromReader reads transactions from the provided reader and populates the Txs field.
func (s *Data) serializeFromReader(buf io.Reader) error {
var (
Expand Down
107 changes: 107 additions & 0 deletions subtree_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,110 @@ type mockReader struct {
// Read implements io.Reader by always failing with the configured error,
// simulating a broken underlying stream for error-path tests.
func (r *mockReader) Read(_ []byte) (int, error) {
	return 0, r.err
}

// setupTestSubtreeData builds a 4-leaf subtree and a matching SubtreeData for tests.
// Each transaction is a clone of the package-level test tx with a distinct Version
// so individual entries can be told apart after a serialization round trip.
func setupTestSubtreeData(t *testing.T) (*Subtree, *Data, []*bt.Tx) {
	t.Helper()

	txs := make([]*bt.Tx, 4)
	for i := range txs {
		txs[i] = tx.Clone()
		txs[i].Version = uint32(i + 1) //nolint:gosec // G115: test data, safe conversion
	}

	subtree, err := NewTree(2)
	require.NoError(t, err)

	for _, tx := range txs {
		// Fail fast instead of silently discarding AddNode errors.
		require.NoError(t, subtree.AddNode(*tx.TxIDChainHash(), 111, 0))
	}

	subtreeData := NewSubtreeData(subtree)
	for i, tx := range txs {
		require.NoError(t, subtreeData.AddTx(tx, i))
	}

	return subtree, subtreeData, txs
}

// TestWriteTransactionsToWriter exercises streaming serialization of a
// transaction range: the full range, a partial range, and the nil-entry error.
func TestWriteTransactionsToWriter(t *testing.T) {
	t.Run("write full range of transactions", func(t *testing.T) {
		subtree, subtreeData, txs := setupTestSubtreeData(t)

		var out bytes.Buffer
		require.NoError(t, subtreeData.WriteTransactionsToWriter(&out, 0, 4))

		// Round-trip: the streamed bytes must deserialize back to the same txs.
		roundTripped, err := NewSubtreeDataFromBytes(subtree, out.Bytes())
		require.NoError(t, err)

		for i := range txs {
			assert.Equal(t, txs[i].Version, roundTripped.Txs[i].Version)
		}
	})

	t.Run("write partial range", func(t *testing.T) {
		_, subtreeData, txs := setupTestSubtreeData(t)

		var out bytes.Buffer
		require.NoError(t, subtreeData.WriteTransactionsToWriter(&out, 1, 3))

		// Only txs[1] and txs[2] should have been written.
		wantLen := len(txs[1].SerializeBytes()) + len(txs[2].SerializeBytes())
		assert.Equal(t, wantLen, out.Len())
	})

	t.Run("error on nil transaction", func(t *testing.T) {
		_, subtreeData, _ := setupTestSubtreeData(t)
		subtreeData.Txs[1] = nil // deliberately blank out one entry

		var out bytes.Buffer
		require.ErrorIs(t, subtreeData.WriteTransactionsToWriter(&out, 0, 2), ErrTransactionNil)
	})
}

// TestReadTransactionsFromReader exercises chunked deserialization: the full
// range, a partial range, and graceful handling of EOF mid-range.
func TestReadTransactionsFromReader(t *testing.T) {
	t.Run("read full range", func(t *testing.T) {
		subtree, sourceData, txs := setupTestSubtreeData(t)

		raw, err := sourceData.Serialize()
		require.NoError(t, err)

		dest := NewSubtreeData(subtree)
		n, err := dest.ReadTransactionsFromReader(bytes.NewReader(raw), 0, 4)
		require.NoError(t, err)
		assert.Equal(t, 4, n)

		for i := range txs {
			assert.Equal(t, txs[i].Version, dest.Txs[i].Version)
		}
	})

	t.Run("read partial range", func(t *testing.T) {
		subtree, sourceData, txs := setupTestSubtreeData(t)

		raw, err := sourceData.Serialize()
		require.NoError(t, err)

		dest := NewSubtreeData(subtree)
		n, err := dest.ReadTransactionsFromReader(bytes.NewReader(raw), 0, 2)
		require.NoError(t, err)
		assert.Equal(t, 2, n)

		// Only the first two slots are populated; the rest stay nil.
		assert.Equal(t, txs[0].Version, dest.Txs[0].Version)
		assert.Equal(t, txs[1].Version, dest.Txs[1].Version)
		assert.Nil(t, dest.Txs[2])
		assert.Nil(t, dest.Txs[3])
	})

	t.Run("read EOF gracefully", func(t *testing.T) {
		subtree, sourceData, _ := setupTestSubtreeData(t)

		raw, err := sourceData.Serialize()
		require.NoError(t, err)

		// Ask for more transactions than the stream contains.
		dest := NewSubtreeData(subtree)
		n, err := dest.ReadTransactionsFromReader(bytes.NewReader(raw), 0, 10)
		require.NoError(t, err) // EOF is not an error
		assert.Equal(t, 4, n)   // only 4 were available
	})
}
Loading