diff --git a/external-schemas/java/README.md b/external-schemas/java/README.md new file mode 100644 index 0000000..a7899d1 --- /dev/null +++ b/external-schemas/java/README.md @@ -0,0 +1,9 @@ +# Overview + +This directory includes examples of how Pulsar clients work with external schemas. + +# Prerequisites + +- Java 1.8 or higher version +- Kafka client 8.0.0 or higher version +- Maven diff --git a/external-schemas/java/pom.xml b/external-schemas/java/pom.xml new file mode 100644 index 0000000..7efe849 --- /dev/null +++ b/external-schemas/java/pom.xml @@ -0,0 +1,142 @@ + + + + 4.0.0 + + org.example + java + 1.0-SNAPSHOT + + + 4.1.0 + 8.0.0 + 8.0.0-ccs + 1.18.38 + 2.20.0 + + + + + org.apache.kafka + kafka-clients + ${kafka.client.version} + + + io.confluent + kafka-schema-serializer + ${kafka.version} + + + io.confluent + kafka-json-schema-serializer + ${kafka.version} + + + + javax.validation + validation-api + 2.0.1.Final + + + + org.apache.logging.log4j + log4j-core + ${log4j2.version} + + + org.apache.logging.log4j + log4j-api + ${log4j2.version} + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j2.version} + + + + com.github.erosb + everit-json-schema + 1.14.6 + + + com.github.erosb + json-sKema + 0.23.0 + + + + org.apache.pulsar + pulsar-client + ${pulsar.version} + + + com.fasterxml.jackson.module + jackson-module-jsonSchema + + + com.fasterxml.jackson.core + jackson-core + + + + + org.projectlombok + lombok + ${lombok.version} + + + io.streamnative.schemas.external + kafka-json-schema + 1.0 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 17 + 17 + + + + + + + + central + default + https://repo1.maven.org/maven2 + + false + + + + confluent + https://packages.confluent.io/maven/ + + false + + + + + diff --git a/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/Configurations.java b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/Configurations.java new file mode 100644 index 0000000..e1bed78 --- /dev/null +++ b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/Configurations.java @@ -0,0 +1,37 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.examples.exschema.json; + +import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig; + +import java.util.HashMap; +import java.util.Map; + +public class Configurations { + + protected static final String TOKEN = ""; + protected static final String PULSAR_SERVICE_URL = ""; + private static final String SCHEMA_REGISTRY_URL = ""; + + public static Map getSchemaRegistryConfigs() { + var map = new HashMap(); + map.put(KafkaJsonSchemaSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL); + map.put(KafkaJsonSchemaSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + map.put( + KafkaJsonSchemaSerializerConfig.USER_INFO_CONFIG, + String.format("%s:%s", "public", TOKEN)); + return map; + } + +} diff --git a/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonConsumer.java b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonConsumer.java new file mode 100644 index 0000000..ad2dc6c --- /dev/null +++ b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonConsumer.java @@ -0,0 +1,73 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.examples.exschema.json; + +import io.streamnative.schemas.external.KafkaSchemaFactory; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; + +import static io.streamnative.examples.exschema.json.Configurations.PULSAR_SERVICE_URL; +import static io.streamnative.examples.exschema.json.Configurations.TOKEN; +import static io.streamnative.examples.exschema.json.Configurations.getSchemaRegistryConfigs; + +@Slf4j +public class ExternalJsonConsumer { + + public void consume() throws Exception { + String topic = "testExternalJsonSchema"; + + @Cleanup + PulsarClient client = + PulsarClient.builder() + .serviceUrl(PULSAR_SERVICE_URL) + .authentication(AuthenticationFactory.token(TOKEN)) + .build(); + + KafkaSchemaFactory kafkaSchemaFactory = new KafkaSchemaFactory(getSchemaRegistryConfigs()); + Schema schema = kafkaSchemaFactory.json(User.class); + + @Cleanup + Consumer consumer = + client.newConsumer(schema) + .topic(topic) + .subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + for (int i = 0; i < 10; i++) { + Message message = consumer.receive(); + consumer.acknowledge(message); + log.debug( + "receive msg {} {}", + message.getValue().getClass().getName(), + message.getValue()); + } + } + + public static void main(String[] args) { + try { + new ExternalJsonConsumer().consume(); + } catch (Exception e) { + log.error("Failed to consume messages", e); + throw new RuntimeException(e); + } + } + +} diff --git a/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonProducer.java b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonProducer.java new file mode 100644 index 0000000..eb4fb04 --- /dev/null +++ b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonProducer.java @@ -0,0 +1,60 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.examples.exschema.json; + +import io.streamnative.schemas.external.KafkaSchemaFactory; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; + +import static io.streamnative.examples.exschema.json.Configurations.PULSAR_SERVICE_URL; +import static io.streamnative.examples.exschema.json.Configurations.TOKEN; +import static io.streamnative.examples.exschema.json.Configurations.getSchemaRegistryConfigs; + +@Slf4j +public class ExternalJsonProducer { + + public void produce() throws Exception { + String topic = "testExternalJsonSchema"; + + KafkaSchemaFactory kafkaSchemaFactory = new KafkaSchemaFactory(getSchemaRegistryConfigs()); + Schema schema = kafkaSchemaFactory.json(User.class); + + @Cleanup + PulsarClient client = + PulsarClient.builder() + .serviceUrl(PULSAR_SERVICE_URL) + .authentication(AuthenticationFactory.token(TOKEN)) + .build(); + + @Cleanup Producer producer = client.newProducer(schema).topic(topic).create(); + + for (int i = 0; i < 10; i++) { + producer.send(new User("name-" + i, 10 + i)); + } + } + + public static void main(String[] args) { + try { + new ExternalJsonProducer().produce(); + } catch (Exception e) { + log.error("Failed to produce messages", e); + throw new RuntimeException(e); + } + } + +} diff --git a/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/User.java b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/User.java new file mode 100644 index 0000000..a68daf5 --- /dev/null +++ b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/User.java @@ -0,0 +1,27 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.examples.exschema.json; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class User { + + private String name; + private Integer age; +} diff --git a/external-schemas/java/src/main/resources/log4j2.xml b/external-schemas/java/src/main/resources/log4j2.xml new file mode 100644 index 0000000..1b29729 --- /dev/null +++ b/external-schemas/java/src/main/resources/log4j2.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + +