From 0a84a228af6d5fd31559516aeed6b34faab05298 Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Mon, 19 Dec 2022 16:05:53 +0800 Subject: [PATCH 1/7] [improve][pip-230] Throw exception when MessageIdImpl and BatchMessageIdImpl compare with each other --- .../client/impl/BatchMessageIdImpl.java | 20 ++- .../pulsar/client/impl/MessageIdImpl.java | 21 +-- ...sistentAcknowledgmentsGroupingTracker.java | 4 +- .../client/impl/MessageIdCompareToTest.java | 122 ++++++++++-------- 4 files changed, 101 insertions(+), 66 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index 7e3a143dff8e0..9382181973b4d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import javax.annotation.Nonnull; +import com.google.common.collect.ComparisonChain; import org.apache.pulsar.client.api.MessageId; /** @@ -71,11 +72,14 @@ public int getBatchIndex() { @Override public int compareTo(@Nonnull MessageId o) { if (o instanceof MessageIdImpl) { + if (!(o instanceof BatchMessageIdImpl)) { + throw new UnsupportedOperationException(this.getClass().getName() + + " can't compare with " + o.getClass().getName()); + } MessageIdImpl other = (MessageIdImpl) o; - int batchIndex = (o instanceof BatchMessageIdImpl) ? ((BatchMessageIdImpl) o).batchIndex : NO_BATCH; return messageIdCompare( this.ledgerId, this.entryId, this.partitionIndex, this.batchIndex, - other.ledgerId, other.entryId, other.partitionIndex, batchIndex + other.ledgerId, other.entryId, other.partitionIndex, ((BatchMessageIdImpl) o).batchIndex ); } else if (o instanceof TopicMessageIdImpl) { return compareTo(((TopicMessageIdImpl) o).getInnerMessageId()); @@ -140,4 +144,16 @@ public BatchMessageAcker getAcker() { return acker; } + static int messageIdCompare( + long ledgerId1, long entryId1, int partitionIndex1, int batchIndex1, + long ledgerId2, long entryId2, int partitionIndex2, int batchIndex2 + ) { + return ComparisonChain.start() + .compare(ledgerId1, ledgerId2) + .compare(entryId1, entryId2) + .compare(partitionIndex1, partitionIndex2) + .compare(batchIndex1, batchIndex2) + .result(); + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 02298e0f9d66d..57d6b8e2d3a1a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -204,11 +204,13 @@ public byte[] toByteArray() { @Override public int compareTo(@Nonnull MessageId o) { if (o instanceof MessageIdImpl) { + if (o instanceof BatchMessageIdImpl) { + throw new UnsupportedOperationException(this.getClass().getName() + + " can't compare with " + o.getClass().getName()); + } MessageIdImpl other = (MessageIdImpl) o; - int batchIndex = (o instanceof BatchMessageIdImpl) ? ((BatchMessageIdImpl) o).getBatchIndex() : NO_BATCH; return messageIdCompare( - this.ledgerId, this.entryId, this.partitionIndex, NO_BATCH, - other.ledgerId, other.entryId, other.partitionIndex, batchIndex + this.ledgerId, this.entryId, this.partitionIndex, other.ledgerId, other.entryId, other.partitionIndex ); } else if (o instanceof TopicMessageIdImpl) { return compareTo(((TopicMessageIdImpl) o).getInnerMessageId()); @@ -222,14 +224,13 @@ static int messageIdHashCode(long ledgerId, long entryId, int partitionIndex, in } static int messageIdCompare( - long ledgerId1, long entryId1, int partitionIndex1, int batchIndex1, - long ledgerId2, long entryId2, int partitionIndex2, int batchIndex2 + long ledgerId1, long entryId1, int partitionIndex1, + long ledgerId2, long entryId2, int partitionIndex2 ) { return ComparisonChain.start() - .compare(ledgerId1, ledgerId2) - .compare(entryId1, entryId2) - .compare(partitionIndex1, partitionIndex2) - .compare(batchIndex1, batchIndex2) - .result(); + .compare(ledgerId1, ledgerId2) + .compare(entryId1, entryId2) + .compare(partitionIndex1, partitionIndex2) + .result(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index fef0bcb8906f1..ca81000cfd2e0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -666,13 +666,13 @@ public synchronized int compareTo(MessageId messageId) { if (this.messageId instanceof BatchMessageIdImpl && (!(messageId instanceof BatchMessageIdImpl))) { final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId; final MessageIdImpl rhs = (MessageIdImpl) messageId; - return MessageIdImpl.messageIdCompare( + return BatchMessageIdImpl.messageIdCompare( lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), lhs.getBatchIndex(), rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), Integer.MAX_VALUE); } else if (messageId instanceof BatchMessageIdImpl && (!(this.messageId instanceof BatchMessageIdImpl))){ final MessageIdImpl lhs = this.messageId; final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId; - return MessageIdImpl.messageIdCompare( + return BatchMessageIdImpl.messageIdCompare( lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), Integer.MAX_VALUE, rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), rhs.getBatchIndex()); } else { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 4f0eca6ea4af8..595916c8a1377 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -34,6 +34,14 @@ */ public class MessageIdCompareToTest { + private static final String COMPARE_MESSAGE_ID_AND_BATCH_MESSAGE_ID_THROW_MESSAGE = + "org.apache.pulsar.client.impl.MessageIdImpl " + + "can't compare with org.apache.pulsar.client.impl.BatchMessageIdImpl"; + + private static final String COMPARE_BATCH_MESSAGE_ID_AND_MESSAGE_ID_THROW_MESSAGE = + "org.apache.pulsar.client.impl.BatchMessageIdImpl " + + "can't compare with org.apache.pulsar.client.impl.MessageIdImpl"; + @Test public void testEqual() { MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567); @@ -111,82 +119,92 @@ public void testLessThan() { } @Test - public void testCompareDifferentType() { + public void testMessageIdImplCompareToBatchMessageIdImpl() { MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567); - BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(123L, 345L, 566, 789); - BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(123L, 345L, 567, 789); - BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(messageIdImpl); - assertTrue(messageIdImpl.compareTo(batchMessageId1) > 0, "Expected to be greater than"); - assertTrue(messageIdImpl.compareTo(batchMessageId2) < 0, "Expected to be less than"); - assertEquals(messageIdImpl.compareTo(batchMessageId3), 0, "Expected to be equal"); - assertTrue(batchMessageId1.compareTo(messageIdImpl) < 0, "Expected to be less than"); - assertTrue(batchMessageId2.compareTo(messageIdImpl) > 0, "Expected to be greater than"); - assertEquals(batchMessageId3.compareTo(messageIdImpl), 0, "Expected to be equal"); - } + BatchMessageIdImpl batchMessageId = + new BatchMessageIdImpl(123L, 345L, 566, 789); + try { + messageIdImpl.compareTo(batchMessageId); + fail(); + } catch (Exception e) { + assertTrue(e instanceof UnsupportedOperationException); + assertEquals(e.getMessage(), COMPARE_MESSAGE_ID_AND_BATCH_MESSAGE_ID_THROW_MESSAGE); + } + + try { + batchMessageId.compareTo(messageIdImpl); + fail(); + } catch (Exception e) { + assertTrue(e instanceof UnsupportedOperationException); + assertEquals(e.getMessage(), COMPARE_BATCH_MESSAGE_ID_AND_MESSAGE_ID_THROW_MESSAGE); + } - @Test - public void compareToSymmetricTest() { - MessageIdImpl simpleMessageId = new MessageIdImpl(123L, 345L, 567); - // batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message - BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(123L, 345L, 567, -1); - BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(123L, 345L, 567, 1); - BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(123L, 345L, 566, 1); - BatchMessageIdImpl batchMessageId4 = new BatchMessageIdImpl(123L, 345L, 566, -1); - - assertEquals(simpleMessageId.compareTo(batchMessageId1), 0, "Expected to be equal"); - assertEquals(batchMessageId1.compareTo(simpleMessageId), 0, "Expected to be equal"); - assertTrue(batchMessageId2.compareTo(simpleMessageId) > 0, "Expected to be greater than"); - assertTrue(simpleMessageId.compareTo(batchMessageId2) < 0, "Expected to be less than"); - assertTrue(simpleMessageId.compareTo(batchMessageId3) > 0, "Expected to be greater than"); - assertTrue(batchMessageId3.compareTo(simpleMessageId) < 0, "Expected to be less than"); - assertTrue(simpleMessageId.compareTo(batchMessageId4) > 0, "Expected to be greater than"); - assertTrue(batchMessageId4.compareTo(simpleMessageId) < 0, "Expected to be less than"); } @Test public void testMessageIdImplCompareToTopicMessageId() { MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567); - TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( + TopicMessageIdImpl topicBatchMessageIdImpl = new TopicMessageIdImpl( "test-topic-partition-0", "test-topic", new BatchMessageIdImpl(123L, 345L, 566, 789)); TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( "test-topic-partition-0", "test-topic", - new BatchMessageIdImpl(123L, 345L, 567, 789)); + new MessageIdImpl(123L, 346L, 567)); TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl( "test-topic-partition-0", "test-topic", - new BatchMessageIdImpl(messageIdImpl)); - assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than"); + messageIdImpl); + try { + messageIdImpl.compareTo(topicBatchMessageIdImpl); + fail(); + } catch (Exception e) { + assertTrue(e instanceof UnsupportedOperationException); + assertEquals(e.getMessage(), COMPARE_MESSAGE_ID_AND_BATCH_MESSAGE_ID_THROW_MESSAGE); + } + + try { + topicBatchMessageIdImpl.compareTo(messageIdImpl); + fail(); + } catch (Exception e) { + assertTrue(e instanceof UnsupportedOperationException); + assertEquals(e.getMessage(), COMPARE_BATCH_MESSAGE_ID_AND_MESSAGE_ID_THROW_MESSAGE); + } assertTrue(messageIdImpl.compareTo(topicMessageId2) < 0, "Expected to be less than"); assertEquals(messageIdImpl.compareTo(topicMessageId3), 0, "Expected to be equal"); - assertTrue(topicMessageId1.compareTo(messageIdImpl) < 0, "Expected to be less than"); assertTrue(topicMessageId2.compareTo(messageIdImpl) > 0, "Expected to be greater than"); assertEquals(topicMessageId3.compareTo(messageIdImpl), 0, "Expected to be equal"); } @Test public void testBatchMessageIdImplCompareToTopicMessageId() { - BatchMessageIdImpl messageIdImpl1 = new BatchMessageIdImpl(123L, 345L, 567, 789); - BatchMessageIdImpl messageIdImpl2 = new BatchMessageIdImpl(123L, 345L, 567, 0); - BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1); - TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( - "test-topic-partition-0", - "test-topic", - new MessageIdImpl(123L, 345L, 566)); - TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( - "test-topic-partition-0", - "test-topic", - new MessageIdImpl(123L, 345L, 567)); - assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than"); - assertTrue(messageIdImpl1.compareTo(topicMessageId2) > 0, "Expected to be greater than"); - assertTrue(messageIdImpl2.compareTo(topicMessageId2) > 0, "Expected to be greater than"); - assertEquals(messageIdImpl3.compareTo(topicMessageId2), 0, "Expected to be equal"); - assertTrue(topicMessageId1.compareTo(messageIdImpl1) < 0, "Expected to be less than"); - assertTrue(topicMessageId2.compareTo(messageIdImpl1) < 0, "Expected to be less than"); - assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than"); - assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than"); + BatchMessageIdImpl messageIdImpl = new BatchMessageIdImpl(123L, 345L, 567, 789); + TopicMessageIdImpl topicBatchMessageId = new TopicMessageIdImpl( + "test-topic-partition-0", + "test-topic", + new BatchMessageIdImpl(123L, 345L, 566, 788)); + TopicMessageIdImpl topicMessageId = new TopicMessageIdImpl( + "test-topic-partition-0", + "test-topic", + new MessageIdImpl(123L, 345L, 566)); + assertTrue(messageIdImpl.compareTo(topicBatchMessageId) > 0, "Expected to be greater than"); + assertTrue(topicBatchMessageId.compareTo(messageIdImpl) < 0, "Expected to be less than"); + try { + messageIdImpl.compareTo(topicMessageId); + fail(); + } catch (Exception e) { + assertTrue(e instanceof UnsupportedOperationException); + assertEquals(e.getMessage(), COMPARE_BATCH_MESSAGE_ID_AND_MESSAGE_ID_THROW_MESSAGE); + } + + try { + topicMessageId.compareTo(messageIdImpl); + fail(); + } catch (Exception e) { + assertTrue(e instanceof UnsupportedOperationException); + assertEquals(e.getMessage(), COMPARE_MESSAGE_ID_AND_BATCH_MESSAGE_ID_THROW_MESSAGE); + } } @Test From 927e4d2aa55720d2a9d6eaf388714cbb3aaf7748 Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Wed, 21 Dec 2022 00:53:44 +0800 Subject: [PATCH 2/7] fix the code style --- .../java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index 9382181973b4d..b405e788d4a42 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -18,8 +18,8 @@ */ package org.apache.pulsar.client.impl; -import javax.annotation.Nonnull; import com.google.common.collect.ComparisonChain; +import javax.annotation.Nonnull; import org.apache.pulsar.client.api.MessageId; /** From 90adc8e6bce139278c3e63f4526439aeac718b76 Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Wed, 21 Dec 2022 21:17:42 +0800 Subject: [PATCH 3/7] fix some comment --- .../client/impl/BatchMessageIdImpl.java | 22 +-- .../pulsar/client/impl/MessageIdImpl.java | 15 ++- .../client/impl/MessageIdCompareToTest.java | 126 ++++++++---------- 3 files changed, 81 insertions(+), 82 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index b405e788d4a42..30e7c3004095a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -72,15 +72,21 @@ public int getBatchIndex() { @Override public int compareTo(@Nonnull MessageId o) { if (o instanceof MessageIdImpl) { - if (!(o instanceof BatchMessageIdImpl)) { - throw new UnsupportedOperationException(this.getClass().getName() - + " can't compare with " + o.getClass().getName()); - } MessageIdImpl other = (MessageIdImpl) o; - return messageIdCompare( - this.ledgerId, this.entryId, this.partitionIndex, this.batchIndex, - other.ledgerId, other.entryId, other.partitionIndex, ((BatchMessageIdImpl) o).batchIndex - ); + int compareWithoutBatchIndex = messageIdCompare( + this.ledgerId, this.entryId, this.partitionIndex, + other.ledgerId, other.entryId, other.partitionIndex); + if (compareWithoutBatchIndex != 0) { + return compareWithoutBatchIndex; + } else { + if (!(o instanceof BatchMessageIdImpl)) { + throw new UnsupportedOperationException(this.getClass().getName() + " can't compare with " + + o.getClass().getName() + " when they have the same `LedgerId` and `EntryId`."); + } else { + return ComparisonChain.start().compare(this.batchIndex, + ((BatchMessageIdImpl) o).batchIndex).result(); + } + } } else if (o instanceof TopicMessageIdImpl) { return compareTo(((TopicMessageIdImpl) o).getInnerMessageId()); } else { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 57d6b8e2d3a1a..8eb3373917055 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -204,14 +204,17 @@ public byte[] toByteArray() { @Override public int compareTo(@Nonnull MessageId o) { if (o instanceof MessageIdImpl) { - if (o instanceof BatchMessageIdImpl) { - throw new UnsupportedOperationException(this.getClass().getName() - + " can't compare with " + o.getClass().getName()); - } MessageIdImpl other = (MessageIdImpl) o; - return messageIdCompare( - this.ledgerId, this.entryId, this.partitionIndex, other.ledgerId, other.entryId, other.partitionIndex + int compareWithoutBatchIndex = messageIdCompare( + this.ledgerId, this.entryId, this.partitionIndex, + other.ledgerId, other.entryId, other.partitionIndex ); + if (compareWithoutBatchIndex != 0 || !(o instanceof BatchMessageIdImpl)) { + return compareWithoutBatchIndex; + } else { + throw new UnsupportedOperationException(this.getClass().getName() + " can't compare with " + + o.getClass().getName() + " when they have the same `LedgerId` and `EntryId`."); + } } else if (o instanceof TopicMessageIdImpl) { return compareTo(((TopicMessageIdImpl) o).getInnerMessageId()); } else { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 595916c8a1377..709f30a3cb5f1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -34,14 +34,6 @@ */ public class MessageIdCompareToTest { - private static final String COMPARE_MESSAGE_ID_AND_BATCH_MESSAGE_ID_THROW_MESSAGE = - "org.apache.pulsar.client.impl.MessageIdImpl " - + "can't compare with org.apache.pulsar.client.impl.BatchMessageIdImpl"; - - private static final String COMPARE_BATCH_MESSAGE_ID_AND_MESSAGE_ID_THROW_MESSAGE = - "org.apache.pulsar.client.impl.BatchMessageIdImpl " - + "can't compare with org.apache.pulsar.client.impl.MessageIdImpl"; - @Test public void testEqual() { MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567); @@ -119,92 +111,90 @@ public void testLessThan() { } @Test - public void testMessageIdImplCompareToBatchMessageIdImpl() { + public void testCompareDifferentType() { MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567); - BatchMessageIdImpl batchMessageId = - new BatchMessageIdImpl(123L, 345L, 566, 789); - try { - messageIdImpl.compareTo(batchMessageId); - fail(); - } catch (Exception e) { - assertTrue(e instanceof UnsupportedOperationException); - assertEquals(e.getMessage(), COMPARE_MESSAGE_ID_AND_BATCH_MESSAGE_ID_THROW_MESSAGE); - } + BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(123L, 345L, 566, 789); + BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(123L, 345L, 567, 789); + BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(messageIdImpl); + assertTrue(messageIdImpl.compareTo(batchMessageId1) > 0, "Expected to be greater than"); + assertTrue(messageIdImpl.compareTo(batchMessageId2) < 0, "Expected to be less than"); + assertEquals(messageIdImpl.compareTo(batchMessageId3), 0, "Expected to be equal"); + assertTrue(batchMessageId1.compareTo(messageIdImpl) < 0, "Expected to be less than"); + assertTrue(batchMessageId2.compareTo(messageIdImpl) > 0, "Expected to be greater than"); + assertEquals(batchMessageId3.compareTo(messageIdImpl), 0, "Expected to be equal"); + } + + @Test + public void compareToSymmetricTest() { + MessageIdImpl simpleMessageId = new MessageIdImpl(123L, 345L, 567); + // batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message + BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(123L, 345L, 567, -1); + BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(123L, 345L, 567, 1); + BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(123L, 345L, 566, 1); + BatchMessageIdImpl batchMessageId4 = new BatchMessageIdImpl(123L, 345L, 566, -1); try { - batchMessageId.compareTo(messageIdImpl); + simpleMessageId.compareTo(batchMessageId1); fail(); } catch (Exception e) { assertTrue(e instanceof UnsupportedOperationException); - assertEquals(e.getMessage(), COMPARE_BATCH_MESSAGE_ID_AND_MESSAGE_ID_THROW_MESSAGE); + assertEquals(e.getMessage(), "org.apache.pulsar.client.impl.MessageIdImpl " + + "can't compare with org.apache.pulsar.client.impl.BatchMessageIdImpl " + + "when they have the same `LedgerId` and `EntryId`."); } - + assertEquals(batchMessageId1.compareTo(simpleMessageId), 0, "Expected to be equal"); + assertTrue(batchMessageId2.compareTo(simpleMessageId) > 0, "Expected to be greater than"); + assertTrue(simpleMessageId.compareTo(batchMessageId2) < 0, "Expected to be less than"); + assertTrue(simpleMessageId.compareTo(batchMessageId3) > 0, "Expected to be greater than"); + assertTrue(batchMessageId3.compareTo(simpleMessageId) < 0, "Expected to be less than"); + assertTrue(simpleMessageId.compareTo(batchMessageId4) > 0, "Expected to be greater than"); + assertTrue(batchMessageId4.compareTo(simpleMessageId) < 0, "Expected to be less than"); } @Test public void testMessageIdImplCompareToTopicMessageId() { MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567); - TopicMessageIdImpl topicBatchMessageIdImpl = new TopicMessageIdImpl( - "test-topic-partition-0", - "test-topic", - new BatchMessageIdImpl(123L, 345L, 566, 789)); + TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( + "test-topic-partition-0", + "test-topic", + new BatchMessageIdImpl(123L, 345L, 566, 789)); TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( - "test-topic-partition-0", - "test-topic", - new MessageIdImpl(123L, 346L, 567)); + "test-topic-partition-0", + "test-topic", + new BatchMessageIdImpl(123L, 345L, 567, 789)); TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl( - "test-topic-partition-0", - "test-topic", - messageIdImpl); - try { - messageIdImpl.compareTo(topicBatchMessageIdImpl); - fail(); - } catch (Exception e) { - assertTrue(e instanceof UnsupportedOperationException); - assertEquals(e.getMessage(), COMPARE_MESSAGE_ID_AND_BATCH_MESSAGE_ID_THROW_MESSAGE); - } - - try { - topicBatchMessageIdImpl.compareTo(messageIdImpl); - fail(); - } catch (Exception e) { - assertTrue(e instanceof UnsupportedOperationException); - assertEquals(e.getMessage(), COMPARE_BATCH_MESSAGE_ID_AND_MESSAGE_ID_THROW_MESSAGE); - } + "test-topic-partition-0", + "test-topic", + new BatchMessageIdImpl(messageIdImpl)); + assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than"); assertTrue(messageIdImpl.compareTo(topicMessageId2) < 0, "Expected to be less than"); assertEquals(messageIdImpl.compareTo(topicMessageId3), 0, "Expected to be equal"); + assertTrue(topicMessageId1.compareTo(messageIdImpl) < 0, "Expected to be less than"); assertTrue(topicMessageId2.compareTo(messageIdImpl) > 0, "Expected to be greater than"); assertEquals(topicMessageId3.compareTo(messageIdImpl), 0, "Expected to be equal"); } @Test public void testBatchMessageIdImplCompareToTopicMessageId() { - BatchMessageIdImpl messageIdImpl = new BatchMessageIdImpl(123L, 345L, 567, 789); - TopicMessageIdImpl topicBatchMessageId = new TopicMessageIdImpl( + BatchMessageIdImpl messageIdImpl1 = new BatchMessageIdImpl(123L, 345L, 567, 789); + BatchMessageIdImpl messageIdImpl2 = new BatchMessageIdImpl(123L, 345L, 567, 0); + BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1); + TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( "test-topic-partition-0", "test-topic", - new BatchMessageIdImpl(123L, 345L, 566, 788)); - TopicMessageIdImpl topicMessageId = new TopicMessageIdImpl( + new MessageIdImpl(123L, 345L, 566)); + TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( "test-topic-partition-0", "test-topic", - new MessageIdImpl(123L, 345L, 566)); - assertTrue(messageIdImpl.compareTo(topicBatchMessageId) > 0, "Expected to be greater than"); - assertTrue(topicBatchMessageId.compareTo(messageIdImpl) < 0, "Expected to be less than"); - try { - messageIdImpl.compareTo(topicMessageId); - fail(); - } catch (Exception e) { - assertTrue(e instanceof UnsupportedOperationException); - assertEquals(e.getMessage(), COMPARE_BATCH_MESSAGE_ID_AND_MESSAGE_ID_THROW_MESSAGE); - } - - try { - topicMessageId.compareTo(messageIdImpl); - fail(); - } catch (Exception e) { - assertTrue(e instanceof UnsupportedOperationException); - assertEquals(e.getMessage(), COMPARE_MESSAGE_ID_AND_BATCH_MESSAGE_ID_THROW_MESSAGE); - } + new MessageIdImpl(123L, 345L, 567)); + assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than"); + assertTrue(messageIdImpl1.compareTo(topicMessageId2) > 0, "Expected to be greater than"); + assertTrue(messageIdImpl2.compareTo(topicMessageId2) > 0, "Expected to be greater than"); + assertEquals(messageIdImpl3.compareTo(topicMessageId2), 0, "Expected to be equal"); + assertTrue(topicMessageId1.compareTo(messageIdImpl1) < 0, "Expected to be less than"); + assertTrue(topicMessageId2.compareTo(messageIdImpl1) < 0, "Expected to be less than"); + assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than"); + assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than"); } @Test From f1529de0f5a0b9418d68ee46b27cc286871fe0e3 Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Thu, 22 Dec 2022 15:31:01 +0800 Subject: [PATCH 4/7] delete some test and add some tset --- .../client/impl/BatchMessageIdImpl.java | 3 +- .../pulsar/client/impl/MessageIdImpl.java | 3 +- .../client/impl/MessageIdCompareToTest.java | 613 ++++++++++++++---- 3 files changed, 481 insertions(+), 138 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index 30e7c3004095a..95631ebb66b03 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -81,7 +81,8 @@ public int compareTo(@Nonnull MessageId o) { } else { if (!(o instanceof BatchMessageIdImpl)) { throw new UnsupportedOperationException(this.getClass().getName() + " can't compare with " - + o.getClass().getName() + " when they have the same `LedgerId` and `EntryId`."); + + o.getClass().getName() + + " when they have the same `LedgerId`, `EntryId` and `PartitionIndex`."); } else { return ComparisonChain.start().compare(this.batchIndex, ((BatchMessageIdImpl) o).batchIndex).result(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 8eb3373917055..b463280b6d20f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -213,7 +213,8 @@ public int compareTo(@Nonnull MessageId o) { return compareWithoutBatchIndex; } else { throw new UnsupportedOperationException(this.getClass().getName() + " can't compare with " - + o.getClass().getName() + " when they have the same `LedgerId` and `EntryId`."); + + o.getClass().getName() + + " when they have the same `LedgerId`, `EntryId` and `PartitionIndex`."); } } else if (o instanceof TopicMessageIdImpl) { return compareTo(((TopicMessageIdImpl) o).getInnerMessageId()); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 709f30a3cb5f1..93ac386c06126 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -34,167 +34,508 @@ */ public class MessageIdCompareToTest { + private final static String BATCH_MESSAGE_ID_COMPARE_WITH_MESSAGE_ID_THROW_EXCEPTION_MESSAGE = "org.apache.pulsar" + + ".client.impl.BatchMessageIdImpl can't compare with " + + "org.apache.pulsar.client.impl.MessageIdImpl when they " + + "have the same `LedgerId`, `EntryId` and `PartitionIndex`."; + + private final static String MESSAGE_ID_COMPARE_WITH_BATCH_MESSAGE_ID_THROW_EXCEPTION_MESSAGE = "org.apache.pulsar" + + ".client.impl.MessageIdImpl can't compare with " + + "org.apache.pulsar.client.impl.BatchMessageIdImpl " + + "when they have the same `LedgerId`, `EntryId` and `PartitionIndex`."; + + // MessageIdImpl + private final MessageIdImpl normalMessageIdImpl = new MessageIdImpl(123L, 345L, 567); + + // BatchMessageIdImpl + private final BatchMessageIdImpl normalBatchMessageId = + new BatchMessageIdImpl(123L, 345L, 567, 567); + private final TopicMessageIdImpl normalTopicMessageIdImplByMessageIdImpl = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", + new MessageIdImpl(123L, 345L, 567)); + + private final TopicMessageIdImpl normalTopicMessageIdImplByBatchMessageIdImpl = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", + new BatchMessageIdImpl(123L, 345L, 567, 567)); + + @Test public void testEqual() { MessageIdImpl messageIdImpl1 = new MessageIdImpl(123L, 345L, 567); - MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567); - BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(234L, 345L, 456, 567); - BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(234L, 345L, 456, 567); + BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(123L, 345L, 567, 567); + + TopicMessageIdImpl topicMessageIdWithMessageId = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", + new MessageIdImpl(123L, 345L, 567)); + + TopicMessageIdImpl topicMessageIdWithBatchMessageId = new TopicMessageIdImpl( + "test-topic-partition-0", "test-topic", + new BatchMessageIdImpl(123L, 345L, 567, 567)); + + + assertEquals(messageIdImpl1.compareTo(normalMessageIdImpl), 0, "Expected to be equal"); + assertEquals(messageIdImpl1.compareTo(normalTopicMessageIdImplByMessageIdImpl), 0, "Expected to be equal"); + + assertEquals(batchMessageId1.compareTo(normalBatchMessageId), 0, "Expected to be equal"); + assertEquals(batchMessageId1.compareTo(normalTopicMessageIdImplByBatchMessageIdImpl), + 0, "Expected to be equal"); + + assertEquals(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(normalBatchMessageId), 0, "Expected to be equal"); + assertEquals(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(topicMessageIdWithBatchMessageId), 0, "Expected to be equal"); + + assertEquals(normalTopicMessageIdImplByMessageIdImpl.compareTo(normalMessageIdImpl), + 0, "Expected to be equal"); + assertEquals(normalTopicMessageIdImplByMessageIdImpl.compareTo(topicMessageIdWithMessageId), + 0, "Expected to be equal"); - assertEquals(messageIdImpl1.compareTo(messageIdImpl2), 0, "Expected to be equal"); - assertEquals(batchMessageId1.compareTo(batchMessageId2), 0, "Expected to be equal"); } @Test public void testGreaterThan() { - MessageIdImpl messageIdImpl1 = new MessageIdImpl(124L, 345L, 567); - MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567); - MessageIdImpl messageIdImpl3 = new MessageIdImpl(123L, 344L, 567); - MessageIdImpl messageIdImpl4 = new MessageIdImpl(123L, 344L, 566); - - BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(235L, 345L, 456, 567); - BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(234L, 346L, 456, 567); - BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(234L, 345L, 456, 568); - BatchMessageIdImpl batchMessageId4 = new BatchMessageIdImpl(234L, 345L, 457, 567); - BatchMessageIdImpl batchMessageId5 = new BatchMessageIdImpl(234L, 345L, 456, 567); - - assertTrue(messageIdImpl1.compareTo(messageIdImpl2) > 0, "Expected to be greater than"); - assertTrue(messageIdImpl1.compareTo(messageIdImpl3) > 0, "Expected to be greater than"); - assertTrue(messageIdImpl1.compareTo(messageIdImpl4) > 0, "Expected to be greater than"); - assertTrue(messageIdImpl2.compareTo(messageIdImpl3) > 0, "Expected to be greater than"); - assertTrue(messageIdImpl2.compareTo(messageIdImpl4) > 0, "Expected to be greater than"); - assertTrue(messageIdImpl3.compareTo(messageIdImpl4) > 0, "Expected to be greater than"); - - assertTrue(batchMessageId1.compareTo(batchMessageId2) > 0, "Expected to be greater than"); - assertTrue(batchMessageId1.compareTo(batchMessageId3) > 0, "Expected to be greater than"); - assertTrue(batchMessageId1.compareTo(batchMessageId4) > 0, "Expected to be greater than"); - assertTrue(batchMessageId1.compareTo(batchMessageId5) > 0, "Expected to be greater than"); - assertTrue(batchMessageId2.compareTo(batchMessageId3) > 0, "Expected to be greater than"); - assertTrue(batchMessageId2.compareTo(batchMessageId4) > 0, "Expected to be greater than"); - assertTrue(batchMessageId2.compareTo(batchMessageId5) > 0, "Expected to be greater than"); - assertTrue(batchMessageId4.compareTo(batchMessageId3) > 0, "Expected to be greater than"); - assertTrue(batchMessageId3.compareTo(batchMessageId5) > 0, "Expected to be greater than"); - assertTrue(batchMessageId4.compareTo(batchMessageId5) > 0, "Expected to be greater than"); + + // MessageIdImpl + MessageIdImpl messageIdWithSmallerLedgerId = new MessageIdImpl(122L, 345L, 567); + MessageIdImpl messageIdWithSmallerEntryId = new MessageIdImpl(123L, 344L, 567); + MessageIdImpl messageIdWithSmallerPartitionIndex = new MessageIdImpl(123L, 345L, 566); + + // BatchMessageIdImpl + BatchMessageIdImpl batchMessageIdWithSmallerLedgerId = + new BatchMessageIdImpl(122L, 345L, 567, 567); + BatchMessageIdImpl batchMessageIdWithSmallerEntryId = + new BatchMessageIdImpl(123L, 344L, 567, 567); + BatchMessageIdImpl batchMessageIdWithSmallerPartitionIndex = + new BatchMessageIdImpl(123L, 345L, 566, 567); + BatchMessageIdImpl batchMessageIdWithSmallerBatchIndex = + new BatchMessageIdImpl(123L, 345L, 567, 566); + + // TopicMessageIdImpl with BatchMessageIdImpl + TopicMessageIdImpl topicMessageIdWithSmallerLedgerIdByBatchMessageId = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new BatchMessageIdImpl(122L, 345L, 567, 567)); + TopicMessageIdImpl topicMessageIdWithSmallerEntryIdByBatchMessageId = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new BatchMessageIdImpl(123L, 344L, 567, 567)); + TopicMessageIdImpl topicMessageIdWithSmallerPartitionIndexByBatchMessageId = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new BatchMessageIdImpl(123L, 345L, 566, 567)); + TopicMessageIdImpl topicMessageIdWithSmallerBatchIndexByBatchMessageId = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new BatchMessageIdImpl(123L, 345L, 567, 566)); + + // TopicMessageIdImpl with MessageIdImpl + TopicMessageIdImpl topicMessageIdWithSmallerLedgerIdByMessageId = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new MessageIdImpl(122L, 345L, 567)); + TopicMessageIdImpl topicMessageIdWithSmallerEntryIdByMessageId = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new MessageIdImpl(123L, 344L, 567)); + TopicMessageIdImpl topicMessageIdWithSmallerPartitionIndexByMessageId = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new MessageIdImpl(123L, 345L, 566)); + + + // MessageIdImpl compare with MessageIdImpl + assertTrue(normalMessageIdImpl.compareTo(messageIdWithSmallerLedgerId) > 0, + "Expected to be greater than"); + assertTrue(normalMessageIdImpl.compareTo(messageIdWithSmallerEntryId) > 0, + "Expected to be greater than"); + assertTrue(normalMessageIdImpl.compareTo(messageIdWithSmallerPartitionIndex) > 0, + "Expected to be greater than"); + + // MessageIdImpl compare with BatchMessageIdImpl + assertTrue(normalMessageIdImpl.compareTo(batchMessageIdWithSmallerLedgerId) > 0, + "Expected to be greater than"); + assertTrue(normalMessageIdImpl.compareTo(batchMessageIdWithSmallerEntryId) > 0, + "Expected to be greater than"); + assertTrue(normalMessageIdImpl.compareTo(batchMessageIdWithSmallerPartitionIndex) > 0, + "Expected to be greater than"); + + // MessageIdImpl compare with TopicMessageIdImpl by MessageIdImpl + assertTrue(normalMessageIdImpl.compareTo(topicMessageIdWithSmallerLedgerIdByMessageId) > 0, + "Expected to be greater than"); + assertTrue(normalMessageIdImpl.compareTo(topicMessageIdWithSmallerEntryIdByMessageId) > 0, + "Expected to be greater than"); + assertTrue(normalMessageIdImpl.compareTo(topicMessageIdWithSmallerPartitionIndexByMessageId) > 0, + "Expected to be greater than"); + + // MessageIdImpl compare with TopicMessageIdImpl by BatchMessageIdImpl + assertTrue(normalMessageIdImpl.compareTo(topicMessageIdWithSmallerLedgerIdByBatchMessageId) > 0, + "Expected to be greater than"); + assertTrue(normalMessageIdImpl.compareTo(topicMessageIdWithSmallerEntryIdByBatchMessageId) > 0, + "Expected to be greater than"); + assertTrue(normalMessageIdImpl.compareTo(topicMessageIdWithSmallerPartitionIndexByBatchMessageId) > 0, + "Expected to be greater than"); + + // BatchMessageIdImpl compare with BatchMessageIdImpl + assertTrue(normalBatchMessageId.compareTo(batchMessageIdWithSmallerLedgerId) > 0, + "Expected to be greater than"); + assertTrue(normalBatchMessageId.compareTo(batchMessageIdWithSmallerEntryId) > 0, + "Expected to be greater than"); + assertTrue(normalBatchMessageId.compareTo(batchMessageIdWithSmallerPartitionIndex) > 0, + "Expected to be greater than"); + assertTrue(normalBatchMessageId.compareTo(batchMessageIdWithSmallerBatchIndex) > 0, + "Expected to be greater than"); + + // BatchMessageIdImpl compare with MessageIdImpl + assertTrue(normalBatchMessageId.compareTo(messageIdWithSmallerLedgerId) > 0, + "Expected to be greater than"); + assertTrue(normalBatchMessageId.compareTo(messageIdWithSmallerEntryId) > 0, + "Expected to be greater than"); + assertTrue(normalBatchMessageId.compareTo(messageIdWithSmallerPartitionIndex) > 0, + "Expected to be greater than"); + + // BatchMessageIdImpl compare with TopicMessageIdImpl by BatchMessageIdImpl + assertTrue(normalBatchMessageId.compareTo(topicMessageIdWithSmallerLedgerIdByBatchMessageId) > 0, + "Expected to be greater than"); + assertTrue(normalBatchMessageId.compareTo(topicMessageIdWithSmallerEntryIdByBatchMessageId) > 0, + "Expected to be greater than"); + assertTrue(normalBatchMessageId.compareTo(topicMessageIdWithSmallerPartitionIndexByBatchMessageId) > 0, + "Expected to be greater than"); + assertTrue(normalBatchMessageId.compareTo(topicMessageIdWithSmallerBatchIndexByBatchMessageId) > 0, + "Expected to be greater than"); + + // BatchMessageIdImpl compare with TopicMessageIdImpl by MessageIdImpl + assertTrue(normalBatchMessageId.compareTo(topicMessageIdWithSmallerLedgerIdByMessageId) > 0, + "Expected to be greater than"); + assertTrue(normalBatchMessageId.compareTo(topicMessageIdWithSmallerEntryIdByMessageId) > 0, + "Expected to be greater than"); + assertTrue(normalBatchMessageId.compareTo(topicMessageIdWithSmallerPartitionIndexByMessageId) > 0, + "Expected to be greater than"); + + + // TopicMessageIdImpl by MessageIdImpl compare with TopicMessageIdImpl by MessageIdImpl + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(topicMessageIdWithSmallerLedgerIdByMessageId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl. + compareTo(topicMessageIdWithSmallerEntryIdByMessageId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(topicMessageIdWithSmallerPartitionIndexByMessageId) > 0, "Expected to be greater than"); + + // TopicMessageIdImpl by MessageIdImpl compare with TopicMessageIdImpl by BatchMessageIdImpl + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(topicMessageIdWithSmallerLedgerIdByBatchMessageId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(topicMessageIdWithSmallerEntryIdByBatchMessageId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(topicMessageIdWithSmallerPartitionIndexByBatchMessageId) > 0, + "Expected to be greater than"); + + // TopicMessageIdImpl by MessageIdImpl compare with MessageIdImpl + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(messageIdWithSmallerLedgerId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl. + compareTo(messageIdWithSmallerEntryId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(messageIdWithSmallerPartitionIndex) > 0, "Expected to be greater than"); + + // TopicMessageIdImpl by MessageIdImpl compare with BatchMessageIdImpl + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(batchMessageIdWithSmallerLedgerId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(batchMessageIdWithSmallerEntryId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(batchMessageIdWithSmallerPartitionIndex) > 0, "Expected to be greater than"); + + // TopicMessageIdImpl by BatchMessageIdImpl compare with TopicMessageIdImpl by BatchMessageIdImpl + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(topicMessageIdWithSmallerLedgerIdByBatchMessageId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl. + compareTo(topicMessageIdWithSmallerEntryIdByBatchMessageId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(topicMessageIdWithSmallerPartitionIndexByBatchMessageId) > 0, + "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(topicMessageIdWithSmallerBatchIndexByBatchMessageId) > 0, + "Expected to be greater than"); + + + // TopicMessageIdImpl by BatchMessageIdImpl compare with TopicMessageIdImpl by MessageIdImpl + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(topicMessageIdWithSmallerLedgerIdByMessageId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl. + compareTo(topicMessageIdWithSmallerEntryIdByMessageId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(topicMessageIdWithSmallerPartitionIndexByMessageId) > 0, "Expected to be greater than"); + + // TopicMessageIdImpl by BatchMessageIdImpl compare with BatchMessageIdImpl + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(batchMessageIdWithSmallerLedgerId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl. + compareTo(batchMessageIdWithSmallerEntryId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(batchMessageIdWithSmallerPartitionIndex) > 0, + "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(batchMessageIdWithSmallerBatchIndex) > 0, + "Expected to be greater than"); + + + // TopicMessageIdImpl by BatchMessageIdImpl compare with MessageIdImpl + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(messageIdWithSmallerLedgerId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl. + compareTo(messageIdWithSmallerEntryId) > 0, "Expected to be greater than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(messageIdWithSmallerPartitionIndex) > 0, "Expected to be greater than"); } @Test public void testLessThan() { - MessageIdImpl messageIdImpl1 = new MessageIdImpl(124L, 345L, 567); - MessageIdImpl messageIdImpl2 = new MessageIdImpl(123L, 345L, 567); - MessageIdImpl messageIdImpl3 = new MessageIdImpl(123L, 344L, 567); - MessageIdImpl messageIdImpl4 = new MessageIdImpl(123L, 344L, 566); - - BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(235L, 345L, 456, 567); - BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(234L, 346L, 456, 567); - BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(234L, 345L, 456, 568); - BatchMessageIdImpl batchMessageId4 = new BatchMessageIdImpl(234L, 345L, 457, 567); - BatchMessageIdImpl batchMessageId5 = new BatchMessageIdImpl(234L, 345L, 456, 567); - - assertTrue(messageIdImpl2.compareTo(messageIdImpl1) < 0, "Expected to be less than"); - assertTrue(messageIdImpl3.compareTo(messageIdImpl1) < 0, "Expected to be less than"); - assertTrue(messageIdImpl4.compareTo(messageIdImpl1) < 0, "Expected to be less than"); - assertTrue(messageIdImpl3.compareTo(messageIdImpl2) < 0, "Expected to be less than"); - assertTrue(messageIdImpl4.compareTo(messageIdImpl2) < 0, "Expected to be less than"); - assertTrue(messageIdImpl4.compareTo(messageIdImpl3) < 0, "Expected to be less than"); - - assertTrue(batchMessageId2.compareTo(batchMessageId1) < 0, "Expected to be less than"); - assertTrue(batchMessageId3.compareTo(batchMessageId1) < 0, "Expected to be less than"); - assertTrue(batchMessageId4.compareTo(batchMessageId1) < 0, "Expected to be less than"); - assertTrue(batchMessageId5.compareTo(batchMessageId1) < 0, "Expected to be less than"); - assertTrue(batchMessageId3.compareTo(batchMessageId2) < 0, "Expected to be less than"); - assertTrue(batchMessageId4.compareTo(batchMessageId2) < 0, "Expected to be less than"); - assertTrue(batchMessageId5.compareTo(batchMessageId2) < 0, "Expected to be less than"); - assertTrue(batchMessageId3.compareTo(batchMessageId4) < 0, "Expected to be less than"); - assertTrue(batchMessageId5.compareTo(batchMessageId3) < 0, "Expected to be less than"); - assertTrue(batchMessageId5.compareTo(batchMessageId4) < 0, "Expected to be less than"); + // MessageIdImpl + MessageIdImpl messageIdImplWithLargerLedgerId = new MessageIdImpl(124L, 345L, 567); + MessageIdImpl messageIdImplWithLargerEntryId = new MessageIdImpl(123L, 346L, 567); + MessageIdImpl messageIdImplWithLargerPartitionIndex = new MessageIdImpl(124L, 345L, 567); + + // BatchMessageIdImpl + BatchMessageIdImpl batchMessageIdWithLargerLedgerId = + new BatchMessageIdImpl(124L, 345L, 567, 567); + BatchMessageIdImpl batchMessageIdWithLargerEntryId = + new BatchMessageIdImpl(123L, 346L, 567, 567); + BatchMessageIdImpl batchMessageIdWithLargerPartitionIndex = + new BatchMessageIdImpl(123L, 345L, 568, 567); + BatchMessageIdImpl batchMessageIdWithLargerBatchIndex = + new BatchMessageIdImpl(123L, 345L, 567, 568); + + // TopicMessageIdImpl with BatchMessageIdImpl + TopicMessageIdImpl topicMessageIdWithLargerLedgerIdByBatchMessageIdImpl = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new BatchMessageIdImpl(124L, 345L, 567, 567)); + TopicMessageIdImpl topicMessageIdWithLargerEntryIdByBatchMessageIdImpl = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new BatchMessageIdImpl(123L, 346L, 567, 567)); + TopicMessageIdImpl topicMessageIdWithLargerPartitionIndexByBatchMessageIdImpl = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new BatchMessageIdImpl(123L, 345L, 568, 567)); + TopicMessageIdImpl topicMessageIdWithLargerBatchIndexByBatchMessageIdImpl = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new BatchMessageIdImpl(123L, 345L, 567, 568)); + + // TopicMessageIdImpl with MessageIdImpl + TopicMessageIdImpl topicMessageIdWithLargerLedgerIdByMessageIdImpl = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new MessageIdImpl(124L, 345L, 567)); + TopicMessageIdImpl topicMessageIdWithLargerEntryIdByMessageIdImpl = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new MessageIdImpl(123L, 346L, 567)); + TopicMessageIdImpl topicMessageIdWithLargerPartitionIndexMessageIdImpl = + new TopicMessageIdImpl("test-topic-partition-0", + "test-topic", new MessageIdImpl(123L, 345L, 568)); + + // MessageIdImpl compare with MessageIdImpl + assertTrue(normalMessageIdImpl.compareTo(messageIdImplWithLargerLedgerId) < 0, + "Expected to be less than"); + assertTrue(normalMessageIdImpl.compareTo(messageIdImplWithLargerEntryId) < 0, + "Expected to be less than"); + assertTrue(normalMessageIdImpl.compareTo(messageIdImplWithLargerPartitionIndex) < 0, + "Expected to be less than"); + + // MessageIdImpl compare with BatchMessageIdImpl + assertTrue(normalMessageIdImpl.compareTo(batchMessageIdWithLargerLedgerId) < 0, + "Expected to be less than"); + assertTrue(normalMessageIdImpl.compareTo(batchMessageIdWithLargerEntryId) < 0, + "Expected to be less than"); + assertTrue(normalMessageIdImpl.compareTo(batchMessageIdWithLargerPartitionIndex) < 0, + "Expected to be less than"); + + // MessageIdImpl compare with TopicMessageIdImpl by MessageIdImpl + assertTrue(normalMessageIdImpl.compareTo(topicMessageIdWithLargerLedgerIdByMessageIdImpl) < 0, + "Expected to be less than"); + assertTrue(normalMessageIdImpl.compareTo(topicMessageIdWithLargerEntryIdByMessageIdImpl) < 0, + "Expected to be less than"); + assertTrue(normalMessageIdImpl.compareTo(topicMessageIdWithLargerPartitionIndexMessageIdImpl) < 0, + "Expected to be less than"); + + // MessageIdImpl compare with TopicMessageIdImpl by BatchMessageIdImpl + assertTrue(normalMessageIdImpl.compareTo(topicMessageIdWithLargerLedgerIdByBatchMessageIdImpl) < 0, + "Expected to be less than"); + assertTrue(normalMessageIdImpl.compareTo(topicMessageIdWithLargerEntryIdByBatchMessageIdImpl) < 0, + "Expected to be less than"); + assertTrue(normalMessageIdImpl.compareTo(topicMessageIdWithLargerPartitionIndexByBatchMessageIdImpl) < 0, + "Expected to be less than"); + + // BatchMessageIdImpl compare with BatchMessageIdImpl + assertTrue(normalBatchMessageId.compareTo(batchMessageIdWithLargerLedgerId) < 0, + "Expected to be less than"); + assertTrue(normalBatchMessageId.compareTo(batchMessageIdWithLargerEntryId) < 0, + "Expected to be less than"); + assertTrue(normalBatchMessageId.compareTo(batchMessageIdWithLargerPartitionIndex) < 0, + "Expected to be less than"); + assertTrue(normalBatchMessageId.compareTo(batchMessageIdWithLargerBatchIndex) < 0, + "Expected to be less than"); + + // BatchMessageIdImpl compare with MessageIdImpl + assertTrue(normalBatchMessageId.compareTo(messageIdImplWithLargerLedgerId) < 0, + "Expected to be less than"); + assertTrue(normalBatchMessageId.compareTo(messageIdImplWithLargerEntryId) < 0, + "Expected to be less than"); + assertTrue(normalBatchMessageId.compareTo(messageIdImplWithLargerPartitionIndex) < 0, + "Expected to be less than"); + + // BatchMessageIdImpl compare with TopicMessageIdImpl by MessageIdImpl + assertTrue(normalBatchMessageId.compareTo(topicMessageIdWithLargerLedgerIdByMessageIdImpl) < 0, + "Expected to be less than"); + assertTrue(normalBatchMessageId.compareTo(topicMessageIdWithLargerEntryIdByMessageIdImpl) < 0, + "Expected to be less than"); + assertTrue(normalBatchMessageId.compareTo(topicMessageIdWithLargerPartitionIndexMessageIdImpl) < 0, + "Expected to be less than"); + + // BatchMessageIdImpl compare with TopicMessageIdImpl by BatchMessageIdImpl + assertTrue(normalBatchMessageId.compareTo(topicMessageIdWithLargerLedgerIdByBatchMessageIdImpl) < 0, + "Expected to be less than"); + assertTrue(normalBatchMessageId.compareTo(topicMessageIdWithLargerEntryIdByBatchMessageIdImpl) < 0, + "Expected to be less than"); + assertTrue(normalBatchMessageId + .compareTo(topicMessageIdWithLargerPartitionIndexByBatchMessageIdImpl) < 0, + "Expected to be less than"); + assertTrue(normalBatchMessageId.compareTo(topicMessageIdWithLargerBatchIndexByBatchMessageIdImpl) < 0, + "Expected to be less than"); + + + // TopicMessageIdImpl by MessageIdImpl compare with TopicMessageIdImpl by MessageIdImpl + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(topicMessageIdWithLargerLedgerIdByMessageIdImpl) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl. + compareTo(topicMessageIdWithLargerEntryIdByMessageIdImpl) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(topicMessageIdWithLargerPartitionIndexMessageIdImpl) < 0, "Expected to be less than"); + + // TopicMessageIdImpl by MessageIdImpl compare with TopicMessageIdImpl by BatchMessageIdImpl + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(topicMessageIdWithLargerLedgerIdByBatchMessageIdImpl) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(topicMessageIdWithLargerEntryIdByBatchMessageIdImpl) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(topicMessageIdWithLargerPartitionIndexByBatchMessageIdImpl) < 0, + "Expected to be less than"); + + // TopicMessageIdImpl by MessageIdImpl compare with MessageIdImpl + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(messageIdImplWithLargerLedgerId) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl. + compareTo(messageIdImplWithLargerEntryId) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(messageIdImplWithLargerPartitionIndex) < 0, "Expected to be less than"); + + // TopicMessageIdImpl by MessageIdImpl compare with BatchMessageIdImpl + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(batchMessageIdWithLargerLedgerId) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(batchMessageIdWithLargerEntryId) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByMessageIdImpl + .compareTo(batchMessageIdWithLargerPartitionIndex) < 0, + "Expected to be less than"); + + + // TopicMessageIdImpl by BatchMessageIdImpl compare with TopicMessageIdImpl by BatchMessageIdImpl + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(topicMessageIdWithLargerLedgerIdByBatchMessageIdImpl) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl. + compareTo(topicMessageIdWithLargerEntryIdByBatchMessageIdImpl) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(topicMessageIdWithLargerPartitionIndexByBatchMessageIdImpl) < 0, + "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(topicMessageIdWithLargerBatchIndexByBatchMessageIdImpl) < 0, + "Expected to be less than"); + + // TopicMessageIdImpl by BatchMessageIdImpl compare with TopicMessageIdImpl by MessageIdImpl + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(topicMessageIdWithLargerLedgerIdByMessageIdImpl) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl. + compareTo(topicMessageIdWithLargerEntryIdByMessageIdImpl) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(topicMessageIdWithLargerPartitionIndexMessageIdImpl) < 0, "Expected to be less than"); + + // TopicMessageIdImpl by BatchMessageIdImpl compare with BatchMessageIdImpl + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(batchMessageIdWithLargerLedgerId) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl. + compareTo(batchMessageIdWithLargerEntryId) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(batchMessageIdWithLargerPartitionIndex) < 0, + "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(batchMessageIdWithLargerBatchIndex) < 0, + "Expected to be less than"); + + // TopicMessageIdImpl by BatchMessageIdImpl compare with MessageIdImpl + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(messageIdImplWithLargerLedgerId) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl. + compareTo(messageIdImplWithLargerEntryId) < 0, "Expected to be less than"); + assertTrue(normalTopicMessageIdImplByBatchMessageIdImpl + .compareTo(messageIdImplWithLargerPartitionIndex) < 0, "Expected to be less than"); } @Test - public void testCompareDifferentType() { - MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567); - BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(123L, 345L, 566, 789); - BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(123L, 345L, 567, 789); - BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(messageIdImpl); - assertTrue(messageIdImpl.compareTo(batchMessageId1) > 0, "Expected to be greater than"); - assertTrue(messageIdImpl.compareTo(batchMessageId2) < 0, "Expected to be less than"); - assertEquals(messageIdImpl.compareTo(batchMessageId3), 0, "Expected to be equal"); - assertTrue(batchMessageId1.compareTo(messageIdImpl) < 0, "Expected to be less than"); - assertTrue(batchMessageId2.compareTo(messageIdImpl) > 0, "Expected to be greater than"); - assertEquals(batchMessageId3.compareTo(messageIdImpl), 0, "Expected to be equal"); - } + public void testMessageIdImplAndBatchMessageIdImplWithSameLedgerIdAndEntryIdAndPartitionIndex() { + try { + normalMessageIdImpl.compareTo(normalBatchMessageId); + fail(); + } catch (Exception e) { + assertTrue(e instanceof UnsupportedOperationException); + assertEquals(e.getMessage(), MESSAGE_ID_COMPARE_WITH_BATCH_MESSAGE_ID_THROW_EXCEPTION_MESSAGE); + } - @Test - public void compareToSymmetricTest() { - MessageIdImpl simpleMessageId = new MessageIdImpl(123L, 345L, 567); - // batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message - BatchMessageIdImpl batchMessageId1 = new BatchMessageIdImpl(123L, 345L, 567, -1); - BatchMessageIdImpl batchMessageId2 = new BatchMessageIdImpl(123L, 345L, 567, 1); - BatchMessageIdImpl batchMessageId3 = new BatchMessageIdImpl(123L, 345L, 566, 1); - BatchMessageIdImpl batchMessageId4 = new BatchMessageIdImpl(123L, 345L, 566, -1); + try { + normalMessageIdImpl.compareTo(normalTopicMessageIdImplByBatchMessageIdImpl); + fail(); + } catch (Exception e) { + assertTrue(e instanceof UnsupportedOperationException); + assertEquals(e.getMessage(), MESSAGE_ID_COMPARE_WITH_BATCH_MESSAGE_ID_THROW_EXCEPTION_MESSAGE); + } try { - simpleMessageId.compareTo(batchMessageId1); + normalBatchMessageId.compareTo(normalMessageIdImpl); fail(); } catch (Exception e) { assertTrue(e instanceof UnsupportedOperationException); - assertEquals(e.getMessage(), "org.apache.pulsar.client.impl.MessageIdImpl " - + "can't compare with org.apache.pulsar.client.impl.BatchMessageIdImpl " - + "when they have the same `LedgerId` and `EntryId`."); + assertEquals(e.getMessage(), BATCH_MESSAGE_ID_COMPARE_WITH_MESSAGE_ID_THROW_EXCEPTION_MESSAGE); } - assertEquals(batchMessageId1.compareTo(simpleMessageId), 0, "Expected to be equal"); - assertTrue(batchMessageId2.compareTo(simpleMessageId) > 0, "Expected to be greater than"); - assertTrue(simpleMessageId.compareTo(batchMessageId2) < 0, "Expected to be less than"); - assertTrue(simpleMessageId.compareTo(batchMessageId3) > 0, "Expected to be greater than"); - assertTrue(batchMessageId3.compareTo(simpleMessageId) < 0, "Expected to be less than"); - assertTrue(simpleMessageId.compareTo(batchMessageId4) > 0, "Expected to be greater than"); - assertTrue(batchMessageId4.compareTo(simpleMessageId) < 0, "Expected to be less than"); - } - @Test - public void testMessageIdImplCompareToTopicMessageId() { - MessageIdImpl messageIdImpl = new MessageIdImpl(123L, 345L, 567); - TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( - "test-topic-partition-0", - "test-topic", - new BatchMessageIdImpl(123L, 345L, 566, 789)); - TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( - "test-topic-partition-0", - "test-topic", - new BatchMessageIdImpl(123L, 345L, 567, 789)); - TopicMessageIdImpl topicMessageId3 = new TopicMessageIdImpl( - "test-topic-partition-0", - "test-topic", - new BatchMessageIdImpl(messageIdImpl)); - assertTrue(messageIdImpl.compareTo(topicMessageId1) > 0, "Expected to be greater than"); - assertTrue(messageIdImpl.compareTo(topicMessageId2) < 0, "Expected to be less than"); - assertEquals(messageIdImpl.compareTo(topicMessageId3), 0, "Expected to be equal"); - assertTrue(topicMessageId1.compareTo(messageIdImpl) < 0, "Expected to be less than"); - assertTrue(topicMessageId2.compareTo(messageIdImpl) > 0, "Expected to be greater than"); - assertEquals(topicMessageId3.compareTo(messageIdImpl), 0, "Expected to be equal"); - } + try { + normalBatchMessageId.compareTo(normalTopicMessageIdImplByMessageIdImpl); + fail(); + } catch (Exception e) { + assertTrue(e instanceof UnsupportedOperationException); + assertEquals(e.getMessage(), BATCH_MESSAGE_ID_COMPARE_WITH_MESSAGE_ID_THROW_EXCEPTION_MESSAGE); + } + + try { + normalTopicMessageIdImplByMessageIdImpl.compareTo(normalTopicMessageIdImplByBatchMessageIdImpl); + fail(); + } catch (Exception e) { + assertTrue(e instanceof UnsupportedOperationException); + assertEquals(e.getMessage(), MESSAGE_ID_COMPARE_WITH_BATCH_MESSAGE_ID_THROW_EXCEPTION_MESSAGE); + } + + try { + normalTopicMessageIdImplByMessageIdImpl.compareTo(normalBatchMessageId); + fail(); + } catch (Exception e) { + assertTrue(e instanceof UnsupportedOperationException); + assertEquals(e.getMessage(), MESSAGE_ID_COMPARE_WITH_BATCH_MESSAGE_ID_THROW_EXCEPTION_MESSAGE); + } + + try { + normalTopicMessageIdImplByBatchMessageIdImpl.compareTo(normalTopicMessageIdImplByMessageIdImpl); + fail(); + } catch (Exception e) { + assertTrue(e instanceof UnsupportedOperationException); + assertEquals(e.getMessage(), BATCH_MESSAGE_ID_COMPARE_WITH_MESSAGE_ID_THROW_EXCEPTION_MESSAGE); + } + + try { + normalTopicMessageIdImplByBatchMessageIdImpl.compareTo(normalMessageIdImpl); + fail(); + } catch (Exception e) { + assertTrue(e instanceof UnsupportedOperationException); + assertEquals(e.getMessage(), BATCH_MESSAGE_ID_COMPARE_WITH_MESSAGE_ID_THROW_EXCEPTION_MESSAGE); + } - @Test - public void testBatchMessageIdImplCompareToTopicMessageId() { - BatchMessageIdImpl messageIdImpl1 = new BatchMessageIdImpl(123L, 345L, 567, 789); - BatchMessageIdImpl messageIdImpl2 = new BatchMessageIdImpl(123L, 345L, 567, 0); - BatchMessageIdImpl messageIdImpl3 = new BatchMessageIdImpl(123L, 345L, 567, -1); - TopicMessageIdImpl topicMessageId1 = new TopicMessageIdImpl( - "test-topic-partition-0", - "test-topic", - new MessageIdImpl(123L, 345L, 566)); - TopicMessageIdImpl topicMessageId2 = new TopicMessageIdImpl( - "test-topic-partition-0", - "test-topic", - new MessageIdImpl(123L, 345L, 567)); - assertTrue(messageIdImpl1.compareTo(topicMessageId1) > 0, "Expected to be greater than"); - assertTrue(messageIdImpl1.compareTo(topicMessageId2) > 0, "Expected to be greater than"); - assertTrue(messageIdImpl2.compareTo(topicMessageId2) > 0, "Expected to be greater than"); - assertEquals(messageIdImpl3.compareTo(topicMessageId2), 0, "Expected to be equal"); - assertTrue(topicMessageId1.compareTo(messageIdImpl1) < 0, "Expected to be less than"); - assertTrue(topicMessageId2.compareTo(messageIdImpl1) < 0, "Expected to be less than"); - assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than"); - assertTrue(topicMessageId2.compareTo(messageIdImpl2) < 0, "Expected to be less than"); } @Test From 23bdcd1ba1a7acd50e47cd924d1642c0e6029b2d Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Fri, 23 Dec 2022 12:01:42 +0800 Subject: [PATCH 5/7] add earliest and latest compare --- .../apache/pulsar/client/impl/BatchMessageIdImpl.java | 9 +++++++++ .../org/apache/pulsar/client/impl/MessageIdImpl.java | 7 +++++++ 2 files changed, 16 insertions(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index 95631ebb66b03..b685a06c9c52b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -80,6 +80,15 @@ public int compareTo(@Nonnull MessageId o) { return compareWithoutBatchIndex; } else { if (!(o instanceof BatchMessageIdImpl)) { + // if ledgerId == -1 represent the messageID and batchMessageId + // are earliest, so return they are equals + + // if ledgerId == Long.MAX_VALUE represent the messageID + // and batchMessageId are latest, so return they are equals + if (this.ledgerId == ((MessageIdImpl) MessageId.earliest).ledgerId + || this.ledgerId == ((MessageIdImpl) MessageId.latest).ledgerId) { + return 0; + } throw new UnsupportedOperationException(this.getClass().getName() + " can't compare with " + o.getClass().getName() + " when they have the same `LedgerId`, `EntryId` and `PartitionIndex`."); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index b463280b6d20f..899f3b01d45e2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -212,6 +212,13 @@ public int compareTo(@Nonnull MessageId o) { if (compareWithoutBatchIndex != 0 || !(o instanceof BatchMessageIdImpl)) { return compareWithoutBatchIndex; } else { + // if ledgerId == -1 represent the messageID and batchMessageId are earliest, so return they are equals + // if ledgerId == Long.MAX_VALUE represent the messageID + // and batchMessageId are latest, so return they are equals + if (this.ledgerId == ((MessageIdImpl) MessageId.earliest).ledgerId + || this.ledgerId == ((MessageIdImpl) MessageId.latest).ledgerId) { + return 0; + } throw new UnsupportedOperationException(this.getClass().getName() + " can't compare with " + o.getClass().getName() + " when they have the same `LedgerId`, `EntryId` and `PartitionIndex`."); From 127bf9f8f2aebef57f632e7703b3989e45f2fb62 Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Fri, 23 Dec 2022 14:38:48 +0800 Subject: [PATCH 6/7] add the ledgerId and entryId is special value compareTo --- .../client/impl/BatchMessageIdImpl.java | 6 +++++- .../pulsar/client/impl/MessageIdImpl.java | 10 ++++++++-- .../client/impl/MessageIdCompareToTest.java | 19 +++++++++++++++++++ 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index b685a06c9c52b..b6a27376d3167 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -85,8 +85,12 @@ public int compareTo(@Nonnull MessageId o) { // if ledgerId == Long.MAX_VALUE represent the messageID // and batchMessageId are latest, so return they are equals + + // if entryId == -1 represent the messageID + // and batchMessageId are the entryId initialized by ledger, so return they are equals if (this.ledgerId == ((MessageIdImpl) MessageId.earliest).ledgerId - || this.ledgerId == ((MessageIdImpl) MessageId.latest).ledgerId) { + || this.ledgerId == ((MessageIdImpl) MessageId.latest).ledgerId + || this.entryId == ((MessageIdImpl) MessageId.earliest).entryId) { return 0; } throw new UnsupportedOperationException(this.getClass().getName() + " can't compare with " diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 899f3b01d45e2..49470bfd9874f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -212,11 +212,17 @@ public int compareTo(@Nonnull MessageId o) { if (compareWithoutBatchIndex != 0 || !(o instanceof BatchMessageIdImpl)) { return compareWithoutBatchIndex; } else { - // if ledgerId == -1 represent the messageID and batchMessageId are earliest, so return they are equals + // if ledgerId == -1 represent the messageID and batchMessageId + // are earliest, so return they are equals + // if ledgerId == Long.MAX_VALUE represent the messageID // and batchMessageId are latest, so return they are equals + + // if entryId == -1 represent the messageID + // and batchMessageId are the entryId initialized by ledger, so return they are equals if (this.ledgerId == ((MessageIdImpl) MessageId.earliest).ledgerId - || this.ledgerId == ((MessageIdImpl) MessageId.latest).ledgerId) { + || this.ledgerId == ((MessageIdImpl) MessageId.latest).ledgerId + || this.entryId == ((MessageIdImpl) MessageId.earliest).entryId) { return 0; } throw new UnsupportedOperationException(this.getClass().getName() + " can't compare with " diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 93ac386c06126..9ec0de1fd41dd 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -90,7 +90,26 @@ public void testEqual() { 0, "Expected to be equal"); assertEquals(normalTopicMessageIdImplByMessageIdImpl.compareTo(topicMessageIdWithMessageId), 0, "Expected to be equal"); + } + + @Test + public void testMessageIdAndBatchIdSpecialEquals() { + BatchMessageIdImpl minBatchMessageId = new BatchMessageIdImpl(-1, -1, -1, -1); + BatchMessageIdImpl maxBatchMessageId = new BatchMessageIdImpl(Long.MAX_VALUE, Long.MAX_VALUE, -1, -1); + + assertEquals(minBatchMessageId.compareTo(MessageId.earliest), 0, "Expected to be equal"); + assertEquals(maxBatchMessageId.compareTo(MessageId.latest), 0, "Expected to be equal"); + + assertEquals(MessageId.earliest.compareTo(minBatchMessageId), 0, "Expected to be equal"); + assertEquals(MessageId.latest.compareTo(maxBatchMessageId), 0, "Expected to be equal"); + + BatchMessageIdImpl batchMessageIdWithMinEntryId = new BatchMessageIdImpl(123, -1, -1, -1); + MessageIdImpl messageIdWithMinEntryId = new MessageIdImpl(123, -1, -1); + assertEquals(batchMessageIdWithMinEntryId.compareTo(messageIdWithMinEntryId), 0, + "Expected to be equal"); + assertEquals(messageIdWithMinEntryId.compareTo(batchMessageIdWithMinEntryId), 0, + "Expected to be equal"); } @Test From 3a7b44e9de57057cfb110e605745272614084a02 Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Sat, 24 Dec 2022 16:04:42 +0800 Subject: [PATCH 7/7] fix some test --- .../client/impl/BatchMessageIdImpl.java | 13 ------- .../pulsar/client/impl/MessageIdImpl.java | 13 ------- .../client/impl/MessageIdCompareToTest.java | 38 +++++++++---------- 3 files changed, 19 insertions(+), 45 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java index b6a27376d3167..95631ebb66b03 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java @@ -80,19 +80,6 @@ public int compareTo(@Nonnull MessageId o) { return compareWithoutBatchIndex; } else { if (!(o instanceof BatchMessageIdImpl)) { - // if ledgerId == -1 represent the messageID and batchMessageId - // are earliest, so return they are equals - - // if ledgerId == Long.MAX_VALUE represent the messageID - // and batchMessageId are latest, so return they are equals - - // if entryId == -1 represent the messageID - // and batchMessageId are the entryId initialized by ledger, so return they are equals - if (this.ledgerId == ((MessageIdImpl) MessageId.earliest).ledgerId - || this.ledgerId == ((MessageIdImpl) MessageId.latest).ledgerId - || this.entryId == ((MessageIdImpl) MessageId.earliest).entryId) { - return 0; - } throw new UnsupportedOperationException(this.getClass().getName() + " can't compare with " + o.getClass().getName() + " when they have the same `LedgerId`, `EntryId` and `PartitionIndex`."); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 49470bfd9874f..b463280b6d20f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -212,19 +212,6 @@ public int compareTo(@Nonnull MessageId o) { if (compareWithoutBatchIndex != 0 || !(o instanceof BatchMessageIdImpl)) { return compareWithoutBatchIndex; } else { - // if ledgerId == -1 represent the messageID and batchMessageId - // are earliest, so return they are equals - - // if ledgerId == Long.MAX_VALUE represent the messageID - // and batchMessageId are latest, so return they are equals - - // if entryId == -1 represent the messageID - // and batchMessageId are the entryId initialized by ledger, so return they are equals - if (this.ledgerId == ((MessageIdImpl) MessageId.earliest).ledgerId - || this.ledgerId == ((MessageIdImpl) MessageId.latest).ledgerId - || this.entryId == ((MessageIdImpl) MessageId.earliest).entryId) { - return 0; - } throw new UnsupportedOperationException(this.getClass().getName() + " can't compare with " + o.getClass().getName() + " when they have the same `LedgerId`, `EntryId` and `PartitionIndex`."); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java index 9ec0de1fd41dd..e06adae1f445c 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageIdCompareToTest.java @@ -92,25 +92,25 @@ public void testEqual() { 0, "Expected to be equal"); } - @Test - public void testMessageIdAndBatchIdSpecialEquals() { - BatchMessageIdImpl minBatchMessageId = new BatchMessageIdImpl(-1, -1, -1, -1); - BatchMessageIdImpl maxBatchMessageId = new BatchMessageIdImpl(Long.MAX_VALUE, Long.MAX_VALUE, -1, -1); - - assertEquals(minBatchMessageId.compareTo(MessageId.earliest), 0, "Expected to be equal"); - assertEquals(maxBatchMessageId.compareTo(MessageId.latest), 0, "Expected to be equal"); - - assertEquals(MessageId.earliest.compareTo(minBatchMessageId), 0, "Expected to be equal"); - assertEquals(MessageId.latest.compareTo(maxBatchMessageId), 0, "Expected to be equal"); - - BatchMessageIdImpl batchMessageIdWithMinEntryId = new BatchMessageIdImpl(123, -1, -1, -1); - MessageIdImpl messageIdWithMinEntryId = new MessageIdImpl(123, -1, -1); - - assertEquals(batchMessageIdWithMinEntryId.compareTo(messageIdWithMinEntryId), 0, - "Expected to be equal"); - assertEquals(messageIdWithMinEntryId.compareTo(batchMessageIdWithMinEntryId), 0, - "Expected to be equal"); - } +// @Test +// public void testMessageIdAndBatchIdSpecialEquals() { +// BatchMessageIdImpl minBatchMessageId = new BatchMessageIdImpl(-1, -1, -1, -1); +// BatchMessageIdImpl maxBatchMessageId = new BatchMessageIdImpl(Long.MAX_VALUE, Long.MAX_VALUE, -1, -1); +// +// assertEquals(minBatchMessageId.compareTo(MessageId.earliest), 0, "Expected to be equal"); +// assertEquals(maxBatchMessageId.compareTo(MessageId.latest), 0, "Expected to be equal"); +// +// assertEquals(MessageId.earliest.compareTo(minBatchMessageId), 0, "Expected to be equal"); +// assertEquals(MessageId.latest.compareTo(maxBatchMessageId), 0, "Expected to be equal"); +// +// BatchMessageIdImpl batchMessageIdWithMinEntryId = new BatchMessageIdImpl(123, -1, -1, -1); +// MessageIdImpl messageIdWithMinEntryId = new MessageIdImpl(123, -1, -1); +// +// assertEquals(batchMessageIdWithMinEntryId.compareTo(messageIdWithMinEntryId), 0, +// "Expected to be equal"); +// assertEquals(messageIdWithMinEntryId.compareTo(batchMessageIdWithMinEntryId), 0, +// "Expected to be equal"); +// } @Test public void testGreaterThan() {