diff --git a/helper/resp.go b/helper/resp.go index 8f1da73..758a333 100644 --- a/helper/resp.go +++ b/helper/resp.go @@ -2,10 +2,11 @@ package helper import ( "bytes" - "github.com/hdt3213/rdb/model" "io" "sort" "strconv" + + "github.com/hdt3213/rdb/model" ) const crlf = "\r\n" @@ -125,6 +126,42 @@ func makeExpireCmd(obj model.RedisObject) CmdLine { return args } +var ( + xaddCmd = []byte("XADD") +) + +func formatStreamID(streamID *model.StreamId) string { + ms := strconv.FormatUint(streamID.Ms, 10) + seq := strconv.FormatUint(streamID.Sequence, 10) + return ms + "-" + seq +} + +func streamToCmd(stream *model.StreamObject) []CmdLine { + commands := make([]CmdLine, 0) + + for _, entry := range stream.Entries { + for _, message := range entry.Msgs { + args := make(CmdLine, 0, 3+len(message.Fields)*2) + args = append( + args, + xaddCmd, + []byte(stream.GetKey()), + []byte(formatStreamID(message.Id)), + ) + + for key, value := range message.Fields { + args = append(args, []byte(key), []byte(value)) + } + + commands = append(commands, args) + + // TODO: groups, consumers, pending + } + } + + return commands +} + // ObjectToCmd convert redis object to redis command line func ObjectToCmd(obj model.RedisObject, opts ...interface{}) []CmdLine { if obj == nil { @@ -154,6 +191,9 @@ func ObjectToCmd(obj model.RedisObject, opts ...interface{}) []CmdLine { case model.ZSetType: zsetObj := obj.(*model.ZSetObject) cmdLines = append(cmdLines, zSetToCmd(zsetObj)) + case model.StreamType: + streamObj := obj.(*model.StreamObject) + cmdLines = append(cmdLines, streamToCmd(streamObj)...) } if obj.GetExpiration() != nil { cmdLines = append(cmdLines, makeExpireCmd(obj))