@@ -53,27 +53,7 @@ public class AuronCallNativeWrapper {
     private Schema arrowSchema;
     private long nativeRuntimePtr;
     private Consumer<VectorSchemaRoot> batchConsumer;
-
-    // initialize native environment
-    static {
-        LOG.info("Initializing native environment (batchSize="
-                + AuronAdaptor.getInstance().getAuronConfiguration().get(AuronConfiguration.BATCH_SIZE) + ", "
-                + "memoryFraction="
-                + AuronAdaptor.getInstance().getAuronConfiguration().get(AuronConfiguration.MEMORY_FRACTION) + ")");
-
-        // arrow configuration
-        System.setProperty("arrow.struct.conflict.policy", "CONFLICT_APPEND");
-
-        // preload JNI bridge classes
-        try {
-            Class.forName("org.apache.auron.jni.JniBridge");
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException("Cannot load JniBridge class", e);
-        }
-
-        AuronAdaptor.getInstance().loadAuronLib();
-        Runtime.getRuntime().addShutdownHook(new Thread(JniBridge::onExit));
-    }
+    private static volatile boolean initialized = false;

     public AuronCallNativeWrapper(
             BufferAllocator arrowAllocator,
@@ -90,13 +70,45 @@ public AuronCallNativeWrapper(
         this.stageId = stageId;
         this.taskId = taskId;

+        init();
         LOG.warn("Start executing native plan");
         this.nativeRuntimePtr = JniBridge.callNative(
                 nativeMemory,
                 AuronAdaptor.getInstance().getAuronConfiguration().get(AuronConfiguration.NATIVE_LOG_LEVEL),
                 this);
     }

+    private static void init() {
+        if (!initialized) {
+            synchronized (AuronCallNativeWrapper.class) {
+                if (!initialized) {
+                    // initialize native environment
+                    LOG.info("Initializing native environment (batchSize="
+                            + AuronAdaptor.getInstance().getAuronConfiguration().get(AuronConfiguration.BATCH_SIZE)
+                            + ", "
+                            + "memoryFraction="
+                            + AuronAdaptor.getInstance().getAuronConfiguration().get(AuronConfiguration.MEMORY_FRACTION)
Comment on lines +81 to +90
Copilot AI Apr 2, 2026

init() can run while a Spark task thread is already interrupted/cancelled (the production issue shows ClosedByInterruptException). Even though initialization is now retriable, attempting Files.copy/System.load from an interrupted task is still likely to fail and generate noisy errors. Consider short-circuiting initialization when the current task is not running (e.g., !AuronAdaptor.getInstance().isTaskRunning() or Thread.currentThread().isInterrupted()), and fail fast so a later non-cancelled task can perform the one-time init successfully.
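
The fail-fast guard suggested in this comment could look roughly like the sketch below. It is not the PR's code: `InterruptAwareInit`, `loadNativeLibrary()`, and the `loadAttempts` counter are hypothetical stand-ins (the counter exists only to make the behavior observable), and the check uses only `Thread.currentThread().isInterrupted()`, one of the two conditions named above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: a double-checked one-time init that refuses to run on an interrupted thread.
final class InterruptAwareInit {
    private static volatile boolean initialized = false;

    // Hypothetical counter, present only so the sketch's behavior can be observed.
    static final AtomicInteger loadAttempts = new AtomicInteger();

    // Hypothetical stand-in for AuronAdaptor.getInstance().loadAuronLib().
    static void loadNativeLibrary() {
        loadAttempts.incrementAndGet();
    }

    static void init() {
        if (initialized) {
            return;
        }
        synchronized (InterruptAwareInit.class) {
            if (initialized) {
                return;
            }
            // isInterrupted() reads the flag without clearing it, so the caller
            // still observes the interruption after this method throws.
            if (Thread.currentThread().isInterrupted()) {
                throw new IllegalStateException("Skipping native init: current thread is interrupted");
            }
            loadNativeLibrary();
            // Set only after success, so a later non-interrupted thread can retry.
            initialized = true;
        }
    }
}
```

Because the flag is left unset when the guard throws, the next task that reaches `init()` on a healthy thread performs the one-time initialization.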

+                            + ")");
+
+                    // arrow configuration
+                    System.setProperty("arrow.struct.conflict.policy", "CONFLICT_APPEND");
+
+                    // preload JNI bridge classes
+                    try {
+                        Class.forName("org.apache.auron.jni.JniBridge");
+                    } catch (ClassNotFoundException e) {
+                        throw new RuntimeException("Cannot load JniBridge class", e);
+                    }
+
+                    AuronAdaptor.getInstance().loadAuronLib();
+                    Runtime.getRuntime().addShutdownHook(new Thread(JniBridge::onExit));
+
+                    initialized = true;
Comment on lines +85 to +106
Copilot AI Apr 2, 2026

Because initialization is now retried after failures, AuronAdaptor.getInstance().loadAuronLib() may be invoked multiple times in the same JVM. In the Spark adaptor, each call extracts to a fresh createTempFile(...).deleteOnExit(), so repeated failures can accumulate temp files until executor exit. Consider making loadAuronLib() idempotent / cached (or cleaning up temp files on failure) to avoid disk bloat on flaky/interrupt-driven init attempts.
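
One way to read "idempotent / cached (or cleaning up temp files on failure)" is sketched below. This is an assumption-laden illustration, not the Spark adaptor's actual `loadAuronLib()`: `CachedLibExtractor`, `extractOnce`, and the `libauron-` prefix are hypothetical names, and the `System.load` step is left out so the sketch runs without a real native library.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch only: extract the library at most once per JVM and leave no temp file behind on failure.
final class CachedLibExtractor {
    // Path of the successfully extracted library; guarded by the class lock.
    private static Path extracted;

    static synchronized Path extractOnce(InputStream libStream) throws IOException {
        if (extracted != null) {
            return extracted; // repeated calls reuse the first successful extraction
        }
        Path tmp = Files.createTempFile("libauron-", ".tmp"); // hypothetical prefix/suffix
        try {
            Files.copy(libStream, tmp, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException | RuntimeException e) {
            // Delete the partial file now instead of waiting for executor exit.
            Files.deleteIfExists(tmp);
            throw e;
        }
        tmp.toFile().deleteOnExit();
        extracted = tmp;
        // A real implementation would call System.load(tmp.toString()) here.
        return tmp;
    }
}
```

With this shape, a failed attempt cleans up after itself and a retry creates at most one new temp file, while calls after a success create none.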

Suggested change
-                    // initialize native environment
-                    LOG.info("Initializing native environment (batchSize="
-                            + AuronAdaptor.getInstance().getAuronConfiguration().get(AuronConfiguration.BATCH_SIZE)
-                            + ", "
-                            + "memoryFraction="
-                            + AuronAdaptor.getInstance().getAuronConfiguration().get(AuronConfiguration.MEMORY_FRACTION)
-                            + ")");
-                    // arrow configuration
-                    System.setProperty("arrow.struct.conflict.policy", "CONFLICT_APPEND");
-                    // preload JNI bridge classes
-                    try {
-                        Class.forName("org.apache.auron.jni.JniBridge");
-                    } catch (ClassNotFoundException e) {
-                        throw new RuntimeException("Cannot load JniBridge class", e);
-                    }
-                    AuronAdaptor.getInstance().loadAuronLib();
-                    Runtime.getRuntime().addShutdownHook(new Thread(JniBridge::onExit));
-                    initialized = true;
+                    try {
+                        // initialize native environment
+                        LOG.info("Initializing native environment (batchSize="
+                                + AuronAdaptor.getInstance().getAuronConfiguration().get(AuronConfiguration.BATCH_SIZE)
+                                + ", "
+                                + "memoryFraction="
+                                + AuronAdaptor.getInstance().getAuronConfiguration().get(AuronConfiguration.MEMORY_FRACTION)
+                                + ")");
+                        // arrow configuration
+                        System.setProperty("arrow.struct.conflict.policy", "CONFLICT_APPEND");
+                        // preload JNI bridge classes
+                        try {
+                            Class.forName("org.apache.auron.jni.JniBridge");
+                        } catch (ClassNotFoundException e) {
+                            throw new RuntimeException("Cannot load JniBridge class", e);
+                        }
+                        AuronAdaptor.getInstance().loadAuronLib();
+                        Runtime.getRuntime().addShutdownHook(new Thread(JniBridge::onExit));
+                    } finally {
+                        // Mark initialization as attempted to avoid repeated native library loading
+                        initialized = true;
+                    }

+                }
+            }
+        }
+    }
+
     /**
      * Loads and processes the next batch of data from the native plan.
      * <p>