diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitProperties.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitProperties.java index 419b6da1c6a0..480bb3b1788f 100644 --- a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitProperties.java +++ b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitProperties.java @@ -52,6 +52,7 @@ * @author Scott Frederick * @author Lasse Wulff * @author Yanming Zhou + * @author Jay Choi * @since 4.0.0 */ @ConfigurationProperties("spring.rabbitmq") @@ -1311,6 +1312,11 @@ public static final class Stream { */ private @Nullable String name; + /** + * SSL configuration for RabbitMQ instance with the Stream plugin enabled. + */ + private final StreamSsl ssl = new StreamSsl(); + public String getHost() { return this.host; } @@ -1359,6 +1365,45 @@ public void setName(@Nullable String name) { this.name = name; } + public StreamSsl getSsl() { + return this.ssl; + } + + public static class StreamSsl { + + /** + * Whether to enable SSL support. Enabled automatically if "bundle" is + * provided. + */ + private @Nullable Boolean enabled; + + /** + * SSL bundle name. + */ + private @Nullable String bundle; + + public @Nullable Boolean getEnabled() { + return this.enabled; + } + + public boolean determineEnabled() { + return Boolean.TRUE.equals(getEnabled()) || this.bundle != null; + } + + public void setEnabled(@Nullable Boolean enabled) { + this.enabled = enabled; + } + + public @Nullable String getBundle() { + return this.bundle; + } + + public void setBundle(@Nullable String bundle) { + this.bundle = bundle; + } + + } + } } diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConfiguration.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConfiguration.java index 0b396dd01f7e..e49039a54f76 100644 --- a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConfiguration.java +++ b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConfiguration.java @@ -16,19 +16,28 @@ package org.springframework.boot.amqp.autoconfigure; +import javax.net.ssl.SSLException; + import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; import org.jspecify.annotations.Nullable; import org.springframework.amqp.rabbit.config.ContainerCustomizer; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.amqp.autoconfigure.RabbitProperties.Stream; +import org.springframework.boot.amqp.autoconfigure.RabbitProperties.Stream.StreamSsl; import org.springframework.boot.amqp.autoconfigure.RabbitProperties.StreamContainer; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.boot.ssl.SslBundle; +import org.springframework.boot.ssl.SslBundles; +import org.springframework.boot.ssl.SslManagerBundle; +import org.springframework.boot.ssl.SslOptions; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory; @@ -41,12 +50,14 @@ import org.springframework.rabbit.stream.producer.RabbitStreamTemplate; import org.springframework.rabbit.stream.support.converter.StreamMessageConverter; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; /** * Configuration for Spring RabbitMQ Stream plugin support. * * @author Gary Russell * @author Eddú Meléndez + * @author Jay Choi */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(StreamRabbitListenerContainerFactory.class) @@ -54,8 +65,9 @@ class RabbitStreamConfiguration { @Bean @ConditionalOnMissingBean - RabbitStreamConnectionDetails rabbitStreamConnectionDetails(RabbitProperties rabbitProperties) { - return new PropertiesRabbitStreamConnectionDetails(rabbitProperties.getStream()); + RabbitStreamConnectionDetails rabbitStreamConnectionDetails(RabbitProperties rabbitProperties, + @Nullable SslBundles sslBundles) { + return new PropertiesRabbitStreamConnectionDetails(rabbitProperties.getStream(), sslBundles); } @Bean(name = "rabbitListenerContainerFactory") @@ -131,15 +143,41 @@ private static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitPr .to(builder::virtualHost); map.from(streamConnectionDetails.getUsername()).orFrom(connectionDetails::getUsername).to(builder::username); map.from(streamConnectionDetails.getPassword()).orFrom(connectionDetails::getPassword).to(builder::password); + SslBundle sslBundle = streamConnectionDetails.getSslBundle(); + if (sslBundle != null) { + builder.tls().sslContext(createSslContext(sslBundle)); + } + else if (stream.getSsl().determineEnabled()) { + builder.tls(); + } return builder; } + private static SslContext createSslContext(SslBundle sslBundle) { + SslOptions options = sslBundle.getOptions(); + SslManagerBundle managers = sslBundle.getManagers(); + try { + return SslContextBuilder.forClient() + .keyManager(managers.getKeyManagerFactory()) + .trustManager(managers.getTrustManagerFactory()) + .ciphers(SslOptions.asSet(options.getCiphers())) + .protocols(options.getEnabledProtocols()) + .build(); + } + catch (SSLException ex) { + throw new IllegalStateException("Failed to create SSL context for RabbitMQ Stream", ex); + } + } + static class PropertiesRabbitStreamConnectionDetails implements RabbitStreamConnectionDetails { private final Stream streamProperties; - PropertiesRabbitStreamConnectionDetails(Stream streamProperties) { + private final @Nullable SslBundles sslBundles; + + PropertiesRabbitStreamConnectionDetails(Stream streamProperties, @Nullable SslBundles sslBundles) { this.streamProperties = streamProperties; + this.sslBundles = sslBundles; } @Override @@ -167,6 +205,19 @@ public int getPort() { return this.streamProperties.getPassword(); } + @Override + public @Nullable SslBundle getSslBundle() { + StreamSsl ssl = this.streamProperties.getSsl(); + if (!ssl.determineEnabled()) { + return null; + } + if (StringUtils.hasLength(ssl.getBundle())) { + Assert.notNull(this.sslBundles, "SSL bundle name has been set but no SSL bundles found in context"); + return this.sslBundles.getBundle(ssl.getBundle()); + } + return null; + } + } } diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConnectionDetails.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConnectionDetails.java index 8b08349167d8..5de8fc8301cd 100644 --- a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConnectionDetails.java +++ b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConnectionDetails.java @@ -19,11 +19,13 @@ import org.jspecify.annotations.Nullable; import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails; +import org.springframework.boot.ssl.SslBundle; /** * Details required to establish a connection to a RabbitMQ Stream service. * * @author Eddú Meléndez + * @author Jay Choi * @since 4.1.0 */ public interface RabbitStreamConnectionDetails extends ConnectionDetails { @@ -64,4 +66,12 @@ public interface RabbitStreamConnectionDetails extends ConnectionDetails { return null; } + /** + * SSL bundle to use. + * @return the SSL bundle to use or {@code null} + */ + default @Nullable SslBundle getSslBundle() { + return null; + } + } diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/docker/compose/RabbitStreamDockerComposeConnectionDetailsFactory.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/docker/compose/RabbitStreamDockerComposeConnectionDetailsFactory.java index e482c04c0220..186db12035c7 100644 --- a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/docker/compose/RabbitStreamDockerComposeConnectionDetailsFactory.java +++ b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/docker/compose/RabbitStreamDockerComposeConnectionDetailsFactory.java @@ -23,6 +23,7 @@ import org.springframework.boot.docker.compose.core.RunningService; import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionDetailsFactory; import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionSource; +import org.springframework.boot.ssl.SslBundle; /** * {@link DockerComposeConnectionDetailsFactory} to create {@link RabbitConnectionDetails} @@ -32,6 +33,7 @@ * @author Andy Wilkinson * @author Phillip Webb * @author Scott Frederick + * @author Jay Choi */ class RabbitStreamDockerComposeConnectionDetailsFactory extends DockerComposeConnectionDetailsFactory { @@ -66,11 +68,14 @@ static class RabbitStreamDockerComposeConnectionDetails extends DockerComposeCon private final int port; + private final @Nullable SslBundle sslBundle; + protected RabbitStreamDockerComposeConnectionDetails(RunningService service) { super(service); this.environment = new RabbitEnvironment(service.env()); this.host = service.host(); this.port = service.ports().get(RABBITMQ_STREAMS_PORT); + this.sslBundle = getSslBundle(service); } @Override @@ -98,6 +103,11 @@ public int getPort() { return this.port; } + @Override + public @Nullable SslBundle getSslBundle() { + return this.sslBundle; + } + } } diff --git a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/RabbitStreamContainerConnectionDetailsFactory.java b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/RabbitStreamContainerConnectionDetailsFactory.java index 8d2bf2bc9dc4..6f321c774604 100644 --- a/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/RabbitStreamContainerConnectionDetailsFactory.java +++ b/module/spring-boot-amqp/src/main/java/org/springframework/boot/amqp/testcontainers/RabbitStreamContainerConnectionDetailsFactory.java @@ -16,9 +16,11 @@ package org.springframework.boot.amqp.testcontainers; +import org.jspecify.annotations.Nullable; import org.testcontainers.rabbitmq.RabbitMQContainer; import org.springframework.boot.amqp.autoconfigure.RabbitStreamConnectionDetails; +import org.springframework.boot.ssl.SslBundle; import org.springframework.boot.testcontainers.service.connection.ContainerConnectionDetailsFactory; import org.springframework.boot.testcontainers.service.connection.ContainerConnectionSource; import org.springframework.boot.testcontainers.service.connection.ServiceConnection; @@ -29,6 +31,7 @@ * {@link ServiceConnection @ServiceConnection}-annotated {@link RabbitMQContainer}. * * @author Eddú Meléndez + * @author Jay Choi */ class RabbitStreamContainerConnectionDetailsFactory extends ContainerConnectionDetailsFactory { @@ -81,6 +84,11 @@ public String getPassword() { return getContainer().getAdminPassword(); } + @Override + public @Nullable SslBundle getSslBundle() { + return super.getSslBundle(); + } + } } diff --git a/module/spring-boot-amqp/src/test/java/org/springframework/boot/amqp/autoconfigure/RabbitPropertiesTests.java b/module/spring-boot-amqp/src/test/java/org/springframework/boot/amqp/autoconfigure/RabbitPropertiesTests.java index cc3f17a5a802..93548c283a41 100644 --- a/module/spring-boot-amqp/src/test/java/org/springframework/boot/amqp/autoconfigure/RabbitPropertiesTests.java +++ b/module/spring-boot-amqp/src/test/java/org/springframework/boot/amqp/autoconfigure/RabbitPropertiesTests.java @@ -38,6 +38,7 @@ * @author Stephane Nicoll * @author Rafael Carvalho * @author Scott Frederick + * @author Jay Choi */ class RabbitPropertiesTests { @@ -381,4 +382,34 @@ void hostPropertyMustBeSingleHost() { .withMessageContaining("spring.rabbitmq.host"); } + @Test + void streamSslIsDisabledByDefault() { + assertThat(this.properties.getStream().getSsl().determineEnabled()).isFalse(); + } + + @Test + void streamSslIsEnabledWhenEnabledIsTrue() { + this.properties.getStream().getSsl().setEnabled(true); + assertThat(this.properties.getStream().getSsl().determineEnabled()).isTrue(); + } + + @Test + void streamSslIsEnabledWhenBundleIsSet() { + this.properties.getStream().getSsl().setBundle("test-bundle"); + assertThat(this.properties.getStream().getSsl().determineEnabled()).isTrue(); + } + + @Test + void streamSslIsDisabledWhenEnabledIsFalseAndBundleIsNotSet() { + this.properties.getStream().getSsl().setEnabled(false); + assertThat(this.properties.getStream().getSsl().determineEnabled()).isFalse(); + } + + @Test + void streamSslIsEnabledWhenBundleIsSetButEnabledIsFalse() { + this.properties.getStream().getSsl().setBundle("test-bundle"); + this.properties.getStream().getSsl().setEnabled(false); + assertThat(this.properties.getStream().getSsl().determineEnabled()).isTrue(); + } + } diff --git a/module/spring-boot-amqp/src/test/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConfigurationTests.java b/module/spring-boot-amqp/src/test/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConfigurationTests.java index 088c31f6ccdb..87230b0618c9 100644 --- a/module/spring-boot-amqp/src/test/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConfigurationTests.java +++ b/module/spring-boot-amqp/src/test/java/org/springframework/boot/amqp/autoconfigure/RabbitStreamConfigurationTests.java @@ -25,6 +25,7 @@ import com.rabbitmq.stream.Environment; import com.rabbitmq.stream.EnvironmentBuilder; import org.assertj.core.api.InstanceOfAssertFactories; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.MessageListenerContainer; @@ -35,6 +36,10 @@ import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.amqp.autoconfigure.RabbitStreamConfiguration.PropertiesRabbitStreamConnectionDetails; import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.autoconfigure.ssl.SslAutoConfiguration; +import org.springframework.boot.ssl.NoSuchSslBundleException; +import org.springframework.boot.ssl.SslBundle; +import org.springframework.boot.ssl.SslBundles; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -53,8 +58,11 @@ import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; /** * Tests for {@link RabbitStreamConfiguration}. @@ -63,11 +71,12 @@ * @author Andy Wilkinson * @author Eddú Meléndez * @author Moritz Halbritter + * @author Jay Choi */ class RabbitStreamConfigurationTests { private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() - .withConfiguration(AutoConfigurations.of(RabbitAutoConfiguration.class)); + .withConfiguration(AutoConfigurations.of(RabbitAutoConfiguration.class, SslAutoConfiguration.class)); @Test @SuppressWarnings("unchecked") @@ -150,7 +159,7 @@ void environmentUsesConnectionDetailsByDefault() { RabbitProperties properties = new RabbitProperties(); RabbitStreamConfiguration.configure(builder, properties, new TestRabbitConnectionDetails("guest", "guest", "vhost"), - getRabbitStreamConnectionDetails(properties)); + getRabbitStreamConnectionDetails(properties, null)); then(builder).should().port(5552); then(builder).should().host("localhost"); then(builder).should().virtualHost("vhost"); @@ -167,7 +176,7 @@ void whenStreamPortIsSetThenEnvironmentUsesCustomPort() { properties.getStream().setPort(5553); RabbitStreamConfiguration.configure(builder, properties, new TestRabbitConnectionDetails("guest", "guest", "vhost"), - getRabbitStreamConnectionDetails(properties)); + getRabbitStreamConnectionDetails(properties, null)); then(builder).should().port(5553); } @@ -178,7 +187,7 @@ void whenStreamHostIsSetThenEnvironmentUsesCustomHost() { properties.getStream().setHost("stream.rabbit.example.com"); RabbitStreamConfiguration.configure(builder, properties, new TestRabbitConnectionDetails("guest", "guest", "vhost"), - getRabbitStreamConnectionDetails(properties)); + getRabbitStreamConnectionDetails(properties, null)); then(builder).should().host("stream.rabbit.example.com"); } @@ -189,7 +198,7 @@ void whenStreamVirtualHostIsSetThenEnvironmentUsesCustomVirtualHost() { properties.getStream().setVirtualHost("stream-virtual-host"); RabbitStreamConfiguration.configure(builder, properties, new TestRabbitConnectionDetails("guest", "guest", "vhost"), - getRabbitStreamConnectionDetails(properties)); + getRabbitStreamConnectionDetails(properties, null)); then(builder).should().virtualHost("stream-virtual-host"); } @@ -200,7 +209,7 @@ void whenStreamVirtualHostIsNotSetButDefaultVirtualHostIsSetThenEnvironmentUsesD properties.setVirtualHost("default-virtual-host"); RabbitStreamConfiguration.configure(builder, properties, new TestRabbitConnectionDetails("guest", "guest", "default-virtual-host"), - getRabbitStreamConnectionDetails(properties)); + getRabbitStreamConnectionDetails(properties, null)); then(builder).should().virtualHost("default-virtual-host"); } @@ -212,7 +221,7 @@ void whenStreamCredentialsAreNotSetThenEnvironmentUsesConnectionDetailsCredentia properties.setPassword("secret"); RabbitStreamConfiguration.configure(builder, properties, new TestRabbitConnectionDetails("bob", "password", "vhost"), - getRabbitStreamConnectionDetails(properties)); + getRabbitStreamConnectionDetails(properties, null)); then(builder).should().username("bob"); then(builder).should().password("password"); } @@ -227,7 +236,7 @@ void whenStreamCredentialsAreSetThenEnvironmentUsesStreamCredentials() { properties.getStream().setPassword("confidential"); RabbitStreamConfiguration.configure(builder, properties, new TestRabbitConnectionDetails("guest", "guest", "vhost"), - getRabbitStreamConnectionDetails(properties)); + getRabbitStreamConnectionDetails(properties, null)); then(builder).should().username("bob"); then(builder).should().password("confidential"); } @@ -319,8 +328,92 @@ void connectionDetailsAreApplied() { .containsExactly("rabbitmq", 5555)); } - private RabbitStreamConnectionDetails getRabbitStreamConnectionDetails(RabbitProperties properties) { - return new PropertiesRabbitStreamConnectionDetails(properties.getStream()); + @Test + void whenStreamSslIsNotConfiguredThenTlsIsNotUsed() { + EnvironmentBuilder builder = mock(EnvironmentBuilder.class); + RabbitProperties properties = new RabbitProperties(); + RabbitStreamConfiguration.configure(builder, properties, + new TestRabbitConnectionDetails("guest", "guest", "vhost"), + getRabbitStreamConnectionDetails(properties, null)); + then(builder).should(never()).tls(); + } + + @Test + void whenStreamSslIsEnabledThenTlsIsUsed() { + EnvironmentBuilder builder = mock(EnvironmentBuilder.class); + RabbitProperties properties = new RabbitProperties(); + properties.getStream().getSsl().setEnabled(true); + RabbitStreamConfiguration.configure(builder, properties, + new TestRabbitConnectionDetails("guest", "guest", "vhost"), + getRabbitStreamConnectionDetails(properties, null)); + then(builder).should().tls(); + } + + @Test + void whenStreamSslBundleIsConfiguredThenTlsIsUsed() { + this.contextRunner.withPropertyValues("spring.rabbitmq.stream.ssl.bundle=test-bundle", + "spring.ssl.bundle.jks.test-bundle.keystore.location=classpath:org/springframework/boot/amqp/autoconfigure/test.jks", + "spring.ssl.bundle.jks.test-bundle.keystore.password=secret") + .run((context) -> { + assertThat(context).hasNotFailed(); + assertThat(context).hasSingleBean(Environment.class); + }); + } + + @Test + void whenStreamSslIsDisabledThenTlsIsNotUsed() { + EnvironmentBuilder builder = mock(EnvironmentBuilder.class); + RabbitProperties properties = new RabbitProperties(); + properties.getStream().getSsl().setEnabled(false); + RabbitStreamConfiguration.configure(builder, properties, + new TestRabbitConnectionDetails("guest", "guest", "vhost"), + getRabbitStreamConnectionDetails(properties, null)); + then(builder).should(never()).tls(); + } + + @Test + void whenStreamSslIsDisabledWithBundleThenTlsIsStillUsed() { + EnvironmentBuilder builder = mock(EnvironmentBuilder.class); + SslBundles sslBundles = mock(SslBundles.class); + RabbitProperties properties = new RabbitProperties(); + properties.getStream().getSsl().setEnabled(false); + properties.getStream().getSsl().setBundle("some-bundle"); + RabbitStreamConfiguration.configure(builder, properties, + new TestRabbitConnectionDetails("guest", "guest", "vhost"), + getRabbitStreamConnectionDetails(properties, sslBundles)); + then(builder).should().tls(); + } + + @Test + void whenConnectionDetailsSslBundleIsProvidedThenTlsIsUsedWithoutProperties() { + this.contextRunner.withUserConfiguration(CustomConnectionDetails.class) + .withPropertyValues( + "spring.ssl.bundle.jks.test-bundle.keystore.location=classpath:org/springframework/boot/amqp/autoconfigure/test.jks", + "spring.ssl.bundle.jks.test-bundle.keystore.password=secret") + .run((context) -> { + assertThat(context).hasNotFailed(); + assertThat(context).hasSingleBean(Environment.class); + }); + } + + @Test + void whenStreamSslBundleIsInvalidThenFails() { + EnvironmentBuilder builder = mock(EnvironmentBuilder.class); + SslBundles sslBundles = mock(SslBundles.class); + given(sslBundles.getBundle("invalid-bundle")).willThrow( + new NoSuchSslBundleException("invalid-bundle", "SSL bundle name 'invalid-bundle' cannot be found")); + RabbitProperties properties = new RabbitProperties(); + properties.getStream().getSsl().setBundle("invalid-bundle"); + assertThatExceptionOfType(NoSuchSslBundleException.class) + .isThrownBy(() -> RabbitStreamConfiguration.configure(builder, properties, + new TestRabbitConnectionDetails("guest", "guest", "vhost"), + getRabbitStreamConnectionDetails(properties, sslBundles))) + .withMessageContaining("invalid-bundle"); + } + + private RabbitStreamConnectionDetails getRabbitStreamConnectionDetails(RabbitProperties properties, + @Nullable SslBundles sslBundles) { + return new PropertiesRabbitStreamConnectionDetails(properties.getStream(), sslBundles); } @Configuration(proxyBeanMethods = false) @@ -448,7 +541,7 @@ public List
getAddresses() { static class CustomConnectionDetails { @Bean - RabbitStreamConnectionDetails customRabbitMqStreamConnectionDetails() { + RabbitStreamConnectionDetails customRabbitMqStreamConnectionDetails(SslBundles sslBundles) { return new RabbitStreamConnectionDetails() { @Override public String getHost() { @@ -459,6 +552,14 @@ public String getHost() { public int getPort() { return 5555; } + + @Override + public @Nullable SslBundle getSslBundle() { + if (sslBundles.getBundleNames().contains("test-bundle")) { + return sslBundles.getBundle("test-bundle"); + } + return null; + } }; }