@@ -602,6 +602,50 @@ private void verifyTableBucketInBatches(
602602 assertThat (tableBucketsInBatch ).containsExactlyInAnyOrder (tb );
603603 }
604604
605+ @ Test
606+ void testDrainContinuesWhenBucketAtMaxInflight () throws Exception {
607+ int batchSize = 1024 ;
608+ conf .set (ConfigOptions .CLIENT_WRITER_BATCH_TIMEOUT , Duration .ofMillis (0 ));
609+ conf .set (ConfigOptions .CLIENT_WRITER_BUFFER_MEMORY_SIZE , new MemorySize (10L * batchSize ));
610+ conf .set (ConfigOptions .CLIENT_WRITER_BUFFER_PAGE_SIZE , new MemorySize (256 ));
611+ conf .set (ConfigOptions .CLIENT_WRITER_BATCH_SIZE , new MemorySize (batchSize ));
612+
613+ IdempotenceManager idempotenceManager =
614+ new IdempotenceManager (true , /* maxInflightPerBucket */ 1 , null , null );
615+ idempotenceManager .setWriterId (1L );
616+
617+ RecordAccumulator accum =
618+ new RecordAccumulator (
619+ conf , idempotenceManager , TestingWriterMetricGroup .newInstance (), clock );
620+
621+ cluster = updateCluster (Arrays .asList (bucket1 , bucket2 ));
622+ IndexedRow row = indexedRow (DATA1_ROW_TYPE , new Object [] {1 , "a" });
623+
624+ // Drain both buckets so each has 1 in-flight batch.
625+ accum .append (createRecord (row ), writeCallback , cluster , tb1 .getBucket (), false );
626+ accum .append (createRecord (row ), writeCallback , cluster , tb2 .getBucket (), false );
627+
628+ Map <Integer , List <ReadyWriteBatch >> firstDrain =
629+ accum .drain (cluster , Collections .singleton (node1 .id ()), Integer .MAX_VALUE );
630+ List <ReadyWriteBatch > firstBatches = firstDrain .get (node1 .id ());
631+ assertThat (firstBatches ).hasSize (2 );
632+
633+ // Complete only tb2, leaving tb1 at max in-flight.
634+ ReadyWriteBatch tb2Batch =
635+ firstBatches .stream ().filter (b -> b .tableBucket ().equals (tb2 )).findFirst ().get ();
636+ idempotenceManager .handleCompletedBatch (tb2Batch );
637+
638+ // Append again to both. On drain, tb1 should be skipped but tb2 should still be drained.
639+ accum .append (createRecord (row ), writeCallback , cluster , tb1 .getBucket (), false );
640+ accum .append (createRecord (row ), writeCallback , cluster , tb2 .getBucket (), false );
641+
642+ Map <Integer , List <ReadyWriteBatch >> secondDrain =
643+ accum .drain (cluster , Collections .singleton (node1 .id ()), Integer .MAX_VALUE );
644+ List <ReadyWriteBatch > secondBatches = secondDrain .get (node1 .id ());
645+ assertThat (secondBatches ).hasSize (1 );
646+ assertThat (secondBatches .get (0 ).tableBucket ()).isEqualTo (tb2 );
647+ }
648+
605649 /** Return the offset delta. */
606650 private int expectedNumAppends (IndexedRow row , int batchSize ) {
607651 int size = recordBatchHeaderSize (CURRENT_LOG_MAGIC_VALUE );
0 commit comments