diff --git a/build.gradle b/build.gradle index 8a1ffbe..d9299df 100644 --- a/build.gradle +++ b/build.gradle @@ -153,7 +153,7 @@ subprojects { } test { - jvmArgs = [ "-javaagent:${configurations.agent.singleFile}" ] + jvmArgs "-javaagent:${configurations.agent.singleFile}" } } diff --git a/plugin-transform-json/build.gradle b/plugin-transform-json/build.gradle index 24337f1..efe3e33 100644 --- a/plugin-transform-json/build.gradle +++ b/plugin-transform-json/build.gradle @@ -1,5 +1,12 @@ project.description = 'Kestra Plugin Transformation for Json.' +// -Xss512k matches the constrained stack that triggered the production crash (Windows default is +// ~256 KB; 512k is slightly above the HotSpot minimum and safely above the Kestra framework's needs). +// Without this, the Linux default thread stack (~8 MB) is far too large to reproduce the overflow. +test { + jvmArgs '-Xss512k' +} + jar { manifest { attributes( diff --git a/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/Transform.java b/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/Transform.java index fd6b3e3..a5ab67e 100644 --- a/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/Transform.java +++ b/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/Transform.java @@ -20,6 +20,11 @@ import lombok.experimental.SuperBuilder; import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import io.kestra.core.models.enums.MonacoLanguages; import io.kestra.core.models.annotations.PluginProperty; @@ -32,16 +37,52 @@ public abstract class Transform extends Task implements JSONataInterface, RunnableTask { private static final ObjectMapper MAPPER = JacksonMapper.ofJson(); + // 4 MB: fits default maxDepth=50 × ~8 JVM frames/level with large headroom. 
+ // Also isolates StackOverflowError inside the eval thread so the worker thread never crashes. + private static final long EVAL_THREAD_STACK_SIZE = 4 * 1024 * 1024; @PluginProperty(language = MonacoLanguages.JAVASCRIPT, group = "advanced") private Property expression; + // Default 50: each JSONata recursion level pushes ~8 JVM frames; 256 KB worker stacks + // (~300 usable frames) overflow before maxDepth fires at 200. 50 × 8 = 400 frames — safe. + // Users needing deeper recursion should increase both this value and the JVM stack size. @Builder.Default - private Property maxDepth = Property.ofValue(200); + private Property maxDepth = Property.ofValue(50); @Getter(AccessLevel.PRIVATE) private Jsonata parsedExpression; + // Lazy-initialized; lifecycle managed by evalExecutor() / shutdownEvalExecutor(). + // Assumption: Flux pipelines in subclasses are sequential (no parallel()/publishOn). + @Getter(AccessLevel.NONE) + @ToString.Exclude + @EqualsAndHashCode.Exclude + private transient ExecutorService evalExecutor; + + private ExecutorService evalExecutor() { + if (this.evalExecutor == null) { + this.evalExecutor = Executors.newSingleThreadExecutor(r -> { + var t = new Thread(null, r, "jsonata-eval", EVAL_THREAD_STACK_SIZE); + t.setDaemon(true); + return t; + }); + } + return this.evalExecutor; + } + + protected void shutdownEvalExecutor() { + if (this.evalExecutor != null) { + this.evalExecutor.shutdown(); + try { + this.evalExecutor.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + this.evalExecutor = null; + } + } + public void init(RunContext runContext) throws Exception { var exprString = runContext.render(this.expression).as(String.class).orElseThrow(); try { @@ -62,12 +103,43 @@ protected JsonNode evaluateExpression(RunContext runContext, JsonNode jsonNode) var frame = this.parsedExpression.createFrame(); frame.setRuntimeBounds(timeoutInMilli, rMaxDepth); - var result = 
this.parsedExpression.evaluate(data, frame); - if (result == null) { - return NullNode.getInstance(); + var resultRef = new AtomicReference(); + var errorRef = new AtomicReference(); + + // Eval runs on a dedicated executor thread (4 MB stack) that is reused across all records + // in the same task run. This serves two purposes: + // 1. Normal case: worker stack size (e.g. 256 KB on Windows) cannot constrain the evaluator. + // 2. Edge case (user sets very high maxDepth): if a StackOverflowError occurs in the eval + // thread, it is contained there. The worker thread reads the stored error and throws a + // clean RuntimeException — the worker never crashes. + // The catch is intentionally Throwable: this is a throwaway-thread sandbox, so every escape + // (including Errors like StackOverflowError and OutOfMemoryError) must land in errorRef. + // A narrower catch would let some Errors escape, leaving both refs null and producing a + // silent-null return after future.get(). + var future = evalExecutor().submit(() -> { + try { + var result = this.parsedExpression.evaluate(data, frame); + resultRef.set(result != null ? 
MAPPER.valueToTree(result) : NullNode.getInstance()); + } catch (Throwable t) { + errorRef.set(t); + } + return null; + }); + + try { + future.get(); + } catch (ExecutionException e) { + throw new RuntimeException("Failed to evaluate expression", e.getCause()); + } + + if (errorRef.get() != null) { + throw new RuntimeException("Failed to evaluate expression", errorRef.get()); } - return MAPPER.valueToTree(result); - } catch (JException | IllegalVariableEvaluationException e) { + return resultRef.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("JSONata evaluation interrupted", e); + } catch (IllegalVariableEvaluationException e) { throw new RuntimeException("Failed to evaluate expression", e); } } diff --git a/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformItems.java b/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformItems.java index 9d976ff..05bbae1 100644 --- a/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformItems.java +++ b/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformItems.java @@ -120,40 +120,44 @@ public Output run(RunContext runContext) throws Exception { init(runContext); - final URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow()); - - try (Reader reader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)), FileSerde.BUFFER_SIZE)) { - Flux flux = FileSerde.readAll(reader, new TypeReference<>() { - }); - final Path outputFilePath = runContext.workingDir().createTempFile(".ion"); - try (Writer writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(outputFilePath)))) { - - // transform - Flux values = flux.map(node -> this.evaluateExpression(runContext, node)); - - if (runContext.render(explodeArray).as(Boolean.class).orElseThrow()) { - values = values.flatMap(jsonNode -> { - if (jsonNode.isArray()) 
{ - Iterable iterable = jsonNode::elements; - return Flux.fromStream(StreamSupport.stream(iterable.spliterator(), false)); - } - return Mono.just(jsonNode); - }); + try { + final URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow()); + + try (Reader reader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)), FileSerde.BUFFER_SIZE)) { + Flux flux = FileSerde.readAll(reader, new TypeReference<>() { + }); + final Path outputFilePath = runContext.workingDir().createTempFile(".ion"); + try (Writer writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(outputFilePath)))) { + + // transform + Flux values = flux.map(node -> this.evaluateExpression(runContext, node)); + + if (runContext.render(explodeArray).as(Boolean.class).orElseThrow()) { + values = values.flatMap(jsonNode -> { + if (jsonNode.isArray()) { + Iterable iterable = jsonNode::elements; + return Flux.fromStream(StreamSupport.stream(iterable.spliterator(), false)); + } + return Mono.just(jsonNode); + }); + } + + Long processedItemsTotal = FileSerde.writeAll(writer, values).block(); + + URI uri = runContext.storage().putFile(outputFilePath.toFile()); + + // output + return Output + .builder() + .uri(uri) + .processedItemsTotal(processedItemsTotal) + .build(); + } finally { + Files.deleteIfExists(outputFilePath); // ensure temp file is deleted in case of error } - - Long processedItemsTotal = FileSerde.writeAll(writer, values).block(); - - URI uri = runContext.storage().putFile(outputFilePath.toFile()); - - // output - return Output - .builder() - .uri(uri) - .processedItemsTotal(processedItemsTotal) - .build(); - } finally { - Files.deleteIfExists(outputFilePath); // ensure temp file is deleted in case of error } + } finally { + shutdownEvalExecutor(); } } diff --git a/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformValue.java 
b/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformValue.java index c4ea096..797f654 100644 --- a/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformValue.java +++ b/plugin-transform-json/src/main/java/io/kestra/plugin/transform/jsonata/TransformValue.java @@ -101,13 +101,17 @@ public class TransformValue extends Transform implements public Output run(RunContext runContext) throws Exception { init(runContext); - final JsonNode from = parseJson(runContext.render(this.from).as(String.class).orElseThrow()); + try { + final JsonNode from = parseJson(runContext.render(this.from).as(String.class).orElseThrow()); - // transform - JsonNode transformed = evaluateExpression(runContext, from); + // transform + JsonNode transformed = evaluateExpression(runContext, from); - // output - return Output.builder().value(transformed).build(); + // output + return Output.builder().value(transformed).build(); + } finally { + shutdownEvalExecutor(); + } } private static JsonNode parseJson(String from) { diff --git a/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformItemsTest.java b/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformItemsTest.java index be985fd..02862d7 100644 --- a/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformItemsTest.java +++ b/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformItemsTest.java @@ -123,6 +123,38 @@ void shouldGetSingleRecordForValidExprReturningArrayGivenExplodeFalse() throws E Assertions.assertEquals(2, transformationResult.getFirst().size()); } + @Test + void shouldReuseEvalThreadAcrossRecords() throws Exception { + // Verifies executor reuse: after run() completes, awaitTermination in shutdownEvalExecutor() + // guarantees the jsonata-eval thread is gone. 
If the old per-call new Thread() approach were + // used, 3 threads would be started and could still be alive briefly, making liveAfter > 0 + // probabilistically — so this assertion is a reliable regression guard. + RunContext runContext = runContextFactory.of(); + final Path outputFilePath = runContext.workingDir().createTempFile(".ion"); + try (final Writer writer = new OutputStreamWriter(Files.newOutputStream(outputFilePath))) { + FileSerde.writeAll(writer, Flux.just( + Map.of("v", 1), + Map.of("v", 2), + Map.of("v", 3) + )).block(); + writer.flush(); + } + URI uri = runContext.storage().putFile(outputFilePath.toFile()); + + TransformItems task = TransformItems.builder() + .from(Property.ofValue(uri.toString())) + .expression(Property.ofValue("$")) + .build(); + + task.run(runContext); + + long liveAfter = Thread.getAllStackTraces().keySet().stream() + .filter(t -> "jsonata-eval".equals(t.getName())) + .count(); + + Assertions.assertEquals(0, liveAfter, "jsonata-eval thread should be terminated after run()"); + } + @Test void shouldTransformJsonInputWithDefaultIonMapper() throws Exception { // Given diff --git a/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformValueTest.java b/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformValueTest.java index 99583c0..4bc0b62 100644 --- a/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformValueTest.java +++ b/plugin-transform-json/src/test/java/io/kestra/plugin/transform/jsonata/TransformValueTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.transform.jsonata; +import com.dashjoin.jsonata.JException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import io.kestra.core.junit.annotations.KestraTest; @@ -9,8 +10,11 @@ import jakarta.inject.Inject; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import 
org.junit.jupiter.params.provider.ValueSource; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; @KestraTest class TransformValueTest { @@ -133,4 +137,93 @@ void shouldHandleNestedArrayExpressionFromIssue40() throws Exception { assertThat(result.get(1).isArray()).isTrue(); assertThat(result.get(1).get(2).asText()).isEqualTo("8796977843745/8796995341857/8796999765537"); } + + // Regression tests for StackOverflow protection in evaluateExpression(). + // + // Root cause: each JSONata recursion level pushes ~8 JVM frames. On 256 KB worker stacks + // (~300 usable frames), even maxDepth=200 allows 200 × 8 = 1600 frames — far past overflow. + // + // Fix (two layers): + // 1. Default maxDepth lowered to 50 (50 × 8 = 400 frames — safe on 256 KB stacks). + // Bounds check fires and throws JException before any stack risk. + // 2. Evaluation runs on a dedicated thread with a 4 MB stack. If the user sets a high + // maxDepth that allows overflow, the StackOverflowError is caught as Throwable inside + // the throwaway eval thread. The worker thread reads the stored error and throws a + // clean RuntimeException — the worker never crashes. + // + // Production crash: Windows worker default stack ~256 KB, crashed at depth=999. + // Test JVM is pinned to -Xss512k (see build.gradle). + // "+ 0" makes the expression non-tail-recursive, preventing TCO, so frames stay live. + + @ParameterizedTest + @ValueSource(ints = {50, 200, 500, 1000}) + void shouldNeverThrowStackOverflowForCommonMaxDepthValues(int maxDepth) throws Exception { + // Each maxDepth value runs on a 4 MB eval thread. The bounds check fires at maxDepth + // (JException) well before the stack could overflow, regardless of worker stack size. + RunContext runContext = runContextFactory.of(); + TransformValue task = TransformValue.builder() + .from(Property.ofValue("{}")) + .expression(Property.ofValue( + "($f := function($n) { $n > 0 ? 
$f($n - 1) + 0 : 0 }; $f(10000))" + )) + .maxDepth(Property.ofValue(maxDepth)) + .build(); + + assertThatThrownBy(() -> task.run(runContext)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Failed to evaluate expression") + .hasCauseInstanceOf(JException.class); + } + + @Test + void shouldContinueWorkingAfterStackOverflowError() throws Exception { + // Validates that a StackOverflowError in one run does not poison the executor or the task. + // Each call to run() creates a fresh executor (via init + shutdownEvalExecutor in finally), + // so the second run always gets a clean state. + RunContext runContext = runContextFactory.of(); + + TransformValue taskWithHighDepth = TransformValue.builder() + .from(Property.ofValue("{}")) + .expression(Property.ofValue( + "($f := function($n) { $n > 0 ? $f($n - 1) + 0 : 0 }; $f(49999))" + )) + .maxDepth(Property.ofValue(50000)) + .build(); + + assertThatThrownBy(() -> taskWithHighDepth.run(runContext)) + .isInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(StackOverflowError.class); + + // Second run with a simple expression must succeed — no lingering poisoned state. + RunContext runContext2 = runContextFactory.of(); + TransformValue simpleTask = TransformValue.builder() + .from(Property.ofValue("{\"x\": 42}")) + .expression(Property.ofValue("x")) + .build(); + + TransformValue.Output output = simpleTask.run(runContext2); + assertThat(output.getValue()).isNotNull(); + assertThat(output.getValue().toString()).isEqualTo("42"); + } + + @Test + void shouldIsolateStackOverflowInEvalThreadWhenMaxDepthExceedsStackCapacity() throws Exception { + // User sets maxDepth high enough that bounds check never fires before stack exhaustion. + // On 4 MB eval thread (~40k safe levels), $f(49999) overflows the eval thread. + // StackOverflowError is caught as Throwable inside the eval thread; worker thread gets + // a clean RuntimeException instead of crashing. 
+ RunContext runContext = runContextFactory.of(); + TransformValue task = TransformValue.builder() + .from(Property.ofValue("{}")) + .expression(Property.ofValue( + "($f := function($n) { $n > 0 ? $f($n - 1) + 0 : 0 }; $f(49999))" + )) + .maxDepth(Property.ofValue(50000)) + .build(); + + assertThatThrownBy(() -> task.run(runContext)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Failed to evaluate expression") + .hasCauseInstanceOf(StackOverflowError.class); + } } \ No newline at end of file