From 142bc901191ae583aaf94b80fb243f498eb74805 Mon Sep 17 00:00:00 2001 From: "gaoran_10@126.com" Date: Fri, 26 Sep 2025 11:25:00 +0800 Subject: [PATCH 1/2] Add external JSON schema demo --- external-schemas/java/README.md | 9 ++ external-schemas/java/pom.xml | 142 ++++++++++++++++++ .../exschema/json/Configurations.java | 37 +++++ .../exschema/json/ExternalJsonConsumer.java | 73 +++++++++ .../exschema/json/ExternalJsonProducer.java | 60 ++++++++ .../examples/exschema/json/User.java | 27 ++++ .../java/src/main/resources/log4j2.xml | 18 +++ 7 files changed, 366 insertions(+) create mode 100644 external-schemas/java/README.md create mode 100644 external-schemas/java/pom.xml create mode 100644 external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/Configurations.java create mode 100644 external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonConsumer.java create mode 100644 external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonProducer.java create mode 100644 external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/User.java create mode 100644 external-schemas/java/src/main/resources/log4j2.xml diff --git a/external-schemas/java/README.md b/external-schemas/java/README.md new file mode 100644 index 0000000..a7899d1 --- /dev/null +++ b/external-schemas/java/README.md @@ -0,0 +1,9 @@ +# Overview + +This directory includes examples of how Pulsar clients work with external schemas. + +# Prerequisites + +- Java 1.8 or higher version +- Kafka client 8.0.0 or higher version +- Maven diff --git a/external-schemas/java/pom.xml b/external-schemas/java/pom.xml new file mode 100644 index 0000000..7efe849 --- /dev/null +++ b/external-schemas/java/pom.xml @@ -0,0 +1,142 @@ + + + + 4.0.0 + + org.example + java + 1.0-SNAPSHOT + + + 4.1.0 + 8.0.0 + 8.0.0-ccs + 1.18.38 + 2.20.0 + + + + + org.apache.kafka + kafka-clients + ${kafka.client.version} + + + io.confluent + kafka-schema-serializer + ${kafka.version} + + + io.confluent + kafka-json-schema-serializer + ${kafka.version} + + + + javax.validation + validation-api + 2.0.1.Final + + + + org.apache.logging.log4j + log4j-core + ${log4j2.version} + + + org.apache.logging.log4j + log4j-api + ${log4j2.version} + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j2.version} + + + + com.github.erosb + everit-json-schema + 1.14.6 + + + com.github.erosb + json-sKema + 0.23.0 + + + + org.apache.pulsar + pulsar-client + ${pulsar.version} + + + com.fasterxml.jackson.module + jackson-module-jsonSchema + + + com.fasterxml.jackson.core + jackson-core + + + + + org.projectlombok + lombok + ${lombok.version} + + + io.streamnative.schemas.external + kafka-json-schema + 1.0 + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 17 + 17 + + + + + + + + central + default + https://repo1.maven.org/maven2 + + false + + + + confluent + https://packages.confluent.io/maven/ + + false + + + + + diff --git a/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/Configurations.java b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/Configurations.java new file mode 100644 index 0000000..e1cf8ee --- /dev/null +++ b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/Configurations.java @@ -0,0 +1,37 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.examples.exschema.json; + +import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializerConfig; + +import java.util.HashMap; +import java.util.Map; + +public class Configurations { + + protected static final String TOKEN = "eyJhbGciOiJSUzI1NiIsImtpZCI6ImM4MjE4ZDUyLWViMjktNTY0Mi04YTc1LTRkNzkyMjY3MzVkYiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsidXJuOnNuOnB1bHNhcjpvLTJoMDU2OnVuaWZpZWQtc2NoZW1hIl0sImV4cCI6MTc2MTQ0NzUxOCwiaHR0cHM6Ly9zdHJlYW1uYXRpdmUuaW8vc2NvcGUiOlsiYWRtaW4iLCJhY2Nlc3MiXSwiaHR0cHM6Ly9zdHJlYW1uYXRpdmUuaW8vdXNlcm5hbWUiOiJyZ2FvQG8tMmgwNTYuYXV0aC5zbmNsb3VkLXN0Zy5kZXYiLCJpYXQiOjE3NTg4NTU1MjIsImlzcyI6Imh0dHBzOi8vcGMtYzQxZDM3OGEuYXdzLXVzZTEtdGVzdC1pMjdwNS5hd3Muc24zLmRldi9hcGlrZXlzLyIsImp0aSI6ImEzOTY1ZTcwZTY1MzRiOTU5Nzg2ZTVlMzkyOTI3MThlIiwicGVybWlzc2lvbnMiOltdLCJzdWIiOiJzNFVUd21URDhDd2FyU0RPWDAzRVJMVXhNUHNOUFpvYkBjbGllbnRzIn0.Ip6G40R4DJdwJKAQt3Q4N163R_onDn_oWsGC1I58oLjwk6NzL5-GCMhhCXiA8FJGkHjnRSFidOwXeQ_iYOUSzGLfjCIJCdaHTNmBLTmjVm-tgAz4OZ4ru40-nEe4Gml638AAF3c_7ujwkJ4B4aVbuQhlSEORDUn_xOXh22xyH3VqX2phj20bI7LEx1CFauHIHdJFrydvMgH7tuiyqwIcp7MfuEGsnGFSO9JPQLA7kbwHcKGm2CjPHr1b8wMVDgK2wHA07vCsXZNKWoEcO6qR5bS93wiJhk1n3A2znCnMvzWethBo6D9v8x6j8GCgJTLb8WWaYOUOKWdckMi6enSC_Q"; + protected static final String PULSAR_SERVICE_URL = "pulsar+ssl://pc-c41d378a.aws-use1-test-i27p5.aws.sn3.dev:6651"; + private static final String SCHEMA_REGISTRY_URL = "https://pc-c41d378a.aws-use1-test-i27p5.aws.sn3.dev/kafka"; + + public static Map getSchemaRegistryConfigs() { + var map = new HashMap(); + map.put(KafkaJsonSchemaSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL); + map.put(KafkaJsonSchemaSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + map.put( + KafkaJsonSchemaSerializerConfig.USER_INFO_CONFIG, + String.format("%s:%s", "public", TOKEN)); + return map; + } + +} diff --git a/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonConsumer.java b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonConsumer.java new file mode 100644 index 0000000..ad2dc6c --- /dev/null +++ b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonConsumer.java @@ -0,0 +1,73 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.examples.exschema.json; + +import io.streamnative.schemas.external.KafkaSchemaFactory; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; + +import static io.streamnative.examples.exschema.json.Configurations.PULSAR_SERVICE_URL; +import static io.streamnative.examples.exschema.json.Configurations.TOKEN; +import static io.streamnative.examples.exschema.json.Configurations.getSchemaRegistryConfigs; + +@Slf4j +public class ExternalJsonConsumer { + + public void consume() throws Exception { + String topic = "testExternalJsonSchema"; + + @Cleanup + PulsarClient client = + PulsarClient.builder() + .serviceUrl(PULSAR_SERVICE_URL) + .authentication(AuthenticationFactory.token(TOKEN)) + .build(); + + KafkaSchemaFactory kafkaSchemaFactory = new KafkaSchemaFactory(getSchemaRegistryConfigs()); + Schema schema = kafkaSchemaFactory.json(User.class); + + @Cleanup + Consumer consumer = + client.newConsumer(schema) + .topic(topic) + .subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + for (int i = 0; i < 10; i++) { + Message message = consumer.receive(); + consumer.acknowledge(message); + log.debug( + "receive msg {} {}", + message.getValue().getClass().getName(), + message.getValue()); + } + } + + public static void main(String[] args) { + try { + new ExternalJsonConsumer().consume(); + } catch (Exception e) { + log.error("Failed to consume messages", e); + throw new RuntimeException(e); + } + } + +} diff --git a/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonProducer.java b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonProducer.java new file mode 100644 index 0000000..eb4fb04 --- /dev/null +++ b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/ExternalJsonProducer.java @@ -0,0 +1,60 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.examples.exschema.json; + +import io.streamnative.schemas.external.KafkaSchemaFactory; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; + +import static io.streamnative.examples.exschema.json.Configurations.PULSAR_SERVICE_URL; +import static io.streamnative.examples.exschema.json.Configurations.TOKEN; +import static io.streamnative.examples.exschema.json.Configurations.getSchemaRegistryConfigs; + +@Slf4j +public class ExternalJsonProducer { + + public void produce() throws Exception { + String topic = "testExternalJsonSchema"; + + KafkaSchemaFactory kafkaSchemaFactory = new KafkaSchemaFactory(getSchemaRegistryConfigs()); + Schema schema = kafkaSchemaFactory.json(User.class); + + @Cleanup + PulsarClient client = + PulsarClient.builder() + .serviceUrl(PULSAR_SERVICE_URL) + .authentication(AuthenticationFactory.token(TOKEN)) + .build(); + + @Cleanup Producer producer = client.newProducer(schema).topic(topic).create(); + + for (int i = 0; i < 10; i++) { + producer.send(new User("name-" + i, 10 + i)); + } + } + + public static void main(String[] args) { + try { + new ExternalJsonProducer().produce(); + } catch (Exception e) { + log.error("Failed to produce messages", e); + throw new RuntimeException(e); + } + } + +} diff --git a/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/User.java b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/User.java new file mode 100644 index 0000000..a68daf5 --- /dev/null +++ b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/User.java @@ -0,0 +1,27 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.examples.exschema.json; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class User { + + private String name; + private Integer age; +} diff --git a/external-schemas/java/src/main/resources/log4j2.xml b/external-schemas/java/src/main/resources/log4j2.xml new file mode 100644 index 0000000..1b29729 --- /dev/null +++ b/external-schemas/java/src/main/resources/log4j2.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + + + + + From 9d0c76ce3556a4689da480255c19d10c5f7e7983 Mon Sep 17 00:00:00 2001 From: "gaoran_10@126.com" Date: Fri, 26 Sep 2025 14:46:55 +0800 Subject: [PATCH 2/2] fix --- .../streamnative/examples/exschema/json/Configurations.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/Configurations.java b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/Configurations.java index e1cf8ee..e1bed78 100644 --- a/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/Configurations.java +++ b/external-schemas/java/src/main/java/io/streamnative/examples/exschema/json/Configurations.java @@ -20,9 +20,9 @@ public class Configurations { - protected static final String TOKEN = "eyJhbGciOiJSUzI1NiIsImtpZCI6ImM4MjE4ZDUyLWViMjktNTY0Mi04YTc1LTRkNzkyMjY3MzVkYiIsInR5cCI6IkpXVCJ9.eyJhdWQiOlsidXJuOnNuOnB1bHNhcjpvLTJoMDU2OnVuaWZpZWQtc2NoZW1hIl0sImV4cCI6MTc2MTQ0NzUxOCwiaHR0cHM6Ly9zdHJlYW1uYXRpdmUuaW8vc2NvcGUiOlsiYWRtaW4iLCJhY2Nlc3MiXSwiaHR0cHM6Ly9zdHJlYW1uYXRpdmUuaW8vdXNlcm5hbWUiOiJyZ2FvQG8tMmgwNTYuYXV0aC5zbmNsb3VkLXN0Zy5kZXYiLCJpYXQiOjE3NTg4NTU1MjIsImlzcyI6Imh0dHBzOi8vcGMtYzQxZDM3OGEuYXdzLXVzZTEtdGVzdC1pMjdwNS5hd3Muc24zLmRldi9hcGlrZXlzLyIsImp0aSI6ImEzOTY1ZTcwZTY1MzRiOTU5Nzg2ZTVlMzkyOTI3MThlIiwicGVybWlzc2lvbnMiOltdLCJzdWIiOiJzNFVUd21URDhDd2FyU0RPWDAzRVJMVXhNUHNOUFpvYkBjbGllbnRzIn0.Ip6G40R4DJdwJKAQt3Q4N163R_onDn_oWsGC1I58oLjwk6NzL5-GCMhhCXiA8FJGkHjnRSFidOwXeQ_iYOUSzGLfjCIJCdaHTNmBLTmjVm-tgAz4OZ4ru40-nEe4Gml638AAF3c_7ujwkJ4B4aVbuQhlSEORDUn_xOXh22xyH3VqX2phj20bI7LEx1CFauHIHdJFrydvMgH7tuiyqwIcp7MfuEGsnGFSO9JPQLA7kbwHcKGm2CjPHr1b8wMVDgK2wHA07vCsXZNKWoEcO6qR5bS93wiJhk1n3A2znCnMvzWethBo6D9v8x6j8GCgJTLb8WWaYOUOKWdckMi6enSC_Q"; - protected static final String PULSAR_SERVICE_URL = "pulsar+ssl://pc-c41d378a.aws-use1-test-i27p5.aws.sn3.dev:6651"; - private static final String SCHEMA_REGISTRY_URL = "https://pc-c41d378a.aws-use1-test-i27p5.aws.sn3.dev/kafka"; + protected static final String TOKEN = ""; + protected static final String PULSAR_SERVICE_URL = ""; + private static final String SCHEMA_REGISTRY_URL = ""; public static Map getSchemaRegistryConfigs() { var map = new HashMap();