Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* @author Scott Frederick
* @author Lasse Wulff
* @author Yanming Zhou
* @author Jay Choi
* @since 4.0.0
*/
@ConfigurationProperties("spring.rabbitmq")
Expand Down Expand Up @@ -1311,6 +1312,11 @@ public static final class Stream {
*/
private @Nullable String name;

/**
* SSL configuration for RabbitMQ instance with the Stream plugin enabled.
*/
private final StreamSsl ssl = new StreamSsl();

public String getHost() {
return this.host;
}
Expand Down Expand Up @@ -1359,6 +1365,45 @@ public void setName(@Nullable String name) {
this.name = name;
}

public StreamSsl getSsl() {
return this.ssl;
}

public static class StreamSsl {

/**
* Whether to enable SSL support. Enabled automatically if "bundle" is
* provided.
*/
private @Nullable Boolean enabled;

/**
* SSL bundle name.
*/
private @Nullable String bundle;

public @Nullable Boolean getEnabled() {
return this.enabled;
}

public boolean determineEnabled() {
return Boolean.TRUE.equals(getEnabled()) || this.bundle != null;
}

public void setEnabled(@Nullable Boolean enabled) {
this.enabled = enabled;
}

public @Nullable String getBundle() {
return this.bundle;
}

public void setBundle(@Nullable String bundle) {
this.bundle = bundle;
}

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,28 @@

package org.springframework.boot.amqp.autoconfigure;

import javax.net.ssl.SSLException;

import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.EnvironmentBuilder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import org.jspecify.annotations.Nullable;

import org.springframework.amqp.rabbit.config.ContainerCustomizer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.amqp.autoconfigure.RabbitProperties.Stream;
import org.springframework.boot.amqp.autoconfigure.RabbitProperties.Stream.StreamSsl;
import org.springframework.boot.amqp.autoconfigure.RabbitProperties.StreamContainer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.ssl.SslBundle;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.boot.ssl.SslManagerBundle;
import org.springframework.boot.ssl.SslOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
Expand All @@ -41,21 +50,24 @@
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
* Configuration for Spring RabbitMQ Stream plugin support.
*
* @author Gary Russell
* @author Eddú Meléndez
* @author Jay Choi
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(StreamRabbitListenerContainerFactory.class)
class RabbitStreamConfiguration {

@Bean
@ConditionalOnMissingBean
RabbitStreamConnectionDetails rabbitStreamConnectionDetails(RabbitProperties rabbitProperties) {
return new PropertiesRabbitStreamConnectionDetails(rabbitProperties.getStream());
RabbitStreamConnectionDetails rabbitStreamConnectionDetails(RabbitProperties rabbitProperties,
@Nullable SslBundles sslBundles) {
return new PropertiesRabbitStreamConnectionDetails(rabbitProperties.getStream(), sslBundles);
}

@Bean(name = "rabbitListenerContainerFactory")
Expand Down Expand Up @@ -131,15 +143,41 @@ private static EnvironmentBuilder configure(EnvironmentBuilder builder, RabbitPr
.to(builder::virtualHost);
map.from(streamConnectionDetails.getUsername()).orFrom(connectionDetails::getUsername).to(builder::username);
map.from(streamConnectionDetails.getPassword()).orFrom(connectionDetails::getPassword).to(builder::password);
SslBundle sslBundle = streamConnectionDetails.getSslBundle();
if (sslBundle != null) {
builder.tls().sslContext(createSslContext(sslBundle));
}
else if (stream.getSsl().determineEnabled()) {
builder.tls();
}
return builder;
}

private static SslContext createSslContext(SslBundle sslBundle) {
SslOptions options = sslBundle.getOptions();
SslManagerBundle managers = sslBundle.getManagers();
try {
return SslContextBuilder.forClient()
.keyManager(managers.getKeyManagerFactory())
.trustManager(managers.getTrustManagerFactory())
.ciphers(SslOptions.asSet(options.getCiphers()))
.protocols(options.getEnabledProtocols())
.build();
}
catch (SSLException ex) {
throw new IllegalStateException("Failed to create SSL context for RabbitMQ Stream", ex);
}
}

static class PropertiesRabbitStreamConnectionDetails implements RabbitStreamConnectionDetails {

private final Stream streamProperties;

PropertiesRabbitStreamConnectionDetails(Stream streamProperties) {
private final @Nullable SslBundles sslBundles;

PropertiesRabbitStreamConnectionDetails(Stream streamProperties, @Nullable SslBundles sslBundles) {
this.streamProperties = streamProperties;
this.sslBundles = sslBundles;
}

@Override
Expand Down Expand Up @@ -167,6 +205,19 @@ public int getPort() {
return this.streamProperties.getPassword();
}

@Override
public @Nullable SslBundle getSslBundle() {
StreamSsl ssl = this.streamProperties.getSsl();
if (!ssl.determineEnabled()) {
return null;
}
if (StringUtils.hasLength(ssl.getBundle())) {
Assert.notNull(this.sslBundles, "SSL bundle name has been set but no SSL bundles found in context");
return this.sslBundles.getBundle(ssl.getBundle());
}
return null;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import org.jspecify.annotations.Nullable;

import org.springframework.boot.autoconfigure.service.connection.ConnectionDetails;
import org.springframework.boot.ssl.SslBundle;

/**
* Details required to establish a connection to a RabbitMQ Stream service.
*
* @author Eddú Meléndez
* @author Jay Choi
* @since 4.1.0
*/
public interface RabbitStreamConnectionDetails extends ConnectionDetails {
Expand Down Expand Up @@ -64,4 +66,12 @@ public interface RabbitStreamConnectionDetails extends ConnectionDetails {
return null;
}

/**
* SSL bundle to use.
* @return the SSL bundle to use or {@code null}
*/
default @Nullable SslBundle getSslBundle() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.springframework.boot.docker.compose.core.RunningService;
import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionDetailsFactory;
import org.springframework.boot.docker.compose.service.connection.DockerComposeConnectionSource;
import org.springframework.boot.ssl.SslBundle;

/**
* {@link DockerComposeConnectionDetailsFactory} to create {@link RabbitConnectionDetails}
Expand All @@ -32,6 +33,7 @@
* @author Andy Wilkinson
* @author Phillip Webb
* @author Scott Frederick
* @author Jay Choi
*/
class RabbitStreamDockerComposeConnectionDetailsFactory
extends DockerComposeConnectionDetailsFactory<RabbitStreamConnectionDetails> {
Expand Down Expand Up @@ -66,11 +68,14 @@ static class RabbitStreamDockerComposeConnectionDetails extends DockerComposeCon

private final int port;

private final @Nullable SslBundle sslBundle;

protected RabbitStreamDockerComposeConnectionDetails(RunningService service) {
super(service);
this.environment = new RabbitEnvironment(service.env());
this.host = service.host();
this.port = service.ports().get(RABBITMQ_STREAMS_PORT);
this.sslBundle = getSslBundle(service);
}

@Override
Expand Down Expand Up @@ -98,6 +103,11 @@ public int getPort() {
return this.port;
}

@Override
public @Nullable SslBundle getSslBundle() {
return this.sslBundle;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package org.springframework.boot.amqp.testcontainers;

import org.jspecify.annotations.Nullable;
import org.testcontainers.rabbitmq.RabbitMQContainer;

import org.springframework.boot.amqp.autoconfigure.RabbitStreamConnectionDetails;
import org.springframework.boot.ssl.SslBundle;
import org.springframework.boot.testcontainers.service.connection.ContainerConnectionDetailsFactory;
import org.springframework.boot.testcontainers.service.connection.ContainerConnectionSource;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
Expand All @@ -29,6 +31,7 @@
* {@link ServiceConnection @ServiceConnection}-annotated {@link RabbitMQContainer}.
*
* @author Eddú Meléndez
* @author Jay Choi
*/
class RabbitStreamContainerConnectionDetailsFactory
extends ContainerConnectionDetailsFactory<RabbitMQContainer, RabbitStreamConnectionDetails> {
Expand Down Expand Up @@ -81,6 +84,11 @@ public String getPassword() {
return getContainer().getAdminPassword();
}

@Override
public @Nullable SslBundle getSslBundle() {
return super.getSslBundle();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* @author Stephane Nicoll
* @author Rafael Carvalho
* @author Scott Frederick
* @author Jay Choi
*/
class RabbitPropertiesTests {

Expand Down Expand Up @@ -381,4 +382,34 @@ void hostPropertyMustBeSingleHost() {
.withMessageContaining("spring.rabbitmq.host");
}

@Test
void streamSslIsDisabledByDefault() {
assertThat(this.properties.getStream().getSsl().determineEnabled()).isFalse();
}

@Test
void streamSslIsEnabledWhenEnabledIsTrue() {
this.properties.getStream().getSsl().setEnabled(true);
assertThat(this.properties.getStream().getSsl().determineEnabled()).isTrue();
}

@Test
void streamSslIsEnabledWhenBundleIsSet() {
this.properties.getStream().getSsl().setBundle("test-bundle");
assertThat(this.properties.getStream().getSsl().determineEnabled()).isTrue();
}

@Test
void streamSslIsDisabledWhenEnabledIsFalseAndBundleIsNotSet() {
this.properties.getStream().getSsl().setEnabled(false);
assertThat(this.properties.getStream().getSsl().determineEnabled()).isFalse();
}

@Test
void streamSslIsEnabledWhenBundleIsSetButEnabledIsFalse() {
this.properties.getStream().getSsl().setBundle("test-bundle");
this.properties.getStream().getSsl().setEnabled(false);
assertThat(this.properties.getStream().getSsl().determineEnabled()).isTrue();
}

}
Loading