Skip to content

Commit ec0e6a8

Browse files
author
hankunming
committed
make the json format of ConsumeRunningInfo and HeartBeatData compatible with RocketMQ Java main project
1 parent 6523dcc commit ec0e6a8

2 files changed

Lines changed: 49 additions & 6 deletions

File tree

src/protocol/ConsumerRunningInfo.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,16 +104,18 @@ string ConsumerRunningInfo::encode() {
104104
Json::FastWriter fastwrite;
105105
string finals = fastwrite.write(root);
106106
string key = "\"mqTable\":";
107-
key.append("{");
107+
key.append("[");
108108
for (map<MessageQueue, ProcessQueueInfo>::iterator it = mqTable.begin(); it != mqTable.end(); ++it) {
109+
key.append("[");
109110
key.append((it->first).toJson().toStyledString());
110111
key.erase(key.end() - 1);
111-
key.append(":");
112+
key.append(",");
112113
key.append((it->second).toJson().toStyledString());
114+
key.append("]");
113115
key.append(",");
114116
}
115117
key.erase(key.end() - 1);
116-
key.append("}");
118+
key.append("]");
117119

118120
// insert mqTable to final string
119121
key.append(",");

src/protocol/HeartbeatData.h

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,50 @@ class ConsumerData {
5050
Json::Value toJson() const {
5151
Json::Value outJson;
5252
outJson["groupName"] = groupName;
53-
outJson["consumeFromWhere"] = consumeFromWhere;
54-
outJson["consumeType"] = consumeType;
55-
outJson["messageModel"] = messageModel;
53+
54+
switch (consumeFromWhere) {
55+
case CONSUME_FROM_FIRST_OFFSET:
56+
outJson["consumeFromWhere"] = "CONSUME_FROM_FIRST_OFFSET";
57+
break;
58+
case CONSUME_FROM_TIMESTAMP:
59+
outJson["consumeFromWhere"] = "CONSUME_FROM_TIMESTAMP";
60+
break;
61+
case CONSUME_FROM_LAST_OFFSET:
62+
outJson["consumeFromWhere"] = "CONSUME_FROM_LAST_OFFSET";
63+
break;
64+
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
65+
outJson["consumeFromWhere"] = "CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST";
66+
break;
67+
case CONSUME_FROM_MAX_OFFSET:
68+
outJson["consumeFromWhere"] = "CONSUME_FROM_MAX_OFFSET";
69+
break;
70+
case CONSUME_FROM_MIN_OFFSET:
71+
outJson["consumeFromWhere"] = "CONSUME_FROM_MIN_OFFSET";
72+
break;
73+
default:
74+
break;
75+
}
76+
77+
switch (consumeType) {
78+
case CONSUME_ACTIVELY:
79+
outJson["consumeType"] = "CONSUME_ACTIVELY";
80+
break;
81+
case CONSUME_PASSIVELY:
82+
outJson["consumeType"] = "CONSUME_PASSIVELY";
83+
default:
84+
break;
85+
}
86+
87+
switch (messageModel) {
88+
case CLUSTERING:
89+
outJson["messageModel"] = "CLUSTERING";
90+
break;
91+
case BROADCASTING:
92+
outJson["messageModel"] = "BROADCASTING";
93+
default:
94+
break;
95+
}
96+
5697

5798
vector<SubscriptionData>::const_iterator it = subscriptionDataSet.begin();
5899
for (; it != subscriptionDataSet.end(); it++) {

0 commit comments

Comments
 (0)