Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -163,6 +164,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private String duplexNetworkConnectorId;
private final CompletableFuture<ConnectionId> initialConnectionId = new CompletableFuture<>();

/**
* @param taskRunnerFactory - can be null if you want direct dispatch to the transport
Expand Down Expand Up @@ -851,11 +853,16 @@ public Response processAddConnection(ConnectionInfo info) throws Exception {

try {
broker.addConnection(context, info);
// Complete the future with the connectionId if we completed
// the broker.addConnection() chain successfully
initialConnectionId.complete(info.getConnectionId());
} catch (Exception e) {
synchronized (brokerConnectionStates) {
brokerConnectionStates.remove(info.getConnectionId());
}
unregisterConnectionState(info.getConnectionId());
// complete with the exception
initialConnectionId.completeExceptionally(e);
LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} due to {}",
info.getConnectionId(), clientId, info.getClientIp(), e.getLocalizedMessage());
//AMQ-6561 - stop for all exceptions on addConnection
Expand Down Expand Up @@ -1388,13 +1395,10 @@ public Response processBrokerInfo(BrokerInfo info) {
LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName());
} else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
try {
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
}
// register durable sync to be sent after ConnectionInfo has been handled
registerDurableSync(getNetworkConfiguration(info), info);
} catch (Exception e) {
LOG.error("Failed to respond to network bridge creation from broker {}", info.getBrokerId(), e);
LOG.error("Failed to register durable sync for network bridge creation from broker {}", info.getBrokerId(), e);
return null;
}
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
Expand All @@ -1404,10 +1408,8 @@ public Response processBrokerInfo(BrokerInfo info) {
NetworkBridgeConfiguration config = getNetworkConfiguration(info);
config.setBrokerName(broker.getBrokerName());

if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(), config));
}
// register durable sync to be sent after ConnectionInfo has been handled
registerDurableSync(config, info);

// check for existing duplex connection hanging about

Expand Down Expand Up @@ -1474,6 +1476,30 @@ public Response processBrokerInfo(BrokerInfo info) {
return null;
}

private void registerDurableSync(final NetworkBridgeConfiguration config, final BrokerInfo info) {
if (config.isSyncDurableSubs() && protocolVersion.get() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
// this will complete when the connection id has been set, or immediately if already set
initialConnectionId.whenComplete((connectionId, t) -> {
try {
if (t != null) {
LOG.warn("SyncDurableSubs will be skipped due to error {}",
t.getMessage());
return;
}
// check connection still registered
if (lookupConnectionState(connectionId) != null) {
LOG.debug("SyncDurableSubs is enabled, Sending BrokerSubscriptionInfo");
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(
this.broker.getBrokerService(), config));
}
} catch (Exception e) {
LOG.error("Failed to respond to network bridge creation from broker {}",
info.getBrokerId(), e);
}
});
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
private HashMap<String, String> createMap(Properties properties) {
return new HashMap(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,18 +562,32 @@ private static void validateAllowedUri(URI uri, int depth) throws URISyntaxExcep
// First check the main URI scheme
validateAllowedScheme(uri.getScheme());

// If composite, iterate and check each of the composite URIs
if (URISupport.isCompositeURI(uri)) {
URISupport.CompositeData data = URISupport.parseComposite(uri);
// We need to check if the URI is composite and/or contains nested URIs
// The utility method URISupport#isCompositeURI is not good enough here
// because it misses if there are no parentheses and also is primarily meant
// for checking comma separated URIs and not nested URIs.
//
// The best way to handle all cases is to use the same logic that the transports
// use to process the URIs and that is to simply attempt to parse it and check each
// of the parsed components. This wll correctly handle the case when there
// are parentheses and also when the parentheses are skipped.
final URISupport.CompositeData data;
try {
data = URISupport.parseComposite(uri);
} catch (URISyntaxException e) {
// If this is not a valid URI then we can stop checking
// This can happen when parsing a nested URI and at the last portion
return;
}

if (data.getComponents() != null) {
depth++;
for (URI component : data.getComponents()) {
// Each URI could be a nested composite URI so call validateAllowedUri()
// to validate it. This check if composite first so we don't add to
// the recursive stack depth if there's a lot of URIs that are not composite
if (URISupport.isCompositeURI(uri)) {
// Each URI could be a nested and/or composite URI so call validateAllowedUri()
// to validate it. If the scheme is null then the original URI is not composite
// or nested so we can skip the check, and we are finished.
if (component.getScheme() != null) {
validateAllowedUri(component, depth);
} else {
validateAllowedScheme(uri.getScheme());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,21 @@ protected SecurityContext checkSecurityContext(ConnectionContext context) throws
return securityContext;
}

protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveMQDestination destination) {
Destination existing = this.getDestinationMap(destination).get(destination);
if (existing != null) {
protected boolean checkDestinationAdminAdd(SecurityContext securityContext, ActiveMQDestination destination) {
if (this.getDestinationMap(destination).get(destination) != null) {
return true;
}
return checkDestinationAdmin(securityContext, destination);
}

protected boolean checkDestinationAdminRemove(SecurityContext securityContext, ActiveMQDestination destination) {
if (this.getDestinationMap(destination).get(destination) == null) {
return true;
}
return checkDestinationAdmin(securityContext, destination);
}

protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveMQDestination destination) {
if (!securityContext.isBrokerContext()) {
Set<?> allowedACLs = null;
if (!destination.isTemporary()) {
Expand All @@ -100,18 +109,18 @@ protected boolean checkDestinationAdmin(SecurityContext securityContext, ActiveM
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);

if (!checkDestinationAdmin(securityContext, info.getDestination())) {
if (!checkDestinationAdminAdd(securityContext, info.getDestination())) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + info.getDestination());
}

super.addDestinationInfo(context, info);
}

@Override
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);

if (!checkDestinationAdmin(securityContext, destination)) {
if (!checkDestinationAdminAdd(securityContext, destination)) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to create: " + destination);
}

Expand All @@ -122,7 +131,7 @@ public Destination addDestination(ConnectionContext context, ActiveMQDestination
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);

if (!checkDestinationAdmin(securityContext, destination)) {
if (!checkDestinationAdminRemove(securityContext, destination)) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to remove: " + destination);
}

Expand All @@ -135,7 +144,7 @@ public void removeDestination(ConnectionContext context, ActiveMQDestination des
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
final SecurityContext securityContext = checkSecurityContext(context);

if (!checkDestinationAdmin(securityContext, info.getDestination())) {
if (!checkDestinationAdminRemove(securityContext, info.getDestination())) {
throw new SecurityException("User " + securityContext.getUserName() + " is not authorized to remove: " + info.getDestination());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import java.util.stream.Collectors;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerFactoryHandler;
import org.apache.activemq.broker.BrokerRegistry;
Expand All @@ -44,12 +47,29 @@

public class VMTransportFactory extends TransportFactory {

public static final String VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP =
"org.apache.activemq.transport.VM_TRANSPORT_FACTORY_SCHEMES_ENABLED";
public static final String DEFAULT_ALLOWED_SCHEMES = "broker,properties";

public static final ConcurrentMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>();
public static final ConcurrentMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>();
public static final ConcurrentMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>();
private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class);

BrokerFactoryHandler brokerFactoryHandler;
private final Set<String> allowedSchemes;

public VMTransportFactory() {
final String allowedSchemes = System.getProperty(VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP,
DEFAULT_ALLOWED_SCHEMES);

// Asterisk will map to null which will allow all and skip checking
// Empty string will map to an empty set and will deny all
this.allowedSchemes = !allowedSchemes.equals("*") ?
Arrays.stream(allowedSchemes.split("\\s*,\\s*"))
.filter(s -> !s.trim().isEmpty())
.collect(Collectors.toSet()) : null;
}

@Override
public Transport doConnect(URI location) throws Exception {
Expand Down Expand Up @@ -119,6 +139,7 @@ public Transport doCompositeConnect(URI location) throws Exception {
throw new IOException("Broker named '" + host + "' does not exist.");
}
try {
validateBrokerCreationSchema(host, brokerURI);
if (brokerFactoryHandler != null) {
broker = brokerFactoryHandler.createBroker(brokerURI);
} else {
Expand Down Expand Up @@ -162,6 +183,20 @@ public Transport doCompositeConnect(URI location) throws Exception {
return transport;
}

private void validateBrokerCreationSchema(String host, URI brokerURI) {
if (allowedSchemes != null) {
final String detectedScheme = brokerURI.getScheme();
if (detectedScheme == null) {
throw new IllegalArgumentException("Could not detect scheme in given URI [" + brokerURI + "]");
}
if (!allowedSchemes.contains(detectedScheme)){
throw new IllegalArgumentException("Broker named '" + host + "' does not exist and "
+ "broker creation using the scheme '" + detectedScheme + "' is not enabled via the VMTransportFactory. "
+ "To allow creation, configure the system property " + VM_TRANSPORT_FACTORY_SCHEMES_ENABLED_PROP);
}
}
}

private static String extractHost(URI location) {
String host = location.getHost();
if (host == null || host.length() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2016,6 +2016,14 @@ public void testAddVmConnectorBlockedBrokerView() throws Exception {
assertEquals("VM scheme is not allowed", e.getMessage());
}

try {
// verify any composite URI is blocked as well without parens
brokerView.addConnector("static:tcp://0.0.0.0:0,vm://" + brokerName);
fail("Should have failed trying to add vm connector");
} catch (IllegalArgumentException e) {
assertEquals("VM scheme is not allowed", e.getMessage());
}

try {
// verify nested composite URI is blocked
brokerView.addConnector("failover:(failover:(failover:(vm://localhost)))");
Expand All @@ -2033,6 +2041,14 @@ public void testAddVmConnectorBlockedBrokerView() throws Exception {
assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage());
}

try {
// verify nested composite URI with more than 5 levels is blocked without parens
brokerView.addConnector(
"static:static:static:static:static:static:tcp://localhost:0");
fail("Should have failed trying to add vm connector bridge");
} catch (IllegalArgumentException e) {
assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,34 @@
*/
package org.apache.activemq.config;

import java.io.File;
import java.util.Hashtable;

import javax.naming.Context;
import javax.naming.InitialContext;
import static org.apache.activemq.util.VmTransportTestUtils.resetVmTransportFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;

import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.File;
import java.util.Hashtable;

/**
*
*/
public class BrokerXmlConfigFromJNDITest extends JmsTopicSendReceiveWithTwoConnectionsTest {

@Override
protected void setUp() throws Exception {
// reset before each test
resetVmTransportFactory("xbean");
super.setUp();
}

@Override
protected void tearDown() throws Exception {
super.tearDown();
resetVmTransportFactory();
}

@Override
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
assertBaseDirectoryContainsSpaces();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ public void testVmBridgeBlocked() throws Exception {
assertEquals("VM scheme is not allowed", e.getMessage());
}

// Test composite with missing parens
try {
proxy.addNetworkConnector("static:vm://localhost,tcp://127.0.0.1:0");
fail("Should have failed trying to add vm connector bridge");
} catch (IllegalArgumentException e) {
assertEquals("VM scheme is not allowed", e.getMessage());
}

// verify direct connector as well
try {
proxy.addNetworkConnector("multicast:(vm://localhost)");
fail("Should have failed trying to add vm connector bridge");
Expand All @@ -112,6 +121,14 @@ public void testVmBridgeBlocked() throws Exception {
} catch (IllegalArgumentException e) {
assertEquals("VM scheme is not allowed", e.getMessage());
}

try {
// verify nested composite URI is blocked when not using parens
proxy.addNetworkConnector("static:static:static:tcp://localhost:0,vm://localhost");
fail("Should have failed trying to add vm connector bridge");
} catch (IllegalArgumentException e) {
assertEquals("VM scheme is not allowed", e.getMessage());
}
}

@Test
Expand All @@ -124,5 +141,14 @@ public void testAddNetworkConnectorMaxComposite() throws Exception {
} catch (IllegalArgumentException e) {
assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage());
}

try {
// verify nested composite URI with more than 5 levels is blocked without parens
proxy.addNetworkConnector(
"static:static:static:static:static:static:tcp://localhost:0");
fail("Should have failed trying to add more than 5 connector bridges");
} catch (IllegalArgumentException e) {
assertEquals("URI can't contain more than 5 nested composite URIs", e.getMessage());
}
}
}
Loading