diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableStateManager.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableStateManager.java index 1f47fd806edb..a39c57986730 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableStateManager.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommittableStateManager.java @@ -18,9 +18,6 @@ package org.apache.paimon.flink.sink; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContext; - import java.io.Serializable; import java.util.List; @@ -30,9 +27,8 @@ */ public interface CommittableStateManager extends Serializable { - void initializeState(StateInitializationContext context, Committer committer) + void initializeState(Committer.Context context, Committer committer) throws Exception; - void snapshotState(StateSnapshotContext context, List committables) - throws Exception; + void snapshotState(List committables) throws Exception; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java index 31acb1e91db6..f1684be96ea4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java @@ -18,8 +18,9 @@ package org.apache.paimon.flink.sink; -import org.apache.flink.api.common.state.OperatorStateStore; -import org.apache.flink.metrics.groups.OperatorMetricGroup; +import org.apache.paimon.flink.sink.state.StateStore; + +import org.apache.flink.metrics.MetricGroup; import javax.annotation.Nullable; @@ -61,6 +62,15 @@ int filterAndCommit( Map> groupByCheckpoint(Collection committables); + /** + * Persist any per-checkpoint state that this committer (and its listeners) owns. Called by the + * containing operator / coordinator at the snapshot boundary, before the committables for the + * current checkpoint are flushed to the {@link CommittableStateManager}. + * + *

Default implementation is a no-op for backwards compatibility. + */ + default void snapshotState() throws Exception {} + /** Factory to create {@link Committer}. */ interface Factory extends Serializable { @@ -73,13 +83,13 @@ interface Context { String commitUser(); @Nullable - OperatorMetricGroup metricGroup(); + MetricGroup metricGroup(); boolean streamingCheckpointEnabled(); boolean isRestored(); - OperatorStateStore stateStore(); + StateStore stateStore(); int getParallelism(); @@ -88,10 +98,10 @@ interface Context { static Context createContext( String commitUser, - @Nullable OperatorMetricGroup metricGroup, + @Nullable MetricGroup metricGroup, boolean streamingCheckpointEnabled, boolean isRestored, - OperatorStateStore stateStore, + StateStore stateStore, int parallelism, int subtaskIndex) { return new Committer.Context() { @@ -101,7 +111,7 @@ public String commitUser() { } @Override - public OperatorMetricGroup metricGroup() { + public MetricGroup metricGroup() { return metricGroup; } @@ -116,7 +126,7 @@ public boolean isRestored() { } @Override - public OperatorStateStore stateStore() { + public StateStore stateStore() { return stateStore; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java index 51830c8a8bc2..18f934f0e5df 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java @@ -23,7 +23,9 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.metrics.MetricNames; /** Flink metrics for {@link Committer}. */ @@ -34,14 +36,27 @@ public class CommitterMetrics { private final Counter numBytesOutCounter; private final Counter numRecordsOutCounter; - public CommitterMetrics(OperatorIOMetricGroup metricGroup) { - MetricGroup sinkMetricGroup = metricGroup.addGroup(SINK_METRIC_GROUP); + public CommitterMetrics(MetricGroup metricGroup) { + MetricGroup sinkMetricGroup; + + // When the committer runs as a regular operator we can wire its counters into the + // operator's IO metric group; when it runs inside an OperatorCoordinator the coordinator + // exposes a plain MetricGroup, so we fall back to local SimpleCounters. + if (metricGroup instanceof OperatorMetricGroup) { + OperatorIOMetricGroup operatorIOMetricGroup = + ((OperatorMetricGroup) metricGroup).getIOMetricGroup(); + sinkMetricGroup = operatorIOMetricGroup.addGroup(SINK_METRIC_GROUP); + numBytesOutCounter = operatorIOMetricGroup.getNumBytesOutCounter(); + numRecordsOutCounter = operatorIOMetricGroup.getNumRecordsOutCounter(); + } else { + sinkMetricGroup = metricGroup.addGroup(SINK_METRIC_GROUP); + numBytesOutCounter = new SimpleCounter(); + numRecordsOutCounter = new SimpleCounter(); + } - numBytesOutCounter = metricGroup.getNumBytesOutCounter(); sinkMetricGroup.counter(MetricNames.IO_NUM_BYTES_OUT, numBytesOutCounter); sinkMetricGroup.meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOutCounter)); - numRecordsOutCounter = metricGroup.getNumRecordsOutCounter(); sinkMetricGroup.counter(MetricNames.IO_NUM_RECORDS_OUT, numRecordsOutCounter); sinkMetricGroup.meter( MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOutCounter)); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java index 518095168055..034ded9c6446 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.flink.sink.state.OperatorBackendStateStore; import org.apache.paimon.flink.utils.RuntimeContextUtils; import org.apache.paimon.utils.Preconditions; @@ -133,18 +134,18 @@ public void initializeState(StateInitializationContext context) throws Exception int index = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()); // parallelism of commit operator is always 1, so commitUser will never be null - committer = - committerFactory.create( - Committer.createContext( - commitUser, - getMetricGroup(), - streamingCheckpointEnabled, - context.isRestored(), - context.getOperatorStateStore(), - parallelism, - index)); - - committableStateManager.initializeState(context, committer); + Committer.Context committerContext = + Committer.createContext( + commitUser, + getMetricGroup(), + streamingCheckpointEnabled, + context.isRestored(), + new OperatorBackendStateStore(context.getOperatorStateStore()), + parallelism, + index); + committer = committerFactory.create(committerContext); + + committableStateManager.initializeState(committerContext, committer); } @Override @@ -164,7 +165,8 @@ private GlobalCommitT toCommittables(long checkpoint, List inputs) thro public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); pollInputs(); - committableStateManager.snapshotState(context, committables(committablesPerCheckpoint)); + committer.snapshotState(); + committableStateManager.snapshotState(committables(committablesPerCheckpoint)); } private List committables(NavigableMap map) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopCommittableStateManager.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopCommittableStateManager.java index ab86f0ece4c1..6e694464c771 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopCommittableStateManager.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopCommittableStateManager.java @@ -20,9 +20,6 @@ import org.apache.paimon.manifest.ManifestCommittable; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContext; - import java.util.List; /** @@ -36,14 +33,13 @@ public class NoopCommittableStateManager implements CommittableStateManager committer) + Committer.Context context, Committer committer) throws Exception { // nothing to do } @Override - public void snapshotState(StateSnapshotContext context, List committables) - throws Exception { + public void snapshotState(List committables) throws Exception { // nothing to do } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreCommittableStateManager.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreCommittableStateManager.java index b1ed396bdcae..9e5a34ecebf3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreCommittableStateManager.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreCommittableStateManager.java @@ -26,8 +26,6 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import java.util.ArrayList; @@ -59,12 +57,11 @@ public RestoreCommittableStateManager( } @Override - public void initializeState( - StateInitializationContext context, Committer committer) + public void initializeState(Committer.Context context, Committer committer) throws Exception { streamingCommitterState = new SimpleVersionedListState<>( - context.getOperatorStateStore() + context.stateStore() .getListState( new ListStateDescriptor<>( "streaming_committer_raw_states", @@ -82,8 +79,7 @@ protected int recover(List committables, Committer committables) - throws Exception { + public void snapshotState(List committables) throws Exception { streamingCommitterState.update(committables); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java index ceba772140c1..48daabc8d89e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java @@ -52,7 +52,7 @@ public StoreCommitter(FileStoreTable table, TableCommit commit, Context context) if (context.metricGroup() != null) { this.commit.withMetricRegistry(new FlinkMetricRegistry(context.metricGroup())); - this.committerMetrics = new CommitterMetrics(context.metricGroup().getIOMetricGroup()); + this.committerMetrics = new CommitterMetrics(context.metricGroup()); } else { this.committerMetrics = null; } @@ -117,13 +117,12 @@ public int filterAndCommit( } @Override - public Map> groupByCheckpoint(Collection committables) { - try { - commitListeners.snapshotState(); - } catch (Exception e) { - throw new RuntimeException(e); - } + public void snapshotState() throws Exception { + commitListeners.snapshotState(); + } + @Override + public Map> groupByCheckpoint(Collection committables) { Map> grouped = new HashMap<>(); for (Committable c : committables) { grouped.computeIfAbsent(c.checkpointId(), k -> new ArrayList<>()).add(c); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java index ee2278807bb3..855c06f65459 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java @@ -202,6 +202,13 @@ private Map> groupByTable( t -> t.f0, Collectors.mapping(t -> t.f1, Collectors.toList()))); } + @Override + public void snapshotState() throws Exception { + for (StoreCommitter committer : tableCommitters.values()) { + committer.snapshotState(); + } + } + @Override public Map> groupByCheckpoint( Collection committables) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java index 92966014a11c..e83728e38f9a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java @@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.flink.FlinkConnectorOptions.PartitionMarkDoneActionMode; +import org.apache.paimon.flink.sink.state.StateStore; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.options.Options; import org.apache.paimon.partition.actions.PartitionMarkDoneAction; @@ -33,7 +34,6 @@ import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.PartitionPathUtils; -import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.java.tuple.Tuple2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +66,7 @@ public static Optional create( ClassLoader cl, boolean isStreaming, boolean isRestored, - OperatorStateStore stateStore, + StateStore stateStore, FileStoreTable table) throws Exception { CoreOptions coreOptions = table.coreOptions(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java index 046258278495..8ddbadb0bc9f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.flink.sink.state.StateStore; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionTimeExtractor; @@ -27,7 +28,6 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.slf4j.Logger; @@ -231,7 +231,7 @@ private static class PartitionMarkDoneTriggerState implements State { private final boolean isRestored; private final ListState> pendingPartitionsState; - public PartitionMarkDoneTriggerState(boolean isRestored, OperatorStateStore stateStore) + public PartitionMarkDoneTriggerState(boolean isRestored, StateStore stateStore) throws Exception { this.isRestored = isRestored; this.pendingPartitionsState = stateStore.getListState(PENDING_PARTITIONS_STATE_DESC); @@ -256,8 +256,7 @@ public void update(List partitions) throws Exception { } public static PartitionMarkDoneTrigger create( - CoreOptions coreOptions, boolean isRestored, OperatorStateStore stateStore) - throws Exception { + CoreOptions coreOptions, boolean isRestored, StateStore stateStore) throws Exception { Options options = coreOptions.toConfiguration(); return new PartitionMarkDoneTrigger( new PartitionMarkDoneTrigger.PartitionMarkDoneTriggerState(isRestored, stateStore), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java index c4db738cdfa2..182937f53868 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/ReportPartStatsListener.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink.listener; import org.apache.paimon.CoreOptions; +import org.apache.paimon.flink.sink.state.StateStore; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; @@ -31,7 +32,6 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.MapSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; @@ -66,7 +66,7 @@ public class ReportPartStatsListener implements CommitListener { private ReportPartStatsListener( InternalRowPartitionComputer partitionComputer, PartitionStatisticsReporter partitionStatisticsReporter, - OperatorStateStore store, + StateStore store, boolean isRestored, long idleTime) throws Exception { @@ -141,8 +141,7 @@ public void snapshotState() throws Exception { } public static Optional create( - boolean isRestored, OperatorStateStore stateStore, FileStoreTable table) - throws Exception { + boolean isRestored, StateStore stateStore, FileStoreTable table) throws Exception { CoreOptions coreOptions = table.coreOptions(); Options options = coreOptions.toConfiguration(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/state/OperatorBackendStateStore.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/state/OperatorBackendStateStore.java new file mode 100644 index 000000000000..e57f5e8a449a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/state/OperatorBackendStateStore.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.state; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; + +/** A {@link StateStore} backed by Flink's operator-side {@link OperatorStateStore}. */ +public class OperatorBackendStateStore implements StateStore { + + private final OperatorStateStore delegate; + + public OperatorBackendStateStore(OperatorStateStore delegate) { + this.delegate = delegate; + } + + @Override + public ListState getListState(ListStateDescriptor descriptor) throws Exception { + return delegate.getListState(descriptor); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/state/StateStore.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/state/StateStore.java new file mode 100644 index 000000000000..22401eb8b90c --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/state/StateStore.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink.state; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; + +/** + * Abstraction for accessing list state from a {@link + * org.apache.paimon.flink.sink.Committer.Context}. + * + *

Decouples committer / commit-listener state access from Flink's {@code OperatorStateStore} so + * that the same committer code can run in both a stream operator and a job-manager-side {@code + * OperatorCoordinator}. + */ +public interface StateStore { + + /** + * Returns a {@link ListState} for the given descriptor. Implementations should follow the same + * "operator state" semantics: the returned state is local to the current execution component + * (subtask or coordinator) and is checkpointed together with that component. + */ + ListState getListState(ListStateDescriptor descriptor) throws Exception; +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java index f09f145ad1dd..39d0e899ac38 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java @@ -55,8 +55,6 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.junit.jupiter.api.AfterEach; @@ -634,12 +632,11 @@ firstTable, new Committable(cpId, write1.prepareCommit(true, cpId).get(0))), new CommittableStateManager() { @Override public void initializeState( - StateInitializationContext context, + Committer.Context context, Committer committer) {} @Override public void snapshotState( - StateSnapshotContext context, List committables) {} }); return createTestHarness(operator); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomPartitionMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomPartitionMarkDoneActionTest.java index 9d7cad8f7148..238933ed2407 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomPartitionMarkDoneActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/CustomPartitionMarkDoneActionTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink.listener; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.sink.state.OperatorBackendStateStore; import org.apache.paimon.fs.Path; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -67,7 +68,7 @@ public void testCustomPartitionMarkDoneAction() throws Exception { getClass().getClassLoader(), false, false, - new MockOperatorStateStore(), + new OperatorBackendStateStore(new MockOperatorStateStore()), table)) .hasMessageContaining( String.format( @@ -91,7 +92,7 @@ public void testCustomPartitionMarkDoneAction() throws Exception { getClass().getClassLoader(), false, false, - new MockOperatorStateStore(), + new OperatorBackendStateStore(new MockOperatorStateStore()), table2) .get(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java index c873234105f0..88c0768425b2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.flink.sink.Committer; +import org.apache.paimon.flink.sink.state.OperatorBackendStateStore; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileTestUtils; @@ -43,7 +44,7 @@ static Committer.Context createMockContext( null, streamingCheckpointEnabled, isRestored, - new MockOperatorStateStore(), + new OperatorBackendStateStore(new MockOperatorStateStore()), 1, 1); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java index aa0e00067dbe..f05f616216fb 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink.listener; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.sink.state.OperatorBackendStateStore; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; @@ -86,7 +87,7 @@ private void innerTest( getClass().getClassLoader(), false, false, - new MockOperatorStateStore(), + new OperatorBackendStateStore(new MockOperatorStateStore()), table) .get();