2222import static com .google .common .base .Preconditions .checkNotNull ;
2323
2424import java .io .Serializable ;
25+ import java .util .HashMap ;
26+ import java .util .Map ;
2527
26- import org .apache .pulsar .client .api .Authentication ;
27- import org .apache .pulsar .client .api .Consumer ;
28- import org .apache .pulsar .client .api .MessageListener ;
29- import org .apache .pulsar .client .api .PulsarClient ;
30- import org .apache .pulsar .client .api .PulsarClientException ;
28+ import org .apache .pulsar .client .api .*;
3129import org .apache .pulsar .client .impl .PulsarClientImpl ;
3230import org .apache .pulsar .client .impl .conf .ConsumerConfigurationData ;
3331import org .apache .spark .storage .StorageLevel ;
@@ -43,34 +41,52 @@ public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
4341 private static final Logger LOG = LoggerFactory .getLogger (SparkStreamingPulsarReceiver .class );
4442
4543 private String serviceUrl ;
46- private ConsumerConfigurationData <byte []> conf ;
44+ private Map <String ,Object > clientConfig ;
45+ private ConsumerConfigurationData <byte []> consumerConfig ;
4746 private Authentication authentication ;
4847 private PulsarClient pulsarClient ;
4948 private Consumer <byte []> consumer ;
5049
5150 public SparkStreamingPulsarReceiver (
5251 String serviceUrl ,
53- ConsumerConfigurationData <byte []> conf ,
52+ ConsumerConfigurationData <byte []> consumerConfig ,
5453 Authentication authentication ) {
55- this (StorageLevel .MEMORY_AND_DISK_2 (), serviceUrl , conf , authentication );
54+ this (StorageLevel .MEMORY_AND_DISK_2 (), serviceUrl , new HashMap <>(), consumerConfig , authentication );
55+ }
56+
57+ public SparkStreamingPulsarReceiver (
58+ String serviceUrl ,
59+ Map <String ,Object > clientConfig ,
60+ ConsumerConfigurationData <byte []> consumerConfig ,
61+ Authentication authentication ) {
62+ this (StorageLevel .MEMORY_AND_DISK_2 (), serviceUrl , clientConfig , consumerConfig , authentication );
63+ }
64+
65+ public SparkStreamingPulsarReceiver (StorageLevel storageLevel ,
66+ String serviceUrl ,
67+ ConsumerConfigurationData <byte []> consumerConf ,
68+ Authentication authentication ) {
69+ this (StorageLevel .MEMORY_AND_DISK_2 (), serviceUrl , new HashMap <>(), consumerConf , authentication );
5670 }
5771
5872 public SparkStreamingPulsarReceiver (StorageLevel storageLevel ,
5973 String serviceUrl ,
60- ConsumerConfigurationData <byte []> conf ,
74+ Map <String ,Object > clientConfig ,
75+ ConsumerConfigurationData <byte []> consumerConfig ,
6176 Authentication authentication ) {
6277 super (storageLevel );
6378
6479 checkNotNull (serviceUrl , "serviceUrl must not be null" );
65- checkNotNull (conf , "ConsumerConfigurationData must not be null" );
66- checkArgument (conf .getTopicNames ().size () > 0 , "TopicNames must be set a value." );
67- checkNotNull (conf .getSubscriptionName (), "SubscriptionName must not be null" );
80+ checkNotNull (consumerConfig , "ConsumerConfigurationData must not be null" );
81+ checkNotNull (clientConfig , "Client configuration map must not be null" );
82+ checkArgument (consumerConfig .getTopicNames ().size () > 0 , "TopicNames must be set a value." );
83+ checkNotNull (consumerConfig .getSubscriptionName (), "SubscriptionName must not be null" );
6884
6985 this .serviceUrl = serviceUrl ;
7086 this .authentication = authentication ;
7187
72- if (conf .getMessageListener () == null ) {
73- conf .setMessageListener ((MessageListener <byte []> & Serializable ) (consumer , msg ) -> {
88+ if (consumerConfig .getMessageListener () == null ) {
89+ consumerConfig .setMessageListener ((MessageListener <byte []> & Serializable ) (consumer , msg ) -> {
7490 try {
7591 store (msg .getData ());
7692 consumer .acknowledgeAsync (msg );
@@ -80,13 +96,18 @@ public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
8096 }
8197 });
8298 }
83- this .conf = conf ;
99+ this .clientConfig = clientConfig ;
100+ this .consumerConfig = consumerConfig ;
84101 }
85102
86103 public void onStart () {
87104 try {
88- pulsarClient = PulsarClient .builder ().serviceUrl (serviceUrl ).authentication (authentication ).build ();
89- consumer = ((PulsarClientImpl ) pulsarClient ).subscribeAsync (conf ).join ();
105+ ClientBuilder builder = PulsarClient .builder ().serviceUrl (serviceUrl ).authentication (authentication );
106+ if (!clientConfig .isEmpty ()) {
107+ builder .loadConf (clientConfig );
108+ }
109+ pulsarClient = builder .build ();
110+ consumer = ((PulsarClientImpl ) pulsarClient ).subscribeAsync (consumerConfig ).join ();
90111 } catch (Exception e ) {
91112 LOG .error ("Failed to start subscription : {}" , e .getMessage ());
92113 restart ("Restart a consumer" );
0 commit comments