diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 2d9fc98..d509c2d 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -46,4 +46,5 @@ jobs: with: skip-test: ${{ github.event.inputs.skip-test == 'true' }} kestra-version: ${{ github.event.inputs.kestra-version }} + java-version: '25' secrets: inherit diff --git a/build.gradle b/build.gradle index 8a1ffbe..e8ce3ca 100644 --- a/build.gradle +++ b/build.gradle @@ -18,7 +18,7 @@ tasks.withType(JavaCompile) { options.compilerArgs.add("-parameters") } -final targetJavaVersion = JavaVersion.VERSION_21 +final targetJavaVersion = JavaVersion.VERSION_25 group "io.kestra.plugin" allprojects { @@ -121,6 +121,7 @@ subprojects { testImplementation group: "io.kestra", name: "repository-memory", version: kestraVersion testImplementation group: "io.kestra", name: "runner-memory", version: kestraVersion testImplementation group: "io.kestra", name: "storage-local", version: kestraVersion + testImplementation group: "io.kestra", name: "indexer", version: kestraVersion // test testImplementation "org.junit.jupiter:junit-jupiter-engine" diff --git a/gradle.properties b/gradle.properties index 7604614..31ad56a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,3 @@ version=1.5.2-SNAPSHOT -kestraVersion=1.3.0 +kestraVersion=2.0.0-SNAPSHOT +org.gradle.jvmargs=-Xmx2g diff --git a/plugin-transform-grok/src/test/resources/application.yml b/plugin-transform-grok/src/test/resources/application.yml index 636ef67..520f082 100644 --- a/plugin-transform-grok/src/test/resources/application.yml +++ b/plugin-transform-grok/src/test/resources/application.yml @@ -7,3 +7,10 @@ kestra: type: local local: base-path: /tmp/unittest + worker: + controllers: + type: STATIC + static: + endpoints: + - host: localhost + diff --git a/plugin-transform-json/src/test/resources/application.yml b/plugin-transform-json/src/test/resources/application.yml index 636ef67..e264bbb 100644 --- a/plugin-transform-json/src/test/resources/application.yml +++ b/plugin-transform-json/src/test/resources/application.yml @@ -7,3 +7,9 @@ kestra: type: local local: base-path: /tmp/unittest + worker: + controllers: + type: STATIC + static: + endpoints: + - host: localhost \ No newline at end of file diff --git a/plugin-transform-records/src/test/java/io/kestra/plugin/transform/MapFlowTest.java b/plugin-transform-records/src/test/java/io/kestra/plugin/transform/MapFlowTest.java index b575982..6f29cd8 100644 --- a/plugin-transform-records/src/test/java/io/kestra/plugin/transform/MapFlowTest.java +++ b/plugin-transform-records/src/test/java/io/kestra/plugin/transform/MapFlowTest.java @@ -1,10 +1,13 @@ package io.kestra.plugin.transform; +import io.kestra.core.exceptions.InternalException; import io.kestra.core.junit.annotations.ExecuteFlow; import io.kestra.core.junit.annotations.KestraTest; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.TaskRun; import io.kestra.core.models.flows.State; +import io.kestra.core.services.TaskOutputService; +import jakarta.inject.Inject; import org.junit.jupiter.api.Test; import java.util.List; @@ -16,6 +19,9 @@ @KestraTest(startRunner = true) class MapFlowTest { + @Inject + private TaskOutputService taskOutputService; + @Test @ExecuteFlow("flows/map_flow.yaml") void executesFlow(Execution execution) { @@ -23,7 +29,7 @@ void executesFlow(Execution execution) { List taskRuns = execution.findTaskRunsByTaskId("map"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); List> records = (List>) outputs.get("records"); assertThat(records.size(), is(1)); @@ -35,12 +41,12 @@ void executesFlow(Execution execution) { @Test @ExecuteFlow("flows/map_flow_store.yaml") - void executesStoreFlow(Execution execution) { + void executesStoreFlow(Execution execution) throws InternalException { assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); List taskRuns = execution.findTaskRunsByTaskId("map"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); assertThat(outputs.containsKey("records"), is(false)); Object uri = outputs.get("uri"); @@ -55,7 +61,7 @@ void executesUnnestStoreFlow(Execution execution) { List taskRuns = execution.findTaskRunsByTaskId("explode"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); assertThat(outputs.containsKey("records"), is(false)); Object uri = outputs.get("uri"); @@ -70,7 +76,7 @@ void executesFilterStoreFlow(Execution execution) { List taskRuns = execution.findTaskRunsByTaskId("filter"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); assertThat(outputs.containsKey("records"), is(false)); Object uri = outputs.get("uri"); @@ -85,7 +91,7 @@ void executesAggregateStoreFlow(Execution execution) { List taskRuns = execution.findTaskRunsByTaskId("aggregate"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); assertThat(outputs.containsKey("records"), is(false)); Object uri = outputs.get("uri"); @@ -100,7 +106,7 @@ void executesZipFlow(Execution execution) { List taskRuns = execution.findTaskRunsByTaskId("zip"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); List> records = (List>) outputs.get("records"); assertThat(records.size(), is(2)); @@ -116,7 +122,7 @@ void executesZipStoreFlow(Execution execution) { List taskRuns = execution.findTaskRunsByTaskId("zip"); TaskRun taskRun = taskRuns.getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); assertThat(outputs.containsKey("records"), is(false)); Object uri = outputs.get("uri"); @@ -130,7 +136,7 @@ void executesSelectFlow(Execution execution) { assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); TaskRun taskRun = execution.findTaskRunsByTaskId("select").getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); List> records = (List>) outputs.get("records"); assertThat(records, hasSize(1)); @@ -147,14 +153,14 @@ void executesSelectStoreFlow(Execution execution) { assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); TaskRun selectRun = execution.findTaskRunsByTaskId("select").getFirst(); - Map selectOutputs = (Map) selectRun.getOutputs(); + Map selectOutputs = (Map) outputsOf(selectRun); assertThat(selectOutputs.containsKey("records"), is(false)); Object uri = selectOutputs.get("uri"); assertThat(uri != null, is(true)); assertThat(uri.toString().startsWith("kestra://"), is(true)); TaskRun readBackRun = execution.findTaskRunsByTaskId("read_back").getFirst(); - Map readBackOutputs = (Map) readBackRun.getOutputs(); + Map readBackOutputs = (Map) outputsOf(readBackRun); List> records = (List>) readBackOutputs.get("records"); assertThat(records, hasSize(2)); assertThat(((Number) records.getFirst().get("a")).longValue(), is(1L)); @@ -169,13 +175,13 @@ void executesSelectBinaryStoreFlow(Execution execution) { assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); TaskRun selectRun = execution.findTaskRunsByTaskId("select").getFirst(); - Map selectOutputs = (Map) selectRun.getOutputs(); + Map selectOutputs = (Map) outputsOf(selectRun); Object uri = selectOutputs.get("uri"); assertThat(uri != null, is(true)); assertThat(uri.toString().startsWith("kestra://"), is(true)); TaskRun readBackRun = execution.findTaskRunsByTaskId("read_back").getFirst(); - Map readBackOutputs = (Map) readBackRun.getOutputs(); + Map readBackOutputs = outputsOf(readBackRun); List> records = (List>) readBackOutputs.get("records"); assertThat(records, hasSize(1)); assertThat(((Number) records.getFirst().get("a")).longValue(), is(1L)); @@ -188,7 +194,7 @@ void executesSelectLengthMismatchSkipFlow(Execution execution) { assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); TaskRun taskRun = execution.findTaskRunsByTaskId("select").getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); List> records = (List>) outputs.get("records"); assertThat(records, hasSize(1)); @@ -202,10 +208,18 @@ void executesSelectOnErrorKeepFlow(Execution execution) { assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); TaskRun taskRun = execution.findTaskRunsByTaskId("select").getFirst(); - Map outputs = (Map) taskRun.getOutputs(); + Map outputs = outputsOf(taskRun); List> records = (List>) outputs.get("records"); assertThat(records, hasSize(1)); assertThat(records.getFirst().get("total_spent"), is("not-a-number")); } + + protected Map outputsOf(TaskRun taskRun) { + try { + return taskOutputService.getOutputs(taskRun); + } catch (InternalException e) { + throw new RuntimeException(e); + } + } } diff --git a/plugin-transform-records/src/test/resources/application.yml b/plugin-transform-records/src/test/resources/application.yml index 636ef67..919190e 100644 --- a/plugin-transform-records/src/test/resources/application.yml +++ b/plugin-transform-records/src/test/resources/application.yml @@ -7,3 +7,9 @@ kestra: type: local local: base-path: /tmp/unittest + worker: + controllers: + type: STATIC + static: + endpoints: + - host: localhost