Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/pulsar/TableViewConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#ifndef PULSAR_TABLEVIEW_CONFIGURATION_H_
#define PULSAR_TABLEVIEW_CONFIGURATION_H_

#include <pulsar/ConsumerCryptoFailureAction.h>
#include <pulsar/CryptoKeyReader.h>
#include <pulsar/Schema.h>
#include <pulsar/defines.h>

Expand All @@ -31,6 +33,10 @@ struct TableViewConfiguration {
SchemaInfo schemaInfo;
// The name of the subscription to the topic. Default value is reader-{random string}.
std::string subscriptionName;
// the shared pointer to CryptoKeyReader
CryptoKeyReaderPtr cryptoKeyReader;
// the ConsumerCryptoFailureAction to use
ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction::FAIL;
};
} // namespace pulsar
#endif /* PULSAR_TABLEVIEW_CONFIGURATION_H_ */
11 changes: 11 additions & 0 deletions include/pulsar/c/table_view_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <pulsar/defines.h>

#include "consumer_configuration.h"
#include "producer_configuration.h"

#ifdef __cplusplus
Expand All @@ -43,6 +44,16 @@ PULSAR_PUBLIC void pulsar_table_view_configuration_set_subscription_name(
PULSAR_PUBLIC const char *pulsar_table_view_configuration_get_subscription_name(
pulsar_table_view_configuration_t *table_view_configuration_t);

PULSAR_PUBLIC void pulsar_table_view_configuration_set_default_crypto_key_reader(
pulsar_table_view_configuration_t *table_view_configuration_t, const char *public_key_path,
const char *private_key_path);

PULSAR_PUBLIC pulsar_consumer_crypto_failure_action pulsar_table_view_configuration_get_crypto_failure_action(
pulsar_table_view_configuration_t *table_view_configuration_t);

PULSAR_PUBLIC void pulsar_table_view_configuration_set_crypto_failure_action(
pulsar_table_view_configuration_t *table_view_configuration_t,
pulsar_consumer_crypto_failure_action crypto_failure_action);
#ifdef __cplusplus
}
#endif
5 changes: 5 additions & 0 deletions lib/TableViewImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ Future<Result, TableViewImplPtr> TableViewImpl::start() {
readerConfiguration.setReadCompacted(true);
readerConfiguration.setInternalSubscriptionName(conf_.subscriptionName);

if (conf_.cryptoKeyReader != nullptr) {
readerConfiguration.setCryptoFailureAction(conf_.cryptoFailureAction);
readerConfiguration.setCryptoKeyReader(conf_.cryptoKeyReader);
}

TableViewImplPtr self = shared_from_this();
ReaderCallback readerCallback = [self, promise](Result res, Reader reader) {
if (res == ResultOk) {
Expand Down
21 changes: 21 additions & 0 deletions lib/c/c_TableViewConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,24 @@ const char *pulsar_table_view_configuration_get_subscription_name(
pulsar_table_view_configuration_t *table_view_configuration_t) {
return table_view_configuration_t->tableViewConfiguration.subscriptionName.c_str();
}

void pulsar_table_view_configuration_set_default_crypto_key_reader(
pulsar_table_view_configuration_t *table_view_configuration_t, const char *public_key_path,
const char *private_key_path) {
std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
std::make_shared<pulsar::DefaultCryptoKeyReader>(public_key_path, private_key_path);
table_view_configuration_t->tableViewConfiguration.cryptoKeyReader = keyReader;
}

pulsar_consumer_crypto_failure_action pulsar_table_view_configuration_get_crypto_failure_action(
pulsar_table_view_configuration_t *table_view_configuration_t) {
return (pulsar_consumer_crypto_failure_action)
table_view_configuration_t->tableViewConfiguration.cryptoFailureAction;
}

void pulsar_table_view_configuration_set_crypto_failure_action(
pulsar_table_view_configuration_t *table_view_configuration_t,
pulsar_consumer_crypto_failure_action crypto_failure_action) {
table_view_configuration_t->tableViewConfiguration.cryptoFailureAction =
(pulsar::ConsumerCryptoFailureAction)crypto_failure_action;
}
47 changes: 47 additions & 0 deletions tests/TableViewTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,50 @@ TEST(TableViewTest, testMultiTopicAndAutoUpdatePartitions) {

client.close();
}

TEST(TableViewTest, testMessageEncryption) {
ClientConfiguration config;
Client client(lookupUrl);

std::string topicName = "testMessageEncryption" + std::to_string(time(nullptr));

std::string PUBLIC_CERT_FILE_PATH = "../test-conf/public-key.client-rsa.pem";
std::string PRIVATE_CERT_FILE_PATH = "../test-conf/private-key.client-rsa.pem";
std::shared_ptr<pulsar::DefaultCryptoKeyReader> keyReader =
std::make_shared<pulsar::DefaultCryptoKeyReader>(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH);

ProducerConfiguration producerConfig;
producerConfig.addEncryptionKey("client-rsa.pem");
producerConfig.setCryptoKeyReader(keyReader);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));

TableViewConfiguration tvConf;
tvConf.cryptoKeyReader = keyReader;
TableView tableView;
ASSERT_EQ(ResultOk, client.createTableView(topicName, tvConf, tableView));

int numberOfMessages = 10;
std::string msgContent = "msg-content";
std::set<std::string> valuesSent;
Message msg;
for (int i = 0; i < numberOfMessages; ++i) {
std::string key = std::to_string(i);
auto value = msgContent + key;
valuesSent.emplace(value);
msg = MessageBuilder().setPartitionKey(key).setContent(value).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::set<std::string> valuesReceived;
for (int i = 0; i < numberOfMessages; ++i) {
std::string key = std::to_string(i);
std::string value;
ASSERT_EQ(ResultOk, tableView.getValue(key, value));
valuesReceived.emplace(value);
}

ASSERT_EQ(valuesSent, valuesReceived);

ASSERT_EQ(ResultOk, client.close());
}