diff --git a/errors.go b/errors.go
index 52414b2..0136846 100644
--- a/errors.go
+++ b/errors.go
@@ -65,4 +65,13 @@ var (
 	// ErrReadError is a generic read error for testing
 	ErrReadError = errors.New("read error")
+
+	// ErrTransactionNil is returned when a transaction is nil during serialization
+	ErrTransactionNil = errors.New("transaction is nil, cannot serialize")
+
+	// ErrTransactionWrite is returned when writing a transaction fails
+	ErrTransactionWrite = errors.New("error writing transaction")
+
+	// ErrTransactionRead is returned when reading a transaction fails
+	ErrTransactionRead = errors.New("error reading transaction")
 )
diff --git a/subtree_data.go b/subtree_data.go
index 97766cc..d096804 100644
--- a/subtree_data.go
+++ b/subtree_data.go
@@ -111,6 +111,163 @@ func (s *Data) Serialize() ([]byte, error) {
 	return buf.Bytes(), nil
 }
 
+// WriteTransactionsToWriter writes a range of transactions directly to a writer.
+//
+// This enables memory-efficient serialization by streaming transactions to disk as they are loaded,
+// without requiring all transactions to be in memory simultaneously. Transactions in the specified
+// range are written sequentially; a nil entry is an error (only the coinbase
+// placeholder at index 0 is skipped).
+//
+// Parameters:
+// - w: Writer to stream transactions to
+// - startIdx: Starting index (inclusive) of transactions to write
+// - endIdx: Ending index (exclusive) of transactions to write
+//
+// Returns an error if writing fails or if required transactions are missing (nil).
+func (s *Data) WriteTransactionsToWriter(w io.Writer, startIdx, endIdx int) error {
+	if s.Subtree == nil {
+		return ErrCannotSerializeSubtreeNotSet
+	}
+
+	// Clamp the requested range so an oversized endIdx (or negative startIdx)
+	// cannot cause an index-out-of-range panic on s.Txs. This mirrors the
+	// bounds handling in ReadTransactionChunk, which breaks at the subtree end.
+	if endIdx > len(s.Txs) {
+		endIdx = len(s.Txs)
+	}
+	if startIdx < 0 {
+		startIdx = 0
+	}
+
+	for i := startIdx; i < endIdx; i++ {
+		// Skip coinbase placeholder if it's the first transaction
+		if i == 0 && s.Subtree.Nodes[0].Hash.Equal(*CoinbasePlaceholderHash) {
+			continue
+		}
+
+		if s.Txs[i] == nil {
+			return ErrTransactionNil
+		}
+
+		// Serialize and stream transaction bytes to writer
+		txBytes := s.Txs[i].SerializeBytes()
+		if _, err := w.Write(txBytes); err != nil {
+			return fmt.Errorf("%w at index %d: %w", ErrTransactionWrite, i, err)
+		}
+	}
+
+	return nil
+}
+
+// WriteTransactionChunk writes a slice of transactions directly to a writer.
+//
+// This is a simplified streaming function that writes transactions without requiring a SubtreeData
+// structure. It's useful for workflows where transactions are already loaded and just need to be
+// streamed to disk.
+//
+// Parameters:
+// - w: Writer to stream transactions to
+// - txs: Slice of transactions to write
+//
+// Returns an error if writing fails.
+func WriteTransactionChunk(w io.Writer, txs []*bt.Tx) error {
+	for _, tx := range txs {
+		if tx == nil {
+			continue // Skip nil transactions
+		}
+
+		txBytes := tx.SerializeBytes()
+		if _, err := w.Write(txBytes); err != nil {
+			return fmt.Errorf("%w: %w", ErrTransactionWrite, err)
+		}
+	}
+
+	return nil
+}
+
+// ReadTransactionChunk reads and validates a chunk of transactions from a reader.
+//
+// This is a simplified streaming function that reads transactions directly into a new slice,
+// validates them against the subtree structure, and returns the populated slice. This is more
+// memory-efficient than ReadTransactionsFromReader for processing workflows where the SubtreeData
+// array is not needed.
+//
+// Parameters:
+// - r: Reader to read transactions from
+// - subtree: Subtree structure for hash validation
+// - startIdx: Starting index in subtree for validation
+// - count: Number of transactions to read
+//
+// Returns a slice of transactions and any error encountered. On a hash
+// mismatch or read failure, the transactions read so far are returned
+// together with the error.
+func ReadTransactionChunk(r io.Reader, subtree *Subtree, startIdx, count int) ([]*bt.Tx, error) {
+	if subtree == nil || len(subtree.Nodes) == 0 {
+		return nil, ErrSubtreeNodesEmpty
+	}
+
+	txs := make([]*bt.Tx, 0, count)
+
+	for i := 0; i < count; i++ {
+		idx := startIdx + i
+		if idx >= len(subtree.Nodes) {
+			break // Reached end of subtree
+		}
+
+		// Skip coinbase placeholder.
+		// NOTE(review): this skip consumes one of the `count` iterations, so a
+		// chunk starting at index 0 yields at most count-1 transactions —
+		// confirm callers account for this.
+		if idx == 0 && subtree.Nodes[0].Hash.Equal(CoinbasePlaceholderHashValue) {
+			continue
+		}
+
+		tx := &bt.Tx{}
+		if _, err := tx.ReadFrom(r); err != nil {
+			if errors.Is(err, io.EOF) {
+				// Fewer transactions were available than requested; not an error.
+				break
+			}
+			return txs, fmt.Errorf("%w at index %d: %w", ErrTransactionRead, idx, err)
+		}
+
+		// Validate tx hash matches expected
+		if !subtree.Nodes[idx].Hash.Equal(*tx.TxIDChainHash()) {
+			return txs, ErrTxHashMismatch
+		}
+
+		txs = append(txs, tx)
+	}
+
+	return txs, nil
+}
+
+// ReadTransactionsFromReader reads a range of transactions from a reader.
+//
+// This enables memory-efficient deserialization by reading only a chunk of transactions
+// from disk at a time, rather than loading all transactions into memory.
+//
+// Parameters:
+// - r: Reader to read transactions from
+// - startIdx: Starting index (inclusive) where transactions should be stored
+// - endIdx: Ending index (exclusive) where transactions should be stored
+//
+// Returns the number of transactions read and any error encountered.
+func (s *Data) ReadTransactionsFromReader(r io.Reader, startIdx, endIdx int) (int, error) {
+	if s.Subtree == nil || len(s.Subtree.Nodes) == 0 {
+		return 0, ErrSubtreeNodesEmpty
+	}
+
+	// Clamp the range to the subtree size so a caller-supplied endIdx past the
+	// end cannot index out of range on s.Subtree.Nodes / s.Txs. Previously this
+	// was only safe when the reader happened to hit EOF first; a reader with
+	// surplus data would have panicked.
+	if endIdx > len(s.Subtree.Nodes) {
+		endIdx = len(s.Subtree.Nodes)
+	}
+	if startIdx < 0 {
+		startIdx = 0
+	}
+
+	txsRead := 0
+	for i := startIdx; i < endIdx; i++ {
+		// Skip coinbase placeholder
+		if i == 0 && s.Subtree.Nodes[0].Hash.Equal(CoinbasePlaceholderHashValue) {
+			continue
+		}
+
+		tx := &bt.Tx{}
+		if _, err := tx.ReadFrom(r); err != nil {
+			if errors.Is(err, io.EOF) {
+				// Fewer transactions available than requested; not an error.
+				break
+			}
+			return txsRead, fmt.Errorf("%w at index %d: %w", ErrTransactionRead, i, err)
+		}
+
+		// Validate tx hash matches expected
+		if !s.Subtree.Nodes[i].Hash.Equal(*tx.TxIDChainHash()) {
+			return txsRead, ErrTxHashMismatch
+		}
+
+		s.Txs[i] = tx
+		txsRead++
+	}
+
+	return txsRead, nil
+}
+
 // serializeFromReader reads transactions from the provided reader and populates the Txs field.
 func (s *Data) serializeFromReader(buf io.Reader) error {
 	var (
diff --git a/subtree_data_test.go b/subtree_data_test.go
index f91d5fc..d974109 100644
--- a/subtree_data_test.go
+++ b/subtree_data_test.go
@@ -391,3 +391,110 @@ type mockReader struct {
 
 func (r *mockReader) Read(_ []byte) (n int, err error) {
 	return 0, r.err
 }
+
+// Helper to create test subtree with 4 versioned transactions
+func setupTestSubtreeData(t *testing.T) (*Subtree, *Data, []*bt.Tx) {
+	txs := make([]*bt.Tx, 4)
+	for i := range txs {
+		txs[i] = tx.Clone()
+		txs[i].Version = uint32(i + 1) //nolint:gosec // G115: test data, safe conversion
+	}
+
+	subtree, err := NewTree(2)
+	require.NoError(t, err)
+
+	for _, tx := range txs {
+		_ = subtree.AddNode(*tx.TxIDChainHash(), 111, 0)
+	}
+
+	subtreeData := NewSubtreeData(subtree)
+	for i, tx := range txs {
+		require.NoError(t, subtreeData.AddTx(tx, i))
+	}
+
+	return subtree, subtreeData, txs
+}
+
+func TestWriteTransactionsToWriter(t *testing.T) {
+	t.Run("write full range of transactions", func(t *testing.T) {
+		subtree, subtreeData, txs := setupTestSubtreeData(t)
+
+		buf := &bytes.Buffer{}
+		err := subtreeData.WriteTransactionsToWriter(buf, 0, 4)
+		require.NoError(t, err)
+
+		newSubtreeData, err := NewSubtreeDataFromBytes(subtree, buf.Bytes())
+		require.NoError(t, err)
+		for i, tx := range txs {
+			assert.Equal(t, tx.Version, newSubtreeData.Txs[i].Version)
+		}
+	})
+
+	t.Run("write partial range", func(t *testing.T) {
+		_, subtreeData, txs := setupTestSubtreeData(t)
+
+		buf := &bytes.Buffer{}
+		err := subtreeData.WriteTransactionsToWriter(buf, 1, 3)
+		require.NoError(t, err)
+
+		expectedSize := len(txs[1].SerializeBytes()) + len(txs[2].SerializeBytes())
+		assert.Equal(t, expectedSize, buf.Len())
+	})
+
+	t.Run("error on nil transaction", func(t *testing.T) {
+		_, subtreeData, _ := setupTestSubtreeData(t)
+		subtreeData.Txs[1] = nil // Null out one transaction
+
+		buf := &bytes.Buffer{}
+		err := subtreeData.WriteTransactionsToWriter(buf, 0, 2)
+		require.ErrorIs(t, err, ErrTransactionNil)
+	})
+}
+
+func TestReadTransactionsFromReader(t *testing.T) {
+	t.Run("read full range", func(t *testing.T) {
+		subtree, sourceData, txs := setupTestSubtreeData(t)
+
+		serialized, err := sourceData.Serialize()
+		require.NoError(t, err)
+
+		targetData := NewSubtreeData(subtree)
+		numRead, err := targetData.ReadTransactionsFromReader(bytes.NewReader(serialized), 0, 4)
+		require.NoError(t, err)
+		assert.Equal(t, 4, numRead)
+
+		for i, tx := range txs {
+			assert.Equal(t, tx.Version, targetData.Txs[i].Version)
+		}
+	})
+
+	t.Run("read partial range", func(t *testing.T) {
+		subtree, sourceData, txs := setupTestSubtreeData(t)
+
+		serialized, err := sourceData.Serialize()
+		require.NoError(t, err)
+
+		targetData := NewSubtreeData(subtree)
+		numRead, err := targetData.ReadTransactionsFromReader(bytes.NewReader(serialized), 0, 2)
+		require.NoError(t, err)
+		assert.Equal(t, 2, numRead)
+
+		assert.Equal(t, txs[0].Version, targetData.Txs[0].Version)
+		assert.Equal(t, txs[1].Version, targetData.Txs[1].Version)
+		assert.Nil(t, targetData.Txs[2])
+		assert.Nil(t, targetData.Txs[3])
+	})
+
+	t.Run("read EOF gracefully", func(t *testing.T) {
+		subtree, sourceData, _ := setupTestSubtreeData(t)
+
+		serialized, err := sourceData.Serialize()
+		require.NoError(t, err)
+
+		// Try to read more transactions than available
+		targetData := NewSubtreeData(subtree)
+		numRead, err := targetData.ReadTransactionsFromReader(bytes.NewReader(serialized), 0, 10)
+		require.NoError(t, err)   // EOF not an error
+		assert.Equal(t, 4, numRead) // Only 4 available
+	})
+}