diff --git a/pulsar-admin/src/main/java/io/github/protocol/pulsar/admin/jdk/PersistentTopics.java b/pulsar-admin/src/main/java/io/github/protocol/pulsar/admin/jdk/PersistentTopics.java index 1c107e2..925128a 100644 --- a/pulsar-admin/src/main/java/io/github/protocol/pulsar/admin/jdk/PersistentTopics.java +++ b/pulsar-admin/src/main/java/io/github/protocol/pulsar/admin/jdk/PersistentTopics.java @@ -1,5 +1,13 @@ package io.github.protocol.pulsar.admin.jdk; +import com.fasterxml.jackson.core.type.TypeReference; +import io.github.openfacade.http.HttpResponse; +import io.github.protocol.pulsar.admin.common.JacksonService; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutionException; + public class PersistentTopics extends BaseTopicsImpl { private static final String BASE_URL_PERSISTENT_DOMAIN = "/admin/v2" + "/persistent"; @@ -11,4 +19,58 @@ public PersistentTopics(InnerHttpClient httpClient) { public String getDomainBaseUrl() { return BASE_URL_PERSISTENT_DOMAIN; } + + public void createSubscription(String tenant, String namespace, String encodedTopic, String subscriptionName, + boolean replicated, boolean authoritative, SubscriptionMessageId messageId) + throws PulsarAdminException { + String url = String.format("%s/%s/%s/%s/subscription/%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, + subscriptionName); + try { + HttpResponse response = + httpClient.put(url, messageId, "replicated", String.valueOf(replicated), "authoritative", + String.valueOf(authoritative)); + if (response.statusCode() != 204) { + throw new PulsarAdminException( + String.format("failed to create subscription %s for topic %s/%s/%s, status code %s, body : %s", + subscriptionName, tenant, namespace, encodedTopic, response.statusCode(), + response.bodyAsString())); + } + } catch (IOException | InterruptedException | ExecutionException e) { + throw new PulsarAdminException(e); + } + } + + public void deleteSubscription(String tenant, String namespace, String encodedTopic, String subName, boolean force, + boolean authoritative) throws PulsarAdminException { + String url = + String.format("%s/%s/%s/%s/subscription/%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, subName); + try { + HttpResponse response = + httpClient.delete(url, "force", String.valueOf(force), "authoritative", String.valueOf(authoritative)); + if (response.statusCode() != 204) { + throw new PulsarAdminException( + String.format("failed to delete subscription %s of topic %s/%s/%s, status code %s, body : %s", + subName, tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString())); + } + } catch (IOException | InterruptedException | ExecutionException e) { + throw new PulsarAdminException(e); + } + } + + public List getSubscriptions(String tenant, String namespace, String encodedTopic, boolean authoritative) + throws PulsarAdminException { + String url = String.format("%s/%s/%s/%s/subscriptions", getDomainBaseUrl(), tenant, namespace, encodedTopic); + try { + HttpResponse response = httpClient.get(url, "authoritative", String.valueOf(authoritative)); + if (response.statusCode() != 200) { + throw new PulsarAdminException( + String.format("failed to get subscriptions of topic %s/%s/%s, status code %s, body : %s", tenant, + namespace, encodedTopic, response.statusCode(), response.bodyAsString())); + } + return JacksonService.toRefer(response.body(), new TypeReference>() { + }); + } catch (IOException | InterruptedException | ExecutionException e) { + throw new PulsarAdminException(e); + } + } } diff --git a/pulsar-admin/src/main/java/io/github/protocol/pulsar/admin/jdk/SubscriptionMessageId.java b/pulsar-admin/src/main/java/io/github/protocol/pulsar/admin/jdk/SubscriptionMessageId.java new file mode 100644 index 0000000..edb0393 --- /dev/null +++ b/pulsar-admin/src/main/java/io/github/protocol/pulsar/admin/jdk/SubscriptionMessageId.java @@ -0,0 +1,38 @@ +package io.github.protocol.pulsar.admin.jdk; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.util.Map; + +@Getter +@Setter +@NoArgsConstructor +@EqualsAndHashCode +public class SubscriptionMessageId { + + private Integer batchIndex = -1; + + private Long entryId = -1L; + + private Long ledgerId = -1L; + + private Integer partitionIndex = -1; + + private Map properties = null; + + public static SubscriptionMessageId earliest() { + SubscriptionMessageId subscriptionMessageId = new SubscriptionMessageId(); + return subscriptionMessageId; + } + + public static SubscriptionMessageId latest() { + SubscriptionMessageId subscriptionMessageId = new SubscriptionMessageId(); + subscriptionMessageId.setEntryId(Long.MAX_VALUE); + subscriptionMessageId.setLedgerId(Long.MAX_VALUE); + subscriptionMessageId.setPartitionIndex(Integer.MAX_VALUE); + return subscriptionMessageId; + } +} diff --git a/pulsar-admin/src/test/java/io/github/protocol/pulsar/admin/jdk/PersistentTopicsTest.java b/pulsar-admin/src/test/java/io/github/protocol/pulsar/admin/jdk/PersistentTopicsTest.java index ac9b06f..f44e340 100644 --- a/pulsar-admin/src/test/java/io/github/protocol/pulsar/admin/jdk/PersistentTopicsTest.java +++ b/pulsar-admin/src/test/java/io/github/protocol/pulsar/admin/jdk/PersistentTopicsTest.java @@ -9,6 +9,7 @@ import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.TreeMap; public class PersistentTopicsTest extends BaseTest { @@ -197,4 +198,49 @@ public void getPartitionedStatsTest(PulsarAdmin pulsarAdmin) throws PulsarAdminE Assertions.assertNotNull(pulsarAdmin.persistentTopics().getPartitionedStats(tenant, namespace, topic, false)); } + @ParameterizedTest + @MethodSource("providePulsarAdmins") + public void subscriptionTest(PulsarAdmin pulsarAdmin) throws PulsarAdminException { + String namespace = RandomUtil.randomString(); + String topic = RandomUtil.randomString(); + String subscriptionNameLatest = RandomUtil.randomString(); + String subscriptionNameEarliest = RandomUtil.randomString(); + + // Create namespace and topic + pulsarAdmin.namespaces().createNamespace(tenant, namespace); + pulsarAdmin.persistentTopics().createNonPartitionedTopic(tenant, namespace, topic, false, null); + + // Create subscription with message ID latest and earliest + pulsarAdmin.persistentTopics() + .createSubscription(tenant, namespace, topic, subscriptionNameLatest, false, false, + SubscriptionMessageId.latest()); + pulsarAdmin.persistentTopics() + .createSubscription(tenant, namespace, topic, subscriptionNameEarliest, false, false, + SubscriptionMessageId.earliest()); + + // Verify subscription was created + List subscriptions = pulsarAdmin.persistentTopics().getSubscriptions(tenant, namespace, topic, false); + Assertions.assertTrue(subscriptions.contains(subscriptionNameEarliest), + "Should contain subscription created with message ID"); + Assertions.assertTrue(subscriptions.contains(subscriptionNameLatest), + "Should contain subscription created with message ID"); + + // test create subscription invalid + Assertions.assertThrows(PulsarAdminException.class, + () -> pulsarAdmin.persistentTopics().createSubscription(tenant, namespace, topic, "", false, false, null)); + + // test get subscription, topic invalid + Assertions.assertThrows(PulsarAdminException.class, + () -> pulsarAdmin.persistentTopics().getSubscriptions(tenant, namespace, "", false)); + + // test delete subscription invalid + Assertions.assertThrows(PulsarAdminException.class, + () -> pulsarAdmin.persistentTopics().deleteSubscription(tenant, namespace, topic, "", true, false)); + + // Clean up + pulsarAdmin.persistentTopics() + .deleteSubscription(tenant, namespace, topic, subscriptionNameEarliest, false, false); + pulsarAdmin.persistentTopics() + .deleteSubscription(tenant, namespace, topic, subscriptionNameLatest, false, false); + } }