diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index b4126c955f97fc..ccba94eacdae23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -221,12 +222,14 @@ public boolean disableRebalancingCancellationOptimization() { assert part != null; assert part.id() == p; + GridDhtPartitionState state = part.state(); + // Do not rebalance OWNING or LOST partitions. - if (part.state() == OWNING || part.state() == LOST) + if (state == OWNING || state == LOST) continue; // State should be switched to MOVING during PME. - if (part.state() != MOVING) { + if (state != MOVING) { throw new AssertionError("Partition has invalid state for rebalance " + aff.topologyVersion() + " " + part); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 5ecbe893fb8525..d29f5c5216cb1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -923,7 +923,7 @@ private Collection updateTopologyHistory(long topVer, @Nullable Tcp Collection top = topHist.get(topVer); - assert top != null : "Failed to find topology history [msg=" + msg + ", hist=" + topHist + ']'; + assert top != null : "Failed to find topology history [top=" + topVer + ", msg=" + msg + ", hist=" + topHist + ']'; return top; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index aef67a57809d8c..ddb33a9ce07da0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -41,7 +41,6 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.NoSuchElementException; import java.util.Objects; import java.util.Queue; import java.util.Set; @@ -1129,7 +1128,7 @@ private void joinTopology() throws IgniteSpiException { FutureTask fut = msgWorker.addTask(new FutureTask() { @Override protected Void body() { pendingCustomMsgs.clear(); - msgWorker.pendingMsgs.reset(null, null, null); + msgWorker.pendingMsgs.reset(null); msgWorker.newNextNode(null); failedNodes.clear(); leavingNodes.clear(); @@ -1836,8 +1835,7 @@ private void printStatistics() { private void prepareNodeAddedMessage( TcpDiscoveryAbstractMessage msg, UUID destNodeId, - @Nullable Collection msgs, - @Nullable IgniteUuid discardCustomMsgId + @Nullable Collection msgs ) { assert destNodeId != null; @@ -1874,10 +1872,7 @@ private void prepareNodeAddedMessage( } } - // No need to send discardMsgId because we already filtered out - // cleaned up messages. - // TODO IGNITE-11271 - nodeAddedMsg.messages(msgs0, null, discardCustomMsgId); + nodeAddedMsg.messages(msgs0); Map> hist; @@ -1900,7 +1895,7 @@ private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) { nodeAddedMsg.topology(null); nodeAddedMsg.topologyHistory(null); - nodeAddedMsg.messages(null, null, null); + nodeAddedMsg.messages(null); nodeAddedMsg.clearUnmarshalledDiscoveryData(); } } @@ -2659,7 +2654,7 @@ private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUI TcpDiscoveryNodeAddedMessage msg0 = new TcpDiscoveryNodeAddedMessage(addedMsg); - prepareNodeAddedMessage(msg0, destNodeId, null, null); + prepareNodeAddedMessage(msg0, destNodeId, null); msg0.topology(addedMsg.clientTopology()); @@ -2717,13 +2712,7 @@ private static class PendingMessages implements Iterable msgs = new ArrayDeque<>(MAX * 2); /** Processed custom message IDs. */ - private Set procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2); - - /** Discarded message ID. */ - private IgniteUuid discardId; - - /** Discarded custom message ID. */ - private IgniteUuid customDiscardId; + private final Set procCustomMsgs = new GridBoundedLinkedHashSet<>(MAX * 2); /** * Adds pending message and shrinks queue if it exceeds limit @@ -2733,42 +2722,15 @@ private static class PendingMessages implements Iterable MAX) { - PendingMessage queueHead = msgs.peek(); - - assert queueHead != null; - - if (queueHead.customMsg && customDiscardId != null) { - if (queueHead.id.equals(customDiscardId)) - customDiscardId = null; - } - else if (!queueHead.customMsg && discardId != null) { - if (queueHead.id.equals(discardId)) - discardId = null; - } - else - break; - - msgs.poll(); - } } /** * Resets pending messages. * * @param msgs Message. - * @param discardId Discarded message ID. - * @param customDiscardId Discarded custom event message ID. */ - void reset( - @Nullable Collection msgs, - @Nullable IgniteUuid discardId, - @Nullable IgniteUuid customDiscardId - ) { + void reset(@Nullable Collection msgs) { this.msgs.clear(); - this.customDiscardId = null; - this.discardId = null; if (msgs != null) { for (TcpDiscoveryAbstractMessage msg : msgs) { @@ -2777,9 +2739,6 @@ void reset( this.msgs.add(pm); } } - - this.discardId = discardId; - this.customDiscardId = customDiscardId; } /** @@ -2792,12 +2751,31 @@ void discard(IgniteUuid id, boolean custom) { if (!hasPendingMessage(custom, id)) return; - if (custom) - customDiscardId = id; - else - discardId = id; + IgniteUuid customDiscardId = custom ? id : null; + IgniteUuid discardId = custom ? null : id; + + Iterator it = msgs.iterator(); - cleanup(); + while (it.hasNext()) { + PendingMessage msg = it.next(); + + if (msg.customMsg) { + if (customDiscardId != null) { + it.remove(); + + if (Objects.equals(customDiscardId, msg.id)) + customDiscardId = null; + } + } + else { + if (discardId != null) { + it.remove(); + + if (Objects.equals(discardId, msg.id)) + discardId = null; + } + } + } } /** @@ -2814,134 +2792,36 @@ private boolean hasPendingMessage(boolean custom, IgniteUuid id) { return false; } - /** - * - */ - void cleanup() { - Iterator msgIt = msgs.iterator(); - - boolean skipMsg = discardId != null; - boolean skipCustomMsg = customDiscardId != null; - - while (msgIt.hasNext()) { - PendingMessage msg = msgIt.next(); - - if (msg.customMsg) { - if (skipCustomMsg) { - assert customDiscardId != null; - - if (Objects.equals(customDiscardId, msg.id)) { - msg.msg = null; - - if (msg.verified) - return; - } - } - } - else { - if (skipMsg) { - assert discardId != null; - - if (Objects.equals(discardId, msg.id)) { - msg.msg = null; - - if (msg.verified) - return; - } - } - } - } - } - /** * Gets iterator for non-discarded messages. * * @return Non-discarded messages iterator. */ @Override public Iterator iterator() { - return new SkipIterator(); + return new PendingMessageIterator(); } /** * */ - private class SkipIterator implements Iterator { - /** Skip non-custom messages flag. */ - private boolean skipMsg = discardId != null; - - /** Skip custom messages flag. */ - private boolean skipCustomMsg = customDiscardId != null; - + private class PendingMessageIterator implements Iterator { /** Internal iterator. */ - private Iterator msgIt = msgs.iterator(); - - /** Next message. */ - private TcpDiscoveryAbstractMessage next; - - { - advance(); - } + private final Iterator msgIt = msgs.iterator(); /** {@inheritDoc} */ @Override public boolean hasNext() { - return next != null; + return msgIt.hasNext(); } /** {@inheritDoc} */ @Override public TcpDiscoveryAbstractMessage next() { - if (next == null) - throw new NoSuchElementException(); - - TcpDiscoveryAbstractMessage next0 = next; - - advance(); - - return next0; + return msgIt.next().msg; } /** {@inheritDoc} */ @Override public void remove() { throw new UnsupportedOperationException(); } - - /** - * Advances iterator to the next available item. - */ - private void advance() { - next = null; - - while (msgIt.hasNext()) { - PendingMessage msg0 = msgIt.next(); - - if (msg0.customMsg) { - if (skipCustomMsg) { - assert customDiscardId != null; - - if (Objects.equals(customDiscardId, msg0.id) && msg0.verified) - skipCustomMsg = false; - - continue; - } - } - else { - if (skipMsg) { - assert discardId != null; - - if (Objects.equals(discardId, msg0.id) && msg0.verified) - skipMsg = false; - - continue; - } - } - - if (msg0.msg == null) - continue; - - next = msg0.msg; - - break; - } - } } } @@ -3401,7 +3281,7 @@ private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { msg0 = U.unmarshal(spi.marshaller(), msgBytes, U.resolveClassLoader(spi.ignite().configuration())); - prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null, null); + prepareNodeAddedMessage(msg0, clientMsgWorker.clientNodeId, null); msgBytes0 = null; } @@ -3750,8 +3630,7 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { long tsNanos = System.nanoTime(); - prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs, - pendingMsgs.customDiscardId); + prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs); addFailedNodes(pendingMsg, failedNodes); @@ -3792,8 +3671,7 @@ else if (!spi.failureDetectionTimeoutEnabled() && (e instanceof } if (!(msg instanceof TcpDiscoveryConnectionCheckMessage)) - prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, - pendingMsgs.customDiscardId); + prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs); try { SecurityUtils.serializeVersion(1); @@ -4006,8 +3884,7 @@ private void processPendingMessagesLocally(TcpDiscoveryAbstractMessage curMsg) { UUID locNodeId = getLocalNodeId(); for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { - prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, - pendingMsgs.customDiscardId); + prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs); pendingMsg.senderNodeId(locNodeId); @@ -4126,7 +4003,7 @@ private boolean hasPendingAddMessage(UUID nodeId) { if (pendingMsg.msg instanceof TcpDiscoveryNodeAddedMessage) { TcpDiscoveryNodeAddedMessage addMsg = (TcpDiscoveryNodeAddedMessage)pendingMsg.msg; - if (addMsg.node().id().equals(nodeId) && addMsg.id().compareTo(pendingMsgs.discardId) > 0) + if (addMsg.node().id().equals(nodeId)) return true; } } @@ -5211,14 +5088,7 @@ else if (spiState == CONNECTING) topHist.clear(); topHist.putAll(msg.topologyHistory()); - pendingMsgs.reset(msg.messages(), msg.discardedMessageId(), - msg.discardedCustomMessageId()); - - // Clear data to minimize message size. - msg.messages(null, null, null); - msg.topology(null); - msg.topologyHistory(null); - msg.clearDiscoveryData(); + pendingMsgs.reset(msg.messages()); } else { if (log.isDebugEnabled()) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/MdcAwareNodesComparator.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/MdcAwareNodesComparator.java new file mode 100644 index 00000000000000..895caf066c137d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/MdcAwareNodesComparator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.internal; + +import java.util.Comparator; + +/** Compares nodes using the Data Center id as a primary factor. */ +public class MdcAwareNodesComparator implements Comparator { + /** */ + @Override public int compare(TcpDiscoveryNode n1, TcpDiscoveryNode n2) { + String n1DcId = n1.dataCenterId() == null ? "" : n1.dataCenterId(); + String n2DcId = n2.dataCenterId() == null ? "" : n2.dataCenterId(); + + int res = n1DcId.compareTo(n2DcId); + + if (res == 0) { + res = n1.compareTo(n2); + } + + return res; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java index 6a316a9a599b68..fc8d4b1a5e1ea9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java @@ -506,7 +506,10 @@ public void clear() { if (filtered.size() < 2) return null; - Iterator iter = filtered.iterator(); + NavigableSet sorted = new TreeSet<>(new MdcAwareNodesComparator()); + sorted.addAll(filtered); + + Iterator iter = sorted.iterator(); while (iter.hasNext()) { TcpDiscoveryNode node = iter.next(); @@ -515,7 +518,7 @@ public void clear() { break; } - return iter.hasNext() ? iter.next() : F.first(filtered); + return iter.hasNext() ? iter.next() : F.first(sorted); } finally { rwLock.readLock().unlock(); @@ -541,10 +544,13 @@ public void clear() { if (filtered.size() < 2) return null; + NavigableSet sorted = new TreeSet<>(new MdcAwareNodesComparator()); + sorted.addAll(filtered); + TcpDiscoveryNode previous = null; // Get last node that is previous in a ring - for (TcpDiscoveryNode node : filtered) { + for (TcpDiscoveryNode node : sorted) { if (locNode.equals(node) && previous != null) break; @@ -569,11 +575,14 @@ public TcpDiscoveryNode previousNodeOf(TcpDiscoveryNode ringNode) { try { TcpDiscoveryNode prev = null; - for (TcpDiscoveryNode node : nodes) { + NavigableSet sorted = new TreeSet<>(new MdcAwareNodesComparator()); + sorted.addAll(nodes); + + for (TcpDiscoveryNode node : sorted) { if (node.equals(ringNode)) { if (prev == null) // ringNode is the first node, return last node in the ring. - return nodes.last(); + return sorted.last(); return prev; } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 5c624b4c6bba01..36540d8b7dfc15 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -23,7 +23,6 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.jetbrains.annotations.Nullable; @@ -48,12 +47,6 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractTraceableM /** Pending messages from previous node. */ private Collection msgs; - /** Discarded message ID. */ - private IgniteUuid discardMsgId; - - /** Discarded message ID. */ - private IgniteUuid discardCustomMsgId; - /** Current topology. Initialized by coordinator. */ @GridToStringInclude private Collection top; @@ -99,8 +92,6 @@ public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { this.node = msg.node; this.msgs = msg.msgs; - this.discardMsgId = msg.discardMsgId; - this.discardCustomMsgId = msg.discardCustomMsgId; this.top = msg.top; this.clientTop = msg.clientTop; this.topHist = msg.topHist; @@ -126,39 +117,15 @@ public TcpDiscoveryNode node() { return msgs; } - /** - * Gets discarded message ID. - * - * @return Discarded message ID. - */ - @Nullable public IgniteUuid discardedMessageId() { - return discardMsgId; - } - - /** - * Gets discarded custom message ID. - * - * @return Discarded message ID. - */ - @Nullable public IgniteUuid discardedCustomMessageId() { - return discardCustomMsgId; - } - /** * Sets pending messages to send to new node. * * @param msgs Pending messages to send to new node. - * @param discardMsgId Discarded message ID. - * @param discardCustomMsgId Discarded custom message ID. */ public void messages( - @Nullable Collection msgs, - @Nullable IgniteUuid discardMsgId, - @Nullable IgniteUuid discardCustomMsgId + @Nullable Collection msgs ) { this.msgs = msgs; - this.discardMsgId = discardMsgId; - this.discardCustomMsgId = discardCustomMsgId; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeMdcTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeMdcTest.java new file mode 100644 index 00000000000000..89439e4abcaf23 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeMdcTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.configuration.IgniteConfiguration; + +/** */ +public class CacheExchangeMergeMdcTest extends CacheExchangeMergeTest { + /** */ + protected static final String DC_ID_0 = "DC0"; + + /** */ + protected static final String DC_ID_1 = "DC1"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + applyDC(); + + return cfg; + } + + /** */ + protected void applyDC() { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + boolean mainDc = rnd.nextBoolean(); + + System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, mainDc ? DC_ID_0 : DC_ID_1); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/MultiDataCenterRignTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/MultiDataCenterRignTest.java new file mode 100644 index 00000000000000..f7160cf4200d67 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/MultiDataCenterRignTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.Ignition; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** */ +public class MultiDataCenterRignTest extends GridCommonAbstractTest { + /** */ + private static final String DC_ID_0 = "DC0"; + + /** */ + private static final String DC_ID_1 = "DC1"; + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); + } + + /** */ + @Test + public void testRing() throws Exception { + int cnt = 10; + + generateRandomDcOrderCluster(cnt); + + assertEquals(cnt, grid(0).cluster().nodes().size()); + + checkHops(2); + + stopGrid(cnt - 1); + stopGrid(0); + + assertEquals(cnt - 2, grid(1).cluster().nodes().size()); + + checkHops(2); + } + + /** */ + @Test + public void testMessageOrder() throws Exception { + int cnt = 10; + + generateRandomDcOrderCluster(cnt); + + Collection nodes = G.allGrids(); + + CountDownLatch latch = new CountDownLatch(cnt); + List dcs = new ArrayList<>(); + + for (Ignite node : nodes) { + DiscoverySpi disco = node.configuration().getDiscoverySpi(); + + ((TcpDiscoverySpi)disco).addSendMessageListener(new IgniteInClosure<>() { + @Override public void apply(TcpDiscoveryAbstractMessage msg) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + dcs.add(Ignition.localIgnite().cluster().localNode().dataCenterId()); + + latch.countDown(); + } + } + }); + } + + startGrid(cnt + 1); + + latch.await(); + + String curDc = null; + int hops = 0; + + for (String dcId : dcs) { + if (!dcId.equals(curDc)) { + hops++; + curDc = dcId; + } + } + + assertEquals(2, hops); + } + + /** */ + private void generateRandomDcOrderCluster(int cnt) throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < cnt; i++) { + System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, rnd.nextBoolean() ? DC_ID_0 : DC_ID_1); + + startGrid(i); + } + + waitForTopology(cnt); + } + + /** */ + private void checkHops(int expected) { + Collection nodes = G.allGrids(); + + int hops = 0; + + for (Ignite node : nodes) { + DiscoverySpi disco = node.configuration().getDiscoverySpi(); + + ServerImpl serverImpl = U.field(disco, "impl"); + + String nextDcId = serverImpl.ring().nextNode().dataCenterId(); + String locDcId = node.cluster().localNode().dataCenterId(); + + if (!locDcId.equals(nextDcId)) + hops++; + } + + assertEquals(expected, hops); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMdcSelfTest.java new file mode 100644 index 00000000000000..89ecb9bca94e46 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMdcSelfTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * Test for {@link TcpDiscoverySpi} with Multi Data Centers. + */ +public class TcpDiscoveryMdcSelfTest extends TcpDiscoverySelfTest { + /** */ + private static final String DC_ID_0 = "DC0"; + + /** */ + private static final String DC_ID_1 = "DC1"; + + /** + * @throws Exception If fails. + */ + public TcpDiscoveryMdcSelfTest() throws Exception { + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + String prev = System.getProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); + + System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, prev == null ? DC_ID_0 : DC_ID_1); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryMdcReversedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryMdcReversedTest.java new file mode 100644 index 00000000000000..3b525df7b60481 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryMdcReversedTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.IgniteSystemProperties; + +/** + * + */ +public class TcpDiscoveryPendingMessageDeliveryMdcReversedTest extends TcpDiscoveryPendingMessageDeliveryMdcTest { + /** */ + @Override protected void applyDC() { + String prev = System.getProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); + + System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, prev == null ? DC_ID_1 : DC_ID_0); + } +} + diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryMdcTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryMdcTest.java new file mode 100644 index 00000000000000..f4d53f2c85f069 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryPendingMessageDeliveryMdcTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * + */ +public class TcpDiscoveryPendingMessageDeliveryMdcTest extends TcpDiscoveryPendingMessageDeliveryTest { + /** */ + protected static final String DC_ID_0 = "DC0"; + + /** */ + protected static final String DC_ID_1 = "DC1"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + applyDC(); + + return cfg; + } + + /** */ + protected void applyDC() { + String prev = System.getProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); + + System.setProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID, prev == null ? DC_ID_0 : DC_ID_1); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + System.clearProperty(IgniteSystemProperties.IGNITE_DATA_CENTER_ID); + } +} + diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 3195284ba92621..3b729cb19ea778 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -311,11 +311,15 @@ public void testThreeNodesStartStop() throws Exception { assertNotNull(node); assertNotNull(node.lastSuccessfulAddress()); + assertTrue(spi2.pingNode(ignite3.localNode().id())); + node = (TcpDiscoveryNode)spi2.getNode(ignite3.localNode().id()); assertNotNull(node); assertNotNull(node.lastSuccessfulAddress()); + assertTrue(spi3.pingNode(ignite1.localNode().id())); + node = (TcpDiscoveryNode)spi3.getNode(ignite1.localNode().id()); assertNotNull(node); @@ -1901,11 +1905,7 @@ public void testFailedNodes5() throws Exception { spi.failSingleMsg = true; - long order = ignite.cluster().localNode().order(); - - long nextOrder = order == NODES ? 1 : order + 1; - - Ignite failingNode = nodes.get(nextOrder); + Ignite failingNode = nodes.get(((ServerImpl)spi.impl).ring().nextNode().order()); assertNotNull(failingNode); diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java index d0b620794a3164..1f6bae5f4cdd3e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerCoordinatorFailTest; import org.apache.ignite.internal.processors.cache.datastructures.IgniteExchangeLatchManagerDiscoHistoryTest; import org.apache.ignite.internal.processors.cache.distributed.CacheClientsConcurrentStartTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeMdcTest; import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest; import org.apache.ignite.internal.processors.cache.distributed.CacheParallelStartTest; import org.apache.ignite.internal.processors.cache.distributed.CachePartitionLossWithRestartsTest; @@ -100,6 +101,7 @@ public static List> suite(Collection ignoredTests) { GridTestUtils.addTestIfNeeded(suite, IgnitePessimisticTxSuspendResumeTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheExchangeMergeTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CacheExchangeMergeMdcTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, OnePhaseCommitAndNodeLeftTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, PendingExchangeTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, ExchangeMergeStaleServerNodesTest.class, ignoredTests); diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 77eaf7e82472d8..ab62046aed090a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -36,6 +36,7 @@ import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownSslTest; import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest; import org.apache.ignite.spi.discovery.tcp.IgniteMetricsOverflowTest; +import org.apache.ignite.spi.discovery.tcp.MultiDataCenterRignTest; import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryMarshallerCheckSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiCoordinatorChangeTest; import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiFailureTimeoutSelfTest; @@ -49,6 +50,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryFailedJoinTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIpFinderCleanerTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIpFinderFailureTest; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMdcSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMetricsWarnLogTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMultiThreadedTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNetworkIssuesTest; @@ -56,6 +58,8 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConfigConsistentIdSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeConsistentIdSelfTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeJoinAndFailureTest; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryPendingMessageDeliveryMdcReversedTest; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryPendingMessageDeliveryMdcTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryPendingMessageDeliveryTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryReconnectUnstableTopologyTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryRestartTest; @@ -186,7 +190,12 @@ TcpDiscoveryDeadNodeAddressResolvingTest.class, + // MDC. + TcpDiscoveryMdcSelfTest.class, + TcpDiscoveryPendingMessageDeliveryMdcTest.class, + TcpDiscoveryPendingMessageDeliveryMdcReversedTest.class, MultiDataCenterDeploymentTest.class, + MultiDataCenterRignTest.class, MultiDataCenterClientRoutingTest.class, IgniteDiscoveryMessageSerializationTest.class