diff --git a/include/pulsar/TableViewConfiguration.h b/include/pulsar/TableViewConfiguration.h index eef58818..1627eb65 100644 --- a/include/pulsar/TableViewConfiguration.h +++ b/include/pulsar/TableViewConfiguration.h @@ -19,6 +19,8 @@ #ifndef PULSAR_TABLEVIEW_CONFIGURATION_H_ #define PULSAR_TABLEVIEW_CONFIGURATION_H_ +#include +#include #include #include @@ -31,6 +33,10 @@ struct TableViewConfiguration { SchemaInfo schemaInfo; // The name of the subscription to the topic. Default value is reader-{random string}. std::string subscriptionName; + // the shared pointer to CryptoKeyReader + CryptoKeyReaderPtr cryptoKeyReader; + // the ConsumerCryptoFailureAction to use + ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction::FAIL; }; } // namespace pulsar #endif /* PULSAR_TABLEVIEW_CONFIGURATION_H_ */ diff --git a/include/pulsar/c/table_view_configuration.h b/include/pulsar/c/table_view_configuration.h index 26389b6a..651d76b3 100644 --- a/include/pulsar/c/table_view_configuration.h +++ b/include/pulsar/c/table_view_configuration.h @@ -21,6 +21,7 @@ #include +#include "consumer_configuration.h" #include "producer_configuration.h" #ifdef __cplusplus @@ -43,6 +44,16 @@ PULSAR_PUBLIC void pulsar_table_view_configuration_set_subscription_name( PULSAR_PUBLIC const char *pulsar_table_view_configuration_get_subscription_name( pulsar_table_view_configuration_t *table_view_configuration_t); +PULSAR_PUBLIC void pulsar_table_view_configuration_set_default_crypto_key_reader( + pulsar_table_view_configuration_t *table_view_configuration_t, const char *public_key_path, + const char *private_key_path); + +PULSAR_PUBLIC pulsar_consumer_crypto_failure_action pulsar_table_view_configuration_get_crypto_failure_action( + pulsar_table_view_configuration_t *table_view_configuration_t); + +PULSAR_PUBLIC void pulsar_table_view_configuration_set_crypto_failure_action( + pulsar_table_view_configuration_t *table_view_configuration_t, + pulsar_consumer_crypto_failure_action crypto_failure_action); #ifdef __cplusplus } #endif diff --git a/lib/TableViewImpl.cc b/lib/TableViewImpl.cc index fd249326..2c906250 100644 --- a/lib/TableViewImpl.cc +++ b/lib/TableViewImpl.cc @@ -39,6 +39,11 @@ Future TableViewImpl::start() { readerConfiguration.setReadCompacted(true); readerConfiguration.setInternalSubscriptionName(conf_.subscriptionName); + if (conf_.cryptoKeyReader != nullptr) { + readerConfiguration.setCryptoFailureAction(conf_.cryptoFailureAction); + readerConfiguration.setCryptoKeyReader(conf_.cryptoKeyReader); + } + TableViewImplPtr self = shared_from_this(); ReaderCallback readerCallback = [self, promise](Result res, Reader reader) { if (res == ResultOk) { diff --git a/lib/c/c_TableViewConfiguration.cc b/lib/c/c_TableViewConfiguration.cc index 3f8132a3..af879af2 100644 --- a/lib/c/c_TableViewConfiguration.cc +++ b/lib/c/c_TableViewConfiguration.cc @@ -44,3 +44,24 @@ const char *pulsar_table_view_configuration_get_subscription_name( pulsar_table_view_configuration_t *table_view_configuration_t) { return table_view_configuration_t->tableViewConfiguration.subscriptionName.c_str(); } + +void pulsar_table_view_configuration_set_default_crypto_key_reader( + pulsar_table_view_configuration_t *table_view_configuration_t, const char *public_key_path, + const char *private_key_path) { + std::shared_ptr keyReader = + std::make_shared(public_key_path, private_key_path); + table_view_configuration_t->tableViewConfiguration.cryptoKeyReader = keyReader; +} + +pulsar_consumer_crypto_failure_action pulsar_table_view_configuration_get_crypto_failure_action( + pulsar_table_view_configuration_t *table_view_configuration_t) { + return (pulsar_consumer_crypto_failure_action) + table_view_configuration_t->tableViewConfiguration.cryptoFailureAction; +} + +void pulsar_table_view_configuration_set_crypto_failure_action( + pulsar_table_view_configuration_t *table_view_configuration_t, + pulsar_consumer_crypto_failure_action crypto_failure_action) { + table_view_configuration_t->tableViewConfiguration.cryptoFailureAction = + (pulsar::ConsumerCryptoFailureAction)crypto_failure_action; +} diff --git a/tests/TableViewTest.cc b/tests/TableViewTest.cc index 2515818d..3acdb9b8 100644 --- a/tests/TableViewTest.cc +++ b/tests/TableViewTest.cc @@ -216,3 +216,50 @@ TEST(TableViewTest, testMultiTopicAndAutoUpdatePartitions) { client.close(); } + +TEST(TableViewTest, testMessageEncryption) { + ClientConfiguration config; + Client client(lookupUrl); + + std::string topicName = "testMessageEncryption" + std::to_string(time(nullptr)); + + std::string PUBLIC_CERT_FILE_PATH = "../test-conf/public-key.client-rsa.pem"; + std::string PRIVATE_CERT_FILE_PATH = "../test-conf/private-key.client-rsa.pem"; + std::shared_ptr keyReader = + std::make_shared(PUBLIC_CERT_FILE_PATH, PRIVATE_CERT_FILE_PATH); + + ProducerConfiguration producerConfig; + producerConfig.addEncryptionKey("client-rsa.pem"); + producerConfig.setCryptoKeyReader(keyReader); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer)); + + TableViewConfiguration tvConf; + tvConf.cryptoKeyReader = keyReader; + TableView tableView; + ASSERT_EQ(ResultOk, client.createTableView(topicName, tvConf, tableView)); + + int numberOfMessages = 10; + std::string msgContent = "msg-content"; + std::set valuesSent; + Message msg; + for (int i = 0; i < numberOfMessages; ++i) { + std::string key = std::to_string(i); + auto value = msgContent + key; + valuesSent.emplace(value); + msg = MessageBuilder().setPartitionKey(key).setContent(value).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + std::set valuesReceived; + for (int i = 0; i < numberOfMessages; ++i) { + std::string key = std::to_string(i); + std::string value; + ASSERT_EQ(ResultOk, tableView.getValue(key, value)); + valuesReceived.emplace(value); + } + + ASSERT_EQ(valuesSent, valuesReceived); + + ASSERT_EQ(ResultOk, client.close()); +}