Skip to content

Commit f07aa2e

Browse files
Denis DubovitskiyHDT3213
authored andcommitted
Basic stream functionality
1 parent e5a00e1 commit f07aa2e

1 file changed

Lines changed: 41 additions & 1 deletion

File tree

helper/resp.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package helper
22

33
import (
44
"bytes"
5-
"github.com/hdt3213/rdb/model"
65
"io"
76
"sort"
87
"strconv"
8+
9+
"github.com/hdt3213/rdb/model"
910
)
1011

1112
const crlf = "\r\n"
@@ -125,6 +126,42 @@ func makeExpireCmd(obj model.RedisObject) CmdLine {
125126
return args
126127
}
127128

129+
var (
130+
xaddCmd = []byte("XADD")
131+
)
132+
133+
func formatStreamID(streamID *model.StreamId) string {
134+
ms := strconv.FormatUint(streamID.Ms, 10)
135+
seq := strconv.FormatUint(streamID.Sequence, 10)
136+
return ms + "-" + seq
137+
}
138+
139+
func streamToCmd(stream *model.StreamObject) []CmdLine {
140+
commands := make([]CmdLine, 0)
141+
142+
for _, entry := range stream.Entries {
143+
for _, message := range entry.Msgs {
144+
args := make(CmdLine, 0, 3+len(message.Fields)*2)
145+
args = append(
146+
args,
147+
xaddCmd,
148+
[]byte(stream.GetKey()),
149+
[]byte(formatStreamID(message.Id)),
150+
)
151+
152+
for key, value := range message.Fields {
153+
args = append(args, []byte(key), []byte(value))
154+
}
155+
156+
commands = append(commands, args)
157+
158+
// TODO: groups, consumers, pending
159+
}
160+
}
161+
162+
return commands
163+
}
164+
128165
// ObjectToCmd convert redis object to redis command line
129166
func ObjectToCmd(obj model.RedisObject, opts ...interface{}) []CmdLine {
130167
if obj == nil {
@@ -154,6 +191,9 @@ func ObjectToCmd(obj model.RedisObject, opts ...interface{}) []CmdLine {
154191
case model.ZSetType:
155192
zsetObj := obj.(*model.ZSetObject)
156193
cmdLines = append(cmdLines, zSetToCmd(zsetObj))
194+
case model.StreamType:
195+
streamObj := obj.(*model.StreamObject)
196+
cmdLines = append(cmdLines, streamToCmd(streamObj)...)
157197
}
158198
if obj.GetExpiration() != nil {
159199
cmdLines = append(cmdLines, makeExpireCmd(obj))

0 commit comments

Comments
 (0)