Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@

#include <memory>
#include <string>
#include <variant>

namespace pulsar {
typedef std::function<void(Result, Producer)> CreateProducerCallback;
typedef std::function<void(std::variant<Producer, Error>)> CreateProducerCallbackV2;
typedef std::function<void(Result, Consumer)> SubscribeCallback;
typedef std::function<void(std::variant<Consumer, Error>)> SubscribeCallbackV2;
typedef std::function<void(Result, Reader)> ReaderCallback;
typedef std::function<void(std::variant<Reader, Error>)> ReaderCallbackV2;
typedef std::function<void(Result, TableView)> TableViewCallback;
typedef std::function<void(Result, const std::vector<std::string>&)> GetPartitionsCallback;
typedef std::function<void(Result)> CloseCallback;
Expand Down Expand Up @@ -108,7 +112,9 @@ class PULSAR_PUBLIC Client {
* @return ResultOk if the producer has been successfully created
* @return ResultError if there was an error
*/
Result createProducer(const std::string& topic, const ProducerConfiguration& conf, Producer& producer);
[[deprecated("use createProducerV2 instead")]] Result createProducer(const std::string& topic,
const ProducerConfiguration& conf,
Producer& producer);

/**
* Asynchronously create a producer with the default ProducerConfiguration for publishing on a specific
Expand All @@ -118,7 +124,18 @@ class PULSAR_PUBLIC Client {
* @param callback the callback that is triggered when the producer is created successfully or not
* @param callback Callback function that is invoked when the operation is completed
*/
void createProducerAsync(const std::string& topic, const CreateProducerCallback& callback);
[[deprecated("use createProducerAsyncV2 instead")]] void createProducerAsync(
const std::string& topic, const CreateProducerCallback& callback);

std::variant<Producer, Error> createProducerV2(const std::string& topic);

std::variant<Producer, Error> createProducerV2(const std::string& topic,
const ProducerConfiguration& conf);

void createProducerAsyncV2(const std::string& topic, const CreateProducerCallbackV2& callback);

void createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf,
const CreateProducerCallbackV2& callback);

/**
* Asynchronously create a producer with the customized ProducerConfiguration for publishing on a specific
Expand Down Expand Up @@ -151,6 +168,11 @@ class PULSAR_PUBLIC Client {
Result subscribe(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, Consumer& consumer);

std::variant<Consumer, Error> subscribeV2(const std::string& topic, const std::string& subscriptionName);

std::variant<Consumer, Error> subscribeV2(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf);

/**
* Asynchronously subscribe to a given topic and subscription combination with the default
* ConsumerConfiguration
Expand All @@ -163,6 +185,9 @@ class PULSAR_PUBLIC Client {
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const SubscribeCallback& callback);

void subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName,
const SubscribeCallbackV2& callback);

/**
* Asynchronously subscribe to a given topic and subscription combination with the customized
* ConsumerConfiguration
Expand All @@ -176,6 +201,9 @@ class PULSAR_PUBLIC Client {
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallback& callback);

void subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback);

/**
* Subscribe to multiple topics under the same namespace.
*
Expand All @@ -197,6 +225,13 @@ class PULSAR_PUBLIC Client {
Result subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, Consumer& consumer);

std::variant<Consumer, Error> subscribeV2(const std::vector<std::string>& topics,
const std::string& subscriptionName);

std::variant<Consumer, Error> subscribeV2(const std::vector<std::string>& topics,
const std::string& subscriptionName,
const ConsumerConfiguration& conf);

/**
* Asynchronously subscribe to a list of topics and subscription combination using the default
ConsumerConfiguration
Expand All @@ -210,6 +245,9 @@ class PULSAR_PUBLIC Client {
void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
const SubscribeCallback& callback);

void subscribeAsyncV2(const std::vector<std::string>& topics, const std::string& subscriptionName,
const SubscribeCallbackV2& callback);

/**
* Asynchronously subscribe to a list of topics and subscription combination using the customized
* ConsumerConfiguration
Expand All @@ -223,6 +261,9 @@ class PULSAR_PUBLIC Client {
void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallback& callback);

void subscribeAsyncV2(const std::vector<std::string>& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback);

/**
* Subscribe to multiple topics, which match given regexPattern, under the same namespace.
*/
Expand Down Expand Up @@ -291,6 +332,9 @@ class PULSAR_PUBLIC Client {
Result createReader(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, Reader& reader);

std::variant<Reader, Error> createReaderV2(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf);

/**
* Asynchronously create a topic reader with the customized ReaderConfiguration for reading messages from
* the specified topic.
Expand Down Expand Up @@ -320,6 +364,9 @@ class PULSAR_PUBLIC Client {
void createReaderAsync(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, const ReaderCallback& callback);

void createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, const ReaderCallbackV2& callback);

/**
* Create a table view with given {@code TableViewConfiguration} for specified topic.
*
Expand Down
7 changes: 7 additions & 0 deletions include/pulsar/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <cstdint>
#include <iosfwd>
#include <string>

namespace pulsar {

Expand Down Expand Up @@ -101,6 +102,12 @@ enum Result : int8_t
PULSAR_PUBLIC const char* strResult(Result result);

PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, pulsar::Result result);

struct Error {
Result result;
std::string message;
};

} // namespace pulsar

#endif /* ERROR_HPP_ */
99 changes: 99 additions & 0 deletions lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,31 @@ void Client::createProducerAsync(const std::string& topic, const CreateProducerC
createProducerAsync(topic, ProducerConfiguration(), callback);
}

std::variant<Producer, Error> Client::createProducerV2(const std::string& topic) {
return createProducerV2(topic, ProducerConfiguration());
}

std::variant<Producer, Error> Client::createProducerV2(const std::string& topic,
const ProducerConfiguration& conf) {
Promise<bool, std::variant<Producer, Error> > promise;
createProducerAsyncV2(topic, conf,
[promise](std::variant<Producer, Error> result) { promise.setValue(result); });
Future<bool, std::variant<Producer, Error> > future = promise.getFuture();

std::variant<Producer, Error> result;
future.get(result);
return result;
}

void Client::createProducerAsyncV2(const std::string& topic, const CreateProducerCallbackV2& callback) {
createProducerAsyncV2(topic, ProducerConfiguration(), callback);
}

void Client::createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf,
const CreateProducerCallbackV2& callback) {
impl_->createProducerAsyncV2(topic, conf, callback);
}

void Client::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf,
const CreateProducerCallback& callback) {
impl_->createProducerAsync(topic, conf, callback);
Expand All @@ -81,17 +106,46 @@ Result Client::subscribe(const std::string& topic, const std::string& subscripti
return future.get(consumer);
}

std::variant<Consumer, Error> Client::subscribeV2(const std::string& topic,
const std::string& subscriptionName) {
return subscribeV2(topic, subscriptionName, ConsumerConfiguration());
}

std::variant<Consumer, Error> Client::subscribeV2(const std::string& topic,
const std::string& subscriptionName,
const ConsumerConfiguration& conf) {
Promise<bool, std::variant<Consumer, Error> > promise;
subscribeAsyncV2(topic, subscriptionName, conf,
[promise](std::variant<Consumer, Error> result) { promise.setValue(result); });
Future<bool, std::variant<Consumer, Error> > future = promise.getFuture();

std::variant<Consumer, Error> result;
future.get(result);
return result;
}

void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const SubscribeCallback& callback) {
subscribeAsync(topic, subscriptionName, ConsumerConfiguration(), callback);
}

void Client::subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName,
const SubscribeCallbackV2& callback) {
subscribeAsyncV2(topic, subscriptionName, ConsumerConfiguration(), callback);
}

void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
LOG_INFO("Subscribing on Topic :" << topic);
impl_->subscribeAsync(topic, subscriptionName, conf, callback);
}

void Client::subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback) {
LOG_INFO("Subscribing on Topic :" << topic);
impl_->subscribeAsyncV2(topic, subscriptionName, conf, callback);
}

Result Client::subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
Consumer& consumer) {
return subscribe(topics, subscriptionName, ConsumerConfiguration(), consumer);
Expand All @@ -106,16 +160,44 @@ Result Client::subscribe(const std::vector<std::string>& topics, const std::stri
return future.get(consumer);
}

std::variant<Consumer, Error> Client::subscribeV2(const std::vector<std::string>& topics,
const std::string& subscriptionName) {
return subscribeV2(topics, subscriptionName, ConsumerConfiguration());
}

std::variant<Consumer, Error> Client::subscribeV2(const std::vector<std::string>& topics,
const std::string& subscriptionName,
const ConsumerConfiguration& conf) {
Promise<bool, std::variant<Consumer, Error> > promise;
subscribeAsyncV2(topics, subscriptionName, conf,
[promise](std::variant<Consumer, Error> result) { promise.setValue(result); });
Future<bool, std::variant<Consumer, Error> > future = promise.getFuture();

std::variant<Consumer, Error> result;
future.get(result);
return result;
}

void Client::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
const SubscribeCallback& callback) {
subscribeAsync(topics, subscriptionName, ConsumerConfiguration(), callback);
}

void Client::subscribeAsyncV2(const std::vector<std::string>& topics, const std::string& subscriptionName,
const SubscribeCallbackV2& callback) {
subscribeAsyncV2(topics, subscriptionName, ConsumerConfiguration(), callback);
}

void Client::subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallback& callback) {
impl_->subscribeAsync(topics, subscriptionName, conf, callback);
}

void Client::subscribeAsyncV2(const std::vector<std::string>& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback) {
impl_->subscribeAsyncV2(topics, subscriptionName, conf, callback);
}

Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName,
Consumer& consumer) {
return subscribeWithRegex(regexPattern, subscriptionName, ConsumerConfiguration(), consumer);
Expand Down Expand Up @@ -149,11 +231,28 @@ Result Client::createReader(const std::string& topic, const MessageId& startMess
return future.get(reader);
}

std::variant<Reader, Error> Client::createReaderV2(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf) {
Promise<bool, std::variant<Reader, Error> > promise;
createReaderAsyncV2(topic, startMessageId, conf,
[promise](std::variant<Reader, Error> result) { promise.setValue(result); });
Future<bool, std::variant<Reader, Error> > future = promise.getFuture();

std::variant<Reader, Error> result;
future.get(result);
return result;
}

void Client::createReaderAsync(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, const ReaderCallback& callback) {
impl_->createReaderAsync(topic, startMessageId, conf, callback);
}

void Client::createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, const ReaderCallbackV2& callback) {
impl_->createReaderAsyncV2(topic, startMessageId, conf, callback);
}

Result Client::createTableView(const std::string& topic, const TableViewConfiguration& conf,
TableView& tableView) {
Promise<Result, TableView> promise;
Expand Down
6 changes: 5 additions & 1 deletion lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1711,7 +1711,11 @@ void ClientConnection::handleError(const proto::CommandError& error) {
pendingRequests_.erase(it);
lock.unlock();

request->fail(result);
ResponseData data;
if (error.has_message()) {
data.errorMessage = error.message();
}
request->fail(result, data);
} else {
auto it = pendingGetLastMessageIdRequests_.find(error.request_id());
if (it != pendingGetLastMessageIdRequests_.end()) {
Expand Down
3 changes: 2 additions & 1 deletion lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ class CommandSuccess;
// Data returned on the request operation. Mostly used on create-producer command
struct ResponseData {
std::string producerName;
int64_t lastSequenceId;
int64_t lastSequenceId = -1L;
std::string schemaVersion;
optional<uint64_t> topicEpoch;
std::string errorMessage;
};

typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
Expand Down
Loading
Loading