@@ -1428,7 +1428,7 @@ protected void pollAndInvoke() {
14281428 doProcessCommits ();
14291429 fixTxOffsetsIfNeeded ();
14301430 idleBetweenPollIfNecessary ();
1431- if (this .seeks .size () > 0 ) {
1431+ if (! this .seeks .isEmpty () ) {
14321432 processSeeks ();
14331433 }
14341434 pauseConsumerIfNecessary ();
@@ -1586,7 +1586,7 @@ private void fixTxOffsetsIfNeeded() {
15861586 toFix .put (tp , createOffsetAndMetadata (position ));
15871587 }
15881588 });
1589- if (toFix .size () > 0 ) {
1589+ if (! toFix .isEmpty () ) {
15901590 this .logger .debug (() -> "Fixing TX offsets: " + toFix );
15911591 if (this .kafkaTxManager == null ) {
15921592 if (this .syncCommits ) {
@@ -1689,7 +1689,7 @@ private synchronized void captureOffsets(ConsumerRecords<K, V> records) {
16891689 }
16901690
16911691 private void checkRebalanceCommits () {
1692- if (this .commitsDuringRebalance .size () > 0 ) {
1692+ if (! this .commitsDuringRebalance .isEmpty () ) {
16931693 // Attempt to recommit the offsets for partitions that we still own
16941694 Map <TopicPartition , OffsetAndMetadata > commits = this .commitsDuringRebalance .entrySet ()
16951695 .stream ()
@@ -1729,7 +1729,8 @@ private void debugRecords(@Nullable ConsumerRecords<K, V> records) {
17291729 .flatMap (p -> records .records (p ).stream ())
17301730 // map to same format as send metadata toString()
17311731 .map (r -> r .topic () + "-" + r .partition () + "@" + r .offset ())
1732- .collect (Collectors .toList ()).toString ());
1732+ .toList ()
1733+ .toString ());
17331734 }
17341735 }
17351736 }
@@ -1756,11 +1757,11 @@ private void pauseConsumerIfNecessary() {
17561757 }
17571758
17581759 private void doPauseConsumerIfNecessary () {
1759- if (this .pausedForNack .size () > 0 ) {
1760+ if (! this .pausedForNack .isEmpty () ) {
17601761 this .logger .debug ("Still paused for nack sleep" );
17611762 return ;
17621763 }
1763- if (this .offsetsInThisBatch != null && this .offsetsInThisBatch .size () > 0 && !this .pausedForAsyncAcks ) {
1764+ if (this .offsetsInThisBatch != null && ! this .offsetsInThisBatch .isEmpty () && !this .pausedForAsyncAcks ) {
17641765 this .pausedForAsyncAcks = true ;
17651766 this .logger .debug (() -> "Pausing for incomplete async acks: " + this .offsetsInThisBatch );
17661767 }
@@ -1801,7 +1802,7 @@ else if (this.offsetsInThisBatch != null) {
18011802 }
18021803
18031804 private void doResumeConsumerIfNeccessary () {
1804- if (this .pausedForAsyncAcks && this .offsetsInThisBatch .size () == 0 ) {
1805+ if (this .pausedForAsyncAcks && this .offsetsInThisBatch .isEmpty () ) {
18051806 this .pausedForAsyncAcks = false ;
18061807 this .logger .debug ("Resuming after manual async acks cleared" );
18071808 }
@@ -1823,8 +1824,8 @@ private void pausePartitionsIfNecessary() {
18231824 .stream ()
18241825 .filter (tp -> isPartitionPauseRequested (tp )
18251826 && !pausedConsumerPartitions .contains (tp ))
1826- .collect ( Collectors . toList () );
1827- if (partitionsToPause .size () > 0 ) {
1827+ .toList ();
1828+ if (! partitionsToPause .isEmpty () ) {
18281829 this .consumer .pause (partitionsToPause );
18291830 this .pausedPartitions .addAll (partitionsToPause );
18301831 this .logger .debug (() -> "Paused consumption from " + partitionsToPause );
@@ -1840,8 +1841,8 @@ private void resumePartitionsIfNecessary() {
18401841 .stream ()
18411842 .filter (tp -> !isPartitionPauseRequested (tp )
18421843 && this .pausedPartitions .contains (tp ))
1843- .collect ( Collectors . toList () );
1844- if (partitionsToResume .size () > 0 ) {
1844+ .toList ();
1845+ if (! partitionsToResume .isEmpty () ) {
18451846 this .consumer .resume (partitionsToResume );
18461847 this .pausedPartitions .removeAll (partitionsToResume );
18471848 this .logger .debug (() -> "Resumed consumption from " + partitionsToResume );
@@ -1877,7 +1878,7 @@ private void checkIdle() {
18771878 private void idleBetweenPollIfNecessary () {
18781879 long idleBetweenPolls = this .containerProperties .getIdleBetweenPolls ();
18791880 Collection <TopicPartition > assigned = getAssignedPartitions ();
1880- if (idleBetweenPolls > 0 && assigned != null && assigned .size () > 0 ) {
1881+ if (idleBetweenPolls > 0 && assigned != null && ! assigned .isEmpty () ) {
18811882 idleBetweenPolls = Math .min (idleBetweenPolls ,
18821883 this .maxPollInterval - (System .currentTimeMillis () - this .lastPoll )
18831884 - 5000 ); // NOSONAR - less by five seconds to avoid race condition with rebalance
@@ -1960,7 +1961,7 @@ protected void handleConsumerException(Exception e) {
19601961
19611962 private void commitPendingAcks () {
19621963 processCommits ();
1963- if (this .offsets .size () > 0 ) {
1964+ if (! this .offsets .isEmpty () ) {
19641965 // we always commit after stopping the invoker
19651966 commitIfNecessary ();
19661967 }
@@ -2056,19 +2057,19 @@ private synchronized void ackInOrder(ConsumerRecord<K, V> record) {
20562057 TopicPartition part = new TopicPartition (record .topic (), record .partition ());
20572058 List <Long > offs = this .offsetsInThisBatch .get (part );
20582059 List <ConsumerRecord <K , V >> deferred = this .deferredOffsets .get (part );
2059- if (offs .size () > 0 ) {
2060+ if (! offs .isEmpty () ) {
20602061 if (offs .get (0 ) == record .offset ()) {
20612062 offs .remove (0 );
20622063 ConsumerRecord <K , V > recordToAck = record ;
2063- if (deferred .size () > 0 ) {
2064+ if (! deferred .isEmpty () ) {
20642065 Collections .sort (deferred , (a , b ) -> Long .compare (a .offset (), b .offset ()));
2065- while (deferred .size () > 0 && deferred .get (0 ).offset () == recordToAck .offset () + 1 ) {
2066+ while (! deferred .isEmpty () && deferred .get (0 ).offset () == recordToAck .offset () + 1 ) {
20662067 recordToAck = deferred .remove (0 );
20672068 offs .remove (0 );
20682069 }
20692070 }
20702071 processAck (recordToAck );
2071- if (offs .size () == 0 ) {
2072+ if (offs .isEmpty () ) {
20722073 this .deferredOffsets .remove (part );
20732074 this .offsetsInThisBatch .remove (part );
20742075 }
@@ -2148,7 +2149,7 @@ private void invokeBatchListener(final ConsumerRecords<K, V> recordsArg) {
21482149 if (!this .wantsFullRecords ) {
21492150 recordList = createRecordList (records );
21502151 }
2151- if (this .wantsFullRecords || recordList .size () > 0 ) {
2152+ if (this .wantsFullRecords || ! recordList .isEmpty () ) {
21522153 if (this .transactionTemplate != null ) {
21532154 invokeBatchListenerInTx (records , recordList ); // NOSONAR
21542155 }
@@ -2625,7 +2626,7 @@ private boolean checkImmediatePause(Iterator<ConsumerRecord<K, V>> iterator) {
26252626 remaining .computeIfAbsent (new TopicPartition (next .topic (), next .partition ()),
26262627 tp -> new ArrayList <ConsumerRecord <K , V >>()).add (next );
26272628 }
2628- if (remaining .size () > 0 ) {
2629+ if (! remaining .isEmpty () ) {
26292630 this .remainingRecords = new ConsumerRecords <>(remaining );
26302631 return true ;
26312632 }
@@ -2693,7 +2694,7 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
26932694 Iterator <ConsumerRecord <K , V >> iterator2 = records .iterator ();
26942695 while (iterator2 .hasNext ()) {
26952696 ConsumerRecord <K , V > next = iterator2 .next ();
2696- if (list .size () > 0 || recordsEqual (record , next )) {
2697+ if (! list .isEmpty () || recordsEqual (record , next )) {
26972698 list .add (next );
26982699 }
26992700 }
@@ -2908,7 +2909,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
29082909 records .computeIfAbsent (new TopicPartition (next .topic (), next .partition ()),
29092910 tp -> new ArrayList <ConsumerRecord <K , V >>()).add (next );
29102911 }
2911- if (records .size () > 0 ) {
2912+ if (! records .isEmpty () ) {
29122913 this .remainingRecords = new ConsumerRecords <>(records );
29132914 this .pauseForPending = true ;
29142915 }
@@ -3180,10 +3181,10 @@ private void initPartitionsIfNeeded() {
31803181 private void doInitialSeeks (Map <TopicPartition , OffsetMetadata > partitions , Set <TopicPartition > beginnings ,
31813182 Set <TopicPartition > ends ) {
31823183
3183- if (beginnings .size () > 0 ) {
3184+ if (! beginnings .isEmpty () ) {
31843185 this .consumer .seekToBeginning (beginnings );
31853186 }
3186- if (ends .size () > 0 ) {
3187+ if (! ends .isEmpty () ) {
31873188 this .consumer .seekToEnd (ends );
31883189 }
31893190 for (Entry <TopicPartition , OffsetMetadata > entry : partitions .entrySet ()) {
@@ -3312,7 +3313,7 @@ public void seekToBeginning(String topic, int partition) {
33123313 public void seekToBeginning (Collection <TopicPartition > partitions ) {
33133314 this .seeks .addAll (partitions .stream ()
33143315 .map (tp -> new TopicPartitionOffset (tp .topic (), tp .partition (), SeekPosition .BEGINNING ))
3315- .collect ( Collectors . toList () ));
3316+ .toList ());
33163317 }
33173318
33183319 @ Override
@@ -3324,7 +3325,7 @@ public void seekToEnd(String topic, int partition) {
33243325 public void seekToEnd (Collection <TopicPartition > partitions ) {
33253326 this .seeks .addAll (partitions .stream ()
33263327 .map (tp -> new TopicPartitionOffset (tp .topic (), tp .partition (), SeekPosition .END ))
3327- .collect ( Collectors . toList () ));
3328+ .toList ());
33283329 }
33293330
33303331 @ Override
@@ -3564,15 +3565,15 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
35643565 toRepause .add (tp );
35653566 }
35663567 });
3567- if (!ListenerConsumer .this .consumerPaused && toRepause .size () > 0 ) {
3568+ if (!ListenerConsumer .this .consumerPaused && ! toRepause .isEmpty () ) {
35683569 ListenerConsumer .this .consumer .pause (toRepause );
35693570 ListenerConsumer .this .logger .debug (() -> "Paused consumption from: " + toRepause );
35703571 publishConsumerPausedEvent (toRepause , "Re-paused after rebalance" );
35713572 }
35723573 this .revoked .removeAll (toRepause );
35733574 ListenerConsumer .this .pausedPartitions .removeAll (this .revoked );
35743575 this .revoked .clear ();
3575- if (ListenerConsumer .this .pausedForNack .size () > 0 ) {
3576+ if (! ListenerConsumer .this .pausedForNack .isEmpty () ) {
35763577 ListenerConsumer .this .consumer .pause (ListenerConsumer .this .pausedForNack );
35773578 }
35783579 }
@@ -3597,7 +3598,7 @@ private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partition
35973598 return false ;
35983599 }
35993600 }
3600- if (offsetsToCommit .size () > 0 ) {
3601+ if (! offsetsToCommit .isEmpty () ) {
36013602 commitCurrentOffsets (offsetsToCommit );
36023603 }
36033604 return true ;
0 commit comments