Skip to content

Commit dd70219

Browse files
authored
Streaming Serialization Methods for Memory-Efficient Large Subtree Processing (#73)
* allow partial/chunked serialise/deserialise subtree using io.Reader and io.Writer * tests * reduce code duplication * linting fixes * simpler implementation
1 parent 5418b8a commit dd70219

File tree

3 files changed

+273
-0
lines changed

3 files changed

+273
-0
lines changed

errors.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,13 @@ var (
6565

6666
// ErrReadError is a generic read error for testing
6767
ErrReadError = errors.New("read error")
68+
69+
// ErrTransactionNil is returned when a transaction is nil during serialization
70+
ErrTransactionNil = errors.New("transaction is nil, cannot serialize")
71+
72+
// ErrTransactionWrite is returned when writing a transaction fails
73+
ErrTransactionWrite = errors.New("error writing transaction")
74+
75+
// ErrTransactionRead is returned when reading a transaction fails
76+
ErrTransactionRead = errors.New("error reading transaction")
6877
)

subtree_data.go

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,163 @@ func (s *Data) Serialize() ([]byte, error) {
111111
return buf.Bytes(), nil
112112
}
113113

114+
// WriteTransactionsToWriter writes a range of transactions directly to a writer.
115+
//
116+
// This enables memory-efficient serialization by streaming transactions to disk as they are loaded,
117+
// without requiring all transactions to be in memory simultaneously. Transactions in the specified
118+
// range are written sequentially, skipping any nil entries.
119+
//
120+
// Parameters:
121+
// - w: Writer to stream transactions to
122+
// - startIdx: Starting index (inclusive) of transactions to write
123+
// - endIdx: Ending index (exclusive) of transactions to write
124+
//
125+
// Returns an error if writing fails or if required transactions are missing (nil).
126+
func (s *Data) WriteTransactionsToWriter(w io.Writer, startIdx, endIdx int) error {
127+
if s.Subtree == nil {
128+
return ErrCannotSerializeSubtreeNotSet
129+
}
130+
131+
for i := startIdx; i < endIdx; i++ {
132+
// Skip coinbase placeholder if it's the first transaction
133+
if i == 0 && s.Subtree.Nodes[0].Hash.Equal(*CoinbasePlaceholderHash) {
134+
continue
135+
}
136+
137+
if s.Txs[i] == nil {
138+
return ErrTransactionNil
139+
}
140+
141+
// Serialize and stream transaction bytes to writer
142+
txBytes := s.Txs[i].SerializeBytes()
143+
if _, err := w.Write(txBytes); err != nil {
144+
return fmt.Errorf("%w at index %d: %w", ErrTransactionWrite, i, err)
145+
}
146+
}
147+
148+
return nil
149+
}
150+
151+
// WriteTransactionChunk writes a slice of transactions directly to a writer.
152+
//
153+
// This is a simplified streaming function that writes transactions without requiring a SubtreeData
154+
// structure. It's useful for workflows where transactions are already loaded and just need to be
155+
// streamed to disk.
156+
//
157+
// Parameters:
158+
// - w: Writer to stream transactions to
159+
// - txs: Slice of transactions to write
160+
//
161+
// Returns an error if writing fails.
162+
func WriteTransactionChunk(w io.Writer, txs []*bt.Tx) error {
163+
for _, tx := range txs {
164+
if tx == nil {
165+
continue // Skip nil transactions
166+
}
167+
168+
txBytes := tx.SerializeBytes()
169+
if _, err := w.Write(txBytes); err != nil {
170+
return fmt.Errorf("%w: %w", ErrTransactionWrite, err)
171+
}
172+
}
173+
174+
return nil
175+
}
176+
177+
// ReadTransactionChunk reads and validates a chunk of transactions from a reader.
178+
//
179+
// This is a simplified streaming function that reads transactions directly into a new slice,
180+
// validates them against the subtree structure, and returns the populated slice. This is more
181+
// memory-efficient than ReadTransactionsFromReader for processing workflows where the SubtreeData
182+
// array is not needed.
183+
//
184+
// Parameters:
185+
// - r: Reader to read transactions from
186+
// - subtree: Subtree structure for hash validation
187+
// - startIdx: Starting index in subtree for validation
188+
// - count: Number of transactions to read
189+
//
190+
// Returns a slice of transactions and any error encountered.
191+
func ReadTransactionChunk(r io.Reader, subtree *Subtree, startIdx, count int) ([]*bt.Tx, error) {
192+
if subtree == nil || len(subtree.Nodes) == 0 {
193+
return nil, ErrSubtreeNodesEmpty
194+
}
195+
196+
txs := make([]*bt.Tx, 0, count)
197+
198+
for i := 0; i < count; i++ {
199+
idx := startIdx + i
200+
if idx >= len(subtree.Nodes) {
201+
break // Reached end of subtree
202+
}
203+
204+
// Skip coinbase placeholder
205+
if idx == 0 && subtree.Nodes[0].Hash.Equal(CoinbasePlaceholderHashValue) {
206+
continue
207+
}
208+
209+
tx := &bt.Tx{}
210+
if _, err := tx.ReadFrom(r); err != nil {
211+
if errors.Is(err, io.EOF) {
212+
break
213+
}
214+
return txs, fmt.Errorf("%w at index %d: %w", ErrTransactionRead, idx, err)
215+
}
216+
217+
// Validate tx hash matches expected
218+
if !subtree.Nodes[idx].Hash.Equal(*tx.TxIDChainHash()) {
219+
return txs, ErrTxHashMismatch
220+
}
221+
222+
txs = append(txs, tx)
223+
}
224+
225+
return txs, nil
226+
}
227+
228+
// ReadTransactionsFromReader reads a range of transactions from a reader.
229+
//
230+
// This enables memory-efficient deserialization by reading only a chunk of transactions
231+
// from disk at a time, rather than loading all transactions into memory.
232+
//
233+
// Parameters:
234+
// - r: Reader to read transactions from
235+
// - startIdx: Starting index (inclusive) where transactions should be stored
236+
// - endIdx: Ending index (exclusive) where transactions should be stored
237+
//
238+
// Returns the number of transactions read and any error encountered.
239+
func (s *Data) ReadTransactionsFromReader(r io.Reader, startIdx, endIdx int) (int, error) {
240+
if s.Subtree == nil || len(s.Subtree.Nodes) == 0 {
241+
return 0, ErrSubtreeNodesEmpty
242+
}
243+
244+
txsRead := 0
245+
for i := startIdx; i < endIdx; i++ {
246+
// Skip coinbase placeholder
247+
if i == 0 && s.Subtree.Nodes[0].Hash.Equal(CoinbasePlaceholderHashValue) {
248+
continue
249+
}
250+
251+
tx := &bt.Tx{}
252+
if _, err := tx.ReadFrom(r); err != nil {
253+
if errors.Is(err, io.EOF) {
254+
break
255+
}
256+
return txsRead, fmt.Errorf("%w at index %d: %w", ErrTransactionRead, i, err)
257+
}
258+
259+
// Validate tx hash matches expected
260+
if !s.Subtree.Nodes[i].Hash.Equal(*tx.TxIDChainHash()) {
261+
return txsRead, ErrTxHashMismatch
262+
}
263+
264+
s.Txs[i] = tx
265+
txsRead++
266+
}
267+
268+
return txsRead, nil
269+
}
270+
114271
// serializeFromReader reads transactions from the provided reader and populates the Txs field.
115272
func (s *Data) serializeFromReader(buf io.Reader) error {
116273
var (

subtree_data_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,3 +391,110 @@ type mockReader struct {
391391
// Read never fills the buffer: it reports zero bytes read together with the
// error configured on the mock.
func (r *mockReader) Read(_ []byte) (n int, err error) {
	err = r.err
	return 0, err
}
394+
395+
// Helper to create test subtree with 4 versioned transactions
396+
func setupTestSubtreeData(t *testing.T) (*Subtree, *Data, []*bt.Tx) {
397+
txs := make([]*bt.Tx, 4)
398+
for i := range txs {
399+
txs[i] = tx.Clone()
400+
txs[i].Version = uint32(i + 1) //nolint:gosec // G115: test data, safe conversion
401+
}
402+
403+
subtree, err := NewTree(2)
404+
require.NoError(t, err)
405+
406+
for _, tx := range txs {
407+
_ = subtree.AddNode(*tx.TxIDChainHash(), 111, 0)
408+
}
409+
410+
subtreeData := NewSubtreeData(subtree)
411+
for i, tx := range txs {
412+
require.NoError(t, subtreeData.AddTx(tx, i))
413+
}
414+
415+
return subtree, subtreeData, txs
416+
}
417+
418+
func TestWriteTransactionsToWriter(t *testing.T) {
419+
t.Run("write full range of transactions", func(t *testing.T) {
420+
subtree, subtreeData, txs := setupTestSubtreeData(t)
421+
422+
buf := &bytes.Buffer{}
423+
err := subtreeData.WriteTransactionsToWriter(buf, 0, 4)
424+
require.NoError(t, err)
425+
426+
newSubtreeData, err := NewSubtreeDataFromBytes(subtree, buf.Bytes())
427+
require.NoError(t, err)
428+
for i, tx := range txs {
429+
assert.Equal(t, tx.Version, newSubtreeData.Txs[i].Version)
430+
}
431+
})
432+
433+
t.Run("write partial range", func(t *testing.T) {
434+
_, subtreeData, txs := setupTestSubtreeData(t)
435+
436+
buf := &bytes.Buffer{}
437+
err := subtreeData.WriteTransactionsToWriter(buf, 1, 3)
438+
require.NoError(t, err)
439+
440+
expectedSize := len(txs[1].SerializeBytes()) + len(txs[2].SerializeBytes())
441+
assert.Equal(t, expectedSize, buf.Len())
442+
})
443+
444+
t.Run("error on nil transaction", func(t *testing.T) {
445+
_, subtreeData, _ := setupTestSubtreeData(t)
446+
subtreeData.Txs[1] = nil // Null out one transaction
447+
448+
buf := &bytes.Buffer{}
449+
err := subtreeData.WriteTransactionsToWriter(buf, 0, 2)
450+
require.ErrorIs(t, err, ErrTransactionNil)
451+
})
452+
}
453+
454+
func TestReadTransactionsFromReader(t *testing.T) {
455+
t.Run("read full range", func(t *testing.T) {
456+
subtree, sourceData, txs := setupTestSubtreeData(t)
457+
458+
serialized, err := sourceData.Serialize()
459+
require.NoError(t, err)
460+
461+
targetData := NewSubtreeData(subtree)
462+
numRead, err := targetData.ReadTransactionsFromReader(bytes.NewReader(serialized), 0, 4)
463+
require.NoError(t, err)
464+
assert.Equal(t, 4, numRead)
465+
466+
for i, tx := range txs {
467+
assert.Equal(t, tx.Version, targetData.Txs[i].Version)
468+
}
469+
})
470+
471+
t.Run("read partial range", func(t *testing.T) {
472+
subtree, sourceData, txs := setupTestSubtreeData(t)
473+
474+
serialized, err := sourceData.Serialize()
475+
require.NoError(t, err)
476+
477+
targetData := NewSubtreeData(subtree)
478+
numRead, err := targetData.ReadTransactionsFromReader(bytes.NewReader(serialized), 0, 2)
479+
require.NoError(t, err)
480+
assert.Equal(t, 2, numRead)
481+
482+
assert.Equal(t, txs[0].Version, targetData.Txs[0].Version)
483+
assert.Equal(t, txs[1].Version, targetData.Txs[1].Version)
484+
assert.Nil(t, targetData.Txs[2])
485+
assert.Nil(t, targetData.Txs[3])
486+
})
487+
488+
t.Run("read EOF gracefully", func(t *testing.T) {
489+
subtree, sourceData, _ := setupTestSubtreeData(t)
490+
491+
serialized, err := sourceData.Serialize()
492+
require.NoError(t, err)
493+
494+
// Try to read more transactions than available
495+
targetData := NewSubtreeData(subtree)
496+
numRead, err := targetData.ReadTransactionsFromReader(bytes.NewReader(serialized), 0, 10)
497+
require.NoError(t, err) // EOF not an error
498+
assert.Equal(t, 4, numRead) // Only 4 available
499+
})
500+
}

0 commit comments

Comments
 (0)