From 414cb1a4929fb36730d5087a793075ef0e0cf52c Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 29 Mar 2026 03:46:56 +0100 Subject: [PATCH] [client][backport] Skip blocked bucket instead of stopping drain loop in idempotent writer --- .../fluss/client/write/RecordAccumulator.java | 3 +- .../client/write/RecordAccumulatorTest.java | 44 +++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java index 991ea7c8ea..70988d0e25 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -729,7 +729,8 @@ private List drainBatchesForOneNode(Cluster cluster, Integer no break; } else { if (shouldStopDrainBatchesForBucket(first, tableBucket)) { - break; + // Buckets are independent — skip this one, keep draining others. + continue; } } diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java index 773c12a445..d9e2291cec 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java @@ -602,6 +602,50 @@ private void verifyTableBucketInBatches( assertThat(tableBucketsInBatch).containsExactlyInAnyOrder(tb); } + @Test + void testDrainContinuesWhenBucketAtMaxInflight() throws Exception { + int batchSize = 1024; + conf.set(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT, Duration.ofMillis(0)); + conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, new MemorySize(10L * batchSize)); + conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, new MemorySize(256)); + conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(batchSize)); + + IdempotenceManager idempotenceManager = + new IdempotenceManager(true, /* maxInflightPerBucket */ 1, null, null); + idempotenceManager.setWriterId(1L); + + RecordAccumulator accum = + new RecordAccumulator( + conf, idempotenceManager, TestingWriterMetricGroup.newInstance(), clock); + + cluster = updateCluster(Arrays.asList(bucket1, bucket2)); + IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"}); + + // Drain both buckets so each has 1 in-flight batch. + accum.append(createRecord(row), writeCallback, cluster, tb1.getBucket(), false); + accum.append(createRecord(row), writeCallback, cluster, tb2.getBucket(), false); + + Map> firstDrain = + accum.drain(cluster, Collections.singleton(node1.id()), Integer.MAX_VALUE); + List firstBatches = firstDrain.get(node1.id()); + assertThat(firstBatches).hasSize(2); + + // Complete only tb2, leaving tb1 at max in-flight. + ReadyWriteBatch tb2Batch = + firstBatches.stream().filter(b -> b.tableBucket().equals(tb2)).findFirst().get(); + idempotenceManager.handleCompletedBatch(tb2Batch); + + // Append again to both. On drain, tb1 should be skipped but tb2 should still be drained. + accum.append(createRecord(row), writeCallback, cluster, tb1.getBucket(), false); + accum.append(createRecord(row), writeCallback, cluster, tb2.getBucket(), false); + + Map> secondDrain = + accum.drain(cluster, Collections.singleton(node1.id()), Integer.MAX_VALUE); + List secondBatches = secondDrain.get(node1.id()); + assertThat(secondBatches).hasSize(1); + assertThat(secondBatches.get(0).tableBucket()).isEqualTo(tb2); + } + /** Return the offset delta. */ private int expectedNumAppends(IndexedRow row, int batchSize) { int size = recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);