Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,8 @@ private List<ReadyWriteBatch> drainBatchesForOneNode(Cluster cluster, Integer no
break;
} else {
if (shouldStopDrainBatchesForBucket(first, tableBucket)) {
break;
// Buckets are independent — skip this one, keep draining others.
continue;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,50 @@ private void verifyTableBucketInBatches(
assertThat(tableBucketsInBatch).containsExactlyInAnyOrder(tb);
}

@Test
void testDrainContinuesWhenBucketAtMaxInflight() throws Exception {
    // Regression test: a bucket that is already at its max in-flight limit must be
    // skipped during drain without aborting the drain of the remaining buckets.
    int batchSize = 1024;
    conf.set(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT, Duration.ofMillis(0));
    conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, new MemorySize(10L * batchSize));
    conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, new MemorySize(256));
    conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(batchSize));

    // Allow at most one in-flight batch per bucket so a single drained batch saturates it.
    IdempotenceManager idempotenceManager =
            new IdempotenceManager(true, /* maxInflightPerBucket */ 1, null, null);
    idempotenceManager.setWriterId(1L);

    RecordAccumulator accumulator =
            new RecordAccumulator(
                    conf, idempotenceManager, TestingWriterMetricGroup.newInstance(), clock);

    cluster = updateCluster(Arrays.asList(bucket1, bucket2));
    IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});

    // First round: append to both buckets and drain, leaving each with 1 in-flight batch.
    accumulator.append(createRecord(row), writeCallback, cluster, tb1.getBucket(), false);
    accumulator.append(createRecord(row), writeCallback, cluster, tb2.getBucket(), false);

    Map<Integer, List<ReadyWriteBatch>> initialDrain =
            accumulator.drain(cluster, Collections.singleton(node1.id()), Integer.MAX_VALUE);
    List<ReadyWriteBatch> initialBatches = initialDrain.get(node1.id());
    assertThat(initialBatches).hasSize(2);

    // Acknowledge only tb2's batch; tb1 stays saturated at its in-flight limit.
    ReadyWriteBatch completedForTb2 =
            initialBatches.stream()
                    .filter(batch -> batch.tableBucket().equals(tb2))
                    .findFirst()
                    .orElseThrow();
    idempotenceManager.handleCompletedBatch(completedForTb2);

    // Second round: tb1 should be skipped, but tb2 must still be drained.
    accumulator.append(createRecord(row), writeCallback, cluster, tb1.getBucket(), false);
    accumulator.append(createRecord(row), writeCallback, cluster, tb2.getBucket(), false);

    Map<Integer, List<ReadyWriteBatch>> followUpDrain =
            accumulator.drain(cluster, Collections.singleton(node1.id()), Integer.MAX_VALUE);
    List<ReadyWriteBatch> followUpBatches = followUpDrain.get(node1.id());
    assertThat(followUpBatches).hasSize(1);
    assertThat(followUpBatches.get(0).tableBucket()).isEqualTo(tb2);
}

/** Return the offset delta. */
private int expectedNumAppends(IndexedRow row, int batchSize) {
int size = recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
Expand Down