diff --git a/include/pulsar/Client.h b/include/pulsar/Client.h index e9813e3d..be3f33ec 100644 --- a/include/pulsar/Client.h +++ b/include/pulsar/Client.h @@ -36,11 +36,15 @@ #include #include +#include namespace pulsar { typedef std::function CreateProducerCallback; +typedef std::function)> CreateProducerCallbackV2; typedef std::function SubscribeCallback; +typedef std::function)> SubscribeCallbackV2; typedef std::function ReaderCallback; +typedef std::function)> ReaderCallbackV2; typedef std::function TableViewCallback; typedef std::function&)> GetPartitionsCallback; typedef std::function CloseCallback; @@ -108,7 +112,9 @@ class PULSAR_PUBLIC Client { * @return ResultOk if the producer has been successfully created * @return ResultError if there was an error */ - Result createProducer(const std::string& topic, const ProducerConfiguration& conf, Producer& producer); + [[deprecated("use createProducerV2 instead")]] Result createProducer(const std::string& topic, + const ProducerConfiguration& conf, + Producer& producer); /** * Asynchronously create a producer with the default ProducerConfiguration for publishing on a specific @@ -118,7 +124,18 @@ class PULSAR_PUBLIC Client { * @param callback the callback that is triggered when the producer is created successfully or not * @param callback Callback function that is invoked when the operation is completed */ - void createProducerAsync(const std::string& topic, const CreateProducerCallback& callback); + [[deprecated("use createProducerAsyncV2 instead")]] void createProducerAsync( + const std::string& topic, const CreateProducerCallback& callback); + + std::variant createProducerV2(const std::string& topic); + + std::variant createProducerV2(const std::string& topic, + const ProducerConfiguration& conf); + + void createProducerAsyncV2(const std::string& topic, const CreateProducerCallbackV2& callback); + + void createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf, + const CreateProducerCallbackV2& callback); /** * Asynchronously create a producer with the customized ProducerConfiguration for publishing on a specific @@ -151,6 +168,11 @@ class PULSAR_PUBLIC Client { Result subscribe(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, Consumer& consumer); + std::variant subscribeV2(const std::string& topic, const std::string& subscriptionName); + + std::variant subscribeV2(const std::string& topic, const std::string& subscriptionName, + const ConsumerConfiguration& conf); + /** * Asynchronously subscribe to a given topic and subscription combination with the default * ConsumerConfiguration @@ -163,6 +185,9 @@ class PULSAR_PUBLIC Client { void subscribeAsync(const std::string& topic, const std::string& subscriptionName, const SubscribeCallback& callback); + void subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName, + const SubscribeCallbackV2& callback); + /** * Asynchronously subscribe to a given topic and subscription combination with the customized * ConsumerConfiguration @@ -176,6 +201,9 @@ class PULSAR_PUBLIC Client { void subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); + void subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback); + /** * Subscribe to multiple topics under the same namespace. * @@ -197,6 +225,13 @@ class PULSAR_PUBLIC Client { Result subscribe(const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, Consumer& consumer); + std::variant subscribeV2(const std::vector& topics, + const std::string& subscriptionName); + + std::variant subscribeV2(const std::vector& topics, + const std::string& subscriptionName, + const ConsumerConfiguration& conf); + /** * Asynchronously subscribe to a list of topics and subscription combination using the default ConsumerConfiguration @@ -210,6 +245,9 @@ class PULSAR_PUBLIC Client { void subscribeAsync(const std::vector& topics, const std::string& subscriptionName, const SubscribeCallback& callback); + void subscribeAsyncV2(const std::vector& topics, const std::string& subscriptionName, + const SubscribeCallbackV2& callback); + /** * Asynchronously subscribe to a list of topics and subscription combination using the customized * ConsumerConfiguration @@ -223,6 +261,9 @@ class PULSAR_PUBLIC Client { void subscribeAsync(const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); + void subscribeAsyncV2(const std::vector& topics, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback); + /** * Subscribe to multiple topics, which match given regexPattern, under the same namespace. */ @@ -291,6 +332,9 @@ class PULSAR_PUBLIC Client { Result createReader(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, Reader& reader); + std::variant createReaderV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf); + /** * Asynchronously create a topic reader with the customized ReaderConfiguration for reading messages from * the specified topic. @@ -320,6 +364,9 @@ class PULSAR_PUBLIC Client { void createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback); + void createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf, const ReaderCallbackV2& callback); + /** * Create a table view with given {@code TableViewConfiguration} for specified topic. * diff --git a/include/pulsar/Result.h b/include/pulsar/Result.h index a6c30d4c..7b1bb1a6 100644 --- a/include/pulsar/Result.h +++ b/include/pulsar/Result.h @@ -23,6 +23,7 @@ #include #include +#include namespace pulsar { @@ -101,6 +102,12 @@ enum Result : int8_t PULSAR_PUBLIC const char* strResult(Result result); PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, pulsar::Result result); + +struct Error { + Result result; + std::string message; +}; + } // namespace pulsar #endif /* ERROR_HPP_ */ diff --git a/lib/Client.cc b/lib/Client.cc index 48e92dda..dce18ce2 100644 --- a/lib/Client.cc +++ b/lib/Client.cc @@ -63,6 +63,30 @@ void Client::createProducerAsync(const std::string& topic, const CreateProducerC createProducerAsync(topic, ProducerConfiguration(), callback); } +std::variant Client::createProducerV2(const std::string& topic) { + return createProducerV2(topic, ProducerConfiguration()); +} + +std::variant Client::createProducerV2(const std::string& topic, + const ProducerConfiguration& conf) { + Promise > promise; + createProducerAsyncV2(topic, conf, [promise](const auto& result) { promise.setValue(result); }); + Future > future = promise.getFuture(); + + std::variant result; + future.get(result); + return result; +} + +void Client::createProducerAsyncV2(const std::string& topic, const CreateProducerCallbackV2& callback) { + createProducerAsyncV2(topic, ProducerConfiguration(), callback); +} + +void Client::createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf, + const CreateProducerCallbackV2& callback) { + impl_->createProducerAsyncV2(topic, conf, callback); +} + void Client::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, const CreateProducerCallback& callback) { impl_->createProducerAsync(topic, conf, callback); @@ -81,17 +105,46 @@ Result Client::subscribe(const std::string& topic, const std::string& subscripti return future.get(consumer); } +std::variant Client::subscribeV2(const std::string& topic, + const std::string& subscriptionName) { + return subscribeV2(topic, subscriptionName, ConsumerConfiguration()); +} + +std::variant Client::subscribeV2(const std::string& topic, + const std::string& subscriptionName, + const ConsumerConfiguration& conf) { + Promise > promise; + subscribeAsyncV2(topic, subscriptionName, conf, + [promise](const auto& result) { promise.setValue(result); }); + Future > future = promise.getFuture(); + + std::variant result; + future.get(result); + return result; +} + void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName, const SubscribeCallback& callback) { subscribeAsync(topic, subscriptionName, ConsumerConfiguration(), callback); } +void Client::subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName, + const SubscribeCallbackV2& callback) { + subscribeAsyncV2(topic, subscriptionName, ConsumerConfiguration(), callback); +} + void Client::subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { LOG_INFO("Subscribing on Topic :" << topic); impl_->subscribeAsync(topic, subscriptionName, conf, callback); } +void Client::subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback) { + LOG_INFO("Subscribing on Topic :" << topic); + impl_->subscribeAsyncV2(topic, subscriptionName, conf, callback); +} + Result Client::subscribe(const std::vector& topics, const std::string& subscriptionName, Consumer& consumer) { return subscribe(topics, subscriptionName, ConsumerConfiguration(), consumer); @@ -106,16 +159,44 @@ Result Client::subscribe(const std::vector& topics, const std::stri return future.get(consumer); } +std::variant Client::subscribeV2(const std::vector& topics, + const std::string& subscriptionName) { + return subscribeV2(topics, subscriptionName, ConsumerConfiguration()); +} + +std::variant Client::subscribeV2(const std::vector& topics, + const std::string& subscriptionName, + const ConsumerConfiguration& conf) { + Promise > promise; + subscribeAsyncV2(topics, subscriptionName, conf, + [promise](const auto& result) { promise.setValue(result); }); + Future > future = promise.getFuture(); + + std::variant result; + future.get(result); + return result; +} + void Client::subscribeAsync(const std::vector& topics, const std::string& subscriptionName, const SubscribeCallback& callback) { subscribeAsync(topics, subscriptionName, ConsumerConfiguration(), callback); } +void Client::subscribeAsyncV2(const std::vector& topics, const std::string& subscriptionName, + const SubscribeCallbackV2& callback) { + subscribeAsyncV2(topics, subscriptionName, ConsumerConfiguration(), callback); +} + void Client::subscribeAsync(const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { impl_->subscribeAsync(topics, subscriptionName, conf, callback); } +void Client::subscribeAsyncV2(const std::vector& topics, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback) { + impl_->subscribeAsyncV2(topics, subscriptionName, conf, callback); +} + Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName, Consumer& consumer) { return subscribeWithRegex(regexPattern, subscriptionName, ConsumerConfiguration(), consumer); @@ -149,11 +230,28 @@ Result Client::createReader(const std::string& topic, const MessageId& startMess return future.get(reader); } +std::variant Client::createReaderV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf) { + Promise > promise; + createReaderAsyncV2(topic, startMessageId, conf, + [promise](const auto& result) { promise.setValue(result); }); + Future > future = promise.getFuture(); + + std::variant result; + future.get(result); + return result; +} + void Client::createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback) { impl_->createReaderAsync(topic, startMessageId, conf, callback); } +void Client::createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf, const ReaderCallbackV2& callback) { + impl_->createReaderAsyncV2(topic, startMessageId, conf, callback); +} + Result Client::createTableView(const std::string& topic, const TableViewConfiguration& conf, TableView& tableView) { Promise promise; diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index a135ade7..620ba932 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -1711,7 +1711,11 @@ void ClientConnection::handleError(const proto::CommandError& error) { pendingRequests_.erase(it); lock.unlock(); - request->fail(result); + ResponseData data; + if (error.has_message()) { + data.errorMessage = error.message(); + } + request->fail(result, data); } else { auto it = pendingGetLastMessageIdRequests_.find(error.request_id()); if (it != pendingGetLastMessageIdRequests_.end()) { diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h index c8cd86fe..f17354c1 100644 --- a/lib/ClientConnection.h +++ b/lib/ClientConnection.h @@ -113,9 +113,10 @@ class CommandSuccess; // Data returned on the request operation. Mostly used on create-producer command struct ResponseData { std::string producerName; - int64_t lastSequenceId; + int64_t lastSequenceId = -1L; std::string schemaVersion; optional topicEpoch; + std::string errorMessage; }; typedef std::shared_ptr> NamespaceTopicsPtr; diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc index b84c14c4..a51dcd87 100644 --- a/lib/ClientImpl.cc +++ b/lib/ClientImpl.cc @@ -189,6 +189,20 @@ LookupServicePtr ClientImpl::getLookup(const std::string& redirectedClusterURI) void ClientImpl::createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, const CreateProducerCallback& callback, bool autoDownloadSchema) { + createProducerAsyncV2( + topic, conf, + [callback](std::variant result) { + if (auto producer = std::get_if(&result)) { + callback(ResultOk, *producer); + } else { + callback(std::get(result).result, {}); + } + }, + autoDownloadSchema); +} + +void ClientImpl::createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf, + const CreateProducerCallbackV2& callback, bool autoDownloadSchema) { if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) { throw std::invalid_argument("Batching and chunking of messages can't be enabled together"); } @@ -197,11 +211,11 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, Producer()); + callback(Error{ResultAlreadyClosed, ""}); return; } else if (!(topicName = TopicName::get(topic))) { lock.unlock(); - callback(ResultInvalidTopicName, Producer()); + callback(Error{ResultInvalidTopicName, ""}); return; } } @@ -211,18 +225,18 @@ void ClientImpl::createProducerAsync(const std::string& topic, const ProducerCon getSchema(topicName).addListener( [self, topicName, callback](Result res, const SchemaInfo& topicSchema) { if (res != ResultOk) { - callback(res, Producer()); + callback(Error{res, ""}); return; } ProducerConfiguration conf; conf.setSchema(topicSchema); self->getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1, + std::bind(&ClientImpl::handleCreateProducerV2, self, std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); }); } else { getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1, + std::bind(&ClientImpl::handleCreateProducerV2, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, conf, callback)); } } @@ -258,6 +272,37 @@ void ClientImpl::handleCreateProducer(Result result, const LookupDataResultPtr& } } +void ClientImpl::handleCreateProducerV2(Result result, const LookupDataResultPtr& partitionMetadata, + const TopicNamePtr& topicName, const ProducerConfiguration& conf, + const CreateProducerCallbackV2& callback) { + if (!result) { + ProducerImplBasePtr producer; + + auto interceptors = std::make_shared(conf.getInterceptors()); + + try { + if (partitionMetadata->getPartitions() > 0) { + producer = std::make_shared( + shared_from_this(), topicName, partitionMetadata->getPartitions(), conf, interceptors); + } else { + producer = std::make_shared(shared_from_this(), *topicName, conf, interceptors); + } + } catch (const std::runtime_error& e) { + LOG_ERROR("Failed to create producer: " << e.what()); + callback(Error{ResultConnectError, e.what()}); + return; + } + producer->getProducerCreatedFuture().addListener( + std::bind(&ClientImpl::handleProducerCreatedV2, shared_from_this(), std::placeholders::_1, + std::placeholders::_2, callback, producer)); + producer->start(); + } else { + LOG_ERROR("Error Checking/Getting Partition Metadata while creating producer on " + << topicName->toString() << " -- " << result); + callback(Error{result, ""}); + } +} + void ClientImpl::handleProducerCreated(Result result, const ProducerImplBaseWeakPtr& producerBaseWeakPtr, const CreateProducerCallback& callback, const ProducerImplBasePtr& producer) { @@ -277,25 +322,55 @@ void ClientImpl::handleProducerCreated(Result result, const ProducerImplBaseWeak } } +void ClientImpl::handleProducerCreatedV2(Result result, const ProducerImplBaseWeakPtr& producerBaseWeakPtr, + const CreateProducerCallbackV2& callback, + const ProducerImplBasePtr& producer) { + if (result == ResultOk) { + auto address = producer.get(); + auto existingProducer = producers_.putIfAbsent(address, producer); + if (existingProducer) { + auto producer = existingProducer.value().lock(); + LOG_ERROR("Unexpected existing producer at the same address: " + << address << ", producer: " << (producer ? producer->getProducerName() : "(null)")); + callback(Error{ResultUnknownError, ""}); + return; + } + callback(Producer(producer)); + } else { + callback(Error{result, producer->getLastErrorMessage()}); + } +} + void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback) { + createReaderAsyncV2(topic, startMessageId, conf, [callback](std::variant result) { + if (auto reader = std::get_if(&result)) { + callback(ResultOk, *reader); + } else { + callback(std::get(result).result, {}); + } + }); +} + +void ClientImpl::createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf, const ReaderCallbackV2& callback) { TopicNamePtr topicName; { std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, Reader()); + callback(Error{ResultAlreadyClosed, ""}); return; } else if (!(topicName = TopicName::get(topic))) { lock.unlock(); - callback(ResultInvalidTopicName, Reader()); + callback(Error{ResultInvalidTopicName, ""}); return; } } MessageId msgId(startMessageId); getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), std::placeholders::_1, + std::bind(&ClientImpl::handleReaderMetadataLookupV2, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, msgId, conf, callback)); } @@ -364,6 +439,56 @@ void ClientImpl::handleReaderMetadataLookup(Result result, const LookupDataResul }); } +void ClientImpl::handleReaderMetadataLookupV2(Result result, const LookupDataResultPtr& partitionMetadata, + const TopicNamePtr& topicName, const MessageId& startMessageId, + const ReaderConfiguration& conf, + const ReaderCallbackV2& callback) { + if (result != ResultOk) { + LOG_ERROR("Error Checking/Getting Partition Metadata while creating readeron " + << topicName->toString() << " -- " << result); + callback(Error{result, ""}); + return; + } + + auto readerWeak = std::make_shared(); + ReaderCallback readerCallback = [readerWeak, callback](Result result, const Reader& reader) { + if (result == ResultOk) { + callback(reader); + } else { + auto readerImpl = readerWeak->lock(); + callback(Error{result, readerImpl ? readerImpl->getLastErrorMessage() : ""}); + } + }; + + ReaderImplPtr reader; + try { + reader.reset(new ReaderImpl(shared_from_this(), topicName->toString(), + partitionMetadata->getPartitions(), conf, + getListenerExecutorProvider()->get(), readerCallback)); + } catch (const std::runtime_error& e) { + LOG_ERROR("Failed to create reader: " << e.what()); + callback(Error{ResultConnectError, e.what()}); + return; + } + *readerWeak = reader; + ConsumerImplBasePtr consumer = reader->getConsumer(); + auto self = shared_from_this(); + reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) { + auto consumer = weakConsumerPtr.lock(); + if (consumer) { + auto address = consumer.get(); + auto existingConsumer = consumers_.putIfAbsent(address, consumer); + if (existingConsumer) { + consumer = existingConsumer.value().lock(); + LOG_ERROR("Unexpected existing consumer at the same address: " + << address << ", consumer: " << (consumer ? consumer->getName() : "(null)")); + } + } else { + LOG_ERROR("Unexpected case: the consumer is somehow expired"); + } + }); +} + void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { @@ -445,6 +570,19 @@ void ClientImpl::createPatternMultiTopicsConsumer(Result result, const Namespace void ClientImpl::subscribeAsync(const std::vector& originalTopics, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { + subscribeAsyncV2(originalTopics, subscriptionName, conf, + [callback](std::variant result) { + if (auto consumer = std::get_if(&result)) { + callback(ResultOk, *consumer); + } else { + callback(std::get(result).result, {}); + } + }); +} + +void ClientImpl::subscribeAsyncV2(const std::vector& originalTopics, + const std::string& subscriptionName, const ConsumerConfiguration& conf, + const SubscribeCallbackV2& callback) { TopicNamePtr topicNamePtr; // Remove duplicates from the list of topics @@ -456,12 +594,12 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, Consumer()); + callback(Error{ResultAlreadyClosed, ""}); return; } else { if (!topics.empty() && !(topicNamePtr = MultiTopicsConsumerImpl::topicNamesValid(topics))) { lock.unlock(); - callback(ResultInvalidTopicName, Consumer()); + callback(Error{ResultInvalidTopicName, ""}); return; } } @@ -479,7 +617,7 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, ConsumerImplBasePtr consumer = std::make_shared( shared_from_this(), topics, subscriptionName, topicNamePtr, conf, interceptors); - consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreated, + consumer->getConsumerCreatedFuture().addListener(std::bind(&ClientImpl::handleConsumerCreatedV2, shared_from_this(), std::placeholders::_1, std::placeholders::_2, callback, consumer)); consumer->start(); @@ -487,28 +625,39 @@ void ClientImpl::subscribeAsync(const std::vector& originalTopics, void ClientImpl::subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback) { + subscribeAsyncV2(topic, subscriptionName, conf, [callback](std::variant result) { + if (auto consumer = std::get_if(&result)) { + callback(ResultOk, *consumer); + } else { + callback(std::get(result).result, {}); + } + }); +} + +void ClientImpl::subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback) { TopicNamePtr topicName; { std::shared_lock lock(mutex_); if (state_ != Open) { lock.unlock(); - callback(ResultAlreadyClosed, Consumer()); + callback(Error{ResultAlreadyClosed, ""}); return; } else if (!(topicName = TopicName::get(topic))) { lock.unlock(); - callback(ResultInvalidTopicName, Consumer()); + callback(Error{ResultInvalidTopicName, ""}); return; } else if (conf.isReadCompacted() && (topicName->getDomain().compare("persistent") != 0 || (conf.getConsumerType() != ConsumerExclusive && conf.getConsumerType() != ConsumerFailover))) { lock.unlock(); - callback(ResultInvalidConfiguration, Consumer()); + callback(Error{ResultInvalidConfiguration, ""}); return; } } getPartitionMetadataAsync(topicName).addListener( - std::bind(&ClientImpl::handleSubscribe, shared_from_this(), std::placeholders::_1, + std::bind(&ClientImpl::handleSubscribeV2, shared_from_this(), std::placeholders::_1, std::placeholders::_2, topicName, subscriptionName, conf, callback)); } @@ -556,6 +705,50 @@ void ClientImpl::handleSubscribe(Result result, const LookupDataResultPtr& parti } } +void ClientImpl::handleSubscribeV2(Result result, const LookupDataResultPtr& partitionMetadata, + const TopicNamePtr& topicName, const std::string& subscriptionName, + ConsumerConfiguration conf, const SubscribeCallbackV2& callback) { + if (result == ResultOk) { + // generate random name if not supplied by the customer. + if (conf.getConsumerName().empty()) { + conf.setConsumerName(generateRandomName()); + } + ConsumerImplBasePtr consumer; + auto interceptors = std::make_shared(conf.getInterceptors()); + + try { + if (partitionMetadata->getPartitions() > 0) { + if (conf.getReceiverQueueSize() == 0) { + LOG_ERROR("Can't use partitioned topic if the queue size is 0."); + callback(Error{ResultInvalidConfiguration, ""}); + return; + } + consumer = std::make_shared(shared_from_this(), topicName, + partitionMetadata->getPartitions(), + subscriptionName, conf, interceptors); + } else { + auto consumerImpl = std::make_shared(shared_from_this(), topicName->toString(), + subscriptionName, conf, + topicName->isPersistent(), interceptors); + consumerImpl->setPartitionIndex(topicName->getPartitionIndex()); + consumer = consumerImpl; + } + } catch (const std::runtime_error& e) { + LOG_ERROR("Failed to create consumer: " << e.what()); + callback(Error{ResultConnectError, e.what()}); + return; + } + consumer->getConsumerCreatedFuture().addListener( + std::bind(&ClientImpl::handleConsumerCreatedV2, shared_from_this(), std::placeholders::_1, + std::placeholders::_2, callback, consumer)); + consumer->start(); + } else { + LOG_ERROR("Error Checking/Getting Partition Metadata while Subscribing on " << topicName->toString() + << " -- " << result); + callback(Error{result, ""}); + } +} + void ClientImpl::handleConsumerCreated(Result result, const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr, const SubscribeCallback& callback, const ConsumerImplBasePtr& consumer) { @@ -582,6 +775,34 @@ void ClientImpl::handleConsumerCreated(Result result, const ConsumerImplBaseWeak } } +void ClientImpl::handleConsumerCreatedV2(Result result, + const ConsumerImplBaseWeakPtr& consumerImplBaseWeakPtr, + const SubscribeCallbackV2& callback, + const ConsumerImplBasePtr& consumer) { + if (result == ResultOk) { + auto address = consumer.get(); + auto existingConsumer = consumers_.putIfAbsent(address, consumer); + if (existingConsumer) { + auto consumer = existingConsumer.value().lock(); + LOG_ERROR("Unexpected existing consumer at the same address: " + << address << ", consumer: " << (consumer ? consumer->getName() : "(null)")); + callback(Error{ResultUnknownError, ""}); + return; + } + callback(Consumer(consumer)); + } else { + const auto errorMessage = consumer->getLastErrorMessage(); + // In order to be compatible with the current broker error code confusion. + // https://github.com/apache/pulsar/blob/cd2aa550d0fe4e72b5ff88c4f6c1c2795b3ff2cd/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java#L240-L241 + if (result == ResultProducerBusy) { + LOG_ERROR("Failed to create consumer: SubscriptionName cannot be empty."); + callback(Error{ResultInvalidConfiguration, errorMessage}); + } else { + callback(Error{result, errorMessage}); + } + } +} + GetConnectionFuture ClientImpl::getConnection(const std::string& redirectedClusterURI, const std::string& topic, size_t key) { Promise promise; diff --git a/lib/ClientImpl.h b/lib/ClientImpl.h index 7772b15b..a96d510d 100644 --- a/lib/ClientImpl.h +++ b/lib/ClientImpl.h @@ -95,18 +95,30 @@ class ClientImpl : public std::enable_shared_from_this { void createProducerAsync(const std::string& topic, const ProducerConfiguration& conf, const CreateProducerCallback& callback, bool autoDownloadSchema = false); + void createProducerAsyncV2(const std::string& topic, const ProducerConfiguration& conf, + const CreateProducerCallbackV2& callback, bool autoDownloadSchema = false); + void subscribeAsync(const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); + void subscribeAsyncV2(const std::string& topic, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback); + void subscribeAsync(const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); + void subscribeAsyncV2(const std::vector& topics, const std::string& subscriptionName, + const ConsumerConfiguration& conf, const SubscribeCallbackV2& callback); + void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName, const ConsumerConfiguration& conf, const SubscribeCallback& callback); void createReaderAsync(const std::string& topic, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback); + void createReaderAsyncV2(const std::string& topic, const MessageId& startMessageId, + const ReaderConfiguration& conf, const ReaderCallbackV2& callback); + void createTableViewAsync(const std::string& topic, const TableViewConfiguration& conf, const TableViewCallback& callback); @@ -176,21 +188,38 @@ class ClientImpl : public std::enable_shared_from_this { const TopicNamePtr& topicName, const ProducerConfiguration& conf, const CreateProducerCallback& callback); + void handleCreateProducerV2(Result result, const LookupDataResultPtr& partitionMetadata, + const TopicNamePtr& topicName, const ProducerConfiguration& conf, + const CreateProducerCallbackV2& callback); + void handleSubscribe(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const std::string& consumerName, ConsumerConfiguration conf, const SubscribeCallback& callback); + void handleSubscribeV2(Result result, const LookupDataResultPtr& partitionMetadata, + const TopicNamePtr& topicName, const std::string& consumerName, + ConsumerConfiguration conf, const SubscribeCallbackV2& callback); + void handleReaderMetadataLookup(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const MessageId& startMessageId, const ReaderConfiguration& conf, const ReaderCallback& callback); + void handleReaderMetadataLookupV2(Result result, const LookupDataResultPtr& partitionMetadata, + const TopicNamePtr& topicName, const MessageId& startMessageId, + const ReaderConfiguration& conf, const ReaderCallbackV2& callback); + void handleGetPartitions(Result result, const LookupDataResultPtr& partitionMetadata, const TopicNamePtr& topicName, const GetPartitionsCallback& callback); void handleProducerCreated(Result result, const ProducerImplBaseWeakPtr& producerWeakPtr, const CreateProducerCallback& callback, const ProducerImplBasePtr& producer); + void handleProducerCreatedV2(Result result, const ProducerImplBaseWeakPtr& producerWeakPtr, + const CreateProducerCallbackV2& callback, + const ProducerImplBasePtr& producer); void handleConsumerCreated(Result result, const ConsumerImplBaseWeakPtr& consumerWeakPtr, const SubscribeCallback& callback, const ConsumerImplBasePtr& consumer); + void handleConsumerCreatedV2(Result result, const ConsumerImplBaseWeakPtr& consumerWeakPtr, + const SubscribeCallbackV2& callback, const ConsumerImplBasePtr& consumer); typedef std::shared_ptr SharedInt; diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 85f49946..6d5ff7ca 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -214,6 +214,11 @@ const std::string& ConsumerImpl::getSubscriptionName() const { return originalSu const std::string& ConsumerImpl::getTopic() const { return topic(); } +std::string ConsumerImpl::getLastErrorMessage() const { + Lock lock(mutex_); + return lastErrorMessage_; +} + void ConsumerImpl::start() { HandlerBase::start(); ackGroupingTrackerPtr_->start(get_shared_this_ptr()); @@ -263,7 +268,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c setFirstRequestIdAfterConnect(requestId); cnx->sendRequestWithId(cmd, requestId, "SUBSCRIBE") .addListener([this, self, cnx, promise](Result result, const ResponseData& responseData) { - Result handleResult = handleCreateConsumer(cnx, result); + Result handleResult = handleCreateConsumer(cnx, result, responseData); if (handleResult != ResultOk) { promise.setFailed(handleResult); return; @@ -301,6 +306,11 @@ void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int n } Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) { + return handleCreateConsumer(cnx, result, ResponseData{}); +} + +Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result, + const ResponseData& responseData) { Result handleResult = ResultOk; if (result == ResultOk) { @@ -361,6 +371,10 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result } consumerCreatedPromise_.setValue(get_shared_this_ptr()); } else { + { + Lock lock(mutex_); + lastErrorMessage_ = responseData.errorMessage; + } if (result == ResultTimeout) { // Creating the consumer has timed out. We need to ensure the broker closes the consumer // in case it was indeed created, otherwise it might prevent new subscribe operation, diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h index 6f287aa2..58bd467d 100644 --- a/lib/ConsumerImpl.h +++ b/lib/ConsumerImpl.h @@ -50,6 +50,7 @@ class ExecutorService; class ConsumerImpl; class MessageCrypto; class GetLastMessageIdResponse; +struct ResponseData; typedef std::shared_ptr MessageCryptoPtr; typedef std::shared_ptr BackoffPtr; typedef std::function ProcessDLQCallBack; @@ -113,6 +114,7 @@ class ConsumerImpl : public ConsumerImplBase { Future getConsumerCreatedFuture() override; const std::string& getSubscriptionName() const override; const std::string& getTopic() const override; + std::string getLastErrorMessage() const override; Result receive(Message& msg) override; Result receive(Message& msg, int timeout) override; void receiveAsync(const ReceiveCallback& callback) override; @@ -174,6 +176,8 @@ class ConsumerImpl : public ConsumerImplBase { void notifyBatchPendingReceivedCallback(const BatchReceiveCallback& callback) override; Result handleCreateConsumer(const ClientConnectionPtr& cnx, Result result); + Result handleCreateConsumer(const ClientConnectionPtr& cnx, Result result, + const ResponseData& responseData); void internalListener(); @@ -250,6 +254,7 @@ class ConsumerImpl : public ConsumerImplBase { const int receiverQueueRefillThreshold_; const uint64_t consumerId_; const std::string consumerStr_; + std::string lastErrorMessage_; int32_t partitionIndex_ = -1; Promise consumerCreatedPromise_; std::atomic_bool messageListenerRunning_; diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h index ffc0e3cb..6b7e2aff 100644 --- a/lib/ConsumerImplBase.h +++ b/lib/ConsumerImplBase.h @@ -53,6 +53,7 @@ class ConsumerImplBase : public HandlerBase { virtual Future getConsumerCreatedFuture() = 0; virtual const std::string& getTopic() const = 0; virtual const std::string& getSubscriptionName() const = 0; + virtual std::string getLastErrorMessage() const = 0; virtual Result receive(Message& msg) = 0; virtual Result receive(Message& msg, int timeout) = 0; virtual void receiveAsync(const ReceiveCallback& callback) = 0; diff --git a/lib/Future.h b/lib/Future.h index 3b2b5246..1ca5cae8 100644 --- a/lib/Future.h +++ b/lib/Future.h @@ -137,6 +137,8 @@ class Promise { bool setFailed(Result result) const { return state_->complete(result, {}); } + bool setFailed(Result result, const Type &value) const { return state_->complete(result, value); } + bool setSuccess() const { return setValue({}); } bool isComplete() const { return state_->completed(); } diff --git a/lib/MockServer.h b/lib/MockServer.h index 6f8d1390..63a11cc9 100644 --- a/lib/MockServer.h +++ b/lib/MockServer.h @@ -36,6 +36,10 @@ namespace pulsar { class MockServer : public std::enable_shared_from_this { public: using RequestDelayType = std::unordered_map; + struct RequestError { + proto::ServerError error; + std::string message; + }; MockServer(const ClientConnectionPtr& connection) : connection_(connection) { requestDelays_["CLOSE_CONSUMER"] = 1; @@ -48,6 +52,11 @@ class MockServer : public std::enable_shared_from_this { } } + void setRequestError(const std::string& request, proto::ServerError error, const std::string& message) { + std::lock_guard lock(mutex_); + requestErrors_[request] = RequestError{error, message}; + } + bool sendRequest(const std::string& request, uint64_t requestId) { auto connection = connection_.lock(); if (!connection) { @@ -75,9 +84,23 @@ class MockServer : public std::enable_shared_from_this { } }); } + bool shouldFail = false; + proto::ServerError error = proto::UnknownError; + std::string message; + if (auto errorIter = requestErrors_.find(request); errorIter != requestErrors_.end()) { + shouldFail = true; + error = errorIter->second.error; + message = errorIter->second.message; + } schedule(connection, request + std::to_string(requestId), iter->second, - [connection, request, requestId] { - if (request == "CONSUMER_STATS") { + [connection, request, requestId, shouldFail, error, message] { + if (shouldFail) { + proto::CommandError response; + response.set_request_id(requestId); + response.set_error(error); + response.set_message(message); + connection->handleError(response); + } else if (request == "CONSUMER_STATS") { proto::CommandConsumerStatsResponse response; response.set_request_id(requestId); connection->handleConsumerStatsResponse(response); @@ -132,6 +155,7 @@ class MockServer : public std::enable_shared_from_this { private: mutable std::mutex mutex_; std::unordered_map requestDelays_; + std::unordered_map requestErrors_; std::unordered_map pendingTimers_; ClientConnectionWeakPtr connection_; diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index a5699781..04d6cf08 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -255,6 +255,10 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions( subscriptionMode_, startMessageId_); } catch (const std::runtime_error& e) { LOG_ERROR("Failed to create ConsumerImpl for " << topicName->toString() << ": " << e.what()); + { + Lock lock(mutex_); + lastErrorMessage_ = e.what(); + } topicSubResultPromise->setFailed(ResultConnectError); return; } @@ -276,6 +280,10 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions( subscriptionMode_, startMessageId_); } catch (const std::runtime_error& e) { LOG_ERROR("Failed to create ConsumerImpl for " << topicPartitionName << ": " << e.what()); + { + Lock lock(mutex_); + lastErrorMessage_ = e.what(); + } topicSubResultPromise->setFailed(ResultConnectError); return; } @@ -310,6 +318,11 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated( assert(previous > 0); if (result != ResultOk) { + if (auto consumer = consumerImplBaseWeakPtr.lock()) { + auto lastErrorMessage = consumer->getLastErrorMessage(); + Lock lock(mutex_); + lastErrorMessage_ = lastErrorMessage; + } topicSubResultPromise->setFailed(result); LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result); return; @@ -763,6 +776,11 @@ const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const { return const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic(); } +std::string MultiTopicsConsumerImpl::getLastErrorMessage() const { + Lock lock(mutex_); + return lastErrorMessage_; +} + const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; } void MultiTopicsConsumerImpl::shutdown() { internalShutdown(); } diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index 38a44cdf..b49eaed8 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -67,6 +67,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { Future getConsumerCreatedFuture() override; const std::string& getSubscriptionName() const override; const std::string& getTopic() const override; + std::string getLastErrorMessage() const override; Result receive(Message& msg) override; Result receive(Message& msg, int timeout) override; void receiveAsync(const ReceiveCallback& callback) override; @@ -117,6 +118,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { TimeDuration partitionsUpdateInterval_; std::shared_ptr> numberTopicPartitions_; std::atomic failedResult{ResultOk}; + std::string lastErrorMessage_; Promise multiTopicsConsumerCreatedPromise_; UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_; const std::vector topics_; diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc index 1aa5c87b..dbb170b8 100644 --- a/lib/PartitionedProducerImpl.cc +++ b/lib/PartitionedProducerImpl.cc @@ -81,6 +81,11 @@ PartitionedProducerImpl::~PartitionedProducerImpl() { internalShutdown(); } // override const std::string& PartitionedProducerImpl::getTopic() const { return topic_; } +std::string PartitionedProducerImpl::getLastErrorMessage() const { + Lock lock(producersMutex_); + return lastErrorMessage_; +} + unsigned int PartitionedProducerImpl::getNumPartitions() const { return static_cast(topicMetadata_->getNumPartitions()); } @@ -163,6 +168,11 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated( if (result != ResultOk) { LOG_ERROR("Unable to create Producer for partition - " << partitionIndex << " Error - " << result); + if (auto producer = producerWeakPtr.lock()) { + const auto lastErrorMessage = producer->getLastErrorMessage(); + Lock lock(producersMutex_); + lastErrorMessage_ = lastErrorMessage; + } partitionedProducerCreatedPromise_.setFailed(result); state_ = Failed; if (++numProducersCreated_ == numPartitions) { diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h index 94ba7179..6e1bb9d9 100644 --- a/lib/PartitionedProducerImpl.h +++ b/lib/PartitionedProducerImpl.h @@ -78,6 +78,7 @@ class PartitionedProducerImpl : public ProducerImplBase, void internalShutdown(); bool isClosed() override; const std::string& getTopic() const override; + std::string getLastErrorMessage() const override; Future getProducerCreatedFuture() override; void triggerFlush() override; void flushAsync(FlushCallback callback) override; @@ -101,6 +102,7 @@ class PartitionedProducerImpl : public ProducerImplBase, const TopicNamePtr topicName_; const std::string topic_; + std::string lastErrorMessage_; std::atomic_uint numProducersCreated_{0}; diff --git a/lib/PendingRequest.h b/lib/PendingRequest.h index 465073f6..8a9f9633 100644 --- a/lib/PendingRequest.h +++ b/lib/PendingRequest.h @@ -57,6 +57,11 @@ class PendingRequest : public std::enable_shared_from_this> { cancelTimer(timer_); } + void fail(Result result, const T& value) { + promise_.setFailed(result, value); + cancelTimer(timer_); + } + void disableTimeout() { timeoutDisabled_.store(true, std::memory_order_release); } auto getFuture() const { return promise_.getFuture(); } diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 7632581e..6222e611 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -127,6 +127,11 @@ const std::string& ProducerImpl::getTopic() const { return topic(); } const std::string& ProducerImpl::getProducerName() const { return producerName_; } +std::string ProducerImpl::getLastErrorMessage() const { + Lock lock(mutex_); + return lastErrorMessage_; +} + int64_t ProducerImpl::getLastSequenceId() const { return lastSequenceIdPublished_; } const std::string& ProducerImpl::getSchemaVersion() const { return schemaVersion_; } @@ -261,6 +266,7 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result } else { // Producer creation failed + lastErrorMessage_ = responseData.errorMessage; if (result == ResultTimeout) { // Creating the producer has timed out. We need to ensure the broker closes the producer // in case it was indeed created, otherwise it might prevent new create producer operation, diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h index 26207f80..78bfa15f 100644 --- a/lib/ProducerImpl.h +++ b/lib/ProducerImpl.h @@ -87,6 +87,7 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { void internalShutdown(); bool isClosed() override; const std::string& getTopic() const override; + std::string getLastErrorMessage() const override; Future getProducerCreatedFuture() override; void triggerFlush() override; void flushAsync(FlushCallback callback) override; @@ -180,6 +181,7 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { std::string producerName_; bool userProvidedProducerName_; std::string producerStr_; + std::string lastErrorMessage_; uint64_t producerId_; std::unique_ptr batchMessageContainer_; diff --git a/lib/ProducerImplBase.h b/lib/ProducerImplBase.h index 25a12c8a..f5ee5b81 100644 --- a/lib/ProducerImplBase.h +++ b/lib/ProducerImplBase.h @@ -44,6 +44,7 @@ class ProducerImplBase { virtual bool isClosed() = 0; virtual void shutdown() = 0; virtual const std::string& getTopic() const = 0; + virtual std::string getLastErrorMessage() const = 0; virtual Future getProducerCreatedFuture() = 0; virtual void triggerFlush() = 0; virtual void flushAsync(FlushCallback callback) = 0; diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc index 754137c5..a1f1a5b9 100644 --- a/lib/ReaderImpl.cc +++ b/lib/ReaderImpl.cc @@ -117,6 +117,10 @@ void ReaderImpl::start(const MessageId& startMessageId, const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic(); } +std::string ReaderImpl::getLastErrorMessage() const { + return consumer_ ? consumer_->getLastErrorMessage() : ""; +} + Result ReaderImpl::readNext(Message& msg) { Result res = consumer_->receive(msg); acknowledgeIfNecessary(res, msg); diff --git a/lib/ReaderImpl.h b/lib/ReaderImpl.h index 020a5037..2c5ed934 100644 --- a/lib/ReaderImpl.h +++ b/lib/ReaderImpl.h @@ -66,6 +66,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this const std::function& callback); const std::string& getTopic() const; + std::string getLastErrorMessage() const; Result readNext(Message& msg); Result readNext(Message& msg, int timeoutMs); diff --git a/tests/ClientTest.cc b/tests/ClientTest.cc index 63ac9d16..807b1c7f 100644 --- a/tests/ClientTest.cc +++ b/tests/ClientTest.cc @@ -328,6 +328,68 @@ TEST(ClientTest, testTimedOutPendingRequestsAreErasedFromConnectionMaps) { executorProvider->close(); } +TEST(ClientTest, testRequestErrorMessageIsReturnedInResponseData) { + ClientConfiguration conf; + + auto executorProvider = std::make_shared(1); + AtomicSharedPtr serviceInfo; + serviceInfo.store(std::make_shared(lookupUrl)); + ConnectionPool pool(serviceInfo, conf, executorProvider, ""); + auto connection = std::make_shared(lookupUrl, lookupUrl, *serviceInfo.load(), + executorProvider->get(), conf, "", pool, 0); + PulsarFriend::setServerProtocolVersion(*connection, 8); + + auto mockServer = std::make_shared(connection); + connection->attachMockServer(mockServer); + + const std::string errorMessage = "bad token"; + mockServer->setRequestDelay({{"PRODUCER", 1}}); + mockServer->setRequestError("PRODUCER", proto::AuthenticationError, errorMessage); + + auto future = connection->sendRequestWithId(Commands::newPing(), 0, "PRODUCER"); + + ResponseData responseData; + ASSERT_EQ(ResultAuthenticationError, future.get(responseData)); + ASSERT_EQ(errorMessage, responseData.errorMessage); + ASSERT_EQ(0u, PulsarFriend::getPendingRequests(*connection)); + ASSERT_EQ(0u, mockServer->close()); + + connection->close(ResultDisconnected).wait(); + executorProvider->close(); +} + +TEST(ClientTest, testCreateProducerV2ReturnsError) { + Client client(lookupUrl); + + auto result = client.createProducerV2("persistent://prop//unit/ns1/testCreateProducerV2ReturnsError"); + auto error = std::get_if(&result); + + ASSERT_NE(nullptr, error); + ASSERT_EQ(ResultInvalidTopicName, error->result); +} + +TEST(ClientTest, testSubscribeV2ReturnsError) { + Client client(lookupUrl); + + auto result = client.subscribeV2("persistent://prop//unit/ns1/testSubscribeV2ReturnsError", "sub"); + auto error = std::get_if(&result); + + ASSERT_NE(nullptr, error); + ASSERT_EQ(ResultInvalidTopicName, error->result); +} + +TEST(ClientTest, testCreateReaderV2ReturnsError) { + Client client(lookupUrl); + ReaderConfiguration conf; + + auto result = client.createReaderV2("persistent://prop//unit/ns1/testCreateReaderV2ReturnsError", + MessageId::earliest(), conf); + auto error = std::get_if(&result); + + ASSERT_NE(nullptr, error); + ASSERT_EQ(ResultInvalidTopicName, error->result); +} + TEST(ClientTest, testGetNumberOfReferences) { Client client("pulsar://localhost:6650");