diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 166004faa27e9..2fa493898a365 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -515,6 +515,10 @@ public final class IgniteSystemProperties { public static final String IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE = "IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE"; + /** Logging a warning message when metrics quantity exceeded a specified number. */ + public static final String IGNITE_DISCOVERY_METRICS_QNT_WARN = + "IGNITE_DISCOVERY_METRICS_QNT_WARN"; + /** Time interval that indicates that client reconnect throttle must be reset to zero. 2 minutes by default. */ public static final String CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL = "CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL"; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 8d7e2a4b63ba1..49d90edddc6b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -974,6 +974,34 @@ private NavigableSet allVisibleNodes() { return res; } + /** {@inheritDoc} */ + @Override public void updateMetrics(UUID nodeId, + ClusterMetrics metrics, + Map cacheMetrics, + long tsNanos) + { + boolean isLocDaemon = spi.locNode.isDaemon(); + + assert nodeId != null; + assert metrics != null; + assert isLocDaemon || cacheMetrics != null; + + TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId); + + if (node != null && node.visible()) { + node.setMetrics(metrics); + + if (!isLocDaemon) + node.setCacheMetrics(cacheMetrics); + + node.lastUpdateTimeNanos(tsNanos); + + msgWorker.notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes(), null); + } + else if (log.isDebugEnabled()) + log.debug("Received metrics from unknown node: " + nodeId); + } + /** * FOR TEST PURPOSE ONLY! */ @@ -2439,23 +2467,8 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { log.debug("Received metrics response: " + msg); } else { - long tsNanos = System.nanoTime(); - - if (msg.hasMetrics()) { - for (Map.Entry e : msg.metrics().entrySet()) { - UUID nodeId = e.getKey(); - - TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue(); - - Map cacheMetrics = msg.hasCacheMetrics(nodeId) ? - msg.cacheMetrics().get(nodeId) : Collections.emptyMap(); - - updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos); - - for (T2 t : metricsSet.clientMetrics()) - updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos); - } - } + if (msg.hasMetrics()) + processMsgCacheMetrics(msg, System.nanoTime()); } } @@ -2555,39 +2568,6 @@ private void processPingRequest() { sockWriter.sendMessage(res); } - /** - * @param nodeId Node ID. - * @param metrics Metrics. - * @param cacheMetrics Cache metrics. - * @param tsNanos Timestamp as returned by {@link System#nanoTime()}. - */ - private void updateMetrics(UUID nodeId, - ClusterMetrics metrics, - Map cacheMetrics, - long tsNanos) - { - boolean isLocDaemon = spi.locNode.isDaemon(); - - assert nodeId != null; - assert metrics != null; - assert isLocDaemon || cacheMetrics != null; - - TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId); - - if (node != null && node.visible()) { - node.setMetrics(metrics); - - if (!isLocDaemon) - node.setCacheMetrics(cacheMetrics); - - node.lastUpdateTimeNanos(tsNanos); - - notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes()); - } - else if (log.isDebugEnabled()) - log.debug("Received metrics from unknown node: " + nodeId); - } - /** * @param type Event type. * @param topVer Topology version. diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index b7aeb0ccbe7ef..61beb9498f867 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1853,6 +1853,29 @@ private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) { return threads; } + /** {@inheritDoc} */ + @Override public void updateMetrics(UUID nodeId, + ClusterMetrics metrics, + Map cacheMetrics, + long tsNanos) + { + assert nodeId != null; + assert metrics != null; + + TcpDiscoveryNode node = ring.node(nodeId); + + if (node != null) { + node.setMetrics(metrics); + node.setCacheMetrics(cacheMetrics); + + node.lastUpdateTimeNanos(tsNanos); + + notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(), node); + } + else if (log.isDebugEnabled()) + log.debug("Received metrics from unknown node: " + nodeId); + } + /** * FOR TEST ONLY!!! *

@@ -5568,23 +5591,8 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { long tsNanos = System.nanoTime(); - if (spiStateCopy() == CONNECTED) { - if (msg.hasMetrics()) { - for (Map.Entry e : msg.metrics().entrySet()) { - UUID nodeId = e.getKey(); - - TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue(); - - Map cacheMetrics = msg.hasCacheMetrics(nodeId) ? - msg.cacheMetrics().get(nodeId) : Collections.emptyMap(); - - updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos); - - for (T2 t : metricsSet.clientMetrics()) - updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos); - } - } - } + if (spiStateCopy() == CONNECTED && msg.hasMetrics()) + processMsgCacheMetrics(msg, tsNanos); if (sendMessageToRemotes(msg)) { if (laps == 0 && spiStateCopy() == CONNECTED) { @@ -5653,34 +5661,6 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) { } } - /** - * @param nodeId Node ID. - * @param metrics Metrics. - * @param cacheMetrics Cache metrics. - * @param tsNanos Timestamp as returned by {@link System#nanoTime()}. - */ - private void updateMetrics(UUID nodeId, - ClusterMetrics metrics, - Map cacheMetrics, - long tsNanos) - { - assert nodeId != null; - assert metrics != null; - - TcpDiscoveryNode node = ring.node(nodeId); - - if (node != null) { - node.setMetrics(metrics); - node.setCacheMetrics(cacheMetrics); - - node.lastUpdateTimeNanos(tsNanos); - - notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(), node); - } - else if (log.isDebugEnabled()) - log.debug("Received metrics from unknown node: " + nodeId); - } - /** * Processes discard message and discards previously registered pending messages. * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java index 4c5183930d5b9..2d1a28e5bcb09 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java @@ -24,13 +24,17 @@ import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedDeque; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteFeatures; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; @@ -40,9 +44,13 @@ import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_METRICS_QNT_WARN; +import static org.apache.ignite.IgniteSystemProperties.getInteger; + /** * */ @@ -59,6 +67,9 @@ abstract class TcpDiscoveryImpl { /** Response join impossible. */ protected static final int RES_JOIN_IMPOSSIBLE = 255; + /** How often the warning message should occur in logs to prevent log spam. */ + public static final long LOG_WARN_MSG_TIMEOUT = 60 * 60 * 1000L; + /** */ protected final TcpDiscoverySpi spi; @@ -78,6 +89,12 @@ abstract class TcpDiscoveryImpl { @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") protected ConcurrentLinkedDeque debugLogQ; + /** Logging a warning message when metrics quantity exceeded a specified number. */ + protected int METRICS_QNT_WARN = getInteger(IGNITE_DISCOVERY_METRICS_QNT_WARN, 500); + + /** */ + protected long endTimeMetricsSizeProcessWait = System.currentTimeMillis(); + /** */ protected final ServerImpl.DebugLogger debugLog = new DebugLogger() { /** {@inheritDoc} */ @@ -347,6 +364,17 @@ protected static String threadStatus(Thread t) { */ protected abstract Collection threads(); + /** + * @param nodeId Node ID. + * @param metrics Metrics. + * @param cacheMetrics Cache metrics. + * @param tsNanos Timestamp as returned by {@link System#nanoTime()}. + */ + public abstract void updateMetrics(UUID nodeId, + ClusterMetrics metrics, + Map cacheMetrics, + long tsNanos); + /** * @throws IgniteSpiException If failed. */ @@ -409,6 +437,32 @@ protected boolean checkAckTimeout(long ackTimeout) { return true; } + /** */ + public void processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) { + for (Map.Entry e : msg.metrics().entrySet()) { + UUID nodeId = e.getKey(); + + TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue(); + + Map cacheMetrics = msg.hasCacheMetrics(nodeId) ? + msg.cacheMetrics().get(nodeId) : Collections.emptyMap(); + + if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis() + && cacheMetrics.size() >= METRICS_QNT_WARN) + { + log.warning("The Discovery message has metrics for " + cacheMetrics.size() + " caches.\n" + + "To prevent Discovery blocking use -DIGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE=true option."); + + endTimeMetricsSizeProcessWait = U.currentTimeMillis() + LOG_WARN_MSG_TIMEOUT; + } + + updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos); + + for (T2 t : metricsSet.clientMetrics()) + updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos); + } + } + /** * @param addrs Addresses. */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMetricsWarnLogTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMetricsWarnLogTest.java new file mode 100644 index 0000000000000..f6e2b4343bfc5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMetricsWarnLogTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_METRICS_QNT_WARN; + +/** + * Class for testing warning log message about too many cache metrics. + */ +public class TcpDiscoveryMetricsWarnLogTest extends GridCommonAbstractTest { + /** Listener log messages. */ + private static ListeningTestLogger testLog; + + /** Desired message. */ + public static final String LOG_MSG = "To prevent Discovery blocking use"; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + testLog = new ListeningTestLogger(false, log); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + testLog.clearListeners(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setMetricsUpdateFrequency(500L) + .setGridLogger(testLog); + } + + /** + * Test checks that the desired message occurs in logs. + * + * @throws Exception If any error occurs. + */ + public void testMetricsWarningLog() throws Exception { + withSystemProperty(IGNITE_DISCOVERY_METRICS_QNT_WARN, "20"); + + testLog.warning("IGNITE_DISCOVERY_METRICS_QNT_WARN = " + + System.getProperty(IGNITE_DISCOVERY_METRICS_QNT_WARN)); + + LogListener logLsnr0 = LogListener.matches(LOG_MSG) + .andMatches("TcpDiscoveryMetricsWarnLogTest0") + .atLeast(1) + .build(); + + LogListener logLsnr1 = LogListener.matches(LOG_MSG) + .andMatches("TcpDiscoveryMetricsWarnLogTest1") + .atLeast(1) + .build(); + + LogListener logLsnr2 = LogListener.matches(LOG_MSG) + .andMatches("TcpDiscoveryMetricsWarnLogTest2") + .atLeast(1) + .build(); + + testLog.registerListener(logLsnr0); + testLog.registerListener(logLsnr1); + testLog.registerListener(logLsnr2); + + Ignite ignite0 = startGrid(0); + + startGrid(1); + + startClientGrid(2); + + for (int i = 1; i <= 30; i++) + createAndFillCache(i, ignite0); + + awaitMetricsUpdate(3); + + assertTrue(logLsnr0.check()); + assertTrue(logLsnr1.check()); + assertTrue(logLsnr2.check()); + } + + public void testMetricsWarningLog0() throws Exception { + withSystemProperty(IGNITE_DISCOVERY_METRICS_QNT_WARN, "0"); + + testMetricsWarningLog(); + } + + /** + * Creates and fills cahes with test data. + * + * @param cacheNum Cache number to generate a cache name. + * @param ignite Ignite instance to create a cache in. + */ + private void createAndFillCache(int cacheNum, Ignite ignite) { + IgniteCache cache = ignite.getOrCreateCache( + new CacheConfiguration<>(DEFAULT_CACHE_NAME + cacheNum).setStatisticsEnabled(true) + ); + + for (int i = 1; i < 100; i++) + cache.put(i, i); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 484a63eb843a5..cbd64d01d8f96 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -44,6 +44,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryCoordinatorFailureTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryIpFinderCleanerTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMarshallerCheckSelfTest; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMetricsWarnLogTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryMultiThreadedTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNetworkIssuesTest; import org.apache.ignite.spi.discovery.tcp.TcpDiscoveryNodeAttributesUpdateOnReconnectTest; @@ -118,6 +119,7 @@ public static TestSuite suite() throws Exception { suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class)); suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureTimeoutSelfTest.class)); suite.addTest(new TestSuite(TcpClientDiscoveryUnresolvedHostTest.class)); + suite.addTest(new TestSuite(TcpDiscoveryMetricsWarnLogTest.class)); suite.addTest(new TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class)); suite.addTest(new TestSuite(TcpDiscoveryNodeConfigConsistentIdSelfTest.class));