Skip to content

Commit 0f1ad36

Browse files
authored
fix(zstd decompression): limit concurrency to 1 to prevent deadlock in zstd library (#61)
* fix(zstd decompression): limit concurrency to 1 to prevent deadlock in zstd library * skip test * use crypt/rand * address review comments
1 parent 2cfec8c commit 0f1ad36

3 files changed

Lines changed: 33 additions & 2 deletions

File tree

encoding/codecv7_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package encoding
22

33
import (
4+
crand "crypto/rand"
45
"encoding/hex"
56
"encoding/json"
67
"fmt"
8+
"log"
79
"math/big"
810
"math/rand"
11+
"net/http"
12+
_ "net/http/pprof"
913
"strings"
1014
"testing"
1115

@@ -17,6 +21,33 @@ import (
1721
"github.com/stretchr/testify/require"
1822
)
1923

24+
// TestDecodeAllDeadlock tests the decompression of random bytes to trigger deadlock in zstd library
25+
// with setting of zstd.WithDecoderConcurrency(2).
26+
func TestDecodeAllDeadlock(t *testing.T) {
27+
t.Skip("Skip test that triggers deadlock in zstd library")
28+
29+
go func() {
30+
log.Println(http.ListenAndServe("localhost:6060", nil))
31+
}()
32+
33+
// generate some random bytes
34+
randomBytes := make([]byte, maxBlobBytes)
35+
_, err := crand.Read(randomBytes)
36+
require.NoError(t, err)
37+
38+
c := NewDACodecV8()
39+
40+
compressed, err := c.CompressScrollBatchBytes(randomBytes)
41+
require.NoError(t, err)
42+
43+
// repeatedly decompress the bytes to trigger deadlock in zstd library
44+
for i := 0; i < 100000; i++ {
45+
uncompressed, err := decompressV7Bytes(compressed)
46+
require.NoError(t, err)
47+
require.Equal(t, randomBytes, uncompressed)
48+
}
49+
}
50+
2051
// TestCodecV7DABlockEncodeDecode tests the encoding and decoding of daBlockV7.
2152
func TestCodecV7DABlockEncodeDecode(t *testing.T) {
2253
codecV7, err := CodecFromVersion(CodecV7)

encoding/codecv7_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ func decompressV7Bytes(compressedBytes []byte) ([]byte, error) {
482482

483483
compressedBytes = append(zstdMagicNumber, compressedBytes...)
484484
r := bytes.NewReader(compressedBytes)
485-
zr, err := zstd.NewReader(r)
485+
zr, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(1))
486486
if err != nil {
487487
return nil, fmt.Errorf("failed to create zstd reader: %w", err)
488488
}

encoding/da.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,7 @@ func decompressScrollBlobToBatch(compressedBytes []byte) ([]byte, error) {
552552
batchOfBytes := make([]byte, readBatchSize)
553553

554554
r := bytes.NewReader(compressedBytes)
555-
zr, err := zstd.NewReader(r)
555+
zr, err := zstd.NewReader(r, zstd.WithDecoderConcurrency(1))
556556
if err != nil {
557557
return nil, err
558558
}

0 commit comments

Comments
 (0)