Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

package org.apache.paimon.flink.sink;

import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;

import java.io.Serializable;
import java.util.List;

Expand All @@ -30,9 +27,8 @@
*/
public interface CommittableStateManager<GlobalCommitT> extends Serializable {

void initializeState(StateInitializationContext context, Committer<?, GlobalCommitT> committer)
void initializeState(Committer.Context context, Committer<?, GlobalCommitT> committer)
throws Exception;

void snapshotState(StateSnapshotContext context, List<GlobalCommitT> committables)
throws Exception;
void snapshotState(List<GlobalCommitT> committables) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

package org.apache.paimon.flink.sink;

import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.paimon.flink.sink.state.StateStore;

import org.apache.flink.metrics.MetricGroup;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -61,6 +62,15 @@ int filterAndCommit(

Map<Long, List<CommitT>> groupByCheckpoint(Collection<CommitT> committables);

/**
* Persist any per-checkpoint state that this committer (and its listeners) owns. Called by the
* containing operator / coordinator at the snapshot boundary, before the committables for the
* current checkpoint are flushed to the {@link CommittableStateManager}.
*
* <p>Default implementation is a no-op for backwards compatibility.
*/
default void snapshotState() throws Exception {}

/** Factory to create {@link Committer}. */
interface Factory<CommitT, GlobalCommitT> extends Serializable {

Expand All @@ -73,13 +83,13 @@ interface Context {
String commitUser();

@Nullable
OperatorMetricGroup metricGroup();
MetricGroup metricGroup();

boolean streamingCheckpointEnabled();

boolean isRestored();

OperatorStateStore stateStore();
StateStore stateStore();

int getParallelism();

Expand All @@ -88,10 +98,10 @@ interface Context {

static Context createContext(
String commitUser,
@Nullable OperatorMetricGroup metricGroup,
@Nullable MetricGroup metricGroup,
boolean streamingCheckpointEnabled,
boolean isRestored,
OperatorStateStore stateStore,
StateStore stateStore,
int parallelism,
int subtaskIndex) {
return new Committer.Context() {
Expand All @@ -101,7 +111,7 @@ public String commitUser() {
}

@Override
public OperatorMetricGroup metricGroup() {
public MetricGroup metricGroup() {
return metricGroup;
}

Expand All @@ -116,7 +126,7 @@ public boolean isRestored() {
}

@Override
public OperatorStateStore stateStore() {
public StateStore stateStore() {
return stateStore;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;

/** Flink metrics for {@link Committer}. */
Expand All @@ -34,14 +36,27 @@ public class CommitterMetrics {
private final Counter numBytesOutCounter;
private final Counter numRecordsOutCounter;

public CommitterMetrics(OperatorIOMetricGroup metricGroup) {
MetricGroup sinkMetricGroup = metricGroup.addGroup(SINK_METRIC_GROUP);
public CommitterMetrics(MetricGroup metricGroup) {
MetricGroup sinkMetricGroup;

// When the committer runs as a regular operator we can wire its counters into the
// operator's IO metric group; when it runs inside an OperatorCoordinator the coordinator
// exposes a plain MetricGroup, so we fall back to local SimpleCounters.
if (metricGroup instanceof OperatorMetricGroup) {
OperatorIOMetricGroup operatorIOMetricGroup =
((OperatorMetricGroup) metricGroup).getIOMetricGroup();
sinkMetricGroup = operatorIOMetricGroup.addGroup(SINK_METRIC_GROUP);
numBytesOutCounter = operatorIOMetricGroup.getNumBytesOutCounter();
numRecordsOutCounter = operatorIOMetricGroup.getNumRecordsOutCounter();
} else {
sinkMetricGroup = metricGroup.addGroup(SINK_METRIC_GROUP);
numBytesOutCounter = new SimpleCounter();
numRecordsOutCounter = new SimpleCounter();
}

numBytesOutCounter = metricGroup.getNumBytesOutCounter();
sinkMetricGroup.counter(MetricNames.IO_NUM_BYTES_OUT, numBytesOutCounter);
sinkMetricGroup.meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOutCounter));

numRecordsOutCounter = metricGroup.getNumRecordsOutCounter();
sinkMetricGroup.counter(MetricNames.IO_NUM_RECORDS_OUT, numRecordsOutCounter);
sinkMetricGroup.meter(
MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOutCounter));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.flink.sink.state.OperatorBackendStateStore;
import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.utils.Preconditions;

Expand Down Expand Up @@ -133,18 +134,18 @@ public void initializeState(StateInitializationContext context) throws Exception
int index = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());

// parallelism of commit operator is always 1, so commitUser will never be null
committer =
committerFactory.create(
Committer.createContext(
commitUser,
getMetricGroup(),
streamingCheckpointEnabled,
context.isRestored(),
context.getOperatorStateStore(),
parallelism,
index));

committableStateManager.initializeState(context, committer);
Committer.Context committerContext =
Committer.createContext(
commitUser,
getMetricGroup(),
streamingCheckpointEnabled,
context.isRestored(),
new OperatorBackendStateStore(context.getOperatorStateStore()),
parallelism,
index);
committer = committerFactory.create(committerContext);

committableStateManager.initializeState(committerContext, committer);
}

@Override
Expand All @@ -164,7 +165,8 @@ private GlobalCommitT toCommittables(long checkpoint, List<CommitT> inputs) thro
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
pollInputs();
committableStateManager.snapshotState(context, committables(committablesPerCheckpoint));
committer.snapshotState();
committableStateManager.snapshotState(committables(committablesPerCheckpoint));
}

private List<GlobalCommitT> committables(NavigableMap<Long, GlobalCommitT> map) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@

import org.apache.paimon.manifest.ManifestCommittable;

import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;

import java.util.List;

/**
Expand All @@ -36,14 +33,13 @@ public class NoopCommittableStateManager implements CommittableStateManager<Mani

@Override
public void initializeState(
StateInitializationContext context, Committer<?, ManifestCommittable> committer)
Committer.Context context, Committer<?, ManifestCommittable> committer)
throws Exception {
// nothing to do
}

@Override
public void snapshotState(StateSnapshotContext context, List<ManifestCommittable> committables)
throws Exception {
public void snapshotState(List<ManifestCommittable> committables) throws Exception {
// nothing to do
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;

import java.util.ArrayList;
Expand Down Expand Up @@ -59,12 +57,11 @@ public RestoreCommittableStateManager(
}

@Override
public void initializeState(
StateInitializationContext context, Committer<?, GlobalCommitT> committer)
public void initializeState(Committer.Context context, Committer<?, GlobalCommitT> committer)
throws Exception {
streamingCommitterState =
new SimpleVersionedListState<>(
context.getOperatorStateStore()
context.stateStore()
.getListState(
new ListStateDescriptor<>(
"streaming_committer_raw_states",
Expand All @@ -82,8 +79,7 @@ protected int recover(List<GlobalCommitT> committables, Committer<?, GlobalCommi
}

@Override
public void snapshotState(StateSnapshotContext context, List<GlobalCommitT> committables)
throws Exception {
public void snapshotState(List<GlobalCommitT> committables) throws Exception {
streamingCommitterState.update(committables);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public StoreCommitter(FileStoreTable table, TableCommit commit, Context context)

if (context.metricGroup() != null) {
this.commit.withMetricRegistry(new FlinkMetricRegistry(context.metricGroup()));
this.committerMetrics = new CommitterMetrics(context.metricGroup().getIOMetricGroup());
this.committerMetrics = new CommitterMetrics(context.metricGroup());
} else {
this.committerMetrics = null;
}
Expand Down Expand Up @@ -117,13 +117,12 @@ public int filterAndCommit(
}

@Override
public Map<Long, List<Committable>> groupByCheckpoint(Collection<Committable> committables) {
try {
commitListeners.snapshotState();
} catch (Exception e) {
throw new RuntimeException(e);
}
public void snapshotState() throws Exception {
commitListeners.snapshotState();
}

@Override
public Map<Long, List<Committable>> groupByCheckpoint(Collection<Committable> committables) {
Map<Long, List<Committable>> grouped = new HashMap<>();
for (Committable c : committables) {
grouped.computeIfAbsent(c.checkpointId(), k -> new ArrayList<>()).add(c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ private Map<Identifier, List<ManifestCommittable>> groupByTable(
t -> t.f0, Collectors.mapping(t -> t.f1, Collectors.toList())));
}

@Override
public void snapshotState() throws Exception {
for (StoreCommitter committer : tableCommitters.values()) {
committer.snapshotState();
}
}

@Override
public Map<Long, List<MultiTableCommittable>> groupByCheckpoint(
Collection<MultiTableCommittable> committables) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkConnectorOptions.PartitionMarkDoneActionMode;
import org.apache.paimon.flink.sink.state.StateStore;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
Expand All @@ -33,7 +34,6 @@
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;

import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.java.tuple.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -66,7 +66,7 @@ public static Optional<PartitionMarkDoneListener> create(
ClassLoader cl,
boolean isStreaming,
boolean isRestored,
OperatorStateStore stateStore,
StateStore stateStore,
FileStoreTable table)
throws Exception {
CoreOptions coreOptions = table.coreOptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.sink.state.StateStore;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -231,7 +231,7 @@ private static class PartitionMarkDoneTriggerState implements State {
private final boolean isRestored;
private final ListState<List<String>> pendingPartitionsState;

public PartitionMarkDoneTriggerState(boolean isRestored, OperatorStateStore stateStore)
public PartitionMarkDoneTriggerState(boolean isRestored, StateStore stateStore)
throws Exception {
this.isRestored = isRestored;
this.pendingPartitionsState = stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
Expand All @@ -256,8 +256,7 @@ public void update(List<String> partitions) throws Exception {
}

public static PartitionMarkDoneTrigger create(
CoreOptions coreOptions, boolean isRestored, OperatorStateStore stateStore)
throws Exception {
CoreOptions coreOptions, boolean isRestored, StateStore stateStore) throws Exception {
Options options = coreOptions.toConfiguration();
return new PartitionMarkDoneTrigger(
new PartitionMarkDoneTrigger.PartitionMarkDoneTriggerState(isRestored, stateStore),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.listener;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.sink.state.StateStore;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -31,7 +32,6 @@

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
Expand Down Expand Up @@ -66,7 +66,7 @@ public class ReportPartStatsListener implements CommitListener {
private ReportPartStatsListener(
InternalRowPartitionComputer partitionComputer,
PartitionStatisticsReporter partitionStatisticsReporter,
OperatorStateStore store,
StateStore store,
boolean isRestored,
long idleTime)
throws Exception {
Expand Down Expand Up @@ -141,8 +141,7 @@ public void snapshotState() throws Exception {
}

public static Optional<ReportPartStatsListener> create(
boolean isRestored, OperatorStateStore stateStore, FileStoreTable table)
throws Exception {
boolean isRestored, StateStore stateStore, FileStoreTable table) throws Exception {

CoreOptions coreOptions = table.coreOptions();
Options options = coreOptions.toConfiguration();
Expand Down
Loading
Loading