Skip to content

Commit 438fe97

Browse files
committed
rdb 12 (redis 7.4+): add support for hash with field expiration
1 parent dbcc2d4 commit 438fe97

5 files changed

Lines changed: 333 additions & 22 deletions

File tree

core/decoder.go

Lines changed: 73 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ import (
77
"encoding/binary"
88
"errors"
99
"fmt"
10-
"github.com/hdt3213/rdb/memprofiler"
11-
"github.com/hdt3213/rdb/model"
1210
"io"
1311
"strconv"
1412
"time"
13+
14+
"github.com/hdt3213/rdb/memprofiler"
15+
"github.com/hdt3213/rdb/model"
1516
)
1617

1718
// Decoder is an instance of rdb parsing process
@@ -87,27 +88,41 @@ const (
8788
typeStreamListPacks2
8889
typeSetListPack
8990
typeStreamListPacks3
91+
typeHashWithHfeRc // rdb 12 (only redis 7.4 rc)
92+
typeHashListPackWithHfeRc // rdb 12 (only redis 7.4 rc)
93+
typeHashWithHfe // since rdb 12 (redis 7.4)
94+
typeHashListPackWithHfe // since rdb 12 (redis 7.4)
95+
)
96+
97+
const (
98+
EB_EXPIRE_TIME_MAX int64 = 0x0000FFFFFFFFFFFF
99+
EB_EXPIRE_TIME_INVALID int64 = EB_EXPIRE_TIME_MAX + 1
100+
HFE_MAX_ABS_TIME_MSEC int64 = EB_EXPIRE_TIME_MAX >> 2
90101
)
91102

92103
var encodingMap = map[int]string{
93-
typeString: model.StringEncoding,
94-
typeList: model.ListEncoding,
95-
typeSet: model.SetEncoding,
96-
typeZset: model.ZSetEncoding,
97-
typeHash: model.HashEncoding,
98-
typeZset2: model.ZSet2Encoding,
99-
typeHashZipMap: model.ZipMapEncoding,
100-
typeListZipList: model.ZipListEncoding,
101-
typeSetIntSet: model.IntSetEncoding,
102-
typeZsetZipList: model.ZipListEncoding,
103-
typeHashZipList: model.ZipListEncoding,
104-
typeListQuickList: model.QuickListEncoding,
105-
typeStreamListPacks: model.ListPackEncoding,
106-
typeStreamListPacks2: model.ListPackEncoding,
107-
typeHashListPack: model.ListPackEncoding,
108-
typeZsetListPack: model.ListPackEncoding,
109-
typeListQuickList2: model.QuickList2Encoding,
110-
typeSetListPack: model.ListPackEncoding,
104+
typeString: model.StringEncoding,
105+
typeList: model.ListEncoding,
106+
typeSet: model.SetEncoding,
107+
typeZset: model.ZSetEncoding,
108+
typeHash: model.HashEncoding,
109+
typeZset2: model.ZSet2Encoding,
110+
typeHashZipMap: model.ZipMapEncoding,
111+
typeListZipList: model.ZipListEncoding,
112+
typeSetIntSet: model.IntSetEncoding,
113+
typeZsetZipList: model.ZipListEncoding,
114+
typeHashZipList: model.ZipListEncoding,
115+
typeListQuickList: model.QuickListEncoding,
116+
typeStreamListPacks: model.ListPackEncoding,
117+
typeStreamListPacks2: model.ListPackEncoding,
118+
typeHashListPack: model.ListPackEncoding,
119+
typeZsetListPack: model.ListPackEncoding,
120+
typeListQuickList2: model.QuickList2Encoding,
121+
typeSetListPack: model.ListPackEncoding,
122+
typeHashWithHfeRc: model.HashExEncoding,
123+
typeHashListPackWithHfeRc: model.ListPackExEncoding,
124+
typeHashWithHfe: model.HashExEncoding,
125+
typeHashListPackWithHfe: model.ListPackExEncoding,
111126
}
112127

113128
// checkHeader checks whether input has valid RDB file header
@@ -311,6 +326,44 @@ func (dec *Decoder) readObject(flag byte, base *model.BaseObject) (model.RedisOb
311326
BaseObject: base,
312327
Members: set,
313328
}, nil
329+
case typeHashWithHfe:
330+
hash, err := dec.readHashMapEx(false)
331+
if err != nil {
332+
return nil, err
333+
}
334+
return &model.HashObjectEx{
335+
BaseObject: base,
336+
Hash: hash,
337+
}, nil
338+
case typeHashWithHfeRc:
339+
hash, err := dec.readHashMapEx(true)
340+
if err != nil {
341+
return nil, err
342+
}
343+
return &model.HashObjectEx{
344+
BaseObject: base,
345+
Hash: hash,
346+
}, nil
347+
case typeHashListPackWithHfe:
348+
m, extra, err := dec.readListPackHashEx(false)
349+
if err != nil {
350+
return nil, err
351+
}
352+
base.Extra = extra
353+
return &model.HashObjectEx{
354+
BaseObject: base,
355+
Hash: m,
356+
}, nil
357+
case typeHashListPackWithHfeRc:
358+
m, extra, err := dec.readListPackHashEx(true)
359+
if err != nil {
360+
return nil, err
361+
}
362+
base.Extra = extra
363+
return &model.HashObjectEx{
364+
BaseObject: base,
365+
Hash: m,
366+
}, nil
314367
}
315368
return nil, fmt.Errorf("unknown type flag: %b", flag)
316369
}

core/hash.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package core
33
import (
44
"encoding/binary"
55
"errors"
6+
"fmt"
7+
68
"github.com/hdt3213/rdb/model"
79
)
810

@@ -30,6 +32,58 @@ func (dec *Decoder) readHashMap() (map[string][]byte, error) {
3032
return m, nil
3133
}
3234

35+
func (dec *Decoder) readHashMapEx(rc bool) (map[string]model.HashObjectExField, error) {
36+
var minExpire int64 = EB_EXPIRE_TIME_INVALID
37+
var expire int64
38+
if !rc {
39+
// Hash with HFEs. min TTL at start (7.4+), 7.4RC not included
40+
min, err := dec.readInt64()
41+
if err != nil {
42+
return nil, err
43+
}
44+
if min > EB_EXPIRE_TIME_INVALID {
45+
return nil, fmt.Errorf("hash read invalid minExpire value: %d", min)
46+
}
47+
minExpire = min
48+
}
49+
size, _, err := dec.readLength()
50+
if err != nil {
51+
return nil, err
52+
} else if size == 0 {
53+
return nil, fmt.Errorf("hash read empty key")
54+
}
55+
m := make(map[string]model.HashObjectExField)
56+
for i := 0; i < int(size); i++ {
57+
ttl, _, err := dec.readLength()
58+
if err != nil {
59+
return nil, err
60+
}
61+
if rc {
62+
// Value is absolute for 7.4RC
63+
expire = int64(ttl)
64+
} else if ttl == 0 {
65+
// 0 Indicates no TTL. This is common case so we keep it small.
66+
expire = 0
67+
} else {
68+
// TTL is relative to minExpire (with +1 to avoid 0 that already taken)
69+
expire = int64(ttl) + minExpire - 1
70+
}
71+
if expire > EB_EXPIRE_TIME_MAX {
72+
return nil, fmt.Errorf("invalid expireAt time: %d", expire)
73+
}
74+
field, err := dec.readString()
75+
if err != nil {
76+
return nil, err
77+
}
78+
value, err := dec.readString()
79+
if err != nil {
80+
return nil, err
81+
}
82+
m[unsafeBytes2Str(field)] = model.HashObjectExField{Expire: expire, Value: value}
83+
}
84+
return m, nil
85+
}
86+
3387
func (dec *Decoder) readZipMapHash() (map[string][]byte, error) {
3488
buf, err := dec.readString()
3589
if err != nil {
@@ -179,6 +233,47 @@ func (dec *Decoder) readListPackHash() (map[string][]byte, *model.ListpackDetail
179233
return m, detail, nil
180234
}
181235

236+
func (dec *Decoder) readListPackHashEx(rc bool) (map[string]model.HashObjectExField, *model.ListpackDetail, error) {
237+
if !rc {
238+
// This value was serialized for future use-case of streaming the object directly to FLASH (while keeping in mem its next expiration time)
239+
_, err := dec.readInt64()
240+
if err != nil {
241+
return nil, nil, err
242+
}
243+
}
244+
buf, err := dec.readString()
245+
if err != nil {
246+
return nil, nil, err
247+
}
248+
cursor := 0
249+
size := readListPackLength(buf, &cursor)
250+
if size == 0 {
251+
return nil, nil, fmt.Errorf("hash listpack read empty key")
252+
} else if size%3 != 0 {
253+
return nil, nil, fmt.Errorf("hash listpack read invalid size %d", size)
254+
}
255+
m := make(map[string]model.HashObjectExField)
256+
for i := 0; i < size; i += 3 {
257+
key, err := dec.readListPackEntryAsString(buf, &cursor)
258+
if err != nil {
259+
return nil, nil, err
260+
}
261+
val, err := dec.readListPackEntryAsString(buf, &cursor)
262+
if err != nil {
263+
return nil, nil, err
264+
}
265+
expire, err := dec.readListPackEntryAsInt(buf, &cursor)
266+
if err != nil {
267+
return nil, nil, err
268+
}
269+
m[unsafeBytes2Str(key)] = model.HashObjectExField{Expire: expire, Value: val}
270+
}
271+
detail := &model.ListpackDetail{
272+
RawStringSize: len(buf),
273+
}
274+
return m, detail, nil
275+
}
276+
182277
func (enc *Encoder) WriteHashMapObject(key string, hash map[string][]byte, options ...interface{}) error {
183278
err := enc.beforeWriteObject(options...)
184279
if err != nil {
@@ -198,6 +293,20 @@ func (enc *Encoder) WriteHashMapObject(key string, hash map[string][]byte, optio
198293
return nil
199294
}
200295

296+
func (enc *Encoder) WriteHashMapObjectEx(key string, hash map[string]model.HashObjectExField, options ...interface{}) error {
297+
err := enc.beforeWriteObject(options...)
298+
if err != nil {
299+
return err
300+
}
301+
302+
err = enc.writeHashEncodingEx(key, hash, options...)
303+
if err != nil {
304+
return err
305+
}
306+
enc.state = writtenObjectState
307+
return nil
308+
}
309+
201310
func (enc *Encoder) writeHashEncoding(key string, hash map[string][]byte, options ...interface{}) error {
202311
err := enc.write([]byte{typeHash})
203312
if err != nil {
@@ -224,6 +333,49 @@ func (enc *Encoder) writeHashEncoding(key string, hash map[string][]byte, option
224333
return nil
225334
}
226335

336+
func (enc *Encoder) writeHashEncodingEx(key string, hash map[string]model.HashObjectExField, options ...interface{}) error {
337+
err := enc.write([]byte{typeHashWithHfe})
338+
if err != nil {
339+
return err
340+
}
341+
err = enc.writeString(key)
342+
if err != nil {
343+
return err
344+
}
345+
// Hash with HFEs. min TTL at start (7.4+), 7.4RC not included
346+
var minExpire int64 = 0
347+
for _, val := range hash {
348+
if val.Expire > minExpire {
349+
minExpire = val.Expire
350+
}
351+
}
352+
minExpireBytes := make([]byte, 8)
353+
binary.LittleEndian.PutUint64(minExpireBytes[:], uint64(minExpire))
354+
err = enc.write(minExpireBytes)
355+
if err != nil {
356+
return err
357+
}
358+
err = enc.writeLength(uint64(len(hash)))
359+
if err != nil {
360+
return err
361+
}
362+
for field, value := range hash {
363+
err = enc.writeLength(uint64(value.Expire + 1 - minExpire))
364+
if err != nil {
365+
return err
366+
}
367+
err = enc.writeString(field)
368+
if err != nil {
369+
return err
370+
}
371+
err = enc.writeString(unsafeBytes2Str(value.Value))
372+
if err != nil {
373+
return err
374+
}
375+
}
376+
return nil
377+
}
378+
227379
func (enc *Encoder) tryWriteZipListHashMap(key string, hash map[string][]byte, options ...interface{}) (bool, error) {
228380
if len(hash) > enc.hashZipListOpt.getMaxEntries() {
229381
return false, nil

core/string.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ import (
44
"encoding/binary"
55
"errors"
66
"fmt"
7-
"github.com/hdt3213/rdb/lzf"
87
"math"
98
"strconv"
109
"unicode"
10+
11+
"github.com/hdt3213/rdb/lzf"
1112
)
1213

1314
const (
@@ -119,13 +120,23 @@ func (dec *Decoder) readInt16() (int16, error) {
119120
func (dec *Decoder) readInt32() (int32, error) {
120121
err := dec.readFull(dec.buffer[:4])
121122
if err != nil {
122-
return 0, fmt.Errorf("read uint16 error: %v", err)
123+
return 0, fmt.Errorf("read uint32 error: %v", err)
123124
}
124125

125126
i := binary.LittleEndian.Uint32(dec.buffer[:4])
126127
return int32(i), nil
127128
}
128129

130+
func (dec *Decoder) readInt64() (int64, error) {
131+
err := dec.readFull(dec.buffer[:8])
132+
if err != nil {
133+
return 0, fmt.Errorf("read uint64 error: %v", err)
134+
}
135+
136+
i := binary.LittleEndian.Uint64(dec.buffer[:8])
137+
return int64(i), nil
138+
}
139+
129140
func (dec *Decoder) readLiteralFloat() (float64, error) {
130141
first, err := dec.readByte()
131142
if err != nil {

0 commit comments

Comments
 (0)