An interface for publishing and consuming messages on various message queue platforms.
The tests are integration tests and require running ZooKeeper and Kafka instances.
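
Before running the tests, it can help to confirm that the broker port accepts connections. The following is a minimal sketch and not part of this project: the class `BrokerCheck` and its `isReachable` helper are hypothetical names, and `localhost:9092` is assumed from the host value used elsewhere in this README. A successful TCP connection only shows that the port is open, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of this project.
final class BrokerCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed address; adjust to match your MESSAGE_QUEUE_HOST setting.
        boolean up = isReachable("localhost", 9092, 1000);
        System.out.println(up
                ? "broker port is open"
                : "broker port is closed; start the services first");
    }
}
```
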
Start and stop the services:

    $ make run
    $ make stop

Run the tests:

    ./gradlew clean test

Set the message queue host through an environment variable:

    export MESSAGE_QUEUE_HOST='localhost:9092'

or in code:

    PelicanAppConfig appConfig = new PelicanAppConfig();
    appConfig.setMessageQueueHost("localhost:9092");

To change the subscribe and publish settings, implement a local PelicanAppConfig and override the methods propertiesForSubscribe and propertiesForPublish. Then use the local implementation to construct publishers and subscribers.
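
The override pattern can be tried in isolation, without a broker. The sketch below is an illustration under stated assumptions: `BaseConfig` is a hypothetical stand-in for PelicanAppConfig (whose real defaults are not shown in this README), the `client.id` entry is an assumed default, and `LocalConfig` and `OverrideDemo` are made-up names. Only `java.util.Properties` is real API here. It shows local entries being layered on top of the inherited ones.

```java
import java.util.Properties;

// Entry point for the sketch; all class names here are hypothetical.
final class OverrideDemo {
    public static void main(String[] args) {
        Properties properties = new LocalConfig().propertiesForPublish("demo-client");
        // Both the inherited entry and the local one are present.
        System.out.println(properties.getProperty("client.id"));       // demo-client
        System.out.println(properties.getProperty("my.new.prop.key")); // my.new.prop.value
    }
}

// Stand-in for PelicanAppConfig; the default entry is an assumption.
class BaseConfig {
    public Properties propertiesForPublish(String clientId) {
        Properties properties = new Properties();
        properties.put("client.id", clientId);
        return properties;
    }
}

// Local implementation: start from the inherited properties, then add or replace entries.
class LocalConfig extends BaseConfig {
    @Override
    public Properties propertiesForPublish(String clientId) {
        Properties properties = super.propertiesForPublish(clientId);
        properties.put("my.new.prop.key", "my.new.prop.value");
        return properties;
    }
}
```

Calling `super` first keeps the inherited defaults intact, so a local implementation only needs to state what differs.
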

    public class PubSubConfig extends PelicanAppConfig {
        @Override
        public Properties propertiesForSubscribe(String clientId, String consumerGroup) {
            Properties properties = super.propertiesForSubscribe(clientId, consumerGroup);
            properties.put("my.new.prop.key", "my.new.prop.value");
            return properties;
        }

        @Override
        public Properties propertiesForPublish(String clientId) {
            Properties properties = super.propertiesForPublish(clientId);
            properties.put("my.new.prop.key", "my.new.prop.value");
            return properties;
        }
    }

Publish a message:

    PelicanAppConfig appConfig = new PelicanAppConfig();
    Map<String, String> message = new HashMap<>();
    message.put("test_key", "test_value");
    Publish publish = appConfig.publish();
    String topic = "test";
    publish.send(topic, message);

Subscribe to topics and poll for messages:

    PelicanAppConfig appConfig = new PelicanAppConfig();
    List<String> topics = Arrays.asList("test");
    String consumerGroup = "test";
    Subscribe subject = appConfig.subscribe(topics, consumerGroup);
    Duration timeout = Duration.ofSeconds(100);
    List<Map<String, String>> messages = subject.poll(timeout);
    // Tell the queue the records have been processed.
    subject.processed();

This repo also has the definitions for:
They contain image tags that mirror this project's versions.
The images are intended for local development only and are not production ready.