Skip to content

Commit 1a39e53

Browse files
authored
[fix][client-cpp] Implement missing pulsar_message_set_schema_version in C API (#552)
1 parent 967529b commit 1a39e53

File tree

4 files changed

+18
-1
lines changed

4 files changed

+18
-1
lines changed

include/pulsar/Message.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ class PULSAR_PUBLIC Message {
197197
*/
198198
const std::string& getSchemaVersion() const;
199199

200+
/**
201+
* Set the schema version of the message.
202+
*/
203+
void setSchemaVersion(const std::string& schemaVersion);
204+
200205
/**
201206
* Get the producer name which produced this message.
202207
*

lib/Message.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,12 @@ const std::string& Message::getSchemaVersion() const {
209209
return impl_->getSchemaVersion();
210210
}
211211

212+
void Message::setSchemaVersion(const std::string& schemaVersion) {
213+
if (impl_) {
214+
impl_->metadata.set_schema_version(schemaVersion);
215+
}
216+
}
217+
212218
uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublishTimestamp() : 0ull; }
213219

214220
uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; }

lib/RetryableLookupService.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
#pragma once
2020

21+
#include <string>
22+
2123
#include "LookupDataResult.h"
2224
#include "LookupService.h"
2325
#include "NamespaceName.h"
@@ -64,7 +66,7 @@ class RetryableLookupService : public LookupService {
6466
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(
6567
const NamespaceNamePtr& nsName, CommandGetTopicsOfNamespace_Mode mode) override {
6668
return namespaceLookupCache_->run(
67-
"get-topics-of-namespace-" + nsName->toString(),
69+
"get-topics-of-namespace-" + nsName->toString() + "-" + std::to_string(mode),
6870
[this, nsName, mode] { return lookupService_->getTopicsOfNamespaceAsync(nsName, mode); });
6971
}
7072

lib/c/c_Message.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ int pulsar_message_has_schema_version(pulsar_message_t *message) {
141141
return message->message.hasSchemaVersion();
142142
}
143143

144+
void pulsar_message_set_schema_version(pulsar_message_t *message, const char *schemaVersion) {
145+
message->message.setSchemaVersion(schemaVersion ? schemaVersion : "");
146+
}
147+
144148
const char *pulsar_message_get_producer_name(pulsar_message_t *message) {
145149
return message->message.getProducerName().c_str();
146150
}

0 commit comments

Comments
 (0)