Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .build/build-bench.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
<jvmarg line="${java-jvmargs}"/>
<jvmarg line="${_std-test-jvmargs}"/>
<jvmarg line="${test.jvm.args}"/>
<jvmarg line="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints"/>

<!-- total memory must fit within the pod constraints, see comments in .jenkins/Jenkinsfile and dind's container resourceRequestMemory in .jenkins/k8s/jenkins-deployment.yaml -->
<!-- note! this is used for both the JMH runner and VMH fork -->
Expand Down
90 changes: 90 additions & 0 deletions src/java/org/apache/cassandra/concurrent/CassandraThread.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.concurrent;

import org.apache.cassandra.metrics.ThreadLocalMetrics;

import io.netty.util.concurrent.FastThreadLocalThread;

public class CassandraThread extends FastThreadLocalThread
{
private ThreadLocalMetrics threadLocalMetrics;
private ExecutorLocals executorLocals;

public CassandraThread(ThreadGroup group, Runnable target, String name)
{
super(group, target, name);
}

public CassandraThread()
{
super();
}

public CassandraThread(Runnable target)
{
super(target);
}

public ThreadLocalMetrics getThreadLocalMetrics()
{
ThreadLocalMetrics current = threadLocalMetrics;
if (current != null)
return current;

threadLocalMetrics = ThreadLocalMetrics.create();
return threadLocalMetrics;
}

public ExecutorLocals getExecutorLocals()
{
ExecutorLocals current = executorLocals;
if (current != null)
return current;

executorLocals = ExecutorLocals.none();
return executorLocals;
}

public void setExecutorLocals(ExecutorLocals executorLocals)
{
this.executorLocals = executorLocals;
}

public ExecutorLocals replaceExecutorLocals(ExecutorLocals newExecutorLocals)
{
ExecutorLocals current = executorLocals;
executorLocals = newExecutorLocals;
return current != null ? current : ExecutorLocals.none();
}

// final to avoid skipping of the cleanup logic in child classes
final public void run()
{
try
{
super.run();
}
finally
{
if (threadLocalMetrics != null)
threadLocalMetrics.release();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.concurrent;

import io.netty.util.concurrent.DefaultThreadFactory;

public class CassandraThreadFactory extends DefaultThreadFactory
{
public CassandraThreadFactory(String poolName, boolean daemon)
{
super(poolName, daemon);
}

protected Thread newThread(Runnable r, String name)
{
return new CassandraThread(this.threadGroup, r, name);
}
}
52 changes: 41 additions & 11 deletions src/java/org/apache/cassandra/concurrent/ExecutorLocals.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public static class Impl
@SuppressWarnings("resource")
protected static void set(TraceState traceState, ClientWarn.State clientWarnState, boolean eligibleForArtificialLatency)
{
if (traceState == null && clientWarnState == null && !eligibleForArtificialLatency) locals.set(none);
else locals.set(new ExecutorLocals(traceState, clientWarnState, eligibleForArtificialLatency));
if (traceState == null && clientWarnState == null && !eligibleForArtificialLatency) setLocal(none);
else setLocal(new ExecutorLocals(traceState, clientWarnState, eligibleForArtificialLatency));
}
}

Expand All @@ -64,12 +64,46 @@ protected ExecutorLocals(TraceState traceState, ClientWarn.State clientWarnState
this.eligibleForArtificialLatency = eligibleForArtificialLatency;
}

private static void setLocal(ExecutorLocals executorLocals)
{
Thread currentThread = Thread.currentThread();
if (currentThread instanceof CassandraThread)
((CassandraThread) currentThread).setExecutorLocals(executorLocals);
else
locals.set(executorLocals);
}

private ExecutorLocals replace()
{
Thread currentThread = Thread.currentThread();
if (currentThread instanceof CassandraThread)
{
return ((CassandraThread) currentThread).replaceExecutorLocals(this);
}
else
{
ExecutorLocals old = locals.get();
if (old != this)
locals.set(this);
return old;
}
}

/**
* @return an ExecutorLocals object which has the current trace state and client warn state.
*/
public static ExecutorLocals current()
{
return locals.get();
Thread currentThread = Thread.currentThread();
if (currentThread instanceof CassandraThread)
return ((CassandraThread) currentThread).getExecutorLocals();
else
return locals.get();
}

public static ExecutorLocals none()
{
return none;
}

/**
Expand All @@ -84,29 +118,25 @@ public static WithResources propagate()

public static ExecutorLocals create(TraceState traceState)
{
ExecutorLocals current = locals.get();
ExecutorLocals current = current();
return current.traceState == traceState ? current : new ExecutorLocals(traceState, current.clientWarnState, current.eligibleForArtificialLatency);
}

public static void clear()
{
locals.set(none);
setLocal(none);
}

/**
* Overwrite current locals, and return the previous ones
*/
public ExecutorLocals get()
{
// TODO (desired): add compareAndSet to save one thread local round trip
ExecutorLocals old = current();
if (old != this)
locals.set(this);
return old;
return replace();
}

public void close()
{
locals.set(this);
setLocal(this);
}
}
20 changes: 2 additions & 18 deletions src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.utils.JVMStabilityInspector;

import io.netty.util.concurrent.FastThreadLocalThread;

/**
* This class is an implementation of the <i>ThreadFactory</i> interface. This
* is useful to give Java threads meaningful names which is useful when using
Expand Down Expand Up @@ -169,12 +167,12 @@ public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, St
if (PRESERVE_THREAD_CREATION_STACKTRACE)
thread = new InspectableFastThreadLocalThread(threadGroup, runnable, threadName);
else
thread = new FastThreadLocalThread(threadGroup, runnable, threadName);
thread = new CassandraThread(threadGroup, runnable, threadName);
thread.setDaemon(daemon);
return thread;
}

public static class InspectableFastThreadLocalThread extends FastThreadLocalThread
public static class InspectableFastThreadLocalThread extends CassandraThread
{
public StackTraceElement[] creationTrace;

Expand All @@ -184,22 +182,8 @@ private void setStack()
creationTrace = Arrays.copyOfRange(creationTrace, 2, creationTrace.length);
}

public InspectableFastThreadLocalThread() { super(); setStack(); }
Comment thread
netudima marked this conversation as resolved.

public InspectableFastThreadLocalThread(Runnable target) { super(target); setStack(); }

public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target) { super(group, target); setStack(); }

public InspectableFastThreadLocalThread(String name) { super(name); setStack(); }

public InspectableFastThreadLocalThread(ThreadGroup group, String name) { super(group, name); setStack(); }

public InspectableFastThreadLocalThread(Runnable target, String name) { super(target, name); setStack(); }

public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target, String name) { super(group, target, name); setStack(); }

public InspectableFastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) { super(group, target, name, stackSize); setStack(); }

}
public static <T extends Thread> T setupThread(T thread, int priority, ClassLoader contextClassLoader, Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
{
Expand Down
4 changes: 1 addition & 3 deletions src/java/org/apache/cassandra/concurrent/SEPWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@

import org.apache.cassandra.utils.JVMStabilityInspector;

import io.netty.util.concurrent.FastThreadLocalThread;

import static org.apache.cassandra.concurrent.SEPExecutor.TakeTaskPermitResult.RETURNED_WORK_PERMIT;
import static org.apache.cassandra.concurrent.SEPExecutor.TakeTaskPermitResult.TOOK_PERMIT;
import static org.apache.cassandra.config.CassandraRelevantProperties.SET_SEP_THREAD_NAME;
Expand Down Expand Up @@ -60,7 +58,7 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
this.pool = pool;
this.workerId = workerId;
this.workerIdThreadSuffix = '-' + workerId.toString();
thread = new FastThreadLocalThread(threadGroup, this, threadGroup.getName() + "-Worker-" + workerId);
thread = new CassandraThread(threadGroup, this, threadGroup.getName() + "-Worker-" + workerId);
thread.setDaemon(true);
set(initialState);
thread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.google.common.base.Throwables;

import org.apache.cassandra.concurrent.CassandraThread;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.SerializationHeader;
Expand All @@ -40,8 +41,6 @@
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;

import io.netty.util.concurrent.FastThreadLocalThread;

import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQueue;

/**
Expand All @@ -68,6 +67,8 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter

private final BlockingQueue<Buffer> writeQueue = newBlockingQueue(0);
private final DiskWriter diskWriter = new DiskWriter();
private final Thread diskWriterThread = new CassandraThread(diskWriter);


public SSTableSimpleUnsortedWriter(File directory, TableMetadataRef metadata, RegularAndStaticColumns columns, long maxSSTableSizeInMiB)
{
Expand All @@ -80,7 +81,7 @@ public SSTableSimpleUnsortedWriter(File directory, TableMetadataRef metadata, Re
this.maxSStableSizeInBytes = maxSSTableSizeInMiB * 1024L * 1024L;
this.header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS);
this.helper = new SerializationHelper(this.header);
diskWriter.start();
diskWriterThread.start();
this.owner = owner;
}

Expand Down Expand Up @@ -146,7 +147,7 @@ public void close() throws IOException
put(SENTINEL);
try
{
diskWriter.join();
diskWriterThread.join();
checkForWriterException();
}
catch (Throwable e)
Expand Down Expand Up @@ -210,7 +211,7 @@ public static class SyncException extends RuntimeException
//// typedef
static class Buffer extends TreeMap<DecoratedKey, PartitionUpdate.Builder> {}

private class DiskWriter extends FastThreadLocalThread
private class DiskWriter implements Runnable
{
volatile Throwable exception = null;

Expand Down
Loading