From e0776b639b078f9763556cadc9d26c3ba8fe8d87 Mon Sep 17 00:00:00 2001 From: Dmitry Konstantinov Date: Tue, 18 Nov 2025 15:10:51 +0000 Subject: [PATCH] Implement custom CassandraThread to keep direct references to frequently used thread local objects patch by Dmitry Konstantinov; reviewed by Benedict Elliott Smith, Stefan Miklosovic for CASSANDRA-21020 --- .build/build-bench.xml | 1 + .../cassandra/concurrent/CassandraThread.java | 90 +++++++++++++++++++ .../concurrent/CassandraThreadFactory.java | 34 +++++++ .../cassandra/concurrent/ExecutorLocals.java | 52 ++++++++--- .../concurrent/NamedThreadFactory.java | 20 +---- .../cassandra/concurrent/SEPWorker.java | 4 +- .../sstable/SSTableSimpleUnsortedWriter.java | 11 +-- .../cassandra/metrics/ThreadLocalMetrics.java | 22 +++-- .../db/commitlog/CommitLogStressTest.java | 38 +++++--- .../microbench/CassandraThreadLocalBench.java | 70 +++++++++++++++ .../test/microbench/FastThreadExecutor.java | 4 +- .../microbench/ThreadLocalMetricsBench.java | 22 ++--- 12 files changed, 301 insertions(+), 67 deletions(-) create mode 100644 src/java/org/apache/cassandra/concurrent/CassandraThread.java create mode 100644 src/java/org/apache/cassandra/concurrent/CassandraThreadFactory.java create mode 100644 test/microbench/org/apache/cassandra/test/microbench/CassandraThreadLocalBench.java diff --git a/.build/build-bench.xml b/.build/build-bench.xml index a0715b9dc40a..d0bae9e9cb18 100644 --- a/.build/build-bench.xml +++ b/.build/build-bench.xml @@ -95,6 +95,7 @@ + diff --git a/src/java/org/apache/cassandra/concurrent/CassandraThread.java b/src/java/org/apache/cassandra/concurrent/CassandraThread.java new file mode 100644 index 000000000000..5afec8815b5a --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/CassandraThread.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +import org.apache.cassandra.metrics.ThreadLocalMetrics; + +import io.netty.util.concurrent.FastThreadLocalThread; + +public class CassandraThread extends FastThreadLocalThread +{ + private ThreadLocalMetrics threadLocalMetrics; + private ExecutorLocals executorLocals; + + public CassandraThread(ThreadGroup group, Runnable target, String name) + { + super(group, target, name); + } + + public CassandraThread() + { + super(); + } + + public CassandraThread(Runnable target) + { + super(target); + } + + public ThreadLocalMetrics getThreadLocalMetrics() + { + ThreadLocalMetrics current = threadLocalMetrics; + if (current != null) + return current; + + threadLocalMetrics = ThreadLocalMetrics.create(); + return threadLocalMetrics; + } + + public ExecutorLocals getExecutorLocals() + { + ExecutorLocals current = executorLocals; + if (current != null) + return current; + + executorLocals = ExecutorLocals.none(); + return executorLocals; + } + + public void setExecutorLocals(ExecutorLocals executorLocals) + { + this.executorLocals = executorLocals; + } + + public ExecutorLocals replaceExecutorLocals(ExecutorLocals newExecutorLocals) + { + ExecutorLocals current = executorLocals; + executorLocals = newExecutorLocals; + return current != null ? current : ExecutorLocals.none(); + } + + // final to avoid skipping of the cleanup logic in child classes + final public void run() + { + try + { + super.run(); + } + finally + { + if (threadLocalMetrics != null) + threadLocalMetrics.release(); + } + } +} diff --git a/src/java/org/apache/cassandra/concurrent/CassandraThreadFactory.java b/src/java/org/apache/cassandra/concurrent/CassandraThreadFactory.java new file mode 100644 index 000000000000..d7fd907bc0f4 --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/CassandraThreadFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +import io.netty.util.concurrent.DefaultThreadFactory; + +public class CassandraThreadFactory extends DefaultThreadFactory +{ + public CassandraThreadFactory(String poolName, boolean daemon) + { + super(poolName, daemon); + } + + protected Thread newThread(Runnable r, String name) + { + return new CassandraThread(this.threadGroup, r, name); + } +} diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java index 55c6ee24642a..b4b7f094cc0d 100644 --- a/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java +++ b/src/java/org/apache/cassandra/concurrent/ExecutorLocals.java @@ -48,8 +48,8 @@ public static class Impl @SuppressWarnings("resource") protected static void set(TraceState traceState, ClientWarn.State clientWarnState, boolean eligibleForArtificialLatency) { - if (traceState == null && clientWarnState == null && !eligibleForArtificialLatency) locals.set(none); - else locals.set(new ExecutorLocals(traceState, clientWarnState, eligibleForArtificialLatency)); + if (traceState == null && clientWarnState == null && !eligibleForArtificialLatency) setLocal(none); + else setLocal(new ExecutorLocals(traceState, clientWarnState, eligibleForArtificialLatency)); } } @@ -64,12 +64,46 @@ protected ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState this.eligibleForArtificialLatency = eligibleForArtificialLatency; } + private static void setLocal(ExecutorLocals executorLocals) + { + Thread currentThread = Thread.currentThread(); + if (currentThread instanceof CassandraThread) + ((CassandraThread) currentThread).setExecutorLocals(executorLocals); + else + locals.set(executorLocals); + } + + private ExecutorLocals replace() + { + Thread currentThread = Thread.currentThread(); + if (currentThread instanceof CassandraThread) + { + return ((CassandraThread) currentThread).replaceExecutorLocals(this); + } + else + { + ExecutorLocals old = locals.get(); + if (old != this) + locals.set(this); + return old; + } + } + /** * @return an ExecutorLocals object which has the current trace state and client warn state. */ public static ExecutorLocals current() { - return locals.get(); + Thread currentThread = Thread.currentThread(); + if (currentThread instanceof CassandraThread) + return ((CassandraThread) currentThread).getExecutorLocals(); + else + return locals.get(); + } + + public static ExecutorLocals none() + { + return none; } /** @@ -84,13 +118,13 @@ public static WithResources propagate() public static ExecutorLocals create(TraceState traceState) { - ExecutorLocals current = locals.get(); + ExecutorLocals current = current(); return current.traceState == traceState ? current : new ExecutorLocals(traceState, current.clientWarnState, current.eligibleForArtificialLatency); } public static void clear() { - locals.set(none); + setLocal(none); } /** @@ -98,15 +132,11 @@ public static void clear() */ public ExecutorLocals get() { - // TODO (desired): add compareAndSet to save one thread local round trip - ExecutorLocals old = current(); - if (old != this) - locals.set(this); - return old; + return replace(); } public void close() { - locals.set(this); + setLocal(this); } } diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java index de43f33e7ebe..e2b2c970d1e5 100644 --- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java +++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java @@ -26,8 +26,6 @@ import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.utils.JVMStabilityInspector; -import io.netty.util.concurrent.FastThreadLocalThread; - /** * This class is an implementation of the ThreadFactory interface. This * is useful to give Java threads meaningful names which is useful when using @@ -169,12 +167,12 @@ public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, St if (PRESERVE_THREAD_CREATION_STACKTRACE) thread = new InspectableFastThreadLocalThread(threadGroup, runnable, threadName); else - thread = new FastThreadLocalThread(threadGroup, runnable, threadName); + thread = new CassandraThread(threadGroup, runnable, threadName); thread.setDaemon(daemon); return thread; } - public static class InspectableFastThreadLocalThread extends FastThreadLocalThread + public static class InspectableFastThreadLocalThread extends CassandraThread { public StackTraceElement[] creationTrace; @@ -184,22 +182,8 @@ private void setStack() creationTrace = Arrays.copyOfRange(creationTrace, 2, creationTrace.length); } - public InspectableFastThreadLocalThread() { super(); setStack(); } - - public InspectableFastThreadLocalThread(Runnable target) { super(target); setStack(); } - - public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target) { super(group, target); setStack(); } - - public InspectableFastThreadLocalThread(String name) { super(name); setStack(); } - - public InspectableFastThreadLocalThread(ThreadGroup group, String name) { super(group, name); setStack(); } - - public InspectableFastThreadLocalThread(Runnable target, String name) { super(target, name); setStack(); } - public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target, String name) { super(group, target, name); setStack(); } - public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) { super(group, target, name, stackSize); setStack(); } - } public static T setupThread(T thread, int priority, ClassLoader contextClassLoader, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java index 8f1a5e9aa285..8eb34303e27d 100644 --- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java +++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java @@ -27,8 +27,6 @@ import org.apache.cassandra.utils.JVMStabilityInspector; -import io.netty.util.concurrent.FastThreadLocalThread; - import static org.apache.cassandra.concurrent.SEPExecutor.TakeTaskPermitResult.RETURNED_WORK_PERMIT; import static org.apache.cassandra.concurrent.SEPExecutor.TakeTaskPermitResult.TOOK_PERMIT; import static org.apache.cassandra.config.CassandraRelevantProperties.SET_SEP_THREAD_NAME; @@ -60,7 +58,7 @@ final class SEPWorker extends AtomicReference implements Runnabl this.pool = pool; this.workerId = workerId; this.workerIdThreadSuffix = '-' + workerId.toString(); - thread = new FastThreadLocalThread(threadGroup, this, threadGroup.getName() + "-Worker-" + workerId); + thread = new CassandraThread(threadGroup, this, threadGroup.getName() + "-Worker-" + workerId); thread.setDaemon(true); set(initialState); thread.start(); diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index 8c9b2f979621..d1c6a5fbc0d9 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -26,6 +26,7 @@ import com.google.common.base.Throwables; +import org.apache.cassandra.concurrent.CassandraThread; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.SerializationHeader; @@ -40,8 +41,6 @@ import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; -import io.netty.util.concurrent.FastThreadLocalThread; - import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue; /** @@ -68,6 +67,8 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter private final BlockingQueue writeQueue = newBlockingQueue(0); private final DiskWriter diskWriter = new DiskWriter(); + private final Thread diskWriterThread = new CassandraThread(diskWriter); + public SSTableSimpleUnsortedWriter(File directory, TableMetadataRef metadata, RegularAndStaticColumns columns, long maxSSTableSizeInMiB) { @@ -80,7 +81,7 @@ public SSTableSimpleUnsortedWriter(File directory, TableMetadataRef metadata, Re this.maxSStableSizeInBytes = maxSSTableSizeInMiB * 1024L * 1024L; this.header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS); this.helper = new SerializationHelper(this.header); - diskWriter.start(); + diskWriterThread.start(); this.owner = owner; } @@ -146,7 +147,7 @@ public void close() throws IOException put(SENTINEL); try { - diskWriter.join(); + diskWriterThread.join(); checkForWriterException(); } catch (Throwable e) @@ -210,7 +211,7 @@ public static class SyncException extends RuntimeException //// typedef static class Buffer extends TreeMap {} - private class DiskWriter extends FastThreadLocalThread + private class DiskWriter implements Runnable { volatile Throwable exception = null; diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java b/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java index 0135522ac051..feb96c02bdba 100644 --- a/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java @@ -36,6 +36,7 @@ import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.concurrent.CassandraThread; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Shutdownable; @@ -84,10 +85,7 @@ public class ThreadLocalMetrics @Override protected ThreadLocalMetrics initialValue() { - ThreadLocalMetrics result = new ThreadLocalMetrics(); - allThreadLocalMetrics.add(result); - destroyWhenUnreachable(Thread.currentThread(), result::release); - return result; + return create(); } // this method is invoked when a thread is going to finish, but it works only for FastThreadLocalThread @@ -99,6 +97,14 @@ protected void onRemoval(ThreadLocalMetrics value) } }; + public static ThreadLocalMetrics create() + { + ThreadLocalMetrics result = new ThreadLocalMetrics(); + allThreadLocalMetrics.add(result); + destroyWhenUnreachable(Thread.currentThread(), result::release); + return result; + } + private static volatile AtomicLongArray summaryValues = new AtomicLongArray(INITIAL_COUNTERS_CAPACITY); private static final Shutdownable cleaner; @@ -182,7 +188,7 @@ public static void shutdownCleaner(long timeout, TimeUnit unit) throws Interrupt shutdownAndWait(timeout, unit, of(cleaner)); } - private void release() + public void release() { // Using this lock while moving we want to avoid races with readers in getCount // such races can cause a transfered value lost or its double-counting by a reader @@ -274,7 +280,11 @@ public static long getCountAndReset(int metricId) return getCount(metricId, true); } - public static ThreadLocalMetrics get() { + public static ThreadLocalMetrics get() + { + Thread currentThread = Thread.currentThread(); + if (currentThread instanceof CassandraThread) + return ((CassandraThread)currentThread).getThreadLocalMetrics(); return threadLocalMetricsCurrent.get(); } diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index 5d1c005153b9..8db7a96eed9e 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -50,6 +50,7 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.UpdateBuilder; import org.apache.cassandra.Util; +import org.apache.cassandra.concurrent.CassandraThread; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; @@ -70,8 +71,6 @@ import org.apache.cassandra.security.EncryptionContext; import org.apache.cassandra.security.EncryptionContextGenerator; -import io.netty.util.concurrent.FastThreadLocalThread; - import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; @Ignore @@ -237,8 +236,8 @@ private void testLog(CommitLog commitLog) throws IOException, InterruptedExcepti for (CommitlogThread t: threads) { t.join(); - if (t.clsp.compareTo(discardedPos) > 0) - discardedPos = t.clsp; + if (t.runnable.clsp.compareTo(discardedPos) > 0) + discardedPos = t.runnable.clsp; } verifySizes(commitLog); @@ -262,8 +261,8 @@ private void testLog(CommitLog commitLog) throws IOException, InterruptedExcepti for (CommitlogThread t: threads) { t.join(); - hash += t.hash; - cells += t.cells; + hash += t.runnable.hash; + cells += t.runnable.cells; } verifySizes(commitLog); @@ -337,7 +336,7 @@ private ScheduledExecutorService startThreads(final CommitLog commitLog, final L { stop = false; for (int ii = 0; ii < NUM_THREADS; ii++) { - final CommitlogThread t = new CommitlogThread(commitLog, new Random(ii)); + final CommitlogThread t = buildCommitlogThread(commitLog, new Random(ii)); threads.add(t); t.start(); } @@ -357,8 +356,8 @@ public void run() long sz = 0; for (CommitlogThread clt : threads) { - temp += clt.counter.get(); - sz += clt.dataSize; + temp += clt.runnable.counter.get(); + sz += clt.runnable.dataSize; } double time = (currentTimeMillis() - start) / 1000.0; double avg = (temp / time); @@ -403,7 +402,22 @@ private static ByteBuffer randomBytes(int quantity, Random tlr) return slice; } - public class CommitlogThread extends FastThreadLocalThread + public class CommitlogThread extends CassandraThread + { + final CommitlogRunnable runnable; + CommitlogThread(CommitlogRunnable runnable) + { + super(runnable); + this.runnable = runnable; + } + } + + public CommitlogThread buildCommitlogThread(CommitLog commitLog, Random rand) + { + return new CommitlogThread(new CommitlogRunnable(commitLog, rand)); + } + + public class CommitlogRunnable implements Runnable { final AtomicLong counter = new AtomicLong(); int hash = 0; @@ -415,10 +429,10 @@ public class CommitlogThread extends FastThreadLocalThread volatile CommitLogPosition clsp; - CommitlogThread(CommitLog commitLog, Random rand) + public CommitlogRunnable(CommitLog commitLog, Random random) { this.commitLog = commitLog; - this.random = rand; + this.random = random; } public void run() diff --git a/test/microbench/org/apache/cassandra/test/microbench/CassandraThreadLocalBench.java b/test/microbench/org/apache/cassandra/test/microbench/CassandraThreadLocalBench.java new file mode 100644 index 000000000000..d93e4e1d2bcf --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/CassandraThreadLocalBench.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import org.apache.cassandra.metrics.ThreadLocalMetrics; + +import io.netty.util.concurrent.FastThreadLocal; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Warmup(iterations = 4, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 2, time = 30, timeUnit = TimeUnit.SECONDS) +@Fork(value = 2, + jvmArgsAppend = { "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"}) +@Threads(4) +@State(Scope.Benchmark) +public class CassandraThreadLocalBench +{ + private static final FastThreadLocal threadLocalMetricsCurrent = new FastThreadLocal<>() + { + + @Override + protected ThreadLocalMetrics initialValue() + { + return ThreadLocalMetrics.create(); + } + }; + + @Benchmark + public void netty() + { + threadLocalMetricsCurrent.get(); + } + + @Benchmark + public void cassandra() + { + ThreadLocalMetrics.get(); + } + +} diff --git a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java index 03ea710abcea..58f753924f30 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java +++ b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java @@ -20,7 +20,7 @@ import java.util.concurrent.ThreadPoolExecutor; -import io.netty.util.concurrent.DefaultThreadFactory; +import org.apache.cassandra.concurrent.CassandraThreadFactory; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue; @@ -35,6 +35,6 @@ public class FastThreadExecutor extends ThreadPoolExecutor { public FastThreadExecutor(int size, String name) { - super(size, size, 10, SECONDS, newBlockingQueue(), new DefaultThreadFactory(name, true)); + super(size, size, 10, SECONDS, newBlockingQueue(), new CassandraThreadFactory(name, true)); } } diff --git a/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java b/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java index 8a514b554b77..a4d6c13b29e9 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java +++ b/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java @@ -18,8 +18,6 @@ package org.apache.cassandra.test.microbench; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.atomic.LongAdder; @@ -44,26 +42,29 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.NANOSECONDS) @Warmup(iterations = 4, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 8, time = 2, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 2, time = 60, timeUnit = TimeUnit.SECONDS) @Fork(value = 2, jvmArgsAppend = { "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"}) @Threads(4) @State(Scope.Benchmark) public class ThreadLocalMetricsBench { - @Param({"LongAdder", "PlainArray"}) + @Param({"LongAdder", "ThreadLocalCounter"}) private String type; + @Param({"true", "false"}) + private boolean polluteCpuCaches; + @Param({"50", "100"}) private int metricsCount; - private List counters; + private Counter[] counters; @Setup(Level.Trial) public void setup() throws Throwable { - counters = new ArrayList<>(metricsCount); + counters = new Counter[metricsCount]; for (int i = 0; i < metricsCount; i++) { Counter counter; @@ -72,13 +73,13 @@ public void setup() throws Throwable case "LongAdder": counter = new LongAdderCounter(); break; - case "PlainArray": + case "ThreadLocalCounter": counter = new ThreadLocalCounter(); break; default: throw new UnsupportedOperationException(); } - counters.add(counter); + counters[i] = counter; } } @@ -87,8 +88,9 @@ public void setup() throws Throwable @Setup(Level.Invocation) public void polluteCpuCaches() { - for (int i = 0; i < anotherMemory.length(); i++) - anotherMemory.incrementAndGet(i); + if (polluteCpuCaches) + for (int i = 0; i < anotherMemory.length(); i++) + anotherMemory.incrementAndGet(i); } @Benchmark