Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,19 @@ public void wakeUp() {
@Override
public void close() throws Exception {
currentSchemaChangeEvents.clear();
if (currentReader != null) {
if (currentReader.lazyRecordReader != null) {
currentReader.lazyRecordReader.close();
try {
if (currentFirstBatch != null) {
try {
currentFirstBatch.releaseBatch();
} finally {
currentFirstBatch = null;
}
}
} finally {
if (currentReader != null) {
if (currentReader.lazyRecordReader != null) {
currentReader.lazyRecordReader.close();
}
}
}
}
Expand All @@ -199,7 +209,7 @@ private void checkSplitOrStartNext() throws IOException {

final TableAwareFileStoreSourceSplit nextSplit = splits.poll();
if (nextSplit == null) {
return;
throw new IOException("Cannot fetch from another split - no split remaining");
}

// update metric when split changes
Expand Down Expand Up @@ -320,8 +330,11 @@ public RecordAndPosition<Event> next() {

@Override
public void releaseBatch() {
this.iterator.releaseBatch();
pool.recycler().recycle(this);
try {
this.iterator.releaseBatch();
} finally {
pool.recycler().recycle(this);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.pipeline.cdc.source.CDCSource;
import org.apache.paimon.flink.pipeline.cdc.source.TableAwareFileStoreSourceSplit;
import org.apache.paimon.flink.source.FileStoreSourceReaderTest.DummyMetricGroup;
import org.apache.paimon.flink.source.TestChangelogDataReadWrite;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
Expand Down Expand Up @@ -69,13 +71,20 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
import static org.apache.paimon.flink.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Test for {@link CDCSourceSplitReader}. */
public class CDCSourceSplitReaderTest {
Expand Down Expand Up @@ -444,6 +453,68 @@ public void testMultipleSplits() throws Exception {
reader.close();
}

@Test
public void testNoSplit() throws Exception {
TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tablePath);
CDCSourceSplitReader reader = createReader(rw.createReadWithKey());
assertThatThrownBy(reader::fetch).hasMessageContaining("no split remaining");
reader.close();
}

@Test
public void testRecycleIteratorWhenReleaseBatchFails() throws Exception {
CDCSourceSplitReader reader =
createReader(
new TestingTableRead(
new SingleBatchRecordReader(new FailingReleaseIterator())));
try {
assignSplit(
reader,
new TableAwareFileStoreSourceSplit(
"id1",
new TestingSplit(),
0,
Identifier.create(DATABASE, TABLE),
1L,
1L));

RecordsWithSplitIds<RecordIterator<Event>> records = reader.fetch();
assertThatThrownBy(records::recycle).hasMessageContaining("release failed");

RecordsWithSplitIds<RecordIterator<Event>> finishedRecords =
fetchWithoutWaitingForPool(reader);
assertThat(finishedRecords.finishedSplits()).isEqualTo(Collections.singleton("id1"));
} finally {
reader.close();
}
}

@Test
public void testCloseReleasesCurrentFirstBatch() throws Exception {
TrackingRecordIterator iterator = new TrackingRecordIterator();
CDCSourceSplitReader reader =
createReader(new TestingTableRead(new SingleBatchRecordReader(iterator)));
try {
assignSplit(
reader,
new TableAwareFileStoreSourceSplit(
"id1",
new TestingSplit(),
1,
Identifier.create(DATABASE, TABLE),
1L,
1L));
reader.wakeUp();

RecordsWithSplitIds<RecordIterator<Event>> records = reader.fetch();
assertThat(records.finishedSplits()).isEmpty();
assertThat(records.nextSplit()).isNull();
} finally {
reader.close();
}
assertThat(iterator.released()).isTrue();
}

@Test
public void testPauseOrResumeSplits() throws Exception {
TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tablePath);
Expand Down Expand Up @@ -622,6 +693,22 @@ private void assignSplit(CDCSourceSplitReader reader, TableAwareFileStoreSourceS
reader.handleSplitsChanges(splitsChange);
}

private RecordsWithSplitIds<RecordIterator<Event>> fetchWithoutWaitingForPool(
CDCSourceSplitReader reader) throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<RecordsWithSplitIds<RecordIterator<Event>>> future =
executorService.submit(reader::fetch);
try {
return future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
reader.wakeUp();
future.get(5, TimeUnit.SECONDS);
throw new AssertionError("Timed out waiting for split reader fetch.", e);
} finally {
executorService.shutdownNow();
}
}

public static TableAwareFileStoreSourceSplit newSourceSplit(
String id, BinaryRow partition, int bucket, List<DataFileMeta> files) {
return newSourceSplit(id, partition, bucket, files, false, 0);
Expand Down Expand Up @@ -732,4 +819,112 @@ public List<SchemaChangeEvent> generateSchemaChangeEventList(
return schemaChangeEvents;
}
}

private static class TestingTableRead implements TableRead {

private final RecordReader<InternalRow> recordReader;

private TestingTableRead(RecordReader<InternalRow> recordReader) {
this.recordReader = recordReader;
}

@Override
public TableRead withMetricRegistry(MetricRegistry registry) {
return this;
}

@Override
public TableRead executeFilter() {
return this;
}

@Override
public TableRead withIOManager(IOManager ioManager) {
return this;
}

@Override
public RecordReader<InternalRow> createReader(Split split) {
return recordReader;
}
}

private static class SingleBatchRecordReader implements RecordReader<InternalRow> {

private final RecordReader.RecordIterator<InternalRow> iterator;
private boolean returned;

private SingleBatchRecordReader(RecordReader.RecordIterator<InternalRow> iterator) {
this.iterator = iterator;
}

@Nullable
@Override
public RecordReader.RecordIterator<InternalRow> readBatch() {
if (returned) {
return null;
}

returned = true;
return iterator;
}

@Override
public void close() {}
}

private static class FailingReleaseIterator
implements RecordReader.RecordIterator<InternalRow> {

@Nullable
@Override
public InternalRow next() {
return null;
}

@Override
public void releaseBatch() {
throw new RuntimeException("release failed");
}
}

private static class TrackingRecordIterator
implements RecordReader.RecordIterator<InternalRow> {

private boolean returned;
private boolean released;

@Nullable
@Override
public InternalRow next() {
if (returned) {
return null;
}

returned = true;
return GenericRow.of(1L);
}

@Override
public void releaseBatch() {
released = true;
}

private boolean released() {
return released;
}
}

private static class TestingSplit implements Split {

@Override
public long rowCount() {
return 0;
}

@Override
public OptionalLong mergedRowCount() {
return OptionalLong.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,19 @@ public void wakeUp() {

@Override
public void close() throws Exception {
if (currentReader != null) {
if (currentReader.lazyRecordReader != null) {
currentReader.lazyRecordReader.close();
try {
if (currentFirstBatch != null) {
try {
currentFirstBatch.releaseBatch();
} finally {
currentFirstBatch = null;
}
}
} finally {
if (currentReader != null) {
if (currentReader.lazyRecordReader != null) {
currentReader.lazyRecordReader.close();
}
}
}
}
Expand Down Expand Up @@ -319,8 +329,11 @@ public RecordAndPosition<RowData> next() {

@Override
public void releaseBatch() {
this.iterator.releaseBatch();
pool.recycler().recycle(this);
try {
this.iterator.releaseBatch();
} finally {
pool.recycler().recycle(this);
}
}
}

Expand Down
Loading