Skip to content

Commit 1ceef5a

Browse files
committed
support concurrent json convert
1 parent f07aa2e commit 1ceef5a

6 files changed

Lines changed: 167 additions & 82 deletions

File tree

README.md

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ Options:
6060
supporting multi separators: -sep sep1 -sep sep2
6161
-regex using regex expression filter keys
6262
-no-expired reserve expired keys
63+
-concurrent The number of concurrent json converters. (CpuNum -1 by default, reserve a core for decoder)
6364
6465
Examples:
6566
parameters between '[' and ']' is optional
@@ -106,6 +107,12 @@ The examples for json result:
106107
]
107108
```
108109

110+
You can use `-concurrent` to change the number of concurrent convertes. The default value is 4.
111+
112+
```
113+
rdb -c json -o intset_16.json -concurrent 8 cases/intset_16.rdb
114+
```
115+
109116
<details>
110117
<summary>Json Fromat Detail</summary>
111118

@@ -542,12 +549,14 @@ func main() {
542549

543550
# Benchmark
544551

545-
Tested on MacBook Pro (16-inch, 2019) 2.6 GHz 6cores Intel Core i7, using a 1.3 GB RDB file encoded with v9 format from Redis 5.0 in production environment.
552+
Tested on MacBook Air(M2,2022年), using a 1.3 GB RDB file encoded with v9 format from Redis 5.0 in production environment.
546553

547554
|usage|elapsed|speed|
548555
|:-:|:-:|:-:|
549-
|ToJson|74.12s|17.96MB/s|
550-
|Memory|18.585s|71.62MB/s|
551-
|AOF|104.77s|12.76MB/s|
552-
|Top10|14.8s|89.95MB/s|
553-
|FlameGraph|21.83s|60.98MB/s|
556+
|ToJson|25s|53.24MB/s|
557+
|Memory|10s|133.12MB/s|
558+
|AOF|25s|53.24MB/s|
559+
|Top10|6s|221.87MB/s|
560+
|Prefix|25s|53.24MB/s|
561+
562+

README_CN.md

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ Options:
4747
supporting multi separators: -sep sep1 -sep sep2
4848
-regex using regex expression filter keys
4949
-no-expired filter expired keys
50+
-concurrent The number of concurrent json converters. (Cpu number by default)
5051
5152
Examples:
5253
parameters between '[' and ']' is optional
@@ -91,6 +92,12 @@ rdb -c json -o intset_16.json cases/intset_16.rdb
9192
]
9293
```
9394

95+
`-concurrent` 选项可以修改转 JSON 的并发数,默认并发数为 CPU 数量 -1 (留一个核心给解析器)。
96+
97+
```shell
98+
rdb -c json -o intset_16.json -concurrent 8 cases/intset_16.rdb
99+
```
100+
94101
<details>
95102
<summary>Json 格式</summary>
96103

@@ -530,12 +537,12 @@ func main() {
530537

531538
# Benchmark
532539

533-
在 MacBook Pro (16-inch, 2019) 2.6 GHz 六核 Intel Core i7 笔记本上,使用从生产环境的 Redis 5.0 上获得 1.3 GB 大小使用 v9 编码的 RDB 文件进行测试:
540+
在 MacBook Air(M2,2022年)笔记本上,使用从生产环境的 Redis 5.0 上获得 1.3 GB 大小使用 v9 编码的 RDB 文件进行测试:
534541

535542
|usage|elapsed|speed|
536543
|:-:|:-:|:-:|
537-
|ToJson|74.12s|17.96MB/s|
538-
|Memory|18.585s|71.62MB/s|
539-
|AOF|104.77s|12.76MB/s|
540-
|Top10|14.8s|89.95MB/s|
541-
|FlameGraph|21.83s|60.98MB/s|
544+
|ToJson|25s|53.24MB/s|
545+
|Memory|10s|133.12MB/s|
546+
|AOF|25s|53.24MB/s|
547+
|Top10|6s|221.87MB/s|
548+
|Prefix|25s|53.24MB/s|

cmd.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Options:
2020
supporting multi separators: -sep sep1 -sep sep2
2121
-regex using regex expression filter keys
2222
-no-expired filter expired keys
23+
-concurrent The number of concurrent json converters. 4 by default.
2324
2425
Examples:
2526
parameters between '[' and ']' is optional
@@ -58,12 +59,14 @@ func main() {
5859
var regexExpr string
5960
var noExpired bool
6061
var maxDepth int
62+
var concurrent int
6163
var err error
6264
flagSet.StringVar(&cmd, "c", "", "command for rdb: json")
6365
flagSet.StringVar(&output, "o", "", "output file path")
6466
flagSet.IntVar(&n, "n", 0, "")
6567
flagSet.IntVar(&maxDepth, "max-depth", 0, "max depth of prefix tree")
6668
flagSet.IntVar(&port, "port", 0, "listen port for web")
69+
flagSet.IntVar(&concurrent, "concurrent", 0, "concurrent number for json converter")
6770
flagSet.Var(&seps, "sep", "separator for flame graph")
6871
flagSet.StringVar(&regexExpr, "regex", "", "regex expression")
6972
flagSet.BoolVar(&noExpired, "no-expired", false, "filter expired keys")
@@ -86,6 +89,9 @@ func main() {
8689
if noExpired {
8790
options = append(options, helper.WithNoExpiredOption())
8891
}
92+
if concurrent != 0 {
93+
options = append(options, helper.WithConcurrent(concurrent))
94+
}
8995

9096
var outputFile *os.File
9197
if output == "" {

helper/converter.go

Lines changed: 0 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -3,80 +3,11 @@ package helper
33
import (
44
"errors"
55
"fmt"
6-
"github.com/bytedance/sonic"
76
"github.com/hdt3213/rdb/core"
87
"github.com/hdt3213/rdb/model"
98
"os"
109
)
1110

12-
var jsonEncoder = sonic.ConfigDefault
13-
14-
// ToJsons read rdb file and convert to json file
15-
func ToJsons(rdbFilename string, jsonFilename string, options ...interface{}) error {
16-
if rdbFilename == "" {
17-
return errors.New("src file path is required")
18-
}
19-
if jsonFilename == "" {
20-
return errors.New("output file path is required")
21-
}
22-
// open file
23-
rdbFile, err := os.Open(rdbFilename)
24-
if err != nil {
25-
return fmt.Errorf("open rdb %s failed, %v", rdbFilename, err)
26-
}
27-
defer func() {
28-
_ = rdbFile.Close()
29-
}()
30-
jsonFile, err := os.Create(jsonFilename)
31-
if err != nil {
32-
return fmt.Errorf("create json %s failed, %v", jsonFilename, err)
33-
}
34-
defer func() {
35-
_ = jsonFile.Close()
36-
}()
37-
// create decoder
38-
var dec decoder = core.NewDecoder(rdbFile)
39-
if dec, err = wrapDecoder(dec, options...); err != nil {
40-
return err
41-
}
42-
// parse rdb
43-
_, err = jsonFile.WriteString("[\n")
44-
if err != nil {
45-
return fmt.Errorf("write json failed, %v", err)
46-
}
47-
empty := true
48-
err = dec.Parse(func(object model.RedisObject) bool {
49-
data, err := jsonEncoder.Marshal(object) // enable SortMapKeys to ensure same result
50-
if err != nil {
51-
fmt.Printf("json marshal failed: %v", err)
52-
return true
53-
}
54-
data = append(data, ',', '\n')
55-
_, err = jsonFile.Write(data)
56-
if err != nil {
57-
fmt.Printf("write failed: %v", err)
58-
return true
59-
}
60-
empty = false
61-
return true
62-
})
63-
if err != nil {
64-
return err
65-
}
66-
// finish json
67-
if !empty {
68-
_, err = jsonFile.Seek(-2, 2)
69-
if err != nil {
70-
return fmt.Errorf("error during seek in file: %v", err)
71-
}
72-
}
73-
_, err = jsonFile.WriteString("\n]")
74-
if err != nil {
75-
return fmt.Errorf("error during write in file: %v", err)
76-
}
77-
return nil
78-
}
79-
8011
// ToAOF read rdb file and convert to aof file (Redis Serialization )
8112
func ToAOF(rdbFilename string, aofFilename string, options ...interface{}) error {
8213
if rdbFilename == "" {

helper/converter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func TestToJson(t *testing.T) {
105105
srcRdb := filepath.Join("../cases", filename+".rdb")
106106
actualJSON := filepath.Join("tmp", filename+".json")
107107
expectJSON := filepath.Join("../cases", filename+".json")
108-
err = ToJsons(srcRdb, actualJSON)
108+
err = ToJsons(srcRdb, actualJSON, WithConcurrent(1))
109109
if err != nil {
110110
t.Errorf("error occurs during parse %s, err: %v", filename, err)
111111
continue

helper/json.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package helper
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"os"
7+
"runtime"
8+
"sync"
9+
10+
"github.com/bytedance/sonic"
11+
"github.com/hdt3213/rdb/core"
12+
"github.com/hdt3213/rdb/model"
13+
)
14+
15+
var jsonEncoder = sonic.ConfigDefault
16+
17+
// ConcurrentOption sets the number of goroutines for json converter
18+
type ConcurrentOption int
19+
20+
// WithConcurrent sets the number of goroutines for json converter
21+
func WithConcurrent(c int) ConcurrentOption {
22+
return ConcurrentOption(c)
23+
}
24+
25+
// ToJsons read rdb file and convert to json file
26+
func ToJsons(rdbFilename string, jsonFilename string, options ...interface{}) error {
27+
if rdbFilename == "" {
28+
return errors.New("src file path is required")
29+
}
30+
if jsonFilename == "" {
31+
return errors.New("output file path is required")
32+
}
33+
// open file
34+
rdbFile, err := os.Open(rdbFilename)
35+
if err != nil {
36+
return fmt.Errorf("open rdb %s failed, %v", rdbFilename, err)
37+
}
38+
defer func() {
39+
_ = rdbFile.Close()
40+
}()
41+
jsonFile, err := os.Create(jsonFilename)
42+
if err != nil {
43+
return fmt.Errorf("create json %s failed, %v", jsonFilename, err)
44+
}
45+
defer func() {
46+
_ = jsonFile.Close()
47+
}()
48+
// create decoder
49+
var dec decoder = core.NewDecoder(rdbFile)
50+
if dec, err = wrapDecoder(dec, options...); err != nil {
51+
return err
52+
}
53+
// parse rdb
54+
_, err = jsonFile.WriteString("[\n")
55+
if err != nil {
56+
return fmt.Errorf("write json failed, %v", err)
57+
}
58+
59+
// parse options
60+
concurrent := 1
61+
cpuNum := runtime.NumCPU()
62+
if cpuNum > 1 {
63+
concurrent = cpuNum - 1 // leave one core for parser
64+
}
65+
for _, opt := range options {
66+
switch o := opt.(type) {
67+
case ConcurrentOption:
68+
concurrent = int(o)
69+
}
70+
}
71+
72+
redisObjectBuffer := make(chan model.RedisObject, 1000)
73+
jsonStringBuffer := make(chan []byte, 1000)
74+
75+
// parser goroutine
76+
empty := true
77+
go func() {
78+
err = dec.Parse(func(object model.RedisObject) bool {
79+
redisObjectBuffer <- object
80+
return true
81+
})
82+
close(redisObjectBuffer)
83+
}()
84+
// json marshaller goroutine
85+
wg := &sync.WaitGroup{}
86+
wg.Add(concurrent)
87+
for i := 0; i < concurrent; i++ {
88+
go func() {
89+
for object := range redisObjectBuffer {
90+
data, err := jsonEncoder.Marshal(object) // enable SortMapKeys to ensure same result
91+
if err != nil {
92+
fmt.Printf("json marshal failed: %v", err)
93+
continue
94+
}
95+
jsonStringBuffer <- data
96+
}
97+
wg.Done()
98+
}()
99+
}
100+
// write goroutine
101+
wg2 := &sync.WaitGroup{}
102+
wg2.Add(1)
103+
go func() {
104+
for data := range jsonStringBuffer {
105+
data = append(data, ',', '\n')
106+
_, err = jsonFile.Write(data)
107+
if err != nil {
108+
fmt.Printf("write failed: %v", err)
109+
continue
110+
}
111+
empty = false
112+
}
113+
wg2.Done()
114+
}()
115+
116+
wg.Wait()
117+
close(jsonStringBuffer)
118+
wg2.Wait() // wait writing goroutine
119+
120+
// finish json
121+
if !empty {
122+
_, err = jsonFile.Seek(-2, 2)
123+
if err != nil {
124+
return fmt.Errorf("error during seek in file: %v", err)
125+
}
126+
}
127+
_, err = jsonFile.WriteString("\n]")
128+
if err != nil {
129+
return fmt.Errorf("error during write in file: %v", err)
130+
}
131+
return nil
132+
}

0 commit comments

Comments
 (0)